"""
BLIND ASSISTANCE MODEL - HUGGING FACE SPACES DEPLOYMENT
Enhanced Video Navigation System with Audio Guidance
"""

import gradio as gr
import cv2
import numpy as np
from ultralytics import YOLO
from gtts import gTTS
import pygame
import os
import time
from collections import deque
from PIL import Image, ImageEnhance
import torch
import threading
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
import tempfile
import json

# Optional imports
try:
    import easyocr
    EASYOCR_AVAILABLE = True
except ImportError:
    EASYOCR_AVAILABLE = False
    print("⚠️ EasyOCR not available")

try:
    import segmentation_models_pytorch as smp
    SMP_AVAILABLE = True
except ImportError:
    SMP_AVAILABLE = False
    print("⚠️ segmentation_models_pytorch not available")


class AudioNavigationSystem:
    def __init__(self):
        print("🚀 Initializing Blind Assistance Model...")

        # Load YOLOv8 model
        print("Loading YOLOv8 model...")
        self.model = YOLO('yolov8n.pt')
        print("✅ Model loaded successfully!")

        # Initialize Semantic Segmentation Model
        print("Loading Semantic Segmentation Model...")
        self.segmentation_model = self.load_segmentation_model()
        print("✅ Segmentation model loaded!")

        # Define segmentation classes
        self.segmentation_classes = {
            0: 'road', 1: 'sidewalk', 2: 'building', 3: 'wall', 4: 'fence',
            5: 'pole', 6: 'traffic light', 7: 'traffic sign', 8: 'vegetation',
            9: 'terrain', 10: 'sky', 11: 'person', 12: 'rider', 13: 'car',
            14: 'truck', 15: 'bus', 16: 'train', 17: 'motorcycle',
            18: 'bicycle', 19: 'void'
        }

        # Initialize Text Detection
        print("Loading Text Detection...")
        self.reader = self.load_text_detector()
        print("✅ Text detection initialized!")

        # Audio system
        self.use_audio = True
        self.audio_files = []
        self.audio_timestamps = []
        self.video_start_time = None
        self.speaking = False
        self.audio_lock = threading.Lock()

        # Navigation classes (YOLO label -> navigation label)
        self.navigation_classes = {
            'person': 'person',
            'car': 'vehicle',
            'truck': 'vehicle',
            'bus': 'vehicle',
            'motorcycle': 'vehicle',
            'bicycle': 'bicycle',
            'traffic light': 'traffic light',
            'stop sign': 'stop sign',
            'chair': 'chair',
            'bench': 'bench'
        }

        # Priority levels
        self.object_priority = {
            'important_text': 10,
            'vehicle': 5,
            'person': 4,
            'bicycle': 4,
            'traffic light': 3,
            'stop sign': 3,
            'stairs': 4,
            'curb': 4,
            'crosswalk': 3,
            'text': 2,
            'road': 1,
            'sidewalk': 1,
            'building': 1,
            'vegetation': 1
        }

        # Important keywords for text
        self.important_keywords = [
            'exit', 'entrance', 'warning', 'danger', 'caution', 'stop',
            'stairs', 'elevator', 'escalator', 'crosswalk', 'curb',
            'emergency', 'hospital', 'police', 'fire', 'help',
            'men', 'women', 'toilet', 'restroom', 'washroom',
            'up', 'down', 'left', 'right', 'north', 'south', 'east', 'west',
            'hazard', 'attention'
        ]

        # Frame dimensions
        self.frame_width = 0
        self.frame_height = 0

        # Announcement cooldown (seconds)
        self.last_announcement = time.time()
        self.announcement_cooldown = 3

        # Store detected items
        self.detected_items = set()
        self.text_size_reference = 100
        self.last_segmentation_analysis = ""
        self.segmentation_cooldown = 2

        print("✅ System initialized successfully!")

    def load_text_detector(self):
        """Load text detection model"""
        if EASYOCR_AVAILABLE:
            try:
                return easyocr.Reader(['en'])
            except Exception as e:
                print(f"⚠️ EasyOCR initialization failed: {e}")
        return None

    def load_segmentation_model(self):
        """Load segmentation model"""
        if not SMP_AVAILABLE:
            return None
        try:
            model = smp.Unet(
                encoder_name="mobilenet_v2",
                encoder_weights="voc",
                classes=20,
                activation=None,
            )
            return model
        except Exception as e:
            print(f"⚠️ Could not load segmentation model: {e}")
            return None

    def perform_semantic_segmentation(self, frame):
        """Approximate semantic segmentation with HSV color heuristics"""
        void_class = 19  # 'void' in self.segmentation_classes
        try:
            h, w = frame.shape[:2]
            # Start from 'void' so unlabeled pixels are not counted as road (class 0)
            seg_map = np.full((h, w), void_class, dtype=np.uint8)
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

            # Road detection: dark pixels in the lower half of the frame
            dark_mask = cv2.inRange(hsv, (0, 0, 0), (180, 255, 100))
            seg_map[h//2:, :][dark_mask[h//2:, :] > 0] = 0

            # Sky detection: bright blue pixels in the top third of the frame
            sky_mask = cv2.inRange(hsv, (100, 50, 150), (140, 255, 255))
            seg_map[:h//3, :][sky_mask[:h//3, :] > 0] = 10

            return seg_map
        except Exception:
            return np.full((frame.shape[0], frame.shape[1]), void_class, dtype=np.uint8)

    def analyze_segmentation_map(self, seg_map):
        """Analyze segmentation map"""
        h, w = seg_map.shape
        analysis = {
            'immediate_walkable': 0,
            'immediate_obstacles': 0,
            'critical_warnings': [],
            'guidance': [],
            'environment': 'unknown'
        }

        # Bottom 30% of the frame is treated as the immediate path
        immediate_path = seg_map[int(h*0.7):, :]
        road_pixels = np.sum(immediate_path == 0)
        total_pixels = immediate_path.size

        if total_pixels > 0:
            road_percentage = (road_pixels / total_pixels) * 100
            if road_percentage > 60:
                analysis['guidance'].append("Clear path ahead")
                analysis['environment'] = 'road'
            elif road_percentage > 30:
                analysis['guidance'].append("Moderate path clarity")
                analysis['environment'] = 'mixed'
            else:
                analysis['guidance'].append("Obstructed path ahead")
                analysis['environment'] = 'obstructed'

        return analysis

    def generate_segmentation_guidance(self, seg_analysis):
        """Generate guidance from segmentation"""
        if not seg_analysis['guidance']:
            return None

        guidance = ". ".join(seg_analysis['guidance'])
        if seg_analysis['environment'] == 'road':
            guidance += ". You appear to be on a road."
        elif seg_analysis['environment'] == 'obstructed':
            guidance += ". Path may be obstructed."
        return guidance

    def preprocess_image_for_text(self, image):
        """Preprocess image for text detection"""
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        enhancer = ImageEnhance.Contrast(pil_image)
        pil_image = enhancer.enhance(2.0)
        enhancer = ImageEnhance.Sharpness(pil_image)
        pil_image = enhancer.enhance(2.0)
        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    def detect_text_easyocr(self, frame):
        """Detect text using EasyOCR"""
        if self.reader is None:
            return []

        try:
            processed_frame = self.preprocess_image_for_text(frame)
            gray = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2GRAY)
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)
            thresh = cv2.adaptiveThreshold(
                blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY, 11, 2
            )
            kernel = np.ones((2, 2), np.uint8)
            morphed = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
            processed_for_ocr = cv2.cvtColor(morphed, cv2.COLOR_GRAY2BGR)

            results = self.reader.readtext(
                processed_for_ocr,
                decoder='beamsearch',
                beamWidth=5,
                batch_size=1,
                height_ths=0.5,
                width_ths=0.5,
                min_size=20,
                text_threshold=0.3,
                link_threshold=0.3
            )

            detected_texts = []
            for (bbox, text, confidence) in results:
                if confidence > 0.4 and len(text.strip()) > 1:
                    clean_text = text.strip().lower()
                    if len(bbox) >= 4:
                        y_coords = [point[1] for point in bbox]
                        text_height = max(y_coords) - min(y_coords)
                        distance = self.calculate_text_distance(text_height)
                        distance_category = self.get_distance_category(distance)
                        is_important = any(
                            keyword in clean_text
                            for keyword in self.important_keywords
                        )

                        detected_texts.append({
                            'type': 'text',
                            'text': clean_text,
                            'confidence': confidence,
                            'bbox': bbox,
                            'position': self.get_text_position(bbox),
                            'distance': distance,
                            'distance_category': distance_category,
                            'is_important': is_important,
                            'priority': 10 if is_important else 2
                        })

            return detected_texts
        except Exception as e:
            print(f"Text detection error: {e}")
            return []

    def get_text_position(self, bbox):
        """Determine text position"""
        if isinstance(bbox, list) and len(bbox) == 4:
            x_coords = [point[0] for point in bbox]
            x_center = sum(x_coords) / len(x_coords)
            third = self.frame_width / 3
            if x_center < third:
                return "left"
            elif x_center < 2 * third:
                return "center"
            else:
                return "right"
        return "center"

    def calculate_text_distance(self, bbox_height):
        """Estimate text distance"""
        if bbox_height <= 0:
            return 10.0
        distance = (self.text_size_reference * 2.0) / bbox_height
        return max(0.5, min(distance, 15.0))

    def get_distance_category(self, distance):
        """Convert distance to category"""
        if distance < 2:
            return "very close"
        elif distance < 4:
            return "close"
        elif distance < 7:
            return "moderate distance"
        elif distance < 10:
            return "far"
        else:
            return "very far"

    def calculate_object_distance(self, bbox_height, object_type="person"):
        """Estimate object distance"""
        # Approximate real-world heights in meters
        reference_sizes = {
            'person': 1.7,
            'vehicle': 1.5,
            'bicycle': 1.0,
            'animal': 0.5,
            'chair': 1.0,
            'bench': 1.0,
            'pole': 2.0,
            'default': 1.0
        }
        real_height = reference_sizes.get(object_type, reference_sizes['default'])
        focal_length = 500

        if bbox_height > 0:
            distance = (focal_length * real_height) / bbox_height
            return max(0.5, min(distance, 20))
        return 20

    def get_object_position(self, bbox):
        """Determine object position"""
        x_center = (bbox[0] + bbox[2]) / 2
        third = self.frame_width / 3
        if x_center < third:
            return "left"
        elif x_center < 2 * third:
            return "center"
        else:
            return "right"

    def get_comprehensive_priority(self, item):
        """Calculate comprehensive priority"""
        base_priority = self.object_priority.get(item.get('label', 'object'), 1)
        distance = item.get('distance', 10)
        distance_factor = max(0, 10 - distance) / 2
        position = item.get('position', 'right')
        position_factor = 2 if position == 'center' else 1

        if item.get('type') == 'text':
            if item.get('is_important', False):
                return 10 + distance_factor
            else:
                return 5 + distance_factor

        return base_priority * position_factor + distance_factor

    def generate_comprehensive_announcement(self, all_detections):
        """Generate balanced announcements"""
        if not all_detections:
            return "Path clear"

        messages = []
        all_detections.sort(key=self.get_comprehensive_priority, reverse=True)

        announced_count = 0
        max_announcements = 4

        for item in all_detections:
            if announced_count >= max_announcements:
                break

            item_type = item.get('type', 'object')
            if item_type == 'text':
                text = item['text']
                position = item['position']
                distance_category = item['distance_category']
                if item['is_important']:
                    messages.append(f"IMPORTANT: {text} {distance_category} on your {position}")
                else:
                    messages.append(f"Sign: {text} {distance_category} on your {position}")
                announced_count += 1
            else:
                if announced_count < max_announcements:
                    label = item['label']
                    position = item['position']
                    distance_category = item['distance_category']
                    if position == "center" and item['distance'] < 3:
                        messages.append(f"Warning! {label} directly ahead, {distance_category}")
                    else:
                        messages.append(f"{label} on your {position}, {distance_category}")
                    announced_count += 1

        # Suggest moving toward the less crowded side when something is close ahead
        center_objects = [
            item for item in all_detections
            if item.get('position') == 'center' and item.get('distance', 10) < 3
        ]
        if center_objects and len(messages) < 5:
            left_count = sum(1 for item in all_detections[:6] if item.get('position') == 'left')
            right_count = sum(1 for item in all_detections[:6] if item.get('position') == 'right')
            if left_count < right_count:
                messages.append("Consider moving left")
            elif right_count < left_count:
                messages.append("Consider moving right")

        return ". ".join(messages)

    def speak_gtts(self, text, timestamp=None):
        """Text-to-speech using gTTS"""
        if not text or self.speaking:
            return

        with self.audio_lock:
            self.speaking = True
            try:
                if timestamp is None:
                    if self.video_start_time:
                        timestamp = time.time() - self.video_start_time
                    else:
                        timestamp = 0

                minutes = int(timestamp // 60)
                seconds = int(timestamp % 60)
                timestamp_str = f"{minutes:02d}:{seconds:02d}"
                print(f"🔊 [{timestamp_str}] GUIDANCE: {text}")

                tts = gTTS(text=text, lang='en', slow=False)
                audio_filename = f"audio_{timestamp_str.replace(':', '-')}_{int(time.time() * 1000)}.mp3"
                tts.save(audio_filename)

                self.audio_files.append(audio_filename)
                self.audio_timestamps.append({
                    'filename': audio_filename,
                    'timestamp': timestamp,
                    'timestamp_str': timestamp_str,
                    'text': text
                })
            except Exception as e:
                print(f"⚠️ Speech generation error: {e}")
            finally:
                self.speaking = False
                time.sleep(0.5)

    def process_frame(self, frame):
        """Process video frame"""
        self.frame_height, self.frame_width = frame.shape[:2]

        seg_map = self.perform_semantic_segmentation(frame)
        seg_analysis = self.analyze_segmentation_map(seg_map)

        results = self.model(frame, conf=0.4, verbose=False)

        all_detections = []
        objects_info = []
        text_info = []

        # Process YOLO detections
        for result in results:
            boxes = result.boxes
            for box in boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf[0])
                cls = int(box.cls[0])
                label = self.model.names[cls]

                if label.lower() in self.navigation_classes:
                    nav_label = self.navigation_classes[label.lower()]
                    bbox_height = y2 - y1
                    distance = self.calculate_object_distance(bbox_height, nav_label)
                    distance_category = self.get_distance_category(distance)
                    position = self.get_object_position([x1, y1, x2, y2])

                    object_info = {
                        'type': 'object',
                        'label': nav_label,
                        'distance': distance,
                        'distance_category': distance_category,
                        'position': position,
                        'bbox': [x1, y1, x2, y2],
                        'confidence': conf,
                        'priority': self.object_priority.get(nav_label, 1)
                    }
                    objects_info.append(object_info)
                    all_detections.append(object_info)

                    # Draw bounding box (colors are BGR)
                    if nav_label == 'vehicle':
                        color = (0, 0, 255)
                    elif nav_label == 'person':
                        color = (0, 255, 255)
                    elif nav_label == 'bicycle':
                        color = (255, 0, 0)
                    else:
                        color = (0, 255, 0)

                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    label_text = f"{nav_label.upper()} {distance_category}"
                    (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
                    cv2.rectangle(frame, (x1, y1-th-10), (x1+tw+10, y1), color, -1)
                    cv2.putText(frame, label_text, (x1+5, y1-5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Detect text
        current_time = time.time()
        if (current_time - self.last_announcement) > 1.5:
            text_info = self.detect_text_easyocr(frame)

            # Keep only text that has not been announced before
            new_texts = []
            for text_data in text_info:
                text_hash = hash(text_data['text'][:20])
                if text_hash not in self.detected_items:
                    new_texts.append(text_data)
                    self.detected_items.add(text_hash)
            text_info = new_texts
            all_detections.extend(text_info)

        # Draw text bounding boxes
        for text_data in text_info:
            bbox = text_data['bbox']
            text = text_data['text']
            is_important = text_data['is_important']

            color = (255, 0, 255) if is_important else (255, 255, 0)
            thickness = 3 if is_important else 2

            pts = np.array(bbox, np.int32)
            pts = pts.reshape((-1, 1, 2))
            cv2.polylines(frame, [pts], True, color, thickness)

            # cv2.putText renders Hershey fonts only, so keep the label ASCII (no emoji)
            label_text = f"IMPORTANT: {text}" if is_important else f"TEXT: {text}"
            x_coords = [point[0] for point in bbox]
            y_coords = [point[1] for point in bbox]
            text_x = int(min(x_coords))
            text_y = int(min(y_coords)) - 10
            if text_y < 20:
                text_y = int(max(y_coords)) + 25

            (tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(frame, (text_x, text_y-th-5), (text_x+tw+10, text_y+5), color, -1)
            cv2.putText(frame, label_text, (text_x+5, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Generate navigation message
        message = None
        if (current_time - self.last_announcement) > self.announcement_cooldown:
            seg_guidance = self.generate_segmentation_guidance(seg_analysis)
            object_message = self.generate_comprehensive_announcement(all_detections)

            if seg_guidance and "obstructed" in seg_guidance.lower():
                message = f"{seg_guidance}. {object_message}"
            elif seg_guidance and object_message == "Path clear":
                message = seg_guidance
            else:
                message = object_message

            if message and message != "Path clear":
                threading.Thread(target=self.speak_gtts, args=(message,)).start()
                self.last_announcement = current_time

        # Status overlay
        overlay = frame.copy()
        cv2.rectangle(overlay, (5, 5), (500, 35), (0, 0, 0), -1)
        cv2.addWeighted(overlay, 0.6, frame, 0.4, 0, frame)
        status_text = f"Objects: {len(objects_info)} | Texts: {len(text_info)}"
        cv2.putText(frame, status_text, (15, 28),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Draw center danger zone
        center_objects = [
            obj for obj in objects_info
            if obj['position'] == 'center' and obj['distance'] < 3
        ]
        if center_objects:
            cv2.rectangle(frame,
                          (self.frame_width//3, self.frame_height-100),
                          (2*self.frame_width//3, self.frame_height-10),
                          (0, 0, 255), 3)
            cv2.putText(frame, "OBSTACLE IN PATH",
                        (self.frame_width//3 + 20, self.frame_height-50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)

        return frame, message, len(objects_info), len(text_info)

    def process_video(self, video_path, output_path='output_navigation.mp4'):
        """Process uploaded video"""
        cap = cv2.VideoCapture(video_path)
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        print(f"Processing video: {total_frames} frames at {fps} FPS")

        # Reset per-video state
        self.audio_timestamps = []
        self.audio_files = []
        self.detected_items = set()
        self.video_start_time = time.time()
        frame_count = 0

        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                processed_frame, message, obj_count, text_count = self.process_frame(frame)
                out.write(processed_frame)
                frame_count += 1

                if frame_count % 30 == 0:
                    progress = (frame_count / total_frames) * 100
                    print(f"Progress: {progress:.1f}%")
        finally:
            cap.release()
            out.release()

        print("✅ Video processing complete!")

        if self.audio_timestamps:
            final_output = 'final_with_audio.mp4'
            return self.merge_audio_into_video(output_path, final_output)
        else:
            return output_path

    def merge_audio_into_video(self, video_path, output_path='final_with_audio.mp4'):
        """Merge audio into video"""
        print("🎵 Merging audio into video...")

        if not self.audio_timestamps:
            return video_path

        try:
            video = VideoFileClip(video_path)
            video_duration = video.duration

            audio_clips = []
            for audio_info in self.audio_timestamps:
                if os.path.exists(audio_info['filename']):
                    try:
                        audio_clip = AudioFileClip(audio_info['filename'])
                        audio_clip = audio_clip.set_start(audio_info['timestamp'])
                        audio_clips.append(audio_clip)
                    except Exception as e:
                        print(f"⚠️ Failed to load {audio_info['filename']}: {e}")

            if not audio_clips:
                return video_path

            final_audio = CompositeAudioClip(audio_clips)
            final_audio = final_audio.set_duration(video_duration)
            final_video = video.set_audio(final_audio)

            final_video.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                fps=video.fps,
                verbose=False,
                logger=None
            )

            video.close()
            final_video.close()
            final_audio.close()
            for clip in audio_clips:
                clip.close()

            print("✅ Video with audio saved!")
            return output_path
        except Exception as e:
            print(f"❌ Error merging audio: {e}")
            return video_path


# Initialize the system
nav_system = AudioNavigationSystem()


def process_video_gradio(video_file):
    """Gradio interface function"""
    try:
        if video_file is None:
            return None, "Please upload a video file"

        # gr.Video passes the upload as a file path (str), not raw bytes,
        # so the path is used directly instead of being written to a temp file
        input_path = video_file

        # Check video duration
        cap = cv2.VideoCapture(input_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = frame_count / fps if fps > 0 else 0
        cap.release()

        if duration > 15:
            return None, f"⚠️ Video is {duration:.1f} seconds long. Please upload a video shorter than 15 seconds."

        # Process video
        output_path = nav_system.process_video(input_path)

        # Generate transcript
        transcript_text = "Audio Guidance Transcript:\n\n"
        for item in nav_system.audio_timestamps:
            transcript_text += f"[{item['timestamp_str']}] {item['text']}\n\n"

        return output_path, transcript_text
    except Exception as e:
        return None, f"Error processing video: {str(e)}"


# Create Gradio interface
with gr.Blocks(title="Blind Assistance AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🦯 Blind Assistance AI - Video Navigation System

    Upload a video to receive audio navigation guidance with object detection, text recognition, and scene analysis.

    ⚠️ **Important:** Please upload videos **shorter than 15 seconds** for optimal processing.
    """)

    with gr.Row():
        with gr.Column():
            video_input = gr.Video(label="Upload Video (Max 15 seconds)")
            process_btn = gr.Button("Process Video", variant="primary", size="lg")

        with gr.Column():
            video_output = gr.Video(label="Processed Video with Audio Guidance")
            transcript_output = gr.Textbox(label="Audio Transcript", lines=10)

    gr.Markdown("""
    ### Features:
    - 🎯 **Object Detection**: Identifies people, vehicles, and obstacles
    - 📝 **Text Detection & OCR**: Reads signs, labels, and important text
    - 🗺️ **Scene Analysis**: Understands environment and context
    - 🔊 **Voice Guidance**: Real-time audio navigation instructions
    """)

    process_btn.click(
        fn=process_video_gradio,
        inputs=[video_input],
        outputs=[video_output, transcript_output]
    )

# Launch the app
if __name__ == "__main__":
    demo.launch()