#!/usr/bin/env python3
"""
Life Coach Model - DEBUG VERSION
Version with extensive logging to diagnose hangs on HF Spaces.
"""
import os
import torch
import logging
import time
import traceback
import gc
import threading
from datetime import datetime
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    StoppingCriteria,
    StoppingCriteriaList,
)
from peft import PeftModel
from pathlib import Path
import re

# ----------------------------------------------------------------------
# Install psutil if it is not present (for HF Spaces)
# ----------------------------------------------------------------------
try:
    import psutil
except ImportError:
    import subprocess

    subprocess.check_call(["pip", "install", "psutil", "--break-system-packages"])
    import psutil

# ----------------------------------------------------------------------
# Ultra-detailed logging
# ----------------------------------------------------------------------
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - [PID:%(process)d] - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


def log_system_status(prefix: str = "") -> None:
    """Log a detailed snapshot of the system state."""
    logger.info(f"{'=' * 60}")
    logger.info(f"{prefix} SYSTEM STATUS CHECK")
    logger.info(f"PID: {os.getpid()}")
    logger.info(f"Thread ID: {threading.get_ident()}")
    cpu_percent = psutil.cpu_percent(interval=0.1)
    logger.info(f"CPU Usage: {cpu_percent}%")
    mem = psutil.virtual_memory()
    logger.info(
        f"RAM: {mem.used/1e9:.2f}GB used / {mem.total/1e9:.2f}GB total ({mem.percent}%)"
    )
    if torch.cuda.is_available():
        try:
            gpu_mem = torch.cuda.mem_get_info()
            logger.info(
                f"GPU Memory: {gpu_mem[0]/1e9:.2f}GB free / {gpu_mem[1]/1e9:.2f}GB total"
            )
            logger.info(f"GPU Allocated: {torch.cuda.memory_allocated()/1e9:.2f}GB")
            logger.info(f"GPU Reserved: {torch.cuda.memory_reserved()/1e9:.2f}GB")
            logger.info(f"CUDA Device: {torch.cuda.get_device_name()}")
        except Exception as e:
            logger.error(f"Error getting GPU info: {e}")
    logger.info(f"{'=' * 60}")


# ----------------------------------------------------------------------
# LifeCoachModel
# ----------------------------------------------------------------------
class LifeCoachModel:
    def __init__(
        self,
        model_name: str = "microsoft/Phi-4",
        model_save_path: str = "data/life_coach_model",
        train_file: str | None = None,
    ):
        logger.info("[INIT] Starting LifeCoachModel initialization")
        logger.info(f"[INIT] Model name: {model_name}")
        logger.info(f"[INIT] Save path: {model_save_path}")
        log_system_status("[INIT-START]")

        self.model_name = model_name
        self.model_save_path = model_save_path
        self.train_file = train_file

        # ------------------------------------------------------------------
        # Device detection
        # ------------------------------------------------------------------
        if torch.cuda.is_available():
            self.device = torch.device("cuda")
            logger.info("[INIT] CUDA is available")
            logger.info(f"[INIT] CUDA version: {torch.version.cuda}")
            logger.info(f"[INIT] PyTorch version: {torch.__version__}")
            torch.cuda.empty_cache()
            gc.collect()
        else:
            self.device = torch.device("cpu")
            logger.warning("[INIT] CUDA not available, using CPU")
        logger.info(f"[INIT] Device set to: {self.device}")

        self.tokenizer = None
        self.model = None
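        # The heavy objects are created lazily: load_tokenizer() and load_model()
        # must be called explicitly, so a hang on HF Spaces shows up next to the
        # [TOKENIZER]/[MODEL] log markers rather than inside __init__.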
        # ------------------------------------------------------------------
        # System prompt (exactly as in the non-debug version)
        # ------------------------------------------------------------------
        self.system_prompt = """You are Robert, a friendly and experienced life coach.
Here's your background:

About You:
- Name: Robert (Bob to friends)
- Age: 42 years old
- Experience: 15 years as a certified life coach and motivational speaker
- Education: Master's degree in Psychology from UC Berkeley
- Specialties: Personal growth, career transitions, work-life balance, goal setting, stress management
- Personal: Married with two kids, enjoy hiking and meditation in your free time
- Approach: Warm, empathetic, practical, and solution-focused

Your Coaching Style:
- Respond ONLY to what the user actually tells you - never make assumptions about their problems
- Start conversations in a welcoming, open manner
- Ask clarifying questions to understand their situation better
- Provide practical, actionable advice based on what they share
- Be encouraging and positive, but also honest and realistic
- Keep responses concise and focused (2-4 sentences usually)
- Share brief personal insights when relevant, but keep the focus on the client

Important: Never assume clients have problems they haven't mentioned. Let them guide the conversation and share what's on their mind."""

        logger.info("[INIT] LifeCoachModel initialization complete")
        log_system_status("[INIT-END]")

    # ------------------------------------------------------------------
    # Tokenizer
    # ------------------------------------------------------------------
    def load_tokenizer(self) -> None:
        logger.info("[TOKENIZER] Loading tokenizer...")
        start = time.time()
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True,
            cache_dir=os.environ.get("HF_HOME", None),
        )
        logger.info(f"[TOKENIZER] Loaded in {time.time() - start:.2f}s")
        logger.info(f"[TOKENIZER] Vocab size: {self.tokenizer.vocab_size}")
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
            self.tokenizer.pad_token_id = self.tokenizer.eos_token_id
            logger.info("[TOKENIZER] pad_token set to eos_token")

    # ------------------------------------------------------------------
    # Model loading – 4-bit quantization (no offload folder needed)
    # ------------------------------------------------------------------
    def load_model(self, fine_tuned: bool = True) -> None:
        """Load the Phi-4 model with 4-bit quantization (fits in a 24GB GPU)."""
        logger.info(f"[MODEL] Loading model (fine_tuned={fine_tuned})")
        log_system_status("[MODEL-LOAD-START]")

        # Resolve adapter path
        if fine_tuned:
            adapter_path = Path(self.model_save_path)
            alt_path = Path(f"./{self.model_save_path}")
            if alt_path.exists() and (alt_path / "adapter_model.safetensors").exists():
                model_path = str(alt_path)
                logger.info(f"[MODEL] Adapter found at alternate path: {model_path}")
            elif adapter_path.exists() and (adapter_path / "adapter_model.safetensors").exists():
                model_path = str(adapter_path)
                logger.info(f"[MODEL] Adapter found at primary path: {model_path}")
            else:
                logger.error("[MODEL] No adapter found → loading base model")
                fine_tuned = False
        else:
            model_path = None

        try:
            # 4-bit quantization config (fits in ~9.5GB VRAM)
            from transformers import BitsAndBytesConfig

            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
            )
            logger.info("[MODEL] Using 4-bit NF4 quantization")
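            # Rough sizing (assumption: Phi-4 is ~14B parameters): the 4-bit NF4
            # weights alone take roughly 7GB, and the ~9.5GB figure above presumably
            # adds quantization constants, the KV cache and activation overhead.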
            # Load base model with 4-bit weights
            logger.info("[MODEL] Loading base model from HuggingFace...")
            start = time.time()
            self.model = AutoModelForCausalLM.from_pretrained(
                self.model_name,
                device_map="auto",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                quantization_config=quantization_config,
                cache_dir=os.environ.get("HF_HOME", None),
            )
            logger.info(f"[MODEL] Base model loaded in {time.time() - start:.2f}s")
            log_system_status("[MODEL-AFTER-BASE]")

            # Load PEFT adapter (no offload needed with 4-bit)
            if fine_tuned:
                logger.info(f"[MODEL] Loading PEFT adapter from {model_path}")
                start = time.time()
                self.model = PeftModel.from_pretrained(
                    self.model,
                    model_path,
                    device_map="auto",
                )
                logger.info(f"[MODEL] Adapter loaded in {time.time() - start:.2f}s")

            self.model.eval()
            logger.info(
                f"[MODEL] Parameters: {sum(p.numel() for p in self.model.parameters())/1e9:.2f}B"
            )
            log_system_status("[MODEL-LOAD-COMPLETE]")
            logger.info("[MODEL] Model loading COMPLETE")

        except Exception as e:
            logger.error("[MODEL] CRITICAL ERROR during model loading")
            logger.error(f"[MODEL] {type(e).__name__}: {e}")
            logger.error(f"[MODEL] Traceback:\n{traceback.format_exc()}")
            raise

    # ------------------------------------------------------------------
    # Stopping criteria (stop on <|end|>)
    # ------------------------------------------------------------------
    def _get_stopping_criteria(self) -> StoppingCriteriaList:
        stop_token = "<|end|>"
        stop_ids = self.tokenizer.encode(stop_token, add_special_tokens=False)

        class StopOnToken(StoppingCriteria):
            def __init__(self, ids):
                self.ids = ids

            def __call__(self, input_ids, scores, **kwargs):
                return input_ids[0][-1].item() in self.ids

        return StoppingCriteriaList([StopOnToken(stop_ids)])

    # ------------------------------------------------------------------
    # Generation
    # ------------------------------------------------------------------
    def generate_response(
        self,
        prompt: str,
        max_new_tokens: int = 256,
        conversation_history: list | None = None,
    ) -> str:
        logger.info(f"{'=' * 80}")
        logger.info("[GENERATE] STARTING GENERATION")
        logger.info(f"[GENERATE] Prompt length: {len(prompt)} chars")
        logger.info(f"[GENERATE] Max new tokens: {max_new_tokens}")
        logger.info(f"[GENERATE] History items: {len(conversation_history or [])}")
        log_system_status("[GENERATE-START]")

        try:
            # --------------------------------------------------------------
            # 1. Build full prompt with the Phi-4 chat template
            # --------------------------------------------------------------
            full_prompt = f"<|system|>\n{self.system_prompt}<|end|>\n"
            if conversation_history:
                for msg in conversation_history:
                    role = msg.get("role", "user")
                    content = msg.get("content", "")
                    full_prompt += f"<|{role}|>\n{content}<|end|>\n"
            full_prompt += f"<|user|>\n{prompt}<|end|>\n<|assistant|>\n"
            logger.info(f"[GENERATE-1] Full prompt length: {len(full_prompt)} chars")

            # --------------------------------------------------------------
            # 2. Tokenize
            # --------------------------------------------------------------
            inputs = self.tokenizer(
                full_prompt,
                return_tensors="pt",
                truncation=True,
                max_length=2048,
            ).to(self.device)
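            # Note (assumption about tokenizer defaults): with truncation=True and
            # max_length=2048, an over-long prompt is cut on the truncation side
            # (right by default), which would drop the most recent turns; trimming
            # conversation_history before building the prompt avoids that.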
            # --------------------------------------------------------------
            # 3. Generate
            # --------------------------------------------------------------
            logger.info("[GENERATE] Calling model.generate()")
            start = time.time()
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=max_new_tokens,
                    temperature=0.7,
                    do_sample=True,
                    top_p=0.9,
                    repetition_penalty=1.2,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=self.tokenizer.eos_token_id,
                    stopping_criteria=self._get_stopping_criteria(),
                )
            gen_time = time.time() - start
            logger.info(f"[GENERATE] Generation took {gen_time:.2f}s")
            logger.info(
                f"[GENERATE] Generated {outputs.shape[1] - inputs['input_ids'].shape[1]} tokens"
            )

            # --------------------------------------------------------------
            # 4. Decode & clean (updated cleanup logic)
            # --------------------------------------------------------------
            full_text = self.tokenizer.decode(outputs[0], skip_special_tokens=False)
            response = ""
            if "<|assistant|>" in full_text:
                response = full_text.split("<|assistant|>")[-1]
                # STEP 1: Remove the trailing end tag (<|end|>) and surrounding whitespace
                response = re.sub(r"\s*<\|end\|>\s*$", "", response)
                # STEP 2: Remove any other complete tags (e.g. <|system|>)
                response = re.sub(r"<\|.*?\|>", "", response)
                # STEP 3: Remove an incomplete tag fragment at the end (e.g. "<|").
                # '(?s)' makes the match robust: it captures anything starting with <|
                # that is never closed (no '>' character) through the end of the string.
                response = re.sub(r"(?s)\s*<\|[^>]*$", "", response)
                # STEP 4: Trim extra whitespace
                response = response.strip()
            else:
                # Fallback (in case <|assistant|> is not present in the output)
                response = re.sub(r"\s*<\|end\|>\s*$", "", full_text)
                response = re.sub(r"<\|.*?\|>", "", response)
                response = re.sub(r"(?s)\s*<\|[^>]*$", "", response).strip()

            logger.info(f"[GENERATE] Response length: {len(response)} chars")
            logger.info(f"[GENERATE] Preview: {response[:100]}...")

            # --------------------------------------------------------------
            # 5. Cleanup
            # --------------------------------------------------------------
            del inputs, outputs
            torch.cuda.empty_cache()
            gc.collect()

            log_system_status("[GENERATE-COMPLETE]")
            logger.info("[GENERATE] GENERATION SUCCESSFUL")
            logger.info(f"{'=' * 80}")
            return response

        except Exception as e:
            logger.error("[GENERATE] ERROR DURING GENERATION")
            logger.error(f"{type(e).__name__}: {e}")
            logger.error(traceback.format_exc())
            return (
                "I apologize, but I encountered an error while generating a response. "
                "Please try again."
            )


# ----------------------------------------------------------------------
# Test entry point
# ----------------------------------------------------------------------
if __name__ == "__main__":
    logger.info("Running debug test...")
    model = LifeCoachModel()
    model.load_tokenizer()
    model.load_model(fine_tuned=True)
    test_resp = model.generate_response("Hello, how are you?", max_new_tokens=50)
    logger.info(f"Test response: {test_resp}")