# %%writefile text_processor.py
"""
DeltaFlow - Text Processor
Handles semantic expansion using Qwen2.5-0.5B-Instruct
Converts brief instructions into detailed motion descriptions
"""
import gc
from typing import Optional

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


class TextProcessor:
    """
    Semantic expansion engine using Qwen2.5-0.5B.
    Transforms user inputs into motion-rich prompts for video generation.
    """

    MODEL_ID = "Qwen/Qwen2.5-0.5B-Instruct"
    MAX_OUTPUT_LENGTH = 100  # Generation cap in tokens; the 50-word limit is enforced after decoding

    def __init__(self, resource_manager: Optional[object] = None):
        """
        Initialize TextProcessor with optional resource management.

        Args:
            resource_manager: Optional resource manager instance
        """
        self.resource_manager = resource_manager
        # Determine device: defer to the resource manager if one was provided
        if resource_manager is not None:
            self.device = resource_manager.get_device()
        else:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model: Optional[AutoModelForCausalLM] = None
        self.tokenizer: Optional[AutoTokenizer] = None
        self.is_loaded = False

    def load_model(self) -> None:
        """Load Qwen model and tokenizer."""
        if self.is_loaded:
            print("⚠ TextProcessor already loaded, skipping...")
            return
        try:
            print("→ Loading Qwen2.5-0.5B-Instruct...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.MODEL_ID,
                trust_remote_code=True
            )
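            # bfloat16 halves weight memory relative to fp32, and
            # device_map="auto" lets accelerate place layers automatically.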
            self.model = AutoModelForCausalLM.from_pretrained(
                self.MODEL_ID,
                torch_dtype=torch.bfloat16,
                device_map="auto",
                trust_remote_code=True
            )
            if self.resource_manager is not None:
                self.resource_manager.register_model("TextProcessor", self.model)
            self.is_loaded = True
            print("✓ TextProcessor loaded successfully")
        except Exception as e:
            print(f"✗ Error loading TextProcessor: {e}")
            raise

    def unload_model(self) -> None:
        """Unload model and free GPU memory."""
        if not self.is_loaded:
            return
        try:
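            # Move weights off the GPU before dropping the reference so the
            # subsequent cache clear can actually reclaim the VRAM.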
            if self.model is not None:
                self.model.to('cpu')
                del self.model
                self.model = None
            if self.tokenizer is not None:
                del self.tokenizer
                self.tokenizer = None
            if self.resource_manager is not None:
                self.resource_manager.unregister_model("TextProcessor")
                self.resource_manager.clear_cache(aggressive=True)
            else:
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            self.is_loaded = False
            print("✓ TextProcessor unloaded")
        except Exception as e:
            print(f"⚠ Error during TextProcessor unload: {e}")

    def expand_prompt(self, user_input: str) -> str:
        """
        Convert user's brief instruction into detailed motion description.

        Args:
            user_input: User's original instruction

        Returns:
            str: Expanded prompt for video generation (≤50 words)
        """
        if not self.is_loaded:
            raise RuntimeError("TextProcessor not loaded. Call load_model() first.")
system_prompt = """You are a motion description expert. Convert the user's brief instruction into a detailed, dynamic prompt for video generation.
Focus on:
- Camera movements (pan, zoom, tilt, tracking)
- Subject actions and motions
- Scene dynamics and atmosphere
- Temporal flow and transitions
Keep output under 50 words. Use vivid, cinematic language. English only."""
        try:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_input}
            ]
            text = self.tokenizer.apply_chat_template(
                messages,
                tokenize=False,
                add_generation_prompt=True
            )
            model_inputs = self.tokenizer([text], return_tensors="pt").to(self.device)
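            # Sampling with moderate temperature/top_p plus a light repetition
            # penalty keeps expansions varied without drifting off-prompt.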
            with torch.no_grad():
                generated_ids = self.model.generate(
                    **model_inputs,
                    max_new_tokens=self.MAX_OUTPUT_LENGTH,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    repetition_penalty=1.1
                )
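            # generate() returns prompt + completion for decoder-only models;
            # slice off the prompt tokens so only the new text is decoded.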
            generated_ids = [
                output_ids[len(input_ids):]
                for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]
            expanded_prompt = self.tokenizer.batch_decode(
                generated_ids,
                skip_special_tokens=True
            )[0].strip()
            # Enforce word limit
            words = expanded_prompt.split()
            if len(words) > 50:
                expanded_prompt = " ".join(words[:50]) + "..."
            print(f"✓ Prompt expanded: '{user_input}' → '{expanded_prompt}'")
            return expanded_prompt
        except Exception as e:
            print(f"✗ Error during prompt expansion: {e}")
            return user_input

    def process(self, user_input: str, auto_unload: bool = True) -> str:
        """
        Main processing pipeline: load → expand → (optionally unload).

        Args:
            user_input: User's instruction
            auto_unload: Whether to unload model after processing

        Returns:
            str: Expanded prompt
        """
        try:
            if not self.is_loaded:
                self.load_model()
            expanded = self.expand_prompt(user_input)
            if auto_unload:
                self.unload_model()
            return expanded
        except Exception as e:
            print(f"✗ TextProcessor pipeline error: {e}")
            return user_input
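

if __name__ == "__main__":
    # Minimal usage sketch: exercises the full load → expand → unload cycle
    # on an example instruction (the prompt below is illustrative only).
    # Assumes network access to download the weights on first run; runs on
    # CPU if no GPU is available, just slower.
    processor = TextProcessor()
    expanded = processor.process("a cat chasing a laser pointer")
    print(f"Final prompt: {expanded}")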