Spaces:
Running
on
Zero
Running
on
Zero
| """Portfolio Optimizer MCP Server. | |
| This MCP server provides portfolio optimization using multiple methods: | |
| - Hierarchical Risk Parity (HRP) | |
| - Black-Litterman | |
| - Mean-Variance Optimization | |
| All methods are deterministic and based on PyPortfolioOpt. | |
| """ | |
| import logging | |
| from typing import Dict, List, Optional | |
| from decimal import Decimal | |
| import numpy as np | |
| import pandas as pd | |
| from pypfopt import HRPOpt, BlackLittermanModel, EfficientFrontier, risk_models, expected_returns, objective_functions | |
| from pypfopt import black_litterman | |
| from fastmcp import FastMCP | |
| from pydantic import BaseModel, Field | |
| from tenacity import ( | |
| retry, | |
| stop_after_attempt, | |
| wait_exponential, | |
| retry_if_exception_type, | |
| ) | |
# Module-level logger named after this module; handlers and levels are left
# to whatever logging configuration the host application installs.
logger = logging.getLogger(__name__)

# Initialize MCP server
mcp = FastMCP("portfolio-optimizer")
class MarketDataInput(BaseModel):
    """Historical close-price series for one asset.

    ``prices`` and ``dates`` are described as corresponding lists, i.e. they
    are expected to be parallel (one date per price).
    """

    # Asset identifier; used as the column label when the price frame is built.
    ticker: str
    # Required. Values are converted to ``float`` before optimization.
    prices: List[Decimal] = Field(description="Historical close prices")
    # Required. Date strings, parsed with ``pd.to_datetime`` downstream.
    dates: List[str] = Field(description="Corresponding dates")
class OptimizationRequest(BaseModel):
    """Input payload accepted by every optimizer in this module."""

    # Required; validation rejects fewer than two assets.
    market_data: List[MarketDataInput] = Field(min_length=2)
    # Required. NOTE(review): none of the optimizer functions visible in this
    # file read this field; presumably a caller dispatches on it - confirm.
    method: str = Field(description="hrp, black_litterman, or mean_variance")
    # Optional, defaults to "moderate".
    risk_tolerance: str = Field(
        "moderate",
        description="conservative, moderate, aggressive",
    )
    # Optional, defaults to None. NOTE(review): not read by any optimizer
    # visible in this file - confirm whether it is consumed elsewhere.
    constraints: Optional[Dict[str, Decimal]] = Field(
        None,
        description="Weight constraints",
    )
class OptimizationResult(BaseModel):
    """Portfolio optimization result.

    Returned by every ``optimize_*`` coroutine in this module. Numeric values
    are stored as ``Decimal``; the optimizers build them from the string form
    of the computed floats.
    """

    # Name of the method that produced the weights. The optimizers in this
    # file fall back to HRP on failure, in which case this is "hrp".
    method: str
    # Portfolio weight per ticker.
    weights: Dict[str, Decimal]
    # Expected portfolio return as computed by the optimizer.
    expected_return: Decimal
    # Portfolio volatility as computed by the optimizer.
    volatility: Decimal
    # Sharpe ratio; the optimizers in this file use a 0.02 risk-free rate.
    sharpe_ratio: Decimal
    # Free-form string annotations about the run; defaults to an empty dict.
    metadata: Optional[Dict[str, str]] = Field(default_factory=dict)
def _prepare_price_data(market_data: "List[MarketDataInput]") -> pd.DataFrame:
    """Build a date-indexed price table with one float column per ticker.

    Each ticker is indexed by its *own* dates and the columns are then
    inner-joined, so only dates present for every ticker are kept. Rows are
    returned in chronological order. When all tickers share the same,
    already-sorted dates the result is one row per date, one column per
    ticker, in input order.

    Args:
        market_data: Objects exposing ``ticker``, ``prices`` and ``dates``.
            If a ticker appears more than once, the last occurrence wins.

    Returns:
        DataFrame of float prices indexed by ``DatetimeIndex``.

    Raises:
        ValueError: If ``market_data`` is empty, or a ticker's ``prices`` and
            ``dates`` have different lengths.
    """
    if not market_data:
        raise ValueError("market_data must contain at least one ticker")

    columns = {}
    for data in market_data:
        if len(data.prices) != len(data.dates):
            raise ValueError(
                f"Ticker {data.ticker!r} has {len(data.prices)} prices "
                f"but {len(data.dates)} dates"
            )
        # Index every series by its own dates so that tickers with missing or
        # differently ordered observations are aligned by date, not position.
        columns[data.ticker] = pd.Series(
            [float(p) for p in data.prices],
            index=pd.to_datetime(data.dates),
            dtype=float,
        )

    tickers = list(columns)
    df = pd.concat(
        [columns[t] for t in tickers],
        axis=1,
        join="inner",
        keys=tickers,
    )
    # Returns are computed row-over-row downstream, so rows must be
    # chronological regardless of the order the caller supplied them in.
    return df.sort_index()
async def optimize_hrp(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Hierarchical Risk Parity.

    HRP uses hierarchical clustering to construct a diversified portfolio
    that balances risk across clusters of correlated assets.

    Weights come from ``HRPOpt.optimize()`` on simple returns. Expected
    return and volatility are then computed from
    ``expected_returns.mean_historical_return`` and
    ``risk_models.sample_cov`` on the same prices, and the Sharpe ratio uses
    a fixed 0.02 risk-free rate (0 when volatility is not positive).

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Raises:
        Exception: Any error raised while preparing data or optimizing is
            logged and re-raised unchanged.

    Example:
        >>> await optimize_hrp(OptimizationRequest(market_data=[...], method="hrp"))
    """
    logger.info(f"Optimizing portfolio using HRP for {len(request.market_data)} assets")
    risk_free_rate = 0.02
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)
        returns = prices.pct_change().dropna()

        # Run HRP optimization
        hrp = HRPOpt(returns=returns)
        weights = hrp.optimize()

        # Calculate performance
        mu = expected_returns.mean_historical_return(prices)
        S = risk_models.sample_cov(prices)

        # The weights mapping is not guaranteed to be in the same order as the
        # covariance matrix (which follows the price columns), so select mu
        # and S by ticker label instead of by position.
        tickers = list(weights)
        w = np.array([float(weights[t]) for t in tickers])
        mu_vec = mu.loc[tickers].to_numpy(dtype=float)
        cov = S.loc[tickers, tickers].to_numpy(dtype=float)

        portfolio_return = float(w @ mu_vec)
        portfolio_variance = float(w @ cov @ w)
        portfolio_volatility = float(np.sqrt(portfolio_variance))
        sharpe = (
            (portfolio_return - risk_free_rate) / portfolio_volatility
            if portfolio_volatility > 0
            else 0
        )

        # Convert to Decimal
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in weights.items()}
        result = OptimizationResult(
            method="hrp",
            weights=weights_decimal,
            expected_return=Decimal(str(portfolio_return)),
            volatility=Decimal(str(portfolio_volatility)),
            sharpe_ratio=Decimal(str(sharpe)),
            metadata={"risk_model": "hierarchical_risk_parity"},
        )
        logger.info(f"HRP optimization complete: Sharpe={sharpe:.2f}")
        return result
    except Exception as e:
        # logger.exception keeps the same message but also records the traceback.
        logger.exception(f"Error in HRP optimization: {e}")
        raise
async def optimize_black_litterman(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Black-Litterman model with market equilibrium.

    Black-Litterman uses market-implied equilibrium returns as the prior distribution
    when no explicit investor views are provided. This grounds the portfolio in observable
    market prices and risk preferences.

    Steps performed:
      1. Ledoit-Wolf shrunk covariance of the prices.
      2. Market weights from each input's ``market_cap`` attribute when it
         has one, otherwise a cap of 1.0 (i.e. equal weights).
      3. Market-implied risk aversion from a synthetic, weight-blended market
         price series; falls back to ``delta=1.0`` if the calculation fails
         or yields a non-positive / non-finite value.
      4. Market-implied prior returns, passed to ``BlackLittermanModel`` with
         an empty set of views.
      5. ``max_quadratic_utility(risk_aversion=1)`` with L2 regularisation on
         the posterior returns.

    Any failure falls back to ``optimize_hrp``; in that case the returned
    result has ``method="hrp"``.

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Example:
        >>> await optimize_black_litterman(OptimizationRequest(market_data=[...], method="black_litterman"))
    """
    logger.info(f"Black-Litterman requested for {len(request.market_data)} assets using market equilibrium")
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)

        # Calculate covariance with shrinkage for stability
        S = risk_models.CovarianceShrinkage(prices).ledoit_wolf()

        # Extract market caps from market data if available, otherwise use equal weights
        market_caps = {}
        for data in request.market_data:
            market_cap = getattr(data, 'market_cap', None)
            if market_cap is None:
                market_cap = 1.0
            market_caps[data.ticker] = float(market_cap)
        total_market_cap = sum(market_caps.values())
        market_weights = {ticker: cap / total_market_cap for ticker, cap in market_caps.items()}
        logger.info(f"Market weights calculated: {len(market_weights)} assets, using equilibrium approach")

        # Convert market weights dict to Series in proper order
        market_weights_series = pd.Series(
            {ticker: market_weights.get(ticker, 1.0 / len(request.market_data))
             for ticker in S.index}
        )

        # Create synthetic market portfolio price from asset prices and market weights
        market_portfolio_price = (prices * market_weights_series).sum(axis=1)

        # Calculate market-implied risk aversion from the synthetic market portfolio
        try:
            delta = black_litterman.market_implied_risk_aversion(
                market_prices=market_portfolio_price,
                frequency=252,
                risk_free_rate=0.02
            )
            # A risk aversion that is zero, negative or non-finite would flip
            # or blow up the equilibrium prior; route it through the same
            # default as a failed calculation.
            if not np.isfinite(delta) or delta <= 0:
                raise ValueError(f"invalid risk aversion {delta}")
            logger.info(f"Market-implied risk aversion (delta): {delta:.4f}")
        except Exception as e:
            logger.warning(f"Could not calculate market-implied risk aversion: {e}. Using default delta=1.0")
            delta = 1.0

        # Calculate market return as proxy using the market portfolio weights
        mu_historical = expected_returns.mean_historical_return(prices)
        market_return = float(mu_historical.dot(market_weights_series))

        # Calculate market-implied prior returns
        try:
            pi = black_litterman.market_implied_prior_returns(
                market_weights_series, delta, S, risk_free_rate=0.02
            )
            logger.info(f"Market-implied prior returns calculated for {len(pi)} assets")
        except Exception as e:
            logger.error(f"Failed to calculate market-implied prior returns: {e}")
            logger.warning("Falling back to HRP due to equilibrium calculation failure")
            return await optimize_hrp(request)

        # Create Black-Litterman model with equilibrium prior (no explicit views)
        bl = BlackLittermanModel(S, pi=pi, absolute_views=pd.Series({}))

        # Get posterior returns (equals prior when no views provided)
        posterior_returns = bl.bl_returns()
        logger.info(f"Black-Litterman posterior returns: mean={posterior_returns.mean():.4f}, std={posterior_returns.std():.4f}")

        # Optimise with posterior estimates
        ef = EfficientFrontier(posterior_returns, S, verbose=False)
        # Add L2 regularisation for stability
        ef.add_objective(objective_functions.L2_reg, gamma=1.0)
        # Use max_quadratic_utility for L2 regularisation compatibility.
        # The raw weights it returns are not needed; clean_weights() reads
        # them back from the optimizer.
        ef.max_quadratic_utility(risk_aversion=1)
        cleaned_weights = ef.clean_weights()

        # Calculate performance metrics
        perf = ef.portfolio_performance(risk_free_rate=0.02)

        # Convert to Decimal
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()}
        result = OptimizationResult(
            method="black_litterman",
            weights=weights_decimal,
            expected_return=Decimal(str(perf[0])),
            volatility=Decimal(str(perf[1])),
            sharpe_ratio=Decimal(str(perf[2])),
            metadata={
                "actual_method": "black_litterman_equilibrium",
                "used_market_equilibrium": "true",
                "risk_aversion": f"{delta:.4f}",
                "market_return_proxy": f"{market_return:.4f}"
            },
        )
        logger.info(f"Black-Litterman equilibrium optimization complete: Sharpe={perf[2]:.2f}")
        return result
    except Exception as e:
        logger.error(f"Black-Litterman optimization failed: {e}")
        logger.warning("Falling back to HRP due to optimization error")
        return await optimize_hrp(request)
async def optimize_mean_variance(request: OptimizationRequest) -> OptimizationResult:
    """Optimize portfolio using Mean-Variance Optimization (Markowitz).

    Maximises quadratic utility (``risk_aversion=1``) with L2 regularisation
    on historical mean returns and the sample covariance. Per-asset weight
    bounds are centred on the equal weight ``1/n`` and widened by a tolerance
    chosen from ``request.risk_tolerance``:

        conservative: +/- 0.05, moderate: +/- 0.10, aggressive: +/- 0.15

    The value is matched case-insensitively and ignoring surrounding
    whitespace; an unrecognised value uses the aggressive tolerance and logs
    a warning. Bounds are clipped to ``[0, 1]``.

    Any failure falls back to ``optimize_hrp``; in that case the returned
    result has ``method="hrp"``.

    Args:
        request: Optimization request with market data

    Returns:
        Optimization result with weights and metrics

    Example:
        >>> await optimize_mean_variance(OptimizationRequest(market_data=[...], method="mean_variance"))
    """
    logger.info(f"Optimizing portfolio using Mean-Variance for {len(request.market_data)} assets")
    try:
        # Prepare data
        prices = _prepare_price_data(request.market_data)
        n_assets = len(prices.columns)

        # Calculate equal weight for dynamic constraints
        equal_weight = 1.0 / n_assets

        # Allowed deviation from equal weight per risk level.
        tolerance_by_level = {
            "conservative": 0.05,
            "moderate": 0.10,
            "aggressive": 0.15,
        }
        level = str(request.risk_tolerance).strip().lower()
        if level not in tolerance_by_level:
            logger.warning(
                f"Unknown risk_tolerance {request.risk_tolerance!r}; "
                f"using aggressive bounds"
            )
        tolerance = tolerance_by_level.get(level, tolerance_by_level["aggressive"])

        # Because the band is centred on 1/n, min_weight <= 1/n <= max_weight
        # always holds, so the bounds can always sum to 1 and no separate
        # feasibility adjustment is required.
        min_weight = max(0.0, equal_weight - tolerance)
        max_weight = min(1.0, equal_weight + tolerance)
        logger.info(
            f"Mean-Variance optimization with dynamic constraints: "
            f"min_weight={min_weight:.3f}, max_weight={max_weight:.3f} for {n_assets} assets"
        )

        # Calculate expected returns and covariance
        mu = expected_returns.mean_historical_return(prices)
        S = risk_models.sample_cov(prices)

        # Efficient Frontier with feasible constraints
        ef = EfficientFrontier(
            mu, S,
            weight_bounds=(min_weight, max_weight),
            verbose=False
        )
        # Add L2 regularisation for small portfolios (recommended for n < 20)
        ef.add_objective(objective_functions.L2_reg, gamma=1.0)
        # Optimise for maximum quadratic utility (compatible with L2
        # regularisation). The raw weights it returns are not needed;
        # clean_weights() reads them back from the optimizer.
        ef.max_quadratic_utility(risk_aversion=1)
        cleaned_weights = ef.clean_weights()

        # Calculate performance
        perf = ef.portfolio_performance(risk_free_rate=0.02)

        # Convert to Decimal
        weights_decimal = {ticker: Decimal(str(weight)) for ticker, weight in cleaned_weights.items()}
        result = OptimizationResult(
            method="mean_variance",
            weights=weights_decimal,
            expected_return=Decimal(str(perf[0])),
            volatility=Decimal(str(perf[1])),
            sharpe_ratio=Decimal(str(perf[2])),
            metadata={
                # Reports the objective actually solved above.
                "optimizer": "max_quadratic_utility",
                "risk_free_rate": "0.02",
                "constraint_type": "dynamic_equal_weight",
                "min_weight": f"{min_weight:.4f}",
                "max_weight": f"{max_weight:.4f}"
            },
        )
        logger.info(f"Mean-Variance optimisation (quadratic utility) complete: Sharpe={perf[2]:.2f}")
        return result
    except Exception as e:
        logger.error(f"Error in Mean-Variance optimization: {e}")
        logger.info("Falling back to HRP optimization")
        # Fall back to HRP, which does not depend on the weight bounds or on
        # solving a constrained optimization problem.
        return await optimize_hrp(request)
# Start the MCP server when this module is executed as a script.
# NOTE(review): no tool registration (e.g. an ``mcp`` decorator) is visible on
# the optimize_* coroutines in this file - confirm they are registered with
# ``mcp`` somewhere before relying on this entry point.
if __name__ == "__main__":
    mcp.run()