"""Risk Analyzer MCP Server. This MCP server provides comprehensive risk analysis including: - Value at Risk (VaR) - Conditional Value at Risk (CVaR) - Monte Carlo simulation - Portfolio risk metrics - GARCH volatility forecasting (P1 Feature) - Advanced performance metrics: Information Ratio, Calmar Ratio, Ulcer Index Based on modern portfolio theory and validated risk models. """ import logging from typing import Dict, List, Optional from decimal import Decimal import numpy as np import pandas as pd from scipy import stats from fastmcp import FastMCP from pydantic import BaseModel, Field from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("risk-analyzer") # Try to import arch for GARCH, but make it optional try: from arch import arch_model GARCH_AVAILABLE = True except ImportError: GARCH_AVAILABLE = False logger.warning("arch library not available - GARCH forecasting disabled") class PortfolioInput(BaseModel): """Portfolio for risk analysis.""" ticker: str weight: Decimal = Field(..., ge=0, le=1, description="Portfolio weight") prices: List[Decimal] = Field(..., description="Historical close prices") class BenchmarkInput(BaseModel): """Benchmark for risk analysis.""" ticker: str = Field(..., description="Benchmark ticker (e.g., SPY, ^GSPC)") prices: List[Decimal] = Field(..., description="Historical close prices") class RiskAnalysisRequest(BaseModel): """Request for risk analysis.""" portfolio: List[PortfolioInput] = Field(..., min_length=1) portfolio_value: Decimal = Field(..., gt=0, description="Total portfolio value") confidence_level: Decimal = Field(default=Decimal("0.95"), ge=0, le=1) time_horizon: int = Field(default=1, ge=1, le=252, description="Days") method: str = Field(default="historical", description="historical, parametric, or monte_carlo") num_simulations: int = Field(default=10000, ge=1000, le=100000) benchmark: Optional[BenchmarkInput] = Field(default=None, description="Optional benchmark for Information Ratio") class VaRResult(BaseModel): """Value at Risk result.""" var_absolute: Decimal = Field(..., description="VaR in currency") var_percentage: Decimal = Field(..., description="VaR as percentage") confidence_level: Decimal time_horizon: int method: str class CVaRResult(BaseModel): """Conditional Value at Risk result.""" cvar_absolute: Decimal = Field(..., description="CVaR in currency") cvar_percentage: Decimal = Field(..., description="CVaR as percentage") confidence_level: Decimal interpretation: str class RiskMetrics(BaseModel): """Comprehensive risk metrics.""" volatility_annual: Decimal sharpe_ratio: Optional[Decimal] = None sortino_ratio: Optional[Decimal] = None max_drawdown: Decimal beta: Optional[Decimal] = None correlation_matrix: Optional[Dict[str, Dict[str, Decimal]]] = None # Advanced Performance Metrics (P1 Feature) information_ratio: Optional[Decimal] = None calmar_ratio: Optional[Decimal] = None ulcer_index: Optional[Decimal] = None class RiskAnalysisResult(BaseModel): """Complete risk analysis result.""" var_95: VaRResult var_99: VaRResult cvar_95: CVaRResult cvar_99: CVaRResult risk_metrics: RiskMetrics simulation_percentiles: Optional[Dict[str, Decimal]] = None def _calculate_returns(prices: pd.DataFrame) -> pd.DataFrame: """Calculate returns from prices.""" return prices.pct_change().dropna() def _calculate_portfolio_returns(returns: pd.DataFrame, weights: np.ndarray) -> pd.Series: """Calculate portfolio returns given asset returns and weights.""" return (returns * weights).sum(axis=1) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def analyze_risk(request: RiskAnalysisRequest) -> RiskAnalysisResult: """Perform comprehensive risk analysis on a portfolio. Args: request: Risk analysis request including optional benchmark Returns: Complete risk analysis with VaR, CVaR, and metrics Example: >>> await analyze_risk(RiskAnalysisRequest(portfolio=[...], portfolio_value=100000)) Note: When a benchmark is provided, the Information Ratio will be calculated to measure risk-adjusted returns relative to the benchmark. """ logger.info(f"Analyzing risk for portfolio of {len(request.portfolio)} assets") if request.benchmark: logger.info(f"Benchmark provided: {request.benchmark.ticker}") try: # Phase 1: Prepare portfolio data price_data = {} weights = [] tickers = [] for holding in request.portfolio: price_data[holding.ticker] = [float(p) for p in holding.prices] weights.append(float(holding.weight)) tickers.append(holding.ticker) prices = pd.DataFrame(price_data) returns = _calculate_returns(prices) weights_array = np.array(weights) # Calculate portfolio returns portfolio_returns = _calculate_portfolio_returns(returns, weights_array) # Phase 1.5: Prepare benchmark data if provided benchmark_returns = None if request.benchmark: try: benchmark_prices = pd.Series([float(p) for p in request.benchmark.prices]) benchmark_returns = benchmark_prices.pct_change().dropna() # Align benchmark returns with portfolio returns min_len = min(len(portfolio_returns), len(benchmark_returns)) if min_len < len(portfolio_returns): logger.warning( f"Benchmark has fewer data points ({len(benchmark_returns)}) " f"than portfolio ({len(portfolio_returns)}). Aligning to {min_len} points." ) portfolio_returns = portfolio_returns.iloc[-min_len:] returns = returns.iloc[-min_len:] prices = prices.iloc[-min_len:] elif min_len < len(benchmark_returns): logger.warning( f"Portfolio has fewer data points ({len(portfolio_returns)}) " f"than benchmark ({len(benchmark_returns)}). Aligning to {min_len} points." ) benchmark_returns = benchmark_returns.iloc[-min_len:] logger.info( f"Aligned portfolio and benchmark returns to {min_len} data points" ) except Exception as e: logger.error(f"Error processing benchmark data: {e}") logger.warning("Continuing without benchmark - Information Ratio will be None") benchmark_returns = None # Calculate VaR and CVaR if request.method == "historical": var_95, cvar_95 = _historical_var_cvar(portfolio_returns, 0.95, request.time_horizon) var_99, cvar_99 = _historical_var_cvar(portfolio_returns, 0.99, request.time_horizon) elif request.method == "parametric": var_95, cvar_95 = _parametric_var_cvar(portfolio_returns, 0.95, request.time_horizon) var_99, cvar_99 = _parametric_var_cvar(portfolio_returns, 0.99, request.time_horizon) elif request.method == "monte_carlo": var_95, cvar_95, var_99, cvar_99, percentiles = _monte_carlo_var_cvar( returns, weights_array, 0.95, 0.99, request.time_horizon, request.num_simulations ) else: raise ValueError(f"Unknown method: {request.method}") # Phase 2: Calculate risk metrics (with benchmark and prices for Ulcer Index) risk_metrics = _calculate_risk_metrics( returns, weights_array, portfolio_returns, benchmark_returns=benchmark_returns, prices=prices ) # Convert to currency amounts portfolio_val = float(request.portfolio_value) # Create results var_95_result = VaRResult( var_absolute=Decimal(str(abs(var_95 * portfolio_val))), var_percentage=Decimal(str(abs(var_95 * 100))), confidence_level=Decimal("0.95"), time_horizon=request.time_horizon, method=request.method, ) var_99_result = VaRResult( var_absolute=Decimal(str(abs(var_99 * portfolio_val))), var_percentage=Decimal(str(abs(var_99 * 100))), confidence_level=Decimal("0.99"), time_horizon=request.time_horizon, method=request.method, ) cvar_95_result = CVaRResult( cvar_absolute=Decimal(str(abs(cvar_95 * portfolio_val))), cvar_percentage=Decimal(str(abs(cvar_95 * 100))), confidence_level=Decimal("0.95"), interpretation=f"Expected loss in worst 5% of cases: ${abs(cvar_95 * portfolio_val):,.2f}", ) cvar_99_result = CVaRResult( cvar_absolute=Decimal(str(abs(cvar_99 * portfolio_val))), cvar_percentage=Decimal(str(abs(cvar_99 * 100))), confidence_level=Decimal("0.99"), interpretation=f"Expected loss in worst 1% of cases: ${abs(cvar_99 * portfolio_val):,.2f}", ) result = RiskAnalysisResult( var_95=var_95_result, var_99=var_99_result, cvar_95=cvar_95_result, cvar_99=cvar_99_result, risk_metrics=risk_metrics, ) if request.method == "monte_carlo" and percentiles: result.simulation_percentiles = { k: Decimal(str(v)) for k, v in percentiles.items() } logger.info(f"Risk analysis complete: VaR95={var_95_result.var_absolute}") return result except Exception as e: logger.error(f"Error in risk analysis: {e}") raise def _historical_var_cvar(returns: pd.Series, confidence: float, horizon: int) -> tuple: """Calculate VaR and CVaR using historical method.""" # Scale returns for time horizon scaled_returns = returns * np.sqrt(horizon) # Calculate VaR as percentile var = np.percentile(scaled_returns, (1 - confidence) * 100) # Calculate CVaR as mean of returns below VaR cvar = scaled_returns[scaled_returns <= var].mean() return var, cvar def _parametric_var_cvar(returns: pd.Series, confidence: float, horizon: int) -> tuple: """Calculate VaR and CVaR using parametric (normal distribution) method.""" mu = returns.mean() sigma = returns.std() # Scale for time horizon mu_scaled = mu * horizon sigma_scaled = sigma * np.sqrt(horizon) # Calculate VaR using normal distribution z_score = stats.norm.ppf(1 - confidence) var = mu_scaled + z_score * sigma_scaled # Calculate CVaR analytically for normal distribution # CVaR = μ - σ * φ(z) / (1 - confidence) # where φ is the PDF of standard normal phi_z = stats.norm.pdf(z_score) cvar = mu_scaled - sigma_scaled * phi_z / (1 - confidence) return var, cvar def _monte_carlo_var_cvar( returns: pd.DataFrame, weights: np.ndarray, confidence_95: float, confidence_99: float, horizon: int, num_simulations: int ) -> tuple: """Calculate VaR and CVaR using Monte Carlo simulation.""" # Calculate mean returns and covariance matrix mean_returns = returns.mean().values cov_matrix = returns.cov().values # Run simulations simulated_returns = np.random.multivariate_normal( mean_returns * horizon, cov_matrix * horizon, num_simulations ) # Calculate portfolio returns for each simulation portfolio_sim_returns = simulated_returns @ weights # Calculate VaR and CVaR var_95 = np.percentile(portfolio_sim_returns, (1 - confidence_95) * 100) var_99 = np.percentile(portfolio_sim_returns, (1 - confidence_99) * 100) cvar_95 = portfolio_sim_returns[portfolio_sim_returns <= var_95].mean() cvar_99 = portfolio_sim_returns[portfolio_sim_returns <= var_99].mean() # Calculate additional percentiles percentiles = { "p5": float(np.percentile(portfolio_sim_returns, 5)), "p25": float(np.percentile(portfolio_sim_returns, 25)), "p50": float(np.percentile(portfolio_sim_returns, 50)), "p75": float(np.percentile(portfolio_sim_returns, 75)), "p95": float(np.percentile(portfolio_sim_returns, 95)), } return var_95, cvar_95, var_99, cvar_99, percentiles def _calculate_ulcer_index(prices: pd.Series, lookback_period: int = 14) -> float: """Calculate Ulcer Index measuring downside volatility. Args: prices: Price series (NOT returns) lookback_period: Number of periods for rolling calculation Returns: Ulcer Index value """ # Calculate rolling maximum rolling_max = prices.rolling(window=lookback_period, min_periods=1).max() # Calculate percentage drawdown pct_drawdown = ((prices - rolling_max) / rolling_max) * 100 # Clip to only negative values (drawdowns) pct_drawdown = pct_drawdown.clip(upper=0) # Square the drawdowns squared_drawdown = pct_drawdown ** 2 # Calculate rolling sum of squared drawdowns sum_squared = squared_drawdown.rolling(window=lookback_period, min_periods=1).sum() # Calculate squared average squared_avg = sum_squared / lookback_period # Take square root to get Ulcer Index ulcer_index = np.sqrt(squared_avg) return float(ulcer_index.iloc[-1]) if len(ulcer_index) > 0 else 0.0 def _calculate_calmar_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float: """Calculate Calmar Ratio (return vs maximum drawdown). Args: returns: Portfolio returns series risk_free_rate: Annualised risk-free rate Returns: Calmar Ratio """ # Calculate CAGR cumulative_return = (1 + returns).prod() n_years = len(returns) / 252 # Assuming daily returns if n_years == 0: return 0.0 cagr = (cumulative_return ** (1 / n_years)) - 1 # Calculate maximum drawdown cumulative_returns = (1 + returns).cumprod() running_max = cumulative_returns.cummax() drawdown = (cumulative_returns - running_max) / running_max max_drawdown = abs(drawdown.min()) if max_drawdown == 0: return 0.0 calmar_ratio = (cagr - risk_free_rate) / max_drawdown return float(calmar_ratio) def _calculate_information_ratio( portfolio_returns: pd.Series, benchmark_returns: Optional[pd.Series] = None, periods_per_year: int = 252 ) -> Optional[float]: """Calculate Information Ratio vs benchmark. Args: portfolio_returns: Portfolio returns series benchmark_returns: Benchmark returns (optional) periods_per_year: Periods per year for annualisation Returns: Information Ratio or None if no benchmark or calculation error """ if benchmark_returns is None or len(benchmark_returns) == 0: return None # Align series if len(benchmark_returns) != len(portfolio_returns): return None # Calculate excess returns excess_returns = portfolio_returns - benchmark_returns # Check for valid data if len(excess_returns) < 2: return None # Annualise excess return (mean) annualised_excess_return = excess_returns.mean() * periods_per_year # Calculate tracking error (std dev of excess returns, annualised) tracking_error = excess_returns.std(ddof=1) * np.sqrt(periods_per_year) # Handle edge cases if tracking_error == 0 or np.isnan(tracking_error) or np.isinf(tracking_error): return None information_ratio = annualised_excess_return / tracking_error # Check for NaN or Inf in result if np.isnan(information_ratio) or np.isinf(information_ratio): return None return float(information_ratio) def _calculate_risk_metrics( returns: pd.DataFrame, weights: np.ndarray, portfolio_returns: pd.Series, benchmark_returns: Optional[pd.Series] = None, prices: Optional[pd.DataFrame] = None ) -> RiskMetrics: """Calculate comprehensive risk metrics. Args: returns: Asset returns DataFrame weights: Portfolio weights array portfolio_returns: Portfolio returns series benchmark_returns: Optional benchmark returns for Information Ratio prices: Optional price data for Ulcer Index Returns: RiskMetrics with all available metrics """ # Annualised volatility volatility_daily = portfolio_returns.std() volatility_annual = volatility_daily * np.sqrt(252) # Sharpe ratio (assuming 2% risk-free rate) risk_free_rate = 0.02 mean_return = portfolio_returns.mean() * 252 sharpe = (mean_return - risk_free_rate) / volatility_annual if volatility_annual > 0 else 0 # Sortino ratio (downside deviation) downside_returns = portfolio_returns[portfolio_returns < 0] downside_std = downside_returns.std() * np.sqrt(252) if len(downside_returns) > 0 else volatility_annual sortino = (mean_return - risk_free_rate) / downside_std if downside_std > 0 else 0 # Max drawdown cumulative_returns = (1 + portfolio_returns).cumprod() running_max = cumulative_returns.cummax() drawdown = (cumulative_returns - running_max) / running_max max_drawdown = drawdown.min() # Correlation matrix corr_matrix = returns.corr() correlation_dict = {} for ticker1 in corr_matrix.columns: correlation_dict[ticker1] = {} for ticker2 in corr_matrix.columns: correlation_dict[ticker1][ticker2] = Decimal(str(corr_matrix.loc[ticker1, ticker2])) # Advanced Performance Metrics (P1 Feature) calmar_ratio = _calculate_calmar_ratio(portfolio_returns, risk_free_rate) information_ratio = _calculate_information_ratio(portfolio_returns, benchmark_returns) # Ulcer Index (requires price data) ulcer_index = None if prices is not None and len(prices.columns) > 0: # Calculate portfolio prices from returns portfolio_prices = (1 + portfolio_returns).cumprod() ulcer_index = _calculate_ulcer_index(portfolio_prices) return RiskMetrics( volatility_annual=Decimal(str(volatility_annual)), sharpe_ratio=Decimal(str(sharpe)), sortino_ratio=Decimal(str(sortino)), max_drawdown=Decimal(str(max_drawdown)), correlation_matrix=correlation_dict, information_ratio=Decimal(str(information_ratio)) if information_ratio is not None else None, calmar_ratio=Decimal(str(calmar_ratio)), ulcer_index=Decimal(str(ulcer_index)) if ulcer_index is not None else None, ) class GARCHForecastRequest(BaseModel): """Request for GARCH volatility forecasting.""" ticker: str returns: List[Decimal] = Field(..., description="Historical returns (percentage)") forecast_horizon: int = Field(default=30, ge=1, le=90, description="Days to forecast") garch_p: int = Field(default=1, ge=1, le=5, description="GARCH lag order") garch_q: int = Field(default=1, ge=1, le=5, description="ARCH lag order") class GARCHForecastResult(BaseModel): """GARCH volatility forecast result.""" ticker: str model: str persistence: Decimal forecast_volatility: Dict[int, Decimal] = Field(..., description="Day -> volatility forecast") annualised_volatility: Dict[int, Decimal] = Field(..., description="Day -> annualised vol") model_diagnostics: Dict[str, Decimal] @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def forecast_volatility_garch(request: GARCHForecastRequest) -> GARCHForecastResult: """Forecast volatility using GARCH model. GARCH (Generalized Autoregressive Conditional Heteroskedasticity) models are the industry standard for financial volatility forecasting. This tool uses GARCH(1,1) which research shows is remarkably robust and difficult to outperform with more complex specifications. Args: request: GARCH forecast request with returns data Returns: Volatility forecasts with model diagnostics Example: >>> await forecast_volatility_garch( ... GARCHForecastRequest( ... ticker="AAPL", ... returns=[0.01, -0.02, 0.015, ...], ... forecast_horizon=30 ... ) ... ) """ if not GARCH_AVAILABLE: raise RuntimeError( "GARCH forecasting requires the 'arch' library. " "Install with: uv pip install arch" ) logger.info(f"Forecasting volatility for {request.ticker} with GARCH({request.garch_p},{request.garch_q})") try: # Convert returns to numpy array (percentages) returns_array = np.array([float(r) for r in request.returns]) # Fit GARCH model am = arch_model( returns_array, mean='Constant', vol='GARCH', p=request.garch_p, q=request.garch_q, dist='t' # Student's t distribution (better for fat tails) ) res = am.fit(disp='off', options={'maxiter': 1000}) # Extract parameters omega = float(res.params.get('omega', 0)) alpha = float(res.params.get(f'alpha[{request.garch_q}]', 0)) beta = float(res.params.get(f'beta[{request.garch_p}]', 0)) persistence = alpha + beta # Generate forecasts forecast = res.forecast(horizon=request.forecast_horizon) # Extract variance forecasts variance_forecasts = {} annualised_vol_forecasts = {} for day in range(1, request.forecast_horizon + 1): # Variance forecast var_forecast = float(forecast.variance.iloc[-1, day - 1]) vol_forecast = np.sqrt(var_forecast) # Annualise (assuming daily returns) annualised_vol = vol_forecast * np.sqrt(252) variance_forecasts[day] = Decimal(str(vol_forecast)) annualised_vol_forecasts[day] = Decimal(str(annualised_vol)) # Model diagnostics diagnostics = { 'omega': Decimal(str(omega)), 'alpha': Decimal(str(alpha)), 'beta': Decimal(str(beta)), 'persistence': Decimal(str(persistence)), 'log_likelihood': Decimal(str(float(res.loglikelihood))), 'aic': Decimal(str(float(res.aic))), 'bic': Decimal(str(float(res.bic))), } if 'nu' in res.params: # Degrees of freedom for Student's t diagnostics['nu'] = Decimal(str(float(res.params['nu']))) result = GARCHForecastResult( ticker=request.ticker, model=f"GARCH({request.garch_p},{request.garch_q})", persistence=Decimal(str(persistence)), forecast_volatility=variance_forecasts, annualised_volatility=annualised_vol_forecasts, model_diagnostics=diagnostics, ) logger.info( f"GARCH forecast complete for {request.ticker}: " f"persistence={persistence:.4f}, " f"1-day vol={variance_forecasts[1]:.4f}%" ) return result except Exception as e: logger.error(f"GARCH forecasting error for {request.ticker}: {e}") raise # Export the MCP server if __name__ == "__main__": mcp.run()