| """ |
| PlotWeaver Audiobook Generator |
| English β Hausa Translation + TTS with Timestamps + Emotions |
| |
| Optimized for fast startup on HuggingFace Spaces. |
| """ |
|
|
| import gradio as gr |
| import torch |
| import numpy as np |
| import tempfile |
| import re |
| from pathlib import Path |
| from datetime import timedelta |
| from typing import List, Tuple, Dict |
|
|
| |
| import fitz |
| from docx import Document |
|
|
| import scipy.io.wavfile as wavfile |
| from scipy import signal |
|
|
| |
| |
| |
# Model identifiers and audio/text processing constants.
NLLB_MODEL = "facebook/nllb-200-distilled-600M"  # English->Hausa translation model
TTS_MODEL = "facebook/mms-tts-hau"  # Hausa text-to-speech model
SRC_LANG = "eng_Latn"  # NLLB source language code
TGT_LANG = "hau_Latn"  # NLLB target language code
SAMPLE_RATE = 16000  # Hz; MMS-TTS output sample rate
MAX_CHUNK_LENGTH = 200  # max characters fed to TTS per chunk


# Per-emotion prosody multipliers applied to synthesized audio.
# pitch/speed shrink or stretch the waveform; energy scales amplitude.
# NOTE(review): the "emoji" values look mojibake-garbled (bad encoding
# round-trip); they are runtime strings, kept byte-identical here.
EMOTION_SETTINGS = {
    "joy": {"pitch": 1.15, "speed": 1.10, "energy": 1.2, "emoji": "π"},
    "sadness": {"pitch": 0.90, "speed": 0.85, "energy": 0.8, "emoji": "π’"},
    "anger": {"pitch": 1.10, "speed": 1.15, "energy": 1.4, "emoji": "π "},
    "fear": {"pitch": 1.20, "speed": 1.20, "energy": 1.1, "emoji": "π¨"},
    "surprise": {"pitch": 1.25, "speed": 1.05, "energy": 1.3, "emoji": "π²"},
    "neutral": {"pitch": 1.00, "speed": 1.00, "energy": 1.0, "emoji": "π"},
}


# Keyword lists used by detect_emotion(); substring matching, so stems like
# "frustrat" also catch "frustrated"/"frustrating".
EMOTION_KEYWORDS = {
    "joy": ["happy", "joy", "excited", "wonderful", "great", "love", "beautiful", "amazing", "fantastic", "delighted", "pleased", "glad", "cheerful", "celebrate", "laugh", "smile"],
    "sadness": ["sad", "sorry", "unfortunately", "loss", "grief", "tears", "cry", "mourn", "depressed", "heartbroken", "tragic", "miserable", "lonely", "pain", "suffer"],
    "anger": ["angry", "furious", "outraged", "hate", "frustrat", "annoyed", "mad", "rage", "hostile", "bitter", "resent", "irritat", "violent", "fight", "attack"],
    "fear": ["afraid", "fear", "scared", "terrified", "worried", "anxious", "panic", "horror", "dread", "nervous", "frighten", "danger", "threat", "alarm"],
    "surprise": ["surprised", "amazed", "astonished", "shocked", "unexpected", "wow", "incredible", "unbelievable", "sudden", "remarkable", "stunning"],
}


# Lazy-loaded model cache: "nllb" / "tts" -> (model, tokenizer).
_models = {}
|
|
| |
| |
| |
def extract_text_from_pdf(file_path: str) -> str:
    """Return the text of every page in a PDF, joined by newlines and stripped."""
    doc = fitz.open(file_path)
    pages = [page.get_text() for page in doc]
    doc.close()
    return "\n".join(pages).strip()
|
|
def extract_text_from_docx(file_path: str) -> str:
    """Extract text from a DOCX file, trying several strategies in order.

    Strategies: raw XML from the DOCX zip container, python-docx paragraphs,
    then PyMuPDF. Raises ValueError when every strategy fails.
    """
    import zipfile
    import xml.etree.ElementTree as ET

    # Strategy 1: read word/document.xml straight from the zip container
    # and collect the text of every <w:t> run.
    try:
        with zipfile.ZipFile(file_path, 'r') as archive:
            if 'word/document.xml' in archive.namelist():
                root = ET.fromstring(archive.read('word/document.xml'))
                runs = [
                    node.text
                    for node in root.iter()
                    if (node.tag.endswith('}t') or node.tag == 't') and node.text
                ]
                joined = ''.join(runs)
                if joined.strip():
                    return joined
    except Exception as e:
        print(f"XML extraction failed: {e}")

    # Strategy 2: python-docx paragraph text.
    try:
        doc = Document(file_path)
        joined = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if joined.strip():
            return joined
    except Exception as e:
        print(f"python-docx failed: {e}")

    # Strategy 3: let PyMuPDF attempt to open the document.
    try:
        doc = fitz.open(file_path)
        pages = [page.get_text() for page in doc]
        doc.close()
        joined = "\n".join(pages)
        if joined.strip():
            return joined.strip()
    except Exception as e:
        print(f"PyMuPDF failed: {e}")

    raise ValueError("Could not extract text from this DOCX file. Please convert to PDF or TXT.")
|
|
def extract_text_from_doc(file_path: str) -> str:
    """Extract text from legacy .doc files.

    Tries PyMuPDF first; falls back to scraping printable-ASCII runs out of
    the OLE 'WordDocument' stream via olefile. Raises ValueError when both
    approaches fail.
    """
    # Attempt 1: PyMuPDF can open some legacy .doc files directly.
    try:
        doc = fitz.open(file_path)
        pages = [page.get_text() for page in doc]
        doc.close()
        combined = "\n".join(pages)
        if combined.strip():
            return combined.strip()
    except Exception as e:
        print(f"PyMuPDF .doc failed: {e}")

    # Attempt 2: crude binary scrape of the OLE container.
    try:
        import olefile
        ole = olefile.OleFileIO(file_path)

        if ole.exists('WordDocument'):
            data = ole.openstream('WordDocument').read()

            # Collect maximal runs of printable ASCII bytes (32..126).
            runs = []
            buffer = []
            for value in data:
                if 32 <= value < 127:
                    buffer.append(chr(value))
                elif buffer:
                    runs.append(''.join(buffer))
                    buffer = []
            if buffer:
                runs.append(''.join(buffer))

            # Runs of <= 3 chars are almost certainly binary noise.
            scraped = ' '.join(r for r in runs if len(r) > 3)
            ole.close()

            if scraped.strip():
                return scraped.strip()
    except ImportError:
        print("olefile not installed")
    except Exception as e:
        print(f"olefile failed: {e}")

    raise ValueError("Cannot read this .doc file. Please convert to .docx, .pdf, or .txt format.\n\nTip: Open in Microsoft Word or LibreOffice and 'Save As' a different format.")
|
|
def extract_text(file_path: str) -> str:
    """Route an uploaded file to the matching extractor based on extension."""
    ext = Path(file_path).suffix.lower()

    if ext == ".txt":
        # Plain text: read directly, dropping undecodable bytes.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    if ext == ".pdf":
        return extract_text_from_pdf(file_path)
    if ext == ".docx":
        return extract_text_from_docx(file_path)
    if ext == ".doc":
        return extract_text_from_doc(file_path)

    raise ValueError(f"Unsupported format: {ext}. Please use PDF, DOCX, DOC, or TXT.")
|
|
| |
| |
| |
def get_translation_model():
    """Lazy-load and cache the NLLB-200 translation model and tokenizer.

    Returns:
        ``(model, tokenizer)`` tuple, cached in the module-level ``_models``
        dict so the heavy download/initialization happens only once.
    """
    if "nllb" not in _models:
        from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

        print("π₯ Loading NLLB-200...")
        tokenizer = AutoTokenizer.from_pretrained(NLLB_MODEL, src_lang=SRC_LANG)
        # Fix: only use fp16 on GPU. Half precision on CPU is very slow and
        # several CPU kernels do not support it; fall back to fp32 there.
        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model = AutoModelForSeq2SeqLM.from_pretrained(NLLB_MODEL, torch_dtype=dtype)

        if torch.cuda.is_available():
            model = model.cuda()
        model.eval()

        _models["nllb"] = (model, tokenizer)
        # Fix: this literal was split across two lines by a bad encoding
        # round-trip (syntax error); restored to a single-line message.
        print("✅ NLLB-200 loaded")

    return _models["nllb"]
|
|
def get_tts_model():
    """Lazy-load and cache the MMS-TTS Hausa model and tokenizer.

    Returns:
        ``(model, tokenizer)`` tuple, cached in the module-level ``_models``
        dict so initialization happens only once.
    """
    if "tts" not in _models:
        from transformers import VitsModel, AutoTokenizer

        print("π₯ Loading MMS-TTS Hausa...")
        model = VitsModel.from_pretrained(TTS_MODEL)
        tokenizer = AutoTokenizer.from_pretrained(TTS_MODEL)

        if torch.cuda.is_available():
            model = model.cuda()
        model.eval()

        _models["tts"] = (model, tokenizer)
        # Fix: this literal was split across two lines by a bad encoding
        # round-trip (syntax error); restored to a single-line message.
        print("✅ MMS-TTS loaded")

    return _models["tts"]
|
|
| |
| |
| |
def detect_emotion(text: str) -> str:
    """Pick the dominant emotion of an English sentence by keyword counting.

    Substring keyword hits are tallied per emotion, with a few punctuation
    and casing heuristics layered on top. Returns "neutral" when nothing
    scores.
    """
    lowered = text.lower()

    # One point per matching keyword, per emotion.
    scores = {
        emotion: sum(1 for kw in keywords if kw in lowered)
        for emotion, keywords in EMOTION_KEYWORDS.items()
    }

    # Punctuation / casing cues.
    if text.count('!') >= 2:
        scores["joy"] += 1
        scores["surprise"] += 1
    if text.count('?') >= 2:
        scores["surprise"] += 1
    if text.isupper() and len(text) > 10:
        scores["anger"] += 1

    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "neutral"
|
|
| |
| |
| |
def apply_emotion_to_audio(audio: np.ndarray, emotion: str, sample_rate: int = SAMPLE_RATE) -> np.ndarray:
    """Shape a waveform's prosody for the given emotion.

    Pitch and speed are approximated by resampling to a shorter/longer
    signal (played back at a fixed rate this shifts pitch and tempo
    together); energy scales amplitude, with a 0.95 peak ceiling to avoid
    clipping. Neutral audio is returned untouched.
    """
    settings = EMOTION_SETTINGS.get(emotion, EMOTION_SETTINGS["neutral"])

    if emotion == "neutral":
        return audio

    # Apply the pitch multiplier, then the speed multiplier, each as a
    # length-changing resample.
    for factor in (settings["pitch"], settings["speed"]):
        if factor != 1.0:
            audio = signal.resample(audio, int(len(audio) / factor))

    # Loudness scaling.
    audio = audio * settings["energy"]

    # Safety limiter: keep the peak at or below 0.95 full scale.
    peak = np.max(np.abs(audio))
    if peak > 0.95:
        audio = audio * (0.95 / peak)

    return audio
|
|
def add_pause(duration_ms: int = 300) -> np.ndarray:
    """Return a silent waveform of ``duration_ms`` milliseconds.

    Fix: the silence is float32 to match the MMS-TTS waveform dtype, so
    concatenating pauses no longer upcasts the whole audiobook to float64
    (doubling its memory footprint).
    """
    num_samples = int(SAMPLE_RATE * duration_ms / 1000)
    return np.zeros(num_samples, dtype=np.float32)
|
|
| |
| |
| |
def translate_text(text: str) -> str:
    """Translate English text to Hausa with NLLB-200, sentence by sentence."""
    model, tokenizer = get_translation_model()
    use_cuda = torch.cuda.is_available()

    # Split on sentence-final punctuation so each generate() call stays short.
    pieces = [s for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    hausa = []

    bos_id = tokenizer.convert_tokens_to_ids(TGT_LANG)

    with torch.no_grad():
        for piece in pieces:
            batch = tokenizer(piece, return_tensors="pt", truncation=True, max_length=256)
            if use_cuda:
                batch = {k: v.cuda() for k, v in batch.items()}

            generated = model.generate(
                **batch,
                forced_bos_token_id=bos_id,
                max_length=256,
                num_beams=4,
            )
            hausa.append(tokenizer.decode(generated[0], skip_special_tokens=True))

    return " ".join(hausa)
|
|
| |
| |
| |
def split_text(text: str, max_len: "int | None" = None) -> List[str]:
    """Split text into TTS-friendly chunks of roughly ``max_len`` characters.

    Sentences are grouped greedily; a single sentence longer than
    ``max_len`` still becomes its own (oversized) chunk rather than being
    cut mid-sentence.

    Fixes vs. the original:
      * empty/whitespace-only input now yields ``[]`` instead of ``[""]``;
      * ``max_len`` defaults late-bound to ``MAX_CHUNK_LENGTH`` so changing
        the module constant at runtime takes effect.

    Args:
        text: source text to split.
        max_len: target chunk size in characters; defaults to
            ``MAX_CHUNK_LENGTH``.

    Returns:
        List of non-empty, stripped chunk strings.
    """
    if max_len is None:
        max_len = MAX_CHUNK_LENGTH

    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks, current = [], ""

    for s in sentences:
        if len(current) + len(s) <= max_len:
            current += s + " "
        else:
            if current.strip():
                chunks.append(current.strip())
            current = s + " "
    if current.strip():
        chunks.append(current.strip())

    return chunks
|
|
def generate_audio(text: str) -> Tuple[np.ndarray, List[dict]]:
    """Synthesize Hausa speech for ``text`` and return (waveform, timestamps).

    Each TTS chunk contributes a dict with formatted start/end times and the
    chunk text, so a transcript can be aligned with the audio. Falls back to
    one second of silence when nothing could be synthesized.
    """
    model, tokenizer = get_tts_model()
    use_cuda = torch.cuda.is_available()

    segments = []
    stamps = []
    elapsed = 0.0

    with torch.no_grad():
        for piece in split_text(text):
            if not piece.strip():
                continue

            batch = tokenizer(piece, return_tensors="pt")
            if use_cuda:
                batch = {k: v.cuda() for k, v in batch.items()}

            waveform = model(**batch).waveform.squeeze().cpu().numpy()
            segments.append(waveform)

            seconds = len(waveform) / SAMPLE_RATE
            stamps.append({
                "start": format_time(elapsed),
                "end": format_time(elapsed + seconds),
                "text": piece
            })
            elapsed += seconds

    if not segments:
        return np.zeros(SAMPLE_RATE), stamps
    return np.concatenate(segments), stamps
|
|
def format_time(seconds: float) -> str:
    """Render a non-negative duration in seconds as ``HH:MM:SS.mmm``."""
    whole = int(seconds)
    ms = int((seconds - whole) * 1000)
    h = whole // 3600
    m = (whole % 3600) // 60
    s = whole % 60
    return f"{h:02d}:{m:02d}:{s:02d}.{ms:03d}"
|
|
| |
| |
| |
MAX_CHARS = 10000  # cap on processed input size so the hosted demo stays responsive
|
|
def process_document(file, enable_emotions=True, progress=gr.Progress()):
    """Full pipeline: document -> translation -> emotional TTS -> audiobook.

    Args:
        file: Gradio file wrapper; ``file.name`` is the uploaded file path.
        enable_emotions: when True, per-sentence emotions are detected and
            applied to the synthesized audio.
        progress: Gradio progress reporter (default instance is the Gradio
            idiom for enabling the progress bar).

    Returns:
        Tuple of (audio_path, transcript_markdown, timestamps_text,
        status_message); ``audio_path`` is None on failure.
    """
    if file is None:
        return None, "", "", "β οΈ Please upload a document"

    try:
        # --- 1. Extract raw text -------------------------------------------
        progress(0.05, desc="π Extracting text...")
        full_text = extract_text(file.name)

        if not full_text or not full_text.strip():
            return None, "", "", "β οΈ No text found in document"

        # Truncate very long documents so the demo stays responsive.
        original_length = len(full_text)
        if original_length > MAX_CHARS:
            text = full_text[:MAX_CHARS]
            truncated_msg = f"\n\nβ οΈ Text truncated from {original_length:,} to {MAX_CHARS:,} characters for demo."
        else:
            text = full_text
            truncated_msg = ""

        # --- 2. Sentence segmentation + emotion detection ------------------
        sentences = re.split(r'(?<=[.!?])\s+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        total_sentences = len(sentences)

        progress(0.08, desc="π Analyzing emotions...")
        sentence_emotions = []
        for sentence in sentences:
            emotion = detect_emotion(sentence) if enable_emotions else "neutral"
            sentence_emotions.append(emotion)

        # Tally per-emotion counts for the summary line shown to the user.
        emotion_counts = {}
        for e in sentence_emotions:
            emotion_counts[e] = emotion_counts.get(e, 0) + 1

        # --- 3. Translate sentence-by-sentence (English -> Hausa) ----------
        progress(0.1, desc=f"π Translating {total_sentences} sentences...")
        translated_sentences = []

        model, tokenizer = get_translation_model()
        device = "cuda" if torch.cuda.is_available() else "cpu"
        tgt_lang_id = tokenizer.convert_tokens_to_ids(TGT_LANG)

        with torch.no_grad():
            for i, sentence in enumerate(sentences):
                if not sentence.strip():
                    continue

                # Translation spans the 10%-45% range of the progress bar.
                prog = 0.1 + (0.35 * (i / total_sentences))
                emotion_emoji = EMOTION_SETTINGS[sentence_emotions[i]]["emoji"]
                progress(prog, desc=f"π Translating {i+1}/{total_sentences} {emotion_emoji}")

                inputs = tokenizer(sentence, return_tensors="pt", truncation=True, max_length=256)
                if device == "cuda":
                    inputs = {k: v.cuda() for k, v in inputs.items()}

                outputs = model.generate(
                    **inputs,
                    forced_bos_token_id=tgt_lang_id,
                    max_length=256,
                    num_beams=4,
                )

                translated_sentences.append(tokenizer.decode(outputs[0], skip_special_tokens=True))

        translated = " ".join(translated_sentences)

        # --- 4. Synthesize expressive audio chunk-by-chunk -----------------
        progress(0.45, desc="ποΈ Generating expressive audio...")

        tts_model, tts_tokenizer = get_tts_model()
        audio_segments = []
        timestamps = []
        current_time = 0.0

        hausa_chunks = split_text(translated)
        total_chunks = len(hausa_chunks)

        # Map sentence-level emotions onto TTS chunks. Heuristic: each
        # translated sentence is assumed to span roughly
        # len // MAX_CHUNK_LENGTH + 1 chunks; any leftover chunks default
        # to neutral.
        chunk_emotions = []
        chunk_idx = 0
        for i, emotion in enumerate(sentence_emotions):
            if i < len(sentences):
                sentence_len = len(translated_sentences[i]) if i < len(translated_sentences) else 100
                chunks_per_sentence = max(1, sentence_len // MAX_CHUNK_LENGTH + 1)
                for _ in range(chunks_per_sentence):
                    if chunk_idx < total_chunks:
                        chunk_emotions.append(emotion)
                        chunk_idx += 1

        while len(chunk_emotions) < total_chunks:
            chunk_emotions.append("neutral")

        with torch.no_grad():
            for i, chunk in enumerate(hausa_chunks):
                if not chunk.strip():
                    continue

                emotion = chunk_emotions[i] if i < len(chunk_emotions) else "neutral"
                emotion_emoji = EMOTION_SETTINGS[emotion]["emoji"]

                # TTS spans the 45%-90% range of the progress bar.
                prog = 0.45 + (0.45 * (i / total_chunks))
                progress(prog, desc=f"ποΈ Generating audio {i+1}/{total_chunks} {emotion_emoji}")

                inputs = tts_tokenizer(chunk, return_tensors="pt")
                if device == "cuda":
                    inputs = {k: v.cuda() for k, v in inputs.items()}

                audio = tts_model(**inputs).waveform.squeeze().cpu().numpy()

                if enable_emotions and emotion != "neutral":
                    audio = apply_emotion_to_audio(audio, emotion)

                audio_segments.append(audio)

                # Short breathing pause between chunks (200 ms).
                audio_segments.append(add_pause(200))

                duration = len(audio) / SAMPLE_RATE
                timestamps.append({
                    "start": format_time(current_time),
                    "end": format_time(current_time + duration),
                    "text": chunk,
                    "emotion": emotion,
                    "emoji": emotion_emoji
                })
                current_time += duration + 0.2  # account for the 200 ms pause

        if not audio_segments:
            return None, "", "", "β No audio generated"

        full_audio = np.concatenate(audio_segments)

        # Peak-normalize to 90% full scale to avoid clipping.
        max_val = np.max(np.abs(full_audio))
        if max_val > 0:
            full_audio = full_audio * (0.9 / max_val)

        # --- 5. Write WAV + build transcript and timestamp outputs ---------
        progress(0.95, desc="πΎ Saving audiobook...")
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wavfile.write(f.name, SAMPLE_RATE, (full_audio * 32767).astype(np.int16))
            audio_path = f.name

        timestamps_text = "\n".join([
            f"[{t['start']} β {t['end']}] {t['emoji']} [{t['emotion'].upper()}] {t['text']}"
            for t in timestamps
        ])

        audio_duration = len(full_audio) / SAMPLE_RATE
        duration_str = f"{int(audio_duration // 60)}:{int(audio_duration % 60):02d}"

        emotion_summary = " | ".join([
            f"{EMOTION_SETTINGS[e]['emoji']} {e}: {c}"
            for e, c in sorted(emotion_counts.items(), key=lambda x: -x[1])
        ])

        transcript = f"""## Original (English)
{text[:1000]}{'...' if len(text) > 1000 else ''}{truncated_msg}

## Translation (Hausa)
{translated}

---
π **Stats**: {len(text):,} chars β {len(translated):,} chars | π΅ Duration: {duration_str}

π **Emotions detected**: {emotion_summary}
"""

        # Fix: the two literals below were split mid-string by a bad encoding
        # round-trip (syntax errors); restored as single-line "✅ ..." messages.
        progress(1.0, desc="✅ Done!")
        return audio_path, transcript, timestamps_text, f"✅ Audiobook generated! Duration: {duration_str} | π Emotions: {len([e for e in sentence_emotions if e != 'neutral'])} expressive segments"

    except Exception as e:
        import traceback
        traceback.print_exc()
        return None, "", "", f"β Error: {str(e)}"
|
|
| |
| |
| |
# ---------------------------------------------------------------------------
# Gradio UI (built at import time; HuggingFace Spaces imports this module).
# NOTE(review): many label/HTML strings below contain mojibake characters
# (e.g. "π§", "β±οΈ") from a broken encoding round-trip; they are runtime
# strings and are left byte-identical here.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="PlotWeaver Audiobook",
    theme=gr.themes.Soft(primary_hue="orange"),
) as demo:

    # Header banner.
    gr.HTML("""
<div style="text-align: center; margin-bottom: 1rem;">
<h1>π§ PlotWeaver Audiobook Generator</h1>
<p><strong>English β Hausa</strong> | Powered by NLLB-200 + MMS-TTS</p>
<p style="color: #666;">β¨ Now with Emotional Expression!</p>
</div>
""")

    with gr.Row():
        # Left column: upload control, options, and static help text.
        with gr.Column(scale=1):
            file_input = gr.File(
                label="π Upload Document",
                file_types=[".pdf", ".docx", ".doc", ".txt"],
                type="filepath"
            )

            emotion_toggle = gr.Checkbox(
                label="π Enable Emotional Expression",
                value=True,
                info="Adds emotion to voice based on text sentiment"
            )

            btn = gr.Button("π Generate Audiobook", variant="primary", size="lg")
            status = gr.Textbox(label="Status", interactive=False)

            gr.Markdown("""
### How it works
1. Upload English document (PDF, DOCX, DOC, TXT)
2. AI **detects emotions** in text
3. Translates to Hausa with NLLB-200
4. TTS generates **expressive audio**
5. Download audiobook with timestamps

---
### π Emotions Detected
- π **Joy** - Higher pitch, faster pace
- π’ **Sadness** - Lower pitch, slower pace
- π **Anger** - Intense, louder
- π¨ **Fear** - Faster, higher pitch
- π² **Surprise** - Excited tone
- π **Neutral** - Normal speech

---
β±οΈ **Processing**: ~1-2 min per page
""")

        # Right column: audio player plus tabbed transcript / timestamps.
        with gr.Column(scale=2):
            audio_out = gr.Audio(label="π§ Hausa Audiobook (with Emotions)")
            with gr.Tabs():
                with gr.Tab("π Transcript"):
                    transcript = gr.Markdown()
                with gr.Tab("β±οΈ Timestamps + Emotions"):
                    timestamps = gr.Textbox(lines=12, interactive=False)

    # Footer banner.
    gr.HTML("""<div style="text-align: center; padding: 1rem; background: #f8f9fa; border-radius: 8px; margin-top: 1rem;">
<strong>PlotWeaver</strong> - AI for African Languages | π Expressive Audiobooks
</div>""")

    # Wire the button: (file, toggle) -> process_document ->
    # (audio, transcript, timestamps, status).
    btn.click(
        process_document,
        [file_input, emotion_toggle],
        [audio_out, transcript, timestamps, status]
    )
|
|
| |
| |
| |
# Launch the Gradio server when run as a script (HF Spaces also calls this).
if __name__ == "__main__":
    demo.launch()