File size: 5,325 Bytes
1cf02c3
 
 
 
 
 
2262962
 
1cf02c3
 
645eac6
 
1cf02c3
 
645eac6
1cf02c3
 
645eac6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1cf02c3
645eac6
 
 
 
 
1cf02c3
645eac6
1cf02c3
 
3601d7a
645eac6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1cf02c3
 
 
 
645eac6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3601d7a
645eac6
 
1cf02c3
645eac6
 
 
3601d7a
645eac6
1cf02c3
645eac6
1cf02c3
 
645eac6
 
 
 
 
 
 
1cf02c3
 
 
645eac6
 
 
1cf02c3
 
 
645eac6
 
 
 
 
1cf02c3
 
645eac6
1cf02c3
645eac6
 
 
1cf02c3
 
645eac6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1cf02c3
645eac6
 
1cf02c3
645eac6
 
1cf02c3
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import json
import os
from functools import lru_cache
from typing import List, Tuple

import gradio as gr
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import HfHubHTTPError

# Hub dataset holding the uploaded CircleCI summaries. The `CIRCLECI_RESULTS_DATASET_ID` environment variable
# overrides the default repository id.
DATASET_ID = os.getenv("CIRCLECI_RESULTS_DATASET_ID", "transformers-community/circleci-test-results")
# Upper bound on the number of records returned by a single search.
MAX_ROWS = 200
# Shared Hub client used to list the dataset tree.
API = HfApi()


@lru_cache(maxsize=128)
def _list_collection_files(pr_number: str) -> Tuple[str, ...]:
    """
    Return the `collection_summary.json` paths stored for a specific PR.

    The dataset folder `pr-<pr_number>` is listed recursively and every file whose path ends with
    `collection_summary.json` is kept.

    Args:
        pr_number: PR identifier used to build the `pr-<pr_number>` folder name.

    Returns:
        A tuple of repository-relative file paths. It is empty when nothing matches or when the Hub request
        fails (the failure is printed, not raised). Results, including the empty tuple returned on failure,
        stay memoized by `lru_cache` until `cache_clear()` is called.
    """
    prefix = f"pr-{pr_number}"
    files = []
    try:
        # `list_repo_tree` yields its entries lazily, so HTTP errors (e.g. a folder that does not exist)
        # surface while iterating rather than at call time: the loop has to live inside the `try`.
        for entry in API.list_repo_tree(
            repo_id=DATASET_ID,
            repo_type="dataset",
            path=prefix,
            recursive=True,
        ):
            # NOTE(review): tree entries may not expose a `type` attribute in every `huggingface_hub`
            # release (files and folders are distinguished by class) - confirm. Entries without the
            # attribute are therefore kept and filtered on their path only.
            if getattr(entry, "type", "file") != "file":
                continue
            if entry.path.endswith("collection_summary.json"):
                files.append(entry.path)
    except HfHubHTTPError as error:
        print(f"Failed to list repo tree for {prefix}: {error}")
        return tuple()
    return tuple(files)


def _load_payload(path: str) -> dict | None:
    """
    Download one JSON file from the results dataset and parse it.

    Args:
        path: Repository-relative path of the file inside the dataset.

    Returns:
        The decoded JSON object, or `None` when the download fails, the file cannot be read or parsed, or the
        top-level JSON value is not an object. Failures are printed, never raised.
    """
    try:
        local_path = hf_hub_download(
            repo_id=DATASET_ID,
            filename=path,
            repo_type="dataset",
        )
    except Exception as error:
        # Deliberately broad: whatever goes wrong with the download must only skip this file.
        print(f"Failed to download {path}: {error}")
        return None

    try:
        # JSON is UTF-8 encoded; do not depend on the platform's default text encoding.
        with open(local_path, encoding="utf-8") as fp:
            payload = json.load(fp)
    except Exception as error:
        print(f"Failed to load JSON for {path}: {error}")
        return None

    if not isinstance(payload, dict):
        # The declared return type is `dict | None`; any other JSON value is reported and dropped.
        print(f"Failed to load JSON for {path}: expected a JSON object, got {type(payload).__name__}")
        return None
    return payload


def _extract_commit_from_path(path: str) -> str:
    """
    Return the commit encoded in the second `/`-separated segment of `path`.

    The segment is expected to look like `sha-<commit>`; the text after the `sha-` marker is returned. When the
    path has no second segment, or that segment does not start with `sha-`, the string "unknown" is returned.
    """
    marker = "sha-"
    # Only the second segment matters, so there is no need to split beyond it.
    segments = path.split("/", 2)
    second = segments[1] if len(segments) > 1 else ""
    if not second.startswith(marker):
        return "unknown"
    return second[len(marker) :]


def _filter_records(repo: str, pr: str, sha: str) -> List[dict]:
    """
    Collect the summaries stored for a PR, optionally narrowed by repository and commit.

    Args:
        repo: Case-insensitive substring that the record's `metadata.repository` must contain. An empty value
            disables the filter.
        pr: PR number (required). A blank value yields no records.
        sha: Case-insensitive commit prefix matched against the commit taken from the file path. An empty
            value disables the filter.

    Returns:
        At most `MAX_ROWS` payload dicts, sorted by `metadata.collected_at` in descending string order (newest
        first for ISO-8601 timestamps). Each payload is annotated with `__source_path` (its dataset path) and
        `__commit` (the commit taken from the path).
    """
    repo = repo.strip().lower()
    pr = pr.strip()
    sha = sha.strip().lower()

    if not pr:
        return []

    def _metadata_of(record: dict) -> dict:
        # A null or non-object `metadata` is treated as empty so one odd record cannot abort the search.
        metadata = record.get("metadata")
        return metadata if isinstance(metadata, dict) else {}

    records: List[dict] = []
    for file_path in _list_collection_files(pr):
        commit = _extract_commit_from_path(file_path)
        # Filter on the path first so that files of non-matching commits are never downloaded.
        if sha and not commit.lower().startswith(sha):
            continue
        payload = _load_payload(file_path)
        # Anything but a JSON object cannot be annotated below: skip it instead of raising.
        if not isinstance(payload, dict):
            continue
        repository = str(_metadata_of(payload).get("repository") or "").lower()
        if repo and repo not in repository:
            continue
        payload["__source_path"] = file_path
        payload["__commit"] = commit
        records.append(payload)

    def _sort_key(record: dict) -> str:
        # `str(...)` keeps the ordering well-defined even if some records store a non-string timestamp.
        return str(_metadata_of(record).get("collected_at") or "")

    records.sort(key=_sort_key, reverse=True)
    return records[:MAX_ROWS]


def query(repo: str, pr: str, sha: str) -> Tuple[List[List[str]], str, str]:
    """
    Run a search and format the result for the UI.

    Args:
        repo: Optional repository filter.
        pr: PR number (required).
        sha: Optional commit SHA prefix filter.

    Returns:
        A 3-tuple `(table_rows, latest_payload, status)`:
        - `table_rows`: one row per record with the columns collected at, repository, branch, PR, commit
          (first 12 characters), workflow id and number of failures, all rendered as strings;
        - `latest_payload`: the first record serialized as indented JSON, or a JSON error object when there is
          nothing to show;
        - `status`: a short human-readable message.
    """
    repo = repo.strip()
    pr = pr.strip()
    sha = sha.strip()

    if not pr:
        return [], json.dumps({"error": "PR number is required."}, indent=2), "Provide a PR number to search."

    records = _filter_records(repo, pr, sha)
    if not records:
        return [], json.dumps({"error": "No records found."}, indent=2), f"No records found for PR {pr}."

    def _text(value) -> str:
        # JSON nulls become empty cells; other values (e.g. an integer PR number) are shown as text.
        return "" if value is None else str(value)

    table_rows = []
    for record in records:
        metadata = record.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        table_rows.append(
            [
                _text(metadata.get("collected_at")),
                _text(metadata.get("repository")),
                _text(metadata.get("branch")),
                _text(metadata.get("pull_request_number")),
                _text(metadata.get("commit_sha"))[:12],
                _text(metadata.get("workflow_id")),
                # `or []` also covers an explicit `"failures": null`, which `.get(key, [])` does not.
                str(len(record.get("failures") or [])),
            ]
        )

    latest_payload = json.dumps(records[0], indent=2)
    status = f"Showing {len(records)} record(s) for PR {pr}."
    return table_rows, latest_payload, status


def refresh_dataset() -> str:
    """
    Drop the memoized per-PR file listings and return a status message for the UI.
    """
    _list_collection_files.cache_clear()
    message = "Cleared cached manifest. Data will be reloaded on next search."
    return message


# UI definition: three filter boxes, two action buttons, a result table, a JSON viewer and a status line.
with gr.Blocks() as demo:
    # The dataset name is interpolated from `DATASET_ID` so the text stays accurate when the dataset is
    # overridden through the environment.
    gr.Markdown(
        f"""
        # CircleCI Test Collection Helper

        Use the filters below to inspect CircleCI test aggregation records for the Transformers repository (or any
        repository that uploads data to the `{DATASET_ID}` dataset).
        """
    )

    with gr.Row():
        repo_box = gr.Textbox(label="Repository", placeholder="huggingface/transformers")
        pr_box = gr.Textbox(label="PR number (required)")
        sha_box = gr.Textbox(label="Commit SHA (prefix accepted)")

    with gr.Row():
        search_btn = gr.Button("Search")
        refresh_btn = gr.Button("Clear cache")

    # Column order must match the rows built by `query`.
    table = gr.Dataframe(
        headers=[
            "Collected at",
            "Repository",
            "Branch",
            "PR",
            "Commit",
            "Workflow ID",
            "Failures",
        ],
        wrap=True,
    )
    json_view = gr.Code(label="Latest entry details", language="json")
    status = gr.Markdown("")

    # `query` returns (table rows, JSON text, status message), mapped onto the three outputs in that order.
    search_btn.click(query, inputs=[repo_box, pr_box, sha_box], outputs=[table, json_view, status])
    # Clearing the cache only updates the status line.
    refresh_btn.click(refresh_dataset, outputs=status)

if __name__ == "__main__":
    # Enable the request queue (`max_size=20`) and then start the app.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch()