import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from llmcompressor import oneshot
from llmcompressor.modifiers.quantization import QuantizationModifier, GPTQModifier
from llmcompressor.modifiers.awq import AWQModifier, AWQMapping
from transformers import (
    AutoModelForCausalLM,
    Qwen2_5_VLForConditionalGeneration,
    AutoConfig,
    AutoModel
)
import torch


# --- Helper Functions ---

def get_quantization_recipe(method, model_architecture):
    """
    Returns the appropriate llm-compressor recipe based on the selected method.
    Updated to support Qwen2_5_VLForConditionalGeneration architecture and more quantization methods.
    """
    if method == "AWQ":
        if model_architecture not in ["LlamaForCausalLM", "Qwen2_5_VLForConditionalGeneration"]:
            raise ValueError(
                f"AWQ quantization is only supported for LlamaForCausalLM and Qwen2_5_VLForConditionalGeneration architectures, got {model_architecture}"
            )
        # AWQ is fundamentally incompatible with Qwen2.5-VL models due to conflicts with
        # the complex 3D rotary positional embedding system used for multimodal processing.
        if model_architecture == "Qwen2_5_VLForConditionalGeneration":
            raise ValueError(
                f"AWQ quantization is not compatible with {model_architecture} architecture "
                "due to fundamental conflicts with complex 3D rotary positional embeddings. "
                "This quantization method modifies weights in a way that breaks the multimodal "
                "positional encoding system. Please use GPTQ, W4A16, W8A16, W8A8_INT8, W8A8_FP8, or FP8 methods instead."
            )
        else:  # LlamaForCausalLM and other supported architectures
            # Create AWQ mappings for Llama models
            mappings = [
                AWQMapping(
                    "re:.*input_layernorm",
                    ["re:.*q_proj", "re:.*k_proj", "re:.*v_proj"]
                ),
                AWQMapping("re:.*v_proj", ["re:.*o_proj"]),
                AWQMapping(
                    "re:.*post_attention_layernorm",
                    ["re:.*gate_proj", "re:.*up_proj"]
                ),
                AWQMapping("re:.*up_proj", ["re:.*down_proj"]),
            ]
            return [
                AWQModifier(
                    ignore=["lm_head"],
                    scheme="W4A16_ASYM",
                    targets=["Linear"],
                    mappings=mappings,
                ),
            ]
    elif method == "GPTQ":
        sequential_target_map = {
            "LlamaForCausalLM": "LlamaDecoderLayer",
            "MistralForCausalLM": "MistralDecoderLayer",
            "MixtralForCausalLM": "MixtralDecoderLayer",
            "Qwen2_5_VLForConditionalGeneration": "Qwen2_5_VLDecoderLayer",  # Add Qwen2.5-VL support
        }
        sequential_target = sequential_target_map.get(model_architecture)
        if sequential_target is None:
            raise ValueError(
                f"GPTQ quantization is not supported for {model_architecture} architecture. "
                "Supported architectures are: "
                f"{', '.join(sequential_target_map.keys())}"
            )
        if model_architecture == "Qwen2_5_VLForConditionalGeneration":
            return [
                GPTQModifier(
                    targets="Linear",
                    scheme="W4A16",
                    sequential_targets=[sequential_target],
                    ignore=["lm_head", "re:visual.*", "re:model.visual.*"],  # Ignore visual components
                ),
            ]
        else:
            return [
                GPTQModifier(
                    targets="Linear",
                    scheme="W4A16",
                    sequential_targets=[sequential_target],
                    ignore=["re:.*lm_head"],
                ),
            ]
    elif method in ["W4A16", "W8A16", "W8A8_INT8", "W8A8_FP8", "FP8"]:
        # All these methods use the QuantizationModifier
        if model_architecture not in ["LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM", "Qwen2_5_VLForConditionalGeneration"]:
            raise ValueError(
                f"Quantization method {method} is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM, Qwen2_5_VLForConditionalGeneration"
            )
        # Map method names to actual schemes (correct names for llmcompressor)
        scheme_map = {
            "W4A16": "W4A16",
            "W8A16": "W8A16",
            "W8A8_INT8": "W8A8",  # Use the correct scheme name
            "W8A8_FP8": "W8A8",   # Both use W8A8 but with different dtypes
            "FP8": "FP8"
        }
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        elif "Qwen2_5_VL" in model_architecture:
            ignore_layers.extend(["re:visual.*", "re:model.visual.*"])  # Ignore visual components for Qwen2.5-VL
        # For methods that support sequential onloading for Qwen2.5-VL, we use GPTQModifier with sequential_targets
        if model_architecture == "Qwen2_5_VLForConditionalGeneration" and method in ["W4A16"]:
            return [
                GPTQModifier(
                    targets="Linear",
                    scheme=scheme_map[method],
                    sequential_targets=["Qwen2_5_VLDecoderLayer"],  # Sequential onloading for memory efficiency
                    ignore=ignore_layers,
                ),
            ]
        else:
            return [QuantizationModifier(
                scheme=scheme_map[method],
                targets="Linear",
                ignore=ignore_layers
            )]
    elif method == "SmoothQuant":
        if model_architecture not in ["LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM"]:
            raise ValueError(
                f"SmoothQuant is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM"
            )
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        return [QuantizationModifier(
            scheme="W8A8",  # SmoothQuant typically uses W8A8
            targets="Linear",
            ignore=ignore_layers
        )]
    elif method == "SparseGPT":
        if model_architecture not in ["LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM"]:
            raise ValueError(
                f"SparseGPT is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM"
            )
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        return [
            GPTQModifier(  # SparseGPT uses GPTQ algorithm with different parameters
                targets="Linear",
                scheme="W4A16",  # Default scheme for sparsity
                ignore=ignore_layers,
            )
        ]
    else:
        raise ValueError(f"Unsupported quantization method: {method}")


def get_model_class_by_name(model_type_name):
    """
    Returns the appropriate model class based on the user-selected model type name.
    """
    if model_type_name == "CausalLM (standard text generation)":
        return AutoModelForCausalLM
    elif model_type_name == "Qwen2_5_VLForConditionalGeneration (Qwen2.5-VL)":
        from transformers import Qwen2_5_VLForConditionalGeneration
        return Qwen2_5_VLForConditionalGeneration
    elif model_type_name == "Qwen2ForCausalLM (Qwen2)":
        from transformers import Qwen2ForCausalLM
        return Qwen2ForCausalLM
    elif model_type_name == "LlamaForCausalLM (Llama, Llama2, Llama3)":
        from transformers import LlamaForCausalLM
        return LlamaForCausalLM
    elif model_type_name == "MistralForCausalLM (Mistral, Mixtral)":
        from transformers import MistralForCausalLM
        return MistralForCausalLM
    elif model_type_name == "GemmaForCausalLM (Gemma)":
        from transformers import GemmaForCausalLM
        return GemmaForCausalLM
    elif model_type_name == "Gemma2ForCausalLM (Gemma2)":
        from transformers import Gemma2ForCausalLM
        return Gemma2ForCausalLM
    elif model_type_name == "PhiForCausalLM (Phi, Phi2)":
        from transformers import PhiForCausalLM
        return PhiForCausalLM
    elif model_type_name == "Phi3ForCausalLM (Phi3)":
        from transformers import Phi3ForCausalLM
        return Phi3ForCausalLM
    elif model_type_name == "FalconForCausalLM (Falcon)":
        from transformers import FalconForCausalLM
        return FalconForCausalLM
    elif model_type_name == "MptForCausalLM (MPT)":
        from transformers import MptForCausalLM
        return MptForCausalLM
    elif model_type_name == "GPT2LMHeadModel (GPT2)":
        from transformers import GPT2LMHeadModel
        return GPT2LMHeadModel
    elif model_type_name == "GPTNeoXForCausalLM (GPT-NeoX)":
        from transformers import GPTNeoXForCausalLM
        return GPTNeoXForCausalLM
    elif model_type_name == "GPTJForCausalLM (GPT-J)":
        from transformers import GPTJForCausalLM
        return GPTJForCausalLM
    else:
        # Default case - should not happen if all options are handled
        return AutoModelForCausalLM


def determine_model_class(model_id: str, token: str, manual_model_type: str = None):
    """
    Determines the appropriate model class based on either:
    1. Automatic detection from model config, or
    2. User selection (if provided)
    """
    # If the user specified a manual model type and it's not auto-detect, use that
    if manual_model_type and manual_model_type != "Auto-detect (recommended)":
        return get_model_class_by_name(manual_model_type)

    # Otherwise, try automatic detection
    try:
        # Load the model configuration to determine the appropriate class
        config = AutoConfig.from_pretrained(model_id, token=token, trust_remote_code=True)

        # Check if the model type is in the configuration
        if hasattr(config, 'model_type'):
            model_type = config.model_type.lower()

            # Handle different model types based on their config
            if model_type in ['qwen2_5_vl', 'qwen2-vl', 'qwen2vl']:
                from transformers import Qwen2_5_VLForConditionalGeneration
                return Qwen2_5_VLForConditionalGeneration
            elif model_type in ['qwen2', 'qwen', 'qwen2.5']:
                from transformers import Qwen2ForCausalLM
                return Qwen2ForCausalLM
            elif model_type in ['llama', 'llama2', 'llama3', 'llama3.1', 'llama3.2', 'llama3.3']:
                from transformers import LlamaForCausalLM
                return LlamaForCausalLM
            elif model_type in ['mistral', 'mixtral']:
                from transformers import MistralForCausalLM
                return MistralForCausalLM
            elif model_type in ['gemma', 'gemma2']:
                from transformers import GemmaForCausalLM, Gemma2ForCausalLM
                return Gemma2ForCausalLM if 'gemma2' in model_type else GemmaForCausalLM
            elif model_type in ['phi', 'phi2', 'phi3', 'phi3.5']:
                from transformers import PhiForCausalLM, Phi3ForCausalLM
                return Phi3ForCausalLM if 'phi3' in model_type else PhiForCausalLM
            elif model_type in ['falcon']:
                from transformers import FalconForCausalLM
                return FalconForCausalLM
            elif model_type in ['mpt']:
                from transformers import MptForCausalLM
                return MptForCausalLM
            elif model_type in ['gpt2', 'gpt', 'gpt_neox', 'gptj']:
                from transformers import GPT2LMHeadModel, GPTNeoXForCausalLM, GPTJForCausalLM
                if 'neox' in model_type:
                    return GPTNeoXForCausalLM
                elif 'j' in model_type:
                    return GPTJForCausalLM
                else:
                    return GPT2LMHeadModel
            else:
                # Default to AutoModelForCausalLM for standard text generation models
                return AutoModelForCausalLM
        else:
            # If no model type is specified in the config, default to AutoModelForCausalLM
            return AutoModelForCausalLM
    except Exception as e:
        print(f"Could not determine model class from config: {e}")
        return AutoModelForCausalLM  # fallback to default


def compress_and_upload(
    model_id: str,
    quant_method: str,
    model_type_selection: str,  # New parameter for manual model type selection
    oauth_token: gr.OAuthToken | None,
):
    """
    Compresses a model using llm-compressor and uploads it to a new HF repo.
    """
    if not model_id:
        raise gr.Error("Please select a model from the search bar.")
    if oauth_token is None:
        raise gr.Error("Authentication error. Please log in to continue.")
    token = oauth_token.token

    try:
        # Use the provided token for all hub interactions
        username = whoami(token=token)["name"]
        # --- 1. Load Model and Tokenizer ---
        # Determine the appropriate model class based on the model's configuration or user selection
        model_class = determine_model_class(model_id, token, model_type_selection)

        try:
            model = model_class.from_pretrained(
                model_id,
                torch_dtype="auto",
                device_map=None,
                token=token,
                trust_remote_code=True
            )
        except ValueError as e:
            if "Unrecognized configuration class" in str(e):
                # If automatic detection fails, fall back to AutoModel and let transformers handle it
                print(f"Automatic model class detection failed, falling back to AutoModel: {e}")
                model = AutoModel.from_pretrained(
                    model_id,
                    torch_dtype="auto",
                    device_map=None,
                    token=token,
                    trust_remote_code=True
                )
            else:
                raise

        output_dir = f"{model_id.split('/')[-1]}-{quant_method}"

        # --- 2. Get Recipe ---
        if not model.config.architectures:
            raise gr.Error("Could not determine model architecture.")
        recipe = get_quantization_recipe(quant_method, model.config.architectures[0])

        # --- 3. Run Compression ---
        # Determine if this is a Qwen2.5-VL model to use an appropriate dataset and data collator
        if model.config.architectures and "Qwen2_5_VLForConditionalGeneration" in model.config.architectures[0]:
            # Use a multimodal dataset and data collator for Qwen2.5-VL models
            try:
                from datasets import load_dataset

                # Use a small subset of flickr30k for calibration if available
                ds = load_dataset("lmms-lab/flickr30k", split="test[:64]")
                ds = ds.shuffle(seed=42)

                # Define a data collator for multimodal inputs
                def qwen2_5_vl_data_collator(batch):
                    assert len(batch) == 1
                    return {key: torch.tensor(value) if isinstance(value, (list, int, float)) else value
                            for key, value in batch[0].items()}

                oneshot(
                    model=model,
                    dataset=ds,
                    recipe=recipe,
                    save_compressed=True,
                    output_dir=output_dir,
                    max_seq_length=2048,  # Increased for multimodal models
                    num_calibration_samples=64,
                    data_collator=qwen2_5_vl_data_collator,
                )
            except Exception as e:
                print(f"Could not load multimodal dataset, falling back to text-only: {e}")
                # Fall back to a text-only dataset - load it properly and pass as dataset
                from datasets import load_dataset
                fallback_ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:1%]")
                oneshot(
                    model=model,
                    dataset=fallback_ds,
                    recipe=recipe,
                    save_compressed=True,
                    output_dir=output_dir,
                    max_seq_length=512,
                    num_calibration_samples=64,
                )
        else:
            # For non-multimodal models, use the original approach
            from datasets import load_dataset
            ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:1%]")
            oneshot(
                model=model,
                dataset=ds,
                recipe=recipe,
                save_compressed=True,
                output_dir=output_dir,
                max_seq_length=512,
                num_calibration_samples=64,
            )

        # --- 4. Create Repo and Upload ---
        api = HfApi(token=token)
        repo_id = f"{username}/{output_dir}"
        repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
        api.upload_folder(
            folder_path=output_dir,
            repo_id=repo_id,
            commit_message=f"Upload {quant_method} compressed model",
        )

        # --- 5. Create Model Card ---
        card_content = f"""
---
license: apache-2.0
base_model: {model_id}
tags:
- llm-compressor
- quantization
- {quant_method.lower()}
---

# {quant_method} Compressed Model: {repo_id}

This model was compressed from [`{model_id}`](https://huggingface.co/{model_id}) using the [vLLM LLM-Compressor](https://github.com/vllm-project/llm-compressor) library.

This conversion was performed by the `llm-compressor-my-repo` Hugging Face Space.

## Quantization Method: {quant_method}

For more details on the recipe used, refer to the `recipe.yaml` file in this repository.
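
## Usage (example)

If your installed version of [vLLM](https://github.com/vllm-project/vllm) supports the chosen quantization scheme, the compressed checkpoint can typically be loaded directly. The snippet below is a minimal sketch, not a verified recipe for every method offered by this Space; it assumes the `vllm` package is installed.

```python
# Minimal vLLM inference sketch; assumes `vllm` is installed and that the
# quantization scheme of this checkpoint is supported by your vLLM build.
from vllm import LLM, SamplingParams

llm = LLM(model="{repo_id}")
outputs = llm.generate(["Hello, my name is"], SamplingParams(max_tokens=64))
print(outputs[0].outputs[0].text)
```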
"""
        card = ModelCard(card_content)
        card.push_to_hub(repo_id, token=token)

        return f"<h1>✅ Success!</h1><p>Model compressed and saved to your new repo: {repo_id}</p>"

    except gr.Error as e:
        raise e
    except Exception as e:
        error_message = str(e).replace("\n", "<br>")
        return f"<h1>❌ ERROR</h1><p>{error_message}</p>"

# --- Gradio Interface ---

def build_gradio_app():
    with gr.Blocks(css="footer {display: none !important;}") as demo:
        gr.Markdown("# LLM-Compressor My Repo")
        gr.Markdown(
            "Log in, choose a model, select a quantization method, and this Space will create a new compressed model repository on your Hugging Face profile."
        )
        with gr.Row():
            login_button = gr.LoginButton(min_width=250)  # noqa: F841
        gr.Markdown("### 1. Select a Model from the Hugging Face Hub")
        model_input = HuggingfaceHubSearch(
            label="Search for a Model",
            search_type="model",
        )
        gr.Markdown("### 2. Choose a Quantization Method")
        quant_method_dropdown = gr.Dropdown(
            ["W4A16", "W8A16", "W8A8_INT8", "W8A8_FP8", "GPTQ", "FP8", "AWQ", "SmoothQuant", "SparseGPT"],
            label="Quantization Method",
            value="W4A16"
        )
        gr.Markdown("### 3. Model Type (Auto-detected, but you can override if needed)")
        model_type_dropdown = gr.Dropdown(
            choices=[
                "Auto-detect (recommended)",
                "CausalLM (standard text generation)",
                "Qwen2_5_VLForConditionalGeneration (Qwen2.5-VL)",
                "Qwen2ForCausalLM (Qwen2)",
                "LlamaForCausalLM (Llama, Llama2, Llama3)",
                "MistralForCausalLM (Mistral, Mixtral)",
                "GemmaForCausalLM (Gemma)",
                "Gemma2ForCausalLM (Gemma2)",
                "PhiForCausalLM (Phi, Phi2)",
                "Phi3ForCausalLM (Phi3)",
                "FalconForCausalLM (Falcon)",
                "MptForCausalLM (MPT)",
                "GPT2LMHeadModel (GPT2)",
                "GPTNeoXForCausalLM (GPT-NeoX)",
                "GPTJForCausalLM (GPT-J)"
            ],
            label="Model Type",
            value="Auto-detect (recommended)"
        )
        compress_button = gr.Button("Compress and Create Repo", variant="primary")
        output_html = gr.HTML(label="Result")
        compress_button.click(
            fn=compress_and_upload,
            inputs=[model_input, quant_method_dropdown, model_type_dropdown],
            outputs=output_html,
        )
    return demo


def main():
    demo = build_gradio_app()
    demo.queue(max_size=5).launch()


if __name__ == "__main__":
    main()