"""Portfolio Optimizer MCP Server. This MCP server provides portfolio optimization using multiple methods: - Hierarchical Risk Parity (HRP) - Black-Litterman - Mean-Variance Optimization All methods are deterministic and based on PyPortfolioOpt. """ import logging from typing import Dict, List, Optional from decimal import Decimal import numpy as np import pandas as pd from pypfopt import HRPOpt, BlackLittermanModel, EfficientFrontier, risk_models, expected_returns, objective_functions from pypfopt import black_litterman from fastmcp import FastMCP from pydantic import BaseModel, Field from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("portfolio-optimizer") class MarketDataInput(BaseModel): """Market data for portfolio optimization.""" ticker: str prices: List[Decimal] = Field(..., description="Historical close prices") dates: List[str] = Field(..., description="Corresponding dates") class OptimizationRequest(BaseModel): """Request for portfolio optimization.""" market_data: List[MarketDataInput] = Field(..., min_length=2) method: str = Field(..., description="hrp, black_litterman, or mean_variance") risk_tolerance: str = Field(default="moderate", description="conservative, moderate, aggressive") constraints: Optional[Dict[str, Decimal]] = Field(default=None, description="Weight constraints") class OptimizationResult(BaseModel): """Portfolio optimization result.""" method: str weights: Dict[str, Decimal] expected_return: Decimal volatility: Decimal sharpe_ratio: Decimal metadata: Optional[Dict[str, str]] = Field(default_factory=dict) def _prepare_price_data(market_data: List[MarketDataInput]) -> pd.DataFrame: """Convert market data to pandas DataFrame.""" price_dict = {} for data in market_data: price_dict[data.ticker] = [float(p) for p in data.prices] # Use dates from first ticker dates = pd.to_datetime(market_data[0].dates) df = pd.DataFrame(price_dict, index=dates) return df @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def optimize_hrp(request: OptimizationRequest) -> OptimizationResult: """Optimize portfolio using Hierarchical Risk Parity. HRP uses hierarchical clustering to construct a diversified portfolio that balances risk across clusters of correlated assets. Args: request: Optimization request with market data Returns: Optimization result with weights and metrics Example: >>> await optimize_hrp(OptimizationRequest(market_data=[...], method="hrp")) """ logger.info(f"Optimizing portfolio using HRP for {len(request.market_data)} assets") try: # Prepare data prices = _prepare_price_data(request.market_data) returns = prices.pct_change().dropna() # Run HRP optimization hrp = HRPOpt(returns=returns) weights = hrp.optimize() # Calculate performance mu = expected_returns.mean_historical_return(prices) S = risk_models.sample_cov(prices) portfolio_return = sum(weights[ticker] * mu[ticker] for ticker in weights.keys()) portfolio_variance = 0 for i, ticker_i in enumerate(weights.keys()): for j, ticker_j in enumerate(weights.keys()): portfolio_variance += weights[ticker_i] * weights[ticker_j] * S.iloc[i, j] portfolio_volatility = np.sqrt(portfolio_variance) sharpe = (portfolio_return - 0.02) / portfolio_volatility if portfolio_volatility > 0 else 0 # Convert to Decimal weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in weights.items()} result = OptimizationResult( method="hrp", weights=weights_decimal, expected_return=Decimal(str(portfolio_return)), volatility=Decimal(str(portfolio_volatility)), sharpe_ratio=Decimal(str(sharpe)), metadata={"risk_model": "hierarchical_risk_parity"}, ) logger.info(f"HRP optimization complete: Sharpe={sharpe:.2f}") return result except Exception as e: logger.error(f"Error in HRP optimization: {e}") raise @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def optimize_black_litterman(request: OptimizationRequest) -> OptimizationResult: """Optimize portfolio using Black-Litterman model with market equilibrium. Black-Litterman uses market-implied equilibrium returns as the prior distribution when no explicit investor views are provided. This grounds the portfolio in observable market prices and risk preferences. Args: request: Optimization request with market data Returns: Optimization result with weights and metrics Example: >>> await optimize_black_litterman(OptimizationRequest(market_data=[...], method="black_litterman")) """ logger.info(f"Black-Litterman requested for {len(request.market_data)} assets using market equilibrium") try: # Prepare data prices = _prepare_price_data(request.market_data) returns = prices.pct_change().dropna() # Calculate covariance with shrinkage for stability S = risk_models.CovarianceShrinkage(prices).ledoit_wolf() # Extract market caps from market data if available, otherwise use equal weights market_caps = {} for data in request.market_data: market_cap = getattr(data, 'market_cap', None) if market_cap is None: market_cap = 1.0 market_caps[data.ticker] = float(market_cap) total_market_cap = sum(market_caps.values()) market_weights = {ticker: cap / total_market_cap for ticker, cap in market_caps.items()} logger.info(f"Market weights calculated: {len(market_weights)} assets, using equilibrium approach") # Convert market weights dict to Series in proper order market_weights_series = pd.Series( {ticker: market_weights.get(ticker, 1.0/len(request.market_data)) for ticker in S.index} ) # Create synthetic market portfolio price from asset prices and market weights market_portfolio_price = (prices * market_weights_series).sum(axis=1) # Calculate market-implied risk aversion from the synthetic market portfolio try: delta = black_litterman.market_implied_risk_aversion( market_prices=market_portfolio_price, frequency=252, risk_free_rate=0.02 ) logger.info(f"Market-implied risk aversion (delta): {delta:.4f}") except Exception as e: logger.warning(f"Could not calculate market-implied risk aversion: {e}. Using default delta=1.0") delta = 1.0 # Calculate market return as proxy using the market portfolio weights mu_historical = expected_returns.mean_historical_return(prices) market_return = float(mu_historical.dot(market_weights_series)) # Calculate market-implied prior returns try: pi = black_litterman.market_implied_prior_returns( market_weights_series, delta, S, risk_free_rate=0.02 ) logger.info(f"Market-implied prior returns calculated for {len(pi)} assets") except Exception as e: logger.error(f"Failed to calculate market-implied prior returns: {e}") logger.warning("Falling back to HRP due to equilibrium calculation failure") return await optimize_hrp(request) # Create Black-Litterman model with equilibrium prior (no explicit views) bl = BlackLittermanModel(S, pi=pi, absolute_views=pd.Series({})) # Get posterior returns (equals prior when no views provided) posterior_returns = bl.bl_returns() logger.info(f"Black-Litterman posterior returns: mean={posterior_returns.mean():.4f}, std={posterior_returns.std():.4f}") # Optimise with posterior estimates ef = EfficientFrontier(posterior_returns, S, verbose=False) # Add L2 regularisation for stability ef.add_objective(objective_functions.L2_reg, gamma=1.0) # Use max_quadratic_utility for L2 regularisation compatibility weights = ef.max_quadratic_utility(risk_aversion=1) cleaned_weights = ef.clean_weights() # Calculate performance metrics perf = ef.portfolio_performance(risk_free_rate=0.02) # Convert to Decimal weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()} result = OptimizationResult( method="black_litterman", weights=weights_decimal, expected_return=Decimal(str(perf[0])), volatility=Decimal(str(perf[1])), sharpe_ratio=Decimal(str(perf[2])), metadata={ "actual_method": "black_litterman_equilibrium", "used_market_equilibrium": "true", "risk_aversion": f"{delta:.4f}", "market_return_proxy": f"{market_return:.4f}" }, ) logger.info(f"Black-Litterman equilibrium optimization complete: Sharpe={perf[2]:.2f}") return result except Exception as e: logger.error(f"Black-Litterman optimization failed: {e}") logger.warning("Falling back to HRP due to optimization error") return await optimize_hrp(request) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def optimize_mean_variance(request: OptimizationRequest) -> OptimizationResult: """Optimize portfolio using Mean-Variance Optimization (Markowitz). Mean-Variance finds the portfolio with maximum Sharpe ratio or minimum volatility for a given return target. Uses dynamic constraints that scale with portfolio size to ensure mathematical feasibility. Args: request: Optimization request with market data Returns: Optimization result with weights and metrics Example: >>> await optimize_mean_variance(OptimizationRequest(market_data=[...], method="mean_variance")) """ logger.info(f"Optimizing portfolio using Mean-Variance for {len(request.market_data)} assets") try: # Prepare data prices = _prepare_price_data(request.market_data) n_assets = len(prices.columns) # Calculate equal weight for dynamic constraints equal_weight = 1.0 / n_assets # Dynamic constraint calculation based on risk tolerance # These constraints scale with portfolio size to ensure feasibility if request.risk_tolerance == "conservative": # +/- 5% from equal weight tolerance = 0.05 min_weight = max(0.0, equal_weight - tolerance) max_weight = min(1.0, equal_weight + tolerance) elif request.risk_tolerance == "moderate": # +/- 10% from equal weight tolerance = 0.10 min_weight = max(0.0, equal_weight - tolerance) max_weight = min(1.0, equal_weight + tolerance) else: # aggressive # +/- 15% from equal weight tolerance = 0.15 min_weight = max(0.0, equal_weight - tolerance) max_weight = min(1.0, equal_weight + tolerance) # Verify mathematical feasibility if n_assets * max_weight < 1.0: logger.warning( f"Constraints not feasible: {n_assets} assets * {max_weight:.3f} max = " f"{n_assets * max_weight:.3f} < 1.0. Adjusting max_weight." ) max_weight = (1.0 / n_assets) + 0.01 min_weight = max(0.0, (1.0 / n_assets) - 0.01) logger.info( f"Mean-Variance optimization with dynamic constraints: " f"min_weight={min_weight:.3f}, max_weight={max_weight:.3f} for {n_assets} assets" ) # Calculate expected returns and covariance mu = expected_returns.mean_historical_return(prices) S = risk_models.sample_cov(prices) # Efficient Frontier with feasible constraints ef = EfficientFrontier( mu, S, weight_bounds=(min_weight, max_weight), verbose=False ) # Add L2 regularisation for small portfolios (recommended for n < 20) ef.add_objective(objective_functions.L2_reg, gamma=1.0) # Optimise for maximum quadratic utility (compatible with L2 regularisation) # risk_aversion=1 produces Sharpe-like behaviour without transformation issues weights = ef.max_quadratic_utility(risk_aversion=1) cleaned_weights = ef.clean_weights() # Calculate performance perf = ef.portfolio_performance(risk_free_rate=0.02) # Convert to Decimal weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()} result = OptimizationResult( method="mean_variance", weights=weights_decimal, expected_return=Decimal(str(perf[0])), volatility=Decimal(str(perf[1])), sharpe_ratio=Decimal(str(perf[2])), metadata={ "optimizer": "max_sharpe", "risk_free_rate": "0.02", "constraint_type": "dynamic_equal_weight", "min_weight": f"{min_weight:.4f}", "max_weight": f"{max_weight:.4f}" }, ) logger.info(f"Mean-Variance optimisation (quadratic utility) complete: Sharpe={perf[2]:.2f}") return result except Exception as e: logger.error(f"Error in Mean-Variance optimization: {e}") logger.info("Falling back to HRP optimization") # Fallback to HRP which always works return await optimize_hrp(request) # Export the MCP server if __name__ == "__main__": mcp.run()