Spaces:
Running
on
Zero
Running
on
Zero
| """Database module for Portfolio Intelligence Platform. | |
| Handles Supabase PostgreSQL connections and operations. | |
| """ | |
| from typing import Optional, Dict, Any, List | |
| from supabase import create_client, Client | |
| from backend.config import settings | |
| from backend.utils.serialisation import decimal_to_json_safe | |
| from backend.utils.uuid_generator import string_to_uuid | |
| import logging | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Re-export for backwards compatibility
# decimal_to_json_safe is imported above from backend.utils.serialisation and
# listed here so that star-imports of this module still provide it.
# NOTE(review): the module-level `db` instance is not listed, so
# `from <this module> import *` does not export it - confirm that is intended.
__all__ = ['Database', 'decimal_to_json_safe']
class Database:
    """Database connection manager for Supabase PostgreSQL.

    The query methods are declared ``async`` but invoke the Supabase client's
    ``execute()`` without awaiting it, so each request runs to completion
    inside the calling coroutine.

    Attributes:
        client: Supabase client instance, or None when no client could be
            created (the log messages call this "demo mode").
    """

    def __init__(self):
        """Initialise database connection.

        Creates a Supabase client from ``settings``. The service role key is
        preferred; the anon key is used as a fallback. If the URL or both
        keys are missing, or client creation raises, ``client`` stays None.
        """
        self.client: Optional[Client] = None

        if not settings.supabase_url:
            logger.info("Supabase credentials not configured - running in demo mode")
            return

        try:
            # Prefer service role key, fallback to anon key
            if settings.supabase_service_role_key:
                api_key = settings.supabase_service_role_key
                logger.info("Using Supabase service role key (bypasses RLS)")
                # Simple client creation for backend service operations
                # Service role automatically bypasses RLS, no special config needed
                self.client = create_client(settings.supabase_url, api_key)
                logger.info("Supabase service role client initialised successfully")
            elif settings.supabase_key:
                api_key = settings.supabase_key
                logger.warning(
                    "Using anon key - database operations may fail due to RLS policies. "
                    "Add SUPABASE_SERVICE_ROLE_KEY environment variable for full functionality."
                )
                # Anon key client (RLS enforced, requires user context)
                self.client = create_client(settings.supabase_url, api_key)
                logger.info("Supabase anon client initialised (RLS enforced)")
            else:
                logger.error(
                    "No Supabase API key found (need either SUPABASE_SERVICE_ROLE_KEY or SUPABASE_KEY)"
                )
        except Exception as e:
            # Best-effort: a failed connection must not prevent the app from starting.
            logger.error("Failed to initialise Supabase client: %s", e)
            logger.info("Running in demo mode without database")

    def is_connected(self) -> bool:
        """Check if database connection is active.

        Only checks that a client object exists; no request is sent.

        Returns:
            True if a client was created, False otherwise
        """
        return self.client is not None

    def _string_to_uuid(self, string_id: str) -> str:
        """Convert string ID to deterministic UUID.

        Thin wrapper around ``backend.utils.uuid_generator.string_to_uuid``.
        The original documentation describes that helper as Python-side
        UUID v5 generation (no uuid-ossp extension needed); the helper is
        not visible from this module, so confirm there.

        Args:
            string_id: Human-readable ID (e.g., 'demo_20241119_105323')

        Returns:
            UUID string (expected to be the same for the same input)
        """
        return string_to_uuid(string_id)

    async def ensure_demo_user_exists(self) -> Optional[str]:
        """Ensure demo user exists in database.

        Looks the demo user up by email in the ``users`` table and, if it is
        missing, creates it through the auth admin API and then checks that
        a matching ``users`` row exists.

        Returns:
            Demo user ID if successful, None otherwise (not connected,
            creation failed, or no profile row could be found)
        """
        if not self.is_connected():
            return None

        demo_email = "demo@portfolio-intelligence.com"

        try:
            # Check if user exists by email
            result = self.client.table('users').select('id,email').eq('email', demo_email).execute()
            if result.data:
                user_id = result.data[0]['id']
                logger.info("Demo user already exists with ID: %s", user_id)
                return user_id

            # Create auth user using admin API (Supabase will auto-generate ID)
            try:
                # NOTE(review): the demo password is hard-coded in source;
                # consider moving it to configuration.
                response = self.client.auth.admin.create_user({
                    "email": demo_email,
                    "password": "demo-password-changeme",
                    "email_confirm": True,
                    "user_metadata": {
                        "username": "demo-user",
                        "is_demo": True
                    }
                })

                if not response.user:
                    logger.error("Auth admin API returned no user for demo account")
                    return None

                user_id = response.user.id
                logger.info("Created demo user via auth admin API with ID: %s", user_id)

                # Verify the profile row exists (expected to be created by a
                # database trigger - not visible from this module).
                result = self.client.table('users').select('id').eq('id', user_id).execute()
                if result.data:
                    logger.info("Demo user profile created successfully")
                    return user_id

                logger.warning("Demo user created in auth but profile not found, waiting...")
                # Sometimes there's a delay, try once more. Use the asyncio
                # sleep so the event loop is not blocked while waiting.
                import asyncio
                await asyncio.sleep(1)
                result = self.client.table('users').select('id').eq('id', user_id).execute()
                if result.data:
                    return user_id
                logger.error("Demo user profile still not found after delay")
                return None
            except Exception as auth_error:
                # User might already exist in auth.users
                error_str = str(auth_error).lower()
                if "already exists" in error_str or "already registered" in error_str:
                    logger.info("Demo user already exists in auth.users, fetching ID...")
                    result = self.client.table('users').select('id').eq('email', demo_email).execute()
                    if result.data:
                        return result.data[0]['id']
                    logger.error("Demo user exists in auth.users but has no profile row")
                    return None
                logger.error("Failed to create auth user: %s", auth_error)
                return None
        except Exception as e:
            logger.error("Failed to ensure demo user exists: %s", e)
            return None

    async def save_analysis(
        self,
        portfolio_id: str,
        analysis_results: Dict[str, Any]
    ) -> bool:
        """Save portfolio analysis results to database.

        Inserts one row into ``portfolio_analyses``. Missing result keys fall
        back to empty containers (or None for ``execution_time_ms``).

        Args:
            portfolio_id: Portfolio ID (converted to a UUID before insert)
            analysis_results: Complete analysis results from workflow

        Returns:
            True if saved successfully, False otherwise
        """
        if not self.is_connected():
            logger.warning("Database not connected, skipping save")
            return False

        try:
            uuid_portfolio_id = self._string_to_uuid(portfolio_id)

            # Make Decimal values JSON-safe before insert. Per the original
            # note they become strings to preserve precision.
            analysis_clean = decimal_to_json_safe(analysis_results)

            data = {
                'portfolio_id': uuid_portfolio_id,
                'holdings_snapshot': analysis_clean.get('holdings', []),
                'market_data': analysis_clean.get('market_data', {}),
                'risk_metrics': analysis_clean.get('risk_analysis', {}),
                'optimisation_results': analysis_clean.get('optimisation_results', {}),
                'ai_synthesis': analysis_clean.get('ai_synthesis', ''),
                'recommendations': analysis_clean.get('recommendations', []),
                'reasoning_steps': analysis_clean.get('reasoning_steps', []),
                'mcp_calls': analysis_clean.get('mcp_calls', []),
                'sentiment_data': analysis_clean.get('sentiment_data', {}),
                'execution_time_ms': analysis_clean.get('execution_time_ms'),
                'model_version': analysis_clean.get('model_version', 'claude-sonnet-4-5'),
            }

            self.client.table('portfolio_analyses').insert(data).execute()
            logger.info("Saved analysis for portfolio %s", portfolio_id)
            return True
        except Exception as e:
            logger.error("Failed to save analysis: %s", e)
            return False

    async def save_portfolio(
        self,
        portfolio_id: str,
        user_id: str,
        name: str,
        risk_tolerance: str = 'moderate'
    ) -> bool:
        """Save portfolio to database.

        Args:
            portfolio_id: Portfolio ID (converted to a UUID and stored as ``id``)
            user_id: User ID
            name: Portfolio name
            risk_tolerance: Risk tolerance level

        Returns:
            True if successful, False otherwise
        """
        if not self.is_connected():
            return False

        try:
            uuid_portfolio_id = self._string_to_uuid(portfolio_id)
            self.client.table('portfolios').insert({
                'id': uuid_portfolio_id,
                'user_id': user_id,
                'name': name,
                'risk_tolerance': risk_tolerance
            }).execute()
            return True
        except Exception as e:
            logger.error("Failed to save portfolio: %s", e)
            return False

    async def get_user_portfolios(self, user_id: str) -> List[Dict[str, Any]]:
        """Get all saved portfolios for a user, newest first.

        Args:
            user_id: User ID

        Returns:
            List of portfolio dictionaries with id, name, risk_tolerance and
            created_at; empty when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            result = (
                self.client.table('portfolios')
                .select('id, name, risk_tolerance, created_at')
                .eq('user_id', user_id)
                .order('created_at', desc=True)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error("Failed to get user portfolios: %s", e)
            return []

    async def get_portfolio_by_id(self, portfolio_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Get a portfolio by ID with its holdings.

        The ID is matched against the ``id`` column as given; unlike
        ``save_portfolio`` it is not passed through ``_string_to_uuid``.

        Args:
            portfolio_id: Portfolio ID
            user_id: User ID for authorisation check

        Returns:
            Portfolio dict with holdings, or None if not found/unauthorised
        """
        if not self.is_connected():
            return None

        try:
            # Get portfolio with holdings; filtering on user_id doubles as the
            # ownership check.
            result = (
                self.client.table('portfolios')
                .select('*, portfolio_holdings(*)')
                .eq('id', portfolio_id)
                .eq('user_id', user_id)
                .execute()
            )
            if result.data:
                return result.data[0]
            return None
        except Exception as e:
            logger.error("Failed to get portfolio %s: %s", portfolio_id, e)
            return None

    async def get_analysis_history(
        self,
        user_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get analysis history for all portfolios owned by a user.

        Args:
            user_id: User ID
            limit: Maximum number of analyses to return

        Returns:
            List of analysis results ordered by analysis_date descending;
            empty when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            # Join with portfolios table to filter by user_id
            result = (
                self.client.table('portfolio_analyses')
                .select('*, portfolios!inner(user_id, risk_tolerance)')
                .eq('portfolios.user_id', user_id)
                .order('analysis_date', desc=True)
                .limit(limit)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error("Failed to get analysis history: %s", e)
            return []

    async def get_analysis_by_id(
        self,
        analysis_id: str,
        user_id: str
    ) -> Optional[Dict[str, Any]]:
        """Get a specific analysis by ID with user authorisation check.

        Args:
            analysis_id: Analysis ID to retrieve
            user_id: User ID for authorisation check

        Returns:
            Analysis data if found and authorised, None otherwise
        """
        if not self.is_connected():
            return None

        try:
            # Join with portfolios to ensure user owns this analysis
            result = (
                self.client.table('portfolio_analyses')
                .select('*, portfolios!inner(user_id, risk_tolerance, name)')
                .eq('id', analysis_id)
                .eq('portfolios.user_id', user_id)
                .execute()
            )
            if result.data:
                return result.data[0]

            logger.warning(
                "Analysis %s not found or not authorised for user %s",
                analysis_id,
                user_id,
            )
            return None
        except Exception as e:
            logger.error("Failed to get analysis by ID: %s", e)
            return None

    async def save_portfolio_input(
        self,
        user_id: str,
        portfolio_text: str
    ) -> bool:
        """Save portfolio input text for quick reload.

        Inserts one row into ``portfolio_inputs``. The original documentation
        states that a database trigger keeps only the last 3 entries per
        user; that trigger is not visible from this module.

        Args:
            user_id: User ID
            portfolio_text: Raw portfolio input text (stored as ``description``)

        Returns:
            True if saved successfully, False otherwise
        """
        if not self.is_connected():
            logger.warning("Database not connected, skipping portfolio input save")
            return False

        try:
            self.client.table('portfolio_inputs').insert({
                'user_id': user_id,
                'description': portfolio_text
            }).execute()
            logger.info("Saved portfolio input for user %s", user_id)
            return True
        except Exception as e:
            logger.error("Failed to save portfolio input: %s", e)
            return False

    async def get_portfolio_inputs(
        self,
        user_id: str,
        limit: int = 3
    ) -> List[Dict[str, Any]]:
        """Get last N portfolio inputs for user, newest first.

        Args:
            user_id: User ID
            limit: Maximum number to return (default 3)

        Returns:
            List of portfolio input dicts with id, description, created_at;
            empty when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            result = (
                self.client.table('portfolio_inputs')
                .select('id, description, created_at')
                .eq('user_id', user_id)
                .order('created_at', desc=True)
                .limit(limit)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error("Failed to get portfolio inputs: %s", e)
            return []
# Global database instance
# Created at import time: Database.__init__ reads the Supabase settings and
# leaves `client` as None when the URL or API key is missing or client creation
# fails, so importing this module does not raise on a missing database.
db = Database()