tox21_leaderboard / backend /auto_refresh.py
Tschoui's picture
✨ Add leaderboard refreshing mechanism
bc721cc
import threading
import time
from datetime import datetime, timedelta
from typing import Callable, Optional
class DailyRefresh:
"""Simple daily auto-refresh mechanism."""
def __init__(self, refresh_callback: Callable, refresh_hour: int = 2):
"""
Initialize daily refresh.
Args:
refresh_callback: Function to call for refresh
refresh_hour: Hour of day to refresh (0-23), default 2 AM
"""
self.refresh_callback = refresh_callback
self.refresh_hour = refresh_hour
self.timer: Optional[threading.Timer] = None
self.running = False
def _calculate_next_refresh(self) -> float:
"""Calculate seconds until next refresh time."""
now = datetime.now()
# Calculate next refresh time (today or tomorrow at refresh_hour)
next_refresh = now.replace(hour=self.refresh_hour, minute=0, second=0, microsecond=0)
# If we've passed today's refresh time, schedule for tomorrow
if now >= next_refresh:
next_refresh += timedelta(days=1)
return (next_refresh - now).total_seconds()
def _refresh_and_reschedule(self):
"""Execute refresh and schedule the next one."""
if not self.running:
return
print(f"[{datetime.now()}] Auto-refreshing leaderboard...")
# Execute refresh
try:
self.refresh_callback()
print("βœ… Auto-refresh completed successfully")
except Exception as e:
print(f"❌ Auto-refresh failed: {e}")
# Schedule next refresh
if self.running:
self._schedule_next()
def _schedule_next(self):
"""Schedule the next refresh."""
seconds_until_next = self._calculate_next_refresh()
print(f"Next auto-refresh scheduled in {seconds_until_next/3600:.1f} hours")
self.timer = threading.Timer(seconds_until_next, self._refresh_and_reschedule)
self.timer.daemon = True # Don't prevent app shutdown
self.timer.start()
def start(self):
"""Start the daily auto-refresh."""
if self.running:
return
self.running = True
print(f"πŸ”„ Daily auto-refresh enabled (refresh time: {self.refresh_hour:02d}:00)")
self._schedule_next()
def stop(self):
"""Stop the auto-refresh."""
self.running = False
if self.timer:
self.timer.cancel()
print("πŸ›‘ Daily auto-refresh stopped")