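# NOTE: the original first six lines were Hugging Face file-viewer residue, not code:
#   uploader "n00b001", commit message "save", commit 240d878 (unverified),
#   viewer links "raw" / "history" / "blame", file size 21 kB.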
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from llmcompressor import oneshot
from llmcompressor.modifiers.quantization import QuantizationModifier, GPTQModifier
from llmcompressor.modifiers.awq import AWQModifier, AWQMapping
from transformers import (
AutoModelForCausalLM,
Qwen2_5_VLForConditionalGeneration,
AutoConfig,
AutoModel
)
import torch
# --- Helper Functions ---
def get_quantization_recipe(method, model_architecture):
    """
    Returns the llm-compressor recipe for the selected method and architecture.

    Args:
        method: Quantization method name as offered in the UI: "AWQ", "GPTQ",
            "W4A16", "W8A16", "W8A8_INT8", "W8A8_FP8", "FP8", "SmoothQuant"
            or "SparseGPT".
        model_architecture: Architecture name, compared against values such as
            "LlamaForCausalLM" or "Qwen2_5_VLForConditionalGeneration".

    Returns:
        A one-element list containing the configured modifier.

    Raises:
        ValueError: If the method is unknown, or if the method does not
            support the given architecture.
    """
    qwen_vl = "Qwen2_5_VLForConditionalGeneration"
    text_architectures = ["LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM"]
    # Patterns that keep the Qwen2.5-VL visual components out of quantization.
    visual_ignore = ["re:visual.*", "re:model.visual.*"]

    if method == "AWQ":
        # AWQ is rejected for Qwen2.5-VL: it conflicts with the 3D rotary
        # positional embedding system used for multimodal processing.
        if model_architecture == qwen_vl:
            raise ValueError(
                f"AWQ quantization is not compatible with {model_architecture} architecture "
                "due to fundamental conflicts with complex 3D rotary positional embeddings. "
                "This quantization method modifies weights in a way that breaks the multimodal "
                "positional encoding system. Please use GPTQ, W4A16, W8A16, W8A8_INT8, W8A8_FP8, or FP8 methods instead."
            )
        # The previous message also advertised Qwen2.5-VL, which is always
        # rejected above; only Llama actually reaches the AWQ recipe.
        if model_architecture != "LlamaForCausalLM":
            raise ValueError(
                "AWQ quantization is only supported for the LlamaForCausalLM "
                f"architecture, got {model_architecture}"
            )
        # Smoothing mappings for Llama-style layer names.
        mappings = [
            AWQMapping(
                "re:.*input_layernorm", ["re:.*q_proj", "re:.*k_proj", "re:.*v_proj"]
            ),
            AWQMapping("re:.*v_proj", ["re:.*o_proj"]),
            AWQMapping(
                "re:.*post_attention_layernorm", ["re:.*gate_proj", "re:.*up_proj"]
            ),
            AWQMapping("re:.*up_proj", ["re:.*down_proj"]),
        ]
        return [
            AWQModifier(
                ignore=["lm_head"],
                scheme="W4A16_ASYM",
                targets=["Linear"],
                mappings=mappings,
            ),
        ]

    if method == "GPTQ":
        sequential_target_map = {
            "LlamaForCausalLM": "LlamaDecoderLayer",
            "MistralForCausalLM": "MistralDecoderLayer",
            "MixtralForCausalLM": "MixtralDecoderLayer",
            "Qwen2_5_VLForConditionalGeneration": "Qwen2_5_VLDecoderLayer",
        }
        sequential_target = sequential_target_map.get(model_architecture)
        if sequential_target is None:
            raise ValueError(
                f"GPTQ quantization is not supported for {model_architecture} architecture. "
                "Supported architectures are: "
                f"{', '.join(sequential_target_map.keys())}"
            )
        if model_architecture == qwen_vl:
            ignore_layers = ["lm_head", *visual_ignore]
        else:
            ignore_layers = ["re:.*lm_head"]
        return [
            GPTQModifier(
                targets="Linear",
                scheme="W4A16",
                sequential_targets=[sequential_target],
                ignore=ignore_layers,
            ),
        ]

    # UI method name -> preset scheme name passed to the modifier.
    scheme_map = {
        "W4A16": "W4A16",
        "W8A16": "W8A16",
        "W8A8_INT8": "W8A8",
        # Previously mapped to "W8A8", which made this option identical to
        # W8A8_INT8; the FP8 preset is the 8-bit floating point variant.
        "W8A8_FP8": "FP8",
        "FP8": "FP8",
    }
    if method in scheme_map:
        if model_architecture not in [*text_architectures, qwen_vl]:
            raise ValueError(
                f"Quantization method {method} is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM, Qwen2_5_VLForConditionalGeneration"
            )
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        elif "Qwen2_5_VL" in model_architecture:
            ignore_layers.extend(visual_ignore)
        # Qwen2.5-VL with W4A16 goes through GPTQModifier so that
        # sequential_targets can be set for the decoder layers.
        if model_architecture == qwen_vl and method == "W4A16":
            return [
                GPTQModifier(
                    targets="Linear",
                    scheme=scheme_map[method],
                    sequential_targets=["Qwen2_5_VLDecoderLayer"],
                    ignore=ignore_layers,
                ),
            ]
        return [
            QuantizationModifier(
                scheme=scheme_map[method],
                targets="Linear",
                ignore=ignore_layers,
            )
        ]

    if method == "SmoothQuant":
        if model_architecture not in text_architectures:
            raise ValueError(
                f"SmoothQuant is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM"
            )
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        # NOTE(review): only a plain W8A8 QuantizationModifier is returned; no
        # smoothing modifier is part of this recipe. Confirm whether one
        # should precede it.
        return [
            QuantizationModifier(
                scheme="W8A8",
                targets="Linear",
                ignore=ignore_layers,
            )
        ]

    if method == "SparseGPT":
        if model_architecture not in text_architectures:
            raise ValueError(
                f"SparseGPT is not supported for {model_architecture} architecture. "
                "Supported architectures are: LlamaForCausalLM, MistralForCausalLM, MixtralForCausalLM"
            )
        ignore_layers = ["lm_head"]
        if "Mixtral" in model_architecture:
            ignore_layers.append("re:.*block_sparse_moe.gate")
        # NOTE(review): this returns a W4A16 GPTQModifier with no sparsity
        # settings, so the result is quantized rather than pruned. Confirm
        # the intended modifier.
        return [
            GPTQModifier(
                targets="Linear",
                scheme="W4A16",
                ignore=ignore_layers,
            )
        ]

    raise ValueError(f"Unsupported quantization method: {method}")
def get_model_class_by_name(model_type_name):
"""
Returns the appropriate model class based on the user-selected model type name.
"""
if model_type_name == "CausalLM (standard text generation)":
return AutoModelForCausalLM
elif model_type_name == "Qwen2_5_VLForConditionalGeneration (Qwen2.5-VL)":
from transformers import Qwen2_5_VLForConditionalGeneration
return Qwen2_5_VLForConditionalGeneration
elif model_type_name == "Qwen2ForCausalLM (Qwen2)":
from transformers import Qwen2ForCausalLM
return Qwen2ForCausalLM
elif model_type_name == "LlamaForCausalLM (Llama, Llama2, Llama3)":
from transformers import LlamaForCausalLM
return LlamaForCausalLM
elif model_type_name == "MistralForCausalLM (Mistral, Mixtral)":
from transformers import MistralForCausalLM
return MistralForCausalLM
elif model_type_name == "GemmaForCausalLM (Gemma)":
from transformers import GemmaForCausalLM
return GemmaForCausalLM
elif model_type_name == "Gemma2ForCausalLM (Gemma2)":
from transformers import Gemma2ForCausalLM
return Gemma2ForCausalLM
elif model_type_name == "PhiForCausalLM (Phi, Phi2)":
from transformers import PhiForCausalLM
return PhiForCausalLM
elif model_type_name == "Phi3ForCausalLM (Phi3)":
from transformers import Phi3ForCausalLM
return Phi3ForCausalLM
elif model_type_name == "FalconForCausalLM (Falcon)":
from transformers import FalconForCausalLM
return FalconForCausalLM
elif model_type_name == "MptForCausalLM (MPT)":
from transformers import MptForCausalLM
return MptForCausalLM
elif model_type_name == "GPT2LMHeadModel (GPT2)":
from transformers import GPT2LMHeadModel
return GPT2LMHeadModel
elif model_type_name == "GPTNeoXForCausalLM (GPT-NeoX)":
from transformers import GPTNeoXForCausalLM
return GPTNeoXForCausalLM
elif model_type_name == "GPTJForCausalLM (GPT-J)":
from transformers import GPTJForCausalLM
return GPTJForCausalLM
else:
# Default case - should not happen if all options are handled
return AutoModelForCausalLM
def determine_model_class(model_id: str, token: str, manual_model_type: str | None = None):
    """
    Determines the model class to load ``model_id`` with.

    The class comes from one of two sources:
    1. The user's manual selection, when one is given and it is not
       "Auto-detect (recommended)".
    2. Otherwise, the ``model_type`` field of the model's configuration.

    Args:
        model_id: Hub repository id of the model.
        token: Hugging Face token used to fetch the configuration.
        manual_model_type: Optional label from the "Model Type" dropdown.

    Returns:
        A transformers model class. AutoModelForCausalLM is returned when the
        model type is missing or unknown, or when detection raises.
    """
    if manual_model_type and manual_model_type != "Auto-detect (recommended)":
        return get_model_class_by_name(manual_model_type)

    # Lower-cased config.model_type -> name of the transformers class.
    class_name_by_model_type = {
        "qwen2_5_vl": "Qwen2_5_VLForConditionalGeneration",
        "qwen2-vl": "Qwen2_5_VLForConditionalGeneration",
        "qwen2vl": "Qwen2_5_VLForConditionalGeneration",
        "qwen2": "Qwen2ForCausalLM",
        "qwen": "Qwen2ForCausalLM",
        "qwen2.5": "Qwen2ForCausalLM",
        "llama": "LlamaForCausalLM",
        "llama2": "LlamaForCausalLM",
        "llama3": "LlamaForCausalLM",
        "llama3.1": "LlamaForCausalLM",
        "llama3.2": "LlamaForCausalLM",
        "llama3.3": "LlamaForCausalLM",
        "mistral": "MistralForCausalLM",
        # Mixtral previously resolved to MistralForCausalLM, which is the
        # wrong class for a mixture-of-experts checkpoint.
        "mixtral": "MixtralForCausalLM",
        "gemma": "GemmaForCausalLM",
        "gemma2": "Gemma2ForCausalLM",
        "phi": "PhiForCausalLM",
        "phi2": "PhiForCausalLM",
        "phi3": "Phi3ForCausalLM",
        "phi3.5": "Phi3ForCausalLM",
        "falcon": "FalconForCausalLM",
        "mpt": "MptForCausalLM",
        "gpt2": "GPT2LMHeadModel",
        "gpt": "GPT2LMHeadModel",
        "gpt_neox": "GPTNeoXForCausalLM",
        "gptj": "GPTJForCausalLM",
    }

    try:
        import transformers

        # NOTE(review): trust_remote_code=True is passed for a user-chosen
        # repository; confirm that running repo-supplied code is acceptable.
        config = AutoConfig.from_pretrained(model_id, token=token, trust_remote_code=True)
        model_type = getattr(config, "model_type", None)
        if not model_type:
            # No model type in the config: use the generic causal-LM loader.
            return AutoModelForCausalLM
        class_name = class_name_by_model_type.get(model_type.lower())
        if class_name is None:
            # Unknown model types also go through the generic loader.
            return AutoModelForCausalLM
        # Looked up lazily, so a class missing from the installed transformers
        # lands in the fallback below instead of failing at import time.
        return getattr(transformers, class_name)
    except Exception as e:
        # Best-effort detection: any failure falls back to the default class.
        print(f"Could not determine model class from config: {e}")
        return AutoModelForCausalLM
def compress_and_upload(
    model_id: str,
    quant_method: str,
    model_type_selection: str,  # Manual model type selection from the UI
    oauth_token: gr.OAuthToken | None,
):
    """
    Compresses a model using llm-compressor and uploads it to a new HF repo.

    Args:
        model_id: Hub repository id of the model to compress.
        quant_method: Quantization method name passed to
            get_quantization_recipe.
        model_type_selection: Label from the "Model Type" dropdown, passed to
            determine_model_class.
        oauth_token: OAuth token of the logged-in user, or None when nobody
            is logged in.

    Returns:
        An HTML string: a success message linking to the new repository, or
        an error panel containing the escaped exception text.

    Raises:
        gr.Error: If no model is selected, the user is not logged in, or the
            loaded model's config lists no architecture.
    """
    import html

    if not model_id:
        raise gr.Error("Please select a model from the search bar.")
    if oauth_token is None:
        raise gr.Error("Authentication error. Please log in to continue.")

    token = oauth_token.token
    try:
        # Use the provided token for all hub interactions.
        username = whoami(token=token)["name"]

        # --- 1. Load Model ---
        model_class = determine_model_class(model_id, token, model_type_selection)
        load_kwargs = dict(
            torch_dtype="auto", device_map=None, token=token, trust_remote_code=True
        )
        try:
            model = model_class.from_pretrained(model_id, **load_kwargs)
        except ValueError as e:
            if "Unrecognized configuration class" not in str(e):
                raise
            # The chosen class does not accept this config; let AutoModel pick one.
            print(f"Automatic model class detection failed, falling back to AutoModel: {e}")
            model = AutoModel.from_pretrained(model_id, **load_kwargs)

        # NOTE(review): this is a relative path and nothing in this function
        # removes it after the upload; confirm whether cleanup is needed.
        output_dir = f"{model_id.split('/')[-1]}-{quant_method}"

        # --- 2. Get Recipe ---
        if not model.config.architectures:
            raise gr.Error("Could not determine model architecture.")
        architecture = model.config.architectures[0]
        recipe = get_quantization_recipe(quant_method, architecture)

        # --- 3. Run Compression ---
        from datasets import load_dataset

        # Arguments shared by every oneshot call below.
        shared_oneshot_kwargs = dict(
            model=model,
            recipe=recipe,
            save_compressed=True,
            output_dir=output_dir,
            num_calibration_samples=64,
        )

        def calibrate_on_text():
            # Text-only calibration on a 1% slice of the wikitext-2 train split.
            text_ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:1%]")
            oneshot(dataset=text_ds, max_seq_length=512, **shared_oneshot_kwargs)

        if "Qwen2_5_VLForConditionalGeneration" in architecture:
            # Qwen2.5-VL: try a multimodal calibration set first.
            try:
                ds = load_dataset("lmms-lab/flickr30k", split="test[:64]")
                ds = ds.shuffle(seed=42)

                def qwen2_5_vl_data_collator(batch):
                    # Converts list/int/float fields of the single sample to tensors.
                    if len(batch) != 1:
                        raise ValueError("Calibration expects a batch size of 1.")
                    return {
                        key: torch.tensor(value) if isinstance(value, (list, int, float)) else value
                        for key, value in batch[0].items()
                    }

                oneshot(
                    dataset=ds,
                    max_seq_length=2048,  # Increased for multimodal models
                    data_collator=qwen2_5_vl_data_collator,
                    **shared_oneshot_kwargs,
                )
            except Exception as e:
                # Any failure in the multimodal path (loading or calibration)
                # is retried with the text-only dataset.
                print(f"Could not load multimodal dataset, falling back to text-only: {e}")
                calibrate_on_text()
        else:
            calibrate_on_text()

        # --- 4. Create Repo and Upload ---
        api = HfApi(token=token)
        repo_id = f"{username}/{output_dir}"
        repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
        api.upload_folder(
            folder_path=output_dir,
            repo_id=repo_id,
            commit_message=f"Upload {quant_method} compressed model",
        )

        # --- 5. Create Model Card ---
        # Joined line by line so the card text does not depend on source indentation.
        card_content = "\n".join(
            [
                "",
                "---",
                "license: apache-2.0",
                f"base_model: {model_id}",
                "tags:",
                "- llm-compressor",
                "- quantization",
                f"- {quant_method.lower()}",
                "---",
                f"# {quant_method} Compressed Model: {repo_id}",
                f"This model was compressed from [`{model_id}`](https://huggingface.co/{model_id}) using the [vLLM LLM-Compressor](https://github.com/vllm-project/llm-compressor) library.",
                "This conversion was performed by the `llm-compressor-my-repo` Hugging Face Space.",
                f"## Quantization Method: {quant_method}",
                "For more details on the recipe used, refer to the `recipe.yaml` file in this repository.",
                "",
            ]
        )
        card = ModelCard(card_content)
        card.push_to_hub(repo_id, token=token)

        return f'<h1>✅ Success!</h1><br/>Model compressed and saved to your new repo: <a href="{repo_url}" target="_blank" style="text-decoration:underline">{repo_id}</a>'
    except gr.Error:
        # Re-raise unchanged so it is not turned into the generic error panel.
        raise
    except Exception as e:
        # Escape first: exception text often contains "<class '...'>" and may
        # include user-supplied values, neither of which should be read as markup.
        error_message = html.escape(str(e)).replace("\n", "<br/>")
        return f'<h1>❌ ERROR</h1><br/><pre style="white-space:pre-wrap;">{error_message}</pre>'
# --- Gradio Interface ---
def build_gradio_app():
    """
    Assembles the Gradio Blocks UI and returns it without launching.

    The page has a login button, a Hub model search box, a quantization
    method dropdown, a model type dropdown, and a button that runs
    compress_and_upload and shows its result in an HTML component.
    """
    quant_methods = [
        "W4A16",
        "W8A16",
        "W8A8_INT8",
        "W8A8_FP8",
        "GPTQ",
        "FP8",
        "AWQ",
        "SmoothQuant",
        "SparseGPT",
    ]
    model_type_labels = [
        "Auto-detect (recommended)",
        "CausalLM (standard text generation)",
        "Qwen2_5_VLForConditionalGeneration (Qwen2.5-VL)",
        "Qwen2ForCausalLM (Qwen2)",
        "LlamaForCausalLM (Llama, Llama2, Llama3)",
        "MistralForCausalLM (Mistral, Mixtral)",
        "GemmaForCausalLM (Gemma)",
        "Gemma2ForCausalLM (Gemma2)",
        "PhiForCausalLM (Phi, Phi2)",
        "Phi3ForCausalLM (Phi3)",
        "FalconForCausalLM (Falcon)",
        "MptForCausalLM (MPT)",
        "GPT2LMHeadModel (GPT2)",
        "GPTNeoXForCausalLM (GPT-NeoX)",
        "GPTJForCausalLM (GPT-J)",
    ]

    with gr.Blocks(css="footer {display: none !important;}") as app:
        gr.Markdown("# LLM-Compressor My Repo")
        gr.Markdown(
            "Log in, choose a model, select a quantization method, and this Space will create a new compressed model repository on your Hugging Face profile."
        )

        with gr.Row():
            gr.LoginButton(min_width=250)

        gr.Markdown("### 1. Select a Model from the Hugging Face Hub")
        model_search = HuggingfaceHubSearch(
            label="Search for a Model",
            search_type="model",
        )

        gr.Markdown("### 2. Choose a Quantization Method")
        method_picker = gr.Dropdown(
            quant_methods,
            label="Quantization Method",
            value="W4A16",
        )

        gr.Markdown("### 3. Model Type (Auto-detected, but you can override if needed)")
        model_type_picker = gr.Dropdown(
            choices=model_type_labels,
            label="Model Type",
            value="Auto-detect (recommended)",
        )

        run_button = gr.Button("Compress and Create Repo", variant="primary")
        result_panel = gr.HTML(label="Result")

        # Only three inputs are wired; compress_and_upload's fourth parameter
        # (oauth_token) is not among them.
        run_button.click(
            fn=compress_and_upload,
            inputs=[model_search, method_picker, model_type_picker],
            outputs=result_panel,
        )

    return app
def main():
    """Builds the Gradio app, enables its queue with max_size=5, and launches it."""
    build_gradio_app().queue(max_size=5).launch()


# Launch only when executed as a script, not when imported.
if __name__ == "__main__":
    main()