import gradio as gr
import numpy as np
import io
import tempfile
from pydub import AudioSegment
from dataclasses import dataclass, field

@dataclass
class AppState:
    stream: np.ndarray | None = None
    sampling_rate: int = 0
    pause_detected: bool = False
    stopped: bool = False
    started_talking: bool = False
    conversation: list = field(default_factory=list)  # use default_factory for mutable defaults
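
# For example, default_factory gives each AppState its own conversation
# list, so one session's history never leaks into another:
#     a, b = AppState(), AppState()
#     a.conversation.append("hi")
#     b.conversation  # still []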

# Process streamed audio input and detect pauses
def process_audio(audio: tuple, state: AppState):
    if state.stream is None:
        state.stream = audio[1]
        state.sampling_rate = audio[0]
    else:
        state.stream = np.concatenate((state.stream, audio[1]))

    # Custom pause detection logic (replace with an actual implementation);
    # this placeholder treats anything longer than 1 second as a pause
    state.pause_detected = len(state.stream) > state.sampling_rate * 1

    if state.pause_detected:
        return gr.Audio(recording=False), state  # stop recording
    return None, state
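
# A more realistic pause check would look at the energy of the most
# recent audio rather than the total recording length. The sketch below
# is a minimal, assumed alternative (not part of the original app): it
# flags a pause when the RMS level of the last second drops below a
# hand-tuned threshold. Both the one-second window and the `threshold`
# value are assumptions to calibrate for your microphone and environment.
def detect_pause_rms(stream: np.ndarray, sampling_rate: int, threshold: float = 300.0) -> bool:
    if len(stream) < sampling_rate:
        return False  # not enough audio yet to judge silence
    window = stream[-sampling_rate:].astype(np.float64)  # last ~1 s of samples
    rms = np.sqrt(np.mean(window ** 2))
    return rms < threshold  # quiet tail => treat as a pause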

# Generate the chatbot's response from the user's audio input
def response(state: AppState):
    if not state.pause_detected:
        return None, state

    # Convert the user's audio to WAV and add it to the conversation
    audio_buffer = io.BytesIO()
    segment = AudioSegment(
        state.stream.tobytes(),
        frame_rate=state.sampling_rate,
        sample_width=state.stream.dtype.itemsize,
        channels=1 if len(state.stream.shape) == 1 else state.stream.shape[1],
    )
    segment.export(audio_buffer, format="wav")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
        f.write(audio_buffer.getvalue())
    state.conversation.append({"role": "user", "content": {"path": f.name, "mime_type": "audio/wav"}})

    # Simulate the chatbot's response (replace with mini omni model logic);
    # a real implementation would stream mp3 chunks to output_audio here
    chatbot_response = b"Simulated response audio content"  # placeholder bytes
    output_buffer = chatbot_response
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        f.write(output_buffer)
    state.conversation.append({"role": "assistant", "content": {"path": f.name, "mime_type": "audio/mp3"}})
    yield None, state
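
# A minimal streaming sketch (an assumption, not part of the original
# app): if the model exposed an iterable of mp3-encoded chunks, a
# response generator could forward each chunk to the streaming
# output_audio component as it arrives, then save the full reply for
# the chat log. `model_chunks` is a hypothetical stand-in for the mini
# omni model's output stream.
def stream_response_chunks(state: AppState, model_chunks):
    output_buffer = b""
    for mp3_chunk in model_chunks:  # e.g. chunks yielded by the model
        output_buffer += mp3_chunk
        yield mp3_chunk, state  # pushed straight to output_audio
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
        f.write(output_buffer)  # persist the full reply for the conversation
    state.conversation.append({"role": "assistant", "content": {"path": f.name, "mime_type": "audio/mp3"}})
    yield None, state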

# --- Gradio Interface ---

def start_recording_user(state: AppState):
    # Re-arm the microphone for the next user turn unless the conversation was stopped
    if not state.stopped:
        return gr.Audio(recording=True)

# Build the Gradio app using the Blocks API
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            input_audio = gr.Audio(label="Input Audio", sources=["microphone"], type="numpy")
        with gr.Column():
            chatbot = gr.Chatbot(label="Conversation", type="messages")
            output_audio = gr.Audio(label="Output Audio", streaming=True, autoplay=True)
    state = gr.State(value=AppState())

    # Stream microphone audio into process_audio every 0.5 s
    stream = input_audio.stream(
        process_audio, [input_audio, state], [input_audio, state], stream_every=0.5, time_limit=30
    )
    # When recording stops, generate the response, then refresh the chat log
    respond = input_audio.stop_recording(response, [state], [output_audio, state])
    respond.then(lambda s: s.conversation, [state], [chatbot])
    # When playback finishes, re-arm the microphone for the next turn
    restart = output_audio.stop(start_recording_user, [state], [input_audio])

    cancel = gr.Button("Stop Conversation", variant="stop")
    cancel.click(
        lambda: (AppState(stopped=True), gr.Audio(recording=False)),
        None,
        [state, input_audio],
        cancels=[respond, restart],
    )

if __name__ == "__main__":
    demo.launch()