import json
import logging
import os
from functools import lru_cache
from typing import List, Tuple

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

logger = logging.getLogger(__name__)

# Dataset repo holding the uploaded CircleCI aggregation payloads; overridable via env var.
DATASET_ID = os.environ.get(
    "CIRCLECI_RESULTS_DATASET_ID",
    "transformers-community/circleci-test-results",
)
# Cap on rows shown in the UI table so the Dataframe stays responsive.
MAX_ROWS = 200

API = HfApi()


@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
    """
    Return the `collection_summary.json` paths stored for a specific PR.

    Results are cached per PR number (see `refresh_dataset` for invalidation).
    Returns an empty tuple when the repo-tree listing fails, e.g. when the
    `pr-<n>` prefix does not exist yet.
    """
    prefix = f"pr-{pr_number}"
    try:
        entries = API.list_repo_tree(
            repo_id=DATASET_ID,
            repo_type="dataset",
            path=prefix,
            recursive=True,
        )
    except HfHubHTTPError as error:
        logger.warning("Failed to list repo tree for %s: %s", prefix, error)
        return tuple()

    # Tree entries may be files or folders; keep only the summary files.
    return tuple(
        entry.path
        for entry in entries
        if getattr(entry, "type", None) == "file"
        and entry.path.endswith("collection_summary.json")
    )


def _load_payload(path: str) -> dict | None:
    """
    Download one payload file from the dataset and parse it as JSON.

    Returns None (best effort, logged) on any download or parse failure so a
    single bad file never aborts a whole search.
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:  # noqa: BLE001 — deliberate best-effort download
        logger.warning("Failed to download %s: %s", path, error)
        return None
    try:
        # JSON is defined over UTF-8; never rely on the platform default encoding.
        with open(local_path, encoding="utf-8") as fp:
            return json.load(fp)
    except (OSError, ValueError) as error:  # ValueError covers JSONDecodeError / UnicodeDecodeError
        logger.warning("Failed to load JSON for %s: %s", path, error)
        return None


def _extract_commit_from_path(path: str) -> str:
    """Extract the commit SHA from a `pr-<n>/sha-<sha>/...` path, or "unknown"."""
    parts = path.split("/")
    if len(parts) >= 2 and parts[1].startswith("sha-"):
        return parts[1].removeprefix("sha-")
    return "unknown"


def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
    """
    Fetch and filter the payloads recorded for PR `pr`.

    `repo` is a case-insensitive substring filter on the payload's repository;
    `sha` is a case-insensitive prefix filter on the commit extracted from the
    storage path. Results are sorted newest-first by `metadata.collected_at`
    and capped at MAX_ROWS. Returns [] when `pr` is empty.
    """
    repo = repo.strip().lower()
    pr = pr.strip()
    sha = sha.strip().lower()
    if not pr:
        return []

    records: List[dict] = []
    for file_path in _list_collection_files(pr):
        commit = _extract_commit_from_path(file_path)
        # Filter on the path-derived commit before downloading, to skip fetches.
        if sha and not commit.lower().startswith(sha):
            continue
        payload = _load_payload(file_path)
        if payload is None:
            continue
        metadata = payload.get("metadata") or {}
        repository = (metadata.get("repository") or "").lower()
        if repo and repo not in repository:
            continue
        # Annotate provenance so the UI can show where the record came from.
        payload["__source_path"] = file_path
        payload["__commit"] = commit
        records.append(payload)

    def _sort_key(record: dict) -> str:
        metadata = record.get("metadata") or {}
        return metadata.get("collected_at") or ""

    records.sort(key=_sort_key, reverse=True)
    return records[:MAX_ROWS]


def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
    """
    Gradio callback for the Search button.

    Returns (table rows, JSON of the newest record, status markdown). A missing
    PR number or an empty result set is reported via the JSON/status outputs
    rather than raised.
    """
    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()
    if not pr:
        return (
            [],
            json.dumps({"error": "PR number is required."}, indent=2),
            "Provide a PR number to search.",
        )

    records = _filter_records(repo, pr, sha)
    if not records:
        return (
            [],
            json.dumps({"error": "No records found."}, indent=2),
            f"No records found for PR {pr}.",
        )

    table_rows = []
    for record in records:
        metadata = record.get("metadata") or {}
        table_rows.append(
            [
                metadata.get("collected_at", ""),
                metadata.get("repository", ""),
                metadata.get("branch", ""),
                metadata.get("pull_request_number", ""),
                (metadata.get("commit_sha") or "")[:12],  # short SHA for display
                metadata.get("workflow_id", ""),
                str(len(record.get("failures", []))),
            ]
        )

    latest_payload = json.dumps(records[0], indent=2)
    status = f"Showing {len(records)} record(s) for PR {pr}."
    return table_rows, latest_payload, status


def refresh_dataset() -> str:
    """Clear the cached file listing so the next search re-reads the dataset."""
    _list_collection_files.cache_clear()
    return "Cleared cached manifest. Data will be reloaded on next search."


with gr.Blocks() as demo:
    gr.Markdown(
        """
        # CircleCI Test Collection Helper

        Use the filters below to inspect CircleCI test aggregation records for
        the Transformers repository (or any repository that uploads data to the
        `transformers-community/circleci-test-results` dataset).
        """
    )
    with gr.Row():
        repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
        pr_box = gr.Textbox(label="PR number (required)")
        sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")
    with gr.Row():
        search_btn = gr.Button("Search")
        refresh_btn = gr.Button("Clear cache")
    table = gr.Dataframe(
        headers=[
            "Collected at",
            "Repository",
            "Branch",
            "PR",
            "Commit",
            "Workflow ID",
            "Failures",
        ],
        wrap=True,
    )
    json_view = gr.Code(label="Latest entry details", language="json")
    status = gr.Markdown("")

    search_btn.click(query, inputs=[repo_box, pr_box, sha_box], outputs=[table, json_view, status])
    refresh_btn.click(refresh_dataset, outputs=status)


if __name__ == "__main__":
    demo.queue(max_size=20).launch()