import threading import time from datetime import datetime, timedelta from typing import Callable, Optional class DailyRefresh: """Simple daily auto-refresh mechanism.""" def __init__(self, refresh_callback: Callable, refresh_hour: int = 2): """ Initialize daily refresh. Args: refresh_callback: Function to call for refresh refresh_hour: Hour of day to refresh (0-23), default 2 AM """ self.refresh_callback = refresh_callback self.refresh_hour = refresh_hour self.timer: Optional[threading.Timer] = None self.running = False def _calculate_next_refresh(self) -> float: """Calculate seconds until next refresh time.""" now = datetime.now() # Calculate next refresh time (today or tomorrow at refresh_hour) next_refresh = now.replace(hour=self.refresh_hour, minute=0, second=0, microsecond=0) # If we've passed today's refresh time, schedule for tomorrow if now >= next_refresh: next_refresh += timedelta(days=1) return (next_refresh - now).total_seconds() def _refresh_and_reschedule(self): """Execute refresh and schedule the next one.""" if not self.running: return print(f"[{datetime.now()}] Auto-refreshing leaderboard...") # Execute refresh try: self.refresh_callback() print("✅ Auto-refresh completed successfully") except Exception as e: print(f"❌ Auto-refresh failed: {e}") # Schedule next refresh if self.running: self._schedule_next() def _schedule_next(self): """Schedule the next refresh.""" seconds_until_next = self._calculate_next_refresh() print(f"Next auto-refresh scheduled in {seconds_until_next/3600:.1f} hours") self.timer = threading.Timer(seconds_until_next, self._refresh_and_reschedule) self.timer.daemon = True # Don't prevent app shutdown self.timer.start() def start(self): """Start the daily auto-refresh.""" if self.running: return self.running = True print(f"🔄 Daily auto-refresh enabled (refresh time: {self.refresh_hour:02d}:00)") self._schedule_next() def stop(self): """Stop the auto-refresh.""" self.running = False if self.timer: self.timer.cancel() print("🛑 Daily auto-refresh stopped")