Spaces:
Running
Running
| import threading | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Callable, Optional | |
| class DailyRefresh: | |
| """Simple daily auto-refresh mechanism.""" | |
| def __init__(self, refresh_callback: Callable, refresh_hour: int = 2): | |
| """ | |
| Initialize daily refresh. | |
| Args: | |
| refresh_callback: Function to call for refresh | |
| refresh_hour: Hour of day to refresh (0-23), default 2 AM | |
| """ | |
| self.refresh_callback = refresh_callback | |
| self.refresh_hour = refresh_hour | |
| self.timer: Optional[threading.Timer] = None | |
| self.running = False | |
| def _calculate_next_refresh(self) -> float: | |
| """Calculate seconds until next refresh time.""" | |
| now = datetime.now() | |
| # Calculate next refresh time (today or tomorrow at refresh_hour) | |
| next_refresh = now.replace(hour=self.refresh_hour, minute=0, second=0, microsecond=0) | |
| # If we've passed today's refresh time, schedule for tomorrow | |
| if now >= next_refresh: | |
| next_refresh += timedelta(days=1) | |
| return (next_refresh - now).total_seconds() | |
| def _refresh_and_reschedule(self): | |
| """Execute refresh and schedule the next one.""" | |
| if not self.running: | |
| return | |
| print(f"[{datetime.now()}] Auto-refreshing leaderboard...") | |
| # Execute refresh | |
| try: | |
| self.refresh_callback() | |
| print("β Auto-refresh completed successfully") | |
| except Exception as e: | |
| print(f"β Auto-refresh failed: {e}") | |
| # Schedule next refresh | |
| if self.running: | |
| self._schedule_next() | |
| def _schedule_next(self): | |
| """Schedule the next refresh.""" | |
| seconds_until_next = self._calculate_next_refresh() | |
| print(f"Next auto-refresh scheduled in {seconds_until_next/3600:.1f} hours") | |
| self.timer = threading.Timer(seconds_until_next, self._refresh_and_reschedule) | |
| self.timer.daemon = True # Don't prevent app shutdown | |
| self.timer.start() | |
| def start(self): | |
| """Start the daily auto-refresh.""" | |
| if self.running: | |
| return | |
| self.running = True | |
| print(f"π Daily auto-refresh enabled (refresh time: {self.refresh_hour:02d}:00)") | |
| self._schedule_next() | |
| def stop(self): | |
| """Stop the auto-refresh.""" | |
| self.running = False | |
| if self.timer: | |
| self.timer.cancel() | |
| print("π Daily auto-refresh stopped") |