zhimin-z committed on
Commit 17e5dc2 · 1 parent: 3f0eb80

add streaming

Files changed (1)
  1. msr.py +180 -416
msr.py CHANGED
@@ -1,8 +1,3 @@
1
- """
2
- Minimalist PR Metadata Mining Script
3
- Mines PR metadata from locally downloaded GHArchive data via DuckDB and saves to HuggingFace dataset.
4
- """
5
-
6
  import json
7
  import os
8
  import time
@@ -30,31 +25,35 @@ load_dotenv()
30
 
31
  AGENTS_REPO = "SWE-Arena/bot_metadata"
32
  PR_METADATA_REPO = "SWE-Arena/pr_metadata"
33
- LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata" # HuggingFace dataset for leaderboard data
34
- LEADERBOARD_TIME_FRAME_DAYS = 180 # Time frame for leaderboard
35
- GHARCHIVE_DATA_DIR = "../gharchive/data" # Local GHArchive data directory
36
- DUCKDB_CACHE_FILE = "cache.duckdb" # Persistent DuckDB database for caching
 
 
 
 
37
 
38
- # DuckDB performance configuration
39
- DUCKDB_THREADS = 8 # Number of threads for parallel processing
40
- DUCKDB_MEMORY_LIMIT = "64GB" # Memory limit to prevent OOM crashes
41
 
42
  # Download configuration
43
- DOWNLOAD_WORKERS = 4 # Number of parallel download threads
44
- DOWNLOAD_RETRY_DELAY = 2 # Initial retry delay in seconds
45
- MAX_RETRIES = 5 # Maximum number of retries for each API call
46
 
47
  # Upload configuration
48
- UPLOAD_DELAY_SECONDS = 5 # Delay between individual file uploads to avoid rate limits
49
- UPLOAD_INITIAL_BACKOFF = 60 # Initial backoff time in seconds (1 minute)
50
- UPLOAD_MAX_BACKOFF = 3600 # Maximum backoff time in seconds (60 minutes)
51
 
52
  # Scheduler configuration
53
- SCHEDULE_ENABLED = True # Enable/disable scheduler
54
 - SCHEDULE_DAY_OF_MONTH = 8 # Day of month (1-31) - the 8th falls in the second week
55
- SCHEDULE_HOUR = 0 # Hour (0-23) - 12am midnight
56
- SCHEDULE_MINUTE = 0 # Minute (0-59)
57
- SCHEDULE_TIMEZONE = 'UTC' # Timezone for scheduling
58
 
59
  # =============================================================================
60
  # UTILITY FUNCTIONS
@@ -85,40 +84,24 @@ def save_jsonl(filename, data):
85
 
86
 
87
  def normalize_date_format(date_string):
88
- """
89
- Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix.
90
- Handles both 'T' and space-separated datetime formats (including newlines).
91
- Examples:
92
- - 2025-10-15T23:23:47.983068 -> 2025-10-15T23:23:47Z
93
- - 2025-06-17 21:21:07+00 -> 2025-06-17T21:21:07Z
94
- - datetime object -> 2025-10-15T23:23:47Z
95
- """
96
  if not date_string or date_string == 'N/A':
97
  return 'N/A'
98
 
99
  try:
100
  import re
101
 
102
- # Handle datetime objects directly
103
  if isinstance(date_string, datetime):
104
  return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
105
 
106
- # Remove all whitespace (spaces, newlines, tabs) and replace with single space
107
  date_string = re.sub(r'\s+', ' ', date_string.strip())
108
-
109
- # Replace space with 'T' for ISO format compatibility
110
  date_string = date_string.replace(' ', 'T')
111
 
112
- # Fix incomplete timezone offset (+00 or -00 -> +00:00 or -00:00)
113
- # Check if timezone offset exists and is incomplete
114
  if len(date_string) >= 3:
115
  if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
116
  date_string = date_string + ':00'
117
 
118
- # Parse the date string (handles both with and without microseconds)
119
  dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
120
-
121
- # Convert to standardized format
122
  return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
123
  except Exception as e:
124
  print(f"Warning: Could not parse date '{date_string}': {e}")
@@ -138,23 +121,13 @@ def get_hf_token():
138
  # =============================================================================
139
 
140
  def download_file(url):
141
- """
142
- Download a GHArchive file with retry logic.
143
-
144
- Args:
145
- url: URL to download
146
-
147
- Returns:
148
- bool: True if successful, False otherwise
149
- """
150
  filename = url.split("/")[-1]
151
  filepath = os.path.join(GHARCHIVE_DATA_DIR, filename)
152
 
153
- # Skip if json.gz already exists
154
  if os.path.exists(filepath):
155
  return True
156
 
157
- # Download with retry logic
158
  for attempt in range(MAX_RETRIES):
159
  try:
160
  response = requests.get(url, timeout=30)
@@ -165,12 +138,10 @@ def download_file(url):
165
 
166
  except requests.exceptions.HTTPError as e:
167
  if e.response.status_code == 404:
168
- # File doesn't exist, don't retry
169
  return False
170
  else:
171
- # Other HTTP errors, retry
172
  if attempt < MAX_RETRIES - 1:
173
- wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt) # Exponential backoff
174
  print(f" ⚠ {filename}: HTTP error {e.response.status_code}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
175
  time.sleep(wait_time)
176
  else:
@@ -179,16 +150,14 @@ def download_file(url):
179
  except (requests.exceptions.Timeout,
180
  requests.exceptions.ConnectionError,
181
  requests.exceptions.ReadTimeout) as e:
182
- # Timeout/connection errors, retry
183
  if attempt < MAX_RETRIES - 1:
184
- wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt) # Exponential backoff
185
  print(f" ⚠ {filename}: {type(e).__name__}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
186
  time.sleep(wait_time)
187
  else:
188
  print(f" ✗ {filename}: Failed after {MAX_RETRIES} attempts - {type(e).__name__}")
189
 
190
  except Exception as e:
191
- # Other errors, retry
192
  if attempt < MAX_RETRIES - 1:
193
  wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
194
  print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
@@ -200,17 +169,9 @@ def download_file(url):
200
 
201
 
202
  def download_all_gharchive_data():
203
- """
204
- Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS.
205
- Uses parallel downloads with ThreadPoolExecutor.
206
-
207
- Returns:
208
- bool: True if all downloads completed (some may have failed), False if critical error
209
- """
210
- # Create data directory if it doesn't exist
211
  os.makedirs(GHARCHIVE_DATA_DIR, exist_ok=True)
212
 
213
- # Generate URLs for last N days (hourly files: 0-23 for each day)
214
  end_date = datetime.now()
215
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
216
 
@@ -218,7 +179,6 @@ def download_all_gharchive_data():
218
  current_date = start_date
219
  while current_date <= end_date:
220
  date_str = current_date.strftime("%Y-%m-%d")
221
- # Generate hourly URLs for this day (0-23)
222
  for hour in range(24):
223
  url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
224
  urls.append(url)
@@ -228,10 +188,7 @@ def download_all_gharchive_data():
228
 
229
  try:
230
  with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
231
- # Submit all downloads
232
  futures = [executor.submit(download_file, url) for url in urls]
233
-
234
- # Wait for downloads to complete
235
  for future in as_completed(futures):
236
  downloads_processed += 1
237
 
@@ -246,25 +203,20 @@ def download_all_gharchive_data():
246
 
247
 
248
  # =============================================================================
249
- # HUGGINGFACE API WRAPPERS WITH ENHANCED BACKOFF
250
  # =============================================================================
251
 
252
  def is_retryable_error(e):
253
- """
254
- Check if exception is retryable (rate limit or timeout error).
255
- """
256
- # Check for rate limit error (429)
257
  if isinstance(e, HfHubHTTPError):
258
  if e.response.status_code == 429:
259
  return True
260
 
261
- # Check for timeout errors
262
  if isinstance(e, (requests.exceptions.Timeout,
263
  requests.exceptions.ReadTimeout,
264
  requests.exceptions.ConnectTimeout)):
265
  return True
266
 
267
- # Check if it's a timeout error wrapped in HfHubHTTPError
268
  if isinstance(e, Exception):
269
  error_str = str(e).lower()
270
  if 'timeout' in error_str or 'timed out' in error_str:
@@ -285,7 +237,7 @@ def is_retryable_error(e):
285
  )
286
  )
287
  def list_repo_files_with_backoff(api, **kwargs):
288
- """Wrapper for api.list_repo_files() with exponential backoff for retryable errors."""
289
  return api.list_repo_files(**kwargs)
290
 
291
 
@@ -301,7 +253,7 @@ def list_repo_files_with_backoff(api, **kwargs):
301
  )
302
  )
303
  def hf_hub_download_with_backoff(**kwargs):
304
- """Wrapper for hf_hub_download() with exponential backoff for retryable errors."""
305
  return hf_hub_download(**kwargs)
306
 
307
 
@@ -317,7 +269,7 @@ def hf_hub_download_with_backoff(**kwargs):
317
  )
318
  )
319
  def upload_file_with_backoff(api, **kwargs):
320
- """Wrapper for api.upload_file() with exponential backoff for retryable errors."""
321
  return api.upload_file(**kwargs)
322
 
323
 
@@ -333,44 +285,30 @@ def upload_file_with_backoff(api, **kwargs):
333
  )
334
  )
335
  def upload_folder_with_backoff(api, **kwargs):
336
- """Wrapper for api.upload_folder() with exponential backoff for retryable errors."""
337
  return api.upload_folder(**kwargs)
338
 
339
 
340
  def get_duckdb_connection():
341
  """
342
- Initialize DuckDB connection with persistent database and optimized parallelization.
343
-
344
- Returns:
345
- DuckDB connection object
346
  """
347
- # Use persistent database for caching results
348
  conn = duckdb.connect(DUCKDB_CACHE_FILE)
349
 
350
- # Optimize for parallel processing with memory limits
351
- conn.execute(f"SET threads TO {DUCKDB_THREADS};") # Configure parallel threads
352
- conn.execute("SET preserve_insertion_order = false;") # Better parallelization
353
- conn.execute("SET enable_object_cache = true;") # Cache objects for reuse
354
- conn.execute("SET temp_directory = '/tmp/duckdb_temp';") # Use fast temp storage if needed
355
- conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';") # Limit memory to prevent OOM crashes
356
- conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';") # Hard memory cap
357
 
358
  return conn
359
 
360
 
361
  def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
362
- """
363
- Generate file path patterns for GHArchive data in date range.
364
- Only includes files that actually exist on disk.
365
-
366
- Args:
367
- start_date: Start datetime
368
- end_date: End datetime
369
- data_dir: Directory containing GHArchive data files
370
-
371
- Returns:
372
- List of file path patterns (hourly JSON.gz files) that exist
373
- """
374
  file_patterns = []
375
  missing_dates = set()
376
 
@@ -378,184 +316,160 @@ def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DI
378
  end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
379
 
380
  while current_date <= end_day:
381
- # Pattern for hourly JSON.gz files: 2024-11-15-{0..23}.json.gz
382
  date_has_files = False
383
  for hour in range(24):
384
  pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
385
- # Only add pattern if file exists
386
  if os.path.exists(pattern):
387
  file_patterns.append(pattern)
388
  date_has_files = True
389
 
390
- # Track missing dates
391
  if not date_has_files:
392
  missing_dates.add(current_date.strftime('%Y-%m-%d'))
393
 
394
- # Move to next day
395
  current_date += timedelta(days=1)
396
 
397
- # Print warning about missing dates
398
  if missing_dates:
399
- print(f" Warning: Skipping {len(missing_dates)} date(s) with no data files: {', '.join(sorted(missing_dates))}")
400
 
401
  return file_patterns
402
 
403
 
404
  # =============================================================================
405
- # DUCKDB QUERY FUNCTIONS
406
  # =============================================================================
407
 
408
- def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
409
- """
410
- Fetch PR metadata for ALL agents using ONE comprehensive DuckDB query.
411
-
412
- This query fetches:
413
- 1. PRs authored by agents (user.login matches identifier)
414
- 2. PR status (opened, merged, closed)
415
-
 
 
 
 
416
  Args:
417
  conn: DuckDB connection instance
418
- identifiers: List of GitHub usernames/bot identifiers
419
  start_date: Start datetime (timezone-aware)
420
  end_date: End datetime (timezone-aware)
421
-
422
  Returns:
423
- Dictionary mapping agent identifier to list of PR metadata:
424
- {
425
- 'agent-identifier': [
426
- {
427
- 'html_url': PR URL,
428
- 'created_at': Creation timestamp,
429
- 'merged_at': Merge timestamp (if merged, else None),
430
- 'closed_at': Close timestamp (if closed but not merged, else None)
431
- },
432
- ...
433
- ],
434
- ...
435
- }
436
  """
437
- # Generate file path patterns for the time range
438
- file_patterns = generate_file_path_patterns(start_date, end_date)
439
-
440
- if not file_patterns:
441
- print(" ✗ Error: No GHArchive data files found for the specified date range")
442
- return {}
443
-
444
- # Build identifier list for IN clause with proper escaping
445
  identifier_list = ', '.join([f"'{id}'" for id in identifiers])
446
-
447
- # Build file patterns list for SQL (as JSON array string)
448
- file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'
449
-
450
- # ============================================================================
451
- # REFINED DUCKDB QUERY - Using struct accessors for parsed JSON
452
- # ============================================================================
453
- query = f"""
454
- WITH pr_events AS (
455
- -- Get all PR opened/closed events
456
- SELECT
457
- CONCAT(
458
- REPLACE(repo.url, 'api.github.com/repos/', 'github.com/'),
459
- '/pull/',
460
- CAST(payload.pull_request.number AS VARCHAR)
461
- ) as url,
462
- actor.login as pr_author,
463
- created_at as event_time,
464
- payload.action as event_action
465
- FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
466
- WHERE
467
- type = 'PullRequestEvent'
468
- AND payload.action IN ('opened', 'closed')
469
- AND payload.pull_request.number IS NOT NULL
470
- AND actor.login IN ({identifier_list})
471
- ),
472
- pr_timeline AS (
473
- -- Build timeline: opened_at and closed_at (closed could mean merged or rejected)
474
- SELECT
475
- url,
476
- pr_author,
477
- MIN(CASE WHEN event_action = 'opened' THEN event_time END) as created_at,
478
- MAX(CASE WHEN event_action = 'closed' THEN event_time END) as closed_at,
479
- -- Note: GHArchive doesn't distinguish merged vs closed, so merged_at = NULL
480
- NULL as merged_at
481
- FROM pr_events
482
- GROUP BY url, pr_author
483
- )
484
- SELECT
485
- url,
486
- pr_author,
487
- created_at,
488
- merged_at,
489
- closed_at
490
- FROM pr_timeline
491
- WHERE created_at IS NOT NULL
492
- ORDER BY created_at DESC
493
- """
494
-
495
- try:
496
- # Execute the query
497
- results = conn.execute(query).fetchall()
498
-
499
- if not results:
500
- print(f" ⚠ Warning: Query returned 0 results")
501
- print(f" Checked {len(identifiers)} agent(s): {', '.join(identifiers)}")
502
- return {}
503
-
504
- # Group results by agent identifier
505
- metadata_by_agent = defaultdict(list)
506
- unique_urls = set()
507
-
508
- for row in results:
509
- url = row[0]
510
- pr_author = row[1]
511
- created_at = normalize_date_format(row[2]) if row[2] else None
512
- merged_at = normalize_date_format(row[3]) if row[3] else None
513
- closed_at = normalize_date_format(row[4]) if row[4] else None
514
-
515
- # Skip if no valid URL
516
- if not url:
517
- continue
518
-
519
- # Track unique URLs for verification
520
- unique_urls.add(url)
521
-
522
- # Build metadata record
523
- pr_metadata = {
524
- 'html_url': url,
525
- 'created_at': created_at,
526
- 'merged_at': merged_at,
527
- 'closed_at': closed_at,
528
- }
529
-
530
- metadata_by_agent[pr_author].append(pr_metadata)
531
-
532
- # Log results per agent
533
- agents_with_data = sum(1 for prs in metadata_by_agent.values() if prs)
534
- print(f" ✓ Coverage: {agents_with_data}/{len(identifiers)} agents have PR data")
535
 
536
- for agent_id in sorted(metadata_by_agent.keys()):
537
- pr_count = len(metadata_by_agent[agent_id])
538
- print(f" - {agent_id}: {pr_count} PRs")
539
-
540
- # Convert defaultdict to regular dict before returning
541
- return dict(metadata_by_agent)
542
-
543
- except Exception as e:
544
- print(f" ✗ DuckDB query error: {str(e)}")
545
- import traceback
546
- traceback.print_exc()
547
- return {}
548
 -
549
 
550
- # =============================================================================
551
- # HUGGINGFACE STORAGE FUNCTIONS WITH BATCH UPLOAD
552
- # =============================================================================
553
 
554
  def group_metadata_by_date(metadata_list):
555
- """
556
- Group PR metadata by date (year.month.day) for daily storage.
557
- Returns dict: {(year, month, day): [metadata_list]}
558
- """
559
  grouped = defaultdict(list)
560
 
561
  for pr_meta in metadata_list:
@@ -574,21 +488,7 @@ def group_metadata_by_date(metadata_list):
574
 
575
 
576
  def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
577
- """
578
- Upload a single file with exponential backoff retry logic.
579
-
580
- Args:
581
- api: HfApi instance
582
- local_path: Local file path
583
- repo_path: Path in repository
584
- repo_id: Repository ID
585
- repo_type: Repository type (e.g., "dataset")
586
- commit_message: Commit message
587
- max_retries: Maximum number of retries
588
-
589
- Returns:
590
- bool: True if successful, False otherwise
591
- """
592
  for attempt in range(max_retries):
593
  try:
594
  upload_file_with_backoff(
@@ -602,7 +502,6 @@ def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type
602
  return True
603
  except Exception as e:
604
  if attempt < max_retries - 1:
605
- # Calculate exponential backoff
606
  wait_time = min(UPLOAD_INITIAL_BACKOFF * (2 ** attempt), UPLOAD_MAX_BACKOFF)
607
  print(f" {e} error on attempt {attempt + 1}/{max_retries}. Retrying in {wait_time}s...")
608
  time.sleep(wait_time)
@@ -613,16 +512,7 @@ def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type
613
 
614
 
615
  def batch_upload_pr_metadata(all_metadata):
616
- """
617
- Upload PR metadata for all agents with time gaps between uploads.
618
- Each agent's data is uploaded as separate daily files with retry logic.
619
-
620
- Args:
621
- all_metadata: Dictionary mapping agent identifier to list of PR metadata
622
-
623
- Returns:
624
- tuple: (success_count, error_count)
625
- """
626
  try:
627
  token = get_hf_token()
628
  if not token:
@@ -634,7 +524,6 @@ def batch_upload_pr_metadata(all_metadata):
634
  error_count = 0
635
  total_files = 0
636
 
637
- # First, calculate total number of files to upload
638
  for agent_identifier, metadata_list in all_metadata.items():
639
  if metadata_list:
640
  grouped = group_metadata_by_date(metadata_list)
@@ -648,28 +537,21 @@ def batch_upload_pr_metadata(all_metadata):
648
  if not metadata_list:
649
  continue
650
 
651
- # Group by date
652
  grouped = group_metadata_by_date(metadata_list)
653
 
654
- # Create temporary files for this agent
655
  agent_temp_dir = tempfile.mkdtemp()
656
 
657
  try:
658
- # Prepare all files locally
659
  local_files = []
660
  for (pr_year, month, day), day_metadata in grouped.items():
661
  filename = f"{pr_year}.{month:02d}.{day:02d}.jsonl"
662
  local_path = os.path.join(agent_temp_dir, filename)
663
  repo_path = f"{agent_identifier}/{filename}"
664
 
665
- # Sort by created_at for better organization
666
  day_metadata.sort(key=lambda x: x.get('created_at', ''), reverse=True)
667
-
668
- # Save to temp file
669
  save_jsonl(local_path, day_metadata)
670
  local_files.append((local_path, repo_path, len(day_metadata)))
671
 
672
- # Upload each file with delay
673
  agent_success = 0
674
  agent_error = 0
675
 
@@ -691,12 +573,10 @@ def batch_upload_pr_metadata(all_metadata):
691
  agent_error += 1
692
  error_count += 1
693
 
694
- # Add delay between uploads (except for last file)
695
  if file_idx < len(local_files):
696
  time.sleep(UPLOAD_DELAY_SECONDS)
697
 
698
  finally:
699
- # Clean up temp directory
700
  if os.path.exists(agent_temp_dir):
701
  import shutil
702
  shutil.rmtree(agent_temp_dir)
@@ -716,22 +596,14 @@ def batch_upload_pr_metadata(all_metadata):
716
 
717
 
718
  def load_agents_from_hf():
719
- """
720
- Load all agent metadata JSON files from HuggingFace dataset.
721
-
722
- The github_identifier is extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]')
723
- """
724
  try:
725
  api = HfApi()
726
  agents = []
727
 
728
- # List all files in the repository
729
  files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
730
-
731
- # Filter for JSON files only
732
  json_files = [f for f in files if f.endswith('.json')]
733
 
734
- # Download and parse each JSON file
735
  for json_file in json_files:
736
  try:
737
  file_path = hf_hub_download_with_backoff(
@@ -743,11 +615,9 @@ def load_agents_from_hf():
743
  with open(file_path, 'r') as f:
744
  agent_data = json.load(f)
745
 
746
- # Only process agents with status == "public"
747
  if agent_data.get('status') != 'public':
748
  continue
749
 
750
- # Extract github_identifier from filename (remove .json extension)
751
  github_identifier = json_file.replace('.json', '')
752
  agent_data['github_identifier'] = github_identifier
753
 
@@ -758,7 +628,6 @@ def load_agents_from_hf():
758
  continue
759
 
760
  print(f"Download complete: {len(agents)} agents")
761
-
762
  return agents
763
 
764
  except Exception as e:
@@ -766,28 +635,14 @@ def load_agents_from_hf():
766
  return []
767
 
768
 
769
- # =============================================================================
770
- # LEADERBOARD DATA COMPUTATION & STORAGE
771
- # =============================================================================
772
-
773
  def calculate_pr_stats_from_metadata(metadata_list):
774
- """
775
- Calculate statistics from a list of PR metadata.
776
-
777
- Returns a dictionary with comprehensive PR metrics.
778
- Acceptance rate = merged PRs / (merged PRs + closed but not merged PRs) * 100
779
- """
780
  total_prs = len(metadata_list)
781
  merged = sum(1 for pr_meta in metadata_list if pr_meta.get('merged_at'))
782
-
783
- # Count closed PRs (rejected) - those with closed_at but no merged_at
784
  closed_not_merged = sum(1 for pr_meta in metadata_list
785
  if pr_meta.get('closed_at') and not pr_meta.get('merged_at'))
786
 
787
- # Total decisions made = merged + closed (rejected)
788
  total_decisions = merged + closed_not_merged
789
-
790
- # Calculate acceptance rate based on decisions made
791
  acceptance_rate = (merged / total_decisions * 100) if total_decisions > 0 else 0
792
 
793
  return {
@@ -798,36 +653,14 @@ def calculate_pr_stats_from_metadata(metadata_list):
798
 
799
 
800
  def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
801
- """
802
- Calculate monthly metrics for all agents for visualization.
803
-
804
- Args:
805
- all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
806
- agents: List of agent dictionaries with metadata
807
-
808
- Returns:
809
- dict: {
810
- 'agents': list of agent names,
811
- 'months': list of month labels (e.g., '2025-01'),
812
- 'data': {
813
- agent_name: {
814
- 'acceptance_rates': list of acceptance rates by month,
815
- 'total_prs': list of PR counts by month,
816
- 'merged_prs': list of merged PR counts by month,
817
- }
818
- }
819
- }
820
- """
821
- # Create mapping from agent_identifier to agent_name
822
  identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}
823
 
824
  if not all_metadata_dict:
825
  return {'agents': [], 'months': [], 'data': {}}
826
 
827
- # Group by agent and month
828
  agent_month_data = defaultdict(lambda: defaultdict(list))
829
 
830
- # Flatten the dict of lists into a single list with agent_identifier added
831
  for agent_identifier, metadata_list in all_metadata_dict.items():
832
  for pr_meta in metadata_list:
833
  created_at = pr_meta.get('created_at')
@@ -835,7 +668,6 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
835
  if not created_at:
836
  continue
837
 
838
- # Get agent_name from identifier
839
  agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
840
 
841
  try:
@@ -846,13 +678,11 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
846
  print(f"Warning: Could not parse date '{created_at}': {e}")
847
  continue
848
 
849
- # Get all unique months and sort them
850
  all_months = set()
851
  for agent_data in agent_month_data.values():
852
  all_months.update(agent_data.keys())
853
  months = sorted(list(all_months))
854
 
855
- # Calculate metrics for each agent and month
856
  result_data = {}
857
  for agent_name, month_dict in agent_month_data.items():
858
  acceptance_rates = []
@@ -863,17 +693,11 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
863
  for month in months:
864
  prs_in_month = month_dict.get(month, [])
865
 
866
- # Count merged PRs
867
  merged_count = sum(1 for pr in prs_in_month if pr.get('merged_at'))
868
-
869
- # Count closed but not merged
870
  closed_not_merged_count = sum(1 for pr in prs_in_month
871
  if pr.get('closed_at') and not pr.get('merged_at'))
872
-
873
- # Total PRs created in this month
874
  total_count = len(prs_in_month)
875
 
876
- # Calculate acceptance rate
877
  total_decisions = merged_count + closed_not_merged_count
878
  acceptance_rate = (merged_count / total_decisions * 100) if total_decisions > 0 else None
879
 
@@ -899,16 +723,7 @@ def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
899
 
900
 
901
  def construct_leaderboard_from_metadata(all_metadata_dict, agents):
902
- """
903
- Construct leaderboard from in-memory PR metadata.
904
-
905
- Args:
906
- all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
907
- agents: List of agent dictionaries with metadata
908
-
909
- Returns:
910
- Dictionary of agent stats.
911
- """
912
  if not agents:
913
  print("Error: No agents found")
914
  return {}
@@ -919,10 +734,7 @@ def construct_leaderboard_from_metadata(all_metadata_dict, agents):
919
  identifier = agent.get('github_identifier')
920
  agent_name = agent.get('name', 'Unknown')
921
 
922
- # Get metadata for this agent from the dictionary
923
  bot_metadata = all_metadata_dict.get(identifier, [])
924
-
925
- # Calculate stats
926
  stats = calculate_pr_stats_from_metadata(bot_metadata)
927
 
928
  cache_dict[identifier] = {
@@ -936,16 +748,7 @@ def construct_leaderboard_from_metadata(all_metadata_dict, agents):
936
 
937
 
938
  def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
939
- """
940
- Save leaderboard data and monthly metrics to HuggingFace dataset as swe-pr.json.
941
-
942
- Args:
943
- leaderboard_dict: Dictionary of agent stats from construct_leaderboard_from_metadata()
944
- monthly_metrics: Monthly metrics data from calculate_monthly_metrics_by_agent()
945
-
946
- Returns:
947
- bool: True if successful, False otherwise
948
- """
949
  try:
950
  token = get_hf_token()
951
  if not token:
@@ -954,7 +757,6 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
954
  api = HfApi(token=token)
955
  filename = "swe-pr.json"
956
 
957
- # Combine leaderboard and monthly metrics
958
  combined_data = {
959
  'last_updated': datetime.now(timezone.utc).isoformat(),
960
  'leaderboard': leaderboard_dict,
@@ -964,12 +766,10 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
964
  }
965
  }
966
 
967
- # Save locally first
968
  with open(filename, 'w') as f:
969
  json.dump(combined_data, f, indent=2)
970
 
971
  try:
972
- # Upload to HuggingFace with retry logic
973
  upload_file_with_backoff(
974
  api=api,
975
  path_or_fileobj=filename,
@@ -979,7 +779,6 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
979
  )
980
  return True
981
  finally:
982
- # Always clean up local file
983
  if os.path.exists(filename):
984
  os.remove(filename)
985
 
@@ -991,21 +790,19 @@ def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
991
 
992
 
993
  # =============================================================================
994
- # MAIN MINING FUNCTION
995
  # =============================================================================
996
 
997
  def mine_all_agents():
998
  """
999
- Mine PR metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
1000
- Downloads GHArchive data first, then uses ONE DuckDB query for ALL agents, then batch uploads with time gaps.
1001
  """
1002
- # Step 1: Download GHArchive data
1003
  print(f"\n[1/5] Downloading GHArchive data...")
1004
 
1005
  if not download_all_gharchive_data():
1006
  print("Warning: Download had errors, continuing with available data...")
1007
 
1008
- # Step 2: Load agent metadata from HuggingFace
1009
  print(f"\n[2/5] Loading agent metadata...")
1010
 
1011
  agents = load_agents_from_hf()
@@ -1013,7 +810,6 @@ def mine_all_agents():
1013
  print("Error: No agents found")
1014
  return
1015
 
1016
- # Extract all identifiers
1017
  identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
1018
  if not identifiers:
1019
  print("Error: No valid agent identifiers found")
@@ -1021,55 +817,42 @@ def mine_all_agents():
1021
 
1022
  print(f"\n[3/5] Mining PR metadata ({len(identifiers)} agents, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
1023
 
1024
- # Initialize DuckDB connection
1025
  try:
1026
  conn = get_duckdb_connection()
1027
  except Exception as e:
1028
  print(f"Failed to initialize DuckDB connection: {str(e)}")
1029
  return
1030
 
1031
- # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
1032
  current_time = datetime.now(timezone.utc)
1033
  end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
1034
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
1035
 
1036
  try:
1037
- # Use single query for all agents
1038
- all_metadata = fetch_all_pr_metadata_single_query(
1039
  conn, identifiers, start_date, end_date
1040
  )
1041
 
1042
- # Calculate summary statistics
1043
  total_prs = sum(len(metadata_list) for metadata_list in all_metadata.values())
1044
  agents_with_data = sum(1 for metadata_list in all_metadata.values() if metadata_list)
1045
 
1046
- print(f"Query complete: {total_prs} PRs found for {agents_with_data}/{len(agents)} agents")
1047
-
1048
  except Exception as e:
1049
  print(f"Error during DuckDB fetch: {str(e)}")
1050
  import traceback
1051
  traceback.print_exc()
1052
  return
1053
  finally:
1054
- # Close DuckDB connection
1055
  conn.close()
1056
 
1057
- # Step 4: Batch upload PR metadata with time gaps
1058
  print(f"\n[4/5] Uploading PR metadata...")
1059
 
1060
  success_count, error_count = batch_upload_pr_metadata(all_metadata)
1061
 
1062
- # Step 5: Construct and save leaderboard data
1063
  print(f"\n[5/5] Saving leaderboard...")
1064
 
1065
  try:
1066
- # Construct leaderboard from in-memory data
1067
  leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
1068
-
1069
- # Calculate monthly metrics from in-memory data
1070
  monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)
1071
-
1072
- # Save to HuggingFace
1073
  save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)
1074
 
1075
  print(f"\nCOMPLETE: {success_count} files uploaded" + (f", {error_count} errors" if error_count > 0 else ""))
@@ -1085,30 +868,16 @@ def mine_all_agents():
1085
  # =============================================================================
1086
 
1087
  def setup_scheduler():
1088
- """
1089
- Set up APScheduler to run mining jobs periodically.
1090
- Schedule is configurable via environment variables.
1091
-
1092
- Environment variables:
1093
- - SCHEDULE_ENABLED: Enable/disable scheduler (default: true)
1094
- - SCHEDULE_DAY_OF_MONTH: Day of month to run (default: 8, second week)
1095
- - SCHEDULE_HOUR: Hour to run (0-23, default: 0)
1096
- - SCHEDULE_MINUTE: Minute to run (0-59, default: 0)
1097
- - SCHEDULE_TIMEZONE: Timezone for scheduling (default: UTC)
1098
- """
1099
- # Configure logging for APScheduler
1100
  logging.basicConfig(
1101
  level=logging.INFO,
1102
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1103
  )
1104
 
1105
- # Disable verbose HTTP request logging from httpx (used by huggingface_hub)
1106
  logging.getLogger('httpx').setLevel(logging.WARNING)
1107
 
1108
- # Create scheduler
1109
  scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
1110
 
1111
- # Create cron trigger with configured schedule (monthly on specific day)
1112
  trigger = CronTrigger(
1113
  day=SCHEDULE_DAY_OF_MONTH,
1114
  hour=SCHEDULE_HOUR,
@@ -1116,7 +885,6 @@ def setup_scheduler():
1116
  timezone=SCHEDULE_TIMEZONE
1117
  )
1118
 
1119
- # Add job to scheduler
1120
  scheduler.add_job(
1121
  mine_all_agents,
1122
  trigger=trigger,
@@ -1125,13 +893,11 @@ def setup_scheduler():
1125
  replace_existing=True
1126
  )
1127
 
1128
- # Print schedule information
1129
  from datetime import datetime
1130
  next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
1131
  print(f"Scheduler: Monthly on day {SCHEDULE_DAY_OF_MONTH} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
1132
  print(f"Next run: {next_run}\n")
1133
 
1134
- # Start scheduler (blocking call)
1135
  print(f"\nScheduler started")
1136
  scheduler.start()
1137
 
@@ -1142,8 +908,6 @@ def setup_scheduler():
1142
 
1143
  if __name__ == "__main__":
1144
  if SCHEDULE_ENABLED:
1145
- # Run with scheduler
1146
  setup_scheduler()
1147
  else:
1148
- # Run without scheduler, just mine once
1149
- mine_all_agents()
 
 
 
 
 
 
1
  import json
2
  import os
3
  import time
 
25
 
26
  AGENTS_REPO = "SWE-Arena/bot_metadata"
27
  PR_METADATA_REPO = "SWE-Arena/pr_metadata"
28
+ LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata"
29
+ LEADERBOARD_TIME_FRAME_DAYS = 180
30
+ GHARCHIVE_DATA_DIR = "../gharchive/data"
31
+ DUCKDB_CACHE_FILE = "cache.duckdb"
32
+
33
+ # DuckDB performance configuration
34
+ DUCKDB_THREADS = 8
35
+ DUCKDB_MEMORY_LIMIT = "64GB"
36
 
37
+ # Streaming batch configuration
38
+ BATCH_SIZE_DAYS = 7 # Process 1 week at a time (~168 hourly files)
39
+ # At this size: ~7 days × 24 files × ~100MB per file = ~16GB uncompressed per batch
40
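The per-batch footprint quoted above is easy to sanity-check; a minimal sketch, assuming the ~100 MB per uncompressed hourly file figure from the comment:

    # Rough sizing check for the streaming batches (the ~100 MB per-file figure
    # is an assumption carried over from the comment above).
    FILES_PER_DAY = 24
    EST_MB_PER_FILE = 100

    def batch_footprint(batch_size_days):
        files = batch_size_days * FILES_PER_DAY
        est_gb = files * EST_MB_PER_FILE / 1024
        return files, est_gb

    print(batch_footprint(7))  # (168, ~16.4) -> roughly the ~16 GB quoted above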
 
41
  # Download configuration
42
+ DOWNLOAD_WORKERS = 4
43
+ DOWNLOAD_RETRY_DELAY = 2
44
+ MAX_RETRIES = 5
45
 
46
  # Upload configuration
47
+ UPLOAD_DELAY_SECONDS = 5
48
+ UPLOAD_INITIAL_BACKOFF = 60
49
+ UPLOAD_MAX_BACKOFF = 3600
50
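For reference, these constants drive the capped exponential backoff used later in upload_single_file_with_retry; a minimal sketch of the resulting wait schedule:

    # Wait times produced by min(UPLOAD_INITIAL_BACKOFF * 2**attempt, UPLOAD_MAX_BACKOFF).
    UPLOAD_INITIAL_BACKOFF = 60   # seconds
    UPLOAD_MAX_BACKOFF = 3600     # seconds
    MAX_RETRIES = 5

    waits = [min(UPLOAD_INITIAL_BACKOFF * (2 ** attempt), UPLOAD_MAX_BACKOFF)
             for attempt in range(MAX_RETRIES)]
    print(waits)  # [60, 120, 240, 480, 960]; with 5 retries the 3600 s cap is never hit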
 
51
  # Scheduler configuration
52
+ SCHEDULE_ENABLED = False
53
+ SCHEDULE_DAY_OF_MONTH = 8
54
+ SCHEDULE_HOUR = 0
55
+ SCHEDULE_MINUTE = 0
56
+ SCHEDULE_TIMEZONE = 'UTC'
57
 
58
  # =============================================================================
59
  # UTILITY FUNCTIONS
 
84
 
85
 
86
  def normalize_date_format(date_string):
87
+ """Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix."""
 
 
 
 
 
 
 
88
  if not date_string or date_string == 'N/A':
89
  return 'N/A'
90
 
91
  try:
92
  import re
93
 
 
94
  if isinstance(date_string, datetime):
95
  return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
96
 
 
97
  date_string = re.sub(r'\s+', ' ', date_string.strip())
 
 
98
  date_string = date_string.replace(' ', 'T')
99
 
 
 
100
  if len(date_string) >= 3:
101
  if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
102
  date_string = date_string + ':00'
103
 
 
104
  dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
 
 
105
  return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
106
  except Exception as e:
107
  print(f"Warning: Could not parse date '{date_string}': {e}")
 
121
  # =============================================================================
122
 
123
  def download_file(url):
124
+ """Download a GHArchive file with retry logic."""
 
 
 
 
 
 
 
 
125
  filename = url.split("/")[-1]
126
  filepath = os.path.join(GHARCHIVE_DATA_DIR, filename)
127
 
 
128
  if os.path.exists(filepath):
129
  return True
130
 
 
131
  for attempt in range(MAX_RETRIES):
132
  try:
133
  response = requests.get(url, timeout=30)
 
138
 
139
  except requests.exceptions.HTTPError as e:
140
  if e.response.status_code == 404:
 
141
  return False
142
  else:
 
143
  if attempt < MAX_RETRIES - 1:
144
+ wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
145
  print(f" ⚠ {filename}: HTTP error {e.response.status_code}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
146
  time.sleep(wait_time)
147
  else:
 
150
  except (requests.exceptions.Timeout,
151
  requests.exceptions.ConnectionError,
152
  requests.exceptions.ReadTimeout) as e:
 
153
  if attempt < MAX_RETRIES - 1:
154
+ wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
155
  print(f" ⚠ {filename}: {type(e).__name__}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
156
  time.sleep(wait_time)
157
  else:
158
  print(f" ✗ {filename}: Failed after {MAX_RETRIES} attempts - {type(e).__name__}")
159
 
160
  except Exception as e:
 
161
  if attempt < MAX_RETRIES - 1:
162
  wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
163
  print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
 
169
 
170
 
171
  def download_all_gharchive_data():
172
+ """Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS."""
 
 
 
 
 
 
 
173
  os.makedirs(GHARCHIVE_DATA_DIR, exist_ok=True)
174
 
 
175
  end_date = datetime.now()
176
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
177
 
 
179
  current_date = start_date
180
  while current_date <= end_date:
181
  date_str = current_date.strftime("%Y-%m-%d")
 
182
  for hour in range(24):
183
  url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
184
  urls.append(url)
 
188
 
189
  try:
190
  with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
 
191
  futures = [executor.submit(download_file, url) for url in urls]
 
 
192
  for future in as_completed(futures):
193
  downloads_processed += 1
194
 
 
203
 
204
 
205
  # =============================================================================
206
+ # HUGGINGFACE API WRAPPERS
207
  # =============================================================================
208
 
209
  def is_retryable_error(e):
210
+ """Check if exception is retryable (rate limit or timeout error)."""
 
 
 
211
  if isinstance(e, HfHubHTTPError):
212
  if e.response.status_code == 429:
213
  return True
214
 
 
215
  if isinstance(e, (requests.exceptions.Timeout,
216
  requests.exceptions.ReadTimeout,
217
  requests.exceptions.ConnectTimeout)):
218
  return True
219
 
 
220
  if isinstance(e, Exception):
221
  error_str = str(e).lower()
222
  if 'timeout' in error_str or 'timed out' in error_str:
 
237
  )
238
  )
239
  def list_repo_files_with_backoff(api, **kwargs):
240
+ """Wrapper for api.list_repo_files() with exponential backoff."""
241
  return api.list_repo_files(**kwargs)
242
 
243
 
 
253
  )
254
  )
255
  def hf_hub_download_with_backoff(**kwargs):
256
+ """Wrapper for hf_hub_download() with exponential backoff."""
257
  return hf_hub_download(**kwargs)
258
 
259
 
 
269
  )
270
  )
271
  def upload_file_with_backoff(api, **kwargs):
272
+ """Wrapper for api.upload_file() with exponential backoff."""
273
  return api.upload_file(**kwargs)
274
 
275
 
 
285
  )
286
  )
287
  def upload_folder_with_backoff(api, **kwargs):
288
+ """Wrapper for api.upload_folder() with exponential backoff."""
289
  return api.upload_folder(**kwargs)
290
 
291
 
292
  def get_duckdb_connection():
293
  """
294
+ Initialize DuckDB connection with optimized memory settings.
295
+ Uses persistent database and reduced memory footprint.
 
 
296
  """
 
297
  conn = duckdb.connect(DUCKDB_CACHE_FILE)
298
 
299
+ # Optimized settings
300
+ conn.execute(f"SET threads TO {DUCKDB_THREADS};")
301
+ conn.execute("SET preserve_insertion_order = false;")
302
+ conn.execute("SET enable_object_cache = true;")
303
+ conn.execute("SET temp_directory = '/tmp/duckdb_temp';")
304
+ conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';") # Per-query limit
305
+ conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';") # Hard cap
306
 
307
  return conn
308
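A quick usage sketch for confirming the settings applied to the cached connection (duckdb_settings() is DuckDB's built-in settings table function; exact values follow the constants above):

    # Usage sketch: open the cached connection and inspect the applied settings.
    conn = get_duckdb_connection()
    for name, value in conn.execute(
            "SELECT name, value FROM duckdb_settings() WHERE name IN ('threads', 'memory_limit')"
    ).fetchall():
        print(f"{name} = {value}")
    conn.close()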
 
309
 
310
  def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
311
+ """Generate file path patterns for GHArchive data in date range (only existing files)."""
 
 
 
 
 
 
 
 
 
 
 
312
  file_patterns = []
313
  missing_dates = set()
314
 
 
316
  end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
317
 
318
  while current_date <= end_day:
 
319
  date_has_files = False
320
  for hour in range(24):
321
  pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
 
322
  if os.path.exists(pattern):
323
  file_patterns.append(pattern)
324
  date_has_files = True
325
 
 
326
  if not date_has_files:
327
  missing_dates.add(current_date.strftime('%Y-%m-%d'))
328
 
 
329
  current_date += timedelta(days=1)
330
 
 
331
  if missing_dates:
332
+ print(f" Skipping {len(missing_dates)} date(s) with no data")
333
 
334
  return file_patterns
335
 
336
 
337
  # =============================================================================
338
+ # STREAMING BATCH PROCESSING
339
  # =============================================================================
340
 
341
+ def fetch_all_pr_metadata_streaming(conn, identifiers, start_date, end_date):
342
+ """
343
+ Fetch PR metadata using streaming batch processing.
344
+
345
+ Processes GHArchive files in BATCH_SIZE_DAYS chunks to limit memory usage.
346
+ Instead of loading 180 days (4,344 files) at once, processes 7 days at a time.
347
+
348
+ This prevents OOM errors by:
349
+ 1. Only keeping ~168 hourly files in memory per batch (vs 4,344)
350
+ 2. Incrementally building the results dictionary
351
+ 3. Allowing DuckDB to garbage collect after each batch
352
+
353
  Args:
354
  conn: DuckDB connection instance
355
+ identifiers: List of GitHub usernames/bot identifiers (~1500)
356
  start_date: Start datetime (timezone-aware)
357
  end_date: End datetime (timezone-aware)
358
+
359
  Returns:
360
+ Dictionary mapping agent identifier to list of PR metadata
361
  """
 
 
 
 
 
 
 
 
362
  identifier_list = ', '.join([f"'{id}'" for id in identifiers])
363
+ metadata_by_agent = defaultdict(list)
364
+
365
+ # Calculate total batches
366
+ total_days = (end_date - start_date).days
367
+ total_batches = (total_days // BATCH_SIZE_DAYS) + 1
368
+
369
+ # Process in configurable batches
370
+ current_date = start_date
371
+ batch_num = 0
372
+ total_prs = 0
373
+
374
+ print(f" Streaming {total_batches} batches of {BATCH_SIZE_DAYS}-day intervals...")
375
+
376
+ while current_date <= end_date:
377
+ batch_num += 1
378
+ batch_end = min(current_date + timedelta(days=BATCH_SIZE_DAYS - 1), end_date)
379
 
380
+ # Get file patterns for THIS BATCH ONLY (not all 180 days)
381
+ file_patterns = generate_file_path_patterns(current_date, batch_end)
382
+
383
+ if not file_patterns:
384
+ print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} - NO DATA")
385
+ current_date = batch_end + timedelta(days=1)
386
+ continue
387
+
388
+ # Progress indicator
389
+ print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} ({len(file_patterns)} files)... ", end="", flush=True)
390
+
391
+ # Build file patterns SQL for THIS BATCH
392
+ file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'
393
+
394
+ # Query for this batch
395
+ query = f"""
396
+ WITH pr_events AS (
397
+ SELECT
398
+ CONCAT(
399
+ REPLACE(repo.url, 'api.github.com/repos/', 'github.com/'),
400
+ '/pull/',
401
+ CAST(payload.pull_request.number AS VARCHAR)
402
+ ) as url,
403
+ actor.login as pr_author,
404
+ created_at as event_time,
405
+ payload.action as event_action
406
+ FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
407
+ WHERE
408
+ type = 'PullRequestEvent'
409
+ AND payload.action IN ('opened', 'closed')
410
+ AND payload.pull_request.number IS NOT NULL
411
+ AND actor.login IN ({identifier_list})
412
+ ),
413
+ pr_timeline AS (
414
+ SELECT
415
+ url,
416
+ pr_author,
417
+ MIN(CASE WHEN event_action = 'opened' THEN event_time END) as created_at,
418
+ MAX(CASE WHEN event_action = 'closed' THEN event_time END) as closed_at,
419
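+ -- Note: GHArchive doesn't distinguish merged vs closed, so merged_at = NULL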
+ NULL as merged_at
420
+ FROM pr_events
421
+ GROUP BY url, pr_author
422
+ )
423
+ SELECT url, pr_author, created_at, merged_at, closed_at
424
+ FROM pr_timeline
425
+ WHERE created_at IS NOT NULL
426
+ """
427
+
428
+ try:
429
+ results = conn.execute(query).fetchall()
430
+ batch_prs = 0
431
+
432
+ # Add results to accumulating dictionary
433
+ for row in results:
434
+ url = row[0]
435
+ pr_author = row[1]
436
+ created_at = normalize_date_format(row[2]) if row[2] else None
437
+ merged_at = normalize_date_format(row[3]) if row[3] else None
438
+ closed_at = normalize_date_format(row[4]) if row[4] else None
439
+
440
+ if not url:
441
+ continue
442
+
443
+ pr_metadata = {
444
+ 'html_url': url,
445
+ 'created_at': created_at,
446
+ 'merged_at': merged_at,
447
+ 'closed_at': closed_at,
448
+ }
449
+
450
+ metadata_by_agent[pr_author].append(pr_metadata)
451
+ batch_prs += 1
452
+ total_prs += 1
453
+
454
+ print(f"✓ {batch_prs} PRs found")
455
+
456
+ except Exception as e:
457
+ print(f"\n ✗ Batch {batch_num} error: {str(e)}")
458
+ import traceback
459
+ traceback.print_exc()
460
+
461
+ # Move to next batch
462
+ current_date = batch_end + timedelta(days=1)
463
+
464
+ # Final summary
465
+ agents_with_data = sum(1 for prs in metadata_by_agent.values() if prs)
466
+ print(f"\n ✓ Complete: {total_prs} PRs found for {agents_with_data}/{len(identifiers)} agents")
467
+
468
+ return dict(metadata_by_agent)
469
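A minimal usage sketch for the streaming fetch, assuming the hourly GHArchive files are already on disk; the agent identifier below is a made-up placeholder:

    # Usage sketch (the identifier is hypothetical; pass real bot logins in practice).
    from datetime import datetime, timedelta, timezone

    conn = get_duckdb_connection()
    try:
        end = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
        start = end - timedelta(days=14)  # small window for a test run
        metadata = fetch_all_pr_metadata_streaming(conn, ['example-bot[bot]'], start, end)
    finally:
        conn.close()

    for agent, prs in metadata.items():
        print(agent, len(prs), 'PRs')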
 
 
 
 
470
 
471
  def group_metadata_by_date(metadata_list):
472
+ """Group PR metadata by date for daily storage."""
 
 
 
473
  grouped = defaultdict(list)
474
 
475
  for pr_meta in metadata_list:
 
488
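To make the daily layout concrete, a small sketch of how a grouped bucket maps onto the per-agent daily files uploaded later (the PR record and agent name are invented; the filename format matches batch_upload_pr_metadata below):

    # Illustration: each (year, month, day) bucket becomes one {agent}/{YYYY.MM.DD}.jsonl file.
    sample = [{'html_url': 'https://github.com/org/repo/pull/1',  # hypothetical PR
               'created_at': '2025-06-17T21:21:07Z',
               'merged_at': None,
               'closed_at': None}]
    grouped = group_metadata_by_date(sample)  # expected: {(2025, 6, 17): [record]}
    for (pr_year, month, day), day_metadata in grouped.items():
        filename = f"{pr_year}.{month:02d}.{day:02d}.jsonl"
        print(filename, '->', f"example-agent[bot]/{filename}", f"({len(day_metadata)} records)")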
 
489
 
490
  def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
491
+ """Upload a single file with exponential backoff retry logic."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
492
  for attempt in range(max_retries):
493
  try:
494
  upload_file_with_backoff(
 
502
  return True
503
  except Exception as e:
504
  if attempt < max_retries - 1:
 
505
  wait_time = min(UPLOAD_INITIAL_BACKOFF * (2 ** attempt), UPLOAD_MAX_BACKOFF)
506
  print(f" {e} error on attempt {attempt + 1}/{max_retries}. Retrying in {wait_time}s...")
507
  time.sleep(wait_time)
 
512
 
513
 
514
  def batch_upload_pr_metadata(all_metadata):
515
+ """Upload PR metadata for all agents with time gaps between uploads."""
 
 
 
 
 
 
 
 
 
516
  try:
517
  token = get_hf_token()
518
  if not token:
 
524
  error_count = 0
525
  total_files = 0
526
 
 
527
  for agent_identifier, metadata_list in all_metadata.items():
528
  if metadata_list:
529
  grouped = group_metadata_by_date(metadata_list)
 
537
  if not metadata_list:
538
  continue
539
 
 
540
  grouped = group_metadata_by_date(metadata_list)
541
 
 
542
  agent_temp_dir = tempfile.mkdtemp()
543
 
544
  try:
 
545
  local_files = []
546
  for (pr_year, month, day), day_metadata in grouped.items():
547
  filename = f"{pr_year}.{month:02d}.{day:02d}.jsonl"
548
  local_path = os.path.join(agent_temp_dir, filename)
549
  repo_path = f"{agent_identifier}/{filename}"
550
 
 
551
  day_metadata.sort(key=lambda x: x.get('created_at', ''), reverse=True)
 
 
552
  save_jsonl(local_path, day_metadata)
553
  local_files.append((local_path, repo_path, len(day_metadata)))
554
 
 
555
  agent_success = 0
556
  agent_error = 0
557
 
 
573
  agent_error += 1
574
  error_count += 1
575
 
 
576
  if file_idx < len(local_files):
577
  time.sleep(UPLOAD_DELAY_SECONDS)
578
 
579
  finally:
 
580
  if os.path.exists(agent_temp_dir):
581
  import shutil
582
  shutil.rmtree(agent_temp_dir)
 
596
 
597
 
598
  def load_agents_from_hf():
599
+ """Load all agent metadata JSON files from HuggingFace dataset."""
 
 
 
 
600
  try:
601
  api = HfApi()
602
  agents = []
603
 
 
604
  files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
 
 
605
  json_files = [f for f in files if f.endswith('.json')]
606
 
 
607
  for json_file in json_files:
608
  try:
609
  file_path = hf_hub_download_with_backoff(
 
615
  with open(file_path, 'r') as f:
616
  agent_data = json.load(f)
617
 
 
618
  if agent_data.get('status') != 'public':
619
  continue
620
 
 
621
  github_identifier = json_file.replace('.json', '')
622
  agent_data['github_identifier'] = github_identifier
623
 
 
628
  continue
629
 
630
  print(f"Download complete: {len(agents)} agents")
 
631
  return agents
632
 
633
  except Exception as e:
 
635
  return []
636
 
637
 
 
 
 
 
638
  def calculate_pr_stats_from_metadata(metadata_list):
639
+ """Calculate statistics from a list of PR metadata."""
 
 
 
 
 
640
  total_prs = len(metadata_list)
641
  merged = sum(1 for pr_meta in metadata_list if pr_meta.get('merged_at'))
 
 
642
  closed_not_merged = sum(1 for pr_meta in metadata_list
643
  if pr_meta.get('closed_at') and not pr_meta.get('merged_at'))
644
 
 
645
  total_decisions = merged + closed_not_merged
 
 
646
  acceptance_rate = (merged / total_decisions * 100) if total_decisions > 0 else 0
647
 
648
  return {
 
653
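A worked example of the acceptance-rate formula used above (merged / (merged + closed-but-not-merged) * 100, with open PRs excluded from the denominator); the records are invented:

    # Worked example: one merged, one closed without merge, one still open.
    prs = [
        {'merged_at': '2025-05-01T12:00:00Z', 'closed_at': '2025-05-01T12:00:00Z'},  # merged
        {'merged_at': None, 'closed_at': '2025-05-02T09:00:00Z'},                    # rejected
        {'merged_at': None, 'closed_at': None},                                      # still open
    ]
    merged = sum(1 for p in prs if p.get('merged_at'))
    closed_not_merged = sum(1 for p in prs if p.get('closed_at') and not p.get('merged_at'))
    decisions = merged + closed_not_merged
    print(round(merged / decisions * 100, 1))  # 50.0 -- the open PR is not counted as a decision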
 
654
 
655
  def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
656
+ """Calculate monthly metrics for all agents for visualization."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
657
  identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}
658
 
659
  if not all_metadata_dict:
660
  return {'agents': [], 'months': [], 'data': {}}
661
 
 
662
  agent_month_data = defaultdict(lambda: defaultdict(list))
663
 
 
664
  for agent_identifier, metadata_list in all_metadata_dict.items():
665
  for pr_meta in metadata_list:
666
  created_at = pr_meta.get('created_at')
 
668
  if not created_at:
669
  continue
670
 
 
671
  agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
672
 
673
  try:
 
678
  print(f"Warning: Could not parse date '{created_at}': {e}")
679
  continue
680
 
 
681
  all_months = set()
682
  for agent_data in agent_month_data.values():
683
  all_months.update(agent_data.keys())
684
  months = sorted(list(all_months))
685
 
 
686
  result_data = {}
687
  for agent_name, month_dict in agent_month_data.items():
688
  acceptance_rates = []
 
693
  for month in months:
694
  prs_in_month = month_dict.get(month, [])
695
 
 
696
  merged_count = sum(1 for pr in prs_in_month if pr.get('merged_at'))
 
 
697
  closed_not_merged_count = sum(1 for pr in prs_in_month
698
  if pr.get('closed_at') and not pr.get('merged_at'))
 
 
699
  total_count = len(prs_in_month)
700
 
 
701
  total_decisions = merged_count + closed_not_merged_count
702
  acceptance_rate = (merged_count / total_decisions * 100) if total_decisions > 0 else None
703
 
 
723
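The shape of the returned structure follows the previous version's docstring; a small example of what it might look like (agent name, months and numbers are invented for illustration):

    # Example result shape from calculate_monthly_metrics_by_agent (values are invented).
    example_monthly_metrics = {
        'agents': ['Example Agent'],
        'months': ['2025-01', '2025-02'],
        'data': {
            'Example Agent': {
                'acceptance_rates': [66.7, None],  # None when a month has no merged/closed decisions
                'total_prs': [12, 3],
                'merged_prs': [8, 0],
            },
        },
    }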
 
724
 
725
  def construct_leaderboard_from_metadata(all_metadata_dict, agents):
726
+ """Construct leaderboard from in-memory PR metadata."""
 
 
 
 
 
 
 
 
 
727
  if not agents:
728
  print("Error: No agents found")
729
  return {}
 
734
  identifier = agent.get('github_identifier')
735
  agent_name = agent.get('name', 'Unknown')
736
 
 
737
  bot_metadata = all_metadata_dict.get(identifier, [])
 
 
738
  stats = calculate_pr_stats_from_metadata(bot_metadata)
739
 
740
  cache_dict[identifier] = {
 
748
 
749
 
750
  def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
751
+ """Save leaderboard data and monthly metrics to HuggingFace dataset."""
 
 
 
 
 
 
 
 
 
752
  try:
753
  token = get_hf_token()
754
  if not token:
 
757
  api = HfApi(token=token)
758
  filename = "swe-pr.json"
759
 
 
760
  combined_data = {
761
  'last_updated': datetime.now(timezone.utc).isoformat(),
762
  'leaderboard': leaderboard_dict,
 
766
  }
767
  }
768
 
 
769
  with open(filename, 'w') as f:
770
  json.dump(combined_data, f, indent=2)
771
 
772
  try:
 
773
  upload_file_with_backoff(
774
  api=api,
775
  path_or_fileobj=filename,
 
779
  )
780
  return True
781
  finally:
 
782
  if os.path.exists(filename):
783
  os.remove(filename)
784
 
 
790
 
791
 
792
  # =============================================================================
793
+ # MINING FUNCTION
794
  # =============================================================================
795
 
796
  def mine_all_agents():
797
  """
798
+ Mine PR metadata for all agents using streaming batch processing.
799
+ Downloads GHArchive data, then runs batched DuckDB queries over it.
800
  """
 
801
  print(f"\n[1/5] Downloading GHArchive data...")
802
 
803
  if not download_all_gharchive_data():
804
  print("Warning: Download had errors, continuing with available data...")
805
 
 
806
  print(f"\n[2/5] Loading agent metadata...")
807
 
808
  agents = load_agents_from_hf()
 
810
  print("Error: No agents found")
811
  return
812
 
 
813
  identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
814
  if not identifiers:
815
  print("Error: No valid agent identifiers found")
 
817
 
818
  print(f"\n[3/5] Mining PR metadata ({len(identifiers)} agents, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
819
 
 
820
  try:
821
  conn = get_duckdb_connection()
822
  except Exception as e:
823
  print(f"Failed to initialize DuckDB connection: {str(e)}")
824
  return
825
 
 
826
  current_time = datetime.now(timezone.utc)
827
  end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
828
  start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
829
 
830
  try:
831
+ # Use the streaming batch fetch (replaces the previous single-query approach)
832
+ all_metadata = fetch_all_pr_metadata_streaming(
833
  conn, identifiers, start_date, end_date
834
  )
835
 
 
836
  total_prs = sum(len(metadata_list) for metadata_list in all_metadata.values())
837
  agents_with_data = sum(1 for metadata_list in all_metadata.values() if metadata_list)
838
 
 
 
839
  except Exception as e:
840
  print(f"Error during DuckDB fetch: {str(e)}")
841
  import traceback
842
  traceback.print_exc()
843
  return
844
  finally:
 
845
  conn.close()
846
 
 
847
  print(f"\n[4/5] Uploading PR metadata...")
848
 
849
  success_count, error_count = batch_upload_pr_metadata(all_metadata)
850
 
 
851
  print(f"\n[5/5] Saving leaderboard...")
852
 
853
  try:
 
854
  leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
 
 
855
  monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)
 
 
856
  save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)
857
 
858
  print(f"\nCOMPLETE: {success_count} files uploaded" + (f", {error_count} errors" if error_count > 0 else ""))
 
868
  # =============================================================================
869
 
870
  def setup_scheduler():
871
+ """Set up APScheduler to run mining jobs periodically."""
 
 
 
 
 
 
 
 
 
 
 
872
  logging.basicConfig(
873
  level=logging.INFO,
874
  format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
875
  )
876
 
 
877
  logging.getLogger('httpx').setLevel(logging.WARNING)
878
 
 
879
  scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
880
 
 
881
  trigger = CronTrigger(
882
  day=SCHEDULE_DAY_OF_MONTH,
883
  hour=SCHEDULE_HOUR,
 
885
  timezone=SCHEDULE_TIMEZONE
886
  )
887
 
 
888
  scheduler.add_job(
889
  mine_all_agents,
890
  trigger=trigger,
 
893
  replace_existing=True
894
  )
895
 
 
896
  from datetime import datetime
897
  next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
898
  print(f"Scheduler: Monthly on day {SCHEDULE_DAY_OF_MONTH} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
899
  print(f"Next run: {next_run}\n")
900
 
 
901
  print(f"\nScheduler started")
902
  scheduler.start()
903
 
 
908
 
909
  if __name__ == "__main__":
910
  if SCHEDULE_ENABLED:
 
911
  setup_scheduler()
912
  else:
913
+ mine_all_agents()