Spaces:
Running
on
Zero
Running
on
Zero
| """Supabase authentication module for Gradio app. | |
| Implements user authentication using Supabase Auth service with proper session management. | |
| """ | |
| import logging | |
| from typing import Optional, Dict, Any, Tuple | |
| from dataclasses import dataclass | |
| from datetime import datetime, timedelta | |
| import secrets | |
| import json | |
| from supabase import Client | |
| from backend.database import db | |
| from backend.config import settings | |
| logger = logging.getLogger(__name__) | |
| class UserSession: | |
| """User session information.""" | |
| user_id: str | |
| email: str | |
| username: str | |
| access_token: str | |
| refresh_token: str | |
| expires_at: Optional[datetime] = None | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert session to dictionary for Gradio state.""" | |
| return { | |
| "user_id": self.user_id, | |
| "email": self.email, | |
| "username": self.username, | |
| "access_token": self.access_token, | |
| "refresh_token": self.refresh_token, | |
| "expires_at": self.expires_at.isoformat() if self.expires_at else None, | |
| "is_demo": False # Authenticated users are not in demo mode | |
| } | |
| def from_dict(cls, data: Dict[str, Any]) -> Optional["UserSession"]: | |
| """Create session from dictionary.""" | |
| if not data or not data.get("user_id"): | |
| return None | |
| expires_at = None | |
| if data.get("expires_at"): | |
| expires_at = datetime.fromisoformat(data["expires_at"]) | |
| return cls( | |
| user_id=data["user_id"], | |
| email=data["email"], | |
| username=data.get("username", data["email"].split("@")[0]), | |
| access_token=data.get("access_token", ""), | |
| refresh_token=data.get("refresh_token", ""), | |
| expires_at=expires_at | |
| ) | |
| class SupabaseAuth: | |
| """Supabase authentication manager for Gradio app.""" | |
| def __init__(self): | |
| """Initialise authentication manager.""" | |
| self.client: Optional[Client] = db.client | |
| self.demo_mode = not self.is_available() | |
| def is_available(self) -> bool: | |
| """Check if Supabase authentication is available.""" | |
| return self.client is not None | |
| async def signup( | |
| self, | |
| email: str, | |
| password: str, | |
| username: Optional[str] = None | |
| ) -> Tuple[bool, str, Optional[UserSession]]: | |
| """Create a new user account. | |
| Args: | |
| email: User email | |
| password: User password | |
| username: Optional username (defaults to email prefix) | |
| Returns: | |
| Tuple of (success, message, session) | |
| """ | |
| if self.demo_mode: | |
| return False, "Authentication not available in demo mode", None | |
| try: | |
| # Sign up with Supabase Auth | |
| response = self.client.auth.sign_up({ | |
| "email": email, | |
| "password": password, | |
| "options": { | |
| "data": { | |
| "username": username or email.split("@")[0] | |
| } | |
| } | |
| }) | |
| if response.user: | |
| # User profile created automatically by database trigger | |
| # Create session if email is confirmed (or confirmation disabled) | |
| if response.session: | |
| session = UserSession( | |
| user_id=response.user.id, | |
| email=response.user.email, | |
| username=response.user.user_metadata.get("username", email.split("@")[0]), | |
| access_token=response.session.access_token, | |
| refresh_token=response.session.refresh_token, | |
| expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) | |
| ) | |
| logger.info(f"User signed up successfully: {email}") | |
| return True, "Account created successfully! Please check your email to confirm.", session | |
| else: | |
| logger.info(f"User signed up, awaiting email confirmation: {email}") | |
| return True, "Account created! Please check your email to confirm before logging in.", None | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Signup failed: {error_msg}") | |
| # Parse common Supabase errors | |
| if "User already registered" in error_msg: | |
| return False, "This email is already registered. Please sign in instead.", None | |
| elif "Password should be at least" in error_msg: | |
| return False, "Password must be at least 6 characters long.", None | |
| elif "Invalid email" in error_msg: | |
| return False, "Please provide a valid email address.", None | |
| else: | |
| return False, f"Signup failed: {error_msg}", None | |
| return False, "Signup failed", None | |
| async def request_password_reset(self, email: str) -> Tuple[bool, str]: | |
| """Request password reset email. | |
| Args: | |
| email: User email address | |
| Returns: | |
| Tuple of (success, message) | |
| """ | |
| if self.demo_mode: | |
| logger.warning("Password reset not available in demo mode") | |
| return False, "Password reset is not available in demo mode" | |
| try: | |
| # Use Supabase auth to send reset email with redirect back to app | |
| redirect_url = f"{settings.app_url}/" | |
| self.client.auth.reset_password_for_email( | |
| email, | |
| options={ | |
| "redirect_to": redirect_url | |
| } | |
| ) | |
| logger.info(f"Password reset requested for {email}, redirect to {redirect_url}") | |
| return True, "Email sent!" | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Password reset failed: {error_msg}") | |
| # Handle rate limiting | |
| if "you can only request this after" in error_msg.lower(): | |
| return False, "❌ Please wait a minute before requesting another reset link" | |
| return False, f"❌ Failed to send reset email: {error_msg}" | |
| async def update_password(self, new_password: str, email: str, token_hash: str) -> Tuple[bool, str]: | |
| """Update user password after reset using token_hash verification. | |
| Args: | |
| new_password: New password to set | |
| email: User email address | |
| token_hash: Recovery token hash from email link | |
| Returns: | |
| Tuple of (success, message) | |
| """ | |
| if self.demo_mode: | |
| logger.warning("Password update not available in demo mode") | |
| return False, "Password update is not available in demo mode" | |
| try: | |
| # Verify the recovery token and establish session | |
| # Note: For recovery tokens, only token_hash and type are needed (email is embedded in token) | |
| verify_response = self.client.auth.verify_otp({ | |
| "token_hash": token_hash, | |
| "type": "recovery" | |
| }) | |
| if not verify_response.user: | |
| logger.error("Token verification failed - no user returned") | |
| return False, "❌ Invalid or expired reset link. Please request a new password reset." | |
| logger.info(f"Token verified for user {verify_response.user.id}") | |
| # Update password for authenticated user | |
| update_response = self.client.auth.update_user({ | |
| "password": new_password | |
| }) | |
| logger.info("Password updated successfully") | |
| return True, "✅ Password updated successfully. You can now sign in with your new password." | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Password update failed: {error_msg}") | |
| # Handle specific errors | |
| if "expired" in error_msg.lower(): | |
| return False, "❌ Reset link has expired. Please request a new password reset." | |
| elif "invalid" in error_msg.lower(): | |
| return False, "❌ Invalid reset link. Please request a new password reset." | |
| else: | |
| return False, f"❌ Failed to update password: {error_msg}" | |
| async def login( | |
| self, | |
| email: str, | |
| password: str | |
| ) -> Tuple[bool, str, Optional[UserSession]]: | |
| """Log in a user. | |
| Args: | |
| email: User email | |
| password: User password | |
| Returns: | |
| Tuple of (success, message, session) | |
| """ | |
| if self.demo_mode: | |
| # Demo mode - return demo session with placeholder ID | |
| # Note: This should rarely be hit now that database is properly configured | |
| demo_session = UserSession( | |
| user_id="demo-user-local", # Local-only demo user | |
| email="demo@portfolio-intelligence.com", | |
| username="demo-user", | |
| access_token="demo-token", | |
| refresh_token="demo-refresh", | |
| expires_at=datetime.utcnow() + timedelta(hours=24) | |
| ) | |
| return True, "Logged in as demo user (Supabase not configured)", demo_session | |
| try: | |
| # Sign in with Supabase Auth | |
| response = self.client.auth.sign_in_with_password({ | |
| "email": email, | |
| "password": password | |
| }) | |
| if response.user and response.session: | |
| # Update last login in users table | |
| self.client.table('users').update({ | |
| 'updated_at': 'now()' | |
| }).eq('id', response.user.id).execute() | |
| # Create session | |
| session = UserSession( | |
| user_id=response.user.id, | |
| email=response.user.email, | |
| username=response.user.user_metadata.get("username", email.split("@")[0]), | |
| access_token=response.session.access_token, | |
| refresh_token=response.session.refresh_token, | |
| expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) | |
| ) | |
| logger.info(f"User logged in: {email}") | |
| return True, f"Welcome back, {session.username}!", session | |
| else: | |
| return False, "Invalid credentials", None | |
| except Exception as e: | |
| error_msg = str(e) | |
| logger.error(f"Login failed: {error_msg}") | |
| # Parse common Supabase errors | |
| if "Invalid login credentials" in error_msg: | |
| return False, "Invalid email or password. Please try again.", None | |
| elif "Email not confirmed" in error_msg: | |
| return False, "Please confirm your email before logging in.", None | |
| else: | |
| return False, f"Login failed: {error_msg}", None | |
| async def logout(self, session: Optional[UserSession]) -> Tuple[bool, str]: | |
| """Log out a user. | |
| Args: | |
| session: Current user session | |
| Returns: | |
| Tuple of (success, message) | |
| """ | |
| if self.demo_mode: | |
| return True, "Logged out successfully" | |
| try: | |
| # Sign out from Supabase | |
| self.client.auth.sign_out() | |
| logger.info("User logged out") | |
| return True, "Logged out successfully" | |
| except Exception as e: | |
| logger.error(f"Logout failed: {e}") | |
| return False, f"Logout failed: {str(e)}" | |
| async def get_user(self, access_token: str) -> Optional[Dict[str, Any]]: | |
| """Get current user from access token. | |
| Args: | |
| access_token: User's access token | |
| Returns: | |
| User data if valid, None otherwise | |
| """ | |
| if self.demo_mode: | |
| if access_token == "demo-token": | |
| return { | |
| "id": "00000000-0000-0000-0000-000000000001", | |
| "email": "demo@portfolio-intelligence.com", | |
| "username": "demo-user" | |
| } | |
| return None | |
| try: | |
| # Get user from Supabase using token | |
| response = self.client.auth.get_user(access_token) | |
| if response.user: | |
| return { | |
| "id": response.user.id, | |
| "email": response.user.email, | |
| "username": response.user.user_metadata.get("username", response.user.email.split("@")[0]) | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get user: {e}") | |
| return None | |
| async def refresh_session(self, refresh_token: str) -> Optional[UserSession]: | |
| """Refresh an expired session. | |
| Args: | |
| refresh_token: Refresh token from expired session | |
| Returns: | |
| New session if successful, None otherwise | |
| """ | |
| if self.demo_mode: | |
| return None | |
| try: | |
| # Refresh session with Supabase | |
| response = self.client.auth.refresh_session(refresh_token) | |
| if response.user and response.session: | |
| return UserSession( | |
| user_id=response.user.id, | |
| email=response.user.email, | |
| username=response.user.user_metadata.get("username", response.user.email.split("@")[0]), | |
| access_token=response.session.access_token, | |
| refresh_token=response.session.refresh_token, | |
| expires_at=datetime.utcnow() + timedelta(seconds=response.session.expires_in) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Session refresh failed: {e}") | |
| return None | |
| async def get_user_portfolios(self, user_id: str) -> list: | |
| """Get all portfolios for a user. | |
| Args: | |
| user_id: User ID | |
| Returns: | |
| List of portfolios with holdings | |
| """ | |
| if not self.is_available(): | |
| # Return demo portfolios | |
| return [ | |
| { | |
| "id": "demo-portfolio-1", | |
| "name": "Demo Portfolio", | |
| "risk_tolerance": "moderate", | |
| "holdings": [] | |
| } | |
| ] | |
| try: | |
| result = self.client.table('portfolios') \ | |
| .select('*, portfolio_holdings(*)') \ | |
| .eq('user_id', user_id) \ | |
| .order('created_at', desc=True) \ | |
| .execute() | |
| return result.data if result.data else [] | |
| except Exception as e: | |
| logger.error(f"Failed to get portfolios: {e}") | |
| return [] | |
| async def save_portfolio( | |
| self, | |
| user_id: str, | |
| name: str, | |
| holdings: list, | |
| risk_tolerance: str = "moderate" | |
| ) -> Tuple[bool, str, Optional[str]]: | |
| """Save a portfolio for a user. | |
| Args: | |
| user_id: User ID | |
| name: Portfolio name | |
| holdings: List of holdings | |
| risk_tolerance: Risk tolerance level | |
| Returns: | |
| Tuple of (success, message, portfolio_id) | |
| """ | |
| if self.demo_mode: | |
| return True, "Portfolio saved (demo mode)", f"demo-portfolio-{secrets.token_hex(4)}" | |
| try: | |
| # Create portfolio | |
| portfolio_data = { | |
| 'user_id': user_id, | |
| 'name': name, | |
| 'risk_tolerance': risk_tolerance | |
| } | |
| portfolio_result = self.client.table('portfolios').insert(portfolio_data).execute() | |
| if portfolio_result.data: | |
| portfolio_id = portfolio_result.data[0]['id'] | |
| # Add holdings | |
| if holdings: | |
| holdings_data = [ | |
| { | |
| 'portfolio_id': portfolio_id, | |
| 'ticker': h.get('ticker'), | |
| 'quantity': h.get('quantity'), | |
| 'cost_basis': h.get('cost_basis'), | |
| 'asset_type': h.get('asset_type', 'stock') | |
| } | |
| for h in holdings | |
| ] | |
| self.client.table('portfolio_holdings').insert(holdings_data).execute() | |
| logger.info(f"Portfolio saved: {portfolio_id}") | |
| return True, "Portfolio saved successfully!", portfolio_id | |
| except Exception as e: | |
| logger.error(f"Failed to save portfolio: {e}") | |
| return False, f"Failed to save portfolio: {str(e)}", None | |
| return False, "Failed to save portfolio", None | |
| # Global auth instance | |
| auth = SupabaseAuth() |