#!/usr/bin/env python3
"""
Life Coach Model - DEBUG VERSION
Version with extensive logging to diagnose hangs on HF Spaces
"""

import os
import torch
import logging
import time
import traceback
import gc
import threading
from datetime import datetime
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, StoppingCriteria, StoppingCriteriaList
from peft import PeftModel
from pathlib import Path
import re  # ADDED FOR RESPONSE CLEANUP

# Install psutil if not present (for HF Spaces)
try:
    import psutil
except ImportError:
    import subprocess
    subprocess.check_call(["pip", "install", "psutil", "--break-system-packages"])
    import psutil

# Ultra-detailed logging setup
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - [PID:%(process)d] - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def log_system_status(prefix=""):
    """Detailed log of the system status."""
    logger.info(f"{'='*60}")
    logger.info(f"{prefix} SYSTEM STATUS CHECK")
    logger.info(f"PID: {os.getpid()}")
    logger.info(f"Thread ID: {threading.get_ident()}")

    # CPU info
    cpu_percent = psutil.cpu_percent(interval=0.1)
    logger.info(f"CPU Usage: {cpu_percent}%")

    # Memory info
    mem = psutil.virtual_memory()
    logger.info(f"RAM: {mem.used/1e9:.2f}GB used / {mem.total/1e9:.2f}GB total ({mem.percent}%)")

    # GPU info if available
    if torch.cuda.is_available():
        try:
            gpu_mem = torch.cuda.mem_get_info()
            logger.info(f"GPU Memory: {gpu_mem[0]/1e9:.2f}GB free / {gpu_mem[1]/1e9:.2f}GB total")
            logger.info(f"GPU Allocated: {torch.cuda.memory_allocated()/1e9:.2f}GB")
            logger.info(f"GPU Reserved: {torch.cuda.memory_reserved()/1e9:.2f}GB")
            logger.info(f"CUDA Device: {torch.cuda.get_device_name()}")
        except Exception as e:
            logger.error(f"Error getting GPU info: {e}")

    logger.info(f"{'='*60}")


class LifeCoachModel:
    def __init__(self, model_name="microsoft/Phi-4", model_save_path="data/life_coach_model", train_file=None):
        """Initialize the Life Coach model with extensive logging."""
        logger.info(f"[INIT] Starting LifeCoachModel initialization")
        logger.info(f"[INIT] Model name: {model_name}")
        logger.info(f"[INIT] Save path: {model_save_path}")
        log_system_status("[INIT-START]")

        self.model_name = model_name
        self.model_save_path = model_save_path
        self.train_file = train_file

        # Device detection with logging
        logger.info(f"[INIT] Checking CUDA availability...")
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
            logger.info(f"[INIT] ✅ CUDA is available")
            logger.info(f"[INIT] CUDA version: {torch.version.cuda}")
            logger.info(f"[INIT] PyTorch version: {torch.__version__}")
            # Clear GPU memory
            logger.info(f"[INIT] Clearing GPU cache...")
            torch.cuda.empty_cache()
            gc.collect()
            logger.info(f"[INIT] GPU cache cleared")
        else:
            self.device = torch.device("cpu")
            logger.warning(f"[INIT] ⚠️ CUDA not available, using CPU")
        logger.info(f"[INIT] Device set to: {self.device}")

        self.tokenizer = None
        self.model = None

        # System prompt
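        # The persona below is injected as the <|system|> turn at the start of every
        # generate_response() call, so the coach's behaviour can be adjusted here
        # without retraining the adapter.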
        self.system_prompt = """You are Robert, a friendly and experienced life coach.
Here's your background:

About You:
- Name: Robert (Bob to friends)
- Age: 42 years old
- Experience: 15 years as a certified life coach and motivational speaker
- Education: Master's degree in Psychology from UC Berkeley
- Specialties: Personal growth, career transitions, work-life balance, goal setting, stress management
- Personal: Married with two kids, enjoy hiking and meditation in your free time
- Approach: Warm, empathetic, practical, and solution-focused

Your Coaching Style:
- Respond ONLY to what the user actually tells you - never make assumptions about their problems
- Start conversations in a welcoming, open manner
- Ask clarifying questions to understand their situation better
- Provide practical, actionable advice based on what they share
- Be encouraging and positive, but also honest and realistic
- Keep responses concise and focused (2-4 sentences usually)
- Share brief personal insights when relevant, but keep the focus on the client

Important: Never assume clients have problems they haven't mentioned. Let them guide the conversation and share what's on their mind."""

        logger.info(f"[INIT] LifeCoachModel initialization complete")
        log_system_status("[INIT-END]")

    def load_tokenizer(self):
        """Load tokenizer with detailed logging."""
        logger.info(f"[TOKENIZER] Starting tokenizer loading...")
        logger.info(f"[TOKENIZER] Loading from: {self.model_name}")
        try:
            start_time = time.time()
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                trust_remote_code=True,
                cache_dir=os.environ.get('HF_HOME', None)
            )
            load_time = time.time() - start_time
            logger.info(f"[TOKENIZER] ✅ Tokenizer loaded in {load_time:.2f} seconds")
            logger.info(f"[TOKENIZER] Vocab size: {self.tokenizer.vocab_size}")
            logger.info(f"[TOKENIZER] Pad token: {self.tokenizer.pad_token}")

            # Set padding
            if self.tokenizer.pad_token is None:
                logger.info(f"[TOKENIZER] Setting pad token to eos token")
                self.tokenizer.pad_token = self.tokenizer.eos_token
                self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

            logger.info(f"[TOKENIZER] Tokenizer ready")
        except Exception as e:
            logger.error(f"[TOKENIZER] ❌ Error loading tokenizer: {e}")
            logger.error(f"[TOKENIZER] Traceback: {traceback.format_exc()}")
            raise

    def load_model(self, fine_tuned=True):
        """Load model with EXTENSIVE logging at every step."""
        logger.info(f"[MODEL] Starting model loading process...")
        logger.info(f"[MODEL] Fine-tuned: {fine_tuned}")
        log_system_status("[MODEL-LOAD-START]")

        if fine_tuned:
            adapter_path = Path(self.model_save_path)
            alternate_path = Path(f"./{self.model_save_path}")
            logger.info(f"[MODEL] Checking for adapter at: {adapter_path}")
            logger.info(f"[MODEL] Alternate path: {alternate_path}")
            if alternate_path.exists() and (alternate_path / "adapter_model.safetensors").exists():
                model_path = str(alternate_path)
                logger.info(f"[MODEL] ✅ Found adapter at alternate path: {model_path}")
            elif adapter_path.exists() and (adapter_path / "adapter_model.safetensors").exists():
                model_path = str(adapter_path)
                logger.info(f"[MODEL] ✅ Found adapter at primary path: {model_path}")
            else:
                logger.error(f"[MODEL] ❌ No adapter found, loading base model")
                fine_tuned = False

        try:
            # Quantization config with logging
            logger.info(f"[MODEL] Setting up quantization config...")
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_use_double_quant=False
            )
            logger.info(f"[MODEL] Quantization config created")

            # Load base model
            logger.info(f"[MODEL] Loading base model from: {self.model_name}")
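            # With load_in_4bit=True the weights are stored as 4-bit NF4 and computed in
            # fp16, and device_map="auto" lets accelerate decide where the quantized
            # weights live, so the model is never moved manually with .to(self.device).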
logger.info(f"[MODEL] This may take several minutes...") start_time = time.time() checkpoint_counter = 0 # Hook per monitorare il caricamento dei checkpoint original_print = print def counting_print(*args, **kwargs): nonlocal checkpoint_counter msg = ' '.join(str(arg) for arg in args) if 'Loading checkpoint' in msg: checkpoint_counter += 1 logger.info(f"[MODEL] Checkpoint {checkpoint_counter} - {msg}") original_print(*args, **kwargs) # Temporaneamente sostituisci print import builtins builtins.print = counting_print logger.info(f"[MODEL] Calling AutoModelForCausalLM.from_pretrained...") self.model = AutoModelForCausalLM.from_pretrained( self.model_name, quantization_config=quantization_config, device_map="auto", trust_remote_code=True, torch_dtype=torch.float16, cache_dir=os.environ.get('HF_HOME', None) ) # Ripristina print originale builtins.print = original_print load_time = time.time() - start_time logger.info(f"[MODEL] ✅ Base model loaded in {load_time:.2f} seconds") log_system_status("[MODEL-AFTER-BASE-LOAD]") # Load adapter if fine-tuned if fine_tuned: logger.info(f"[MODEL] Loading adapter from: {model_path}") start_time = time.time() self.model = PeftModel.from_pretrained( self.model, model_path, device_map="auto" ) adapter_time = time.time() - start_time logger.info(f"[MODEL] ✅ Adapter loaded in {adapter_time:.2f} seconds") # NOTA: NON fare merge_and_unload() con modelli 4-bit quantizzati! # Può causare comportamenti strani o corruzione logger.info(f"[MODEL] Adapter loaded (no merge for 4-bit models)") # Set eval mode logger.info(f"[MODEL] Setting model to eval mode...") self.model.eval() logger.info(f"[MODEL] Model configuration:") logger.info(f"[MODEL] - Parameters: {sum(p.numel() for p in self.model.parameters())/1e9:.2f}B") logger.info(f"[MODEL] - Device map: {getattr(self.model, 'hf_device_map', 'Not available')}") log_system_status("[MODEL-LOAD-COMPLETE]") logger.info(f"[MODEL] ✅✅✅ Model loading COMPLETE") except Exception as e: logger.error(f"[MODEL] ❌❌❌ CRITICAL ERROR during model loading") logger.error(f"[MODEL] Error type: {type(e).__name__}") logger.error(f"[MODEL] Error message: {str(e)}") logger.error(f"[MODEL] Full traceback:\n{traceback.format_exc()}") log_system_status("[MODEL-LOAD-ERROR]") raise # FIX 1: Aggiungi stopping_criteria def _get_stopping_criteria(self): """Stop generation at <|end|>""" stop_token = "<|end|>" stop_ids = self.tokenizer.encode(stop_token, add_special_tokens=False) class StopOnToken(StoppingCriteria): def __init__(self, stop_ids): self.stop_ids = stop_ids def __call__(self, input_ids, scores, **kwargs): return input_ids[0][-1].item() in self.stop_ids return StoppingCriteriaList([StopOnToken(stop_ids)]) def generate_response(self, prompt, max_new_tokens=256, conversation_history=None): """Generate response with DETAILED logging at every step.""" logger.info(f"{'='*80}") logger.info(f"[GENERATE] STARTING GENERATION PROCESS") logger.info(f"[GENERATE] Timestamp: {datetime.now().isoformat()}") logger.info(f"[GENERATE] Prompt length: {len(prompt)} chars") logger.info(f"[GENERATE] Max new tokens: {max_new_tokens}") logger.info(f"[GENERATE] History items: {len(conversation_history) if conversation_history else 0}") log_system_status("[GENERATE-START]") try: # Step 1: Build prompt logger.info(f"[GENERATE-1] Building full prompt...") full_prompt = f"<|system|>\n{self.system_prompt}<|end|>\n" if conversation_history: for msg in conversation_history: role = msg.get('role', 'user') content = msg.get('content', '') full_prompt += 
f"<|{role}|>\n{content}<|end|>\n" logger.info(f"[GENERATE-1] Added {role} message: {len(content)} chars") full_prompt += f"<|user|>\n{prompt}<|end|>\n<|assistant|>\n" logger.info(f"[GENERATE-1] Full prompt built: {len(full_prompt)} chars") # Step 2: Tokenize logger.info(f"[GENERATE-2] Starting tokenization...") start_time = time.time() inputs = self.tokenizer( full_prompt, return_tensors="pt", truncation=True, max_length=2048 ) tokenize_time = time.time() - start_time logger.info(f"[GENERATE-2] Tokenization complete in {tokenize_time:.3f}s") logger.info(f"[GENERATE-2] Input shape: {inputs['input_ids'].shape}") logger.info(f"[GENERATE-2] Number of tokens: {inputs['input_ids'].shape[-1]}") # Step 3: Move to device logger.info(f"[GENERATE-3] Moving tensors to device: {self.device}") start_time = time.time() inputs = {k: v.to(self.device) for k, v in inputs.items()} move_time = time.time() - start_time logger.info(f"[GENERATE-3] Tensors moved in {move_time:.3f}s") log_system_status("[GENERATE-BEFORE-MODEL]") # Step 4: Generate # FIX 1: Aggiunto stopping_criteria logger.info(f"[GENERATE-4] ⚡ CALLING MODEL.GENERATE()...") logger.info(f"[GENERATE-4] Generation parameters:") logger.info(f"[GENERATE-4] - max_new_tokens: {max_new_tokens}") logger.info(f"[GENERATE-4] - temperature: 0.7") logger.info(f"[GENERATE-4] - do_sample: True") start_time = time.time() logger.info(f"[GENERATE-4] >>> ENTERING model.generate() at {datetime.now().isoformat()}") with torch.no_grad(): outputs = self.model.generate( **inputs, max_new_tokens=max_new_tokens, temperature=0.7, do_sample=True, top_p=0.9, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, stopping_criteria=self._get_stopping_criteria() # FIX 1: Aggiunto ) logger.info(f"[GENERATE-4] <<< EXITED model.generate() at {datetime.now().isoformat()}") generate_time = time.time() - start_time logger.info(f"[GENERATE-4] ✅ Generation complete in {generate_time:.2f}s") logger.info(f"[GENERATE-4] Output shape: {outputs.shape}") logger.info(f"[GENERATE-4] Generated {outputs.shape[-1] - inputs['input_ids'].shape[-1]} new tokens") log_system_status("[GENERATE-AFTER-MODEL]") # Step 5: Decode # FIX 2: Pulizia della risposta logger.info(f"[GENERATE-5] Decoding output...") start_time = time.time() full_text = self.tokenizer.decode(outputs[0], skip_special_tokens=False) # FIX 2: Estrai solo la risposta assistant pulita if "<|assistant|>" in full_text: response = full_text.split("<|assistant|>")[-1] response = re.sub(r"<\|end\|>.*$", "", response).strip() # Rimuovi tutto dopo <|end|> response = re.sub(r"<\|.*?\|>", "", response).strip() # Rimuovi tag residui else: response = full_text decode_time = time.time() - start_time logger.info(f"[GENERATE-5] Decoding complete in {decode_time:.3f}s") logger.info(f"[GENERATE-5] Response length: {len(response)} chars") logger.info(f"[GENERATE-5] Response preview: {response[:100]}...") # Step 6: Cleanup logger.info(f"[GENERATE-6] Cleaning up GPU memory...") del inputs, outputs torch.cuda.empty_cache() gc.collect() logger.info(f"[GENERATE-6] Cleanup complete") log_system_status("[GENERATE-COMPLETE]") logger.info(f"[GENERATE] ✅✅✅ GENERATION SUCCESSFUL") logger.info(f"[GENERATE] Total time: {time.time() - start_time:.2f}s") logger.info(f"{'='*80}") return response except Exception as e: logger.error(f"[GENERATE] ❌❌❌ ERROR DURING GENERATION") logger.error(f"[GENERATE] Error type: {type(e).__name__}") logger.error(f"[GENERATE] Error message: {str(e)}") logger.error(f"[GENERATE] Full 
traceback:\n{traceback.format_exc()}") log_system_status("[GENERATE-ERROR]") # Return fallback message return "I apologize, but I encountered an error while generating a response. Please try again." # Test if this file is run directly if __name__ == "__main__": import threading logger.info("Running test...") model = LifeCoachModel() model.load_tokenizer() model.load_model(fine_tuned=True) response = model.generate_response("Hello, how are you?", max_new_tokens=50) logger.info(f"Test response: {response}")