BrianIsaac's picture
feat: implement 7 production enhancements for portfolio analysis platform
9f411df
"""Portfolio Optimizer MCP Server.
This MCP server provides portfolio optimization using multiple methods:
- Hierarchical Risk Parity (HRP)
- Black-Litterman
- Mean-Variance Optimization
All methods are deterministic and based on PyPortfolioOpt.
"""
import logging
from typing import Dict, List, Optional
from decimal import Decimal
import numpy as np
import pandas as pd
from pypfopt import HRPOpt, BlackLittermanModel, EfficientFrontier, risk_models, expected_returns, objective_functions
from pypfopt import black_litterman
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
# Module-level logger, named after this module so records can be filtered by source.
logger = logging.getLogger(__name__)
# FastMCP server instance named "portfolio-optimizer"; the optimization
# functions in this module are registered on it with ``@mcp.tool()``.
mcp = FastMCP("portfolio-optimizer")
class MarketDataInput(BaseModel):
    """Market data for a single asset used in portfolio optimization.

    ``prices`` and ``dates`` are parallel lists: ``prices[i]`` is the close
    price observed on ``dates[i]``.
    """

    # Symbol identifying the asset; used as the price-column name and as the
    # key of the returned weights.
    ticker: str
    # Historical close prices (converted to float before optimization).
    prices: List[Decimal] = Field(..., description="Historical close prices")
    # Date strings for the prices; parsed with ``pandas.to_datetime``.
    dates: List[str] = Field(..., description="Corresponding dates")
    # Optional market capitalisation. ``optimize_black_litterman`` reads this
    # attribute to derive market weights and falls back to a neutral value
    # when it is absent. Declaring it here lets callers actually supply it;
    # it defaults to None so existing payloads remain valid.
    market_cap: Optional[Decimal] = Field(
        default=None,
        gt=0,
        description="Market capitalisation used for Black-Litterman market weights",
    )
class OptimizationRequest(BaseModel):
    """Request for portfolio optimization.

    This is the single argument accepted by every optimization tool in this
    module.
    """

    # One entry per asset; ``min_length=2`` rejects requests with fewer than
    # two assets.
    market_data: List[MarketDataInput] = Field(..., min_length=2)
    # Requested method name. NOTE(review): none of the tools visible in this
    # file read this field; each tool runs its own method regardless.
    method: str = Field(..., description="hrp, black_litterman, or mean_variance")
    # Risk profile; ``optimize_mean_variance`` uses it to size the per-asset
    # weight bounds.
    risk_tolerance: str = Field(default="moderate", description="conservative, moderate, aggressive")
    # Optional weight constraints. NOTE(review): not read by the tools visible
    # in this file.
    constraints: Optional[Dict[str, Decimal]] = Field(default=None, description="Weight constraints")
class OptimizationResult(BaseModel):
    """Portfolio optimization result returned by every optimization tool."""

    # Method that produced the weights. When a tool falls back to HRP this is
    # "hrp", not the method that was originally requested.
    method: str
    # Portfolio weight per ticker.
    weights: Dict[str, Decimal]
    # Expected portfolio return as computed by the producing tool.
    expected_return: Decimal
    # Portfolio volatility as computed by the producing tool.
    volatility: Decimal
    # Sharpe ratio; the tools in this file use a 0.02 risk-free rate.
    sharpe_ratio: Decimal
    # Free-form string details about the run (e.g. optimizer settings).
    metadata: Optional[Dict[str, str]] = Field(default_factory=dict)
def _prepare_price_data(market_data: List[MarketDataInput]) -> pd.DataFrame:
"""Convert market data to pandas DataFrame."""
price_dict = {}
for data in market_data:
price_dict[data.ticker] = [float(p) for p in data.prices]
# Use dates from first ticker
dates = pd.to_datetime(market_data[0].dates)
df = pd.DataFrame(price_dict, index=dates)
return df
# NOTE(review): decorators apply bottom-up, so ``mcp.tool()`` receives the
# undecorated coroutine and ``retry`` wraps whatever ``mcp.tool()`` returns.
# Confirm the retry policy is actually in effect for calls made through MCP.
# The retry filter also lists ``Exception``, which already covers the other
# two types, so every failure is retried.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def optimize_hrp(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Hierarchical Risk Parity.

    HRP uses hierarchical clustering to construct a diversified portfolio
    that balances risk across clusters of correlated assets.

    Weights come from ``HRPOpt`` fitted on simple daily returns. The reported
    return, volatility and Sharpe ratio are computed here from the mean
    historical return and sample covariance of the prices, using a 0.02
    risk-free rate.

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Raises:
        Exception: Any error raised during data preparation or optimization
            is logged and re-raised.

    Example:
        >>> await optimize_hrp(OptimizationRequest(market_data=[...], method="hrp"))
    """
    logger.info(f"Optimizing portfolio using HRP for {len(request.market_data)} assets")
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)
        returns = prices.pct_change().dropna()

        # Run HRP optimization
        hrp = HRPOpt(returns=returns)
        weights = hrp.optimize()

        # Calculate performance
        mu = expected_returns.mean_historical_return(prices)
        S = risk_models.sample_cov(prices)

        # Look everything up by ticker label rather than by position: the
        # weights mapping is not guaranteed to follow the column order of the
        # covariance matrix, and a positional lookup would pair a ticker's
        # weight with another ticker's covariance.
        weight_vector = pd.Series(weights, dtype=float)
        tickers = weight_vector.index
        portfolio_return = float(weight_vector.dot(mu.loc[tickers]))
        cov = S.loc[tickers, tickers].to_numpy()
        w = weight_vector.to_numpy()
        portfolio_variance = float(w @ cov @ w)
        portfolio_volatility = np.sqrt(portfolio_variance)
        sharpe = (portfolio_return - 0.02) / portfolio_volatility if portfolio_volatility > 0 else 0

        # Convert to Decimal (via str to avoid binary float artefacts)
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in weights.items()}
        result = OptimizationResult(
            method="hrp",
            weights=weights_decimal,
            expected_return=Decimal(str(portfolio_return)),
            volatility=Decimal(str(portfolio_volatility)),
            sharpe_ratio=Decimal(str(sharpe)),
            metadata={"risk_model": "hierarchical_risk_parity"},
        )
        logger.info(f"HRP optimization complete: Sharpe={sharpe:.2f}")
        return result
    except Exception as e:
        logger.error(f"Error in HRP optimization: {e}")
        raise
# NOTE(review): decorators apply bottom-up, so ``mcp.tool()`` receives the
# undecorated coroutine and ``retry`` wraps whatever ``mcp.tool()`` returns.
# Confirm the retry policy is actually in effect for calls made through MCP.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def optimize_black_litterman(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Black-Litterman model with market equilibrium.

    Black-Litterman uses market-implied equilibrium returns as the prior distribution
    when no explicit investor views are provided. This grounds the portfolio in observable
    market prices and risk preferences.

    Market weights are cap-weighted only when every asset exposes a
    ``market_cap``; otherwise all assets are weighted equally. Any failure in
    the equilibrium or optimization steps falls back to ``optimize_hrp``, in
    which case the returned result reports ``method="hrp"``.

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Example:
        >>> await optimize_black_litterman(OptimizationRequest(market_data=[...], method="black_litterman"))
    """
    logger.info(f"Black-Litterman requested for {len(request.market_data)} assets using market equilibrium")
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)

        # Calculate covariance with shrinkage for stability
        S = risk_models.CovarianceShrinkage(prices).ledoit_wolf()

        # Use market caps only when every asset provides one. Mixing real
        # caps with a placeholder for the missing ones would give the
        # placeholder assets a near-zero market weight.
        raw_caps = {
            data.ticker: getattr(data, 'market_cap', None)
            for data in request.market_data
        }
        if all(cap is not None for cap in raw_caps.values()):
            market_caps = {ticker: float(cap) for ticker, cap in raw_caps.items()}
        else:
            if any(cap is not None for cap in raw_caps.values()):
                logger.warning("Market caps supplied for only some assets; using equal market weights")
            market_caps = dict.fromkeys(raw_caps, 1.0)

        total_market_cap = sum(market_caps.values())
        market_weights = {ticker: cap / total_market_cap for ticker, cap in market_caps.items()}
        logger.info(f"Market weights calculated: {len(market_weights)} assets, using equilibrium approach")

        # Convert market weights dict to Series in the covariance matrix order
        market_weights_series = pd.Series(
            {ticker: market_weights.get(ticker, 1.0/len(request.market_data))
             for ticker in S.index}
        )

        # Create synthetic market portfolio price from asset prices and market weights
        market_portfolio_price = (prices * market_weights_series).sum(axis=1)

        # Calculate market-implied risk aversion from the synthetic market portfolio
        try:
            delta = black_litterman.market_implied_risk_aversion(
                market_prices=market_portfolio_price,
                frequency=252,
                risk_free_rate=0.02
            )
            logger.info(f"Market-implied risk aversion (delta): {delta:.4f}")
        except Exception as e:
            logger.warning(f"Could not calculate market-implied risk aversion: {e}. Using default delta=1.0")
            delta = 1.0

        # Calculate market return as proxy using the market portfolio weights
        # (reported in metadata only; not used by the optimization itself)
        mu_historical = expected_returns.mean_historical_return(prices)
        market_return = float(mu_historical.dot(market_weights_series))

        # Calculate market-implied prior returns
        try:
            pi = black_litterman.market_implied_prior_returns(
                market_weights_series, delta, S, risk_free_rate=0.02
            )
            logger.info(f"Market-implied prior returns calculated for {len(pi)} assets")
        except Exception as e:
            logger.error(f"Failed to calculate market-implied prior returns: {e}")
            logger.warning("Falling back to HRP due to equilibrium calculation failure")
            return await optimize_hrp(request)

        # Create Black-Litterman model with equilibrium prior (no explicit views)
        bl = BlackLittermanModel(S, pi=pi, absolute_views=pd.Series({}))

        # Get posterior returns (equals prior when no views provided)
        posterior_returns = bl.bl_returns()
        logger.info(f"Black-Litterman posterior returns: mean={posterior_returns.mean():.4f}, std={posterior_returns.std():.4f}")

        # Optimise with posterior estimates
        ef = EfficientFrontier(posterior_returns, S, verbose=False)
        # Add L2 regularisation for stability
        ef.add_objective(objective_functions.L2_reg, gamma=1.0)
        # Use max_quadratic_utility for L2 regularisation compatibility.
        # The raw weights are read back through clean_weights() below.
        # NOTE(review): this uses a fixed risk_aversion of 1 rather than the
        # market-implied delta computed above - confirm that is intended.
        ef.max_quadratic_utility(risk_aversion=1)
        cleaned_weights = ef.clean_weights()

        # Calculate performance metrics: (expected return, volatility, Sharpe)
        perf = ef.portfolio_performance(risk_free_rate=0.02)

        # Convert to Decimal
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()}
        result = OptimizationResult(
            method="black_litterman",
            weights=weights_decimal,
            expected_return=Decimal(str(perf[0])),
            volatility=Decimal(str(perf[1])),
            sharpe_ratio=Decimal(str(perf[2])),
            metadata={
                "actual_method": "black_litterman_equilibrium",
                "used_market_equilibrium": "true",
                "risk_aversion": f"{delta:.4f}",
                "market_return_proxy": f"{market_return:.4f}"
            },
        )
        logger.info(f"Black-Litterman equilibrium optimization complete: Sharpe={perf[2]:.2f}")
        return result
    except Exception as e:
        logger.error(f"Black-Litterman optimization failed: {e}")
        logger.warning("Falling back to HRP due to optimization error")
        return await optimize_hrp(request)
# NOTE(review): decorators apply bottom-up, so ``mcp.tool()`` receives the
# undecorated coroutine and ``retry`` wraps whatever ``mcp.tool()`` returns.
# Confirm the retry policy is actually in effect for calls made through MCP.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def optimize_mean_variance(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Mean-Variance Optimization (Markowitz).

    Maximises quadratic utility (risk aversion 1, with L2 regularisation)
    over the efficient frontier built from mean historical returns and the
    sample covariance. Per-asset weight bounds are centred on the equal
    weight ``1 / n_assets`` and widened by a tolerance chosen from
    ``request.risk_tolerance``: 0.05 (conservative), 0.10 (moderate) or
    0.15 (aggressive). The value is matched case-insensitively; an
    unrecognised value is logged and treated as aggressive.

    Any failure falls back to ``optimize_hrp``, in which case the returned
    result reports ``method="hrp"``.

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Example:
        >>> await optimize_mean_variance(OptimizationRequest(market_data=[...], method="mean_variance"))
    """
    logger.info(f"Optimizing portfolio using Mean-Variance for {len(request.market_data)} assets")
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)
        n_assets = len(prices.columns)

        # Bounds are expressed as a band around the equal weight so they
        # scale with portfolio size.
        equal_weight = 1.0 / n_assets
        tolerance_by_profile = {
            "conservative": 0.05,
            "moderate": 0.10,
            "aggressive": 0.15,
        }
        profile = request.risk_tolerance.strip().lower()
        if profile not in tolerance_by_profile:
            logger.warning(
                f"Unrecognised risk_tolerance {request.risk_tolerance!r}; "
                f"using aggressive bounds"
            )
        tolerance = tolerance_by_profile.get(profile, tolerance_by_profile["aggressive"])
        min_weight = max(0.0, equal_weight - tolerance)
        max_weight = min(1.0, equal_weight + tolerance)

        # Verify mathematical feasibility. With the tolerances above
        # max_weight is never below the equal weight, so this guard only
        # matters if the tolerance table is changed.
        if n_assets * max_weight < 1.0:
            logger.warning(
                f"Constraints not feasible: {n_assets} assets * {max_weight:.3f} max = "
                f"{n_assets * max_weight:.3f} < 1.0. Adjusting max_weight."
            )
            max_weight = (1.0 / n_assets) + 0.01
            min_weight = max(0.0, (1.0 / n_assets) - 0.01)

        logger.info(
            f"Mean-Variance optimization with dynamic constraints: "
            f"min_weight={min_weight:.3f}, max_weight={max_weight:.3f} for {n_assets} assets"
        )

        # Calculate expected returns and covariance
        mu = expected_returns.mean_historical_return(prices)
        S = risk_models.sample_cov(prices)

        # Efficient Frontier with feasible constraints
        ef = EfficientFrontier(
            mu, S,
            weight_bounds=(min_weight, max_weight),
            verbose=False
        )
        # Add L2 regularisation to discourage negligible weights
        ef.add_objective(objective_functions.L2_reg, gamma=1.0)
        # Optimise for maximum quadratic utility (compatible with L2
        # regularisation). The raw weights are read back through
        # clean_weights() below.
        ef.max_quadratic_utility(risk_aversion=1)
        cleaned_weights = ef.clean_weights()

        # Calculate performance: (expected return, volatility, Sharpe)
        perf = ef.portfolio_performance(risk_free_rate=0.02)

        # Convert to Decimal
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()}
        result = OptimizationResult(
            method="mean_variance",
            weights=weights_decimal,
            expected_return=Decimal(str(perf[0])),
            volatility=Decimal(str(perf[1])),
            sharpe_ratio=Decimal(str(perf[2])),
            metadata={
                # Report the optimizer that actually ran (previously
                # mislabelled as "max_sharpe").
                "optimizer": "max_quadratic_utility",
                "risk_free_rate": "0.02",
                "constraint_type": "dynamic_equal_weight",
                "min_weight": f"{min_weight:.4f}",
                "max_weight": f"{max_weight:.4f}"
            },
        )
        logger.info(f"Mean-Variance optimisation (quadratic utility) complete: Sharpe={perf[2]:.2f}")
        return result
    except Exception as e:
        logger.error(f"Error in Mean-Variance optimization: {e}")
        logger.info("Falling back to HRP optimization")
        # Fall back to HRP, which does not depend on the weight bounds or
        # the expected-return estimates used above
        return await optimize_hrp(request)
# Start the MCP server when this module is executed as a script; importing
# the module only defines and registers the tools.
if __name__ == "__main__":
    mcp.run()