Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torchaudio | |
| from datetime import datetime | |
| from lang_id import identify_languages | |
| from whisper import transcribe | |
# Audio configuration.
SAMPLING_RATE: int = 16000
CHUNK_DURATION: int = 5  # initial chunk length: 5 seconds

# Variables that hold the application's state between handler calls.
data: list = []
current_chunk: list = []
data_df: pd.DataFrame = pd.DataFrame()
def normalize_audio(audio):
    """Scale ``audio`` so that its largest absolute sample becomes 1.

    Args:
        audio: Array-like of audio samples.

    Returns:
        ``audio`` divided by its peak absolute value. Empty input is
        returned as-is, and all-zero (silent) input keeps its zero values
        instead of being turned into NaNs by a 0/0 division.
    """
    audio = np.asarray(audio)
    if audio.size == 0:
        # np.max raises on an empty array; there is nothing to scale anyway.
        return audio

    peak = np.max(np.abs(audio))
    if peak == 0:
        # Dividing by 1 yields the same output dtype as the normal path
        # below while leaving the (all-zero) samples untouched.
        return audio / 1
    return audio / peak
def resample_audio(audio, orig_sr, target_sr=16000):
    """Return ``audio`` converted from ``orig_sr`` to ``target_sr``.

    When both rates already match, the input object is handed back
    untouched. Otherwise the samples are cast to float32 and passed through
    torchaudio's ``Resample`` transform.

    Args:
        audio: NumPy array of samples (only needs to be an array when
            resampling actually happens).
        orig_sr: Sample rate of ``audio``.
        target_sr: Desired sample rate.

    Returns:
        The original ``audio`` when no resampling is required, otherwise a
        float32 NumPy array at ``target_sr``.
    """
    if orig_sr == target_sr:
        return audio

    print(f"Resampling audio from {orig_sr} to {target_sr}")
    # A leading dimension is added for the transform and removed afterwards.
    waveform = torch.from_numpy(audio.astype(np.float32)).unsqueeze(0)
    transform = torchaudio.transforms.Resample(orig_freq=orig_sr, new_freq=target_sr)
    return transform(waveform).squeeze(0).numpy()
def process_chunk(chunk, language_set) -> pd.DataFrame:
    """Run language identification and transcription on one audio chunk.

    Args:
        chunk: Array of audio samples; its length is interpreted at
            SAMPLING_RATE to compute the duration in seconds.
        language_set: Candidate languages forwarded to ``identify_languages``.

    Returns:
        A single-row DataFrame with the chunk length, its dB level, the
        Japanese/English decision (or "Silent" when the level is not above
        50 dB), the three top-scoring languages, the time spent in each
        model call, and the transcription.
    """
    print(f"Processing audio chunk of length {len(chunk)}")

    # Square in float64: integer samples (e.g. int16) would overflow and
    # wrap around when squared in their own dtype, corrupting the RMS.
    samples = np.asarray(chunk, dtype=np.float64)
    rms = np.sqrt(np.mean(samples ** 2))
    db_level = 20 * np.log10(rms + 1e-9)  # small epsilon prevents -inf

    # Volume normalisation (done after the level was measured).
    chunk = normalize_audio(chunk)
    length = len(chunk) / SAMPLING_RATE  # duration of the audio in seconds

    s = datetime.now()
    selected_scores, all_scores = identify_languages(chunk, language_set)
    lang_id_time = (datetime.now() - s).total_seconds()

    # Probabilities for Japanese and English. The language set is
    # user-editable, so a missing entry falls back to 0.0 instead of
    # raising KeyError.
    ja_prob = selected_scores.get('Japanese', 0.0)
    en_prob = selected_scores.get('English', 0.0)
    ja_en = 'ja' if ja_prob > en_prob else 'en'

    # Top 3 languages by score.
    top3 = sorted(all_scores, key=all_scores.get, reverse=True)[:3]
    top3_languages = ", ".join(f"{lang} ({all_scores[lang]:.2f})" for lang in top3)

    # Speech-to-text in the language chosen above.
    s = datetime.now()
    transcription = transcribe(chunk, language=ja_en)
    transcribe_time = (datetime.now() - s).total_seconds()

    return pd.DataFrame({
        "Length (s)": [length],
        "db_level": [db_level],
        "Japanese_English": [f"{ja_en} ({ja_prob:.2f}, {en_prob:.2f})"] if db_level > 50 else ["Silent"],
        "Language": [top3_languages],
        "Lang ID Time": [lang_id_time],
        "Transcribe Time": [transcribe_time],
        "Text": [transcription],
    })
def process_audio_stream(audio, chunk_duration, language_set):
    """Accumulate streamed microphone audio and process it chunk by chunk.

    Each call appends the new audio to the module-level buffer. Once the
    buffer holds at least ``chunk_duration`` seconds, the first chunk is
    analysed with ``process_chunk``, its row is appended to the global
    results table, and the remainder stays buffered for the next call.

    Args:
        audio: ``(sample_rate, samples)`` tuple, or None when there is no
            audio.
        chunk_duration: Chunk length in seconds; may be a float. None falls
            back to CHUNK_DURATION.
        language_set: Comma-separated language names.

    Returns:
        ``(audio_output, data_df)`` where ``audio_output`` is
        ``(SAMPLING_RATE, samples)`` for the processed chunk (or for the
        audio buffered so far when no chunk was ready), or None when
        ``audio`` is None.

    Raises:
        ValueError: If ``chunk_duration`` does not cover at least one sample.
    """
    global data_df, current_chunk
    print("Process_audio_stream")
    if audio is None:
        return None, data_df

    sr, audio_data = audio

    # Parse the language set, ignoring empty entries such as "a,,b".
    language_set = [lang.strip() for lang in language_set.split(",") if lang.strip()]
    print(audio_data.shape, audio_data.dtype)

    if chunk_duration is None:
        chunk_duration = CHUNK_DURATION
    # Slice bounds must be integers; the duration may arrive as a float.
    chunk_samples = int(SAMPLING_RATE * chunk_duration)
    if chunk_samples <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration!r}")

    # Bring everything to the common sampling rate first.
    audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)
    current_chunk.append(audio_data)
    total_chunk = np.concatenate(current_chunk)

    if len(total_chunk) < chunk_samples:
        # Not enough audio yet: keep the buffer as one merged array so the
        # pieces are not re-concatenated on every call.
        current_chunk = [total_chunk]
        return (SAMPLING_RATE, total_chunk), data_df

    # Enough audio for one chunk: process it and keep the rest buffered.
    chunk = total_chunk[:chunk_samples]
    df = process_chunk(chunk, language_set)
    data_df = pd.concat([data_df, df], ignore_index=True)
    current_chunk = [total_chunk[chunk_samples:]]
    return (SAMPLING_RATE, chunk), data_df
def process_audio(audio, chunk_duration, language_set):
    """Process an uploaded audio file chunk by chunk, yielding as it goes.

    The module-level state is reset first, then the audio is resampled to
    SAMPLING_RATE and cut into consecutive chunks of ``chunk_duration``
    seconds. Each chunk is analysed with ``process_chunk`` and the growing
    results table is yielded after every chunk. A trailing remainder shorter
    than one chunk is not processed; it is stored in ``current_chunk``.

    Args:
        audio: ``(sample_rate, samples)`` tuple, or None when there is no
            audio.
        chunk_duration: Chunk length in seconds; may be a float. None falls
            back to CHUNK_DURATION.
        language_set: Comma-separated language names.

    Yields:
        ``((SAMPLING_RATE, chunk), data_df)`` after each processed chunk.

    Raises:
        ValueError: If ``chunk_duration`` does not cover at least one sample.
    """
    global data, data_df, current_chunk

    # Reset state.
    data = []
    data_df = pd.DataFrame()
    current_chunk = []

    print("Process_audio")
    print(audio)
    if audio is None:
        return

    sr, audio_data = audio

    # Parse the language set, ignoring empty entries such as "a,,b".
    language_set = [lang.strip() for lang in language_set.split(",") if lang.strip()]
    print(audio_data.shape, audio_data.dtype)

    if chunk_duration is None:
        chunk_duration = CHUNK_DURATION
    # Slice bounds must be integers; the duration may arrive as a float.
    chunk_samples = int(SAMPLING_RATE * chunk_duration)
    if chunk_samples <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration!r}")

    # Bring everything to the common sampling rate first.
    audio_data = resample_audio(audio_data, sr, target_sr=SAMPLING_RATE)

    # Overall level of the file, for logging only. Squared in float64 so
    # integer samples cannot overflow.
    rms = np.sqrt(np.mean(np.asarray(audio_data, dtype=np.float64) ** 2))
    db_level = 20 * np.log10(rms + 1e-9)  # small epsilon prevents -inf
    print(db_level)

    # The file is deliberately NOT normalised here: process_chunk measures
    # each chunk's dB level before normalising it itself. Scaling the whole
    # file to a peak of 1 beforehand would push every chunk to <= 0 dB and
    # make its `db_level > 50` check report "Silent" for every chunk.
    total_chunk = audio_data

    while len(total_chunk) >= chunk_samples:
        chunk = total_chunk[:chunk_samples]
        total_chunk = total_chunk[chunk_samples:]  # drop the processed part
        print(f"Processing audio chunk of length {len(chunk)}")
        df = process_chunk(chunk, language_set)
        data_df = pd.concat([data_df, df], ignore_index=True)
        yield (SAMPLING_RATE, chunk), data_df

    # Keep the unprocessed remainder.
    current_chunk = [total_chunk]
# Input components for the parameters.
chunk_duration_input = gr.Number(value=5, label="Chunk Duration (seconds)")
language_set_input = gr.Textbox(value="Japanese,English", label="Language Set (comma-separated)")

# NOTE(review): the parameter inputs and `outputs` below are the same
# component instances in both Interfaces — confirm Gradio renders shared
# instances in two tabs as intended.
inputs_file = [gr.Audio(sources=["upload"], type="numpy"), chunk_duration_input, language_set_input]
inputs_stream = [gr.Audio(sources=["microphone"], type="numpy", streaming=True), chunk_duration_input, language_set_input]

# The table headers mirror the columns of the DataFrame built by
# process_chunk, so the empty table shown before the first result already
# has the right columns.
outputs = [
    gr.Audio(type="numpy"),
    gr.DataFrame(
        headers=[
            "Length (s)",
            "db_level",
            "Japanese_English",
            "Language",
            "Lang ID Time",
            "Transcribe Time",
            "Text",
        ]
    ),
]

with gr.Blocks() as demo:
    # Tab 1: process an uploaded file; process_audio yields one update per chunk.
    with gr.TabItem("Upload"):
        gr.Interface(
            fn=process_audio,
            inputs=inputs_file,
            outputs=outputs,
            live=False,
            title="File Audio Processing",
            description="Upload an audio file to see the processing results."
        )
    # Tab 2: live microphone stream handled by process_audio_stream.
    with gr.TabItem("Microphone"):
        gr.Interface(
            fn=process_audio_stream,
            inputs=inputs_stream,
            outputs=outputs,
            live=True,
            title="Real-time Audio Processing",
            description="Speak into the microphone and see real-time audio processing results."
        )

if __name__ == "__main__":
    demo.launch()