zhimin-z committed
Commit 43cd77a · 1 Parent(s): e967f1b
Files changed (1)
  1. msr.py +86 -51
msr.py CHANGED
@@ -30,6 +30,10 @@ LEADERBOARD_TIME_FRAME_DAYS = 180 # Time frame for leaderboard
 GHARCHIVE_DATA_DIR = "../gharchive/data" # Local GHArchive data directory
 DUCKDB_CACHE_FILE = "../gharchive/gharchive_cache.duckdb" # Persistent DuckDB database for caching
 
+# DuckDB performance configuration
+DUCKDB_THREADS = 8 # Number of threads for parallel processing
+DUCKDB_MEMORY_LIMIT = "64GB" # Memory limit to prevent OOM crashes
+
 # Download configuration
 DOWNLOAD_WORKERS = 4 # Number of parallel download threads
 DOWNLOAD_RETRY_DELAY = 2 # Initial retry delay in seconds
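Review note: the new DUCKDB_THREADS and DUCKDB_MEMORY_LIMIT constants are consumed by get_duckdb_connection() in the next hunk. Below is a minimal sketch of applying equivalent settings and reading them back, assuming only the duckdb Python package and a throwaway in-memory database; the values are hard-coded stand-ins for the constants, not the project's actual configuration.

import duckdb

# Hypothetical stand-ins for DUCKDB_THREADS / DUCKDB_MEMORY_LIMIT
threads, memory_limit = 8, "64GB"

conn = duckdb.connect()  # in-memory database instead of the persistent cache file
conn.execute(f"SET threads TO {threads};")
conn.execute(f"SET memory_limit = '{memory_limit}';")
conn.execute("SET preserve_insertion_order = false;")

# Read the effective settings back to confirm they were applied
print(conn.execute(
    "SELECT current_setting('threads'), current_setting('memory_limit')"
).fetchone())

DuckDB may echo the memory limit in normalized units, so the reported value need not match the input string exactly.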
@@ -332,11 +336,13 @@ def get_duckdb_connection():
     # Use persistent database for caching results
     conn = duckdb.connect(DUCKDB_CACHE_FILE)
 
-    # Optimize for 96-core CPU parallelization with 754GB RAM
-    conn.execute("SET threads TO 8;") # Use all available cores
+    # Optimize for parallel processing with memory limits
+    conn.execute(f"SET threads TO {DUCKDB_THREADS};") # Configure parallel threads
     conn.execute("SET preserve_insertion_order = false;") # Better parallelization
     conn.execute("SET enable_object_cache = true;") # Cache objects for reuse
     conn.execute("SET temp_directory = '/tmp/duckdb_temp';") # Use fast temp storage if needed
+    conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';") # Limit memory to prevent OOM crashes
+    conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';") # Hard memory cap
 
     return conn
@@ -428,54 +434,51 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
     identifier_list = ', '.join([f"'{id}'" for id in identifiers])
 
     # Build comprehensive query with CTEs using parameterized file lists (JSON.gz format)
+    # Optimized: Single file scan + ROW_NUMBER() deduplication (no DISTINCT)
     query = f"""
-    WITH review_events AS (
-        -- Get all review events for ALL agents
+    WITH all_review_events AS (
+        -- Single file scan for all three event types (optimization: 3x I/O reduction)
         SELECT
-            TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR) as url,
-            COALESCE(
-                TRY_CAST(json_extract_string(payload, '$.review.submitted_at') AS VARCHAR),
-                TRY_CAST(created_at AS VARCHAR)
-            ) as reviewed_at,
+            TRY_CAST(type AS VARCHAR) as event_type,
             TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) as reviewer,
             TRY_CAST(json_extract_string(repo, '$.name') AS VARCHAR) as repo_name,
-            TRY_CAST(json_extract_string(payload, '$.pull_request.number') AS INTEGER) as pr_number
+            payload,
+            created_at
         FROM read_json($review_patterns, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
         WHERE
-            TRY_CAST(type AS VARCHAR) = 'PullRequestReviewEvent'
+            TRY_CAST(type AS VARCHAR) IN ('PullRequestReviewEvent', 'IssueCommentEvent', 'PullRequestReviewCommentEvent')
            AND TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) IN ({identifier_list})
-            AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL
-
-        UNION ALL
-
-        -- Get PR comments (IssueCommentEvent on PRs)
-        SELECT
-            TRY_CAST(json_extract_string(payload, '$.issue.html_url') AS VARCHAR) as url,
-            TRY_CAST(created_at AS VARCHAR) as reviewed_at,
-            TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) as reviewer,
-            TRY_CAST(json_extract_string(repo, '$.name') AS VARCHAR) as repo_name,
-            TRY_CAST(json_extract_string(payload, '$.issue.number') AS INTEGER) as pr_number
-        FROM read_json($review_patterns, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
-        WHERE
-            TRY_CAST(type AS VARCHAR) = 'IssueCommentEvent'
-            AND TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) IN ({identifier_list})
-            AND json_extract_string(payload, '$.issue.pull_request.url') IS NOT NULL
-            AND json_extract_string(payload, '$.issue.html_url') IS NOT NULL
-
-        UNION ALL
+    ),
 
-        -- Get review comments (PullRequestReviewCommentEvent)
+    review_events AS (
+        -- Process events conditionally based on type
         SELECT
-            TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR) as url,
-            TRY_CAST(created_at AS VARCHAR) as reviewed_at,
-            TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) as reviewer,
-            TRY_CAST(json_extract_string(repo, '$.name') AS VARCHAR) as repo_name,
-            TRY_CAST(json_extract_string(payload, '$.pull_request.number') AS INTEGER) as pr_number
-        FROM read_json($review_patterns, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
+            CASE
+                WHEN event_type = 'IssueCommentEvent'
+                THEN TRY_CAST(json_extract_string(payload, '$.issue.html_url') AS VARCHAR)
+                ELSE TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR)
+            END as url,
+            CASE
+                WHEN event_type = 'PullRequestReviewEvent'
+                THEN COALESCE(
+                    TRY_CAST(json_extract_string(payload, '$.review.submitted_at') AS VARCHAR),
+                    TRY_CAST(created_at AS VARCHAR)
+                )
+                ELSE TRY_CAST(created_at AS VARCHAR)
+            END as reviewed_at,
+            reviewer,
+            repo_name,
+            CASE
+                WHEN event_type = 'IssueCommentEvent'
+                THEN TRY_CAST(json_extract_string(payload, '$.issue.number') AS INTEGER)
+                ELSE TRY_CAST(json_extract_string(payload, '$.pull_request.number') AS INTEGER)
+            END as pr_number
+        FROM all_review_events
         WHERE
-            TRY_CAST(type AS VARCHAR) = 'PullRequestReviewCommentEvent'
-            AND TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) IN ({identifier_list})
-            AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL
+            -- Validate required fields per event type
+            (event_type = 'PullRequestReviewEvent' AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL)
+            OR (event_type = 'IssueCommentEvent' AND json_extract_string(payload, '$.issue.pull_request.url') IS NOT NULL AND json_extract_string(payload, '$.issue.html_url') IS NOT NULL)
+            OR (event_type = 'PullRequestReviewCommentEvent' AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL)
     ),
 
     pr_status AS (
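Review note: the rewritten query scans the event files once in all_review_events and only then branches per event type with CASE expressions, instead of three UNION ALL scans. A minimal, self-contained sketch of that scan-once-then-branch pattern follows, using hypothetical in-memory rows rather than the GHArchive JSON.gz files (duckdb package only).

import duckdb

conn = duckdb.connect()
conn.execute("CREATE TABLE events(event_type VARCHAR, payload VARCHAR)")
conn.execute("""
    INSERT INTO events VALUES
        ('PullRequestReviewEvent',        '{"pull_request": {"html_url": "https://github.com/o/r/pull/1"}}'),
        ('IssueCommentEvent',             '{"issue": {"html_url": "https://github.com/o/r/pull/2"}}'),
        ('PullRequestReviewCommentEvent', '{"pull_request": {"html_url": "https://github.com/o/r/pull/3"}}')
""")

# One pass over the table; the extracted field depends on the event type
print(conn.execute("""
    SELECT event_type,
           CASE WHEN event_type = 'IssueCommentEvent'
                THEN json_extract_string(payload, '$.issue.html_url')
                ELSE json_extract_string(payload, '$.pull_request.html_url')
           END AS url
    FROM events
""").fetchall())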
@@ -495,18 +498,34 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
             AND json_extract_string(payload, '$.pull_request.html_url') IN (
                 SELECT DISTINCT url FROM review_events
             )
+    ),
+
+    deduplicated_reviews AS (
+        -- Efficient deduplication using ROW_NUMBER() instead of DISTINCT (optimization: prevents massive hash table)
+        SELECT
+            re.reviewer,
+            re.url,
+            re.reviewed_at,
+            ps.merged_at,
+            ps.closed_at,
+            ROW_NUMBER() OVER (
+                PARTITION BY re.reviewer, re.url, re.reviewed_at
+                ORDER BY re.reviewed_at
+            ) as row_num
+        FROM review_events re
+        LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.url = ps.url
     )
 
-    -- Join review events with PR status
-    SELECT DISTINCT
-        re.reviewer,
-        re.url,
-        re.reviewed_at,
-        ps.merged_at,
-        ps.closed_at
-    FROM review_events re
-    LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.url = ps.url
-    ORDER BY re.reviewer, re.reviewed_at DESC
+    -- Return deduplicated results (row_num = 1 ensures uniqueness without DISTINCT)
+    SELECT
+        reviewer,
+        url,
+        reviewed_at,
+        merged_at,
+        closed_at
+    FROM deduplicated_reviews
+    WHERE row_num = 1
+    ORDER BY reviewer, reviewed_at DESC
     """
 
     try:
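Review note: deduplicated_reviews keeps one row per (reviewer, url, reviewed_at) by filtering on ROW_NUMBER() = 1 rather than running DISTINCT over the joined result. A minimal sketch of the same pattern on hypothetical rows (not the real GHArchive query):

import duckdb

print(duckdb.sql("""
    WITH reviews AS (
        SELECT * FROM (VALUES
            ('bot-a', 'pull/1', '2024-01-01'),
            ('bot-a', 'pull/1', '2024-01-01'),  -- exact duplicate
            ('bot-b', 'pull/2', '2024-01-02')
        ) AS t(reviewer, url, reviewed_at)
    ),
    numbered AS (
        SELECT *,
               ROW_NUMBER() OVER (
                   PARTITION BY reviewer, url, reviewed_at
                   ORDER BY reviewed_at
               ) AS row_num
        FROM reviews
    )
    SELECT reviewer, url, reviewed_at
    FROM numbered
    WHERE row_num = 1
    ORDER BY reviewer
""").fetchall())  # the duplicated ('bot-a', 'pull/1') row survives only once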
@@ -546,8 +565,10 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
         [r[4] for r in results]
     ])
 
-    # Group results by agent
+    # Group results by agent with verification
     metadata_by_agent = defaultdict(list)
+    unique_reviews = set()
+    duplicate_count = 0
 
     for row in results:
         reviewer = row[0]
@@ -556,6 +577,12 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
         merged_at = normalize_date_format(row[3]) if row[3] else None
         closed_at = normalize_date_format(row[4]) if row[4] else None
 
+        # Track unique review combinations for verification
+        review_key = (reviewer, url, reviewed_at)
+        if review_key in unique_reviews:
+            duplicate_count += 1
+        unique_reviews.add(review_key)
+
         metadata_by_agent[reviewer].append({
             'url': url,
             'reviewed_at': reviewed_at,
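Review note: the loop now records a (reviewer, url, reviewed_at) key per row so that any duplicates surviving the query can be counted; the next hunk prints the summary. A toy illustration of that bookkeeping on hypothetical rows of the same shape:

from collections import defaultdict

# Hypothetical rows shaped like the query results: (reviewer, url, reviewed_at, merged_at, closed_at)
results = [
    ('bot-a', 'https://github.com/o/r/pull/1', '2024-01-01', None, None),
    ('bot-a', 'https://github.com/o/r/pull/1', '2024-01-01', None, None),  # duplicate
    ('bot-b', 'https://github.com/o/r/pull/2', '2024-01-02', None, None),
]

metadata_by_agent = defaultdict(list)
unique_reviews, duplicate_count = set(), 0
for reviewer, url, reviewed_at, merged_at, closed_at in results:
    key = (reviewer, url, reviewed_at)
    if key in unique_reviews:
        duplicate_count += 1
    unique_reviews.add(key)
    metadata_by_agent[reviewer].append({'url': url, 'reviewed_at': reviewed_at,
                                        'merged_at': merged_at, 'closed_at': closed_at})

print(duplicate_count, len(unique_reviews))  # -> 1 2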
@@ -563,6 +590,14 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
             'closed_at': closed_at,
         })
 
+    # Verification: Ensure we have unique reviews (no duplicates from query)
+    total_reviews = len(results)
+    if duplicate_count > 0:
+        print(f" Warning: Found {duplicate_count} duplicate review entries in query results!")
+        print(f" Total: {total_reviews}, Unique: {len(unique_reviews)}")
+    else:
+        print(f" Verification passed: {len(unique_reviews)} unique reviews retrieved (no duplicates)")
+
     # Convert defaultdict to regular dict
     return dict(metadata_by_agent)
 