# BrianIsaac's picture
# feat: implement 7 production enhancements for portfolio analysis platform
# 9f411df
"""Risk Analyzer MCP Server.
This MCP server provides comprehensive risk analysis including:
- Value at Risk (VaR)
- Conditional Value at Risk (CVaR)
- Monte Carlo simulation
- Portfolio risk metrics
- GARCH volatility forecasting (P1 Feature)
- Advanced performance metrics: Information Ratio, Calmar Ratio, Ulcer Index
Based on modern portfolio theory and validated risk models.
"""
import logging
from typing import Dict, List, Optional
from decimal import Decimal
import numpy as np
import pandas as pd
from scipy import stats
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
# Module-level logger (configured by the host application).
logger = logging.getLogger(__name__)
# Initialize MCP server
mcp = FastMCP("risk-analyzer")
# Try to import arch for GARCH, but make it optional.
# GARCH_AVAILABLE gates forecast_volatility_garch at call time so the rest of
# the server works without the optional dependency.
try:
    from arch import arch_model
    GARCH_AVAILABLE = True
except ImportError:
    GARCH_AVAILABLE = False
    logger.warning("arch library not available - GARCH forecasting disabled")
class PortfolioInput(BaseModel):
    """A single portfolio holding submitted for risk analysis."""

    ticker: str  # Asset symbol (e.g. "AAPL")
    weight: Decimal = Field(..., ge=0, le=1, description="Portfolio weight")
    prices: List[Decimal] = Field(..., description="Historical close prices")
class BenchmarkInput(BaseModel):
    """Benchmark series used for relative metrics (Information Ratio)."""

    ticker: str = Field(..., description="Benchmark ticker (e.g., SPY, ^GSPC)")
    prices: List[Decimal] = Field(..., description="Historical close prices")
class RiskAnalysisRequest(BaseModel):
    """Request for risk analysis.

    ``method`` selects the VaR/CVaR estimator ("historical", "parametric" or
    "monte_carlo"); ``num_simulations`` is only used by the Monte Carlo method.
    """

    portfolio: List[PortfolioInput] = Field(..., min_length=1)
    portfolio_value: Decimal = Field(..., gt=0, description="Total portfolio value")
    # NOTE(review): analyze_risk always computes the 95% and 99% levels and
    # never reads this field — confirm whether it should drive the calculation.
    confidence_level: Decimal = Field(default=Decimal("0.95"), ge=0, le=1)
    time_horizon: int = Field(default=1, ge=1, le=252, description="Days")
    method: str = Field(default="historical", description="historical, parametric, or monte_carlo")
    num_simulations: int = Field(default=10000, ge=1000, le=100000)
    benchmark: Optional[BenchmarkInput] = Field(default=None, description="Optional benchmark for Information Ratio")
class VaRResult(BaseModel):
    """Value at Risk result at a single confidence level."""

    var_absolute: Decimal = Field(..., description="VaR in currency")
    var_percentage: Decimal = Field(..., description="VaR as percentage")
    confidence_level: Decimal  # e.g. 0.95 or 0.99
    time_horizon: int  # horizon in days the VaR was scaled to
    method: str  # estimator used: historical, parametric, or monte_carlo
class CVaRResult(BaseModel):
    """Conditional Value at Risk (expected shortfall) result."""

    cvar_absolute: Decimal = Field(..., description="CVaR in currency")
    cvar_percentage: Decimal = Field(..., description="CVaR as percentage")
    confidence_level: Decimal  # e.g. 0.95 or 0.99
    interpretation: str  # human-readable summary of the tail loss
class RiskMetrics(BaseModel):
    """Comprehensive risk metrics for a portfolio."""

    volatility_annual: Decimal  # annualised standard deviation of returns
    sharpe_ratio: Optional[Decimal] = None
    sortino_ratio: Optional[Decimal] = None
    max_drawdown: Decimal  # most negative peak-to-trough decline (fraction)
    # Not populated by _calculate_risk_metrics in this module; stays None.
    beta: Optional[Decimal] = None
    correlation_matrix: Optional[Dict[str, Dict[str, Decimal]]] = None
    # Advanced Performance Metrics (P1 Feature)
    information_ratio: Optional[Decimal] = None  # only set when a benchmark is supplied
    calmar_ratio: Optional[Decimal] = None
    ulcer_index: Optional[Decimal] = None
class RiskAnalysisResult(BaseModel):
    """Complete risk analysis result returned by analyze_risk."""

    var_95: VaRResult
    var_99: VaRResult
    cvar_95: CVaRResult
    cvar_99: CVaRResult
    risk_metrics: RiskMetrics
    # Only populated when the Monte Carlo method is used.
    simulation_percentiles: Optional[Dict[str, Decimal]] = None
def _calculate_returns(prices: pd.DataFrame) -> pd.DataFrame:
"""Calculate returns from prices."""
return prices.pct_change().dropna()
def _calculate_portfolio_returns(returns: pd.DataFrame, weights: np.ndarray) -> pd.Series:
"""Calculate portfolio returns given asset returns and weights."""
return (returns * weights).sum(axis=1)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Only retry transient infrastructure failures. The previous blanket
    # `Exception` entry retried deterministic errors (e.g. the ValueError
    # raised for an unknown method) three times, adding latency and masking
    # real bugs.
    retry=retry_if_exception_type((TimeoutError, ConnectionError)),
)
@mcp.tool()
async def analyze_risk(request: RiskAnalysisRequest) -> RiskAnalysisResult:
    """Perform comprehensive risk analysis on a portfolio.

    Args:
        request: Risk analysis request including optional benchmark.

    Returns:
        Complete risk analysis with VaR, CVaR, and risk metrics.

    Raises:
        ValueError: If ``request.method`` is not "historical", "parametric"
            or "monte_carlo".

    Example:
        >>> await analyze_risk(RiskAnalysisRequest(portfolio=[...], portfolio_value=100000))

    Note:
        When a benchmark is provided, the Information Ratio will be calculated
        to measure risk-adjusted returns relative to the benchmark.
    """
    logger.info(f"Analyzing risk for portfolio of {len(request.portfolio)} assets")
    if request.benchmark:
        logger.info(f"Benchmark provided: {request.benchmark.ticker}")
    try:
        # Phase 1: Prepare portfolio data
        price_data = {}
        weights = []
        tickers = []
        for holding in request.portfolio:
            price_data[holding.ticker] = [float(p) for p in holding.prices]
            weights.append(float(holding.weight))
            tickers.append(holding.ticker)
        prices = pd.DataFrame(price_data)
        returns = _calculate_returns(prices)
        weights_array = np.array(weights)
        # Calculate portfolio returns
        portfolio_returns = _calculate_portfolio_returns(returns, weights_array)
        # Phase 1.5: Prepare benchmark data if provided
        benchmark_returns = None
        if request.benchmark:
            try:
                benchmark_prices = pd.Series([float(p) for p in request.benchmark.prices])
                benchmark_returns = benchmark_prices.pct_change().dropna()
                # Align benchmark and portfolio to a common length by keeping
                # the most recent observations of the longer series.
                min_len = min(len(portfolio_returns), len(benchmark_returns))
                if min_len < len(portfolio_returns):
                    logger.warning(
                        f"Benchmark has fewer data points ({len(benchmark_returns)}) "
                        f"than portfolio ({len(portfolio_returns)}). Aligning to {min_len} points."
                    )
                    portfolio_returns = portfolio_returns.iloc[-min_len:]
                    returns = returns.iloc[-min_len:]
                    prices = prices.iloc[-min_len:]
                elif min_len < len(benchmark_returns):
                    logger.warning(
                        f"Portfolio has fewer data points ({len(portfolio_returns)}) "
                        f"than benchmark ({len(benchmark_returns)}). Aligning to {min_len} points."
                    )
                    benchmark_returns = benchmark_returns.iloc[-min_len:]
                logger.info(
                    f"Aligned portfolio and benchmark returns to {min_len} data points"
                )
            except Exception as e:
                # Benchmark processing is deliberately best-effort: the core
                # analysis still runs, only the Information Ratio is lost.
                logger.error(f"Error processing benchmark data: {e}")
                logger.warning("Continuing without benchmark - Information Ratio will be None")
                benchmark_returns = None
        # Calculate VaR and CVaR with the requested method.
        # `percentiles` is only produced by the Monte Carlo estimator;
        # initialize it so the check below never depends on short-circuiting.
        percentiles = None
        if request.method == "historical":
            var_95, cvar_95 = _historical_var_cvar(portfolio_returns, 0.95, request.time_horizon)
            var_99, cvar_99 = _historical_var_cvar(portfolio_returns, 0.99, request.time_horizon)
        elif request.method == "parametric":
            var_95, cvar_95 = _parametric_var_cvar(portfolio_returns, 0.95, request.time_horizon)
            var_99, cvar_99 = _parametric_var_cvar(portfolio_returns, 0.99, request.time_horizon)
        elif request.method == "monte_carlo":
            var_95, cvar_95, var_99, cvar_99, percentiles = _monte_carlo_var_cvar(
                returns, weights_array, 0.95, 0.99, request.time_horizon, request.num_simulations
            )
        else:
            raise ValueError(f"Unknown method: {request.method}")
        # Phase 2: Calculate risk metrics (with benchmark and prices for Ulcer Index)
        risk_metrics = _calculate_risk_metrics(
            returns, weights_array, portfolio_returns,
            benchmark_returns=benchmark_returns,
            prices=prices
        )
        # Convert fractional VaR/CVaR into currency amounts for reporting.
        portfolio_val = float(request.portfolio_value)
        # Create results
        var_95_result = VaRResult(
            var_absolute=Decimal(str(abs(var_95 * portfolio_val))),
            var_percentage=Decimal(str(abs(var_95 * 100))),
            confidence_level=Decimal("0.95"),
            time_horizon=request.time_horizon,
            method=request.method,
        )
        var_99_result = VaRResult(
            var_absolute=Decimal(str(abs(var_99 * portfolio_val))),
            var_percentage=Decimal(str(abs(var_99 * 100))),
            confidence_level=Decimal("0.99"),
            time_horizon=request.time_horizon,
            method=request.method,
        )
        cvar_95_result = CVaRResult(
            cvar_absolute=Decimal(str(abs(cvar_95 * portfolio_val))),
            cvar_percentage=Decimal(str(abs(cvar_95 * 100))),
            confidence_level=Decimal("0.95"),
            interpretation=f"Expected loss in worst 5% of cases: ${abs(cvar_95 * portfolio_val):,.2f}",
        )
        cvar_99_result = CVaRResult(
            cvar_absolute=Decimal(str(abs(cvar_99 * portfolio_val))),
            cvar_percentage=Decimal(str(abs(cvar_99 * 100))),
            confidence_level=Decimal("0.99"),
            interpretation=f"Expected loss in worst 1% of cases: ${abs(cvar_99 * portfolio_val):,.2f}",
        )
        result = RiskAnalysisResult(
            var_95=var_95_result,
            var_99=var_99_result,
            cvar_95=cvar_95_result,
            cvar_99=cvar_99_result,
            risk_metrics=risk_metrics,
        )
        if request.method == "monte_carlo" and percentiles:
            result.simulation_percentiles = {
                k: Decimal(str(v)) for k, v in percentiles.items()
            }
        logger.info(f"Risk analysis complete: VaR95={var_95_result.var_absolute}")
        return result
    except Exception as e:
        logger.error(f"Error in risk analysis: {e}")
        raise
def _historical_var_cvar(returns: pd.Series, confidence: float, horizon: int) -> tuple:
"""Calculate VaR and CVaR using historical method."""
# Scale returns for time horizon
scaled_returns = returns * np.sqrt(horizon)
# Calculate VaR as percentile
var = np.percentile(scaled_returns, (1 - confidence) * 100)
# Calculate CVaR as mean of returns below VaR
cvar = scaled_returns[scaled_returns <= var].mean()
return var, cvar
def _parametric_var_cvar(returns: pd.Series, confidence: float, horizon: int) -> tuple:
"""Calculate VaR and CVaR using parametric (normal distribution) method."""
mu = returns.mean()
sigma = returns.std()
# Scale for time horizon
mu_scaled = mu * horizon
sigma_scaled = sigma * np.sqrt(horizon)
# Calculate VaR using normal distribution
z_score = stats.norm.ppf(1 - confidence)
var = mu_scaled + z_score * sigma_scaled
# Calculate CVaR analytically for normal distribution
# CVaR = μ - σ * φ(z) / (1 - confidence)
# where φ is the PDF of standard normal
phi_z = stats.norm.pdf(z_score)
cvar = mu_scaled - sigma_scaled * phi_z / (1 - confidence)
return var, cvar
def _monte_carlo_var_cvar(
returns: pd.DataFrame,
weights: np.ndarray,
confidence_95: float,
confidence_99: float,
horizon: int,
num_simulations: int
) -> tuple:
"""Calculate VaR and CVaR using Monte Carlo simulation."""
# Calculate mean returns and covariance matrix
mean_returns = returns.mean().values
cov_matrix = returns.cov().values
# Run simulations
simulated_returns = np.random.multivariate_normal(
mean_returns * horizon,
cov_matrix * horizon,
num_simulations
)
# Calculate portfolio returns for each simulation
portfolio_sim_returns = simulated_returns @ weights
# Calculate VaR and CVaR
var_95 = np.percentile(portfolio_sim_returns, (1 - confidence_95) * 100)
var_99 = np.percentile(portfolio_sim_returns, (1 - confidence_99) * 100)
cvar_95 = portfolio_sim_returns[portfolio_sim_returns <= var_95].mean()
cvar_99 = portfolio_sim_returns[portfolio_sim_returns <= var_99].mean()
# Calculate additional percentiles
percentiles = {
"p5": float(np.percentile(portfolio_sim_returns, 5)),
"p25": float(np.percentile(portfolio_sim_returns, 25)),
"p50": float(np.percentile(portfolio_sim_returns, 50)),
"p75": float(np.percentile(portfolio_sim_returns, 75)),
"p95": float(np.percentile(portfolio_sim_returns, 95)),
}
return var_95, cvar_95, var_99, cvar_99, percentiles
def _calculate_ulcer_index(prices: pd.Series, lookback_period: int = 14) -> float:
"""Calculate Ulcer Index measuring downside volatility.
Args:
prices: Price series (NOT returns)
lookback_period: Number of periods for rolling calculation
Returns:
Ulcer Index value
"""
# Calculate rolling maximum
rolling_max = prices.rolling(window=lookback_period, min_periods=1).max()
# Calculate percentage drawdown
pct_drawdown = ((prices - rolling_max) / rolling_max) * 100
# Clip to only negative values (drawdowns)
pct_drawdown = pct_drawdown.clip(upper=0)
# Square the drawdowns
squared_drawdown = pct_drawdown ** 2
# Calculate rolling sum of squared drawdowns
sum_squared = squared_drawdown.rolling(window=lookback_period, min_periods=1).sum()
# Calculate squared average
squared_avg = sum_squared / lookback_period
# Take square root to get Ulcer Index
ulcer_index = np.sqrt(squared_avg)
return float(ulcer_index.iloc[-1]) if len(ulcer_index) > 0 else 0.0
def _calculate_calmar_ratio(returns: pd.Series, risk_free_rate: float = 0.02) -> float:
"""Calculate Calmar Ratio (return vs maximum drawdown).
Args:
returns: Portfolio returns series
risk_free_rate: Annualised risk-free rate
Returns:
Calmar Ratio
"""
# Calculate CAGR
cumulative_return = (1 + returns).prod()
n_years = len(returns) / 252 # Assuming daily returns
if n_years == 0:
return 0.0
cagr = (cumulative_return ** (1 / n_years)) - 1
# Calculate maximum drawdown
cumulative_returns = (1 + returns).cumprod()
running_max = cumulative_returns.cummax()
drawdown = (cumulative_returns - running_max) / running_max
max_drawdown = abs(drawdown.min())
if max_drawdown == 0:
return 0.0
calmar_ratio = (cagr - risk_free_rate) / max_drawdown
return float(calmar_ratio)
def _calculate_information_ratio(
portfolio_returns: pd.Series,
benchmark_returns: Optional[pd.Series] = None,
periods_per_year: int = 252
) -> Optional[float]:
"""Calculate Information Ratio vs benchmark.
Args:
portfolio_returns: Portfolio returns series
benchmark_returns: Benchmark returns (optional)
periods_per_year: Periods per year for annualisation
Returns:
Information Ratio or None if no benchmark or calculation error
"""
if benchmark_returns is None or len(benchmark_returns) == 0:
return None
# Align series
if len(benchmark_returns) != len(portfolio_returns):
return None
# Calculate excess returns
excess_returns = portfolio_returns - benchmark_returns
# Check for valid data
if len(excess_returns) < 2:
return None
# Annualise excess return (mean)
annualised_excess_return = excess_returns.mean() * periods_per_year
# Calculate tracking error (std dev of excess returns, annualised)
tracking_error = excess_returns.std(ddof=1) * np.sqrt(periods_per_year)
# Handle edge cases
if tracking_error == 0 or np.isnan(tracking_error) or np.isinf(tracking_error):
return None
information_ratio = annualised_excess_return / tracking_error
# Check for NaN or Inf in result
if np.isnan(information_ratio) or np.isinf(information_ratio):
return None
return float(information_ratio)
def _calculate_risk_metrics(
    returns: pd.DataFrame,
    weights: np.ndarray,
    portfolio_returns: pd.Series,
    benchmark_returns: Optional[pd.Series] = None,
    prices: Optional[pd.DataFrame] = None
) -> RiskMetrics:
    """Calculate comprehensive risk metrics.

    Args:
        returns: Asset returns DataFrame (used for the correlation matrix)
        weights: Portfolio weights array. NOTE(review): not read by this
            function; kept for interface stability — confirm intent.
        portfolio_returns: Portfolio returns series (assumed daily)
        benchmark_returns: Optional benchmark returns for Information Ratio
        prices: Optional price data; currently only used as a presence flag —
            the Ulcer Index below is computed from cumulative portfolio
            returns, not from these prices.

    Returns:
        RiskMetrics with all available metrics. The `beta` field is not
        computed here and remains None.
    """
    # Annualised volatility (sqrt-of-time scaling, 252 trading days).
    volatility_daily = portfolio_returns.std()
    volatility_annual = volatility_daily * np.sqrt(252)
    # Sharpe ratio (assuming 2% risk-free rate); 0 when volatility is zero.
    risk_free_rate = 0.02
    mean_return = portfolio_returns.mean() * 252
    sharpe = (mean_return - risk_free_rate) / volatility_annual if volatility_annual > 0 else 0
    # Sortino ratio: penalises only downside deviation; falls back to total
    # volatility when there are no negative returns.
    downside_returns = portfolio_returns[portfolio_returns < 0]
    downside_std = downside_returns.std() * np.sqrt(252) if len(downside_returns) > 0 else volatility_annual
    sortino = (mean_return - risk_free_rate) / downside_std if downside_std > 0 else 0
    # Max drawdown: most negative peak-to-trough decline of the equity curve.
    cumulative_returns = (1 + portfolio_returns).cumprod()
    running_max = cumulative_returns.cummax()
    drawdown = (cumulative_returns - running_max) / running_max
    max_drawdown = drawdown.min()
    # Pairwise asset correlation matrix, converted to nested Decimal dicts.
    corr_matrix = returns.corr()
    correlation_dict = {}
    for ticker1 in corr_matrix.columns:
        correlation_dict[ticker1] = {}
        for ticker2 in corr_matrix.columns:
            correlation_dict[ticker1][ticker2] = Decimal(str(corr_matrix.loc[ticker1, ticker2]))
    # Advanced Performance Metrics (P1 Feature)
    calmar_ratio = _calculate_calmar_ratio(portfolio_returns, risk_free_rate)
    information_ratio = _calculate_information_ratio(portfolio_returns, benchmark_returns)
    # Ulcer Index: gated on `prices` being supplied, but computed from the
    # normalised cumulative portfolio return series (starting value 1).
    ulcer_index = None
    if prices is not None and len(prices.columns) > 0:
        # Calculate portfolio prices from returns
        portfolio_prices = (1 + portfolio_returns).cumprod()
        ulcer_index = _calculate_ulcer_index(portfolio_prices)
    return RiskMetrics(
        volatility_annual=Decimal(str(volatility_annual)),
        sharpe_ratio=Decimal(str(sharpe)),
        sortino_ratio=Decimal(str(sortino)),
        max_drawdown=Decimal(str(max_drawdown)),
        correlation_matrix=correlation_dict,
        information_ratio=Decimal(str(information_ratio)) if information_ratio is not None else None,
        calmar_ratio=Decimal(str(calmar_ratio)),
        ulcer_index=Decimal(str(ulcer_index)) if ulcer_index is not None else None,
    )
class GARCHForecastRequest(BaseModel):
    """Request for GARCH volatility forecasting."""

    ticker: str  # Asset symbol used for logging and the result payload
    returns: List[Decimal] = Field(..., description="Historical returns (percentage)")
    forecast_horizon: int = Field(default=30, ge=1, le=90, description="Days to forecast")
    # NOTE(review): in `arch.arch_model`, p is the ARCH (alpha) order and q the
    # GARCH (beta) order; these two descriptions appear swapped — confirm.
    garch_p: int = Field(default=1, ge=1, le=5, description="GARCH lag order")
    garch_q: int = Field(default=1, ge=1, le=5, description="ARCH lag order")
class GARCHForecastResult(BaseModel):
    """GARCH volatility forecast result."""

    ticker: str
    model: str  # e.g. "GARCH(1,1)"
    persistence: Decimal  # alpha + beta; values near 1 indicate persistent volatility
    # Values are per-period volatility (std dev), i.e. sqrt of the variance forecast.
    forecast_volatility: Dict[int, Decimal] = Field(..., description="Day -> volatility forecast")
    annualised_volatility: Dict[int, Decimal] = Field(..., description="Day -> annualised vol")
    model_diagnostics: Dict[str, Decimal]  # omega/alpha/beta/persistence/loglik/aic/bic (+nu)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Only retry transient infrastructure errors; deterministic model-fitting
    # failures would fail identically on every attempt, so the previous
    # blanket `Exception` entry only added latency.
    retry=retry_if_exception_type((TimeoutError, ConnectionError)),
)
@mcp.tool()
async def forecast_volatility_garch(request: GARCHForecastRequest) -> GARCHForecastResult:
    """Forecast volatility using GARCH model.

    GARCH (Generalized Autoregressive Conditional Heteroskedasticity) models
    are the industry standard for financial volatility forecasting. This tool
    uses GARCH(1,1) which research shows is remarkably robust and difficult
    to outperform with more complex specifications.

    Args:
        request: GARCH forecast request with returns data

    Returns:
        Volatility forecasts with model diagnostics

    Raises:
        RuntimeError: If the optional 'arch' dependency is not installed.

    Example:
        >>> await forecast_volatility_garch(
        ...     GARCHForecastRequest(
        ...         ticker="AAPL",
        ...         returns=[0.01, -0.02, 0.015, ...],
        ...         forecast_horizon=30
        ...     )
        ... )
    """
    if not GARCH_AVAILABLE:
        raise RuntimeError(
            "GARCH forecasting requires the 'arch' library. "
            "Install with: uv pip install arch"
        )
    logger.info(f"Forecasting volatility for {request.ticker} with GARCH({request.garch_p},{request.garch_q})")
    try:
        # Convert returns to numpy array (percentages)
        returns_array = np.array([float(r) for r in request.returns])
        # Fit GARCH model
        am = arch_model(
            returns_array,
            mean='Constant',
            vol='GARCH',
            p=request.garch_p,
            q=request.garch_q,
            dist='t'  # Student's t distribution (better for fat tails)
        )
        res = am.fit(disp='off', options={'maxiter': 1000})
        # Extract parameters. In `arch`, the fitted parameters are named
        # alpha[1..p] (ARCH terms) and beta[1..q] (GARCH terms). The previous
        # code swapped the p/q indices and read only a single lag, so
        # persistence was wrong for any order other than (1,1). Sum every lag.
        omega = float(res.params.get('omega', 0))
        alpha = sum(
            float(res.params.get(f'alpha[{i}]', 0))
            for i in range(1, request.garch_p + 1)
        )
        beta = sum(
            float(res.params.get(f'beta[{j}]', 0))
            for j in range(1, request.garch_q + 1)
        )
        persistence = alpha + beta
        # Generate forecasts
        forecast = res.forecast(horizon=request.forecast_horizon)
        # Extract variance forecasts and convert to volatility (std dev)
        variance_forecasts = {}
        annualised_vol_forecasts = {}
        for day in range(1, request.forecast_horizon + 1):
            # Variance forecast for this horizon step
            var_forecast = float(forecast.variance.iloc[-1, day - 1])
            vol_forecast = np.sqrt(var_forecast)
            # Annualise (assuming daily returns)
            annualised_vol = vol_forecast * np.sqrt(252)
            variance_forecasts[day] = Decimal(str(vol_forecast))
            annualised_vol_forecasts[day] = Decimal(str(annualised_vol))
        # Model diagnostics
        diagnostics = {
            'omega': Decimal(str(omega)),
            'alpha': Decimal(str(alpha)),
            'beta': Decimal(str(beta)),
            'persistence': Decimal(str(persistence)),
            'log_likelihood': Decimal(str(float(res.loglikelihood))),
            'aic': Decimal(str(float(res.aic))),
            'bic': Decimal(str(float(res.bic))),
        }
        if 'nu' in res.params:  # Degrees of freedom for Student's t
            diagnostics['nu'] = Decimal(str(float(res.params['nu'])))
        result = GARCHForecastResult(
            ticker=request.ticker,
            model=f"GARCH({request.garch_p},{request.garch_q})",
            persistence=Decimal(str(persistence)),
            forecast_volatility=variance_forecasts,
            annualised_volatility=annualised_vol_forecasts,
            model_diagnostics=diagnostics,
        )
        logger.info(
            f"GARCH forecast complete for {request.ticker}: "
            f"persistence={persistence:.4f}, "
            f"1-day vol={variance_forecasts[1]:.4f}%"
        )
        return result
    except Exception as e:
        logger.error(f"GARCH forecasting error for {request.ticker}: {e}")
        raise
# Export the MCP server
if __name__ == "__main__":
    # Start the MCP server when this module is executed as a script.
    mcp.run()