"""Database module for Portfolio Intelligence Platform. Handles Supabase PostgreSQL connections and operations. """ from typing import Optional, Dict, Any, List from supabase import create_client, Client from backend.config import settings from backend.utils.serialisation import decimal_to_json_safe from backend.utils.uuid_generator import string_to_uuid import logging logger = logging.getLogger(__name__) # Re-export for backwards compatibility __all__ = ['Database', 'decimal_to_json_safe'] class Database: """Database connection manager for Supabase PostgreSQL. Attributes: client: Supabase client instance """ def __init__(self): """Initialise database connection.""" self.client: Optional[Client] = None if settings.supabase_url: try: # Prefer service role key, fallback to anon key if settings.supabase_service_role_key: api_key = settings.supabase_service_role_key logger.info("Using Supabase service role key (bypasses RLS)") # Simple client creation for backend service operations # Service role automatically bypasses RLS, no special config needed self.client = create_client( settings.supabase_url, api_key ) logger.info("Supabase service role client initialised successfully") elif settings.supabase_key: api_key = settings.supabase_key logger.warning( "Using anon key - database operations may fail due to RLS policies. " "Add SUPABASE_SERVICE_ROLE_KEY environment variable for full functionality." ) # Anon key client (RLS enforced, requires user context) self.client = create_client( settings.supabase_url, api_key ) logger.info("Supabase anon client initialised (RLS enforced)") else: logger.error("No Supabase API key found (need either SUPABASE_SERVICE_ROLE_KEY or SUPABASE_KEY)") return except Exception as e: logger.error(f"Failed to initialise Supabase client: {e}") logger.info("Running in demo mode without database") else: logger.info("Supabase credentials not configured - running in demo mode") def is_connected(self) -> bool: """Check if database connection is active. Returns: True if connected, False otherwise """ return self.client is not None def _string_to_uuid(self, string_id: str) -> str: """Convert string ID to deterministic UUID. Uses Python UUID v5 generation for deterministic conversion. This is faster than database-side generation and doesn't require the uuid-ossp extension. Args: string_id: Human-readable ID (e.g., 'demo_20241119_105323') Returns: UUID string (deterministic, always same for same input) Example: >>> db = Database() >>> db._string_to_uuid('demo_20241119_105323') '...' # Same UUID every time for this input """ return string_to_uuid(string_id) async def ensure_demo_user_exists(self) -> str: """Ensure demo user exists in database. Returns: Demo user ID if successful, None otherwise """ if not self.is_connected(): return None demo_email = "demo@portfolio-intelligence.com" try: # Check if user exists by email result = self.client.table('users').select('id,email').eq('email', demo_email).execute() if result.data: user_id = result.data[0]['id'] logger.info(f"Demo user already exists with ID: {user_id}") return user_id # Create auth user using admin API (Supabase will auto-generate ID) try: response = self.client.auth.admin.create_user({ "email": demo_email, "password": "demo-password-changeme", "email_confirm": True, "user_metadata": { "username": "demo-user", "is_demo": True } }) if response.user: user_id = response.user.id logger.info(f"Created demo user via auth admin API with ID: {user_id}") # Verify profile was created by trigger result = self.client.table('users').select('id').eq('id', user_id).execute() if result.data: logger.info("Demo user profile created successfully") return user_id else: logger.warning("Demo user created in auth but profile not found, waiting...") # Sometimes there's a delay, try once more import time time.sleep(1) result = self.client.table('users').select('id').eq('id', user_id).execute() if result.data: return user_id logger.error("Demo user profile still not found after delay") return None except Exception as auth_error: # User might already exist in auth.users error_str = str(auth_error) if "already exists" in error_str.lower() or "already registered" in error_str.lower(): logger.info("Demo user already exists in auth.users, fetching ID...") result = self.client.table('users').select('id').eq('email', demo_email).execute() if result.data: return result.data[0]['id'] else: logger.error(f"Failed to create auth user: {auth_error}") return None except Exception as e: logger.error(f"Failed to ensure demo user exists: {e}") return None async def save_analysis( self, portfolio_id: str, analysis_results: Dict[str, Any] ) -> bool: """Save portfolio analysis results to database. Args: portfolio_id: Portfolio ID analysis_results: Complete analysis results from workflow Returns: True if saved successfully, False otherwise """ if not self.is_connected(): logger.warning("Database not connected, skipping save") return False try: uuid_portfolio_id = self._string_to_uuid(portfolio_id) # Convert all Decimal objects to string for JSON serialisation # This preserves precision - float conversion loses pennies in large numbers analysis_clean = decimal_to_json_safe(analysis_results) data = { 'portfolio_id': uuid_portfolio_id, 'holdings_snapshot': analysis_clean.get('holdings', []), 'market_data': analysis_clean.get('market_data', {}), 'risk_metrics': analysis_clean.get('risk_analysis', {}), 'optimisation_results': analysis_clean.get('optimisation_results', {}), 'ai_synthesis': analysis_clean.get('ai_synthesis', ''), 'recommendations': analysis_clean.get('recommendations', []), 'reasoning_steps': analysis_clean.get('reasoning_steps', []), 'mcp_calls': analysis_clean.get('mcp_calls', []), 'sentiment_data': analysis_clean.get('sentiment_data', {}), 'execution_time_ms': analysis_clean.get('execution_time_ms'), 'model_version': analysis_clean.get('model_version', 'claude-sonnet-4-5'), } self.client.table('portfolio_analyses').insert(data).execute() logger.info(f"Saved analysis for portfolio {portfolio_id}") return True except Exception as e: logger.error(f"Failed to save analysis: {e}") return False async def save_portfolio( self, portfolio_id: str, user_id: str, name: str, risk_tolerance: str = 'moderate' ) -> bool: """Save portfolio to database. Args: portfolio_id: Portfolio ID user_id: User ID name: Portfolio name risk_tolerance: Risk tolerance level Returns: True if successful, False otherwise """ if not self.is_connected(): return False try: uuid_portfolio_id = self._string_to_uuid(portfolio_id) self.client.table('portfolios').insert({ 'id': uuid_portfolio_id, 'user_id': user_id, 'name': name, 'risk_tolerance': risk_tolerance }).execute() return True except Exception as e: logger.error(f"Failed to save portfolio: {e}") return False async def get_user_portfolios(self, user_id: str) -> List[Dict[str, Any]]: """Get all saved portfolios for a user. Args: user_id: User ID Returns: List of portfolio dictionaries with id, name, and risk_tolerance """ if not self.is_connected(): return [] try: result = self.client.table('portfolios')\ .select('id, name, risk_tolerance, created_at')\ .eq('user_id', user_id)\ .order('created_at', desc=True)\ .execute() return result.data if result.data else [] except Exception as e: logger.error(f"Failed to get user portfolios: {e}") return [] async def get_portfolio_by_id(self, portfolio_id: str, user_id: str) -> Optional[Dict[str, Any]]: """Get a portfolio by ID with its holdings. Args: portfolio_id: Portfolio ID user_id: User ID for authorisation check Returns: Portfolio dict with holdings, or None if not found/unauthorised """ if not self.is_connected(): return None try: # Get portfolio with holdings result = self.client.table('portfolios')\ .select('*, portfolio_holdings(*)')\ .eq('id', portfolio_id)\ .eq('user_id', user_id)\ .execute() if result.data and len(result.data) > 0: return result.data[0] return None except Exception as e: logger.error(f"Failed to get portfolio {portfolio_id}: {e}") return None async def get_analysis_history( self, user_id: str, limit: int = 10 ) -> List[Dict[str, Any]]: """Get analysis history for all portfolios owned by a user. Args: user_id: User ID limit: Maximum number of analyses to return Returns: List of analysis results """ if not self.is_connected(): return [] try: # Join with portfolios table to filter by user_id result = self.client.table('portfolio_analyses') \ .select('*, portfolios!inner(user_id, risk_tolerance)') \ .eq('portfolios.user_id', user_id) \ .order('analysis_date', desc=True) \ .limit(limit) \ .execute() return result.data if result.data else [] except Exception as e: logger.error(f"Failed to get analysis history: {e}") return [] async def get_analysis_by_id( self, analysis_id: str, user_id: str ) -> Optional[Dict[str, Any]]: """Get a specific analysis by ID with user authorisation check. Args: analysis_id: Analysis ID to retrieve user_id: User ID for authorisation check Returns: Analysis data if found and authorised, None otherwise """ if not self.is_connected(): return None try: # Join with portfolios to ensure user owns this analysis result = self.client.table('portfolio_analyses') \ .select('*, portfolios!inner(user_id, risk_tolerance, name)') \ .eq('id', analysis_id) \ .eq('portfolios.user_id', user_id) \ .execute() if result.data: return result.data[0] else: logger.warning(f"Analysis {analysis_id} not found or not authorised for user {user_id}") return None except Exception as e: logger.error(f"Failed to get analysis by ID: {e}") return None async def save_portfolio_input( self, user_id: str, portfolio_text: str ) -> bool: """Save portfolio input text for quick reload. Automatically keeps only last 3 entries per user via database trigger. Args: user_id: User ID portfolio_text: Raw portfolio input text Returns: True if saved successfully, False otherwise """ if not self.is_connected(): logger.warning("Database not connected, skipping portfolio input save") return False try: self.client.table('portfolio_inputs').insert({ 'user_id': user_id, 'description': portfolio_text }).execute() logger.info(f"Saved portfolio input for user {user_id}") return True except Exception as e: logger.error(f"Failed to save portfolio input: {e}") return False async def get_portfolio_inputs( self, user_id: str, limit: int = 3 ) -> List[Dict[str, Any]]: """Get last N portfolio inputs for user. Args: user_id: User ID limit: Maximum number to return (default 3) Returns: List of portfolio input dicts with id, description, created_at """ if not self.is_connected(): return [] try: result = self.client.table('portfolio_inputs')\ .select('id, description, created_at')\ .eq('user_id', user_id)\ .order('created_at', desc=True)\ .limit(limit)\ .execute() return result.data if result.data else [] except Exception as e: logger.error(f"Failed to get portfolio inputs: {e}") return [] # Global database instance db = Database()