# BrianIsaac's picture
# feat: migrate portfolio auto-save from cache to persistent database storage
# dbe3108
"""Database module for Portfolio Intelligence Platform.
Handles Supabase PostgreSQL connections and operations.
"""
from typing import Optional, Dict, Any, List
from supabase import create_client, Client
from backend.config import settings
from backend.utils.serialisation import decimal_to_json_safe
from backend.utils.uuid_generator import string_to_uuid
import logging
# Module-level logger, named after this module so log records can be filtered by origin.
logger = logging.getLogger(__name__)
# Re-export for backwards compatibility
# NOTE(review): the module-level `db` instance created at the bottom of this
# file is not listed here, so `from <module> import *` will not export it —
# confirm that is intended.
__all__ = ['Database', 'decimal_to_json_safe']
class Database:
    """Database connection manager for Supabase PostgreSQL.

    When the Supabase URL or API key is missing, or client creation fails,
    ``client`` stays ``None``. In that state every operation returns an
    empty/falsy result instead of raising ("demo mode").

    Attributes:
        client: Supabase client instance, or None when not connected
    """

    def __init__(self):
        """Initialise database connection.

        Prefers the service role key and falls back to the anon key.
        Failures are logged and leave ``client`` as None; nothing is raised
        from client creation.
        """
        self.client: Optional[Client] = None

        if not settings.supabase_url:
            logger.info("Supabase credentials not configured - running in demo mode")
            return

        try:
            # Prefer service role key, fallback to anon key
            if settings.supabase_service_role_key:
                api_key = settings.supabase_service_role_key
                logger.info("Using Supabase service role key (bypasses RLS)")
                ready_message = "Supabase service role client initialised successfully"
            elif settings.supabase_key:
                api_key = settings.supabase_key
                logger.warning(
                    "Using anon key - database operations may fail due to RLS policies. "
                    "Add SUPABASE_SERVICE_ROLE_KEY environment variable for full functionality."
                )
                ready_message = "Supabase anon client initialised (RLS enforced)"
            else:
                logger.error("No Supabase API key found (need either SUPABASE_SERVICE_ROLE_KEY or SUPABASE_KEY)")
                return

            # Client construction is identical for both key types; only the
            # key (and therefore the RLS behaviour) differs.
            self.client = create_client(settings.supabase_url, api_key)
            logger.info(ready_message)
        except Exception as e:
            logger.error(f"Failed to initialise Supabase client: {e}")
            logger.info("Running in demo mode without database")

    def is_connected(self) -> bool:
        """Check if database connection is active.

        Returns:
            True if a client object exists, False otherwise. No network
            round-trip is made.
        """
        return self.client is not None

    def _string_to_uuid(self, string_id: str) -> str:
        """Convert string ID to deterministic UUID.

        Thin wrapper around ``backend.utils.uuid_generator.string_to_uuid``.
        Generating the UUID in Python avoids database-side generation and
        the uuid-ossp extension.

        Args:
            string_id: Human-readable ID (e.g., 'demo_20241119_105323')

        Returns:
            UUID string (deterministic, always same for same input)

        Example:
            >>> db = Database()
            >>> db._string_to_uuid('demo_20241119_105323')
            '...'  # Same UUID every time for this input
        """
        return string_to_uuid(string_id)

    async def ensure_demo_user_exists(self) -> Optional[str]:
        """Ensure demo user exists in database.

        Looks the demo user up by email in the ``users`` table. If absent,
        creates it through the auth admin API and then checks that a
        matching ``users`` row appeared, retrying once after a one second
        pause.

        Returns:
            Demo user ID if successful, None otherwise
        """
        if not self.is_connected():
            return None

        demo_email = "demo@portfolio-intelligence.com"

        try:
            # Check if user exists by email
            result = self.client.table('users').select('id,email').eq('email', demo_email).execute()
            if result.data:
                user_id = result.data[0]['id']
                logger.info(f"Demo user already exists with ID: {user_id}")
                return user_id

            # Create auth user using admin API (Supabase will auto-generate ID)
            try:
                # NOTE(review): hard-coded credential for the demo account;
                # consider sourcing it from configuration instead.
                response = self.client.auth.admin.create_user({
                    "email": demo_email,
                    "password": "demo-password-changeme",
                    "email_confirm": True,
                    "user_metadata": {
                        "username": "demo-user",
                        "is_demo": True
                    }
                })

                if response.user:
                    user_id = response.user.id
                    logger.info(f"Created demo user via auth admin API with ID: {user_id}")

                    # Verify profile was created by trigger
                    result = self.client.table('users').select('id').eq('id', user_id).execute()
                    if result.data:
                        logger.info("Demo user profile created successfully")
                        return user_id

                    logger.warning("Demo user created in auth but profile not found, waiting...")
                    # Sometimes there's a delay, try once more. Use the
                    # awaitable sleep so the event loop keeps serving other
                    # tasks while we wait.
                    import asyncio
                    await asyncio.sleep(1)
                    result = self.client.table('users').select('id').eq('id', user_id).execute()
                    if result.data:
                        return user_id

                    logger.error("Demo user profile still not found after delay")
                    return None
            except Exception as auth_error:
                # User might already exist in auth.users
                error_str = str(auth_error).lower()
                if "already exists" in error_str or "already registered" in error_str:
                    logger.info("Demo user already exists in auth.users, fetching ID...")
                    result = self.client.table('users').select('id').eq('email', demo_email).execute()
                    if result.data:
                        return result.data[0]['id']
                else:
                    logger.error(f"Failed to create auth user: {auth_error}")
                    return None
        except Exception as e:
            logger.error(f"Failed to ensure demo user exists: {e}")
            return None

        # Reached when the admin API returned no user, or the auth user
        # exists but has no row in `users`.
        logger.error("Demo user could not be created or located")
        return None

    async def save_analysis(
        self,
        portfolio_id: str,
        analysis_results: Dict[str, Any]
    ) -> bool:
        """Save portfolio analysis results to database.

        Inserts one row into ``portfolio_analyses``. Keys missing from
        ``analysis_results`` are stored with the empty defaults shown below.

        Args:
            portfolio_id: Portfolio ID (converted to a deterministic UUID)
            analysis_results: Complete analysis results from workflow

        Returns:
            True if saved successfully, False otherwise
        """
        if not self.is_connected():
            logger.warning("Database not connected, skipping save")
            return False

        try:
            uuid_portfolio_id = self._string_to_uuid(portfolio_id)

            # Convert all Decimal objects to string for JSON serialisation
            # This preserves precision - float conversion loses pennies in large numbers
            analysis_clean = decimal_to_json_safe(analysis_results)

            data = {
                'portfolio_id': uuid_portfolio_id,
                'holdings_snapshot': analysis_clean.get('holdings', []),
                'market_data': analysis_clean.get('market_data', {}),
                'risk_metrics': analysis_clean.get('risk_analysis', {}),
                'optimisation_results': analysis_clean.get('optimisation_results', {}),
                'ai_synthesis': analysis_clean.get('ai_synthesis', ''),
                'recommendations': analysis_clean.get('recommendations', []),
                'reasoning_steps': analysis_clean.get('reasoning_steps', []),
                'mcp_calls': analysis_clean.get('mcp_calls', []),
                'sentiment_data': analysis_clean.get('sentiment_data', {}),
                'execution_time_ms': analysis_clean.get('execution_time_ms'),
                'model_version': analysis_clean.get('model_version', 'claude-sonnet-4-5'),
            }

            self.client.table('portfolio_analyses').insert(data).execute()
            logger.info(f"Saved analysis for portfolio {portfolio_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to save analysis: {e}")
            return False

    async def save_portfolio(
        self,
        portfolio_id: str,
        user_id: str,
        name: str,
        risk_tolerance: str = 'moderate'
    ) -> bool:
        """Save portfolio to database.

        Inserts a row into ``portfolios`` whose ``id`` is the deterministic
        UUID derived from ``portfolio_id``.

        Args:
            portfolio_id: Portfolio ID
            user_id: User ID
            name: Portfolio name
            risk_tolerance: Risk tolerance level

        Returns:
            True if successful, False otherwise
        """
        if not self.is_connected():
            return False

        try:
            uuid_portfolio_id = self._string_to_uuid(portfolio_id)
            self.client.table('portfolios').insert({
                'id': uuid_portfolio_id,
                'user_id': user_id,
                'name': name,
                'risk_tolerance': risk_tolerance
            }).execute()
            return True
        except Exception as e:
            logger.error(f"Failed to save portfolio: {e}")
            return False

    async def get_user_portfolios(self, user_id: str) -> List[Dict[str, Any]]:
        """Get all saved portfolios for a user, newest first.

        Args:
            user_id: User ID

        Returns:
            List of portfolio dictionaries with id, name, risk_tolerance and
            created_at; empty list when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            result = (
                self.client.table('portfolios')
                .select('id, name, risk_tolerance, created_at')
                .eq('user_id', user_id)
                .order('created_at', desc=True)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error(f"Failed to get user portfolios: {e}")
            return []

    async def get_portfolio_by_id(self, portfolio_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Get a portfolio by ID with its holdings.

        Args:
            portfolio_id: Portfolio ID
            user_id: User ID for authorisation check

        Returns:
            Portfolio dict with holdings, or None if not found/unauthorised
        """
        if not self.is_connected():
            return None

        try:
            # NOTE(review): save_portfolio stores the row under
            # _string_to_uuid(portfolio_id), whereas this lookup filters on
            # portfolio_id exactly as given. Callers presumably pass the
            # stored UUID (e.g. from get_user_portfolios) — confirm.
            # Get portfolio with holdings
            result = (
                self.client.table('portfolios')
                .select('*, portfolio_holdings(*)')
                .eq('id', portfolio_id)
                .eq('user_id', user_id)
                .execute()
            )
            if result.data:
                return result.data[0]
            return None
        except Exception as e:
            logger.error(f"Failed to get portfolio {portfolio_id}: {e}")
            return None

    async def get_analysis_history(
        self,
        user_id: str,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Get analysis history for all portfolios owned by a user.

        Args:
            user_id: User ID
            limit: Maximum number of analyses to return

        Returns:
            List of analysis results ordered by analysis_date, newest first;
            empty list when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            # Join with portfolios table to filter by user_id
            result = (
                self.client.table('portfolio_analyses')
                .select('*, portfolios!inner(user_id, risk_tolerance)')
                .eq('portfolios.user_id', user_id)
                .order('analysis_date', desc=True)
                .limit(limit)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error(f"Failed to get analysis history: {e}")
            return []

    async def get_analysis_by_id(
        self,
        analysis_id: str,
        user_id: str
    ) -> Optional[Dict[str, Any]]:
        """Get a specific analysis by ID with user authorisation check.

        Args:
            analysis_id: Analysis ID to retrieve
            user_id: User ID for authorisation check

        Returns:
            Analysis data if found and authorised, None otherwise
        """
        if not self.is_connected():
            return None

        try:
            # Join with portfolios to ensure user owns this analysis
            result = (
                self.client.table('portfolio_analyses')
                .select('*, portfolios!inner(user_id, risk_tolerance, name)')
                .eq('id', analysis_id)
                .eq('portfolios.user_id', user_id)
                .execute()
            )
            if result.data:
                return result.data[0]

            logger.warning(f"Analysis {analysis_id} not found or not authorised for user {user_id}")
            return None
        except Exception as e:
            logger.error(f"Failed to get analysis by ID: {e}")
            return None

    async def save_portfolio_input(
        self,
        user_id: str,
        portfolio_text: str
    ) -> bool:
        """Save portfolio input text for quick reload.

        Inserts the text into the ``description`` column of
        ``portfolio_inputs``. Retention (only the last 3 entries per user)
        is expected to be enforced by a database trigger that is not
        visible from this module.

        Args:
            user_id: User ID
            portfolio_text: Raw portfolio input text

        Returns:
            True if saved successfully, False otherwise
        """
        if not self.is_connected():
            logger.warning("Database not connected, skipping portfolio input save")
            return False

        try:
            self.client.table('portfolio_inputs').insert({
                'user_id': user_id,
                'description': portfolio_text
            }).execute()
            logger.info(f"Saved portfolio input for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to save portfolio input: {e}")
            return False

    async def get_portfolio_inputs(
        self,
        user_id: str,
        limit: int = 3
    ) -> List[Dict[str, Any]]:
        """Get last N portfolio inputs for user, newest first.

        Args:
            user_id: User ID
            limit: Maximum number to return (default 3)

        Returns:
            List of portfolio input dicts with id, description, created_at;
            empty list when not connected or on error
        """
        if not self.is_connected():
            return []

        try:
            result = (
                self.client.table('portfolio_inputs')
                .select('id, description, created_at')
                .eq('user_id', user_id)
                .order('created_at', desc=True)
                .limit(limit)
                .execute()
            )
            return result.data or []
        except Exception as e:
            logger.error(f"Failed to get portfolio inputs: {e}")
            return []
# Global database instance.
# Constructed at import time, so Database.__init__ (which attempts to create
# the Supabase client) runs when this module is first imported.
db = Database()