| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import annotations |
| | import argparse |
| | import csv |
| | import logging |
| | import sys |
| | import uuid |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Any, Dict, Iterable, List, Optional, Tuple |
| |
|
| | |
# Logging is configured once at import time: INFO level, with timestamp and
# severity on every record. `log` is the named logger used throughout this file.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
log = logging.getLogger("identity_encoding")
| |
|
| | |
| | |
| | try: |
| | import chromadb |
| | from chromadb.config import Settings |
| | except Exception as e: |
| | chromadb = None |
| | log.error("No se pudo importar chromadb: %s", e) |
| |
|
| | from libs.vision_tools_salamandra import FaceAnalyzer |
| | from collections import Counter |
| |
|
| | |
| | from libs.audio_tools_ana_2 import VoiceEmbedder |
| | from libs.vision_tools_salamandra import FaceOfImageEmbedding |
| |
|
| | |
| | try: |
| | import numpy as np |
| | except Exception: |
| | np = None |
| |
|
| | |
# File suffixes (lower-case, leading dot included) treated as images and as
# audio clips when scanning the identity folders.
IMG_EXT = {
    ".bmp",
    ".jpeg",
    ".jpg",
    ".png",
    ".webp",
}
AUD_EXT = {
    ".flac",
    ".m4a",
    ".mp3",
    ".ogg",
    ".wav",
}
| |
|
| |
|
def list_files(root: Path, exts: Iterable[str]) -> List[Path]:
    """Recursively collect the regular files under *root* with an allowed suffix.

    Args:
        root: Directory to walk; a ``str`` is accepted and converted.
        exts: Allowed suffixes including the leading dot (e.g. ``".jpg"``).
            Matching is case-insensitive. Any iterable works, including a
            one-shot generator; a single string is treated as one suffix.

    Returns:
        The matching paths in sorted order, or an empty list when *root*
        does not exist.
    """
    root = Path(root)
    if not root.exists():
        return []
    if isinstance(exts, str):
        exts = [exts]
    # Materialise once: a generator would otherwise be exhausted by the first
    # membership test, and a set gives constant-time lookups.
    allowed = {ext.lower() for ext in exts}
    # is_file() keeps out directories that merely look like media ("frames.png/").
    # Sorting makes the result independent of filesystem enumeration order.
    return sorted(
        p for p in root.rglob('*')
        if p.is_file() and p.suffix.lower() in allowed
    )
| |
|
| |
|
def ensure_chroma(db_dir: Path):
    """Return a ChromaDB client that persists its data under *db_dir*.

    Args:
        db_dir: Persistence directory; created (with parents) if missing.
            A ``str`` is accepted and converted.

    Returns:
        A chromadb client object.

    Raises:
        RuntimeError: If the ``chromadb`` package could not be imported.
    """
    if chromadb is None:
        raise RuntimeError("chromadb no instalado. pip install chromadb")
    db_dir = Path(db_dir)
    db_dir.mkdir(parents=True, exist_ok=True)

    # chromadb >= 0.4 exposes PersistentClient as the supported way to get
    # on-disk storage; prefer it whenever the installed version provides it.
    if hasattr(chromadb, "PersistentClient"):
        return chromadb.PersistentClient(path=str(db_dir))

    # Older releases: fall back to the original Settings-based configuration.
    return chromadb.Client(Settings(
        chroma_db_impl="duckdb+parquet",
        persist_directory=str(db_dir),
    ))
| |
|
| | |
def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces",
                      deepface_model: str = 'Facenet512', drop: bool = True) -> int:
    """Embed every face image under *faces_dir* and store the vectors in ChromaDB.

    Each sub-directory of *faces_dir* is one identity; its name is stored as
    the ``identity`` metadata of every embedding produced from its images,
    together with the image ``path``.

    Args:
        faces_dir: Root directory with one sub-directory per identity.
        client: ChromaDB client providing ``list_collections``,
            ``delete_collection`` and ``get_or_create_collection``.
        collection_name: Name of the target collection.
        deepface_model: Model name forwarded to ``FaceOfImageEmbedding``.
        drop: When true, an existing collection of that name is deleted first.

    Returns:
        Number of embeddings added to the collection.
    """
    # Depending on the chromadb release, list_collections() yields Collection
    # objects or plain name strings; getattr() copes with both.
    existing = {getattr(c, "name", c) for c in client.list_collections()}
    if drop and collection_name in existing:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    be = FaceOfImageEmbedding(deepface_model=deepface_model)
    count = 0
    registered_identities = set()

    faces_dir = Path(faces_dir)
    # is_dir() (rather than exists()) also protects against a file being passed.
    ident_dirs = sorted(faces_dir.iterdir()) if faces_dir.is_dir() else []

    for ident_dir in ident_dirs:
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        for img_path in list_files(ident_dir, IMG_EXT):
            embeddings = be.encode_image(img_path)
            # len() instead of truthiness so an empty list and an empty array
            # are both treated as "no face" instead of crashing on [0] below.
            if embeddings is None or len(embeddings) == 0:
                log.warning("No face embedding in %s", img_path)
                continue

            # The encoder may hand back one flat vector or one vector per face.
            # A first element that is itself a sequence/array means "several".
            first = embeddings[0]
            nested = isinstance(first, (list, tuple)) or getattr(first, "ndim", 0) > 0
            for e in (embeddings if nested else [embeddings]):
                # Store plain Python lists regardless of the container type received.
                vector = e.tolist() if hasattr(e, "tolist") else list(e)
                uid = str(uuid.uuid4())
                col.add(ids=[uid], embeddings=[vector],
                        metadatas=[{"identity": ident, "path": str(img_path)}])
                count += 1
                registered_identities.add(ident)

    print("Ha acabado de crear la base de datos.")
    print(f"Total de embeddings guardados: {count}")
    print("Identidades registradas:")
    for name in sorted(registered_identities):
        print(f" - {name}")

    log.info("index_faces => %d embeddings", count)
    return count
| |
|
| | |
| |
|
def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int:
    """Summarise per-identity face attributes into a CSV file.

    Each sub-directory of *faces_dir* is treated as one identity. Every image
    in it is passed to ``FaceAnalyzer.analyze_image``; the most frequent
    ``gender`` and ``age`` values across its images become that identity's
    row. If ``<faces_dir>/../context/<identity>.txt`` exists, its stripped
    text is stored in the ``identity_context`` column.

    Args:
        faces_dir: Root directory with one sub-directory per identity.
        out_csv: Destination CSV path; parent directories are created.

    Returns:
        Number of identity rows written, or 0 (nothing is written) when
        *faces_dir* is not an existing directory.
    """
    faces_dir = Path(faces_dir)
    out_csv = Path(out_csv)
    if not faces_dir.is_dir():
        log.error("El directorio de caras no existe: %s", faces_dir)
        return 0

    # Deferred until the input is known to be usable, so a wrong path is
    # reported before the analyzer is imported and constructed.
    from libs.vision_tools_salamandra import FaceAnalyzer
    analyzer = FaceAnalyzer()

    # Fixed schema: the header is identical whether or not any identity was found.
    fieldnames = ["identity", "samples", "gender", "age_bucket", "identity_context"]

    def most_common(values, default="unknown"):
        return Counter(values).most_common(1)[0][0] if values else default

    rows: List[Dict[str, Any]] = []

    for ident_dir in sorted(faces_dir.iterdir()):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        attrs: List[Dict[str, Any]] = []

        log.info("Procesando identidad: %s", ident)

        for img_path in sorted(list_files(ident_dir, IMG_EXT)):
            # Best effort: one unreadable image must not abort the whole identity.
            try:
                data = analyzer.analyze_image(str(img_path))
                if data:
                    attrs.append(data)
            except Exception as e:
                log.warning("Error procesando imagen %s: %s", img_path, e)

        genders = [a.get("gender", "unknown") for a in attrs]
        ages = [a.get("age", "unknown") for a in attrs]

        # Optional free-text context stored next to the faces directory.
        context_txt = faces_dir.parent / "context" / f"{ident}.txt"
        identity_context = (
            context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else ""
        )

        rows.append({
            "identity": ident,
            "samples": len(attrs),
            "gender": most_common(genders),
            "age_bucket": most_common(ages),
            "identity_context": identity_context,
        })

        log.info("Procesados %d atributos para %s", len(attrs), ident)

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    log.info("CSV generado correctamente: %s", out_csv)
    return len(rows)
| |
|
| | |
| | from pydub import AudioSegment |
| |
|
def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int:
    """Embed every audio clip under *voices_dir* and store the vectors in ChromaDB.

    Each sub-directory of *voices_dir* is one identity. A clip the embedder
    cannot read is re-encoded to WAV with pydub and tried once more; if that
    also fails the clip is skipped. The metadata ``path`` always refers to the
    original clip, never to the re-encoded copy.

    Args:
        voices_dir: Root directory with one sub-directory per identity.
        client: ChromaDB client providing ``list_collections``,
            ``delete_collection`` and ``get_or_create_collection``.
        collection_name: Name of the target collection.
        drop: When true, an existing collection of that name is deleted first.

    Returns:
        Number of embeddings added to the collection.
    """
    import tempfile  # local import: only the re-encoding fallback needs it

    # Depending on the chromadb release, list_collections() yields Collection
    # objects or plain name strings; getattr() copes with both.
    existing = {getattr(c, "name", c) for c in client.list_collections()}
    if drop and collection_name in existing:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    ve = VoiceEmbedder()
    count = 0

    voices_dir = Path(voices_dir)
    # is_dir() (rather than exists()) also protects against a file being passed.
    ident_dirs = sorted(voices_dir.iterdir()) if voices_dir.is_dir() else []

    for ident_dir in ident_dirs:
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        for wav_path in list_files(ident_dir, AUD_EXT):
            try:
                emb = ve.embed(wav_path)
            except Exception as e:
                log.warning("Error leyendo audio %s: %s. Intentando reconvertir...", wav_path, e)
                try:
                    audio = AudioSegment.from_file(wav_path)
                    # Re-encode into a throw-away directory: writing the copy next
                    # to the source would leave a "*_fixed.wav" in the identity
                    # folder that the next run indexes as an additional sample.
                    with tempfile.TemporaryDirectory() as tmp_dir:
                        fixed_path = Path(tmp_dir) / (wav_path.stem + "_fixed.wav")
                        handle = audio.export(fixed_path, format="wav")
                        # export() hands back the open output file; close it so the
                        # data is flushed before the embedder reads the copy.
                        if hasattr(handle, "close"):
                            handle.close()
                        log.info("Archivo convertido a WAV compatible: %s", fixed_path)
                        emb = ve.embed(fixed_path)
                except Exception as e2:
                    log.error("No se pudo generar embedding tras reconversión para %s: %s", wav_path, e2)
                    continue
            if emb is None:
                log.warning("No voice embedding en %s", wav_path)
                continue
            uid = str(uuid.uuid4())
            col.add(ids=[uid], embeddings=[emb], metadatas=[{"identity": ident, "path": str(wav_path)}])
            count += 1

    log.info("index_voices => %d embeddings", count)
    return count
| |
|
| | |
@dataclass
class VisionClient:
    """Placeholder vision backend: produces canned text instead of calling a model."""

    # Name of the backing provider; describe() does not read it.
    provider: str = "none"

    def describe(self, image_path: str, prompt: str) -> str:
        """Return a placeholder sentence naming the image file, followed by *prompt*."""
        file_name = Path(image_path).name
        return "Automatic description (placeholder) for {}. {}".format(file_name, prompt)
| |
|
| |
|
class TextEmbedder:
    """Text embeddings with Sentence-Transformers if available; fallback to TF-IDF."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the SBERT model *model_name*, or set up a TF-IDF vectorizer if that fails."""
        # Start in the fallback state; upgraded below only if SBERT loads.
        self.kind = "tfidf"
        self.model = None
        self.vectorizer = None
        try:
            from sentence_transformers import SentenceTransformer
            sbert = SentenceTransformer(model_name)
        except Exception:
            from sklearn.feature_extraction.text import TfidfVectorizer
            self.vectorizer = TfidfVectorizer(max_features=768)
        else:
            self.model = sbert
            self.kind = "sbert"

    def fit(self, texts: List[str]):
        """Fit the TF-IDF vectorizer on *texts*; does nothing when no vectorizer is set."""
        if self.vectorizer is None:
            return
        self.vectorizer.fit(texts)

    def encode(self, texts: List[str]) -> List[List[float]]:
        """Return one float vector per text.

        Uses the SBERT model when present, otherwise the TF-IDF vectorizer;
        with neither available, each text maps to a 128-dimensional zero vector.
        """
        if self.model is not None:
            encoded = self.model.encode(texts, convert_to_numpy=True)
            return encoded.astype(float).tolist()

        matrix = None if self.vectorizer is None else self.vectorizer.transform(texts)
        if matrix is None:
            return [[0.0] * 128 for _ in texts]
        return matrix.toarray().astype(float).tolist()
| |
|
| |
|
def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient,
                                 sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]:
    """Describe a sample of images per scenario folder and write the result to CSV.

    Each sub-directory of *scenarios_dir* is one scenario. Up to
    *sample_per_scenario* of its images are passed to ``vision.describe``;
    the non-empty descriptions are joined into the ``descriptions`` column.
    A scenario with no descriptions gets a "no images" placeholder text.

    Args:
        scenarios_dir: Root directory with one sub-directory per scenario.
        out_csv: Destination CSV path; parent directories are created.
        vision: Object exposing ``describe(image_path, prompt) -> str``.
        sample_per_scenario: Maximum number of images described per scenario;
            negative values are treated as 0.

    Returns:
        ``(number_of_rows, rows)`` where each row is a dict with the keys
        ``scenario`` and ``descriptions``.
    """
    scenarios_dir = Path(scenarios_dir)
    out_csv = Path(out_csv)
    # A negative slice bound would drop images from the end instead of limiting.
    limit = max(sample_per_scenario, 0)

    rows: List[Dict[str, Any]] = []
    # is_dir() (rather than exists()) also protects against a file being passed.
    scen_dirs = sorted(scenarios_dir.iterdir()) if scenarios_dir.is_dir() else []
    for scen_dir in scen_dirs:
        if not scen_dir.is_dir():
            continue
        scen = scen_dir.name
        # Sort before sampling so the same images are picked on every run and
        # on every filesystem.
        imgs = sorted(list_files(scen_dir, IMG_EXT))[:limit]
        descs: List[str] = []
        for img in imgs:
            d = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.")
            if d:
                descs.append(d)
        if not descs:
            descs = [f"Scenario {scen} (no images)"]
        rows.append({"scenario": scen, "descriptions": " \n".join(descs)})

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["scenario", "descriptions"])
        w.writeheader()
        w.writerows(rows)
    log.info("scenarios_descriptions => %s", out_csv)
    return len(rows), rows
| |
|
| |
|
def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder,
                          collection_name: str = "index_scenarios", drop: bool = True) -> int:
    """Embed scenario descriptions and store them in a ChromaDB collection.

    Args:
        client: ChromaDB client providing ``list_collections``,
            ``delete_collection`` and ``get_or_create_collection``.
        rows: Dicts with the keys ``scenario`` (used as the record id and
            metadata) and ``descriptions`` (the text that is embedded).
        embedder: Object exposing ``fit(texts)`` and ``encode(texts)``.
        collection_name: Name of the target collection.
        drop: When true, an existing collection of that name is deleted first.

    Returns:
        Number of rows processed (``len(rows)``).
    """
    texts = [r["descriptions"] for r in rows]
    embs: List[List[float]] = []
    # Embed before touching the collection so a failing embedder cannot leave
    # a dropped-but-empty index behind. Skip it entirely for an empty corpus,
    # which an embedder that needs training data cannot fit.
    if texts:
        embedder.fit(texts)
        embs = embedder.encode(texts)

    # Depending on the chromadb release, list_collections() yields Collection
    # objects or plain name strings; getattr() copes with both.
    existing = {getattr(c, "name", c) for c in client.list_collections()}
    if drop and collection_name in existing:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    for r, e in zip(rows, embs):
        col.add(ids=[r["scenario"]], embeddings=[e], metadatas=[{"scenario": r["scenario"]}])
    log.info("index_scenarios => %d descriptions", len(rows))
    return len(rows)
| |
|
| | |
| |
|
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: build the face and voice indices in ChromaDB.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``; ``None``
            (the default) keeps the normal command-line behaviour.
    """
    ap = argparse.ArgumentParser(description="Veureu — Build identity/scenario indices and CSVs")
    ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity')
    ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity')
    # --scenarios_dir and --scenario_samples are accepted, but no step in this
    # function currently consumes them.
    ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images')
    ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory')
    ap.add_argument('--out_dir', default='results', help='Output directory for CSVs')
    ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding')
    ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback')
    ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe')

    args = ap.parse_args(argv)

    faces_dir = Path(args.faces_dir)
    voices_dir = Path(args.voices_dir)
    # Routed through the logger instead of a bare print left over from debugging.
    log.debug("voices_dir=%s", voices_dir)
    # The output directory is created up front even though only the indices
    # are built here.
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    client = ensure_chroma(Path(args.db_dir))

    build_faces_index(faces_dir, client, collection_name="index_faces",
                      deepface_model=args.deepface_model, drop=args.drop_collections)

    build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections)

    log.info("✅ Identity encoding completed.")
| |
|
| |
|
# Script entry point: run the CLI only when executed directly, and only when
# no '--video' argument is present on the command line.
if __name__ == '__main__':
    if '--video' not in sys.argv:
        main()
| |
|