Claude committed on
Commit 42207fa · unverified · 1 parent: 7183c61

Refactor msr.py to use BigQuery instead of GitHub API


Complete refactor of PR metadata mining to use Google BigQuery + GitHub Archive:

Removed:
- All GitHub API code (TokenPool, request_with_backoff, pagination logic)
- Complex recursive time partitioning used to work around the 1000-result search limit
- Parallel execution with token pool management
- Net reduction of ~529 lines of code (298 added, 827 removed)

Added:
- BigQuery integration with a comprehensive single-query approach
- Support for querying githubarchive.day.YYYYMMDD tables
- Efficient batch fetching for all agents in one query

Benefits:
- ONE BigQuery query for ALL agents (vs hundreds of GitHub API calls)
- No rate limiting issues
- Simpler, more maintainable code
- Fetches only required fields: url, created_at, merged_at, closed_at
- Follows the same pattern as the review metadata mining script (a sketch of the query approach is shown below)
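
For orientation, here is a minimal sketch of the approach (not the code in this commit): build one UNION ALL over the githubarchive.day.YYYYMMDD tables and pull url, created_at, merged_at and closed_at for several agents in a single query. It assumes the google-cloud-bigquery package with credentials already configured; the agent identifiers in the usage note are placeholders.

# Minimal sketch of the single-query idea; assumes google-cloud-bigquery is
# installed and Google Cloud credentials are already configured.
from datetime import date, timedelta
from google.cloud import bigquery

def daily_tables(start: date, end: date) -> str:
    """UNION ALL over the githubarchive.day.YYYYMMDD tables in [start, end)."""
    parts = []
    day = start
    while day < end:
        parts.append(f"SELECT * FROM `githubarchive.day.{day.strftime('%Y%m%d')}`")
        day += timedelta(days=1)
    return " UNION ALL ".join(parts)

def fetch_pr_metadata(identifiers, start: date, end: date):
    """One query for all agents, fetching only the four required fields."""
    client = bigquery.Client()
    authors = ", ".join(f"'{i}'" for i in identifiers)
    sql = f"""
    SELECT
      JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url')   AS url,
      JSON_EXTRACT_SCALAR(payload, '$.pull_request.created_at') AS created_at,
      JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged_at')  AS merged_at,
      JSON_EXTRACT_SCALAR(payload, '$.pull_request.closed_at')  AS closed_at
    FROM ({daily_tables(start, end)})
    WHERE type = 'PullRequestEvent'
      AND JSON_EXTRACT_SCALAR(payload, '$.pull_request.user.login') IN ({authors})
    """
    return [dict(row) for row in client.query(sql).result()]

# Hypothetical usage: two placeholder bot accounts over the last 7 days.
# prs = fetch_pr_metadata(["example-agent[bot]", "other-agent[bot]"],
#                         date.today() - timedelta(days=7), date.today())

The actual script in this commit layers branch-name matching, deduplication to the latest event per PR, and per-agent grouping on top of this idea.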

Files changed (1)
  1. msr.py +298 -827
msr.py CHANGED
@@ -1,17 +1,16 @@
1
  """
2
  Minimalist PR Metadata Mining Script
3
- Mines PR metadata from GitHub and saves to HuggingFace dataset.
4
  """
5
 
6
  import json
7
  import os
8
- import time
9
- import requests
10
  from datetime import datetime, timezone, timedelta
11
  from collections import defaultdict
12
  from huggingface_hub import HfApi, hf_hub_download
13
  from dotenv import load_dotenv
14
- import random
15
 
16
  # Load environment variables
17
  load_dotenv()
@@ -52,772 +51,242 @@ def save_jsonl(filename, data):
52
  f.write(json.dumps(item) + '\n')
53
 
54
 
55
- def get_github_tokens():
56
- """Get all GitHub tokens from environment variables (all vars starting with GITHUB_TOKEN)."""
57
- tokens = []
58
- for key, value in os.environ.items():
59
- if key.startswith('GITHUB_TOKEN') and value:
60
- tokens.append(value)
61
-
62
- if not tokens:
63
- print("Warning: No GITHUB_TOKEN* found. API rate limits: 60/hour (authenticated: 5000/hour)")
64
- else:
65
- print(f"✓ Loaded {len(tokens)} GitHub token(s) for token pool")
66
-
67
- return tokens
68
-
69
-
70
- def get_github_token():
71
- """Get primary GitHub token from environment variables (for backward compatibility)."""
72
- token = os.getenv('GITHUB_TOKEN')
73
- if not token:
74
- print("Warning: GITHUB_TOKEN not found. API rate limits: 60/hour (authenticated: 5000/hour)")
75
- return token
76
-
77
-
78
- class TokenPool:
79
  """
80
- Hybrid token pool that manages GitHub tokens with parallel execution and round-robin fallback.
81
-
82
- Strategy:
83
- - 50% of tokens allocated to parallel pool (for concurrent API calls)
84
- - 50% of tokens allocated to round-robin pool (for rate limit fallback)
85
- - Automatically switches to round-robin when parallel tokens hit rate limits
86
- - Thread-safe for concurrent access
87
- """
88
- def __init__(self, tokens):
89
- import threading
90
-
91
- self.all_tokens = tokens if tokens else [None]
92
- self.lock = threading.Lock()
93
-
94
- # Split tokens into parallel and round-robin pools (50/50)
95
- total_tokens = len(self.all_tokens)
96
- split_point = max(1, total_tokens // 2) # At least 1 token in each pool
97
-
98
- self.parallel_tokens = self.all_tokens[:split_point]
99
- self.roundrobin_tokens = self.all_tokens[split_point:]
100
-
101
- # If only 1 token, use it in both pools
102
- if total_tokens == 1:
103
- self.parallel_tokens = self.all_tokens
104
- self.roundrobin_tokens = self.all_tokens
105
-
106
- # Track rate-limited tokens with reset times
107
- self.rate_limited_parallel = {} # {token: reset_timestamp}
108
- self.rate_limited_roundrobin = {} # {token: reset_timestamp}
109
-
110
- # Round-robin index for fallback pool
111
- self.roundrobin_index = 0
112
-
113
- # Statistics
114
- self.parallel_calls = 0
115
- self.roundrobin_calls = 0
116
- self.fallback_triggers = 0
117
-
118
- print(f"🔄 Hybrid Token Pool initialized:")
119
- print(f" Total tokens: {total_tokens}")
120
- print(f" Parallel pool: {len(self.parallel_tokens)} token(s)")
121
- print(f" Round-robin pool: {len(self.roundrobin_tokens)} token(s)")
122
-
123
- def _clean_expired_rate_limits(self):
124
- """Remove tokens from rate limit tracking if their reset time has passed."""
125
- current_time = time.time()
126
-
127
- # Clean parallel pool
128
- expired_parallel = [token for token, reset_time in self.rate_limited_parallel.items()
129
- if current_time >= reset_time]
130
- for token in expired_parallel:
131
- del self.rate_limited_parallel[token]
132
-
133
- # Clean round-robin pool
134
- expired_roundrobin = [token for token, reset_time in self.rate_limited_roundrobin.items()
135
- if current_time >= reset_time]
136
- for token in expired_roundrobin:
137
- del self.rate_limited_roundrobin[token]
138
-
139
- def get_parallel_token(self):
140
- """
141
- Get a token from the parallel pool for concurrent execution.
142
- Returns None if all parallel tokens are rate-limited.
143
- """
144
- with self.lock:
145
- self._clean_expired_rate_limits()
146
-
147
- # Find first non-rate-limited token in parallel pool
148
- for token in self.parallel_tokens:
149
- if token not in self.rate_limited_parallel:
150
- self.parallel_calls += 1
151
- return token
152
-
153
- return None # All parallel tokens are rate-limited
154
-
155
- def get_available_parallel_tokens(self):
156
- """
157
- Get all available tokens from parallel pool (not rate-limited).
158
- Used for batch parallel execution.
159
- """
160
- with self.lock:
161
- self._clean_expired_rate_limits()
162
- available = [token for token in self.parallel_tokens
163
- if token not in self.rate_limited_parallel]
164
- return available
165
-
166
- def get_roundrobin_token(self):
167
- """
168
- Get the next token from round-robin pool (fallback mechanism).
169
- Skips rate-limited tokens and rotates to the next available one.
170
- """
171
- with self.lock:
172
- self._clean_expired_rate_limits()
173
-
174
- attempts = 0
175
- max_attempts = len(self.roundrobin_tokens)
176
-
177
- while attempts < max_attempts:
178
- token = self.roundrobin_tokens[self.roundrobin_index]
179
- self.roundrobin_index = (self.roundrobin_index + 1) % len(self.roundrobin_tokens)
180
-
181
- if token not in self.rate_limited_roundrobin:
182
- self.roundrobin_calls += 1
183
- return token
184
-
185
- attempts += 1
186
-
187
- # All round-robin tokens are rate-limited
188
- return None
189
-
190
- def get_next_token(self):
191
- """
192
- Get the next available token (try parallel first, fallback to round-robin).
193
- This is the main method for backwards compatibility.
194
- """
195
- # Try parallel pool first
196
- token = self.get_parallel_token()
197
- if token:
198
- return token
199
-
200
- # Fallback to round-robin
201
- with self.lock:
202
- self.fallback_triggers += 1
203
-
204
- token = self.get_roundrobin_token()
205
- if token:
206
- return token
207
-
208
- # All tokens exhausted - return first parallel token anyway (will hit rate limit)
209
- return self.parallel_tokens[0] if self.parallel_tokens else None
210
-
211
- def get_headers(self):
212
- """Get headers with the next available token."""
213
- token = self.get_next_token()
214
- return {'Authorization': f'token {token}'} if token else {}
215
-
216
- def mark_rate_limited(self, token, reset_timestamp=None):
217
- """
218
- Mark a token as rate-limited with optional reset timestamp.
219
-
220
- Args:
221
- token: The token that hit rate limit
222
- reset_timestamp: Unix timestamp when rate limit resets (optional)
223
- """
224
- with self.lock:
225
- # Default to 1 hour from now if no reset time provided
226
- if reset_timestamp is None:
227
- reset_timestamp = time.time() + 3600
228
-
229
- # Mark in appropriate pool
230
- if token in self.parallel_tokens:
231
- self.rate_limited_parallel[token] = reset_timestamp
232
- print(f" ⚠️ Parallel token marked as rate-limited until {datetime.fromtimestamp(reset_timestamp, timezone.utc).strftime('%H:%M:%S UTC')}")
233
-
234
- if token in self.roundrobin_tokens:
235
- self.rate_limited_roundrobin[token] = reset_timestamp
236
- print(f" ⚠️ Round-robin token marked as rate-limited until {datetime.fromtimestamp(reset_timestamp, timezone.utc).strftime('%H:%M:%S UTC')}")
237
-
238
- def get_stats(self):
239
- """Get usage statistics for monitoring."""
240
- with self.lock:
241
- return {
242
- 'parallel_calls': self.parallel_calls,
243
- 'roundrobin_calls': self.roundrobin_calls,
244
- 'fallback_triggers': self.fallback_triggers,
245
- 'parallel_rate_limited': len(self.rate_limited_parallel),
246
- 'roundrobin_rate_limited': len(self.rate_limited_roundrobin)
247
- }
248
-
249
- def print_stats(self):
250
- """Print usage statistics."""
251
- stats = self.get_stats()
252
- total_calls = stats['parallel_calls'] + stats['roundrobin_calls']
253
-
254
- if total_calls > 0:
255
- print(f"\n📊 Token Pool Statistics:")
256
- print(f" Total API calls: {total_calls}")
257
- print(f" Parallel calls: {stats['parallel_calls']} ({stats['parallel_calls']/total_calls*100:.1f}%)")
258
- print(f" Round-robin calls: {stats['roundrobin_calls']} ({stats['roundrobin_calls']/total_calls*100:.1f}%)")
259
- print(f" Fallback triggers: {stats['fallback_triggers']}")
260
- print(f" Currently rate-limited: {stats['parallel_rate_limited']} parallel, {stats['roundrobin_rate_limited']} round-robin")
261
-
262
-
263
- def get_hf_token():
264
- """Get HuggingFace token from environment variables."""
265
- token = os.getenv('HF_TOKEN')
266
- if not token:
267
- print("Warning: HF_TOKEN not found in environment variables")
268
- return token
269
-
270
 
271
- # =============================================================================
272
- # GITHUB API FUNCTIONS
273
- # =============================================================================
274
-
275
- def request_with_backoff(method, url, *, headers=None, params=None, json_body=None, data=None, max_retries=10, timeout=30, token_pool=None, token=None):
276
  """
277
- Perform an HTTP request with exponential backoff and jitter for GitHub API.
278
- Retries on 403/429 (rate limits), 5xx server errors, and transient network exceptions.
279
-
280
- Args:
281
- token_pool: Optional TokenPool instance for marking rate-limited tokens
282
- token: Optional token string used for this request (for rate limit tracking)
283
 
284
- Returns the final requests.Response on success or non-retryable status, or None after exhausting retries.
285
- """
286
- delay = 1.0
287
- for attempt in range(max_retries):
288
- try:
289
- resp = requests.request(
290
- method,
291
- url,
292
- headers=headers or {},
293
- params=params,
294
- json=json_body,
295
- data=data,
296
- timeout=timeout
297
- )
298
 
299
- status = resp.status_code
300
-
301
- # Success
302
- if 200 <= status < 300:
303
- return resp
304
-
305
- # Rate limits or server errors -> retry with backoff
306
- if status in (403, 429) or 500 <= status < 600:
307
- wait = None
308
- reset_timestamp = None
309
-
310
- # Prefer Retry-After when present
311
- retry_after = resp.headers.get('Retry-After') or resp.headers.get('retry-after')
312
- if retry_after:
313
- try:
314
- wait = float(retry_after)
315
- except Exception:
316
- wait = None
317
-
318
- # Fallback to X-RateLimit-Reset when 403/429
319
- if wait is None and status in (403, 429):
320
- reset_hdr = resp.headers.get('X-RateLimit-Reset') or resp.headers.get('x-ratelimit-reset')
321
- if reset_hdr:
322
- try:
323
- reset_ts = int(float(reset_hdr))
324
- reset_timestamp = reset_ts
325
- wait = max(reset_ts - time.time() + 2, 1)
326
- except Exception:
327
- wait = None
328
-
329
- # Final fallback: exponential backoff with jitter
330
- if wait is None:
331
- wait = delay + random.uniform(0, 0.5)
332
-
333
- # Mark token as rate-limited if we have token pool and token info
334
- if status in (403, 429) and token_pool and token:
335
- token_pool.mark_rate_limited(token, reset_timestamp)
336
-
337
- # Cap individual wait to avoid extreme sleeps
338
- wait = max(1.0, min(wait, 120.0))
339
- print(f"GitHub API {status}. Backing off {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
340
- time.sleep(wait)
341
- delay = min(delay * 2, 60.0)
342
- continue
343
 
344
- # Non-retryable error; return response for caller to handle
345
- return resp
346
 
347
- except requests.RequestException as e:
348
- # Network error -> retry with backoff
349
- wait = delay + random.uniform(0, 0.5)
350
- wait = max(1.0, min(wait, 60.0))
351
- print(f"Request error: {e}. Retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries})...")
352
- time.sleep(wait)
353
- delay = min(delay * 2, 60.0)
354
 
355
- print(f"Exceeded max retries for {url}")
356
- return None
 
357
 
358
 
359
- def fetch_prs_within_day_partition(base_query, start_date, end_date, token_pool, prs_by_id, depth=0, max_depth=8):
360
  """
361
- Recursively fetch PRs within a time range by subdividing into smaller granularities.
362
- This function handles INTRA-DAY partitioning (hours → minutes → seconds).
363
-
364
- Used when a single day query hits the 1000-result limit.
365
- Recursion is bounded to prevent stack overflow.
366
 
367
  Args:
368
- base_query: Base query string (already includes the day in created: clause)
369
  start_date: Start datetime
370
  end_date: End datetime
371
- token_pool: TokenPool instance for rotating tokens
372
- prs_by_id: Dict to store deduplicated PRs by ID
373
- depth: Current recursion depth
374
- max_depth: Maximum allowed recursion depth
375
-
376
- Returns:
377
- Total number of new PRs found in this partition
378
- """
379
- # Safety limit on recursion depth
380
- if depth >= max_depth:
381
- print(f"{' ' * depth}⚠️ Max recursion depth ({max_depth}) reached. Some results may be missing.")
382
- return 0
383
-
384
- time_diff = end_date - start_date
385
- total_seconds = time_diff.total_seconds()
386
-
387
- # Determine granularity based on time range
388
- if total_seconds >= 3600: # >= 1 hour - subdivide by hours
389
- start_str = start_date.strftime('%Y-%m-%dT%H:00:00Z')
390
- end_str = end_date.strftime('%Y-%m-%dT%H:59:59Z')
391
- granularity = "hour"
392
- elif total_seconds >= 60: # >= 1 minute - subdivide by minutes
393
- start_str = start_date.strftime('%Y-%m-%dT%H:%M:00Z')
394
- end_str = end_date.strftime('%Y-%m-%dT%H:%M:59Z')
395
- granularity = "minute"
396
- else: # < 1 minute - subdivide by seconds
397
- start_str = start_date.strftime('%Y-%m-%dT%H:%M:%SZ')
398
- end_str = end_date.strftime('%Y-%m-%dT%H:%M:%SZ')
399
- granularity = "second"
400
-
401
- query = f'{base_query} created:{start_str}..{end_str}'
402
- indent = " " * depth
403
-
404
- print(f"{indent}[Depth {depth}] Searching {granularity} range: {start_str} to {end_str}...")
405
-
406
- page = 1
407
- per_page = 100
408
- total_in_partition = 0
409
-
410
- while True:
411
- url = 'https://api.github.com/search/issues'
412
- params = {
413
- 'q': query,
414
- 'per_page': per_page,
415
- 'page': page,
416
- 'sort': 'created',
417
- 'order': 'asc'
418
- }
419
-
420
- try:
421
- # Get token for tracking
422
- token = token_pool.get_next_token()
423
- headers = {'Authorization': f'token {token}'} if token else {}
424
-
425
- response = request_with_backoff('GET', url, headers=headers, params=params,
426
- token_pool=token_pool, token=token)
427
- if response is None:
428
- print(f"{indent} ✗ Retries exhausted for {start_str} to {end_str}")
429
- return total_in_partition
430
-
431
- if response.status_code != 200:
432
- print(f"{indent} ✗ HTTP {response.status_code} for {start_str} to {end_str}")
433
- return total_in_partition
434
-
435
- data = response.json()
436
- total_count = data.get('total_count', 0)
437
- items = data.get('items', [])
438
-
439
- if not items:
440
- break
441
-
442
- # Add PRs to global dict, count new ones
443
- for pr in items:
444
- pr_id = pr.get('id')
445
- if pr_id and pr_id not in prs_by_id:
446
- prs_by_id[pr_id] = pr
447
- total_in_partition += 1
448
-
449
- # Check if we hit the 1000-result limit
450
- if total_count > 1000 and page == 10:
451
- print(f"{indent} ⚠️ Hit 1000-result limit ({total_count} total). Subdividing {granularity}...")
452
-
453
- # Check if we can subdivide further
454
- if total_seconds < 2:
455
- print(f"{indent} ⚠️ Cannot subdivide further (< 2 seconds). Some results may be missing.")
456
- break
457
-
458
- # Subdivide based on current granularity
459
- if granularity == "hour":
460
- # Split hour into 4 parts (15-minute intervals)
461
- num_splits = 4
462
- elif granularity == "minute":
463
- # Split minute into 4 parts (15-second intervals)
464
- num_splits = 4
465
- else: # granularity == "second"
466
- # Can't subdivide seconds further meaningfully
467
- print(f"{indent} ⚠️ Already at second granularity. Cannot subdivide. Some results may be missing.")
468
- break
469
-
470
- split_duration = time_diff / num_splits
471
- total_from_splits = 0
472
-
473
- for i in range(num_splits):
474
- split_start = start_date + split_duration * i
475
- split_end = start_date + split_duration * (i + 1)
476
-
477
- count = fetch_prs_within_day_partition(
478
- base_query, split_start, split_end, token_pool, prs_by_id, depth + 1, max_depth
479
- )
480
- total_from_splits += count
481
-
482
- return total_from_splits
483
-
484
- # Normal pagination: check if there are more pages
485
- if len(items) < per_page or page >= 10:
486
- break
487
-
488
- page += 1
489
- time.sleep(0.5) # Courtesy delay between pages
490
-
491
- except Exception as e:
492
- print(f"{indent} ✗ Error fetching {start_str} to {end_str}: {str(e)}")
493
- return total_in_partition
494
-
495
- if total_in_partition > 0:
496
- print(f"{indent} ✓ Found {total_in_partition} PRs in {granularity} range")
497
-
498
- return total_in_partition
499
-
500
-
501
- def fetch_prs_with_time_partition(base_query, start_date, end_date, token_pool, prs_by_id):
502
- """
503
- Iteratively fetch PRs by iterating through each day in the date range.
504
- For each day, query with daily granularity.
505
- If a single day hits the 1000-result limit, subdivide that day recursively (hours → minutes → seconds).
506
-
507
- This hybrid iterative-recursive approach prevents deep recursion by:
508
- - Using iteration for the outer loop (days)
509
- - Using recursion only for intra-day partitioning (hours/minutes/seconds)
510
-
511
- Args:
512
- base_query: Base query string (e.g., 'is:pr author:{identifier}')
513
- start_date: Start date
514
- end_date: End date (inclusive)
515
- token_pool: TokenPool instance for rotating tokens
516
- prs_by_id: Dict to store deduplicated PRs by ID
517
 
518
  Returns:
519
- Total number of new PRs found
520
  """
 
521
  current_date = start_date
522
- total_found = 0
523
-
524
- # Iterate through each day
525
- while current_date <= end_date:
526
- day_start = current_date.replace(hour=0, minute=0, second=0, microsecond=0)
527
- day_end = day_start.replace(hour=23, minute=59, second=59, microsecond=999999)
528
-
529
- # Ensure we don't go past end_date
530
- if day_end > end_date:
531
- day_end = end_date
532
-
533
- day_str = current_date.strftime('%Y-%m-%d')
534
- print(f"\n📅 Processing day: {day_str}")
535
-
536
- # First, try a simple daily query
537
- query = f'{base_query} created:{day_str}'
538
- url = 'https://api.github.com/search/issues'
539
- params = {
540
- 'q': query,
541
- 'per_page': 1,
542
- 'page': 1,
543
- 'sort': 'created',
544
- 'order': 'asc'
545
- }
546
-
547
- try:
548
- # Get token for tracking
549
- token = token_pool.get_next_token()
550
- headers = {'Authorization': f'token {token}'} if token else {}
551
-
552
- response = request_with_backoff('GET', url, headers=headers, params=params,
553
- token_pool=token_pool, token=token)
554
- if response and response.status_code == 200:
555
- data = response.json()
556
- total_count = data.get('total_count', 0)
557
-
558
- if total_count > 1000:
559
- print(f" ⚠️ Day has {total_count} PRs (exceeds 1000-result limit). Subdividing by time of day...")
560
- # Use recursive intra-day partitioning
561
- count = fetch_prs_within_day_partition(
562
- base_query, day_start, day_end, token_pool, prs_by_id, depth=0
563
- )
564
- total_found += count
565
- else:
566
- # Normal case: fetch all PRs for this day
567
- print(f" Fetching {total_count} PRs...")
568
- page = 1
569
- per_page = 100
570
- day_count = 0
571
-
572
- while True:
573
- params_page = {
574
- 'q': query,
575
- 'per_page': per_page,
576
- 'page': page,
577
- 'sort': 'created',
578
- 'order': 'asc'
579
- }
580
-
581
- # Get token for tracking
582
- token = token_pool.get_next_token()
583
- headers = {'Authorization': f'token {token}'} if token else {}
584
-
585
- response = request_with_backoff('GET', url, headers=headers, params=params_page,
586
- token_pool=token_pool, token=token)
587
- if not response or response.status_code != 200:
588
- break
589
-
590
- items = response.json().get('items', [])
591
- if not items:
592
- break
593
-
594
- for pr in items:
595
- pr_id = pr.get('id')
596
- if pr_id and pr_id not in prs_by_id:
597
- prs_by_id[pr_id] = pr
598
- day_count += 1
599
-
600
- if len(items) < per_page:
601
- break
602
-
603
- page += 1
604
- time.sleep(0.5)
605
-
606
- if day_count > 0:
607
- print(f" ✓ Found {day_count} new PRs for {day_str}")
608
- total_found += day_count
609
-
610
- time.sleep(0.5) # Courtesy delay between days
611
-
612
- except Exception as e:
613
- print(f" ✗ Error processing {day_str}: {str(e)}")
614
- continue
615
 
616
- # Move to next day
 
 
617
  current_date += timedelta(days=1)
618
 
619
- return total_found
620
-
 
621
 
622
- def extract_pr_metadata(pr):
623
- """
624
- Extract minimal PR metadata for efficient storage.
625
- Only keeps essential fields: html_url, created_at, merged_at, closed_at.
626
- """
627
- pull_request = pr.get('pull_request', {})
628
 
629
- # Extract dates
630
- created_at = pr.get('created_at')
631
- merged_at = pull_request.get('merged_at')
632
- closed_at = pr.get('closed_at')
633
-
634
- # Only store closed_at if PR is closed but not merged
635
- if merged_at:
636
- closed_at = None # Don't store redundant info
637
 
638
- return {
639
- 'html_url': pr.get('html_url'),
640
- 'created_at': created_at,
641
- 'merged_at': merged_at,
642
- 'closed_at': closed_at
643
- }
644
 
 
 
 
645
 
646
- def fetch_prs_parallel(query_patterns, start_date, end_date, token_pool, max_workers=None):
647
  """
648
- Fetch PRs for multiple query patterns in parallel using available tokens.
 
 
 
 
649
 
650
  Args:
651
- query_patterns: List of query pattern strings
652
- start_date: Start date for PR search
653
- end_date: End date for PR search
654
- token_pool: TokenPool instance
655
- max_workers: Maximum number of concurrent workers (defaults to number of available parallel tokens)
656
 
657
  Returns:
658
- Dictionary mapping query pattern to list of PRs found
659
- """
660
- import concurrent.futures
661
-
662
- # Determine number of workers based on available parallel tokens
663
- available_tokens = token_pool.get_available_parallel_tokens()
664
- if not available_tokens:
665
- # Fall back to sequential if no parallel tokens available
666
- print(" ⚠️ No parallel tokens available, using sequential fallback")
667
- return None
668
-
669
- if max_workers is None:
670
- max_workers = len(available_tokens)
671
-
672
- print(f" 🚀 Starting parallel execution with {max_workers} worker(s)")
673
-
674
- results = {}
675
-
676
- def fetch_single_pattern(pattern):
677
- """Fetch PRs for a single query pattern."""
678
- prs_by_id = {}
679
- try:
680
- prs_found = fetch_prs_with_time_partition(
681
- pattern,
682
- start_date,
683
- end_date,
684
- token_pool,
685
- prs_by_id
686
- )
687
- return pattern, prs_by_id
688
- except Exception as e:
689
- print(f" ✗ Error in parallel fetch for pattern '{pattern}': {str(e)}")
690
- return pattern, {}
691
-
692
- # Execute patterns in parallel
693
- with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
694
- # Submit all tasks
695
- future_to_pattern = {
696
- executor.submit(fetch_single_pattern, pattern): pattern
697
- for pattern in query_patterns
698
  }
699
-
700
- # Collect results as they complete
701
- for future in concurrent.futures.as_completed(future_to_pattern):
702
- pattern = future_to_pattern[future]
703
- try:
704
- pattern_key, prs = future.result()
705
- results[pattern_key] = prs
706
- print(f" ✓ Parallel fetch completed for pattern: {pattern_key}")
707
- except Exception as e:
708
- print(f" ✗ Parallel fetch failed for pattern '{pattern}': {str(e)}")
709
- results[pattern] = {}
710
-
711
- return results
712
-
713
-
714
- def fetch_all_prs_metadata(identifier, agent_name, token_pool=None, use_parallel=True):
715
  """
716
- Fetch pull requests associated with a GitHub user or bot for the past LEADERBOARD_TIME_FRAME_DAYS.
717
- Returns lightweight metadata instead of full PR objects.
718
-
719
- This function employs time-based partitioning to navigate GitHub's 1000-result limit per query.
720
- It searches using multiple query patterns:
721
- - is:pr author:{identifier} (PRs authored by the bot)
722
- - is:pr "co-authored-by: {identifier}" (PRs with commits co-authored by the bot)
723
- - is:pr head:{identifier}/ (PRs with branch names starting with the bot identifier)
724
-
725
- Args:
726
- identifier: GitHub username or bot identifier
727
- agent_name: Human-readable name of the agent for metadata purposes
728
- token_pool: TokenPool instance for rotating tokens
729
-
730
- Returns:
731
- List of dictionaries containing minimal PR metadata
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
732
  """
733
 
734
- # Define query patterns per rules:
735
- # 1) author pattern only if identifier contains "[bot]"
736
- # 2) co-author and head patterns use identifier with "[bot]" removed
737
- stripped_id = identifier.replace('[bot]', '')
738
- query_patterns = []
739
- if '[bot]' in identifier:
740
- query_patterns.append(f'is:pr author:{identifier}')
741
- if stripped_id:
742
- query_patterns.append(f'is:pr "co-authored-by: {stripped_id}"')
743
- query_patterns.append(f'is:pr head:{stripped_id}/')
744
-
745
- # Use a dict to deduplicate PRs by ID
746
- prs_by_id = {}
747
 
748
- # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
749
- current_time = datetime.now(timezone.utc)
750
- end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0) # 12:00 AM today (UTC)
751
- start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
752
-
753
- # Try parallel execution first if enabled
754
- if use_parallel and len(query_patterns) > 1:
755
- print(f"\n🚀 Attempting parallel execution for {len(query_patterns)} query patterns...")
756
- parallel_start_time = time.time()
757
-
758
- parallel_results = fetch_prs_parallel(query_patterns, start_date, end_date, token_pool)
759
-
760
- if parallel_results is not None:
761
- # Merge results from parallel execution
762
- for pattern, pattern_prs in parallel_results.items():
763
- for pr_id, pr in pattern_prs.items():
764
- if pr_id not in prs_by_id:
765
- prs_by_id[pr_id] = pr
766
-
767
- parallel_duration = time.time() - parallel_start_time
768
- print(f"\n ✅ Parallel execution complete: {len(prs_by_id)} unique PRs found")
769
- print(f" ⏱️ Total time: {parallel_duration:.1f} seconds")
770
-
771
- # Print token pool statistics
772
- token_pool.print_stats()
773
- else:
774
- # Fallback to sequential execution
775
- print(" ⚠️ Parallel execution not available, falling back to sequential...")
776
- use_parallel = False
777
-
778
- # Sequential execution (fallback or if parallel disabled)
779
- if not use_parallel or len(query_patterns) <= 1:
780
- for query_pattern in query_patterns:
781
- print(f"\n🔍 Searching with query: {query_pattern}")
782
- print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')} (today excluded)")
783
-
784
- pattern_start_time = time.time()
785
- initial_count = len(prs_by_id)
786
-
787
- # Fetch with time partitioning
788
- prs_found = fetch_prs_with_time_partition(
789
- query_pattern,
790
- start_date,
791
- end_date,
792
- token_pool,
793
- prs_by_id
794
- )
795
 
796
- pattern_duration = time.time() - pattern_start_time
797
- new_prs = len(prs_by_id) - initial_count
798
 
799
- print(f" ✓ Pattern complete: {new_prs} new PRs found ({prs_found} total fetched, {len(prs_by_id) - initial_count - (prs_found - new_prs)} duplicates)")
800
- print(f" ⏱️ Time taken: {pattern_duration:.1f} seconds")
801
 
802
- time.sleep(1.0)
 
 
 
 
803
 
804
- # Convert to lightweight metadata
805
- all_prs = list(prs_by_id.values())
 
806
 
807
- print(f"\n✅ COMPLETE: Found {len(all_prs)} unique PRs for {identifier}")
808
- print(f"📦 Extracting minimal metadata...")
 
809
 
810
- metadata_list = [extract_pr_metadata(pr) for pr in all_prs]
 
 
 
 
 
811
 
812
- # Calculate memory savings
813
- import sys
814
- original_size = sys.getsizeof(str(all_prs))
815
- metadata_size = sys.getsizeof(str(metadata_list))
816
- savings_pct = ((original_size - metadata_size) / original_size * 100) if original_size > 0 else 0
817
 
818
- print(f"💾 Memory efficiency: {original_size // 1024}KB → {metadata_size // 1024}KB (saved {savings_pct:.1f}%)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
819
 
820
- return metadata_list
 
 
 
 
821
 
822
 
823
  # =============================================================================
@@ -846,50 +315,18 @@ def group_metadata_by_date(metadata_list):
846
  return dict(grouped)
847
 
848
 
849
- def upload_with_retry(api, path_or_fileobj, path_in_repo, repo_id, repo_type, token, max_retries=5):
850
- """
851
- Upload file to HuggingFace with exponential backoff retry logic.
852
- """
853
- delay = 2.0
854
-
855
- for attempt in range(max_retries):
856
- try:
857
- api.upload_file(
858
- path_or_fileobj=path_or_fileobj,
859
- path_in_repo=path_in_repo,
860
- repo_id=repo_id,
861
- repo_type=repo_type,
862
- token=token
863
- )
864
- if attempt > 0:
865
- print(f" ✓ Upload succeeded on attempt {attempt + 1}/{max_retries}")
866
- return True
867
-
868
- except Exception as e:
869
- if attempt < max_retries - 1:
870
- wait_time = delay + random.uniform(0, 1.0)
871
- print(f" ⚠️ Upload failed (attempt {attempt + 1}/{max_retries}): {str(e)}")
872
- print(f" ⏳ Retrying in {wait_time:.1f} seconds...")
873
- time.sleep(wait_time)
874
- delay = min(delay * 2, 60.0)
875
- else:
876
- print(f" ✗ Upload failed after {max_retries} attempts: {str(e)}")
877
- raise
878
-
879
-
880
  def save_pr_metadata_to_hf(metadata_list, agent_identifier):
881
  """
882
  Save PR metadata to HuggingFace dataset, organized by [agent_identifier]/YYYY.MM.DD.jsonl.
883
  Each file is stored in the agent's folder and named YYYY.MM.DD.jsonl for that day's PRs.
884
 
885
- This function APPENDS new metadata and DEDUPLICATES by html_url.
886
- Uses batch upload to avoid HuggingFace rate limits (256 commits/hour).
887
 
888
  Args:
889
  metadata_list: List of PR metadata dictionaries
890
  agent_identifier: GitHub identifier of the agent (used as folder name)
891
  """
892
- import tempfile
893
  import shutil
894
 
895
  try:
@@ -897,76 +334,64 @@ def save_pr_metadata_to_hf(metadata_list, agent_identifier):
897
  if not token:
898
  raise Exception("No HuggingFace token found")
899
 
900
- api = HfApi()
901
 
902
- # Group by exact date (year, month, day)
903
  grouped = group_metadata_by_date(metadata_list)
904
 
905
- # Create a temporary directory to prepare all files for batch upload
 
 
 
 
906
  temp_dir = tempfile.mkdtemp()
907
- agent_dir = os.path.join(temp_dir, agent_identifier)
908
- os.makedirs(agent_dir, exist_ok=True)
909
 
910
  try:
911
- print(f"📦 Preparing {len(grouped)} daily files for batch upload...")
912
 
 
913
  for (pr_year, month, day), day_metadata in grouped.items():
914
- # New structure: [agent_identifier]/YYYY.MM.DD.jsonl
915
  filename = f"{agent_identifier}/{pr_year}.{month:02d}.{day:02d}.jsonl"
916
- local_path = os.path.join(agent_dir, f"{pr_year}.{month:02d}.{day:02d}.jsonl")
917
 
918
- print(f" Preparing {len(day_metadata)} PRs for {filename}...")
 
919
 
920
- # Download existing file if it exists
921
- existing_metadata = []
922
- try:
923
- file_path = hf_hub_download(
924
- repo_id=PR_METADATA_REPO,
925
- filename=filename,
926
- repo_type="dataset",
927
- token=token
928
- )
929
- existing_metadata = load_jsonl(file_path)
930
- print(f" Found {len(existing_metadata)} existing PRs, merging...")
931
- except Exception:
932
- print(f" No existing file found, creating new...")
933
-
934
- # Merge and deduplicate by html_url
935
- existing_by_url = {meta['html_url']: meta for meta in existing_metadata if meta.get('html_url')}
936
- new_by_url = {meta['html_url']: meta for meta in day_metadata if meta.get('html_url')}
937
-
938
- # Update with new data (new data overwrites old)
939
- existing_by_url.update(new_by_url)
940
- merged_metadata = list(existing_by_url.values())
941
-
942
- # Save to temp directory
943
- save_jsonl(local_path, merged_metadata)
944
- print(f" ✓ Prepared {len(merged_metadata)} total PRs")
945
-
946
- # Batch upload entire folder in a single commit
947
- print(f"\n📤 Uploading all files for {agent_identifier} in one batch...")
948
- api.upload_folder(
949
  folder_path=temp_dir,
950
  repo_id=PR_METADATA_REPO,
951
- repo_type="dataset",
952
- token=token,
953
- commit_message=f"Update PR metadata for {agent_identifier}"
954
  )
955
- print(f" ✓ Successfully uploaded {len(grouped)} files in 1 commit")
956
 
957
- finally:
958
- # Clean up temporary directory
959
- shutil.rmtree(temp_dir, ignore_errors=True)
960
 
961
- return True
 
 
 
962
 
963
  except Exception as e:
964
- print(f"✗ Error saving PR metadata: {str(e)}")
 
 
965
  return False
966
 
967
 
968
  def load_agents_from_hf():
969
- """Load all agent metadata JSON files from HuggingFace dataset."""
 
 
 
 
970
  try:
971
  api = HfApi()
972
  agents = []
@@ -990,6 +415,11 @@ def load_agents_from_hf():
990
 
991
  with open(file_path, 'r') as f:
992
  agent_data = json.load(f)
 
 
 
 
 
993
  agents.append(agent_data)
994
 
995
  except Exception as e:
@@ -1011,54 +441,95 @@ def load_agents_from_hf():
1011
  def mine_all_agents():
1012
  """
1013
  Mine PR metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
 
1014
  """
1015
- # Initialize token pool
1016
- tokens = get_github_tokens()
1017
- token_pool = TokenPool(tokens)
1018
-
1019
  # Load agent metadata from HuggingFace
1020
  agents = load_agents_from_hf()
1021
  if not agents:
1022
  print("No agents found in HuggingFace dataset")
1023
  return
1024
 
 
 
 
 
 
 
1025
  print(f"\n{'='*80}")
1026
- print(f"Starting PR metadata mining for {len(agents)} agents")
1027
  print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1028
  print(f"{'='*80}\n")
1029
 
1030
- # Mine each agent
1031
- for agent in agents:
 
 
 
1032
  identifier = agent.get('github_identifier')
1033
- agent_name = agent.get('agent_name', 'Unknown')
1034
 
1035
  if not identifier:
1036
- print(f"Warning: Skipping agent without identifier: {agent}")
 
1037
  continue
1038
 
1039
- try:
1040
- print(f"\n{'='*80}")
1041
- print(f"Processing: {agent_name} ({identifier})")
1042
- print(f"{'='*80}")
1043
 
1044
- # Fetch PR metadata
1045
- metadata = fetch_all_prs_metadata(identifier, agent_name, token_pool)
1046
 
 
1047
  if metadata:
1048
- print(f"💾 Saving {len(metadata)} PR records...")
1049
- save_pr_metadata_to_hf(metadata, identifier)
1050
- print(f"✓ Successfully processed {agent_name}")
 
 
1051
  else:
1052
- print(f" No PRs found for {agent_name}")
 
1053
 
1054
  except Exception as e:
1055
- print(f"✗ Error processing {identifier}: {str(e)}")
1056
  import traceback
1057
  traceback.print_exc()
 
1058
  continue
1059
 
1060
  print(f"\n{'='*80}")
1061
- print(f"✅ Mining complete for all agents")
 
 
 
 
 
1062
  print(f"{'='*80}\n")
1063
 
1064
 
 
1
  """
2
  Minimalist PR Metadata Mining Script
3
+ Mines PR metadata from GitHub Archive via BigQuery and saves to HuggingFace dataset.
4
  """
5
 
6
  import json
7
  import os
8
+ import tempfile
 
9
  from datetime import datetime, timezone, timedelta
10
  from collections import defaultdict
11
  from huggingface_hub import HfApi, hf_hub_download
12
  from dotenv import load_dotenv
13
+ from google.cloud import bigquery
14
 
15
  # Load environment variables
16
  load_dotenv()
 
51
  f.write(json.dumps(item) + '\n')
52
 
53
 
54
+ def get_bigquery_client():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  """
56
+ Initialize BigQuery client using credentials from environment variable.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
+ Expects GOOGLE_APPLICATION_CREDENTIALS_JSON environment variable containing
59
+ the service account JSON credentials as a string.
 
 
 
60
  """
61
+ # Get the JSON content from environment variable
62
+ creds_json = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_JSON')
 
 
 
 
63
 
64
+ if creds_json:
65
+ # Create a temporary file to store credentials
66
+ with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
67
+ temp_file.write(creds_json)
68
+ temp_path = temp_file.name
 
 
 
 
 
 
 
 
 
69
 
70
+ # Set environment variable to point to temp file
71
+ os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = temp_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
+ # Initialize BigQuery client
74
+ client = bigquery.Client()
75
 
76
+ # Clean up temp file
77
+ os.unlink(temp_path)
 
 
 
 
 
78
 
79
+ return client
80
+ else:
81
+ raise ValueError("GOOGLE_APPLICATION_CREDENTIALS_JSON not found in environment")
82
 
83
 
84
+ def generate_table_union_statements(start_date, end_date):
85
  """
86
+ Generate UNION ALL statements for githubarchive.day tables in date range.
 
 
 
 
87
 
88
  Args:
 
89
  start_date: Start datetime
90
  end_date: End datetime
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91
 
92
  Returns:
93
+ String with UNION ALL SELECT statements for all tables in range
94
  """
95
+ table_names = []
96
  current_date = start_date
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97
 
98
+ while current_date < end_date:
99
+ table_name = f"`githubarchive.day.{current_date.strftime('%Y%m%d')}`"
100
+ table_names.append(table_name)
101
  current_date += timedelta(days=1)
102
 
103
+ # Create UNION ALL chain
104
+ union_parts = [f"SELECT * FROM {table}" for table in table_names]
105
+ return " UNION ALL ".join(union_parts)
106
 
 
 
 
 
 
 
107
 
108
+ def get_hf_token():
109
+ """Get HuggingFace token from environment variables."""
110
+ token = os.getenv('HF_TOKEN')
111
+ if not token:
112
+ print("Warning: HF_TOKEN not found in environment variables")
113
+ return token
 
 
114
 
 
 
 
 
 
 
115
 
116
+ # =============================================================================
117
+ # BIGQUERY FUNCTIONS
118
+ # =============================================================================
119
 
120
+ def fetch_all_pr_metadata_single_query(client, identifiers, start_date, end_date):
121
  """
122
+ Fetch PR metadata for ALL agents using ONE comprehensive BigQuery query.
123
+
124
+ This query fetches:
125
+ 1. PRs authored by agents (user.login matches identifier)
126
+ 2. PRs from branches starting with agent identifier (head.ref pattern)
127
 
128
  Args:
129
+ client: BigQuery client instance
130
+ identifiers: List of GitHub usernames/bot identifiers
131
+ start_date: Start datetime (timezone-aware)
132
+ end_date: End datetime (timezone-aware)
 
133
 
134
  Returns:
135
+ Dictionary mapping agent identifier to list of PR metadata:
136
+ {
137
+ 'agent-identifier': [
138
+ {
139
+ 'url': PR URL,
140
+ 'created_at': Creation timestamp,
141
+ 'merged_at': Merge timestamp (if merged, else None),
142
+ 'closed_at': Close timestamp (if closed but not merged, else None)
143
+ },
144
+ ...
145
+ ],
146
+ ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
  """
149
+ print(f"\n🔍 Querying BigQuery for ALL {len(identifiers)} agents in ONE QUERY")
150
+ print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
151
+
152
+ # Generate table UNION statements for the time range
153
+ table_union = generate_table_union_statements(start_date, end_date)
154
+
155
+ # Build identifier lists for SQL IN clauses
156
+ # For author matching, include identifiers with [bot]
157
+ author_list = ', '.join([f"'{id}'" for id in identifiers if '[bot]' in id])
158
+
159
+ # For branch matching, use stripped identifiers (without [bot])
160
+ stripped_identifiers = [id.replace('[bot]', '') for id in identifiers]
161
+ branch_patterns = ' OR '.join([f"JSON_EXTRACT_SCALAR(payload, '$.pull_request.head.ref') LIKE '{id}/%'"
162
+ for id in stripped_identifiers if id])
163
+
164
+ # Build comprehensive query with CTE
165
+ query = f"""
166
+ WITH pr_events AS (
167
+ -- Get all PR events (opened, closed) for all agents
168
+ SELECT
169
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') as url,
170
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.user.login') as pr_author,
171
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.head.ref') as branch_name,
172
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.created_at') as created_at,
173
+ CAST(JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged') AS BOOL) as is_merged,
174
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.merged_at') as merged_at,
175
+ JSON_EXTRACT_SCALAR(payload, '$.pull_request.closed_at') as closed_at,
176
+ JSON_EXTRACT_SCALAR(payload, '$.action') as action,
177
+ created_at as event_time
178
+ FROM (
179
+ {table_union}
180
+ )
181
+ WHERE
182
+ type = 'PullRequestEvent'
183
+ AND JSON_EXTRACT_SCALAR(payload, '$.pull_request.html_url') IS NOT NULL
184
+ AND (
185
+ -- Match PRs authored by agents with [bot] suffix
186
+ {f"JSON_EXTRACT_SCALAR(payload, '$.pull_request.user.login') IN ({author_list})" if author_list else "FALSE"}
187
+ {"OR" if author_list and branch_patterns else ""}
188
+ -- Match PRs with branch names starting with agent identifier
189
+ {f"({branch_patterns})" if branch_patterns else ""}
190
+ )
191
+ ),
192
+
193
+ pr_latest_state AS (
194
+ -- Get the latest state for each PR (most recent event)
195
+ SELECT
196
+ url,
197
+ pr_author,
198
+ branch_name,
199
+ created_at,
200
+ merged_at,
201
+ closed_at,
202
+ ROW_NUMBER() OVER (PARTITION BY url ORDER BY event_time DESC) as row_num
203
+ FROM pr_events
204
+ )
205
+
206
+ -- Return deduplicated PR metadata
207
+ SELECT DISTINCT
208
+ url,
209
+ pr_author,
210
+ branch_name,
211
+ created_at,
212
+ merged_at,
213
+ -- Only include closed_at if PR is closed but not merged
214
+ CASE
215
+ WHEN merged_at IS NOT NULL THEN NULL
216
+ ELSE closed_at
217
+ END as closed_at
218
+ FROM pr_latest_state
219
+ WHERE row_num = 1
220
+ ORDER BY created_at DESC
221
  """
222
 
223
+ print(f" Querying {(end_date - start_date).days} days of GitHub Archive data...")
224
+ print(f" Agents: {', '.join(identifiers[:5])}{'...' if len(identifiers) > 5 else ''}")
 
 
 
 
 
 
 
 
 
 
 
225
 
226
+ try:
227
+ query_job = client.query(query)
228
+ results = list(query_job.result())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
229
 
230
+ print(f" ✓ Found {len(results)} total PRs across all agents")
 
231
 
232
+ # Group results by agent
233
+ metadata_by_agent = defaultdict(list)
234
 
235
+ for row in results:
236
+ # Convert datetime objects to ISO strings
237
+ created_at = row.created_at
238
+ if hasattr(created_at, 'isoformat'):
239
+ created_at = created_at.isoformat()
240
 
241
+ merged_at = row.merged_at
242
+ if hasattr(merged_at, 'isoformat'):
243
+ merged_at = merged_at.isoformat()
244
 
245
+ closed_at = row.closed_at
246
+ if hasattr(closed_at, 'isoformat'):
247
+ closed_at = closed_at.isoformat()
248
 
249
+ pr_data = {
250
+ 'url': row.url,
251
+ 'created_at': created_at,
252
+ 'merged_at': merged_at,
253
+ 'closed_at': closed_at,
254
+ }
255
 
256
+ # Assign to agent based on author or branch pattern
257
+ pr_author = row.pr_author
258
+ branch_name = row.branch_name or ''
 
 
259
 
260
+ # First, try to match by author
261
+ if pr_author and pr_author in identifiers:
262
+ metadata_by_agent[pr_author].append(pr_data)
263
+ else:
264
+ # Try to match by branch pattern
265
+ for identifier in identifiers:
266
+ stripped_id = identifier.replace('[bot]', '')
267
+ if stripped_id and branch_name.startswith(f"{stripped_id}/"):
268
+ metadata_by_agent[identifier].append(pr_data)
269
+ break
270
+
271
+ # Print breakdown by agent
272
+ print(f"\n 📊 Results breakdown by agent:")
273
+ for identifier in identifiers:
274
+ count = len(metadata_by_agent.get(identifier, []))
275
+ if count > 0:
276
+ metadata = metadata_by_agent[identifier]
277
+ merged_count = sum(1 for m in metadata if m['merged_at'] is not None)
278
+ closed_count = sum(1 for m in metadata if m['closed_at'] is not None and m['merged_at'] is None)
279
+ open_count = count - merged_count - closed_count
280
+ print(f" {identifier}: {count} PRs ({merged_count} merged, {closed_count} closed, {open_count} open)")
281
+
282
+ # Convert defaultdict to regular dict
283
+ return dict(metadata_by_agent)
284
 
285
+ except Exception as e:
286
+ print(f" ✗ BigQuery error: {str(e)}")
287
+ import traceback
288
+ traceback.print_exc()
289
+ return {}
290
 
291
 
292
  # =============================================================================
 
315
  return dict(grouped)
316
 
317
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
318
  def save_pr_metadata_to_hf(metadata_list, agent_identifier):
319
  """
320
  Save PR metadata to HuggingFace dataset, organized by [agent_identifier]/YYYY.MM.DD.jsonl.
321
  Each file is stored in the agent's folder and named YYYY.MM.DD.jsonl for that day's PRs.
322
 
323
+ This function OVERWRITES existing files completely with fresh data from BigQuery.
324
+ Uses batch upload to avoid rate limit (uploads entire folder in single operation).
325
 
326
  Args:
327
  metadata_list: List of PR metadata dictionaries
328
  agent_identifier: GitHub identifier of the agent (used as folder name)
329
  """
 
330
  import shutil
331
 
332
  try:
 
334
  if not token:
335
  raise Exception("No HuggingFace token found")
336
 
337
+ api = HfApi(token=token)
338
 
339
+ # Group by date (year, month, day)
340
  grouped = group_metadata_by_date(metadata_list)
341
 
342
+ if not grouped:
343
+ print(f" No valid metadata to save for {agent_identifier}")
344
+ return False
345
+
346
+ # Create a temporary directory for batch upload
347
  temp_dir = tempfile.mkdtemp()
348
+ agent_folder = os.path.join(temp_dir, agent_identifier)
349
+ os.makedirs(agent_folder, exist_ok=True)
350
 
351
  try:
352
+ print(f" 📦 Preparing batch upload for {len(grouped)} daily files...")
353
 
354
+ # Process each daily file
355
  for (pr_year, month, day), day_metadata in grouped.items():
 
356
  filename = f"{agent_identifier}/{pr_year}.{month:02d}.{day:02d}.jsonl"
357
+ local_filename = os.path.join(agent_folder, f"{pr_year}.{month:02d}.{day:02d}.jsonl")
358
 
359
+ # Sort by created_at for better organization
360
+ day_metadata.sort(key=lambda x: x.get('created_at', ''), reverse=True)
361
 
362
+ # Save to temp directory (complete overwrite, no merging)
363
+ save_jsonl(local_filename, day_metadata)
364
+ print(f" Prepared {len(day_metadata)} PRs for {filename}")
365
+
366
+ # Upload entire folder using upload_large_folder (optimized for large files)
367
+ print(f" 📤 Uploading {len(grouped)} files ({len(metadata_list)} total PRs)...")
368
+ api.upload_large_folder(
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
369
  folder_path=temp_dir,
370
  repo_id=PR_METADATA_REPO,
371
+ repo_type="dataset"
 
 
372
  )
373
+ print(f" ✓ Batch upload complete for {agent_identifier}")
374
 
375
+ return True
 
 
376
 
377
+ finally:
378
+ # Always clean up temp directory
379
+ if os.path.exists(temp_dir):
380
+ shutil.rmtree(temp_dir)
381
 
382
  except Exception as e:
383
+ print(f" ✗ Error saving PR metadata: {str(e)}")
384
+ import traceback
385
+ traceback.print_exc()
386
  return False
387
 
388
 
389
  def load_agents_from_hf():
390
+ """
391
+ Load all agent metadata JSON files from HuggingFace dataset.
392
+
393
+ The github_identifier is extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]')
394
+ """
395
  try:
396
  api = HfApi()
397
  agents = []
 
415
 
416
  with open(file_path, 'r') as f:
417
  agent_data = json.load(f)
418
+
419
+ # Extract github_identifier from filename (remove .json extension)
420
+ github_identifier = json_file.replace('.json', '')
421
+ agent_data['github_identifier'] = github_identifier
422
+
423
  agents.append(agent_data)
424
 
425
  except Exception as e:
 
441
  def mine_all_agents():
442
  """
443
  Mine PR metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save to HuggingFace.
444
+ Uses ONE BigQuery query for ALL agents (most efficient approach).
445
  """
 
 
 
 
446
  # Load agent metadata from HuggingFace
447
  agents = load_agents_from_hf()
448
  if not agents:
449
  print("No agents found in HuggingFace dataset")
450
  return
451
 
452
+ # Extract all identifiers
453
+ identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
454
+ if not identifiers:
455
+ print("No valid agent identifiers found")
456
+ return
457
+
458
  print(f"\n{'='*80}")
459
+ print(f"Starting PR metadata mining for {len(identifiers)} agents")
460
  print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
461
+ print(f"Data source: BigQuery + GitHub Archive (ONE QUERY FOR ALL AGENTS)")
462
+ print(f"{'='*80}\n")
463
+
464
+ # Initialize BigQuery client
465
+ try:
466
+ client = get_bigquery_client()
467
+ except Exception as e:
468
+ print(f"✗ Failed to initialize BigQuery client: {str(e)}")
469
+ return
470
+
471
+ # Define time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
472
+ current_time = datetime.now(timezone.utc)
473
+ end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
474
+ start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
475
+
476
+ try:
477
+ all_metadata = fetch_all_pr_metadata_single_query(
478
+ client, identifiers, start_date, end_date
479
+ )
480
+ except Exception as e:
481
+ print(f"✗ Error during BigQuery fetch: {str(e)}")
482
+ import traceback
483
+ traceback.print_exc()
484
+ return
485
+
486
+ # Save results for each agent
487
+ print(f"\n{'='*80}")
488
+ print(f"💾 Saving results to HuggingFace for each agent...")
489
  print(f"{'='*80}\n")
490
 
491
+ success_count = 0
492
+ error_count = 0
493
+ no_data_count = 0
494
+
495
+ for i, agent in enumerate(agents, 1):
496
  identifier = agent.get('github_identifier')
497
+ agent_name = agent.get('name', agent.get('agent_name', 'Unknown'))
498
 
499
  if not identifier:
500
+ print(f"[{i}/{len(agents)}] Skipping agent without identifier")
501
+ error_count += 1
502
  continue
503
 
504
+ metadata = all_metadata.get(identifier, [])
 
 
 
505
 
506
+ print(f"[{i}/{len(agents)}] {agent_name} ({identifier}):")
 
507
 
508
+ try:
509
  if metadata:
510
+ print(f" 💾 Saving {len(metadata)} PR records...")
511
+ if save_pr_metadata_to_hf(metadata, identifier):
512
+ success_count += 1
513
+ else:
514
+ error_count += 1
515
  else:
516
+ print(f" No PRs found")
517
+ no_data_count += 1
518
 
519
  except Exception as e:
520
+ print(f" ✗ Error saving {identifier}: {str(e)}")
521
  import traceback
522
  traceback.print_exc()
523
+ error_count += 1
524
  continue
525
 
526
  print(f"\n{'='*80}")
527
+ print(f"✅ Mining complete!")
528
+ print(f" Total agents: {len(agents)}")
529
+ print(f" Successfully saved: {success_count}")
530
+ print(f" No data (skipped): {no_data_count}")
531
+ print(f" Errors: {error_count}")
532
+ print(f" BigQuery queries executed: 1")
533
  print(f"{'='*80}\n")
534
 
535