Spaces:
Running
Running
| import spaces | |
def dummy_gpu():
    """Do nothing and return ``None``.

    NOTE(review): nothing in this file calls this function; it presumably
    exists only as a placeholder for the `spaces` runtime -- confirm.
    """
    return None
| import gradio as gr | |
| from huggingface_hub import HfApi, snapshot_download | |
| from huggingface_hub.hf_api import RepoFile | |
| from typing import Union | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import time | |
def split_repo(repo_id: str, user_name: str, space_name: str, storage_name: str, storage_type: str, is_private: bool=True, threshold: int=10 * 1024 * 1024, hf_token: Union[str, None]="", progress=gr.Progress(track_tqdm=True)):
    """Split a Space into a slimmed-down Space plus a separate storage repo.

    LFS files of the source Space larger than ``threshold`` bytes are uploaded
    to ``{user_name}/{storage_name}``; everything else is uploaded to a new
    Space ``{user_name}/{space_name}``.

    Args:
        repo_id: ID of the source Space.
        user_name: Namespace under which both new repos are created.
        space_name: Name of the new Space.
        storage_name: Name of the repo that receives the large files.
        storage_type: Repo type of the storage repo (the UI offers
            ``"model"`` and ``"dataset"``).
        is_private: Whether the new repos are created as private.
        threshold: LFS files strictly larger than this many bytes are
            treated as "large".
        hf_token: Hugging Face token; required.
        progress: Not referenced in the body; presumably lets Gradio mirror
            tqdm progress bars -- confirm.

    Returns:
        A tuple ``(info_md, dl_code)``: a Markdown snippet linking to the new
        repos, and a Python snippet that downloads the large files from the
        storage repo. Both are empty strings if anything failed; failures are
        printed and surfaced through ``gr.Warning`` instead of being raised.
    """
    temp_dir = tempfile.mkdtemp()
    exist_ok = False  # feature flag: allow overwriting an existing destination
    use_dupe_api = False  # feature flag: duplicate the Space server-side instead of re-uploading
    info_md = ""
    dl_code = ""
    space_id = f"{user_name}/{space_name}"
    storage_id = f"{user_name}/{storage_name}"
    try:
        if not hf_token:
            raise Exception("Token not found.")
        kwargs = {"token": hf_token}
        api = HfApi()

        if not exist_ok:
            if api.repo_exists(repo_id=space_id, repo_type="space", **kwargs):
                raise Exception(f"{space_id} already exists.")
            if api.repo_exists(repo_id=storage_id, repo_type=storage_type, **kwargs):
                # Report the repo that actually collides (was: space_id).
                raise Exception(f"{storage_id} already exists.")

        # Collect the paths of LFS files above the size threshold.
        tree = api.list_repo_tree(repo_id=repo_id, repo_type="space", recursive=True, **kwargs)
        lfiles = [
            entry.path
            for entry in tree
            if isinstance(entry, RepoFile) and entry.lfs is not None and entry.lfs.size > threshold
        ]
        if len(lfiles) == 0:
            raise Exception("Large file not found.")

        # Carry the source Space's public variables over in the
        # [{"key": ..., "value": ...}] shape the Hub API expects.
        source_vars = api.get_space_variables(repo_id=repo_id, **kwargs)
        sv = [{"key": str(k), "value": str(v.value)} for k, v in source_vars.items()] if source_vars else []

        if api.repo_exists(repo_id=space_id, repo_type="space", **kwargs) and exist_ok:
            api.delete_repo(repo_id=space_id, repo_type="space", **kwargs)

        if use_dupe_api:
            api.duplicate_space(from_id=repo_id, to_id=space_id, exist_ok=exist_ok, private=is_private, hardware="cpu-basic", variables=sv, **kwargs)
            time.sleep(10)  # wait for finishing of space duplication
            api.delete_files(repo_id=space_id, repo_type="space", delete_patterns=lfiles, **kwargs)
        else:
            # Download everything except the large files and push it to the new Space.
            snapshot_download(repo_id=repo_id, repo_type="space", ignore_patterns=lfiles, local_dir=temp_dir, **kwargs)
            api.create_repo(repo_id=space_id, repo_type="space", space_hardware="cpu-basic", space_variables=sv, space_sdk="gradio", exist_ok=exist_ok, private=is_private, **kwargs)
            api.upload_folder(repo_id=space_id, repo_type="space", ignore_patterns=lfiles, folder_path=temp_dir, path_in_repo=".", **kwargs)

        # Download only the large files (into the same temp dir) and push them to storage.
        snapshot_download(repo_id=repo_id, repo_type="space", allow_patterns=lfiles, local_dir=temp_dir, **kwargs)
        api.create_repo(repo_id=storage_id, repo_type=storage_type, exist_ok=exist_ok, private=is_private, **kwargs)
        api.upload_folder(repo_id=storage_id, repo_type=storage_type, allow_patterns=lfiles, folder_path=temp_dir, path_in_repo=".", **kwargs)

        lfiles_str = "[" + ", ".join(['"' + s + '"' for s in lfiles]) + "]"
        dl_code = f'from huggingface_hub import snapshot_download\nlarge_files = {lfiles_str}\nsnapshot_download(repo_id="{storage_id}", repo_type="{storage_type}", allow_patterns=large_files, local_dir=".")\n'
        info_md = f'## Your new space URL: [{space_id}](https://hf.co/spaces/{space_id})<br>\n## Your new storage URL: [{storage_id}](https://hf.co/{storage_id if storage_type == "model" else "datasets/" + storage_id})'
    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing the app.
        print(e)
        gr.Warning(f"Error: {e}")
    finally:
        # Best-effort cleanup; must not mask the result or raise on its own.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return info_md, dl_code
| # Custom stylesheet for the app; passed to gr.Blocks via css=css. | |
| css = """ | |
| .title { font-size: 3em; align-items: center; text-align: center; } | |
| .info { align-items: center; text-align: center; } | |
| .block.result { margin: 1em 0; padding: 1em; box-shadow: 0 0 3px 3px #664422, 0 0 3px 2px #664422 inset; border-radius: 6px; background: #665544; } | |
| .desc [src$='#float'] { float: right; margin: 20px; } | |
| """ | |
| # Build the Gradio UI. NOTE(review): indentation was lost in this copy, so the exact nesting of the Row/Column contexts below cannot be confirmed from here. | |
| with gr.Blocks(theme="NoCrypt/miku@>=1.2.2", fill_width=True, css=css, delete_cache=(60, 3600)) as demo: | |
| repo_id = gr.Textbox(label="Source repo ID", placeholder="levihsu/OOTDiffusion", value="") | |
| with gr.Row(equal_height=True): | |
| user_name = gr.Textbox(label="Your user name", value="") | |
| space_name = gr.Textbox(label="Destination repo name", placeholder="OOTDiffusion", value="") | |
| storage_name = gr.Textbox(label="Storage repo name", placeholder="OOTDiffusion-storage", value="") | |
| storage_type = gr.Radio(label="Storage repo type", choices=["dataset", "model"], value="model") | |
| with gr.Column(): | |
| hf_token = gr.Textbox(label="Your HF write token", placeholder="hf_...", value="") | |
| gr.Markdown("Your token is available at [hf.co/settings/tokens](https://huggingface.co/settings/tokens).", elem_classes="info") | |
| threshold = gr.Number(label="Size threshold (bytes)", value=10 * 1024 * 1024, minimum=1, maximum=5 * 1024 * 1024 * 1024, step=1) | |
| is_private = gr.Checkbox(label="Private", value=True) | |
| run_button = gr.Button("Submit", variant="primary") | |
| dl_code = gr.Textbox(label="Code", value="", show_copy_button=True) | |
| info_md = gr.Markdown("<br><br><br>", elem_classes="result") | |
| # Inputs are passed positionally in the same order as split_repo's parameters (progress keeps its default); outputs receive its (info_md, dl_code) return tuple. | |
| run_button.click(split_repo, [repo_id, user_name, space_name, storage_name, storage_type, is_private, threshold, hf_token], [info_md, dl_code]) | |
| # Enable the request queue and launch the app with ssr_mode turned off. | |
| demo.queue().launch(ssr_mode=False) | |