"""Gradio app for browsing CircleCI test collection summaries stored in a Hugging Face dataset."""

import json
import os
from functools import lru_cache
from typing import List, Tuple

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

DATASET_ID = os.environ.get(
    "CIRCLECI_RESULTS_DATASET_ID",
    "transformers-community/circleci-test-results",
)
MAX_ROWS = 200
API = HfApi()
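

# Collection summaries are stored in the dataset as
#   pr-<PR_NUMBER>/sha-<COMMIT_SHA>/.../collection_summary.json
# (inferred from the path handling in the helpers below).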
@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
    """Return the `collection_summary.json` paths stored for a specific PR."""
    prefix = f"pr-{pr_number}"
    try:
        entries = API.list_repo_tree(
            repo_id=DATASET_ID,
            repo_type="dataset",
            path_in_repo=prefix,
            recursive=True,
        )
    except HfHubHTTPError as error:
        print(f"Failed to list repo tree for {prefix}: {error}")
        return tuple()

    # `list_repo_tree` yields both files and folders; keep only the summary files.
    files = []
    for entry in entries:
        entry_type = getattr(entry, "type", None)
        if entry_type == "file" and entry.path.endswith("collection_summary.json"):
            files.append(entry.path)
    return tuple(files)


def _load_payload(path: str) -> dict | None:
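    """Download a file from the results dataset and parse it as JSON, returning None on any failure."""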
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:
        print(f"Failed to download {path}: {error}")
        return None

    try:
        with open(local_path) as fp:
            return json.load(fp)
    except Exception as error:
        print(f"Failed to load JSON for {path}: {error}")
        return None


def _extract_commit_from_path(path: str) -> str:
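    """Return the commit SHA from a `pr-<PR>/sha-<COMMIT>/...` path, or "unknown" if absent."""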
    parts = path.split("/")
    if len(parts) >= 2 and parts[1].startswith("sha-"):
        return parts[1][len("sha-") :]
    return "unknown"


def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
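    """Load the records for a PR, apply the optional repository and commit filters, and return the newest MAX_ROWS."""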
    repo = repo.strip().lower()
    pr = pr.strip()
    sha = sha.strip().lower()

    if not pr:
        return []

    file_paths = _list_collection_files(pr)
    records: List[dict] = []
    for file_path in file_paths:
        commit = _extract_commit_from_path(file_path)
        # The commit filter matches on SHA prefix, the repository filter on substring.
        if sha and not commit.lower().startswith(sha):
            continue
        payload = _load_payload(file_path)
        if payload is None:
            continue
        metadata = payload.get("metadata") or {}
        repository = (metadata.get("repository") or "").lower()
        if repo and repo not in repository:
            continue
        payload["__source_path"] = file_path
        payload["__commit"] = commit
        records.append(payload)

    def _sort_key(record: dict) -> str:
        metadata = record.get("metadata") or {}
        return metadata.get("collected_at") or ""

    records.sort(key=_sort_key, reverse=True)
    return records[:MAX_ROWS]


def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
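    """Gradio callback: build the table rows, the newest record as pretty-printed JSON, and a status message."""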
    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()

    if not pr:
        return [], json.dumps({"error": "PR number is required."}, indent=2), "Provide a PR number to search."

    records = _filter_records(repo, pr, sha)
    if not records:
        return [], json.dumps({"error": "No records found."}, indent=2), f"No records found for PR {pr}."

    table_rows = []
    for record in records:
        metadata = record.get("metadata") or {}
        table_rows.append(
            [
                metadata.get("collected_at", ""),
                metadata.get("repository", ""),
                metadata.get("branch", ""),
                metadata.get("pull_request_number", ""),
                (metadata.get("commit_sha") or "")[:12],
                metadata.get("workflow_id", ""),
                str(len(record.get("failures", []))),
            ]
        )

    latest_payload = json.dumps(records[0], indent=2)
    status = f"Showing {len(records)} record(s) for PR {pr}."
    return table_rows, latest_payload, status


def refresh_dataset() -> str:
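    """Clear the cached file listing so the next search re-reads the dataset."""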
    _list_collection_files.cache_clear()
    return "Cleared cached manifest. Data will be reloaded on next search."
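

# Gradio UI: filter inputs, a results table, the newest record as raw JSON, and a status line.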
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # CircleCI Test Collection Helper

        Use the filters below to inspect CircleCI test aggregation records for the Transformers repository (or any
        repository that uploads data to the `transformers-community/circleci-test-results` dataset).
        """
    )

    with gr.Row():
        repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
        pr_box = gr.Textbox(label="PR number (required)")
        sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")

    with gr.Row():
        search_btn = gr.Button("Search")
        refresh_btn = gr.Button("Clear cache")

    table = gr.Dataframe(
        headers=[
            "Collected at",
            "Repository",
            "Branch",
            "PR",
            "Commit",
            "Workflow ID",
            "Failures",
        ],
        wrap=True,
    )
    json_view = gr.Code(label="Latest entry details", language="json")
    status = gr.Markdown("")

    search_btn.click(query, inputs=[repo_box, pr_box, sha_box], outputs=[table, json_view, status])
    refresh_btn.click(refresh_dataset, outputs=status)


if __name__ == "__main__":
    demo.queue(max_size=20).launch()