zhimin-z committed
Commit 3f0eb80 · 1 Parent(s): b85e4dd
Files changed (1)
  1. msr.py +77 -87
msr.py CHANGED
@@ -86,17 +86,23 @@ def save_jsonl(filename, data):
 
 def normalize_date_format(date_string):
    """
-   Convert date strings to standardized ISO 8601 format with Z suffix.
+   Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix.
    Handles both 'T' and space-separated datetime formats (including newlines).
    Examples:
    - 2025-10-15T23:23:47.983068 -> 2025-10-15T23:23:47Z
    - 2025-06-17 21:21:07+00 -> 2025-06-17T21:21:07Z
+   - datetime object -> 2025-10-15T23:23:47Z
    """
    if not date_string or date_string == 'N/A':
        return 'N/A'

    try:
        import re
+
+       # Handle datetime objects directly
+       if isinstance(date_string, datetime):
+           return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
+
        # Remove all whitespace (spaces, newlines, tabs) and replace with single space
        date_string = re.sub(r'\s+', ' ', date_string.strip())
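With this change, normalize_date_format accepts either strings or datetime objects. A minimal sketch of the expected behavior, assuming the function is importable from msr and that datetime is already imported at module level in msr.py (the string outputs follow the docstring examples above):

    >>> from datetime import datetime
    >>> from msr import normalize_date_format
    >>> normalize_date_format('2025-10-15T23:23:47.983068')
    '2025-10-15T23:23:47Z'
    >>> normalize_date_format(datetime(2025, 10, 15, 23, 23, 47))
    '2025-10-15T23:23:47Z'
    >>> normalize_date_format('N/A')
    'N/A'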
 
@@ -431,97 +437,73 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
    # Generate file path patterns for the time range
    file_patterns = generate_file_path_patterns(start_date, end_date)

-   # Build identifier list for IN clause
+   if not file_patterns:
+       print(" ✗ Error: No GHArchive data files found for the specified date range")
+       return {}
+
+   # Build identifier list for IN clause with proper escaping
    identifier_list = ', '.join([f"'{id}'" for id in identifiers])

-   # Build file patterns list for SQL (as a JSON array string)
+   # Build file patterns list for SQL (as JSON array string)
    file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'

-   # Build comprehensive query with CTEs using file lists (JSON.gz format)
+   # ============================================================================
+   # REFINED DUCKDB QUERY - Using struct accessors for parsed JSON
+   # ============================================================================
    query = f"""
    WITH pr_events AS (
-       -- Get all PR events (opened, closed) for all agents
-       SELECT
-           TRY_CAST(json_extract_string(payload, '$.pull_request.html_url') AS VARCHAR) as url,
-           TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) as pr_author,
-           TRY_CAST(json_extract_string(payload, '$.pull_request.created_at') AS VARCHAR) as created_at,
-           TRY_CAST(json_extract_string(payload, '$.pull_request.merged') AS BOOLEAN) as is_merged,
-           TRY_CAST(json_extract_string(payload, '$.pull_request.merged_at') AS VARCHAR) as merged_at,
-           TRY_CAST(json_extract_string(payload, '$.pull_request.closed_at') AS VARCHAR) as closed_at,
-           TRY_CAST(json_extract_string(payload, '$.action') AS VARCHAR) as action,
-           created_at as event_time
-       FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
-       WHERE
+       -- Get all PR opened/closed events
+       SELECT
+           CONCAT(
+               REPLACE(repo.url, 'api.github.com/repos/', 'github.com/'),
+               '/pull/',
+               CAST(payload.pull_request.number AS VARCHAR)
+           ) as url,
+           actor.login as pr_author,
+           created_at as event_time,
+           payload.action as event_action
+       FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
+       WHERE
            type = 'PullRequestEvent'
-           AND json_extract_string(payload, '$.pull_request.html_url') IS NOT NULL
-           AND TRY_CAST(json_extract_string(payload, '$.action') AS VARCHAR) = 'opened'
-           AND TRY_CAST(json_extract_string(actor, '$.login') AS VARCHAR) IN ({identifier_list})
+           AND payload.action IN ('opened', 'closed')
+           AND payload.pull_request.number IS NOT NULL
+           AND actor.login IN ({identifier_list})
    ),
-
-   pr_latest_state AS (
-       -- Get the latest state for each PR (most recent event)
-       SELECT
+   pr_timeline AS (
+       -- Build timeline: opened_at and closed_at (closed could mean merged or rejected)
+       SELECT
            url,
            pr_author,
-           created_at,
-           merged_at,
-           closed_at,
-           ROW_NUMBER() OVER (PARTITION BY url ORDER BY event_time DESC) as row_num
-       FROM pr_events
+           MIN(CASE WHEN event_action = 'opened' THEN event_time END) as created_at,
+           MAX(CASE WHEN event_action = 'closed' THEN event_time END) as closed_at,
+           -- Note: GHArchive doesn't distinguish merged vs closed, so merged_at = NULL
+           NULL as merged_at
+       FROM pr_events
+       GROUP BY url, pr_author
    )
-
-   -- Return deduplicated PR metadata (row_num = 1 already ensures uniqueness)
    SELECT
-       url,
-       pr_author,
-       created_at,
-       merged_at,
-       closed_at
-   FROM pr_latest_state
-   WHERE row_num = 1
+       url,
+       pr_author,
+       created_at,
+       merged_at,
+       closed_at
+   FROM pr_timeline
+   WHERE created_at IS NOT NULL
    ORDER BY created_at DESC
-   """
+   """

    try:
-       # Create cache table name based on date range
-       cache_table_name = f"pr_cache_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"
-
-       # Check if cache exists and is valid
-       cache_exists = conn.execute(f"""
-           SELECT COUNT(*) FROM information_schema.tables
-           WHERE table_name = '{cache_table_name}'
-       """).fetchone()[0] > 0
-
-       if cache_exists:
-           results = conn.execute(f"""
-               SELECT url, pr_author, created_at, merged_at, closed_at
-               FROM {cache_table_name}
-               WHERE pr_author IN ({identifier_list})
-           """).fetchall()
-       else:
-           # Execute query
-           results = conn.execute(query).fetchall()
-
-           # Cache the complete results for all future queries in this date range
-           if len(results) > 0:
-               conn.execute(f"""
-                   CREATE TABLE {cache_table_name} AS
-                   SELECT * FROM (
-                       SELECT UNNEST($1) as url, UNNEST($2) as pr_author,
-                              UNNEST($3) as created_at, UNNEST($4) as merged_at,
-                              UNNEST($5) as closed_at
-                   )
-               """, [
-                   [r[0] for r in results],
-                   [r[1] for r in results],
-                   [r[2] for r in results],
-                   [r[3] for r in results],
-                   [r[4] for r in results]
-               ])
-
-       # Group results by agent
+       # Execute the query
+       results = conn.execute(query).fetchall()
+
+       if not results:
+           print(f" ⚠ Warning: Query returned 0 results")
+           print(f" Checked {len(identifiers)} agent(s): {', '.join(identifiers)}")
+           return {}
+
+       # Group results by agent identifier
        metadata_by_agent = defaultdict(list)
-       unique_urls = set()  # Track unique PR URLs for verification
+       unique_urls = set()

        for row in results:
            url = row[0]
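The pr_timeline CTE above replaces the old ROW_NUMBER deduplication with conditional aggregation: MIN/MAX over a CASE picks the 'opened' and 'closed' timestamps per PR in a single GROUP BY. A self-contained sketch of the same pattern on toy data (table name and values are illustrative, not GHArchive output; requires only the duckdb package):

    import duckdb

    conn = duckdb.connect()  # in-memory database
    conn.execute("""
        CREATE TABLE pr_events (url VARCHAR, pr_author VARCHAR,
                                event_time TIMESTAMP, event_action VARCHAR)
    """)
    conn.execute("""
        INSERT INTO pr_events VALUES
            ('github.com/o/r/pull/1', 'bot-a', '2025-06-17 21:21:07', 'opened'),
            ('github.com/o/r/pull/1', 'bot-a', '2025-06-18 09:00:00', 'closed'),
            ('github.com/o/r/pull/2', 'bot-a', '2025-06-19 10:00:00', 'opened')
    """)
    # One row per PR; a PR with no 'closed' event yields NULL closed_at,
    # which is why the real query filters WHERE created_at IS NOT NULL
    # to drop close-only PRs whose open event fell outside the date range.
    rows = conn.execute("""
        SELECT url, pr_author,
               MIN(CASE WHEN event_action = 'opened' THEN event_time END) AS created_at,
               MAX(CASE WHEN event_action = 'closed' THEN event_time END) AS closed_at
        FROM pr_events
        GROUP BY url, pr_author
        ORDER BY created_at DESC
    """).fetchall()
    print(rows)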
@@ -530,28 +512,36 @@ def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
            merged_at = normalize_date_format(row[3]) if row[3] else None
            closed_at = normalize_date_format(row[4]) if row[4] else None

-           # Track unique URLs
+           # Skip if no valid URL
+           if not url:
+               continue
+
+           # Track unique URLs for verification
            unique_urls.add(url)

-           metadata_by_agent[pr_author].append({
+           # Build metadata record
+           pr_metadata = {
                'html_url': url,
                'created_at': created_at,
                'merged_at': merged_at,
                'closed_at': closed_at,
-           })
+           }

-       # Verification: Ensure we have unique PRs (no duplicates)
-       total_prs = sum(len(prs) for prs in metadata_by_agent.values())
-       if total_prs != len(unique_urls):
-           print(f" Warning: Duplicate PRs detected! Total: {total_prs}, Unique: {len(unique_urls)}")
-       else:
-           print(f" Verification passed: {len(unique_urls)} unique PRs retrieved")
+           metadata_by_agent[pr_author].append(pr_metadata)
+
+       # Log results per agent
+       agents_with_data = sum(1 for prs in metadata_by_agent.values() if prs)
+       print(f" ✓ Coverage: {agents_with_data}/{len(identifiers)} agents have PR data")
+
+       for agent_id in sorted(metadata_by_agent.keys()):
+           pr_count = len(metadata_by_agent[agent_id])
+           print(f" - {agent_id}: {pr_count} PRs")

-       # Convert defaultdict to regular dict
+       # Convert defaultdict to regular dict before returning
        return dict(metadata_by_agent)

    except Exception as e:
-       print(f"DuckDB error: {str(e)}")
+       print(f"DuckDB query error: {str(e)}")
        import traceback
        traceback.print_exc()
        return {}
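For callers, the function now returns a plain dict keyed by agent login, each value a list of PR records with html_url, created_at, merged_at, and closed_at. Under this query, merged_at is always None, since the SQL hardcodes it to NULL. A hypothetical consumption sketch (conn, the agent name, and the date range are illustrative):

    metadata = fetch_all_pr_metadata_single_query(conn, ['example-bot'], start_date, end_date)
    for agent, prs in metadata.items():
        for pr in prs:
            # merged_at stays None with this query; closed_at may also be
            # None if the close event falls outside the scanned date range.
            print(agent, pr['html_url'], pr['created_at'], pr['closed_at'])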