zhimin-z committed on
Commit f5d00b4 · 1 Parent(s): c3011cc
Files changed (1)
  1. msr.py +184 -469
msr.py CHANGED
@@ -25,31 +25,35 @@ load_dotenv()
25
 
26
  AGENTS_REPO = "SWE-Arena/bot_metadata"
27
  REVIEW_METADATA_REPO = "SWE-Arena/review_metadata"
28
- LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata" # HuggingFace dataset for leaderboard data
29
- LEADERBOARD_TIME_FRAME_DAYS = 180 # Time frame for leaderboard
30
- GHARCHIVE_DATA_DIR = "../gharchive/data" # Local GHArchive data directory
31
- DUCKDB_CACHE_FILE = "cache.duckdb" # Persistent DuckDB database for caching
32
 
33
- # DuckDB performance configuration
34
- DUCKDB_THREADS = 8 # Number of threads for parallel processing
35
- DUCKDB_MEMORY_LIMIT = "64GB" # Memory limit to prevent OOM crashes
36
 
37
  # Download configuration
38
- DOWNLOAD_WORKERS = 4 # Number of parallel download threads
39
- DOWNLOAD_RETRY_DELAY = 2 # Initial retry delay in seconds
40
- MAX_RETRIES = 5 # Maximum number of retries for each API call
41
 
42
  # Upload configuration
43
- UPLOAD_DELAY_SECONDS = 5 # Delay between individual file uploads to avoid rate limits
44
- UPLOAD_INITIAL_BACKOFF = 60 # Initial backoff time in seconds (1 minute)
45
- UPLOAD_MAX_BACKOFF = 3600 # Maximum backoff time in seconds (60 minutes)
46
 
47
  # Scheduler configuration
48
- SCHEDULE_ENABLED = True # Enable/disable scheduler
49
- SCHEDULE_DAY_OF_MONTH = 22 # Day of month (1-31) - 22nd is in the fourth week
50
- SCHEDULE_HOUR = 0 # Hour (0-23) - 12am midnight
51
- SCHEDULE_MINUTE = 0 # Minute (0-59)
52
- SCHEDULE_TIMEZONE = 'UTC' # Timezone for scheduling
53
 
54
  # =============================================================================
55
  # UTILITY FUNCTIONS
@@ -80,34 +84,24 @@ def save_jsonl(filename, data):
80
 
81
 
82
  def normalize_date_format(date_string):
83
- """
84
- Convert date strings to standardized ISO 8601 format with Z suffix.
85
- Handles both 'T' and space-separated datetime formats (including newlines).
86
- Examples:
87
- - 2025-10-15T23:23:47.983068 -> 2025-10-15T23:23:47Z
88
- - 2025-06-17 21:21:07+00 -> 2025-06-17T21:21:07Z
89
- """
90
  if not date_string or date_string == 'N/A':
91
  return 'N/A'
92
 
93
  try:
94
  import re
95
- # Remove all whitespace (spaces, newlines, tabs) and replace with single space
96
- date_string = re.sub(r'\s+', ' ', date_string.strip())
97
 
98
- # Replace space with 'T' for ISO format compatibility
99
  date_string = date_string.replace(' ', 'T')
100
 
101
- # Fix incomplete timezone offset (+00 or -00 -> +00:00 or -00:00)
102
- # Check if timezone offset exists and is incomplete
103
  if len(date_string) >= 3:
104
  if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
105
  date_string = date_string + ':00'
106
 
107
- # Parse the date string (handles both with and without microseconds)
108
  dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
109
-
110
- # Convert to standardized format
111
  return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
112
  except Exception as e:
113
  print(f"Warning: Could not parse date '{date_string}': {e}")
@@ -127,23 +121,13 @@ def get_hf_token():
127
  # =============================================================================
128
 
129
  def download_file(url):
130
- """
131
- Download a GHArchive file with retry logic.
132
-
133
- Args:
134
- url: URL to download
135
-
136
- Returns:
137
- bool: True if successful, False otherwise
138
- """
139
  filename = url.split("/")[-1]
140
  filepath = os.path.join(GHARCHIVE_DATA_DIR, filename)
141
 
142
- # Skip if json.gz already exists
143
  if os.path.exists(filepath):
144
  return True
145
 
146
- # Download with retry logic
147
  for attempt in range(MAX_RETRIES):
148
  try:
149
  response = requests.get(url, timeout=30)
@@ -154,12 +138,10 @@ def download_file(url):
154
 
155
  except requests.exceptions.HTTPError as e:
156
  if e.response.status_code == 404:
157
- # File doesn't exist, don't retry
158
  return False
159
  else:
160
- # Other HTTP errors, retry
161
  if attempt < MAX_RETRIES - 1:
162
- wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt) # Exponential backoff
163
  print(f" ⚠ {filename}: HTTP error {e.response.status_code}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
164
  time.sleep(wait_time)
165
  else:
@@ -168,16 +150,14 @@ def download_file(url):
168
  except (requests.exceptions.Timeout,
169
  requests.exceptions.ConnectionError,
170
  requests.exceptions.ReadTimeout) as e:
171
- # Timeout/connection errors, retry
172
  if attempt < MAX_RETRIES - 1:
173
- wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt) # Exponential backoff
174
  print(f" ⚠ {filename}: {type(e).__name__}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
175
  time.sleep(wait_time)
176
  else:
177
  print(f" ✗ {filename}: Failed after {MAX_RETRIES} attempts - {type(e).__name__}")
178
 
179
  except Exception as e:
180
- # Other errors, retry
181
  if attempt < MAX_RETRIES - 1:
182
  wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
183
  print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
@@ -189,17 +169,9 @@ def download_file(url):
189
 
190
 
191
  def download_all_gharchive_data():
192
- """
193
- Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS.
194
- Uses parallel downloads with ThreadPoolExecutor.
195
-
196
- Returns:
197
- bool: True if all downloads completed (some may have failed), False if critical error
198
- """
199
- # Create data directory if it doesn't exist
200
  os.makedirs(GHARCHIVE_DATA_DIR, exist_ok=True)
201
 
202
- # Generate URLs for last N days (hourly files: 0-23 for each day)
203
  end_date = datetime.now()
204
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
205
 
@@ -207,7 +179,6 @@ def download_all_gharchive_data():
207
  current_date = start_date
208
  while current_date <= end_date:
209
  date_str = current_date.strftime("%Y-%m-%d")
210
- # Generate hourly URLs for this day (0-23)
211
  for hour in range(24):
212
  url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
213
  urls.append(url)
@@ -217,10 +188,7 @@ def download_all_gharchive_data():
217
 
218
  try:
219
  with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
220
- # Submit all downloads
221
  futures = [executor.submit(download_file, url) for url in urls]
222
-
223
- # Wait for downloads to complete
224
  for future in as_completed(futures):
225
  downloads_processed += 1
226
 
@@ -235,25 +203,20 @@ def download_all_gharchive_data():
235
 
236
 
237
  # =============================================================================
238
- # HUGGINGFACE API WRAPPERS WITH ENHANCED BACKOFF
239
  # =============================================================================
240
 
241
  def is_retryable_error(e):
242
- """
243
- Check if exception is retryable (rate limit or timeout error).
244
- """
245
- # Check for rate limit error (429)
246
  if isinstance(e, HfHubHTTPError):
247
  if e.response.status_code == 429:
248
  return True
249
 
250
- # Check for timeout errors
251
  if isinstance(e, (requests.exceptions.Timeout,
252
  requests.exceptions.ReadTimeout,
253
  requests.exceptions.ConnectTimeout)):
254
  return True
255
 
256
- # Check if it's a timeout error wrapped in HfHubHTTPError
257
  if isinstance(e, Exception):
258
  error_str = str(e).lower()
259
  if 'timeout' in error_str or 'timed out' in error_str:
@@ -274,7 +237,7 @@ def is_retryable_error(e):
274
  )
275
  )
276
  def list_repo_files_with_backoff(api, **kwargs):
277
- """Wrapper for api.list_repo_files() with exponential backoff for retryable errors."""
278
  return api.list_repo_files(**kwargs)
279
 
280
 
@@ -290,7 +253,7 @@ def list_repo_files_with_backoff(api, **kwargs):
290
  )
291
  )
292
  def hf_hub_download_with_backoff(**kwargs):
293
- """Wrapper for hf_hub_download() with exponential backoff for retryable errors."""
294
  return hf_hub_download(**kwargs)
295
 
296
 
@@ -306,7 +269,7 @@ def hf_hub_download_with_backoff(**kwargs):
306
  )
307
  )
308
  def upload_file_with_backoff(api, **kwargs):
309
- """Wrapper for api.upload_file() with exponential backoff for retryable errors."""
310
  return api.upload_file(**kwargs)
311
 
312
 
@@ -322,44 +285,30 @@ def upload_file_with_backoff(api, **kwargs):
322
  )
323
  )
324
  def upload_folder_with_backoff(api, **kwargs):
325
- """Wrapper for api.upload_folder() with exponential backoff for retryable errors."""
326
  return api.upload_folder(**kwargs)
327
 
328
 
329
  def get_duckdb_connection():
330
  """
331
- Initialize DuckDB connection with persistent database and optimized parallelization.
332
-
333
- Returns:
334
- DuckDB connection object
335
  """
336
- # Use persistent database for caching results
337
  conn = duckdb.connect(DUCKDB_CACHE_FILE)
338
 
339
- # Optimize for parallel processing with memory limits
340
- conn.execute(f"SET threads TO {DUCKDB_THREADS};") # Configure parallel threads
341
- conn.execute("SET preserve_insertion_order = false;") # Better parallelization
342
- conn.execute("SET enable_object_cache = true;") # Cache objects for reuse
343
- conn.execute("SET temp_directory = '/tmp/duckdb_temp';") # Use fast temp storage if needed
344
- conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';") # Limit memory to prevent OOM crashes
345
- conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';") # Hard memory cap
346
 
347
  return conn
348
 
349
 
350
  def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
351
- """
352
- Generate file path patterns for GHArchive data in date range.
353
- Only includes files that actually exist on disk.
354
-
355
- Args:
356
- start_date: Start datetime
357
- end_date: End datetime
358
- data_dir: Directory containing GHArchive data files
359
-
360
- Returns:
361
- List of file path patterns (hourly JSON.gz files) that exist
362
- """
363
  file_patterns = []
364
  missing_dates = set()
365
 
@@ -367,40 +316,39 @@ def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DI
367
  end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
368
 
369
  while current_date <= end_day:
370
- # Pattern for hourly JSON.gz files: 2024-11-15-{0..23}.json.gz
371
  date_has_files = False
372
  for hour in range(24):
373
  pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
374
- # Only add pattern if file exists
375
  if os.path.exists(pattern):
376
  file_patterns.append(pattern)
377
  date_has_files = True
378
 
379
- # Track missing dates
380
  if not date_has_files:
381
  missing_dates.add(current_date.strftime('%Y-%m-%d'))
382
 
383
- # Move to next day
384
  current_date += timedelta(days=1)
385
 
386
- # Print warning about missing dates
387
  if missing_dates:
388
- print(f" Warning: Skipping {len(missing_dates)} date(s) with no data files: {', '.join(sorted(missing_dates))}")
389
 
390
  return file_patterns
391
 
392
 
393
  # =============================================================================
394
- # DUCKDB QUERY FUNCTIONS
395
  # =============================================================================
396
 
397
- def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
398
  """
399
- Fetch PR review metadata for ALL agents using ONE comprehensive DuckDB query.
400
 
401
- This query combines:
402
- 1. Review events (PullRequestReviewEvent) for all agents
403
- 2. PR status (PullRequestEvent with action='closed')
404
 
405
  Args:
406
  conn: DuckDB connection instance
@@ -409,218 +357,128 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
409
  end_date: End datetime (timezone-aware)
410
 
411
  Returns:
412
- Dictionary mapping agent identifier to list of PR metadata:
413
- {
414
- 'agent-identifier': [
415
- {
416
- 'url': PR URL,
417
- 'reviewed_at': Review timestamp,
418
- 'merged_at': Merge timestamp (if merged, else None),
419
- 'closed_at': Close timestamp (if closed, else None)
420
- },
421
- ...
422
- ],
423
- ...
424
- }
425
  """
426
- # Generate file path patterns for review period
427
- review_patterns = generate_file_path_patterns(start_date, end_date)
428
 
429
- # Generate file path patterns for PR status (use same lookback as reviews)
430
- status_start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
431
- status_patterns = generate_file_path_patterns(status_start_date, end_date)
432
 
433
- # Build identifier list for IN clause
434
- identifier_list = ', '.join([f"'{id}'" for id in identifiers])
 
 
435
 
436
- # Convert file patterns to SQL array format for direct interpolation
437
- review_patterns_sql = str(review_patterns).replace("'", "'")
438
- status_patterns_sql = str(status_patterns).replace("'", "'")
439
-
440
- # Build comprehensive query with CTEs using direct SQL array format (JSON.gz format)
441
- # Optimized: Single file scan + ROW_NUMBER() deduplication (no DISTINCT)
442
- query = f"""
443
- WITH all_review_events AS (
444
- -- Single file scan for all three event types (optimization: 3x I/O reduction)
445
- SELECT
446
- TRY_CAST(type AS VARCHAR) as event_type,
447
- TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) as reviewer,
448
- TRY_CAST(json_extract_string(repo, '$.name') AS VARCHAR) as repo_name,
449
- payload,
450
- created_at
451
- FROM read_json({review_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
452
- WHERE
453
- TRY_CAST(type AS VARCHAR) IN ('PullRequestReviewEvent', 'IssueCommentEvent', 'PullRequestReviewCommentEvent')
454
- AND TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) IN ({identifier_list})
455
- ),
456
-
457
- review_events AS (
458
- -- Process events conditionally based on type
459
- SELECT
460
- CASE
461
- WHEN event_type = 'IssueCommentEvent'
462
- THEN TRY_CAST(json_extract_string(payload, '$.issue.html_url') AS VARCHAR)
463
- ELSE TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR)
464
- END as url,
465
- CASE
466
- WHEN event_type = 'PullRequestReviewEvent'
467
- THEN COALESCE(
468
- TRY_CAST(json_extract_string(payload, '$.review.submitted_at') AS VARCHAR),
469
- TRY_CAST(created_at AS VARCHAR)
470
- )
471
- ELSE TRY_CAST(created_at AS VARCHAR)
472
- END as reviewed_at,
473
- reviewer,
474
- repo_name,
475
- CASE
476
- WHEN event_type = 'IssueCommentEvent'
477
- THEN TRY_CAST(json_extract_string(payload, '$.issue.number') AS INTEGER)
478
- ELSE TRY_CAST(json_extract_string(payload, '$.pull_request.number') AS INTEGER)
479
- END as pr_number
480
- FROM all_review_events
481
- WHERE
482
- -- Validate required fields per event type
483
- (event_type = 'PullRequestReviewEvent' AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL)
484
- OR (event_type = 'IssueCommentEvent' AND json_extract_string(payload, '$.issue.pull_request.url') IS NOT NULL AND json_extract_string(payload, '$.issue.html_url') IS NOT NULL)
485
- OR (event_type = 'PullRequestReviewCommentEvent' AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL)
486
- ),
487
-
488
- pr_status AS (
489
- -- Get merge/close status for those PRs
490
- SELECT
491
- TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR) as url,
492
- TRY_CAST(json_extract_string(payload, '$.pull_request.merged') AS BOOLEAN) as is_merged,
493
- TRY_CAST(json_extract_string(payload, '$.pull_request.merged_at') AS VARCHAR) as merged_at,
494
- TRY_CAST(json_extract_string(payload, '$.pull_request.closed_at') AS VARCHAR) as closed_at,
495
- created_at,
496
- ROW_NUMBER() OVER (PARTITION BY json_extract_string(payload, '$.pull_request.html_url') ORDER BY created_at DESC) as rn
497
- FROM read_json({status_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
498
- WHERE
499
- type = 'PullRequestEvent'
500
- AND TRY_CAST(json_extract_string(payload, '$.action') AS VARCHAR) = 'closed'
501
- AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL
502
- AND json_extract_string(payload, '$.pull_request.html_url') IN (
503
- SELECT DISTINCT url FROM review_events
504
- )
505
- ),
506
-
507
- deduplicated_reviews AS (
508
- -- Efficient deduplication using ROW_NUMBER() instead of DISTINCT (optimization: prevents massive hash table)
509
- SELECT
510
- re.reviewer,
511
- re.url,
512
- re.reviewed_at,
513
- ps.merged_at,
514
- ps.closed_at,
515
- ROW_NUMBER() OVER (
516
- PARTITION BY re.reviewer, re.url, re.reviewed_at
517
- ORDER BY re.reviewed_at
518
- ) as row_num
519
- FROM review_events re
520
- LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.url = ps.url
521
- )
522
 
523
- -- Return deduplicated results (row_num = 1 ensures uniqueness without DISTINCT)
524
- SELECT
525
- reviewer,
526
- url,
527
- reviewed_at,
528
- merged_at,
529
- closed_at
530
- FROM deduplicated_reviews
531
- WHERE row_num = 1
532
- ORDER BY reviewer, reviewed_at DESC
533
- """
534
 
535
- try:
536
- # Create cache table name based on date range
537
- cache_table_name = f"pr_cache_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"
538
-
539
- # Check if cache exists and is valid
540
- cache_exists = conn.execute(f"""
541
- SELECT COUNT(*) FROM information_schema.tables
542
- WHERE table_name = '{cache_table_name}'
543
- """).fetchone()[0] > 0
544
-
545
- if cache_exists:
546
- results = conn.execute(f"""
547
- SELECT reviewer, url, reviewed_at, merged_at, closed_at
548
- FROM {cache_table_name}
549
- WHERE reviewer IN ({identifier_list})
550
- """).fetchall()
551
- else:
552
553
  results = conn.execute(query).fetchall()
554
 
555
- # Cache the complete results for all future queries in this date range
556
- if len(results) > 0:
557
- conn.execute(f"""
558
- CREATE TABLE {cache_table_name} AS
559
- SELECT * FROM (
560
- SELECT UNNEST($1) as reviewer, UNNEST($2) as url,
561
- UNNEST($3) as reviewed_at, UNNEST($4) as merged_at,
562
- UNNEST($5) as closed_at
563
- )
564
- """, [
565
- [r[0] for r in results],
566
- [r[1] for r in results],
567
- [r[2] for r in results],
568
- [r[3] for r in results],
569
- [r[4] for r in results]
570
- ])
571
-
572
- # Group results by agent with verification
573
- metadata_by_agent = defaultdict(list)
574
- unique_reviews = set()
575
- duplicate_count = 0
576
-
577
- for row in results:
578
- reviewer = row[0]
579
- url = row[1]
580
- reviewed_at = normalize_date_format(row[2]) if row[2] else None
581
- merged_at = normalize_date_format(row[3]) if row[3] else None
582
- closed_at = normalize_date_format(row[4]) if row[4] else None
583
-
584
- # Track unique review combinations for verification
585
- review_key = (reviewer, url, reviewed_at)
586
- if review_key in unique_reviews:
587
- duplicate_count += 1
588
- unique_reviews.add(review_key)
589
-
590
- metadata_by_agent[reviewer].append({
591
- 'url': url,
592
- 'reviewed_at': reviewed_at,
593
- 'merged_at': merged_at,
594
- 'closed_at': closed_at,
595
- })
596
-
597
- # Verification: Ensure we have unique reviews (no duplicates from query)
598
- total_reviews = len(results)
599
- if duplicate_count > 0:
600
- print(f" Warning: Found {duplicate_count} duplicate review entries in query results!")
601
- print(f" Total: {total_reviews}, Unique: {len(unique_reviews)}")
602
- else:
603
- print(f" Verification passed: {len(unique_reviews)} unique reviews retrieved (no duplicates)")
604
 
605
- # Convert defaultdict to regular dict
606
- return dict(metadata_by_agent)
607
 
608
- except Exception as e:
609
- print(f"DuckDB error: {str(e)}")
610
- import traceback
611
- traceback.print_exc()
612
- return {}
613
 
614
 
615
  # =============================================================================
616
- # HUGGINGFACE STORAGE FUNCTIONS WITH BATCH UPLOAD
617
  # =============================================================================
618
 
619
  def group_metadata_by_date(metadata_list):
620
- """
621
- Group review metadata by date (year.month.day) for daily storage.
622
- Returns dict: {(year, month, day): [metadata_list]}
623
- """
624
  grouped = defaultdict(list)
625
 
626
  for review_meta in metadata_list:
@@ -639,21 +497,7 @@ def group_metadata_by_date(metadata_list):
639
 
640
 
641
  def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
642
- """
643
- Upload a single file with exponential backoff retry logic.
644
-
645
- Args:
646
- api: HfApi instance
647
- local_path: Local file path
648
- repo_path: Path in repository
649
- repo_id: Repository ID
650
- repo_type: Repository type (e.g., "dataset")
651
- commit_message: Commit message
652
- max_retries: Maximum number of retries
653
-
654
- Returns:
655
- bool: True if successful, False otherwise
656
- """
657
  for attempt in range(max_retries):
658
  try:
659
  upload_file_with_backoff(
@@ -667,7 +511,6 @@ def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type
667
  return True
668
  except Exception as e:
669
  if attempt < max_retries - 1:
670
- # Calculate exponential backoff
671
  wait_time = min(UPLOAD_INITIAL_BACKOFF * (2 ** attempt), UPLOAD_MAX_BACKOFF)
672
  print(f" {e} error on attempt {attempt + 1}/{max_retries}. Retrying in {wait_time}s...")
673
  time.sleep(wait_time)
@@ -678,16 +521,7 @@ def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type
678
 
679
 
680
  def batch_upload_review_metadata(all_metadata):
681
- """
682
- Upload review metadata for all agents with time gaps between uploads.
683
- Each agent's data is uploaded as separate daily files with retry logic.
684
-
685
- Args:
686
- all_metadata: Dictionary mapping agent identifier to list of PR metadata
687
-
688
- Returns:
689
- tuple: (success_count, error_count)
690
- """
691
  try:
692
  token = get_hf_token()
693
  if not token:
@@ -699,7 +533,6 @@ def batch_upload_review_metadata(all_metadata):
699
  error_count = 0
700
  total_files = 0
701
 
702
- # First, calculate total number of files to upload
703
  for agent_identifier, metadata_list in all_metadata.items():
704
  if metadata_list:
705
  grouped = group_metadata_by_date(metadata_list)
@@ -713,28 +546,21 @@ def batch_upload_review_metadata(all_metadata):
713
  if not metadata_list:
714
  continue
715
 
716
- # Group by date
717
  grouped = group_metadata_by_date(metadata_list)
718
 
719
- # Create temporary files for this agent
720
  agent_temp_dir = tempfile.mkdtemp()
721
 
722
  try:
723
- # Prepare all files locally
724
  local_files = []
725
  for (review_year, month, day), day_metadata in grouped.items():
726
  filename = f"{review_year}.{month:02d}.{day:02d}.jsonl"
727
  local_path = os.path.join(agent_temp_dir, filename)
728
  repo_path = f"{agent_identifier}/{filename}"
729
 
730
- # Sort by reviewed_at for better organization
731
  day_metadata.sort(key=lambda x: x.get('reviewed_at', ''), reverse=True)
732
-
733
- # Save to temp file
734
  save_jsonl(local_path, day_metadata)
735
  local_files.append((local_path, repo_path, len(day_metadata)))
736
 
737
- # Upload each file with delay
738
  agent_success = 0
739
  agent_error = 0
740
 
@@ -756,12 +582,10 @@ def batch_upload_review_metadata(all_metadata):
756
  agent_error += 1
757
  error_count += 1
758
 
759
- # Add delay between uploads (except for last file)
760
  if file_idx < len(local_files):
761
  time.sleep(UPLOAD_DELAY_SECONDS)
762
 
763
  finally:
764
- # Clean up temp directory
765
  if os.path.exists(agent_temp_dir):
766
  import shutil
767
  shutil.rmtree(agent_temp_dir)
@@ -781,22 +605,14 @@ def batch_upload_review_metadata(all_metadata):
781
 
782
 
783
  def load_agents_from_hf():
784
- """
785
- Load all agent metadata JSON files from HuggingFace dataset.
786
-
787
- The github_identifier is extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]')
788
- """
789
  try:
790
  api = HfApi()
791
  agents = []
792
 
793
- # List all files in the repository
794
  files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
795
-
796
- # Filter for JSON files only
797
  json_files = [f for f in files if f.endswith('.json')]
798
 
799
- # Download and parse each JSON file
800
  for json_file in json_files:
801
  try:
802
  file_path = hf_hub_download_with_backoff(
@@ -808,11 +624,9 @@ def load_agents_from_hf():
808
  with open(file_path, 'r') as f:
809
  agent_data = json.load(f)
810
 
811
- # Only process agents with status == "public"
812
  if agent_data.get('status') != 'public':
813
  continue
814
 
815
- # Extract github_identifier from filename (remove .json extension)
816
  github_identifier = json_file.replace('.json', '')
817
  agent_data['github_identifier'] = github_identifier
818
 
@@ -823,7 +637,6 @@ def load_agents_from_hf():
823
  continue
824
 
825
  print(f"Download complete: {len(agents)} agents")
826
-
827
  return agents
828
 
829
  except Exception as e:
@@ -831,13 +644,12 @@ def load_agents_from_hf():
831
  return []
832
 
833
 
834
- def get_pr_status_from_metadata(review_meta):
835
- """
836
- Derive PR status from merged_at and closed_at fields.
837
 
838
- Returns:
839
- str: 'merged', 'closed', or 'open'
840
- """
841
  merged_at = review_meta.get('merged_at')
842
  closed_at = review_meta.get('closed_at')
843
 
@@ -850,23 +662,15 @@ def get_pr_status_from_metadata(review_meta):
850
 
851
 
852
  def calculate_review_stats_from_metadata(metadata_list):
853
- """
854
- Calculate statistics from a list of review metadata.
855
-
856
- Returns:
857
- Dictionary with review metrics (total_reviews, merged_prs, acceptance_rate, etc.)
858
- """
859
  total_reviews = len(metadata_list)
860
 
861
- # Count merged PRs
862
  merged_prs = sum(1 for review_meta in metadata_list
863
- if get_pr_status_from_metadata(review_meta) == 'merged')
864
 
865
- # Count rejected PRs
866
  rejected_prs = sum(1 for review_meta in metadata_list
867
  if get_pr_status_from_metadata(review_meta) == 'closed')
868
 
869
- # Count pending PRs
870
  pending_prs = sum(1 for review_meta in metadata_list
871
  if get_pr_status_from_metadata(review_meta) == 'open')
872
 
@@ -883,36 +687,14 @@ def calculate_review_stats_from_metadata(metadata_list):
883
 
884
 
885
  def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
886
- """
887
- Calculate monthly metrics for all agents for visualization.
888
-
889
- Args:
890
- all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
891
- agents: List of agent dictionaries with metadata
892
-
893
- Returns:
894
- dict: {
895
- 'agents': list of agent names,
896
- 'months': list of month labels (e.g., '2025-01'),
897
- 'data': {
898
- agent_name: {
899
- 'acceptance_rates': list of acceptance rates by month,
900
- 'total_reviews': list of review counts by month,
901
- 'merged_prs': list of merged PR counts by month,
902
- }
903
- }
904
- }
905
- """
906
- # Create mapping from agent_identifier to agent_name
907
  identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}
908
 
909
  if not all_metadata_dict:
910
  return {'agents': [], 'months': [], 'data': {}}
911
 
912
- # Group by agent and month
913
  agent_month_data = defaultdict(lambda: defaultdict(list))
914
 
915
- # Flatten the dict of lists into a single list with agent_identifier added
916
  for agent_identifier, metadata_list in all_metadata_dict.items():
917
  for review_meta in metadata_list:
918
  reviewed_at = review_meta.get('reviewed_at')
@@ -920,7 +702,6 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
920
  if not reviewed_at:
921
  continue
922
 
923
- # Get agent_name from identifier
924
  agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
925
 
926
  try:
@@ -931,13 +712,11 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
931
  print(f"Warning: Could not parse date '{reviewed_at}': {e}")
932
  continue
933
 
934
- # Get all unique months and sort them
935
  all_months = set()
936
  for agent_data in agent_month_data.values():
937
  all_months.update(agent_data.keys())
938
  months = sorted(list(all_months))
939
 
940
- # Calculate metrics for each agent and month
941
  result_data = {}
942
  for agent_name, month_dict in agent_month_data.items():
943
  acceptance_rates = []
@@ -947,18 +726,14 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
947
  for month in months:
948
  reviews_in_month = month_dict.get(month, [])
949
 
950
- # Count merged PRs
951
  merged_count = sum(1 for review in reviews_in_month
952
  if get_pr_status_from_metadata(review) == 'merged')
953
 
954
- # Count rejected PRs
955
  rejected_count = sum(1 for review in reviews_in_month
956
  if get_pr_status_from_metadata(review) == 'closed')
957
 
958
- # Total reviews
959
  total_count = len(reviews_in_month)
960
 
961
- # Calculate acceptance rate (exclude pending PRs)
962
  completed_count = merged_count + rejected_count
963
  acceptance_rate = (merged_count / completed_count * 100) if completed_count > 0 else None
964
 
@@ -982,16 +757,7 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
982
 
983
 
984
  def construct_leaderboard_from_metadata(all_metadata_dict, agents):
985
- """
986
- Construct leaderboard from in-memory review metadata.
987
-
988
- Args:
989
- all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
990
- agents: List of agent dictionaries with metadata
991
-
992
- Returns:
993
- Dictionary of agent stats.
994
- """
995
  if not agents:
996
  print("Error: No agents found")
997
  return {}
@@ -1002,10 +768,7 @@ def construct_leaderboard_from_metadata(all_metadata_dict, agents):
1002
  identifier = agent.get('github_identifier')
1003
  agent_name = agent.get('name', 'Unknown')
1004
 
1005
- # Get metadata for this agent from the dictionary
1006
  bot_metadata = all_metadata_dict.get(identifier, [])
1007
-
1008
- # Calculate stats
1009
  stats = calculate_review_stats_from_metadata(bot_metadata)
1010
 
1011
  cache_dict[identifier] = {
@@ -1019,16 +782,7 @@ def construct_leaderboard_from_metadata(all_metadata_dict, agents):
1019
 
1020
 
1021
  def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
1022
- """
1023
- Save leaderboard data and monthly metrics to HuggingFace dataset as swe-review.json.
1024
-
1025
- Args:
1026
- leaderboard_dict: Dictionary of agent stats from construct_leaderboard_from_metadata()
1027
- monthly_metrics: Monthly metrics data from calculate_monthly_metrics_by_agent()
1028
-
1029
- Returns:
1030
- bool: True if successful, False otherwise
1031
- """
1032
  try:
1033
  token = get_hf_token()
1034
  if not token:
@@ -1037,7 +791,6 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
1037
  api = HfApi(token=token)
1038
  filename = "swe-review.json"
1039
 
1040
- # Combine leaderboard and monthly metrics
1041
  combined_data = {
1042
  'last_updated': datetime.now(timezone.utc).isoformat(),
1043
  'leaderboard': leaderboard_dict,
@@ -1047,12 +800,10 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
1047
  }
1048
  }
1049
 
1050
- # Save locally first
1051
  with open(filename, 'w') as f:
1052
  json.dump(combined_data, f, indent=2)
1053
 
1054
  try:
1055
- # Upload to HuggingFace with retry logic
1056
  upload_file_with_backoff(
1057
  api=api,
1058
  path_or_fileobj=filename,
@@ -1062,7 +813,6 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
1062
  )
1063
  return True
1064
  finally:
1065
- # Always clean up local file
1066
  if os.path.exists(filename):
1067
  os.remove(filename)
1068
 
@@ -1074,21 +824,19 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
1074
 
1075
 
1076
  # =============================================================================
1077
- # MAIN MINING FUNCTION
1078
  # =============================================================================
1079
 
1080
  def mine_all_agents():
1081
  """
1082
- Mine review metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
1083
- Downloads GHArchive data first, then uses ONE DuckDB query for ALL agents, then batch uploads with time gaps.
1084
  """
1085
- # Step 1: Download GHArchive data
1086
  print(f"\n[1/5] Downloading GHArchive data...")
1087
 
1088
  if not download_all_gharchive_data():
1089
  print("Warning: Download had errors, continuing with available data...")
1090
 
1091
- # Step 2: Load agent metadata from HuggingFace
1092
  print(f"\n[2/5] Loading agent metadata...")
1093
 
1094
  agents = load_agents_from_hf()
@@ -1096,7 +844,6 @@ def mine_all_agents():
1096
  print("Error: No agents found")
1097
  return
1098
 
1099
- # Extract all identifiers
1100
  identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
1101
  if not identifiers:
1102
  print("Error: No valid agent identifiers found")
@@ -1104,55 +851,42 @@ def mine_all_agents():
1104
 
1105
  print(f"\n[3/5] Mining review metadata ({len(identifiers)} agents, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
1106
 
1107
- # Initialize DuckDB connection
1108
  try:
1109
  conn = get_duckdb_connection()
1110
  except Exception as e:
1111
  print(f"Failed to initialize DuckDB connection: {str(e)}")
1112
  return
1113
 
1114
- # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
1115
  current_time = datetime.now(timezone.utc)
1116
  end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
1117
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
1118
 
1119
  try:
1120
- # Use single query for all agents
1121
- all_metadata = fetch_all_pr_metadata_single_query(
1122
  conn, identifiers, start_date, end_date
1123
  )
1124
 
1125
- # Calculate summary statistics
1126
- total_prs = sum(len(metadata_list) for metadata_list in all_metadata.values())
1127
  agents_with_data = sum(1 for metadata_list in all_metadata.values() if metadata_list)
1128
 
1129
- print(f"Query complete: {total_prs} PRs found for {agents_with_data}/{len(agents)} agents")
1130
-
1131
  except Exception as e:
1132
  print(f"Error during DuckDB fetch: {str(e)}")
1133
  import traceback
1134
  traceback.print_exc()
1135
  return
1136
  finally:
1137
- # Close DuckDB connection
1138
  conn.close()
1139
 
1140
- # Step 4: Batch upload review metadata with time gaps
1141
  print(f"\n[4/5] Uploading review metadata...")
1142
 
1143
  success_count, error_count = batch_upload_review_metadata(all_metadata)
1144
 
1145
- # Step 5: Construct and save leaderboard data
1146
  print(f"\n[5/5] Saving leaderboard...")
1147
 
1148
  try:
1149
- # Construct leaderboard from in-memory data
1150
  leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
1151
-
1152
- # Calculate monthly metrics from in-memory data
1153
  monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)
1154
-
1155
- # Save to HuggingFace
1156
  save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)
1157
 
1158
  print(f"\nCOMPLETE: {success_count} files uploaded" + (f", {error_count} errors" if error_count > 0 else ""))
@@ -1168,30 +902,16 @@ def mine_all_agents():
1168
  # =============================================================================
1169
 
1170
  def setup_scheduler():
1171
- """
1172
- Set up APScheduler to run mining jobs periodically.
1173
- Schedule is configurable via environment variables.
1174
-
1175
- Environment variables:
1176
- - SCHEDULE_ENABLED: Enable/disable scheduler (default: true)
1177
- - SCHEDULE_DAY_OF_MONTH: Day of month to run (default: 22, fourth week)
1178
- - SCHEDULE_HOUR: Hour to run (0-23, default: 0)
1179
- - SCHEDULE_MINUTE: Minute to run (0-59, default: 0)
1180
- - SCHEDULE_TIMEZONE: Timezone for scheduling (default: UTC)
1181
- """
1182
- # Configure logging for APScheduler
1183
  logging.basicConfig(
1184
  level=logging.INFO,
1185
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1186
  )
1187
 
1188
- # Disable verbose HTTP request logging from httpx (used by huggingface_hub)
1189
  logging.getLogger('httpx').setLevel(logging.WARNING)
1190
 
1191
- # Create scheduler
1192
  scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
1193
 
1194
- # Create cron trigger with configured schedule (monthly on specific day)
1195
  trigger = CronTrigger(
1196
  day=SCHEDULE_DAY_OF_MONTH,
1197
  hour=SCHEDULE_HOUR,
@@ -1199,7 +919,6 @@ def setup_scheduler():
1199
  timezone=SCHEDULE_TIMEZONE
1200
  )
1201
 
1202
- # Add job to scheduler
1203
  scheduler.add_job(
1204
  mine_all_agents,
1205
  trigger=trigger,
@@ -1208,13 +927,11 @@ def setup_scheduler():
1208
  replace_existing=True
1209
  )
1210
 
1211
- # Print schedule information
1212
  from datetime import datetime
1213
  next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
1214
  print(f"Scheduler: Monthly on day {SCHEDULE_DAY_OF_MONTH} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
1215
  print(f"Next run: {next_run}\n")
1216
 
1217
- # Start scheduler (blocking call)
1218
  print(f"\nScheduler started")
1219
  scheduler.start()
1220
 
@@ -1225,8 +942,6 @@ def setup_scheduler():
1225
 
1226
  if __name__ == "__main__":
1227
  if SCHEDULE_ENABLED:
1228
- # Run with scheduler
1229
  setup_scheduler()
1230
  else:
1231
- # Run without scheduler, just mine once
1232
  mine_all_agents()
 
25
 
26
  AGENTS_REPO = "SWE-Arena/bot_metadata"
27
  REVIEW_METADATA_REPO = "SWE-Arena/review_metadata"
28
+ LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata"
29
+ LEADERBOARD_TIME_FRAME_DAYS = 180
30
+ GHARCHIVE_DATA_DIR = "../gharchive/data"
31
+ DUCKDB_CACHE_FILE = "cache.duckdb"
32
 
33
+ # OPTIMIZED DUCKDB CONFIGURATION
34
+ DUCKDB_THREADS = 8
35
+ DUCKDB_MEMORY_LIMIT = "64GB"
36
+
37
+ # Streaming batch configuration
38
+ BATCH_SIZE_DAYS = 7 # Process 1 week at a time (~168 hourly files)
39
+ # At this size: ~7 days × 24 files × ~100MB per file = ~16GB uncompressed per batch
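+ # Rough sanity check of the estimate above (a sketch; the ~100MB-per-file
+ # figure is an assumption, not a measured value):
+ #   files_per_batch = BATCH_SIZE_DAYS * 24       # 7 * 24 = 168 hourly files
+ #   approx_gb_per_batch = files_per_batch * 0.1  # 168 * ~0.1 GB ≈ ~16.8 GB uncompressed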
40
 
41
  # Download configuration
42
+ DOWNLOAD_WORKERS = 4
43
+ DOWNLOAD_RETRY_DELAY = 2
44
+ MAX_RETRIES = 5
45
 
46
  # Upload configuration
47
+ UPLOAD_DELAY_SECONDS = 5
48
+ UPLOAD_INITIAL_BACKOFF = 60
49
+ UPLOAD_MAX_BACKOFF = 3600
50
 
51
  # Scheduler configuration
52
+ SCHEDULE_ENABLED = False
53
+ SCHEDULE_DAY_OF_MONTH = 22
54
+ SCHEDULE_HOUR = 0
55
+ SCHEDULE_MINUTE = 0
56
+ SCHEDULE_TIMEZONE = 'UTC'
57
 
58
  # =============================================================================
59
  # UTILITY FUNCTIONS
 
84
 
85
 
86
  def normalize_date_format(date_string):
87
+ """Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix."""
88
  if not date_string or date_string == 'N/A':
89
  return 'N/A'
90
 
91
  try:
92
  import re
 
 
93
 
94
+ if isinstance(date_string, datetime):
95
+ return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
96
+
97
+ date_string = re.sub(r'\s+', ' ', date_string.strip())
98
  date_string = date_string.replace(' ', 'T')
99
 
 
 
100
  if len(date_string) >= 3:
101
  if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
102
  date_string = date_string + ':00'
103
 
 
104
  dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
 
 
105
  return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
106
  except Exception as e:
107
  print(f"Warning: Could not parse date '{date_string}': {e}")
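  # Illustrative behavior, carried over from the earlier docstring examples:
  #   normalize_date_format('2025-10-15T23:23:47.983068')  -> '2025-10-15T23:23:47Z'
  #   normalize_date_format('2025-06-17 21:21:07+00')      -> '2025-06-17T21:21:07Z'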
 
121
  # =============================================================================
122
 
123
  def download_file(url):
124
+ """Download a GHArchive file with retry logic."""
125
  filename = url.split("/")[-1]
126
  filepath = os.path.join(GHARCHIVE_DATA_DIR, filename)
127
 
 
128
  if os.path.exists(filepath):
129
  return True
130
 
 
131
  for attempt in range(MAX_RETRIES):
132
  try:
133
  response = requests.get(url, timeout=30)
 
138
 
139
  except requests.exceptions.HTTPError as e:
140
  if e.response.status_code == 404:
 
141
  return False
142
  else:
 
143
  if attempt < MAX_RETRIES - 1:
144
+ wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
145
  print(f" ⚠ {filename}: HTTP error {e.response.status_code}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
146
  time.sleep(wait_time)
147
  else:
 
150
  except (requests.exceptions.Timeout,
151
  requests.exceptions.ConnectionError,
152
  requests.exceptions.ReadTimeout) as e:
 
153
  if attempt < MAX_RETRIES - 1:
154
+ wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
155
  print(f" ⚠ {filename}: {type(e).__name__}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
156
  time.sleep(wait_time)
157
  else:
158
  print(f" ✗ {filename}: Failed after {MAX_RETRIES} attempts - {type(e).__name__}")
159
 
160
  except Exception as e:
 
161
  if attempt < MAX_RETRIES - 1:
162
  wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
163
  print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
 
169
 
170
 
171
  def download_all_gharchive_data():
172
+ """Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS."""
173
  os.makedirs(GHARCHIVE_DATA_DIR, exist_ok=True)
174
 
 
175
  end_date = datetime.now()
176
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
177
 
 
179
  current_date = start_date
180
  while current_date <= end_date:
181
  date_str = current_date.strftime("%Y-%m-%d")
 
182
  for hour in range(24):
183
  url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
184
  urls.append(url)
 
188
 
189
  try:
190
  with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
 
191
  futures = [executor.submit(download_file, url) for url in urls]
 
 
192
  for future in as_completed(futures):
193
  downloads_processed += 1
194
 
 
203
 
204
 
205
  # =============================================================================
206
+ # HUGGINGFACE API WRAPPERS
207
  # =============================================================================
208
 
209
  def is_retryable_error(e):
210
+ """Check if exception is retryable (rate limit or timeout error)."""
211
  if isinstance(e, HfHubHTTPError):
212
  if e.response.status_code == 429:
213
  return True
214
 
 
215
  if isinstance(e, (requests.exceptions.Timeout,
216
  requests.exceptions.ReadTimeout,
217
  requests.exceptions.ConnectTimeout)):
218
  return True
219
 
 
220
  if isinstance(e, Exception):
221
  error_str = str(e).lower()
222
  if 'timeout' in error_str or 'timed out' in error_str:
 
237
  )
238
  )
239
  def list_repo_files_with_backoff(api, **kwargs):
240
+ """Wrapper for api.list_repo_files() with exponential backoff."""
241
  return api.list_repo_files(**kwargs)
242
 
243
 
 
253
  )
254
  )
255
  def hf_hub_download_with_backoff(**kwargs):
256
+ """Wrapper for hf_hub_download() with exponential backoff."""
257
  return hf_hub_download(**kwargs)
258
 
259
 
 
269
  )
270
  )
271
  def upload_file_with_backoff(api, **kwargs):
272
+ """Wrapper for api.upload_file() with exponential backoff."""
273
  return api.upload_file(**kwargs)
274
 
275
 
 
285
  )
286
  )
287
  def upload_folder_with_backoff(api, **kwargs):
288
+ """Wrapper for api.upload_folder() with exponential backoff."""
289
  return api.upload_folder(**kwargs)
290
 
291
 
292
  def get_duckdb_connection():
293
  """
294
+ Initialize DuckDB connection with OPTIMIZED memory settings.
295
+ Uses persistent database and reduced memory footprint.
 
 
296
  """
 
297
  conn = duckdb.connect(DUCKDB_CACHE_FILE)
298
 
299
+ # OPTIMIZED SETTINGS
300
+ conn.execute(f"SET threads TO {DUCKDB_THREADS};")
301
+ conn.execute("SET preserve_insertion_order = false;")
302
+ conn.execute("SET enable_object_cache = true;")
303
+ conn.execute("SET temp_directory = '/tmp/duckdb_temp';")
304
+ conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';")
305
+ conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';")
306
 
307
  return conn
308
 
309
 
310
  def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
311
+ """Generate file path patterns for GHArchive data in date range (only existing files)."""
312
  file_patterns = []
313
  missing_dates = set()
314
 
 
316
  end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
317
 
318
  while current_date <= end_day:
 
319
  date_has_files = False
320
  for hour in range(24):
321
  pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
 
322
  if os.path.exists(pattern):
323
  file_patterns.append(pattern)
324
  date_has_files = True
325
 
 
326
  if not date_has_files:
327
  missing_dates.add(current_date.strftime('%Y-%m-%d'))
328
 
 
329
  current_date += timedelta(days=1)
330
 
 
331
  if missing_dates:
332
+ print(f" Skipping {len(missing_dates)} date(s) with no data")
333
 
334
  return file_patterns
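  # Illustrative input, per the hourly naming kept from the earlier version: a day
  # such as 2024-11-15 contributes at most 24 patterns of the form
  # ../gharchive/data/2024-11-15-{0..23}.json.gz (only files present on disk are returned).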
335
 
336
 
337
  # =============================================================================
338
+ # STREAMING BATCH PROCESSING FOR REVIEW METADATA
339
  # =============================================================================
340
 
341
+ def fetch_all_review_metadata_streaming(conn, identifiers, start_date, end_date):
342
  """
343
+ OPTIMIZED: Fetch review metadata using streaming batch processing.
344
 
345
+ Processes GHArchive files in BATCH_SIZE_DAYS chunks to limit memory usage.
346
+ Instead of loading 180 days (4,344 files) at once, processes 7 days at a time.
347
+
348
+ This prevents OOM errors by:
349
+ 1. Only keeping ~168 hourly files in memory per batch (vs 4,344)
350
+ 2. Incrementally building the results dictionary
351
+ 3. Allowing DuckDB to garbage collect after each batch
352
 
353
  Args:
354
  conn: DuckDB connection instance
 
357
  end_date: End datetime (timezone-aware)
358
 
359
  Returns:
360
+ Dictionary mapping agent identifier to list of review metadata
361
  """
362
+ identifier_list = ', '.join([f"'{id}'" for id in identifiers])
363
+ metadata_by_agent = defaultdict(list)
364
 
365
+ # Calculate total batches
366
+ total_days = (end_date - start_date).days
367
+ total_batches = (total_days // BATCH_SIZE_DAYS) + 1
368
 
369
+ # Process in configurable batches
370
+ current_date = start_date
371
+ batch_num = 0
372
+ total_reviews = 0
373
 
374
+ print(f" Streaming {total_batches} batches of {BATCH_SIZE_DAYS}-day intervals...")
375
 
376
+ while current_date <= end_date:
377
+ batch_num += 1
378
+ batch_end = min(current_date + timedelta(days=BATCH_SIZE_DAYS - 1), end_date)
379
 
380
+ # Get file patterns for THIS BATCH ONLY
381
+ file_patterns = generate_file_path_patterns(current_date, batch_end)
382
+
383
+ if not file_patterns:
384
+ print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} - NO DATA")
385
+ current_date = batch_end + timedelta(days=1)
386
+ continue
387
+
388
+ # Progress indicator
389
+ print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} ({len(file_patterns)} files)... ", end="", flush=True)
390
+
391
+ # Build file patterns SQL for THIS BATCH
392
+ file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'
393
+
394
+ # SIMPLIFIED query for review metadata
395
+ # Focuses on PullRequestReviewEvent and tracks PR status
396
+ query = f"""
397
+ WITH review_events AS (
398
+ SELECT
399
+ payload.pull_request.html_url as pr_url,
400
+ actor.login as reviewer,
401
+ COALESCE(payload.review.submitted_at, created_at) as reviewed_at
402
+ FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
403
+ WHERE
404
+ type = 'PullRequestReviewEvent'
405
+ AND payload.pull_request.html_url IS NOT NULL
406
+ AND actor.login IN ({identifier_list})
407
+ ),
408
+ pr_status AS (
409
+ SELECT
410
+ payload.pull_request.html_url as pr_url,
411
+ payload.pull_request.merged as is_merged,
412
+ payload.pull_request.merged_at as merged_at,
413
+ payload.pull_request.closed_at as closed_at,
414
+ ROW_NUMBER() OVER (PARTITION BY payload.pull_request.html_url ORDER BY created_at DESC) as rn
415
+ FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
416
+ WHERE
417
+ type = 'PullRequestEvent'
418
+ AND payload.action = 'closed'
419
+ AND payload.pull_request.html_url IS NOT NULL
420
+ AND payload.pull_request.html_url IN (SELECT DISTINCT pr_url FROM review_events)
421
+ )
422
+ SELECT
423
+ re.reviewer,
424
+ re.pr_url as url,
425
+ re.reviewed_at,
426
+ ps.merged_at,
427
+ ps.closed_at
428
+ FROM review_events re
429
+ LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.pr_url = ps.pr_url
430
+ ORDER BY re.reviewer, re.reviewed_at DESC
431
+ """
432
+
433
+ try:
434
  results = conn.execute(query).fetchall()
435
+ batch_reviews = 0
436
+
437
+ # Add results to accumulating dictionary
438
+ for row in results:
439
+ reviewer = row[0]
440
+ url = row[1]
441
+ reviewed_at = normalize_date_format(row[2]) if row[2] else None
442
+ merged_at = normalize_date_format(row[3]) if row[3] else None
443
+ closed_at = normalize_date_format(row[4]) if row[4] else None
444
+
445
+ if not url or not reviewed_at:
446
+ continue
447
+
448
+ review_metadata = {
449
+ 'url': url,
450
+ 'reviewed_at': reviewed_at,
451
+ 'merged_at': merged_at,
452
+ 'closed_at': closed_at,
453
+ }
454
 
455
+ metadata_by_agent[reviewer].append(review_metadata)
456
+ batch_reviews += 1
457
+ total_reviews += 1
 
458
 
459
+ print(f"✓ {batch_reviews} reviews found")
 
460
 
461
+ except Exception as e:
462
+ print(f"\n ✗ Batch {batch_num} error: {str(e)}")
463
+ import traceback
464
+ traceback.print_exc()
465
+
466
+ # Move to next batch
467
+ current_date = batch_end + timedelta(days=1)
468
+
469
+ # Final summary
470
+ agents_with_data = sum(1 for reviews in metadata_by_agent.values() if reviews)
471
+ print(f"\n ✓ Complete: {total_reviews} reviews found for {agents_with_data}/{len(identifiers)} agents")
472
+
473
+ return dict(metadata_by_agent)
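+ # Minimal usage sketch (mirrors how mine_all_agents() calls this function below):
+ #   conn = get_duckdb_connection()
+ #   end_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
+ #   start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
+ #   all_metadata = fetch_all_review_metadata_streaming(conn, identifiers, start_date, end_date)
+ #   conn.close()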
474
 
475
 
476
  # =============================================================================
477
+ # HUGGINGFACE STORAGE FUNCTIONS
478
  # =============================================================================
479
 
480
  def group_metadata_by_date(metadata_list):
481
+ """Group review metadata by date for daily storage."""
482
  grouped = defaultdict(list)
483
 
484
  for review_meta in metadata_list:
 
497
 
498
 
499
  def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
500
+ """Upload a single file with exponential backoff retry logic."""
501
  for attempt in range(max_retries):
502
  try:
503
  upload_file_with_backoff(
 
511
  return True
512
  except Exception as e:
513
  if attempt < max_retries - 1:
 
514
  wait_time = min(UPLOAD_INITIAL_BACKOFF * (2 ** attempt), UPLOAD_MAX_BACKOFF)
515
  print(f" {e} error on attempt {attempt + 1}/{max_retries}. Retrying in {wait_time}s...")
516
  time.sleep(wait_time)
 
521
 
522
 
523
  def batch_upload_review_metadata(all_metadata):
524
+ """Upload review metadata for all agents with time gaps between uploads."""
525
  try:
526
  token = get_hf_token()
527
  if not token:
 
533
  error_count = 0
534
  total_files = 0
535
 
 
536
  for agent_identifier, metadata_list in all_metadata.items():
537
  if metadata_list:
538
  grouped = group_metadata_by_date(metadata_list)
 
546
  if not metadata_list:
547
  continue
548
 
 
549
  grouped = group_metadata_by_date(metadata_list)
550
 
 
551
  agent_temp_dir = tempfile.mkdtemp()
552
 
553
  try:
 
554
  local_files = []
555
  for (review_year, month, day), day_metadata in grouped.items():
556
  filename = f"{review_year}.{month:02d}.{day:02d}.jsonl"
557
  local_path = os.path.join(agent_temp_dir, filename)
558
  repo_path = f"{agent_identifier}/{filename}"
559
 
 
560
  day_metadata.sort(key=lambda x: x.get('reviewed_at', ''), reverse=True)
 
 
561
  save_jsonl(local_path, day_metadata)
562
  local_files.append((local_path, repo_path, len(day_metadata)))
563
 
 
564
  agent_success = 0
565
  agent_error = 0
566
 
 
582
  agent_error += 1
583
  error_count += 1
584
 
 
585
  if file_idx < len(local_files):
586
  time.sleep(UPLOAD_DELAY_SECONDS)
587
 
588
  finally:
 
589
  if os.path.exists(agent_temp_dir):
590
  import shutil
591
  shutil.rmtree(agent_temp_dir)
 
605
 
606
 
607
  def load_agents_from_hf():
608
+ """Load all agent metadata JSON files from HuggingFace dataset."""
609
  try:
610
  api = HfApi()
611
  agents = []
612
 
 
613
  files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
 
 
614
  json_files = [f for f in files if f.endswith('.json')]
615
 
 
616
  for json_file in json_files:
617
  try:
618
  file_path = hf_hub_download_with_backoff(
 
624
  with open(file_path, 'r') as f:
625
  agent_data = json.load(f)
626
 
 
627
  if agent_data.get('status') != 'public':
628
  continue
629
 
 
630
  github_identifier = json_file.replace('.json', '')
631
  agent_data['github_identifier'] = github_identifier
632
 
 
637
  continue
638
 
639
  print(f"Download complete: {len(agents)} agents")
 
640
  return agents
641
 
642
  except Exception as e:
 
644
  return []
645
 
646
 
647
+ # =============================================================================
648
+ # STATISTICS CALCULATION
649
+ # =============================================================================
650
 
651
+ def get_pr_status_from_metadata(review_meta):
652
+ """Derive PR status from merged_at and closed_at fields."""
 
653
  merged_at = review_meta.get('merged_at')
654
  closed_at = review_meta.get('closed_at')
655
 
 
662
 
663
 
664
  def calculate_review_stats_from_metadata(metadata_list):
665
+ """Calculate statistics from a list of review metadata."""
666
  total_reviews = len(metadata_list)
667
 
 
668
  merged_prs = sum(1 for review_meta in metadata_list
669
+ if get_pr_status_from_metadata(review_meta) == 'merged')
670
 
 
671
  rejected_prs = sum(1 for review_meta in metadata_list
672
  if get_pr_status_from_metadata(review_meta) == 'closed')
673
 
 
674
  pending_prs = sum(1 for review_meta in metadata_list
675
  if get_pr_status_from_metadata(review_meta) == 'open')
676
 
 
687
 
688
 
689
  def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
690
+ """Calculate monthly metrics for all agents for visualization."""
691
  identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}
692
 
693
  if not all_metadata_dict:
694
  return {'agents': [], 'months': [], 'data': {}}
695
 
 
696
  agent_month_data = defaultdict(lambda: defaultdict(list))
697
 
 
698
  for agent_identifier, metadata_list in all_metadata_dict.items():
699
  for review_meta in metadata_list:
700
  reviewed_at = review_meta.get('reviewed_at')
 
702
  if not reviewed_at:
703
  continue
704
 
 
705
  agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
706
 
707
  try:
 
712
  print(f"Warning: Could not parse date '{reviewed_at}': {e}")
713
  continue
714
 
 
715
  all_months = set()
716
  for agent_data in agent_month_data.values():
717
  all_months.update(agent_data.keys())
718
  months = sorted(list(all_months))
719
 
 
720
  result_data = {}
721
  for agent_name, month_dict in agent_month_data.items():
722
  acceptance_rates = []
 
726
  for month in months:
727
  reviews_in_month = month_dict.get(month, [])
728
 
 
729
  merged_count = sum(1 for review in reviews_in_month
730
  if get_pr_status_from_metadata(review) == 'merged')
731
 
 
732
  rejected_count = sum(1 for review in reviews_in_month
733
  if get_pr_status_from_metadata(review) == 'closed')
734
 
 
735
  total_count = len(reviews_in_month)
736
 
 
737
  completed_count = merged_count + rejected_count
738
  acceptance_rate = (merged_count / completed_count * 100) if completed_count > 0 else None
739
 
 
757
 
758
 
759
  def construct_leaderboard_from_metadata(all_metadata_dict, agents):
760
+ """Construct leaderboard from in-memory review metadata."""
761
  if not agents:
762
  print("Error: No agents found")
763
  return {}
 
768
  identifier = agent.get('github_identifier')
769
  agent_name = agent.get('name', 'Unknown')
770
 
 
771
  bot_metadata = all_metadata_dict.get(identifier, [])
 
 
772
  stats = calculate_review_stats_from_metadata(bot_metadata)
773
 
774
  cache_dict[identifier] = {
 
782
 
783
 
784
  def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
785
+ """Save leaderboard data and monthly metrics to HuggingFace dataset."""
786
  try:
787
  token = get_hf_token()
788
  if not token:
 
791
  api = HfApi(token=token)
792
  filename = "swe-review.json"
793
 
 
794
  combined_data = {
795
  'last_updated': datetime.now(timezone.utc).isoformat(),
796
  'leaderboard': leaderboard_dict,
 
800
  }
801
  }
802
 
 
803
  with open(filename, 'w') as f:
804
  json.dump(combined_data, f, indent=2)
805
 
806
  try:
 
807
  upload_file_with_backoff(
808
  api=api,
809
  path_or_fileobj=filename,
 
813
  )
814
  return True
815
  finally:
 
816
  if os.path.exists(filename):
817
  os.remove(filename)
818
 
 
824
 
825
 
826
  # =============================================================================
827
+ # MINING FUNCTION
828
  # =============================================================================
829
 
830
  def mine_all_agents():
831
  """
832
+ Mine review metadata for all agents using STREAMING batch processing.
833
+ Downloads GHArchive data, then uses BATCH-based DuckDB queries.
834
  """
 
835
  print(f"\n[1/5] Downloading GHArchive data...")
836
 
837
  if not download_all_gharchive_data():
838
  print("Warning: Download had errors, continuing with available data...")
839
 
 
840
  print(f"\n[2/5] Loading agent metadata...")
841
 
842
  agents = load_agents_from_hf()
 
844
  print("Error: No agents found")
845
  return
846
 
 
847
  identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
848
  if not identifiers:
849
  print("Error: No valid agent identifiers found")
 
851
 
852
  print(f"\n[3/5] Mining review metadata ({len(identifiers)} agents, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
853
 
 
854
  try:
855
  conn = get_duckdb_connection()
856
  except Exception as e:
857
  print(f"Failed to initialize DuckDB connection: {str(e)}")
858
  return
859
 
 
860
  current_time = datetime.now(timezone.utc)
861
  end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
862
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
863
 
864
  try:
865
+ # USE STREAMING FUNCTION
866
+ all_metadata = fetch_all_review_metadata_streaming(
867
  conn, identifiers, start_date, end_date
868
  )
869
 
870
+ total_reviews = sum(len(metadata_list) for metadata_list in all_metadata.values())
 
871
  agents_with_data = sum(1 for metadata_list in all_metadata.values() if metadata_list)
872
 
 
 
873
  except Exception as e:
874
  print(f"Error during DuckDB fetch: {str(e)}")
875
  import traceback
876
  traceback.print_exc()
877
  return
878
  finally:
 
879
  conn.close()
880
 
 
881
  print(f"\n[4/5] Uploading review metadata...")
882
 
883
  success_count, error_count = batch_upload_review_metadata(all_metadata)
884
 
 
885
  print(f"\n[5/5] Saving leaderboard...")
886
 
887
  try:
 
888
  leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
 
 
889
  monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)
 
 
890
  save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)
891
 
892
  print(f"\nCOMPLETE: {success_count} files uploaded" + (f", {error_count} errors" if error_count > 0 else ""))
 
902
  # =============================================================================
903
 
904
  def setup_scheduler():
905
+ """Set up APScheduler to run mining jobs periodically."""
906
  logging.basicConfig(
907
  level=logging.INFO,
908
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
909
  )
910
 
 
911
  logging.getLogger('httpx').setLevel(logging.WARNING)
912
 
 
913
  scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
914
 
 
915
  trigger = CronTrigger(
916
  day=SCHEDULE_DAY_OF_MONTH,
917
  hour=SCHEDULE_HOUR,
 
919
  timezone=SCHEDULE_TIMEZONE
920
  )
921
 
 
922
  scheduler.add_job(
923
  mine_all_agents,
924
  trigger=trigger,
 
927
  replace_existing=True
928
  )
929
 
 
930
  from datetime import datetime
931
  next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
932
  print(f"Scheduler: Monthly on day {SCHEDULE_DAY_OF_MONTH} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
933
  print(f"Next run: {next_run}\n")
934
 
 
935
  print(f"\nScheduler started")
936
  scheduler.start()
937
 
 
942
 
943
  if __name__ == "__main__":
944
  if SCHEDULE_ENABLED:
 
945
  setup_scheduler()
946
  else:
 
947
  mine_all_agents()