#!/usr/bin/env python3
"""
🍽️ Production-Ready AI Food Recognition API
===========================================
FastAPI backend optimized for Hugging Face Spaces deployment.
- Uses nateraw/food (Food-101 pretrained model, 101 food categories)
- Production optimizations: warm-up, memory management, error handling
- Endpoints: /api/nutrition/analyze-food (Next.js) + /analyze (HF Spaces)
- Auto device detection: GPU → MPS → CPU fallback
- Enhanced image preprocessing with contrast/sharpness boost
"""
import os
import gc
import logging
import asyncio
import aiohttp
import re
from typing import Dict, Any, List, Optional
from io import BytesIO
from pathlib import Path

# Load the .env file if it exists
try:
    from dotenv import load_dotenv
    env_path = Path(__file__).parent / '.env'
    load_dotenv(dotenv_path=env_path)
    logging.info(f"✅ Loaded .env from {env_path}")
except ImportError:
    logging.warning("⚠️ python-dotenv not installed, using system environment variables")
except Exception as e:
    logging.warning(f"⚠️ Could not load .env: {e}")

import torch
import torch.nn.functional as F
from PIL import Image, ImageEnhance
import numpy as np
from fastapi import FastAPI, File, UploadFile, HTTPException, Request, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from transformers import AutoImageProcessor, AutoModelForImageClassification
from contextlib import asynccontextmanager

# OpenAI for translations
from openai import AsyncOpenAI

# ==================== CONFIGURATION ====================
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
MAX_IMAGE_SIZE = 512
ALLOWED_TYPES = ["image/jpeg", "image/jpg", "image/png", "image/webp"]

# OpenAI configuration (the client is created below, after the logger and models are set up)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
openai_client = None  # Rebound during module setup, before the FastAPI app is created

# ==================== MULTI-MODEL FOOD RECOGNITION ====================
FOOD_MODELS = {
    # Primary specialized food models
    "food101": {
        "model_name": "nateraw/food",
        "type": "food_specialist",
        "classes": 101,
        "priority": 1,
        "description": "Food-101 specialized model"
    },
    "food2k": {
        "model_name": "Kaludi/food-category-classification-v2.0",
        "type": "food_specialist",
        "classes": 2000,
        "priority": 2,
        "description": "Extended food categories"
    },
    "nutrition": {
        "model_name": "microsoft/DiT-base-finetuned-SROIE",
        "type": "nutrition_labels",
        "classes": 1000,
        "priority": 3,
        "description": "Nutrition label recognition"
    },
    # General object-classification models that include food classes
    "general_v1": {
        "model_name": "google/vit-base-patch16-224",
        "type": "general_objects",
        "classes": 1000,
        "priority": 4,
        "description": "ImageNet general objects (includes food)"
    },
    "general_v2": {
        "model_name": "microsoft/beit-base-patch16-224",
        "type": "general_objects",
        "classes": 1000,
        "priority": 5,
        "description": "Microsoft BEiT model"
    }
}

# Default primary model
PRIMARY_MODEL = "food101"

# Comprehensive food categories (all possible foods)
COMPREHENSIVE_FOOD_CATEGORIES = {
    # Food-101 categories
    "pizza", "hamburger", "sushi", "ice_cream", "french_fries", "chicken_wings",
    "chocolate_cake", "caesar_salad", "steak", "tacos", "pancakes", "pancake", "lasagna",
    "apple_pie", "chicken_curry", "pad_thai", "ramen", "waffles", "waffle", "donuts",
    "cheesecake", "fish_and_chips", "fried_rice", "greek_salad", "guacamole", "crepe", "crepes",
    # Traditional Balkan/Serbian dishes
    "cevapi", "cevapcici", "burek", "pljeskavica", "sarma", "klepe", "dolma",
    "kajmak", "ajvar", "prebranac", "pasulj", "grah", "punjena_paprika",
    "musaka", "japrak", "bamija", "bosanski_lonac", "begova_corba", "tarhana",
    "zeljanica", "sirnica", "krompiruša", "spanac", "tikvenica",
    # Fruits
    "apple", "banana", "orange", "grape", "strawberry", "cherry", "peach",
    "pear", "plum", "watermelon", "melon", "lemon", "lime", "kiwi", "mango",
    "pineapple", "apricot", "fig", "pomegranate", "blackberry", "raspberry",
    "blueberry", "cranberry", "coconut", "avocado", "papaya", "passion_fruit",
    # Vegetables
    "tomato", "cucumber", "carrot", "potato", "onion", "garlic", "pepper",
    "cabbage", "spinach", "lettuce", "broccoli", "cauliflower", "zucchini",
    "eggplant", "celery", "radish", "beet", "sweet_potato", "corn", "peas",
    "green_beans", "mushroom", "leek", "parsley", "basil", "mint", "dill",
    # Meat and fish
    "beef", "pork", "chicken", "lamb", "turkey", "duck", "salmon", "tuna",
    "cod", "mackerel", "sardine", "shrimp", "crab", "lobster", "mussels",
    "oysters", "squid", "octopus",
    # Dairy products
    "milk", "cheese", "yogurt", "butter", "cream", "sour_cream", "cottage_cheese",
    "mozzarella", "cheddar", "parmesan", "feta", "goat_cheese",
    # Grains and legumes
    "bread", "rice", "pasta", "quinoa", "oats", "wheat", "barley", "lentils",
    "chickpeas", "black_beans", "kidney_beans", "soybeans",
    # Nuts and seeds
    "almond", "walnut", "peanut", "cashew", "pistachio", "hazelnut", "pecan",
    "sunflower_seeds", "pumpkin_seeds", "chia_seeds", "flax_seeds",
    # International cuisine
    "spaghetti", "ravioli", "gnocchi", "risotto", "paella", "falafel", "hummus",
    "spring_rolls", "dim_sum", "bibimbap", "kimchi", "miso_soup", "tempura",
    "curry", "naan", "samosa", "tandoori", "biryani", "tikka_masala",
    "enchilada", "quesadilla", "burrito", "nachos", "gazpacho",
    # Desserts and sweets
    "cake", "cookie", "muffin", "brownie", "pie", "tart", "pudding", "mousse",
    "gelato", "sorbet", "macaron", "eclair", "profiterole", "tiramisu",
    "baklava", "halva", "lokum", "tulumba", "krofne",
    # Beverages
    "coffee", "tea", "juice", "smoothie", "wine", "beer", "cocktail", "soda",
    "water", "milk_shake", "lemonade", "kombucha"
}

# ==================== EXTERNAL NUTRITION APIs ====================
# USDA FoodData Central API (free, comprehensive US database)
USDA_API_BASE = "https://api.nal.usda.gov/fdc/v1"
USDA_API_KEY = os.environ.get("USDA_API_KEY", "DEMO_KEY")

# Edamam Nutrition Analysis API (free tier: 1000 requests/month)
EDAMAM_APP_ID = os.environ.get("EDAMAM_APP_ID", "")
EDAMAM_APP_KEY = os.environ.get("EDAMAM_APP_KEY", "")
EDAMAM_API_BASE = "https://api.edamam.com/api/nutrition-data"

# Spoonacular Food API (free tier: 150 requests/day)
SPOONACULAR_API_KEY = os.environ.get("SPOONACULAR_API_KEY", "")
SPOONACULAR_API_BASE = "https://api.spoonacular.com/food/ingredients"

# OpenFoodFacts API (completely free, 2M+ products worldwide)
OPENFOODFACTS_API_BASE = "https://world.openfoodfacts.org/api/v2"

# FoodRepo API (free, comprehensive food database)
FOODREPO_API_BASE = "https://www.foodrepo.org/api/v3"

# ==================== LOGGING ====================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Default fallback nutrition values (used only if all APIs fail)
DEFAULT_NUTRITION = {"calories": 200, "protein": 10.0, "carbs": 25.0, "fat": 8.0}

# ==================== DEVICE SELECTION ====================
def select_device() -> str:
    """Smart device selection with fallback."""
    if torch.cuda.is_available():
        device_name = torch.cuda.get_device_name(0)
        logger.info(f"Using CUDA GPU: {device_name}")
        return "cuda"
    elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        logger.info("Using Apple Silicon GPU (MPS)")
        return "mps"
    else:
        logger.info("💻 Using CPU (GPU not available)")
        return "cpu"

# ==================== IMAGE PREPROCESSING ====================
def preprocess_image(image: Image.Image) -> Image.Image:
    """Enhanced image preprocessing for better recognition."""
    # Convert to RGB if needed
    if image.mode != "RGB":
        image = image.convert("RGB")

    # Enhance image quality
    enhancer = ImageEnhance.Sharpness(image)
    image = enhancer.enhance(1.2)  # +20% sharpness
    enhancer = ImageEnhance.Contrast(image)
    image = enhancer.enhance(1.15)  # +15% contrast

    # Resize if too large (maintain aspect ratio)
    if max(image.size) > MAX_IMAGE_SIZE:
        ratio = MAX_IMAGE_SIZE / max(image.size)
        new_size = tuple(int(dim * ratio) for dim in image.size)
        image = image.resize(new_size, Image.Resampling.LANCZOS)

    return image
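
# A standalone sketch of the preprocessing step, e.g. for local debugging;
# "sample.jpg" is a placeholder file name, not part of this API:
#
#     from PIL import Image
#     prepped = preprocess_image(Image.open("sample.jpg"))
#     print(prepped.mode, prepped.size)  # "RGB", longest side <= MAX_IMAGE_SIZE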

# ==================== MULTI-API NUTRITION LOOKUP ====================
async def search_usda_nutrition(food_name: str) -> Optional[Dict[str, Any]]:
    """Search USDA FoodData Central for nutrition information."""
    try:
        search_term = re.sub(r'[^a-zA-Z\s]', '', food_name.lower())
        search_url = f"{USDA_API_BASE}/foods/search"
        async with aiohttp.ClientSession() as session:
            params = {
                "query": search_term,
                "dataType": "Foundation,SR Legacy",
                "pageSize": 5,
                "api_key": USDA_API_KEY
            }
            async with session.get(search_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data.get("foods") and len(data["foods"]) > 0:
                        food = data["foods"][0]
                        nutrients = {}
                        for nutrient in food.get("foodNutrients", []):
                            nutrient_name = nutrient.get("nutrientName", "").lower()
                            value = nutrient.get("value", 0)
                            if "energy" in nutrient_name and value > 0:
                                nutrients["calories"] = round(value)
                            elif "protein" in nutrient_name and value > 0:
                                nutrients["protein"] = round(value, 1)
                            elif "carbohydrate" in nutrient_name and "fiber" not in nutrient_name and value > 0:
                                nutrients["carbs"] = round(value, 1)
                            elif ("total lipid" in nutrient_name or ("fat" in nutrient_name and "fatty" not in nutrient_name)) and value > 0:
                                nutrients["fat"] = round(value, 1)
                        if len(nutrients) >= 3:  # Need at least 3 of the 4 main nutrients
                            nutrition_data = {
                                "calories": nutrients.get("calories", 0),
                                "protein": nutrients.get("protein", 0.0),
                                "carbs": nutrients.get("carbs", 0.0),
                                "fat": nutrients.get("fat", 0.0)
                            }
                            logger.info(f"🇺🇸 USDA nutrition found for '{food_name}': {nutrition_data}")
                            return nutrition_data
    except Exception as e:
        logger.warning(f"⚠️ USDA lookup failed for '{food_name}': {e}")
    return None

async def search_edamam_nutrition(food_name: str) -> Optional[Dict[str, Any]]:
    """Search the Edamam Nutrition API for food data."""
    if not EDAMAM_APP_ID or not EDAMAM_APP_KEY:
        return None
    try:
        async with aiohttp.ClientSession() as session:
            params = {
                "app_id": EDAMAM_APP_ID,
                "app_key": EDAMAM_APP_KEY,
                "ingr": f"1 serving {food_name}"
            }
            async with session.get(EDAMAM_API_BASE, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data.get("calories") and data.get("calories") > 0:
                        nutrition_data = {
                            "calories": round(data.get("calories", 0)),
                            "protein": round(data.get("totalNutrients", {}).get("PROCNT", {}).get("quantity", 0), 1),
                            "carbs": round(data.get("totalNutrients", {}).get("CHOCDF", {}).get("quantity", 0), 1),
                            "fat": round(data.get("totalNutrients", {}).get("FAT", {}).get("quantity", 0), 1)
                        }
                        logger.info(f"Edamam nutrition found for '{food_name}': {nutrition_data}")
                        return nutrition_data
    except Exception as e:
        logger.warning(f"⚠️ Edamam lookup failed for '{food_name}': {e}")
    return None

async def search_spoonacular_nutrition(food_name: str) -> Optional[Dict[str, Any]]:
    """Search the Spoonacular API for ingredient nutrition."""
    if not SPOONACULAR_API_KEY:
        return None
    try:
        # First search for the ingredient ID
        search_url = f"{SPOONACULAR_API_BASE}/search"
        async with aiohttp.ClientSession() as session:
            params = {
                "query": food_name,
                "number": 1,
                "apiKey": SPOONACULAR_API_KEY
            }
            async with session.get(search_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data.get("results") and len(data["results"]) > 0:
                        ingredient_id = data["results"][0]["id"]
                        # Get nutrition info for the ingredient
                        nutrition_url = f"{SPOONACULAR_API_BASE}/{ingredient_id}/information"
                        nutrition_params = {
                            "amount": 100,
                            "unit": "grams",
                            "apiKey": SPOONACULAR_API_KEY
                        }
                        async with session.get(nutrition_url, params=nutrition_params) as nutrition_response:
                            if nutrition_response.status == 200:
                                nutrition_data_raw = await nutrition_response.json()
                                if nutrition_data_raw.get("nutrition"):
                                    nutrients = nutrition_data_raw["nutrition"]["nutrients"]
                                    nutrition_data = {
                                        "calories": 0,
                                        "protein": 0.0,
                                        "carbs": 0.0,
                                        "fat": 0.0
                                    }
                                    for nutrient in nutrients:
                                        name = nutrient.get("name", "").lower()
                                        amount = nutrient.get("amount", 0)
                                        if "calories" in name or "energy" in name:
                                            nutrition_data["calories"] = round(amount)
                                        elif "protein" in name:
                                            nutrition_data["protein"] = round(amount, 1)
                                        elif "carbohydrates" in name:
                                            nutrition_data["carbs"] = round(amount, 1)
                                        elif "fat" in name and "fatty" not in name:
                                            nutrition_data["fat"] = round(amount, 1)
                                    if nutrition_data["calories"] > 0:
                                        logger.info(f"Spoonacular nutrition found for '{food_name}': {nutrition_data}")
                                        return nutrition_data
    except Exception as e:
        logger.warning(f"⚠️ Spoonacular lookup failed for '{food_name}': {e}")
    return None

def clean_food_name_for_search(raw_name: str) -> str:
    """Smart cleaning of Food-101 names for better API searches."""
    # Replace underscores with spaces
    cleaned = raw_name.replace("_", " ")

    # Handle comma-separated names - take the first part (usually the English name)
    # Example: "Pineapple, Ananas" → "Pineapple"
    if "," in cleaned:
        parts = cleaned.split(",")
        # The first part is usually English and most likely to appear in nutrition databases
        cleaned = parts[0].strip()
        logger.info(f"🧹 Cleaned comma-separated name: '{raw_name}' → '{cleaned}'")

    # Handle specific Food-101 patterns before stopword removal, so that
    # multi-word keys containing "and" can still match
    replacements = {
        "cup cakes": "cupcakes",
        "ice cream": "ice cream",
        "hot dog": "hot dog",
        "french fries": "french fries",
        "shrimp and grits": "shrimp grits",
        "macaroni and cheese": "mac and cheese"
    }
    replaced = False
    for old, new in replacements.items():
        if old in cleaned.lower():
            cleaned = new
            replaced = True
            break

    # Remove common filler words (skip if a known pattern already matched)
    if not replaced:
        cleaned = re.sub(r'\b(and|with|the|a)\b', ' ', cleaned, flags=re.IGNORECASE)

    # Collapse whitespace and strip extra punctuation
    cleaned = re.sub(r'\s+', ' ', cleaned).strip()
    cleaned = re.sub(r'[^\w\s-]', '', cleaned)  # Remove special chars except hyphens
    return cleaned
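
# Illustrative input/output pairs for the cleaner above (values follow from
# the logic, not from a recorded run):
#
#     clean_food_name_for_search("chicken_curry")        # -> "chicken curry"
#     clean_food_name_for_search("Pineapple, Ananas")    # -> "Pineapple"
#     clean_food_name_for_search("macaroni_and_cheese")  # -> "mac and cheese"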

async def search_openfoodfacts_nutrition(food_name: str) -> Optional[Dict[str, Any]]:
    """Search the OpenFoodFacts database for nutrition information."""
    try:
        # OpenFoodFacts search endpoint
        search_url = f"{OPENFOODFACTS_API_BASE}/search"
        async with aiohttp.ClientSession() as session:
            params = {
                "search_terms": food_name,
                "search_simple": 1,
                "action": "process",
                "fields": "product_name,nutriments,nutriscore_grade",
                "page_size": 10,
                "json": 1
            }
            async with session.get(search_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    products = data.get("products", [])
                    if products:
                        # Take the first product that has calorie data
                        for product in products:
                            nutriments = product.get("nutriments", {})
                            if nutriments.get("energy-kcal_100g") and nutriments.get("energy-kcal_100g") > 0:
                                nutrition_data = {
                                    "calories": round(nutriments.get("energy-kcal_100g", 0)),
                                    "protein": round(nutriments.get("proteins_100g", 0), 1),
                                    "carbs": round(nutriments.get("carbohydrates_100g", 0), 1),
                                    "fat": round(nutriments.get("fat_100g", 0), 1)
                                }
                                logger.info(f"OpenFoodFacts nutrition found for '{food_name}': {nutrition_data}")
                                return nutrition_data
    except Exception as e:
        logger.warning(f"⚠️ OpenFoodFacts lookup failed for '{food_name}': {e}")
    return None

async def search_foodrepo_nutrition(food_name: str) -> Optional[Dict[str, Any]]:
    """Search the FoodRepo database for nutrition information."""
    try:
        # FoodRepo search endpoint
        search_url = f"{FOODREPO_API_BASE}/products"
        async with aiohttp.ClientSession() as session:
            params = {
                "q": food_name,
                "limit": 5
            }
            async with session.get(search_url, params=params) as response:
                if response.status == 200:
                    data = await response.json()
                    if data.get("data") and len(data["data"]) > 0:
                        product = data["data"][0]
                        nutrients = product.get("nutrients", {})
                        if nutrients.get("energy"):
                            nutrition_data = {
                                "calories": round(nutrients.get("energy", {}).get("per100g", 0)),
                                "protein": round(nutrients.get("protein", {}).get("per100g", 0), 1),
                                "carbs": round(nutrients.get("carbohydrate", {}).get("per100g", 0), 1),
                                "fat": round(nutrients.get("fat", {}).get("per100g", 0), 1)
                            }
                            if nutrition_data["calories"] > 0:
                                logger.info(f"🥬 FoodRepo nutrition found for '{food_name}': {nutrition_data}")
                                return nutrition_data
    except Exception as e:
        logger.warning(f"⚠️ FoodRepo lookup failed for '{food_name}': {e}")
    return None

async def get_nutrition_from_apis(food_name: str) -> Dict[str, Any]:
    """Get nutrition data from multiple free databases with comprehensive fallback."""
    # Clean the Food-101 name for better searches
    cleaned_name = clean_food_name_for_search(food_name)
    logger.info(f"Searching nutrition for: '{food_name}' → '{cleaned_name}'")

    # Try APIs in order: free/unlimited first, then rate-limited APIs
    nutrition_sources = [
        ("OpenFoodFacts", search_openfoodfacts_nutrition),  # Free, 2M+ products
        ("USDA", search_usda_nutrition),                    # Free, comprehensive US data
        ("FoodRepo", search_foodrepo_nutrition),            # Free, European focus
        ("Edamam", search_edamam_nutrition),                # 1000 requests/month limit
        ("Spoonacular", search_spoonacular_nutrition)       # 150 requests/day limit
    ]

    # First attempt with the cleaned name
    for source_name, search_func in nutrition_sources:
        try:
            nutrition_data = await search_func(cleaned_name)
            if nutrition_data and nutrition_data.get("calories", 0) > 0:
                nutrition_data["source"] = source_name
                logger.info(f"✅ Found nutrition data from {source_name} for '{cleaned_name}'")
                return nutrition_data
        except Exception as e:
            logger.warning(f"⚠️ {source_name} search failed for '{cleaned_name}': {e}")
            continue

    # If the cleaned name failed and differs from the original, try the original name too
    if cleaned_name.lower() != food_name.lower():
        logger.info(f"Retrying with original name: '{food_name}'")
        for source_name, search_func in nutrition_sources:
            try:
                nutrition_data = await search_func(food_name)
                if nutrition_data and nutrition_data.get("calories", 0) > 0:
                    nutrition_data["source"] = source_name
                    logger.info(f"✅ Found nutrition data from {source_name} for original '{food_name}'")
                    return nutrition_data
            except Exception as e:
                logger.warning(f"⚠️ {source_name} search failed for original '{food_name}': {e}")
                continue

    # As a last resort, try just the first word (e.g., "pineapple juice" → "pineapple")
    words = cleaned_name.split()
    if len(words) > 1:
        first_word = words[0]
        logger.info(f"Last resort: trying first word only: '{first_word}'")
        for source_name, search_func in nutrition_sources:
            try:
                nutrition_data = await search_func(first_word)
                if nutrition_data and nutrition_data.get("calories", 0) > 0:
                    nutrition_data["source"] = f"{source_name} (matched: {first_word})"
                    logger.info(f"✅ Found nutrition data from {source_name} for '{first_word}'")
                    return nutrition_data
            except Exception as e:
                logger.warning(f"⚠️ {source_name} search failed for '{first_word}': {e}")
                continue

    # All APIs failed, return default values
    logger.warning(f"🚨 No nutrition data found for '{food_name}' after all attempts, using defaults")
    default_nutrition = DEFAULT_NUTRITION.copy()
    default_nutrition["source"] = "Default (APIs unavailable)"
    return default_nutrition
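
# A minimal sketch of the fallback chain above, assuming network access and
# an event loop; the food name is just an example:
#
#     import asyncio
#     data = asyncio.run(get_nutrition_from_apis("chicken_curry"))
#     print(data)  # e.g. {"calories": ..., "protein": ..., ..., "source": "USDA"}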

# ==================== TRANSLATION SYSTEM ====================
# In-memory translation cache to reduce API calls
translation_cache: Dict[str, Dict[str, str]] = {}  # {locale: {english: translated}}

# Language code mapping (i18n locale → full language name)
LANGUAGE_MAP = {
    "en": "English",
    "bs": "Bosnian",
    "de": "German",
    "es": "Spanish",
    "fr": "French",
    "it": "Italian",
    "pt": "Portuguese",
    "ar": "Arabic",
    "tr": "Turkish",
    "nl": "Dutch",
    "ru": "Russian",
    "zh": "Chinese",
    "ja": "Japanese",
    "ko": "Korean",
    "hi": "Hindi",
    "sr": "Serbian",
    "hr": "Croatian",
    "sq": "Albanian",
    "mk": "Macedonian",
}

async def translate_food_names_batch(food_names: List[str], target_locale: str) -> Dict[str, str]:
    """
    Translate multiple food names in a single API call (cost optimization).

    Args:
        food_names: List of food names in English
        target_locale: Target language code

    Returns:
        Dictionary mapping original names to translated names
    """
    # Skip translation if the target is English or there is no OpenAI client
    if target_locale == "en" or not openai_client or not OPENAI_API_KEY:
        return {name: name for name in food_names}

    # Check the cache first
    if target_locale not in translation_cache:
        translation_cache[target_locale] = {}

    translations = {}
    needs_translation = []

    # Separate cached and uncached items
    for name in food_names:
        if name in translation_cache[target_locale]:
            translations[name] = translation_cache[target_locale][name]
            logger.info(f"💾 Cache hit: '{name}' → '{translations[name]}' ({target_locale})")
        else:
            needs_translation.append(name)

    # If everything was cached, return immediately
    if not needs_translation:
        return translations

    # Get the target language name
    target_language = LANGUAGE_MAP.get(target_locale, target_locale)

    try:
        logger.info(f"Batch translating {len(needs_translation)} items to {target_language}")

        # Create a batch translation prompt (one API call for multiple items)
        food_list = "\n".join(f"{i+1}. {name}" for i, name in enumerate(needs_translation))
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a professional food translator. Translate food names to {target_language}. Return ONLY the translations, one per line, in the same order. Keep it natural and commonly used."
                },
                {
                    "role": "user",
                    "content": f"Translate these food names to {target_language}:\n{food_list}"
                }
            ],
            max_tokens=150,
            temperature=0.3,
        )

        translated_lines = response.choices[0].message.content.strip().split('\n')

        # Parse the translations and update the cache
        for i, name in enumerate(needs_translation):
            if i < len(translated_lines):
                # Remove numbering if present (e.g., "1. Ananas" → "Ananas")
                translated = translated_lines[i].strip()
                translated = translated.split('. ', 1)[-1] if '. ' in translated else translated
                translations[name] = translated
                translation_cache[target_locale][name] = translated
                logger.info(f"✅ '{name}' → '{translated}'")

        return translations
    except Exception as e:
        logger.warning(f"⚠️ Batch translation failed: {e}")
        # Return the originals on failure
        for name in needs_translation:
            translations[name] = name
        return translations
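
# A quick sketch of the batch translator, assuming OPENAI_API_KEY is set.
# "bs" maps to Bosnian in LANGUAGE_MAP; the output shown is only illustrative,
# not recorded model output:
#
#     import asyncio
#     result = asyncio.run(translate_food_names_batch(["pizza", "apple pie"], "bs"))
#     # e.g. {"pizza": "pica", "apple pie": "pita od jabuka"}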

async def translate_food_name(food_name: str, target_locale: str) -> str:
    """
    Translate a single food name (uses the batch function internally for caching).

    Args:
        food_name: Food name in English
        target_locale: Target language code

    Returns:
        Translated food name, or the original if translation fails or is not needed
    """
    result = await translate_food_names_batch([food_name], target_locale)
    return result.get(food_name, food_name)

async def translate_description(description: str, target_locale: str) -> str:
    """
    Translate a food description to the target language using OpenAI, with caching.

    Args:
        description: Description in English
        target_locale: Target language code

    Returns:
        Translated description, or the original if translation fails or is not needed
    """
    # Skip translation if the target is English or there is no OpenAI client
    if target_locale == "en" or not openai_client or not OPENAI_API_KEY:
        return description

    # Simple cache key (hash of description + locale)
    cache_key = f"desc_{hash(description)}_{target_locale}"

    # Check the locale cache
    if target_locale not in translation_cache:
        translation_cache[target_locale] = {}
    if cache_key in translation_cache[target_locale]:
        logger.info(f"💾 Description cache hit ({target_locale})")
        return translation_cache[target_locale][cache_key]

    # Get the target language name
    target_language = LANGUAGE_MAP.get(target_locale, target_locale)

    try:
        logger.info(f"Translating description to {target_language}")
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": f"You are a food description translator. Translate to {target_language}. Keep it natural and concise. Return ONLY the translation."
                },
                {
                    "role": "user",
                    "content": description
                }
            ],
            max_tokens=100,
            temperature=0.3,
        )
        translated = response.choices[0].message.content.strip()

        # Cache the result
        translation_cache[target_locale][cache_key] = translated
        logger.info(f"✅ Description translated to {target_language}")
        return translated
    except Exception as e:
        logger.warning(f"⚠️ Description translation failed: {e}")
        return description

# ==================== MULTI-MODEL FOOD RECOGNIZER ====================
class MultiModelFoodRecognizer:
    """Production-ready multi-model ensemble for comprehensive food recognition."""

    def __init__(self, device: str):
        self.device = device
        self.models = {}
        self.processors = {}
        self.is_loaded = False
        self.available_models = []
        self._initialize_models()
        self._warm_up()

    def _initialize_models(self):
        """Initialize all available food recognition models."""
        logger.info("Initializing multi-model food recognition system...")
        for model_key, model_config in FOOD_MODELS.items():
            try:
                logger.info(f"📦 Loading {model_config['description']}...")
                model_name = model_config["model_name"]

                # Load the processor and model
                processor = AutoImageProcessor.from_pretrained(model_name)
                model = AutoModelForImageClassification.from_pretrained(model_name)

                # Move to device and optimize
                model = model.to(self.device)
                model.eval()

                # Memory optimization (skip torch.compile for MPS)
                if hasattr(torch, 'compile') and self.device != "mps":
                    try:
                        model = torch.compile(model)
                        logger.info(f"⚡ {model_key} compiled with torch.compile")
                    except Exception:
                        logger.info(f"⚠️ torch.compile failed for {model_key}, using standard model")
                else:
                    logger.info(f"ℹ️ Using standard model for {model_key} (torch.compile disabled for MPS)")

                self.models[model_key] = model
                self.processors[model_key] = processor
                self.available_models.append(model_key)
                logger.info(f"✅ {model_config['description']} loaded successfully")
            except Exception as e:
                logger.warning(f"⚠️ Failed to load {model_key}: {e}")
                continue

        if self.available_models:
            self.is_loaded = True
            logger.info(f"🎯 Multi-model system ready with {len(self.available_models)} models: {self.available_models}")
        else:
            raise RuntimeError("❌ No models could be loaded!")

    def _warm_up(self):
        """Warm up all loaded models."""
        if not self.available_models:
            return
        try:
            logger.info("🔥 Warming up all models...")
            # Create a dummy image
            dummy_image = Image.new('RGB', (224, 224), color='red')
            for model_key in self.available_models:
                try:
                    processor = self.processors[model_key]
                    model = self.models[model_key]
                    with torch.no_grad():
                        inputs = processor(images=dummy_image, return_tensors="pt")
                        inputs = {k: v.to(self.device) for k, v in inputs.items()}
                        _ = model(**inputs)
                    logger.info(f"✅ {model_key} warmed up")
                except Exception as e:
                    logger.warning(f"⚠️ Warm-up failed for {model_key}: {e}")

            # Clean up
            del dummy_image
            if self.device == "cuda":
                torch.cuda.empty_cache()
            gc.collect()
            logger.info("✅ All models warm-up completed")
        except Exception as e:
            logger.warning(f"⚠️ Model warm-up failed: {e}")

    def _predict_with_model(self, image: Image.Image, model_key: str, top_k: int = 5) -> Optional[List[Dict[str, Any]]]:
        """Predict with a specific model."""
        try:
            if model_key not in self.available_models:
                return None

            processor = self.processors[model_key]
            model = self.models[model_key]

            # Preprocess the image
            processed_image = preprocess_image(image)

            # Prepare inputs
            inputs = processor(images=processed_image, return_tensors="pt")
            inputs = {k: v.to(self.device) for k, v in inputs.items()}

            # Inference
            with torch.no_grad():
                outputs = model(**inputs)
                logits = outputs.logits
                probs = F.softmax(logits, dim=-1).cpu().numpy()[0]

            # Get the top-k predictions
            top_indices = np.argsort(probs)[::-1][:top_k]
            predictions = []
            for idx in top_indices:
                # Handle different id2label key formats (str vs int)
                if hasattr(model.config, 'id2label') and str(idx) in model.config.id2label:
                    label = model.config.id2label[str(idx)]
                elif hasattr(model.config, 'id2label') and idx in model.config.id2label:
                    label = model.config.id2label[idx]
                else:
                    label = f"class_{idx}"
                confidence = float(probs[idx])

                # Clean up the label name
                clean_name = label.replace("_", " ").title()
                predictions.append({
                    "label": clean_name,
                    "raw_label": label,
                    "confidence": confidence,
                    "confidence_pct": f"{confidence:.1%}",
                    "model": model_key,
                    "model_type": FOOD_MODELS[model_key]["type"]
                })

            # Clean up memory
            del inputs, outputs, logits, probs
            if self.device == "cuda":
                torch.cuda.empty_cache()

            return predictions
        except Exception as e:
            logger.warning(f"⚠️ Prediction failed for {model_key}: {e}")
            return None

    def predict(self, image: Image.Image, top_k: int = 5) -> Dict[str, Any]:
        """Main predict method - delegates to the ensemble prediction."""
        return self.predict_ensemble(image, top_k)

    def predict_ensemble(self, image: Image.Image, top_k: int = 10) -> Dict[str, Any]:
        """Ensemble prediction using all available models with smart filtering."""
        if not self.is_loaded:
            raise RuntimeError("Models not loaded")

        all_predictions = []
        model_results = {}

        # Ask each model for more candidates than we will return (top 15 instead of 5)
        predictions_per_model = 15
        for model_key in self.available_models:
            predictions = self._predict_with_model(image, model_key, predictions_per_model)
            if predictions:
                model_results[model_key] = predictions
                all_predictions.extend(predictions)

        if not all_predictions:
            raise RuntimeError("No models produced valid predictions")

        # Non-food items that are filtered out completely
        non_food_items = {
            # Kitchen utensils & cookware
            'plate', 'dish', 'bowl', 'cup', 'glass', 'mug', 'spoon', 'fork', 'knife',
            'spatula', 'pan', 'pot', 'tray', 'napkin', 'table', 'cloth', 'placemat',
            'chopsticks', 'straw', 'bottle', 'container', 'lid', 'wrapper', 'packaging',
            'cutting board', 'grater', 'whisk', 'ladle', 'tongs', 'peeler', 'sieve',
            'colander', 'mixer', 'blender', 'toaster', 'oven', 'microwave', 'fridge',
            'freezer', 'dishwasher', 'sink', 'counter', 'shelf', 'cabinet', 'drawer',
            'waffle iron', 'frying pan', 'frypan', 'skillet', 'saucepan', 'stockpot',
            'baking sheet', 'baking pan', 'baking dish', 'loaf pan', 'muffin tin',
            'rolling pin', 'measuring cup', 'measuring spoon', 'kitchen scale',
            'bakery', 'bakeshop', 'bakehouse', 'restaurant', 'kitchen', 'dining room',
            # Animals (not food!)
            'dog', 'cat', 'bird', 'fish', 'horse', 'cow', 'pig', 'chicken',
            'terrier', 'retriever', 'bulldog', 'poodle', 'beagle', 'dachshund',
            'lobster', 'crab', 'shrimp', 'hunting dog', 'hyena', 'wolf', 'fox',
            # Objects/electronics
            'joystick', 'controller', 'remote', 'phone', 'computer', 'mouse', 'keyboard',
            'water jug', 'jug', 'pitcher', 'vase', 'flowerpot'
        }

        # Generic food terms that are deprioritized (but not removed)
        generic_terms = {
            'fruit', 'vegetable', 'food', 'meal', 'snack', 'dessert',
            'salad', 'soup', 'drink', 'beverage', 'meat', 'fish', 'seafood',
            'bread', 'pastry', 'cake', 'cookie', 'candy', 'chocolate'
        }

        # Ensemble voting: weight by model priority and confidence
        food_scores = {}
        filtered_count = 0
        for pred in all_predictions:
            food_label_lower = pred["raw_label"].lower().replace("_", " ")

            # Filter out non-food items completely
            is_non_food = any(non_food in food_label_lower for non_food in non_food_items)
            if is_non_food:
                filtered_count += 1
                logger.info(f"🚫 Filtered non-food item: '{pred['raw_label']}'")
                continue  # Skip this prediction entirely

            model_key = pred["model"]
            priority_weight = 1.0 / FOOD_MODELS[model_key]["priority"]  # Lower priority number = higher weight
            confidence_weight = pred["confidence"]

            # Penalize generic terms so specific dishes outrank them
            is_generic = any(generic in food_label_lower for generic in generic_terms)
            # A single-word generic term is penalized even harder
            is_single_generic = food_label_lower in generic_terms
            if is_single_generic:
                combined_score = priority_weight * confidence_weight * 0.1  # 90% penalty
            elif is_generic:
                combined_score = priority_weight * confidence_weight * 0.5  # 50% penalty
            else:
                combined_score = priority_weight * confidence_weight  # Full score for specific items

            food_name = pred["raw_label"]
            if food_name not in food_scores:
                food_scores[food_name] = {
                    "total_score": 0,
                    "count": 0,
                    "best_prediction": pred,
                    "models": [],
                    "is_generic": is_generic
                }
            food_scores[food_name]["total_score"] += combined_score
            food_scores[food_name]["count"] += 1
            food_scores[food_name]["models"].append(model_key)
            # Keep the highest-confidence prediction as the representative
            if pred["confidence"] > food_scores[food_name]["best_prediction"]["confidence"]:
                food_scores[food_name]["best_prediction"] = pred

        if filtered_count > 0:
            logger.info(f"✅ Filtered out {filtered_count} non-food items")

        # Sort by ensemble score
        sorted_foods = sorted(
            food_scores.items(),
            key=lambda x: x[1]["total_score"],
            reverse=True
        )

        # Format the final results - keep extra candidates so enough survive deduplication
        final_predictions = []
        for food_name, data in sorted_foods[:top_k * 2]:
            pred = data["best_prediction"].copy()
            pred["ensemble_score"] = data["total_score"]
            pred["model_count"] = data["count"]
            pred["contributing_models"] = data["models"]
            pred["is_generic"] = data["is_generic"]
            final_predictions.append(pred)

        # Remove duplicates AND non-food items (double check)
        filtered_predictions = []
        seen_labels = set()
        for pred in final_predictions:
            label_lower = pred["raw_label"].lower().replace("_", " ").strip()

            # Double check: filter non-food items again
            is_non_food = any(non_food in label_lower for non_food in non_food_items)
            if is_non_food:
                continue  # Skip non-food items

            # Skip labels we have already seen
            if label_lower not in seen_labels:
                filtered_predictions.append(pred)
                seen_labels.add(label_lower)
            if len(filtered_predictions) >= top_k:
                break

        # Primary result - prefer specific, high-confidence predictions
        primary = filtered_predictions[0] if filtered_predictions else {
            "label": "Unknown Food",
            "raw_label": "unknown",
            "confidence": 0.0,
            "ensemble_score": 0.0,
            "model_count": 0,
            "contributing_models": [],
            "is_generic": False
        }

        # Quality check: if primary confidence is below 10%, look for a better alternative
        MIN_CONFIDENCE = 0.10  # 10%
        if primary.get("confidence", 0) < MIN_CONFIDENCE and len(filtered_predictions) > 1:
            logger.warning(f"⚠️ Low confidence ({primary['confidence']:.1%}) for '{primary['label']}', checking alternatives...")
            # Promote the first alternative with acceptable confidence
            for i, pred in enumerate(filtered_predictions[1:], 1):
                if pred.get("confidence", 0) >= MIN_CONFIDENCE / 2:  # At least 5%
                    filtered_predictions[0], filtered_predictions[i] = filtered_predictions[i], filtered_predictions[0]
                    primary = filtered_predictions[0]
                    logger.info(f"Swapped low-confidence primary with better alternative: {primary['label']} ({primary['confidence']:.1%})")
                    break

        # If the primary is generic but specific alternatives exist, swap them
        if primary.get("is_generic") and len(filtered_predictions) > 1:
            for i, pred in enumerate(filtered_predictions[1:], 1):
                if not pred.get("is_generic"):
                    # Swap the primary with this specific prediction
                    filtered_predictions[0], filtered_predictions[i] = filtered_predictions[i], filtered_predictions[0]
                    primary = filtered_predictions[0]
                    logger.info(f"Swapped generic primary with specific: {primary['label']}")
                    break

        return {
            "success": True,
            "label": primary["label"],
            "confidence": primary["confidence"],
            "primary_label": primary["raw_label"],
            "ensemble_score": primary.get("ensemble_score", 0),
            "alternatives": filtered_predictions[1:],  # Up to top_k - 1 alternatives
            "model_results": model_results,
            "system_info": {
                "available_models": self.available_models,
                "device": self.device.upper(),
                "total_classes": sum(FOOD_MODELS[m]["classes"] for m in self.available_models)
            }
        }
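
# A minimal local usage sketch for the recognizer above, assuming the model
# weights can be downloaded and that "meal.jpg" (a placeholder name) exists:
#
#     from PIL import Image
#     rec = MultiModelFoodRecognizer(select_device())
#     result = rec.predict(Image.open("meal.jpg"), top_k=10)
#     print(result["label"], result["confidence"], len(result["alternatives"]))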

# ==================== LIFESPAN EVENTS ====================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager."""
    # Startup
    logger.info("Application startup complete")
    logger.info("=" * 60)
    logger.info("✅ API READY FOR PRODUCTION")
    logger.info("📡 Endpoints: /api/nutrition/analyze-food, /analyze")
    logger.info(f"🖥️ Device: {device.upper()}")
    logger.info(f"Models: {len(recognizer.available_models)} active models")
    logger.info(f"🎯 Total Food Categories: {sum(FOOD_MODELS[m]['classes'] for m in recognizer.available_models)}")
    logger.info(f"Translations: {'✅ Enabled' if openai_client else '❌ Disabled'}")
    logger.info("=" * 60)

    yield

    # Shutdown
    logger.info("Shutting down...")
    # Clean up GPU memory
    if device == "cuda":
        torch.cuda.empty_cache()
    # Garbage collection
    gc.collect()
    logger.info("✅ Cleanup completed")

# ==================== FASTAPI SETUP ====================
logger.info("=" * 60)
logger.info("🍽️ PRODUCTION AI FOOD RECOGNITION API")
logger.info("=" * 60)

# Initialize the multi-model system
device = select_device()
recognizer = MultiModelFoodRecognizer(device)

# Initialize the OpenAI client BEFORE the FastAPI app
if OPENAI_API_KEY:
    try:
        openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
        logger.info("✅ OpenAI client initialized")
    except Exception as e:
        logger.warning(f"⚠️ OpenAI client initialization failed: {e}")
        openai_client = None
else:
    logger.warning("⚠️ OpenAI API key not found - translations disabled")

# Create the FastAPI app
app = FastAPI(
    title="AI Food Recognition API",
    description="Production-ready food recognition with 101 categories (Food-101 dataset)",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)

# ==================== MIDDLEWARE ====================
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    return response

# ==================== UTILITY FUNCTIONS ====================
async def validate_and_read_image(file: UploadFile) -> Image.Image:
    """Validate and read an uploaded image file."""
    # Check the declared file size (may be absent)
    if getattr(file, 'size', None) and file.size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large (max 10MB)")

    # Check the content type
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed: {', '.join(ALLOWED_TYPES)}"
        )

    try:
        # Read and validate the image
        contents = await file.read()
        if len(contents) > MAX_FILE_SIZE:
            raise HTTPException(status_code=413, detail="File too large (max 10MB)")
        image = Image.open(BytesIO(contents))
        return image
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid image file: {str(e)}")

# ==================== API ENDPOINTS ====================
@app.get("/")
def root():
    """Root endpoint with API information."""
    return {
        "message": "🍽️ AI Food Recognition API",
        "status": "online",
        "version": "2.0.0",
        "models": recognizer.available_models if recognizer.is_loaded else [],
        "total_categories": sum(FOOD_MODELS[m]["classes"] for m in recognizer.available_models) if recognizer.is_loaded else 0,
        "device": device.upper(),
        "endpoints": {
            "POST /api/nutrition/analyze-food": "Analyze food image (Next.js frontend)",
            "POST /analyze": "Analyze food image (Hugging Face Spaces)",
            "GET /health": "Health check",
            "GET /docs": "API documentation"
        }
    }

@app.get("/health")
def health_check():
    """Comprehensive health check."""
    return {
        "status": "healthy" if recognizer.is_loaded else "error",
        "models_loaded": recognizer.is_loaded,
        "available_models": recognizer.available_models if recognizer.is_loaded else [],
        "model_count": len(recognizer.available_models) if recognizer.is_loaded else 0,
        "total_categories": sum(FOOD_MODELS[m]["classes"] for m in recognizer.available_models) if recognizer.is_loaded else 0,
        "device": device.upper(),
        "memory_usage": f"{torch.cuda.memory_allocated() / 1024**2:.1f}MB" if device == "cuda" else "N/A"
    }

@app.post("/api/nutrition/analyze-food")
async def analyze_food_nutrition(request: Request, file: UploadFile = File(None)):
    """
    Analyze a food image or manual entry for the Next.js frontend.

    Supports two modes:
    1. Image upload: AI recognition + nutrition lookup
    2. Manual entry: direct nutrition lookup by food name

    Returns a nutrition-focused response format with translations.
    """
    try:
        # Parse the form data
        form_data = await request.form()
        manual_input = form_data.get("manualInput", "false").lower() == "true"
        locale = form_data.get("locale", "en")  # The user's language preference

        logger.info(f"Request received - Mode: {'Manual' if manual_input else 'Image'}, Locale: {locale}")

        # MODE 1: Manual food entry (from alternatives or manual input)
        if manual_input:
            food_name = form_data.get("manualFoodName")
            serving_size = form_data.get("manualServingSize", "100")
            serving_unit = form_data.get("manualServingUnit", "g")
            description = form_data.get("manualDescription", "")

            if not food_name:
                raise HTTPException(status_code=400, detail="manualFoodName is required for manual entry")

            logger.info(f"🍽️ Manual nutrition lookup: {food_name} ({serving_size}{serving_unit})")

            # Direct nutrition API lookup
            nutrition_data = await get_nutrition_from_apis(food_name)
            if not nutrition_data or nutrition_data.get("calories", 0) == 0:
                raise HTTPException(
                    status_code=404,
                    detail="Failed to retrieve nutrition information for manual entry"
                )

            source = nutrition_data.get("source", "Unknown")
            logger.info(f"✅ Manual lookup: {food_name} | Nutrition: {source}")

            # Translate the food name and description
            translated_name = await translate_food_name(food_name, locale)
            base_description = description or f"Manual entry: {food_name}"
            translated_description = await translate_description(base_description, locale)

            # Return the manual-entry format
            return JSONResponse(content={
                "data": {
                    "label": translated_name,
                    "confidence": 1.0,  # Manual entry has 100% confidence
                    "nutrition": {
                        "calories": nutrition_data["calories"],
                        "protein": nutrition_data["protein"],
                        "carbs": nutrition_data["carbs"],
                        "fat": nutrition_data["fat"]
                    },
                    "servingSize": serving_size,
                    "servingUnit": serving_unit,
                    "description": translated_description,
                    "alternatives": [],  # No alternatives for manual entry
                    "source": f"{source} Database",
                    "isManualEntry": True
                }
            })

        # MODE 2: Image upload (AI recognition)
        else:
            if not file:
                raise HTTPException(status_code=400, detail="File is required for image analysis")

            logger.info(f"🍽️ Image analysis request: {file.filename}")

            # Validate and process the image
            image = await validate_and_read_image(file)

            # Step 1: AI model prediction (request top 10 for more alternatives)
            results = recognizer.predict(image, top_k=10)

            # Step 2: API nutrition lookup
            nutrition_data = await get_nutrition_from_apis(results["primary_label"])

            # Log the result
            confidence_pct = f"{results['confidence']:.1%}"
            source = nutrition_data.get("source", "Unknown")
            logger.info(f"✅ Prediction: {results['label']} ({confidence_pct}) | Nutrition: {source}")

            # Batch translation optimization: translate all food names at once
            if locale != "en" and openai_client:
                # Collect all names to translate (primary + alternatives)
                names_to_translate = [results["label"]]
                if results.get("alternatives"):
                    names_to_translate.extend([
                        alt.get("label", alt.get("raw_label", ""))
                        for alt in results["alternatives"]
                    ])

                # Single API call for all translations
                translations = await translate_food_names_batch(names_to_translate, locale)

                # Apply the translations
                translated_name = translations.get(results["label"], results["label"])

                # Translate the description
                base_description = f"{results['label']} identified with {int(results['confidence'] * 100)}% confidence"
                translated_description = await translate_description(base_description, locale)

                # Map alternatives with translations
                translated_alternatives = []
                if results.get("alternatives"):
                    for alt in results["alternatives"]:
                        alt_name = alt.get("label", alt.get("raw_label", ""))
                        translated_alternatives.append({
                            **alt,
                            "label": translations.get(alt_name, alt_name),
                            "original_label": alt_name
                        })
            else:
                # No translation needed
                translated_name = results["label"]
                translated_description = f"{results['label']} identified with {int(results['confidence'] * 100)}% confidence"
                translated_alternatives = results["alternatives"]

            # Return the frontend-expected format
            return JSONResponse(content={
                "data": {
                    "label": translated_name,
                    "confidence": results["confidence"],
                    "description": translated_description,  # Translated description
                    "nutrition": {
                        "calories": nutrition_data["calories"],
                        "protein": nutrition_data["protein"],
                        "carbs": nutrition_data["carbs"],
                        "fat": nutrition_data["fat"]
                    },
                    "alternatives": translated_alternatives,
                    "source": f"AI Recognition + {source} Database",
                    "isManualEntry": False,
                    "locale": locale  # Returned for debugging
                }
            })
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Analysis failed: {e}")
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
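
# An illustrative manual-entry request against the endpoint above (the field
# names match the form keys parsed there; host and values are placeholders):
#
#     curl -X POST http://localhost:7860/api/nutrition/analyze-food \
#          -F "manualInput=true" \
#          -F "manualFoodName=chicken curry" \
#          -F "manualServingSize=250" \
#          -F "manualServingUnit=g" \
#          -F "locale=bs"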

@app.post("/analyze")
async def analyze_food_spaces(file: UploadFile = File(...)):
    """
    Analyze a food image for the Hugging Face Spaces interface.
    Returns a detailed response with model info.
    """
    logger.info(f"HF Spaces analysis request: {file.filename}")
    try:
        # Validate and process the image
        image = await validate_and_read_image(file)

        # Step 1: AI model prediction (request top 10 for more alternatives)
        results = recognizer.predict(image, top_k=10)

        # Step 2: API nutrition lookup
        nutrition_data = await get_nutrition_from_apis(results["primary_label"])

        # Log the result
        confidence_pct = f"{results['confidence']:.1%}"
        source = nutrition_data.get("source", "Unknown")
        logger.info(f"✅ Prediction: {results['label']} ({confidence_pct}) | Nutrition: {source}")

        # Return the full response with nutrition data
        enhanced_results = results.copy()
        enhanced_results["nutrition"] = nutrition_data
        enhanced_results["data_source"] = source
        return JSONResponse(content=enhanced_results)
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"❌ Analysis failed: {e}")
        raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")

# ==================== MAIN ====================
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    logger.info("🎯 Starting production server...")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=port,
        log_level="info",
        access_log=True
    )
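
# Illustrative image-analysis requests (host and file name are placeholders;
# the server listens on port 7860 unless PORT is set):
#
#     curl -X POST http://localhost:7860/analyze -F "file=@meal.jpg"
#
#     curl -X POST http://localhost:7860/api/nutrition/analyze-food \
#          -F "file=@meal.jpg" -F "locale=de"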