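"""Gradio app for browsing CircleCI test-collection results.

Reads `collection_summary.json` files from a Hugging Face dataset
(`transformers-community/circleci-test-results` by default) and filters
them by repository, PR number, and commit SHA.
"""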
import json
import os
from functools import lru_cache
from typing import List, Tuple

import gradio as gr
from huggingface_hub import HfApi, RepoFile, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

DATASET_ID = os.environ.get(
    "CIRCLECI_RESULTS_DATASET_ID",
    "transformers-community/circleci-test-results",
)
MAX_ROWS = 200

API = HfApi()
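# An anonymous client is enough for a public dataset; `HfApi` also picks up a
# locally saved token automatically when one is available.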
@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
    """
    Return the `collection_summary.json` paths stored for a specific PR.
    """
    prefix = f"pr-{pr_number}"
    try:
        entries = API.list_repo_tree(
            repo_id=DATASET_ID,
            repo_type="dataset",
            path_in_repo=prefix,
            recursive=True,
        )
    except HfHubHTTPError as error:
        print(f"Failed to list repo tree for {prefix}: {error}")
        return tuple()
    files = []
    for entry in entries:
        # `list_repo_tree` yields both files and folders; keep only the
        # summary files themselves.
        if isinstance(entry, RepoFile) and entry.path.endswith("collection_summary.json"):
            files.append(entry.path)
    return tuple(files)
def _load_payload(path: str) -> dict | None:
    """Download one summary file from the dataset and parse it as JSON."""
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:
        print(f"Failed to download {path}: {error}")
        return None
    try:
        with open(local_path) as fp:
            return json.load(fp)
    except Exception as error:
        print(f"Failed to load JSON for {path}: {error}")
        return None
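# Summary files are assumed (from the path handling in this module) to live at
# `pr-<PR_NUMBER>/sha-<COMMIT_SHA>/.../collection_summary.json`.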
def _extract_commit_from_path(path: str) -> str:
    """Pull the commit SHA out of a `pr-<PR>/sha-<COMMIT>/...` path."""
    parts = path.split("/")
    if len(parts) >= 2 and parts[1].startswith("sha-"):
        return parts[1][len("sha-") :]
    return "unknown"
def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
    repo = repo.strip().lower()
    pr = pr.strip()
    sha = sha.strip().lower()
    if not pr:
        return []
    file_paths = _list_collection_files(pr)
    records: List[dict] = []
    for file_path in file_paths:
        commit = _extract_commit_from_path(file_path)
        # The SHA filter is a prefix match, so short SHAs work too.
        if sha and not commit.lower().startswith(sha):
            continue
        payload = _load_payload(file_path)
        if payload is None:
            continue
        metadata = payload.get("metadata") or {}
        repository = (metadata.get("repository") or "").lower()
        if repo and repo not in repository:
            continue
        payload["__source_path"] = file_path
        payload["__commit"] = commit
        records.append(payload)

    def _sort_key(record: dict) -> str:
        metadata = record.get("metadata") or {}
        return metadata.get("collected_at") or ""

    # Newest records first, capped at MAX_ROWS.
    records.sort(key=_sort_key, reverse=True)
    return records[:MAX_ROWS]
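# Gradio callback: returns (table rows, pretty-printed latest payload, status text).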
def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()
    if not pr:
        return [], json.dumps({"error": "PR number is required."}, indent=2), "Provide a PR number to search."
    records = _filter_records(repo, pr, sha)
    if not records:
        return [], json.dumps({"error": "No records found."}, indent=2), f"No records found for PR {pr}."
    table_rows = []
    for record in records:
        metadata = record.get("metadata") or {}
        table_rows.append(
            [
                metadata.get("collected_at", ""),
                metadata.get("repository", ""),
                metadata.get("branch", ""),
                metadata.get("pull_request_number", ""),
                (metadata.get("commit_sha") or "")[:12],
                metadata.get("workflow_id", ""),
                str(len(record.get("failures", []))),
            ]
        )
    latest_payload = json.dumps(records[0], indent=2)
    status = f"Showing {len(records)} record(s) for PR {pr}."
    return table_rows, latest_payload, status
def refresh_dataset() -> str:
    _list_collection_files.cache_clear()
    return "Cleared cached manifest. Data will be reloaded on next search."
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # CircleCI Test Collection Helper

        Use the filters below to inspect CircleCI test aggregation records for the Transformers repository (or any
        repository that uploads data to the `transformers-community/circleci-test-results` dataset).
        """
    )
    with gr.Row():
        repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
        pr_box = gr.Textbox(label="PR number (required)")
        sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")
    with gr.Row():
        search_btn = gr.Button("Search")
        refresh_btn = gr.Button("Clear cache")
    table = gr.Dataframe(
        headers=[
            "Collected at",
            "Repository",
            "Branch",
            "PR",
            "Commit",
            "Workflow ID",
            "Failures",
        ],
        wrap=True,
    )
    json_view = gr.Code(label="Latest entry details", language="json")
    status = gr.Markdown("")

    search_btn.click(query, inputs=[repo_box, pr_box, sha_box], outputs=[table, json_view, status])
    refresh_btn.click(refresh_dataset, outputs=status)

if __name__ == "__main__":
    demo.queue(max_size=20).launch()
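# To point the app at another results dataset with the same layout (hypothetical
# dataset id shown):
#   CIRCLECI_RESULTS_DATASET_ID=my-org/my-results python app.py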