"""Supabase authentication module for Gradio app. Implements user authentication using Supabase Auth service with proper session management. """ import logging from typing import Optional, Dict, Any, Tuple from dataclasses import dataclass from datetime import datetime, timedelta import secrets import json from supabase import Client from backend.database import db from backend.config import settings logger = logging.getLogger(__name__) @dataclass class UserSession: """User session information.""" user_id: str email: str username: str access_token: str refresh_token: str expires_at: Optional[datetime] = None def to_dict(self) -> Dict[str, Any]: """Convert session to dictionary for Gradio state.""" return { "user_id": self.user_id, "email": self.email, "username": self.username, "access_token": self.access_token, "refresh_token": self.refresh_token, "expires_at": self.expires_at.isoformat() if self.expires_at else None, "is_demo": False # Authenticated users are not in demo mode } @classmethod def from_dict(cls, data: Dict[str, Any]) -> Optional["UserSession"]: """Create session from dictionary.""" if not data or not data.get("user_id"): return None expires_at = None if data.get("expires_at"): expires_at = datetime.fromisoformat(data["expires_at"]) return cls( user_id=data["user_id"], email=data["email"], username=data.get("username", data["email"].split("@")[0]), access_token=data.get("access_token", ""), refresh_token=data.get("refresh_token", ""), expires_at=expires_at ) class SupabaseAuth: """Supabase authentication manager for Gradio app.""" def __init__(self): """Initialise authentication manager.""" self.client: Optional[Client] = db.client self.demo_mode = not self.is_available() def is_available(self) -> bool: """Check if Supabase authentication is available.""" return self.client is not None async def signup( self, email: str, password: str, username: Optional[str] = None ) -> Tuple[bool, str, Optional[UserSession]]: """Create a new user account. Args: email: User email password: User password username: Optional username (defaults to email prefix) Returns: Tuple of (success, message, session) """ if self.demo_mode: return False, "Authentication not available in demo mode", None try: # Sign up with Supabase Auth response = self.client.auth.sign_up({ "email": email, "password": password, "options": { "data": { "username": username or email.split("@")[0] } } }) if response.user: # User profile created automatically by database trigger # Create session if email is confirmed (or confirmation disabled) if response.session: session = UserSession( user_id=response.user.id, email=response.user.email, username=response.user.user_metadata.get("username", email.split("@")[0]), access_token=response.session.access_token, refresh_token=response.session.refresh_token, expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) ) logger.info(f"User signed up successfully: {email}") return True, "Account created successfully! Please check your email to confirm.", session else: logger.info(f"User signed up, awaiting email confirmation: {email}") return True, "Account created! Please check your email to confirm before logging in.", None except Exception as e: error_msg = str(e) logger.error(f"Signup failed: {error_msg}") # Parse common Supabase errors if "User already registered" in error_msg: return False, "This email is already registered. Please sign in instead.", None elif "Password should be at least" in error_msg: return False, "Password must be at least 6 characters long.", None elif "Invalid email" in error_msg: return False, "Please provide a valid email address.", None else: return False, f"Signup failed: {error_msg}", None return False, "Signup failed", None async def request_password_reset(self, email: str) -> Tuple[bool, str]: """Request password reset email. Args: email: User email address Returns: Tuple of (success, message) """ if self.demo_mode: logger.warning("Password reset not available in demo mode") return False, "Password reset is not available in demo mode" try: # Use Supabase auth to send reset email with redirect back to app redirect_url = f"{settings.app_url}/" self.client.auth.reset_password_for_email( email, options={ "redirect_to": redirect_url } ) logger.info(f"Password reset requested for {email}, redirect to {redirect_url}") return True, "Email sent!" except Exception as e: error_msg = str(e) logger.error(f"Password reset failed: {error_msg}") # Handle rate limiting if "you can only request this after" in error_msg.lower(): return False, "❌ Please wait a minute before requesting another reset link" return False, f"❌ Failed to send reset email: {error_msg}" async def update_password(self, new_password: str, email: str, token_hash: str) -> Tuple[bool, str]: """Update user password after reset using token_hash verification. Args: new_password: New password to set email: User email address token_hash: Recovery token hash from email link Returns: Tuple of (success, message) """ if self.demo_mode: logger.warning("Password update not available in demo mode") return False, "Password update is not available in demo mode" try: # Verify the recovery token and establish session # Note: For recovery tokens, only token_hash and type are needed (email is embedded in token) verify_response = self.client.auth.verify_otp({ "token_hash": token_hash, "type": "recovery" }) if not verify_response.user: logger.error("Token verification failed - no user returned") return False, "❌ Invalid or expired reset link. Please request a new password reset." logger.info(f"Token verified for user {verify_response.user.id}") # Update password for authenticated user update_response = self.client.auth.update_user({ "password": new_password }) logger.info("Password updated successfully") return True, "✅ Password updated successfully. You can now sign in with your new password." except Exception as e: error_msg = str(e) logger.error(f"Password update failed: {error_msg}") # Handle specific errors if "expired" in error_msg.lower(): return False, "❌ Reset link has expired. Please request a new password reset." elif "invalid" in error_msg.lower(): return False, "❌ Invalid reset link. Please request a new password reset." else: return False, f"❌ Failed to update password: {error_msg}" async def login( self, email: str, password: str ) -> Tuple[bool, str, Optional[UserSession]]: """Log in a user. Args: email: User email password: User password Returns: Tuple of (success, message, session) """ if self.demo_mode: # Demo mode - return demo session with placeholder ID # Note: This should rarely be hit now that database is properly configured demo_session = UserSession( user_id="demo-user-local", # Local-only demo user email="demo@portfolio-intelligence.com", username="demo-user", access_token="demo-token", refresh_token="demo-refresh", expires_at=datetime.utcnow() + timedelta(hours=24) ) return True, "Logged in as demo user (Supabase not configured)", demo_session try: # Sign in with Supabase Auth response = self.client.auth.sign_in_with_password({ "email": email, "password": password }) if response.user and response.session: # Update last login in users table self.client.table('users').update({ 'updated_at': 'now()' }).eq('id', response.user.id).execute() # Create session session = UserSession( user_id=response.user.id, email=response.user.email, username=response.user.user_metadata.get("username", email.split("@")[0]), access_token=response.session.access_token, refresh_token=response.session.refresh_token, expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) ) logger.info(f"User logged in: {email}") return True, f"Welcome back, {session.username}!", session else: return False, "Invalid credentials", None except Exception as e: error_msg = str(e) logger.error(f"Login failed: {error_msg}") # Parse common Supabase errors if "Invalid login credentials" in error_msg: return False, "Invalid email or password. Please try again.", None elif "Email not confirmed" in error_msg: return False, "Please confirm your email before logging in.", None else: return False, f"Login failed: {error_msg}", None async def logout(self, session: Optional[UserSession]) -> Tuple[bool, str]: """Log out a user. Args: session: Current user session Returns: Tuple of (success, message) """ if self.demo_mode: return True, "Logged out successfully" try: # Sign out from Supabase self.client.auth.sign_out() logger.info("User logged out") return True, "Logged out successfully" except Exception as e: logger.error(f"Logout failed: {e}") return False, f"Logout failed: {str(e)}" async def get_user(self, access_token: str) -> Optional[Dict[str, Any]]: """Get current user from access token. Args: access_token: User's access token Returns: User data if valid, None otherwise """ if self.demo_mode: if access_token == "demo-token": return { "id": "00000000-0000-0000-0000-000000000001", "email": "demo@portfolio-intelligence.com", "username": "demo-user" } return None try: # Get user from Supabase using token response = self.client.auth.get_user(access_token) if response.user: return { "id": response.user.id, "email": response.user.email, "username": response.user.user_metadata.get("username", response.user.email.split("@")[0]) } except Exception as e: logger.error(f"Failed to get user: {e}") return None async def refresh_session(self, refresh_token: str) -> Optional[UserSession]: """Refresh an expired session. Args: refresh_token: Refresh token from expired session Returns: New session if successful, None otherwise """ if self.demo_mode: return None try: # Refresh session with Supabase response = self.client.auth.refresh_session(refresh_token) if response.user and response.session: return UserSession( user_id=response.user.id, email=response.user.email, username=response.user.user_metadata.get("username", response.user.email.split("@")[0]), access_token=response.session.access_token, refresh_token=response.session.refresh_token, expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) ) except Exception as e: logger.error(f"Session refresh failed: {e}") return None async def get_user_portfolios(self, user_id: str) -> list: """Get all portfolios for a user. Args: user_id: User ID Returns: List of portfolios with holdings """ if not self.is_available(): # Return demo portfolios return [ { "id": "demo-portfolio-1", "name": "Demo Portfolio", "risk_tolerance": "moderate", "holdings": [] } ] try: result = self.client.table('portfolios') \ .select('*, portfolio_holdings(*)') \ .eq('user_id', user_id) \ .order('created_at', desc=True) \ .execute() return result.data if result.data else [] except Exception as e: logger.error(f"Failed to get portfolios: {e}") return [] async def save_portfolio( self, user_id: str, name: str, holdings: list, risk_tolerance: str = "moderate" ) -> Tuple[bool, str, Optional[str]]: """Save a portfolio for a user. Args: user_id: User ID name: Portfolio name holdings: List of holdings risk_tolerance: Risk tolerance level Returns: Tuple of (success, message, portfolio_id) """ if self.demo_mode: return True, "Portfolio saved (demo mode)", f"demo-portfolio-{secrets.token_hex(4)}" try: # Create portfolio portfolio_data = { 'user_id': user_id, 'name': name, 'risk_tolerance': risk_tolerance } portfolio_result = self.client.table('portfolios').insert(portfolio_data).execute() if portfolio_result.data: portfolio_id = portfolio_result.data[0]['id'] # Add holdings if holdings: holdings_data = [ { 'portfolio_id': portfolio_id, 'ticker': h.get('ticker'), 'quantity': h.get('quantity'), 'cost_basis': h.get('cost_basis'), 'asset_type': h.get('asset_type', 'stock') } for h in holdings ] self.client.table('portfolio_holdings').insert(holdings_data).execute() logger.info(f"Portfolio saved: {portfolio_id}") return True, "Portfolio saved successfully!", portfolio_id except Exception as e: logger.error(f"Failed to save portfolio: {e}") return False, f"Failed to save portfolio: {str(e)}", None return False, "Failed to save portfolio", None # Global auth instance auth = SupabaseAuth()