circle-ci-viz / app.py
ArthurZ's picture
ArthurZ HF Staff
Update app.py
2262962 verified
raw
history blame
5.33 kB
import json
import os
from functools import lru_cache
from typing import List, Tuple
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError
# Hub dataset holding the uploaded CircleCI summaries. The
# CIRCLECI_RESULTS_DATASET_ID environment variable overrides the default id.
DATASET_ID = os.getenv("CIRCLECI_RESULTS_DATASET_ID", "transformers-community/circleci-test-results")

# Maximum number of records returned by a single search.
MAX_ROWS = 200
# Hub API client, created once at import time and used to list files in the dataset repo.
API = HfApi()
@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
    """
    Return the `collection_summary.json` paths stored for a specific PR.

    The `pr-<pr_number>` folder of the results dataset is listed recursively and
    every file entry whose path ends with `collection_summary.json` is kept.

    Args:
        pr_number: Pull request number used to build the `pr-<number>` folder name.

    Returns:
        A tuple of repo-relative file paths. An empty tuple is returned when the
        Hub request fails. Results (including that empty tuple) are memoized by
        `lru_cache` until `cache_clear()` is called.
    """
    prefix = f"pr-{pr_number}"
    try:
        # `list_repo_tree` expects the folder as `path_in_repo` and yields its
        # entries lazily, so HTTP errors (e.g. a missing folder) only surface
        # while iterating. Consume the iterable inside the `try` so they are
        # handled here instead of escaping to the caller.
        entries = list(
            API.list_repo_tree(
                repo_id=DATASET_ID,
                path_in_repo=prefix,
                recursive=True,
                repo_type="dataset",
            )
        )
    except HfHubHTTPError as error:
        print(f"Failed to list repo tree for {prefix}: {error}")
        return tuple()

    # NOTE(review): the entry objects returned by huggingface_hub may not expose
    # a `type` attribute; a missing attribute is treated as "file" so the
    # filename suffix decides. Confirm against the installed library version.
    return tuple(
        entry.path
        for entry in entries
        if getattr(entry, "type", "file") == "file" and entry.path.endswith("collection_summary.json")
    )
def _load_payload(path: str) -> dict | None:
    """
    Download one JSON file from the results dataset and parse it.

    Args:
        path: Repo-relative path of the file inside the dataset.

    Returns:
        The decoded JSON object, or `None` when the download fails, the file
        cannot be read or parsed, or the top-level JSON value is not an object.
        Failures are reported with `print` rather than raised, so one bad file
        does not abort a whole search.
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:
        print(f"Failed to download {path}: {error}")
        return None

    try:
        # Decode as UTF-8 explicitly; the platform default encoding may differ.
        with open(local_path, encoding="utf-8") as fp:
            payload = json.load(fp)
    except Exception as error:
        print(f"Failed to load JSON for {path}: {error}")
        return None

    # Callers use `payload.get(...)`, so anything but a JSON object is unusable.
    if not isinstance(payload, dict):
        print(f"Failed to load JSON for {path}: expected an object, got {type(payload).__name__}")
        return None
    return payload
def _extract_commit_from_path(path: str) -> str:
    """
    Pull the commit identifier out of a dataset file path.

    The second `/`-separated segment is expected to look like `sha-<commit>`;
    the text after `sha-` is returned. Any other shape yields `"unknown"`.
    """
    marker = "sha-"
    # Only the first two segments matter, so stop splitting after them.
    segments = path.split("/", 2)
    if len(segments) < 2:
        return "unknown"
    second = segments[1]
    return second[len(marker) :] if second.startswith(marker) else "unknown"
def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
    """
    Collect the stored payloads for a PR that match the optional filters.

    Args:
        repo: Case-insensitive substring to look for in `metadata.repository`;
            empty means no repository filter.
        pr: Pull request number; when blank, an empty list is returned.
        sha: Case-insensitive commit prefix matched against the commit taken
            from the file path; empty means no commit filter.

    Returns:
        At most `MAX_ROWS` payload dicts, newest `metadata.collected_at` first.
        Each dict gains `__source_path` and `__commit` keys.
    """
    repo_filter = repo.strip().lower()
    pr_number = pr.strip()
    sha_prefix = sha.strip().lower()
    if pr_number == "":
        return []

    matches: List[dict] = []
    for source_path in _list_collection_files(pr_number):
        commit_id = _extract_commit_from_path(source_path)
        # Check the commit first so non-matching files are never downloaded.
        if sha_prefix and not commit_id.lower().startswith(sha_prefix):
            continue

        data = _load_payload(source_path)
        if data is None:
            continue

        repo_name = ((data.get("metadata") or {}).get("repository") or "").lower()
        if repo_filter and repo_filter not in repo_name:
            continue

        data["__source_path"] = source_path
        data["__commit"] = commit_id
        matches.append(data)

    # Records without a timestamp sort as "" and therefore end up last.
    matches.sort(
        key=lambda item: (item.get("metadata") or {}).get("collected_at") or "",
        reverse=True,
    )
    return matches[:MAX_ROWS]
def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
    """
    Run a search and shape the result for the UI.

    Args:
        repo: Optional repository filter.
        pr: Pull request number (required).
        sha: Optional commit SHA prefix filter.

    Returns:
        A `(table_rows, latest_payload_json, status_message)` tuple. When `pr` is
        blank or nothing matches, the table is empty and the JSON string carries
        an `error` message instead of a record.
    """
    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()
    if not pr:
        return [], json.dumps({"error": "PR number is required."}, indent=2), "Provide a PR number to search."

    records = _filter_records(repo, pr, sha)
    if not records:
        return [], json.dumps({"error": "No records found."}, indent=2), f"No records found for PR {pr}."

    table_rows = []
    for record in records:
        metadata = record.get("metadata") or {}
        # Use the same `or` fallback as the other fields: a payload whose
        # `failures` key is present but null would otherwise break `len()`.
        failures = record.get("failures") or []
        table_rows.append(
            [
                metadata.get("collected_at", ""),
                metadata.get("repository", ""),
                metadata.get("branch", ""),
                metadata.get("pull_request_number", ""),
                (metadata.get("commit_sha") or "")[:12],
                metadata.get("workflow_id", ""),
                str(len(failures)),
            ]
        )

    # The records arrive newest first, so index 0 is the latest entry.
    latest_payload = json.dumps(records[0], indent=2)
    status = f"Showing {len(records)} record(s) for PR {pr}."
    return table_rows, latest_payload, status
def refresh_dataset() -> str:
    """Drop the memoized file listings and return a status message for the UI."""
    message = "Cleared cached manifest. Data will be reloaded on next search."
    _list_collection_files.cache_clear()
    return message
# Introductory Markdown shown at the top of the page.
_INTRO_MARKDOWN = (
    "\n"
    "# CircleCI Test Collection Helper\n"
    "Use the filters below to inspect CircleCI test aggregation records for the Transformers repository (or any\n"
    "repository that uploads data to the `transformers-community/circleci-test-results` dataset).\n"
)

# Column titles of the results table, in the order `query` fills each row.
_TABLE_HEADERS = [
    "Collected at",
    "Repository",
    "Branch",
    "PR",
    "Commit",
    "Workflow ID",
    "Failures",
]

# Page layout: intro text, filter inputs, action buttons, results table,
# JSON view of the latest entry, and a status line.
with gr.Blocks() as demo:
    gr.Markdown(_INTRO_MARKDOWN)

    with gr.Row():
        repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
        pr_box = gr.Textbox(label="PR number (required)")
        sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")

    with gr.Row():
        search_btn = gr.Button("Search")
        refresh_btn = gr.Button("Clear cache")

    table = gr.Dataframe(headers=_TABLE_HEADERS, wrap=True)
    json_view = gr.Code(label="Latest entry details", language="json")
    status = gr.Markdown("")

    # "Search" fills all three outputs; "Clear cache" only updates the status line.
    search_btn.click(
        fn=query,
        inputs=[repo_box, pr_box, sha_box],
        outputs=[table, json_view, status],
    )
    refresh_btn.click(fn=refresh_dataset, outputs=status)
# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    # Enable the request queue with a cap of 20 pending requests, then serve.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()