#!/usr/bin/env python3 """ 🍽️ Production-Ready AI Food Recognition API =========================================== FastAPI backend optimized for Hugging Face Spaces deployment. - Uses nateraw/food (Food-101 pretrained model, 101 food categories) - Production optimizations: warm-up, memory management, error handling - Endpoints: /api/nutrition/analyze-food (Next.js) + /analyze (HF Spaces) - Auto device detection: GPU β†’ MPS β†’ CPU fallback - Enhanced image preprocessing with contrast/sharpness boost """ import os import gc import logging import asyncio import aiohttp import re from typing import Dict, Any, List, Optional from io import BytesIO from pathlib import Path # Load .env file if exists try: from dotenv import load_dotenv env_path = Path(__file__).parent / '.env' load_dotenv(dotenv_path=env_path) logging.info(f"βœ… Loaded .env from {env_path}") except ImportError: logging.warning("⚠️ python-dotenv not installed, using system environment variables") except Exception as e: logging.warning(f"⚠️ Could not load .env: {e}") import torch import torch.nn.functional as F from PIL import Image, ImageEnhance import numpy as np from fastapi import FastAPI, File, UploadFile, HTTPException, Request, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse import uvicorn from transformers import AutoImageProcessor, AutoModelForImageClassification from contextlib import asynccontextmanager # OpenAI for translations from openai import AsyncOpenAI # ==================== CONFIGURATION ==================== MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB MAX_IMAGE_SIZE = 512 ALLOWED_TYPES = ["image/jpeg", "image/jpg", "image/png", "image/webp"] # OpenAI Configuration (will be initialized after logger is set up) OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "") openai_client = None # Will be initialized in lifespan startup # ==================== MULTI-MODEL FOOD RECOGNITION ==================== FOOD_MODELS = { # Primary specialize food models "food101": { "model_name": "nateraw/food", "type": "food_specialist", "classes": 101, "priority": 1, "description": "Food-101 specialized model" }, "food2k": { "model_name": "Kaludi/food-category-classification-v2.0", "type": "food_specialist", "classes": 2000, "priority": 2, "description": "Extended food categories" }, "nutrition": { "model_name": "microsoft/DiT-base-finetuned-SROIE", "type": "nutrition_labels", "classes": 1000, "priority": 3, "description": "Nutrition label recognition" }, # General object detection models that include food "general_v1": { "model_name": "google/vit-base-patch16-224", "type": "general_objects", "classes": 1000, "priority": 4, "description": "ImageNet general objects (includes food)" }, "general_v2": { "model_name": "microsoft/beit-base-patch16-224", "type": "general_objects", "classes": 1000, "priority": 5, "description": "Microsoft BEiT model" } } # Default primary model PRIMARY_MODEL = "food101" # Comprehensive food categories (all possible foods) COMPREHENSIVE_FOOD_CATEGORIES = { # Food-101 categories "pizza", "hamburger", "sushi", "ice_cream", "french_fries", "chicken_wings", "chocolate_cake", "caesar_salad", "steak", "tacos", "pancakes", "pancake", "lasagna", "apple_pie", "chicken_curry", "pad_thai", "ramen", "waffles", "waffle", "donuts", "cheesecake", "fish_and_chips", "fried_rice", "greek_salad", "guacamole", "crepe", "crepes", # Balkanska/Srpska tradicionalna jela "cevapi", "cevapcici", "burek", "pljeskavica", "sarma", "klepe", "dolma", "kajmak", "ajvar", "prebranac", "pasulj", "grah", "punjena_paprika", "musaka", "japrak", "bamija", "bosanski_lonac", "begova_corba", "tarhana", "zeljanica", "sirnica", "krompiruΕ‘a", "spanac", "tikvenica", # VoΔ‡e "apple", "banana", "orange", "grape", "strawberry", "cherry", "peach", "pear", "plum", "watermelon", "melon", "lemon", "lime", "kiwi", "mango", "pineapple", "apricot", "fig", "pomegranate", "blackberry", "raspberry", "blueberry", "cranberry", "coconut", "avocado", "papaya", "passion_fruit", # PovrΔ‡e "tomato", "cucumber", "carrot", "potato", "onion", "garlic", "pepper", "cabbage", "spinach", "lettuce", "broccoli", "cauliflower", "zucchini", "eggplant", "celery", "radish", "beet", "sweet_potato", "corn", "peas", "green_beans", "mushroom", "leek", "parsley", "basil", "mint", "dill", # Meso i riba "beef", "pork", "chicken", "lamb", "turkey", "duck", "salmon", "tuna", "cod", "mackerel", "sardine", "shrimp", "crab", "lobster", "mussels", "oysters", "squid", "octopus", # Mlečni proizvodi "milk", "cheese", "yogurt", "butter", "cream", "sour_cream", "cottage_cheese", "mozzarella", "cheddar", "parmesan", "feta", "goat_cheese", # Ε½itarice i leguminoze "bread", "rice", "pasta", "quinoa", "oats", "wheat", "barley", "lentils", "chickpeas", "black_beans", "kidney_beans", "soybeans", # Nuts and seeds "almond", "walnut", "peanut", "cashew", "pistachio", "hazelnut", "pecan", "sunflower_seeds", "pumpkin_seeds", "chia_seeds", "flax_seeds", # MeΔ‘unarodna kuhinja "spaghetti", "ravioli", "gnocchi", "risotto", "paella", "falafel", "hummus", "spring_rolls", "dim_sum", "bibimbap", "kimchi", "miso_soup", "tempura", "curry", "naan", "samosa", "tandoori", "biryani", "tikka_masala", "enchilada", "quesadilla", "burrito", "nachos", "gazpacho", "paella", # Deserti i slatkiΕ‘i "cake", "cookie", "muffin", "brownie", "pie", "tart", "pudding", "mousse", "gelato", "sorbet", "macaron", "eclair", "profiterole", "tiramisu", "baklava", "halva", "lokum", "tulumba", "krofne", # Napici "coffee", "tea", "juice", "smoothie", "wine", "beer", "cocktail", "soda", "water", "milk_shake", "lemonade", "kombucha" } # ==================== EXTERNAL NUTRITION APIs ==================== # USDA FoodData Central API (Free, comprehensive US database) USDA_API_BASE = "https://api.nal.usda.gov/fdc/v1" USDA_API_KEY = os.environ.get("USDA_API_KEY", "DEMO_KEY") # Edamam Nutrition Analysis API (Free tier: 1000 requests/month) EDAMAM_APP_ID = os.environ.get("EDAMAM_APP_ID", "") EDAMAM_APP_KEY = os.environ.get("EDAMAM_APP_KEY", "") EDAMAM_API_BASE = "https://api.edamam.com/api/nutrition-data" # Spoonacular Food API (Free tier: 150 requests/day) SPOONACULAR_API_KEY = os.environ.get("SPOONACULAR_API_KEY", "") SPOONACULAR_API_BASE = "https://api.spoonacular.com/food/ingredients" # OpenFoodFacts API (Completely FREE, 2M+ products worldwide) OPENFOODFACTS_API_BASE = "https://world.openfoodfacts.org/api/v2" # FoodRepo API (Free, comprehensive food database) FOODREPO_API_BASE = "https://www.foodrepo.org/api/v3" # ==================== LOGGING ==================== logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Default fallback nutrition values (used only if all APIs fail) DEFAULT_NUTRITION = {"calories": 200, "protein": 10.0, "carbs": 25.0, "fat": 8.0} # ==================== DEVICE SELECTION ==================== def select_device() -> str: """Smart device selection with fallback.""" if torch.cuda.is_available(): device_name = torch.cuda.get_device_name(0) logger.info(f"πŸš€ Using CUDA GPU: {device_name}") return "cuda" elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available(): logger.info("🍎 Using Apple Silicon GPU (MPS)") return "mps" else: logger.info("πŸ’» Using CPU (GPU not available)") return "cpu" # ==================== IMAGE PREPROCESSING ==================== def preprocess_image(image: Image.Image) -> Image.Image: """Enhanced image preprocessing for better recognition.""" # Convert to RGB if needed if image.mode != "RGB": image = image.convert("RGB") # Enhance image quality enhancer = ImageEnhance.Sharpness(image) image = enhancer.enhance(1.2) # +20% sharpness enhancer = ImageEnhance.Contrast(image) image = enhancer.enhance(1.15) # +15% contrast # Resize if too large (maintain aspect ratio) if max(image.size) > MAX_IMAGE_SIZE: ratio = MAX_IMAGE_SIZE / max(image.size) new_size = tuple(int(dim * ratio) for dim in image.size) image = image.resize(new_size, Image.Resampling.LANCZOS) return image # ==================== MULTI-API NUTRITION LOOKUP ==================== async def search_usda_nutrition(food_name: str) -> Optional[Dict[str, Any]]: """Search USDA FoodData Central for nutrition information.""" try: search_term = re.sub(r'[^a-zA-Z\s]', '', food_name.lower()) search_url = f"{USDA_API_BASE}/foods/search" async with aiohttp.ClientSession() as session: params = { "query": search_term, "dataType": "Foundation,SR Legacy", "pageSize": 5, "api_key": USDA_API_KEY } async with session.get(search_url, params=params) as response: if response.status == 200: data = await response.json() if data.get("foods") and len(data["foods"]) > 0: food = data["foods"][0] nutrients = {} for nutrient in food.get("foodNutrients", []): nutrient_name = nutrient.get("nutrientName", "").lower() value = nutrient.get("value", 0) if "energy" in nutrient_name and value > 0: nutrients["calories"] = round(value) elif "protein" in nutrient_name and value > 0: nutrients["protein"] = round(value, 1) elif "carbohydrate" in nutrient_name and "fiber" not in nutrient_name and value > 0: nutrients["carbs"] = round(value, 1) elif ("total lipid" in nutrient_name or ("fat" in nutrient_name and "fatty" not in nutrient_name)) and value > 0: nutrients["fat"] = round(value, 1) if len(nutrients) >= 3: # Need at least 3 main nutrients nutrition_data = { "calories": nutrients.get("calories", 0), "protein": nutrients.get("protein", 0.0), "carbs": nutrients.get("carbs", 0.0), "fat": nutrients.get("fat", 0.0) } logger.info(f"πŸ‡ΊπŸ‡Έ USDA nutrition found for '{food_name}': {nutrition_data}") return nutrition_data except Exception as e: logger.warning(f"⚠️ USDA lookup failed for '{food_name}': {e}") return None async def search_edamam_nutrition(food_name: str) -> Optional[Dict[str, Any]]: """Search Edamam Nutrition API for food data.""" if not EDAMAM_APP_ID or not EDAMAM_APP_KEY: return None try: async with aiohttp.ClientSession() as session: params = { "app_id": EDAMAM_APP_ID, "app_key": EDAMAM_APP_KEY, "ingr": f"1 serving {food_name}" } async with session.get(EDAMAM_API_BASE, params=params) as response: if response.status == 200: data = await response.json() if data.get("calories") and data.get("calories") > 0: nutrition_data = { "calories": round(data.get("calories", 0)), "protein": round(data.get("totalNutrients", {}).get("PROCNT", {}).get("quantity", 0), 1), "carbs": round(data.get("totalNutrients", {}).get("CHOCDF", {}).get("quantity", 0), 1), "fat": round(data.get("totalNutrients", {}).get("FAT", {}).get("quantity", 0), 1) } logger.info(f"πŸ₯— Edamam nutrition found for '{food_name}': {nutrition_data}") return nutrition_data except Exception as e: logger.warning(f"⚠️ Edamam lookup failed for '{food_name}': {e}") return None async def search_spoonacular_nutrition(food_name: str) -> Optional[Dict[str, Any]]: """Search Spoonacular API for ingredient nutrition.""" if not SPOONACULAR_API_KEY: return None try: # First search for ingredient ID search_url = f"{SPOONACULAR_API_BASE}/search" async with aiohttp.ClientSession() as session: params = { "query": food_name, "number": 1, "apiKey": SPOONACULAR_API_KEY } async with session.get(search_url, params=params) as response: if response.status == 200: data = await response.json() if data.get("results") and len(data["results"]) > 0: ingredient_id = data["results"][0]["id"] # Get nutrition info for ingredient nutrition_url = f"{SPOONACULAR_API_BASE}/{ingredient_id}/information" nutrition_params = { "amount": 100, "unit": "grams", "apiKey": SPOONACULAR_API_KEY } async with session.get(nutrition_url, params=nutrition_params) as nutrition_response: if nutrition_response.status == 200: nutrition_data_raw = await nutrition_response.json() if nutrition_data_raw.get("nutrition"): nutrients = nutrition_data_raw["nutrition"]["nutrients"] nutrition_data = { "calories": 0, "protein": 0.0, "carbs": 0.0, "fat": 0.0 } for nutrient in nutrients: name = nutrient.get("name", "").lower() amount = nutrient.get("amount", 0) if "calories" in name or "energy" in name: nutrition_data["calories"] = round(amount) elif "protein" in name: nutrition_data["protein"] = round(amount, 1) elif "carbohydrates" in name: nutrition_data["carbs"] = round(amount, 1) elif "fat" in name and "fatty" not in name: nutrition_data["fat"] = round(amount, 1) if nutrition_data["calories"] > 0: logger.info(f"πŸ₯„ Spoonacular nutrition found for '{food_name}': {nutrition_data}") return nutrition_data except Exception as e: logger.warning(f"⚠️ Spoonacular lookup failed for '{food_name}': {e}") return None def clean_food_name_for_search(raw_name: str) -> str: """Smart cleaning of Food-101 names for better API searches.""" # Remove underscores and replace with spaces cleaned = raw_name.replace("_", " ") # Handle comma-separated names - take the first part (usually English name) # Example: "Pineapple, Ananas" β†’ "Pineapple" if "," in cleaned: parts = cleaned.split(",") # Try to detect which part is English (usually the first one) # Keep the part that's more likely to be in nutrition databases cleaned = parts[0].strip() logger.info(f"🧹 Cleaned comma-separated name: '{raw_name}' β†’ '{cleaned}'") # Remove common Food-101 artifacts cleaned = re.sub(r'\b(and|with|the|a)\b', ' ', cleaned, flags=re.IGNORECASE) # Handle specific Food-101 patterns replacements = { "cup cakes": "cupcakes", "ice cream": "ice cream", "hot dog": "hot dog", "french fries": "french fries", "shrimp and grits": "shrimp grits", "macaroni and cheese": "mac and cheese" } for old, new in replacements.items(): if old in cleaned.lower(): cleaned = new break # Clean whitespace and extra punctuation cleaned = re.sub(r'\s+', ' ', cleaned).strip() cleaned = re.sub(r'[^\w\s-]', '', cleaned) # Remove special chars except hyphens return cleaned async def search_openfoodfacts_nutrition(food_name: str) -> Optional[Dict[str, Any]]: """Search OpenFoodFacts database for nutrition information.""" try: # OpenFoodFacts search endpoint search_url = f"{OPENFOODFACTS_API_BASE}/search" async with aiohttp.ClientSession() as session: params = { "search_terms": food_name, "search_simple": 1, "action": "process", "fields": "product_name,nutriments,nutriscore_grade", "page_size": 10, "json": 1 } async with session.get(search_url, params=params) as response: if response.status == 200: data = await response.json() products = data.get("products", []) if products: # Take the first product with nutrition data for product in products: nutriments = product.get("nutriments", {}) if nutriments.get("energy-kcal_100g") and nutriments.get("energy-kcal_100g") > 0: nutrition_data = { "calories": round(nutriments.get("energy-kcal_100g", 0)), "protein": round(nutriments.get("proteins_100g", 0), 1), "carbs": round(nutriments.get("carbohydrates_100g", 0), 1), "fat": round(nutriments.get("fat_100g", 0), 1) } logger.info(f"🌍 OpenFoodFacts nutrition found for '{food_name}': {nutrition_data}") return nutrition_data except Exception as e: logger.warning(f"⚠️ OpenFoodFacts lookup failed for '{food_name}': {e}") return None async def search_foodrepo_nutrition(food_name: str) -> Optional[Dict[str, Any]]: """Search FoodRepo database for nutrition information.""" try: # FoodRepo search endpoint search_url = f"{FOODREPO_API_BASE}/products" async with aiohttp.ClientSession() as session: params = { "q": food_name, "limit": 5 } async with session.get(search_url, params=params) as response: if response.status == 200: data = await response.json() if data.get("data") and len(data["data"]) > 0: product = data["data"][0] nutrients = product.get("nutrients", {}) if nutrients.get("energy"): nutrition_data = { "calories": round(nutrients.get("energy", {}).get("per100g", 0)), "protein": round(nutrients.get("protein", {}).get("per100g", 0), 1), "carbs": round(nutrients.get("carbohydrate", {}).get("per100g", 0), 1), "fat": round(nutrients.get("fat", {}).get("per100g", 0), 1) } if nutrition_data["calories"] > 0: logger.info(f"πŸ₯¬ FoodRepo nutrition found for '{food_name}': {nutrition_data}") return nutrition_data except Exception as e: logger.warning(f"⚠️ FoodRepo lookup failed for '{food_name}': {e}") return None async def get_nutrition_from_apis(food_name: str) -> Dict[str, Any]: """Get nutrition data from multiple FREE databases with comprehensive fallback.""" # Clean the Food-101 name for better searches cleaned_name = clean_food_name_for_search(food_name) logger.info(f"πŸ” Searching nutrition for: '{food_name}' β†’ '{cleaned_name}'") # Try APIs in order: Free/Unlimited first, then limited APIs nutrition_sources = [ ("OpenFoodFacts", search_openfoodfacts_nutrition), # FREE, 2M+ products ("USDA", search_usda_nutrition), # FREE, comprehensive US ("FoodRepo", search_foodrepo_nutrition), # FREE, European focus ("Edamam", search_edamam_nutrition), # 1000/month limit ("Spoonacular", search_spoonacular_nutrition) # 150/day limit ] # First attempt with cleaned name for source_name, search_func in nutrition_sources: try: nutrition_data = await search_func(cleaned_name) if nutrition_data and nutrition_data.get("calories", 0) > 0: nutrition_data["source"] = source_name logger.info(f"βœ… Found nutrition data from {source_name} for '{cleaned_name}'") return nutrition_data except Exception as e: logger.warning(f"⚠️ {source_name} search failed for '{cleaned_name}': {e}") continue # If cleaned name failed and it's different from original, try original name too if cleaned_name.lower() != food_name.lower(): logger.info(f"πŸ”„ Retrying with original name: '{food_name}'") for source_name, search_func in nutrition_sources: try: nutrition_data = await search_func(food_name) if nutrition_data and nutrition_data.get("calories", 0) > 0: nutrition_data["source"] = source_name logger.info(f"βœ… Found nutrition data from {source_name} for original '{food_name}'") return nutrition_data except Exception as e: logger.warning(f"⚠️ {source_name} search failed for original '{food_name}': {e}") continue # Try with just the first word as last resort (e.g., "pineapple juice" β†’ "pineapple") words = cleaned_name.split() if len(words) > 1: first_word = words[0] logger.info(f"πŸ”„ Last resort: trying first word only: '{first_word}'") for source_name, search_func in nutrition_sources: try: nutrition_data = await search_func(first_word) if nutrition_data and nutrition_data.get("calories", 0) > 0: nutrition_data["source"] = f"{source_name} (matched: {first_word})" logger.info(f"βœ… Found nutrition data from {source_name} for '{first_word}'") return nutrition_data except Exception as e: logger.warning(f"⚠️ {source_name} search failed for '{first_word}': {e}") continue # All APIs failed, return default values logger.warning(f"🚨 No nutrition data found for '{food_name}' after all attempts, using defaults") default_nutrition = DEFAULT_NUTRITION.copy() default_nutrition["source"] = "Default (APIs unavailable)" return default_nutrition # ==================== TRANSLATION SYSTEM ==================== # In-memory translation cache to reduce API calls translation_cache: Dict[str, Dict[str, str]] = {} # {locale: {english: translated}} # Language code mapping (i18n locale β†’ full language name) LANGUAGE_MAP = { "en": "English", "bs": "Bosnian", "de": "German", "es": "Spanish", "fr": "French", "it": "Italian", "pt": "Portuguese", "ar": "Arabic", "tr": "Turkish", "nl": "Dutch", "ru": "Russian", "zh": "Chinese", "ja": "Japanese", "ko": "Korean", "hi": "Hindi", "sr": "Serbian", "hr": "Croatian", "sq": "Albanian", "mk": "Macedonian", } async def translate_food_names_batch(food_names: List[str], target_locale: str) -> Dict[str, str]: """ Translate multiple food names in a single API call (COST OPTIMIZATION). Args: food_names: List of food names in English target_locale: Target language code Returns: Dictionary mapping original names to translated names """ # Skip translation if target is English or no OpenAI client if target_locale == "en" or not openai_client or not OPENAI_API_KEY: return {name: name for name in food_names} # Check cache first if target_locale not in translation_cache: translation_cache[target_locale] = {} translations = {} needs_translation = [] # Separate cached and uncached items for name in food_names: if name in translation_cache[target_locale]: translations[name] = translation_cache[target_locale][name] logger.info(f"πŸ’Ύ Cache hit: '{name}' β†’ '{translations[name]}' ({target_locale})") else: needs_translation.append(name) # If all cached, return immediately if not needs_translation: return translations # Get target language name target_language = LANGUAGE_MAP.get(target_locale, target_locale) try: logger.info(f"🌐 Batch translating {len(needs_translation)} items to {target_language}") # Create batch translation prompt (1 API call for multiple items) food_list = "\n".join(f"{i+1}. {name}" for i, name in enumerate(needs_translation)) response = await openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": f"You are a professional food translator. Translate food names to {target_language}. Return ONLY the translations, one per line, in the same order. Keep it natural and commonly used." }, { "role": "user", "content": f"Translate these food names to {target_language}:\n{food_list}" } ], max_tokens=150, temperature=0.3, ) translated_lines = response.choices[0].message.content.strip().split('\n') # Parse translations and update cache for i, name in enumerate(needs_translation): if i < len(translated_lines): # Remove numbering if present (e.g., "1. Ananas" β†’ "Ananas") translated = translated_lines[i].strip() translated = translated.split('. ', 1)[-1] if '. ' in translated else translated translations[name] = translated translation_cache[target_locale][name] = translated logger.info(f"βœ… '{name}' β†’ '{translated}'") return translations except Exception as e: logger.warning(f"⚠️ Batch translation failed: {e}") # Return originals on failure for name in needs_translation: translations[name] = name return translations async def translate_food_name(food_name: str, target_locale: str) -> str: """ Translate single food name (uses batch function internally for caching). Args: food_name: Food name in English target_locale: Target language code Returns: Translated food name or original if translation fails/not needed """ result = await translate_food_names_batch([food_name], target_locale) return result.get(food_name, food_name) async def translate_description(description: str, target_locale: str) -> str: """ Translate food description to target language using OpenAI with caching. Args: description: Description in English target_locale: Target language code Returns: Translated description or original if translation fails/not needed """ # Skip translation if target is English or no OpenAI client if target_locale == "en" or not openai_client or not OPENAI_API_KEY: return description # Simple cache key (hash of description + locale) cache_key = f"desc_{hash(description)}_{target_locale}" # Check if cached in locale cache if target_locale not in translation_cache: translation_cache[target_locale] = {} if cache_key in translation_cache[target_locale]: logger.info(f"πŸ’Ύ Description cache hit ({target_locale})") return translation_cache[target_locale][cache_key] # Get target language name target_language = LANGUAGE_MAP.get(target_locale, target_locale) try: logger.info(f"🌐 Translating description to {target_language}") response = await openai_client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": f"You are a food description translator. Translate to {target_language}. Keep it natural and concise. Return ONLY the translation." }, { "role": "user", "content": description } ], max_tokens=100, temperature=0.3, ) translated = response.choices[0].message.content.strip() # Cache the result translation_cache[target_locale][cache_key] = translated logger.info(f"βœ… Description translated to {target_language}") return translated except Exception as e: logger.warning(f"⚠️ Description translation failed: {e}") return description # ==================== MULTI-MODEL FOOD RECOGNIZER ==================== class MultiModelFoodRecognizer: """Production-ready multi-model ensemble for comprehensive food recognition.""" def __init__(self, device: str): self.device = device self.models = {} self.processors = {} self.is_loaded = False self.available_models = [] self._initialize_models() self._warm_up() def _initialize_models(self): """Initialize all available food recognition models.""" logger.info("πŸš€ Initializing multi-model food recognition system...") for model_key, model_config in FOOD_MODELS.items(): try: logger.info(f"πŸ“¦ Loading {model_config['description']}...") model_name = model_config["model_name"] # Load processor and model processor = AutoImageProcessor.from_pretrained(model_name) model = AutoModelForImageClassification.from_pretrained(model_name) # Move to device and optimize model = model.to(self.device) model.eval() # Memory optimization (skip torch.compile for MPS) if hasattr(torch, 'compile') and self.device != "mps": try: model = torch.compile(model) logger.info(f"⚑ {model_key} compiled with torch.compile") except Exception: logger.info(f"⚠️ torch.compile failed for {model_key}, using standard model") else: logger.info(f"ℹ️ Using standard model for {model_key} (torch.compile disabled for MPS)") self.models[model_key] = model self.processors[model_key] = processor self.available_models.append(model_key) logger.info(f"βœ… {model_config['description']} loaded successfully") except Exception as e: logger.warning(f"⚠️ Failed to load {model_key}: {e}") continue if self.available_models: self.is_loaded = True logger.info(f"🎯 Multi-model system ready with {len(self.available_models)} models: {self.available_models}") else: raise RuntimeError("❌ No models could be loaded!") def _warm_up(self): """Warm up all loaded models.""" if not self.available_models: return try: logger.info("πŸ”₯ Warming up all models...") # Create dummy image dummy_image = Image.new('RGB', (224, 224), color='red') for model_key in self.available_models: try: processor = self.processors[model_key] model = self.models[model_key] with torch.no_grad(): inputs = processor(images=dummy_image, return_tensors="pt") inputs = {k: v.to(self.device) for k, v in inputs.items()} _ = model(**inputs) logger.info(f"βœ… {model_key} warmed up") except Exception as e: logger.warning(f"⚠️ Warm-up failed for {model_key}: {e}") # Clean up del dummy_image if self.device == "cuda": torch.cuda.empty_cache() gc.collect() logger.info("βœ… All models warm-up completed") except Exception as e: logger.warning(f"⚠️ Model warm-up failed: {e}") def _predict_with_model(self, image: Image.Image, model_key: str, top_k: int = 5) -> Optional[List[Dict[str, Any]]]: """Predict with a specific model.""" try: if model_key not in self.available_models: return None processor = self.processors[model_key] model = self.models[model_key] # Preprocess image processed_image = preprocess_image(image) # Prepare inputs inputs = processor(images=processed_image, return_tensors="pt") inputs = {k: v.to(self.device) for k, v in inputs.items()} # Inference with torch.no_grad(): outputs = model(**inputs) logits = outputs.logits probs = F.softmax(logits, dim=-1).cpu().numpy()[0] # Get top K predictions top_indices = np.argsort(probs)[::-1][:top_k] predictions = [] for idx in top_indices: # Handle different model output formats if hasattr(model.config, 'id2label') and str(idx) in model.config.id2label: label = model.config.id2label[str(idx)] elif hasattr(model.config, 'id2label') and idx in model.config.id2label: label = model.config.id2label[idx] else: label = f"class_{idx}" confidence = float(probs[idx]) # Clean label name clean_name = label.replace("_", " ").title() predictions.append({ "label": clean_name, "raw_label": label, "confidence": confidence, "confidence_pct": f"{confidence:.1%}", "model": model_key, "model_type": FOOD_MODELS[model_key]["type"] }) # Clean up memory del inputs, outputs, logits, probs if self.device == "cuda": torch.cuda.empty_cache() return predictions except Exception as e: logger.warning(f"⚠️ Prediction failed for {model_key}: {e}") return None def predict(self, image: Image.Image, top_k: int = 5) -> Dict[str, Any]: """Main predict method - uses ensemble if available, fallback to primary.""" return self.predict_ensemble(image, top_k) def predict_ensemble(self, image: Image.Image, top_k: int = 10) -> Dict[str, Any]: """Ensemble prediction using all available models with smart filtering.""" if not self.is_loaded: raise RuntimeError("Models not loaded") all_predictions = [] model_results = {} # Get MORE predictions from all models (top 15 instead of 5) predictions_per_model = 15 for model_key in self.available_models: predictions = self._predict_with_model(image, model_key, predictions_per_model) if predictions: model_results[model_key] = predictions all_predictions.extend(predictions) if not all_predictions: raise RuntimeError("No models produced valid predictions") # NON-FOOD items that should be COMPLETELY FILTERED OUT non_food_items = { # Kitchen utensils & cookware 'plate', 'dish', 'bowl', 'cup', 'glass', 'mug', 'spoon', 'fork', 'knife', 'spatula', 'pan', 'pot', 'tray', 'napkin', 'table', 'cloth', 'placemat', 'chopsticks', 'straw', 'bottle', 'container', 'lid', 'wrapper', 'packaging', 'cutting board', 'grater', 'whisk', 'ladle', 'tongs', 'peeler', 'sieve', 'colander', 'mixer', 'blender', 'toaster', 'oven', 'microwave', 'fridge', 'freezer', 'dishwasher', 'sink', 'counter', 'shelf', 'cabinet', 'drawer', 'waffle iron', 'frying pan', 'frypan', 'skillet', 'saucepan', 'stockpot', 'baking sheet', 'baking pan', 'baking dish', 'loaf pan', 'muffin tin', 'rolling pin', 'measuring cup', 'measuring spoon', 'kitchen scale', 'bakery', 'bakeshop', 'bakehouse', 'restaurant', 'kitchen', 'dining room', # Animals (NOT food!) 'dog', 'cat', 'bird', 'fish', 'horse', 'cow', 'pig', 'chicken', 'terrier', 'retriever', 'bulldog', 'poodle', 'beagle', 'dachshund', 'lobster', 'crab', 'shrimp', 'hunting dog', 'hyena', 'wolf', 'fox', # Objects/Electronics 'joystick', 'controller', 'remote', 'phone', 'computer', 'mouse', 'keyboard', 'water jug', 'jug', 'pitcher', 'vase', 'flowerpot' } # Generic FOOD terms that should be deprioritized (but not removed) generic_terms = { 'fruit', 'vegetable', 'food', 'meal', 'snack', 'dessert', 'salad', 'soup', 'drink', 'beverage', 'meat', 'fish', 'seafood', 'bread', 'pastry', 'cake', 'cookie', 'candy', 'chocolate' } # Ensemble voting: weight by model priority and confidence food_scores = {} filtered_count = 0 for pred in all_predictions: food_label_lower = pred["raw_label"].lower().replace("_", " ") # FILTER OUT non-food items completely is_non_food = any(non_food in food_label_lower for non_food in non_food_items) if is_non_food: filtered_count += 1 logger.info(f"🚫 Filtered non-food item: '{pred['raw_label']}'") continue # Skip this prediction entirely model_key = pred["model"] priority_weight = 1.0 / FOOD_MODELS[model_key]["priority"] # Higher priority = lower number = higher weight confidence_weight = pred["confidence"] # PENALTY for generic terms - reduce their score significantly is_generic = any(generic in food_label_lower for generic in generic_terms) # If it's a single-word generic term, penalize it even more is_single_generic = food_label_lower in generic_terms if is_single_generic: combined_score = priority_weight * confidence_weight * 0.1 # 90% penalty elif is_generic: combined_score = priority_weight * confidence_weight * 0.5 # 50% penalty else: combined_score = priority_weight * confidence_weight # Full score for specific items food_name = pred["raw_label"] if food_name not in food_scores: food_scores[food_name] = { "total_score": 0, "count": 0, "best_prediction": pred, "models": [], "is_generic": is_generic } food_scores[food_name]["total_score"] += combined_score food_scores[food_name]["count"] += 1 food_scores[food_name]["models"].append(model_key) # Keep the prediction with highest confidence as representative if pred["confidence"] > food_scores[food_name]["best_prediction"]["confidence"]: food_scores[food_name]["best_prediction"] = pred if filtered_count > 0: logger.info(f"βœ… Filtered out {filtered_count} non-food items") # Sort by ensemble score sorted_foods = sorted( food_scores.items(), key=lambda x: x[1]["total_score"], reverse=True ) # Format final results - return MORE alternatives (up to top_k) final_predictions = [] for food_name, data in sorted_foods[:top_k * 2]: # Get double to have enough after filtering pred = data["best_prediction"].copy() pred["ensemble_score"] = data["total_score"] pred["model_count"] = data["count"] pred["contributing_models"] = data["models"] pred["is_generic"] = data["is_generic"] final_predictions.append(pred) # Remove duplicates AND non-food items (double check) filtered_predictions = [] seen_labels = set() for pred in final_predictions: label_lower = pred["raw_label"].lower().replace("_", " ").strip() # DOUBLE CHECK: Filter non-food items again is_non_food = any(non_food in label_lower for non_food in non_food_items) if is_non_food: continue # Skip non-food items # Skip if we've already seen very similar label if label_lower not in seen_labels: filtered_predictions.append(pred) seen_labels.add(label_lower) if len(filtered_predictions) >= top_k: break # Primary result - prefer specific over generic AND high confidence primary = filtered_predictions[0] if filtered_predictions else { "label": "Unknown Food", "raw_label": "unknown", "confidence": 0.0, "ensemble_score": 0.0, "model_count": 0, "contributing_models": [], "is_generic": False } # QUALITY CHECK: If primary confidence is < 10%, try to find better alternative MIN_CONFIDENCE = 0.10 # 10% if primary.get("confidence", 0) < MIN_CONFIDENCE and len(filtered_predictions) > 1: logger.warning(f"⚠️ Low confidence ({primary['confidence']:.1%}) for '{primary['label']}', checking alternatives...") # Find first alternative with higher confidence for i, pred in enumerate(filtered_predictions[1:], 1): if pred.get("confidence", 0) >= MIN_CONFIDENCE / 2: # At least 5% filtered_predictions[0], filtered_predictions[i] = filtered_predictions[i], filtered_predictions[0] primary = filtered_predictions[0] logger.info(f"πŸ”„ Swapped low-confidence primary with better alternative: {primary['label']} ({primary['confidence']:.1%})") break # If primary is generic but we have specific alternatives, swap them if primary.get("is_generic") and len(filtered_predictions) > 1: for i, pred in enumerate(filtered_predictions[1:], 1): if not pred.get("is_generic"): # Swap primary with this specific prediction filtered_predictions[0], filtered_predictions[i] = filtered_predictions[i], filtered_predictions[0] primary = filtered_predictions[0] logger.info(f"πŸ”„ Swapped generic primary with specific: {primary['label']}") break return { "success": True, "label": primary["label"], "confidence": primary["confidence"], "primary_label": primary["raw_label"], "ensemble_score": primary.get("ensemble_score", 0), "alternatives": filtered_predictions[1:], # Now returns up to 9 alternatives "model_results": model_results, "system_info": { "available_models": self.available_models, "device": self.device.upper(), "total_classes": sum(FOOD_MODELS[m]["classes"] for m in self.available_models) } } # ==================== LIFESPAN EVENTS ==================== @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager.""" # Startup logger.info("πŸš€ Application startup complete") logger.info("=" * 60) logger.info("βœ… API READY FOR PRODUCTION") logger.info(f"πŸ“‘ Endpoints: /api/nutrition/analyze-food, /analyze") logger.info(f"πŸ–₯️ Device: {device.upper()}") logger.info(f"πŸ“Š Models: {len(recognizer.available_models)} active models") logger.info(f"🎯 Total Food Categories: {sum(FOOD_MODELS[m]['classes'] for m in recognizer.available_models)}") logger.info(f"🌐 Translations: {'βœ… Enabled' if openai_client else '❌ Disabled'}") logger.info("=" * 60) yield # Shutdown logger.info("πŸ”„ Shutting down...") # Cleanup GPU memory if device == "cuda": torch.cuda.empty_cache() # Garbage collection gc.collect() logger.info("βœ… Cleanup completed") # ==================== FASTAPI SETUP ==================== logger.info("=" * 60) logger.info("🍽️ PRODUCTION AI FOOD RECOGNITION API") logger.info("=" * 60) # Initialize multi-model system device = select_device() recognizer = MultiModelFoodRecognizer(device) # Initialize OpenAI client BEFORE FastAPI app if OPENAI_API_KEY: try: openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY) logger.info(f"βœ… OpenAI client initialized (key: {OPENAI_API_KEY[:20]}...)") except Exception as e: logger.warning(f"⚠️ OpenAI client initialization failed: {e}") openai_client = None else: logger.warning("⚠️ OpenAI API key not found - translations disabled") # Create FastAPI app app = FastAPI( title="AI Food Recognition API", description="Production-ready food recognition with 101 categories (Food-101 dataset)", version="2.0.0", docs_url="/docs", redoc_url="/redoc", lifespan=lifespan ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["*"], ) # ==================== MIDDLEWARE ==================== @app.middleware("http") async def add_security_headers(request: Request, call_next): response = await call_next(request) response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" return response # ==================== UTILITY FUNCTIONS ==================== async def validate_and_read_image(file: UploadFile) -> Image.Image: """Validate and read uploaded image file.""" # Check file size if hasattr(file, 'size') and file.size > MAX_FILE_SIZE: raise HTTPException(status_code=413, detail="File too large (max 10MB)") # Check content type if file.content_type not in ALLOWED_TYPES: raise HTTPException( status_code=400, detail=f"Invalid file type. Allowed: {', '.join(ALLOWED_TYPES)}" ) try: # Read and validate image contents = await file.read() if len(contents) > MAX_FILE_SIZE: raise HTTPException(status_code=413, detail="File too large (max 10MB)") image = Image.open(BytesIO(contents)) return image except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid image file: {str(e)}") # ==================== API ENDPOINTS ==================== @app.get("/") def root(): """Root endpoint with API information.""" return { "message": "🍽️ AI Food Recognition API", "status": "online", "version": "2.0.0", "models": recognizer.available_models if recognizer.is_loaded else [], "total_categories": sum(FOOD_MODELS[m]["classes"] for m in recognizer.available_models) if recognizer.is_loaded else 0, "device": device.upper(), "endpoints": { "POST /api/nutrition/analyze-food": "Analyze food image (Next.js frontend)", "POST /analyze": "Analyze food image (Hugging Face Spaces)", "GET /health": "Health check", "GET /docs": "API documentation" } } @app.get("/health") def health_check(): """Comprehensive health check.""" return { "status": "healthy" if recognizer.is_loaded else "error", "models_loaded": recognizer.is_loaded, "available_models": recognizer.available_models if recognizer.is_loaded else [], "model_count": len(recognizer.available_models) if recognizer.is_loaded else 0, "total_categories": sum(FOOD_MODELS[m]["classes"] for m in recognizer.available_models) if recognizer.is_loaded else 0, "device": device.upper(), "memory_usage": f"{torch.cuda.memory_allocated() / 1024**2:.1f}MB" if device == "cuda" else "N/A" } @app.post("/api/nutrition/analyze-food") async def analyze_food_nutrition(request: Request, file: UploadFile = File(None)): """ Analyze food image or manual entry for Next.js frontend. Supports two modes: 1. Image upload: AI recognition + nutrition lookup 2. Manual entry: Direct nutrition lookup by food name Returns nutrition-focused response format with translations. """ try: # Parse form data form_data = await request.form() manual_input = form_data.get("manualInput", "false").lower() == "true" locale = form_data.get("locale", "en") # Get user's language preference logger.info(f"πŸ“₯ Request received - Mode: {'Manual' if manual_input else 'Image'}, Locale: {locale}") # MODE 1: Manual food entry (from alternatives or manual input) if manual_input: food_name = form_data.get("manualFoodName") serving_size = form_data.get("manualServingSize", "100") serving_unit = form_data.get("manualServingUnit", "g") description = form_data.get("manualDescription", "") if not food_name: raise HTTPException(status_code=400, detail="manualFoodName is required for manual entry") logger.info(f"🍽️ Manual nutrition lookup: {food_name} ({serving_size}{serving_unit})") # Direct nutrition API lookup nutrition_data = await get_nutrition_from_apis(food_name) if not nutrition_data or nutrition_data.get("calories", 0) == 0: raise HTTPException( status_code=404, detail=f"Failed to retrieve nutrition information for manual entry" ) source = nutrition_data.get("source", "Unknown") logger.info(f"βœ… Manual lookup: {food_name} | Nutrition: {source}") # Translate food name and description translated_name = await translate_food_name(food_name, locale) base_description = description or f"Manual entry: {food_name}" translated_description = await translate_description(base_description, locale) # Return manual entry format return JSONResponse(content={ "data": { "label": translated_name, "confidence": 1.0, # Manual entry has 100% confidence "nutrition": { "calories": nutrition_data["calories"], "protein": nutrition_data["protein"], "carbs": nutrition_data["carbs"], "fat": nutrition_data["fat"] }, "servingSize": serving_size, "servingUnit": serving_unit, "description": translated_description, "alternatives": [], # No alternatives for manual entry "source": f"{source} Database", "isManualEntry": True } }) # MODE 2: Image upload (AI recognition) else: if not file: raise HTTPException(status_code=400, detail="File is required for image analysis") logger.info(f"🍽️ Image analysis request: {file.filename}") # Validate and process image image = await validate_and_read_image(file) # Step 1: AI Model Prediction (request top 10 for more alternatives) results = recognizer.predict(image, top_k=10) # Step 2: API Nutrition Lookup nutrition_data = await get_nutrition_from_apis(results["primary_label"]) # Log result confidence_pct = f"{results['confidence']:.1%}" source = nutrition_data.get("source", "Unknown") logger.info(f"βœ… Prediction: {results['label']} ({confidence_pct}) | Nutrition: {source}") # BATCH TRANSLATION OPTIMIZATION: Translate all food names at once if locale != "en" and openai_client: # Collect all names to translate (primary + alternatives) names_to_translate = [results["label"]] if results.get("alternatives"): names_to_translate.extend([ alt.get("label", alt.get("raw_label", "")) for alt in results["alternatives"] ]) # Single API call for all translations translations = await translate_food_names_batch(names_to_translate, locale) # Apply translations translated_name = translations.get(results["label"], results["label"]) # Translate description base_description = f"{results['label']} identified with {int(results['confidence'] * 100)}% confidence" translated_description = await translate_description(base_description, locale) # Map alternatives with translations translated_alternatives = [] if results.get("alternatives"): for alt in results["alternatives"]: alt_name = alt.get("label", alt.get("raw_label", "")) translated_alternatives.append({ **alt, "label": translations.get(alt_name, alt_name), "original_label": alt_name }) else: # No translation needed translated_name = results["label"] translated_description = f"{results['label']} identified with {int(results['confidence'] * 100)}% confidence" translated_alternatives = results["alternatives"] # Return frontend-expected format return JSONResponse(content={ "data": { "label": translated_name, "confidence": results["confidence"], "description": translated_description, # Translated description "nutrition": { "calories": nutrition_data["calories"], "protein": nutrition_data["protein"], "carbs": nutrition_data["carbs"], "fat": nutrition_data["fat"] }, "alternatives": translated_alternatives, "source": f"AI Recognition + {source} Database", "isManualEntry": False, "locale": locale # Return locale for debugging } }) except HTTPException: raise except Exception as e: logger.error(f"❌ Analysis failed: {e}") raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") @app.post("/analyze") async def analyze_food_spaces(file: UploadFile = File(...)): """ Analyze food image for Hugging Face Spaces interface. Returns detailed response with model info. """ logger.info(f"πŸš€ HF Spaces analysis request: {file.filename}") try: # Validate and process image image = await validate_and_read_image(file) # Step 1: AI Model Prediction (request top 10 for more alternatives) results = recognizer.predict(image, top_k=10) # Step 2: API Nutrition Lookup nutrition_data = await get_nutrition_from_apis(results["primary_label"]) # Log result confidence_pct = f"{results['confidence']:.1%}" source = nutrition_data.get("source", "Unknown") logger.info(f"βœ… Prediction: {results['label']} ({confidence_pct}) | Nutrition: {source}") # Return full response with nutrition data enhanced_results = results.copy() enhanced_results["nutrition"] = nutrition_data enhanced_results["data_source"] = source return JSONResponse(content=enhanced_results) except HTTPException: raise except Exception as e: logger.error(f"❌ Analysis failed: {e}") raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}") # ==================== MAIN ==================== if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) logger.info("🎯 Starting production server...") uvicorn.run( app, host="0.0.0.0", port=port, log_level="info", access_log=True )