| | |
| | |
| | |
| | |
| | from __future__ import annotations |
| | from typing import Any, Dict, List, Optional |
| | from pathlib import Path |
| | import json |
| | import cv2 |
| | import yaml |
| | import logging |
| |
|
| | from chromadb.config import Settings |
| | import chromadb |
| |
|
| | from audio_tools import process_audio_for_video |
| | from background_descriptor import build_keyframes_and_per_second, describe_keyframes_with_llm |
| | from identity_manager import IdentityManager |
| |
|
| |
|
# Module-level logger for the video pipeline. The `if not log.handlers`
# guard makes this setup idempotent: re-importing the module (or running
# under a framework that already configured this logger) will not stack
# duplicate handlers and double every log line.
log = logging.getLogger("video_processing")
if not log.handlers:
    h = logging.StreamHandler(); h.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
    log.addHandler(h)
    log.setLevel(logging.INFO)
| |
|
| |
|
| | def _ensure_dir(p: Path) -> Path: |
| | p.mkdir(parents=True, exist_ok=True) |
| | return p |
| |
|
| |
|
def _ensure_chroma(db_dir: str | Path):
    """Create the persistence directory if needed and return a Chroma client.

    Fix: the original always built a legacy ``Settings(chroma_db_impl=
    "duckdb+parquet", persist_directory=...)`` client. That configuration was
    removed in chromadb >= 0.4 and raises at runtime. We now prefer the
    modern ``chromadb.PersistentClient`` API and fall back to the legacy
    client only on old releases, so both chromadb generations work.

    Parameters
    ----------
    db_dir: directory where the Chroma database is persisted.
    """
    _ensure_dir(Path(db_dir))
    try:
        # chromadb >= 0.4: dedicated constructor for an on-disk client.
        return chromadb.PersistentClient(path=str(db_dir))
    except AttributeError:
        # chromadb < 0.4: PersistentClient does not exist; use the legacy
        # Settings-based client with the duckdb+parquet on-disk backend.
        return chromadb.Client(Settings(
            persist_directory=str(db_dir),
            chroma_db_impl="duckdb+parquet",
            anonymized_telemetry=False,
        ))
| |
|
| |
|
def load_config(path: str) -> Dict[str, Any]:
    """Read a YAML configuration file into a dict.

    Returns an empty dict when the file does not exist or parses to a
    falsy value (empty file, explicit ``null``), so callers can always
    chain ``cfg.get(...)`` safely.
    """
    cfg_file = Path(path)
    if not cfg_file.exists():
        return {}
    parsed = yaml.safe_load(cfg_file.read_text(encoding="utf-8"))
    return parsed if parsed else {}
| |
|
| |
|
def process_video_pipeline(
    video_path: str,
    *,
    config_path: str = "config_veureu.yaml",
    out_root: str = "results",
    db_dir: str = "chroma_db",
) -> Dict[str, Any]:
    """Run the full analysis pipeline for a single video.

    Stages (in order): probe basic video metadata with OpenCV; optionally
    open Chroma face/voice collections; build keyframes and per-second face
    data; describe keyframes with an LLM; transcribe and segment the audio;
    resolve face/voice identities; write a combined ``*_analysis.json``.

    Parameters
    ----------
    video_path: path to the input video file.
    config_path: YAML config file consumed by ``load_config``.
    out_root: root directory under which a per-video output folder
        (named after the video's stem) is created.
    db_dir: fallback Chroma persistence directory, used only when the
        config does not provide ``database.persist_directory``.

    Returns
    -------
    Dict with ``output_dir``, generated file paths (montage, SRT, analysis
    JSON), and simple processing stats.

    Raises
    ------
    RuntimeError: if OpenCV cannot open the video.
    """
    cfg = load_config(config_path)
    out_dir = _ensure_dir(Path(out_root) / Path(video_path).stem)

    # Probe stream metadata; default to 25 fps when the container reports 0
    # so the duration arithmetic below cannot divide by zero.
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fps = float(cap.get(cv2.CAP_PROP_FPS)) or 25.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0
    duration = (total_frames / fps) if total_frames > 0 else 0.0
    cap.release()

    # Optionally open the Chroma collections used for face/voice
    # identification. Lookups are best-effort: a missing collection leaves
    # the handle as None, which downstream code treats as "disabled".
    face_col = voice_col = None
    if cfg.get("database", {}).get("enabled", True):
        client = _ensure_chroma(cfg.get("database", {}).get("persist_directory", db_dir))
        if cfg.get("database", {}).get("enable_face_recognition", True):
            try:
                face_col = client.get_collection(cfg.get("database", {}).get("face_collection", "index_faces"))
            except Exception:
                face_col = None
        if cfg.get("database", {}).get("enable_voice_recognition", True):
            try:
                voice_col = client.get_collection(cfg.get("database", {}).get("voice_collection", "index_voices"))
            except Exception:
                voice_col = None

    # Keyframe extraction + per-second face sampling (project helper).
    keyframes, per_second, _ = build_keyframes_and_per_second(video_path, out_dir, cfg, face_collection=face_col)

    # Close each keyframe's time range: a keyframe ends where the next one
    # starts; the last keyframe extends to the end of the video.
    for i in range(len(keyframes)):
        if i < len(keyframes) - 1:
            keyframes[i]["end"] = keyframes[i + 1]["start"]
        else:
            keyframes[i]["end"] = round(duration, 2)

    # Collect the distinct face identities seen anywhere in the per-second
    # data so the LLM captioner can reference people by name.
    face_identities = {f.get("identity") for fr in per_second for f in (fr.get("faces") or []) if f.get("identity")}
    keyframes, montage_path = describe_keyframes_with_llm(keyframes, out_dir, face_identities=face_identities, config_path=config_path)

    # Audio pipeline: transcription, segmentation and (optional) speaker
    # identification against the voice collection.
    audio_segments, srt_unmodified_path, full_transcription = process_audio_for_video(video_path=str(video_path), out_dir=out_dir, cfg=cfg, voice_collection=voice_col)

    # Identity resolution: attach face identities to frames/keyframes and
    # voice identities to audio segments.
    im = IdentityManager(face_collection=face_col, voice_collection=voice_col)
    per_second = im.assign_faces_to_frames(per_second)
    keyframes = im.assign_faces_to_frames(keyframes)
    audio_segments = im.assign_voices_to_segments(audio_segments, distance_threshold=cfg.get("voice_processing", {}).get("speaker_identification", {}).get("distance_threshold"))

    # Project per-second face identities onto each keyframe/audio-segment
    # time range under the "persona" key.
    keyframes = im.map_identities_over_ranges(per_second, keyframes, key="faces", out_key="persona")
    audio_segments = im.map_identities_over_ranges(per_second, audio_segments, key="faces", out_key="persona")

    # Flatten keyframes into the serializable per-frame analysis records.
    frames_analysis = [{
        "frame_number": fr.get("id"),
        "start": fr.get("start"),
        "end": fr.get("end"),
        "ocr": fr.get("ocr", ""),
        "persona": fr.get("persona", []),
        "description": fr.get("description", ""),
    } for fr in keyframes]

    # Strip raw voice embeddings from audio segments before serialization —
    # they are large and not JSON-friendly.
    analysis = {
        "frames": frames_analysis,
        "audio_segments": [{k: v for k, v in seg.items() if k != "voice_embedding"} for seg in audio_segments],
        "full_transcription": full_transcription,
    }
    analysis_path = out_dir / f"{Path(video_path).stem}_analysis.json"
    analysis_path.write_text(json.dumps(analysis, indent=2, ensure_ascii=False), encoding="utf-8")

    return {
        "output_dir": str(out_dir),
        "files": {
            "montage_path": montage_path,
            "srt_path": srt_unmodified_path,
            "analysis_path": str(analysis_path),
        },
        "stats": {
            "duration_seconds": duration,
            "total_frames": total_frames,
            "frames_processed": len(keyframes),
            "audio_segments_processed": len(audio_segments),
        },
    }
| |
|