Spaces:
Running
on
Zero
Running
on
Zero
| """MCP Router for Portfolio Intelligence Platform. | |
| Compatibility layer that routes to unified mcp_tools module. | |
| This file maintains backwards compatibility with existing code that uses | |
| mcp_router.call_*_mcp() methods. | |
| All actual implementation is now in backend.mcp_tools module. | |
| """ | |
| import json | |
| import logging | |
| from typing import Any, Dict, List | |
| from backend import mcp_tools | |
| from backend.utils.serialisation import dumps_str | |
| # Module-level logger named after this module; MCPRouter logs its routing through it. | |
| logger = logging.getLogger(__name__) | |
class MCPRouter:
    """Backwards-compatible facade over the ``backend.mcp_tools`` functions.

    Every ``call_*_mcp`` coroutine receives a tool name and a parameter
    mapping, turns the parameters into the arguments of the matching
    ``mcp_tools`` function and awaits it. The ``fetch_*`` coroutines are
    convenience wrappers that call ``mcp_tools`` directly. No tool logic is
    implemented in this class.
    """

    def __init__(self):
        """Create the router; the only work done is writing an info log line."""
        logger.info("Initialising MCP router (compatibility layer)")

    # ------------------------------------------------------------------
    # Yahoo Finance
    # ------------------------------------------------------------------
    async def call_yahoo_finance_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a Yahoo Finance tool request to an ``mcp_tools.market_*`` call.

        Args:
            tool: ``get_quote``, ``get_historical_data`` or ``get_fundamentals``.
            params: Tool parameters. ``get_quote`` reads ``tickers`` (default
                ``[]``) and passes it through ``dumps_str``. The other tools
                require ``ticker``; ``get_historical_data`` additionally reads
                ``period`` (default ``"1y"``) and ``interval`` (default
                ``"1d"``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not one of the names listed above.
            KeyError: If a required entry is missing from ``params``.
        """
        logger.debug(f"Routing Yahoo Finance MCP call: {tool}")

        if tool == "get_quote":
            requested = params.get("tickers", [])
            return await mcp_tools.market_get_quote(dumps_str(requested))

        if tool == "get_historical_data":
            return await mcp_tools.market_get_historical_data(
                ticker=params["ticker"],
                period=params.get("period", "1y"),
                interval=params.get("interval", "1d"),
            )

        if tool == "get_fundamentals":
            return await mcp_tools.market_get_fundamentals(ticker=params["ticker"])

        raise ValueError(f"Unknown Yahoo Finance tool: {tool}")

    # ------------------------------------------------------------------
    # Financial Modeling Prep (FMP)
    # ------------------------------------------------------------------
    async def call_fmp_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Route a Financial Modeling Prep tool request to ``mcp_tools.market_*``.

        Args:
            tool: ``get_company_profile``, ``get_income_statement``,
                ``get_balance_sheet``, ``get_cash_flow_statement``,
                ``get_financial_ratios`` or ``get_key_metrics``.
            params: Tool parameters. Every tool requires ``ticker``. The three
                statement tools read ``period`` (default ``"annual"``) and
                ``limit`` (default ``5``, forwarded as ``str(limit)``). The
                ratio and key-metric tools read ``ttm`` (default ``True``,
                forwarded as the lower-cased ``str()`` of the value).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not one of the names listed above.
            KeyError: If ``ticker`` is missing from ``params``.
        """
        logger.debug(f"Routing FMP MCP call: {tool}")

        if tool == "get_company_profile":
            return await mcp_tools.market_get_company_profile(ticker=params["ticker"])

        if tool == "get_income_statement":
            return await mcp_tools.market_get_income_statement(
                ticker=params["ticker"],
                period=params.get("period", "annual"),
                limit=str(params.get("limit", 5)),
            )

        if tool == "get_balance_sheet":
            return await mcp_tools.market_get_balance_sheet(
                ticker=params["ticker"],
                period=params.get("period", "annual"),
                limit=str(params.get("limit", 5)),
            )

        if tool == "get_cash_flow_statement":
            return await mcp_tools.market_get_cash_flow_statement(
                ticker=params["ticker"],
                period=params.get("period", "annual"),
                limit=str(params.get("limit", 5)),
            )

        if tool == "get_financial_ratios":
            return await mcp_tools.market_get_financial_ratios(
                ticker=params["ticker"],
                ttm=str(params.get("ttm", True)).lower(),
            )

        if tool == "get_key_metrics":
            return await mcp_tools.market_get_key_metrics(
                ticker=params["ticker"],
                ttm=str(params.get("ttm", True)).lower(),
            )

        raise ValueError(f"Unknown FMP tool: {tool}")

    # ------------------------------------------------------------------
    # Trading
    # ------------------------------------------------------------------
    async def call_trading_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a Trading tool request to ``mcp_tools.technical_get_indicators``.

        Args:
            tool: Only ``get_technical_indicators`` is recognised.
            params: Requires ``ticker``; reads ``period`` (default ``"3mo"``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not ``get_technical_indicators``.
            KeyError: If ``ticker`` is missing from ``params``.
        """
        logger.debug(f"Routing Trading MCP call: {tool}")

        if tool != "get_technical_indicators":
            raise ValueError(f"Unknown Trading MCP tool: {tool}")

        return await mcp_tools.technical_get_indicators(
            ticker=params["ticker"],
            period=params.get("period", "3mo"),
        )

    # ------------------------------------------------------------------
    # FRED
    # ------------------------------------------------------------------
    async def call_fred_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Route a FRED tool request to ``mcp_tools.market_get_economic_series``.

        Args:
            tool: Only ``get_economic_series`` is recognised.
            params: Requires ``series_id``; ``observation_start`` and
                ``observation_end`` are forwarded as given, or as ``None``
                when absent.

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not ``get_economic_series``.
            KeyError: If ``series_id`` is missing from ``params``.
        """
        logger.debug(f"Routing FRED MCP call: {tool}")

        if tool != "get_economic_series":
            raise ValueError(f"Unknown FRED tool: {tool}")

        return await mcp_tools.market_get_economic_series(
            series_id=params["series_id"],
            observation_start=params.get("observation_start"),
            observation_end=params.get("observation_end"),
        )

    # ------------------------------------------------------------------
    # Portfolio Optimizer
    # ------------------------------------------------------------------
    async def call_portfolio_optimizer_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a Portfolio Optimizer request to ``mcp_tools.portfolio_optimize_*``.

        Args:
            tool: ``optimize_hrp``, ``optimize_black_litterman`` or
                ``optimize_mean_variance``.
            params: Reads ``market_data`` (default ``[]``, passed through
                ``dumps_str``) and ``risk_tolerance`` (default
                ``"moderate"``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not one of the names listed above.
        """
        logger.debug(f"Routing Portfolio Optimizer MCP call: {tool}")

        # All three optimisers take the same two keyword arguments. They are
        # prepared before the tool name is inspected, so the serialisation
        # runs even when the tool turns out to be unknown.
        optimiser_kwargs = {
            "market_data_json": dumps_str(params.get("market_data", [])),
            "risk_tolerance": params.get("risk_tolerance", "moderate"),
        }

        if tool == "optimize_hrp":
            return await mcp_tools.portfolio_optimize_hrp(**optimiser_kwargs)

        if tool == "optimize_black_litterman":
            return await mcp_tools.portfolio_optimize_black_litterman(
                **optimiser_kwargs
            )

        if tool == "optimize_mean_variance":
            return await mcp_tools.portfolio_optimize_mean_variance(**optimiser_kwargs)

        raise ValueError(f"Unknown Portfolio Optimizer tool: {tool}")

    # ------------------------------------------------------------------
    # Risk Analyzer
    # ------------------------------------------------------------------
    async def call_risk_analyzer_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a Risk Analyzer tool request to an ``mcp_tools.risk_*`` call.

        Args:
            tool: ``analyze_risk`` or ``forecast_volatility_garch``.
            params: Tool parameters.

                ``analyze_risk`` reads ``portfolio`` (default ``[]``),
                ``portfolio_value`` (``100000``), ``confidence_level``
                (``0.95``), ``time_horizon`` (``1``), ``method``
                (``"historical"``), ``num_simulations`` (``10000``) and
                ``benchmark``. Numeric values are forwarded via ``str()``;
                a falsy ``benchmark`` is forwarded as ``None``.

                ``forecast_volatility_garch`` requires ``ticker`` and reads
                ``returns`` (default ``[]``, each item converted with
                ``float``), ``forecast_horizon`` (``30``), ``garch_p``
                (``1``) and ``garch_q`` (``1``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not one of the names listed above.
            KeyError: If ``ticker`` is missing for
                ``forecast_volatility_garch``.
        """
        logger.debug(f"Routing Risk Analyzer MCP call: {tool}")

        if tool == "analyze_risk":
            holdings = params.get("portfolio", [])
            reference = params.get("benchmark")
            return await mcp_tools.risk_analyze(
                portfolio_json=dumps_str(holdings),
                portfolio_value=str(params.get("portfolio_value", 100000)),
                confidence_level=str(params.get("confidence_level", 0.95)),
                time_horizon=str(params.get("time_horizon", 1)),
                method=params.get("method", "historical"),
                num_simulations=str(params.get("num_simulations", 10000)),
                benchmark_json=None if not reference else dumps_str(reference),
            )

        if tool == "forecast_volatility_garch":
            return_series = params.get("returns", [])
            return await mcp_tools.risk_forecast_volatility_garch(
                ticker=params["ticker"],
                returns_json=dumps_str(list(map(float, return_series))),
                forecast_horizon=str(params.get("forecast_horizon", 30)),
                garch_p=str(params.get("garch_p", 1)),
                garch_q=str(params.get("garch_q", 1)),
            )

        raise ValueError(f"Unknown Risk Analyzer tool: {tool}")

    # ------------------------------------------------------------------
    # Ensemble Predictor
    # ------------------------------------------------------------------
    async def call_ensemble_predictor_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route an Ensemble Predictor request to ``mcp_tools.ml_forecast_ensemble``.

        Args:
            tool: Only ``forecast_ensemble`` is recognised.
            params: Requires ``ticker``. Reads ``prices`` (default ``[]``,
                each item converted with ``float``), ``dates`` (forwarded as
                ``None`` when falsy), ``forecast_horizon`` (``30``),
                ``confidence_level`` (``0.95``), ``use_returns`` (``True``)
                and ``ensemble_method`` (``"mean"``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not ``forecast_ensemble``.
            KeyError: If ``ticker`` is missing from ``params``.
        """
        logger.debug(f"Routing Ensemble Predictor MCP call: {tool}")

        if tool != "forecast_ensemble":
            raise ValueError(f"Unknown Ensemble Predictor tool: {tool}")

        price_series = params.get("prices", [])
        date_labels = params.get("dates")
        return await mcp_tools.ml_forecast_ensemble(
            ticker=params["ticker"],
            prices_json=dumps_str(list(map(float, price_series))),
            dates_json=None if not date_labels else dumps_str(date_labels),
            forecast_horizon=str(params.get("forecast_horizon", 30)),
            confidence_level=str(params.get("confidence_level", 0.95)),
            use_returns=str(params.get("use_returns", True)).lower(),
            ensemble_method=params.get("ensemble_method", "mean"),
        )

    # ------------------------------------------------------------------
    # News Sentiment
    # ------------------------------------------------------------------
    async def call_news_sentiment_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a News Sentiment request to ``mcp_tools.sentiment_get_news``.

        Args:
            tool: Only ``get_news_with_sentiment`` is recognised.
            params: Requires ``ticker``; reads ``days_back`` (default ``7``,
                forwarded via ``str()``).

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not ``get_news_with_sentiment``.
            KeyError: If ``ticker`` is missing from ``params``.
        """
        logger.debug(f"Routing News Sentiment MCP call: {tool}")

        if tool != "get_news_with_sentiment":
            raise ValueError(f"Unknown News Sentiment tool: {tool}")

        return await mcp_tools.sentiment_get_news(
            ticker=params["ticker"],
            days_back=str(params.get("days_back", 7)),
        )

    # ------------------------------------------------------------------
    # Feature Extraction
    # ------------------------------------------------------------------
    async def call_feature_extraction_mcp(
        self, tool: str, params: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Route a Feature Extraction request to an ``mcp_tools.technical_*`` call.

        Args:
            tool: ``extract_technical_features``, ``normalise_features``,
                ``select_features`` or ``compute_feature_vector``.
            params: Tool parameters; every tool requires ``ticker``.
                Collection-valued entries are passed through ``dumps_str``,
                numeric entries through ``str()`` and boolean switches
                through lower-cased ``str()``. Defaults applied when a key is
                absent are visible in the body below.

        Returns:
            The value returned by the delegated ``mcp_tools`` coroutine.

        Raises:
            ValueError: If ``tool`` is not one of the names listed above.
            KeyError: If ``ticker`` is missing from ``params``.
        """
        logger.debug(f"Routing Feature Extraction MCP call: {tool}")

        if tool == "extract_technical_features":
            return await mcp_tools.technical_extract_features(
                ticker=params["ticker"],
                prices=dumps_str(params.get("prices", [])),
                volumes=dumps_str(params.get("volumes", [])),
                include_momentum=str(params.get("include_momentum", True)).lower(),
                include_volatility=str(params.get("include_volatility", True)).lower(),
                include_trend=str(params.get("include_trend", True)).lower(),
            )

        if tool == "normalise_features":
            return await mcp_tools.technical_normalise_features(
                ticker=params["ticker"],
                features=dumps_str(params.get("features", {})),
                historical_features=dumps_str(params.get("historical_features", [])),
                window_size=str(params.get("window_size", 100)),
                method=params.get("method", "ewm"),
            )

        if tool == "select_features":
            return await mcp_tools.technical_select_features(
                ticker=params["ticker"],
                feature_vector=dumps_str(params.get("feature_vector", {})),
                max_features=str(params.get("max_features", 15)),
                variance_threshold=str(params.get("variance_threshold", 0.95)),
            )

        if tool == "compute_feature_vector":
            return await mcp_tools.technical_compute_feature_vector(
                ticker=params["ticker"],
                technical_features=dumps_str(params.get("technical_features", {})),
                fundamental_features=dumps_str(params.get("fundamental_features", {})),
                sentiment_features=dumps_str(params.get("sentiment_features", {})),
                max_features=str(params.get("max_features", 30)),
                selection_method=params.get("selection_method", "pca"),
            )

        raise ValueError(f"Unknown Feature Extraction tool: {tool}")

    # ------------------------------------------------------------------
    # High-level helpers
    # ------------------------------------------------------------------
    async def fetch_market_data(self, tickers: List[str]) -> Dict[str, Any]:
        """Fetch quotes for ``tickers`` and index them by ticker.

        Args:
            tickers: Tickers to request; passed through ``dumps_str`` to
                ``mcp_tools.market_get_quote`` in a single call.

        Returns:
            A dict mapping each result's ``"ticker"`` value (or its
            ``"symbol"`` value when ``"ticker"`` is absent) to that result.
            Later results overwrite earlier ones that share a key.
        """
        logger.info(f"Fetching market data for {len(tickers)} tickers")
        quotes = await mcp_tools.market_get_quote(dumps_str(tickers))

        quotes_by_ticker = {}
        for quote in quotes:
            quotes_by_ticker[quote.get("ticker", quote.get("symbol"))] = quote
        return quotes_by_ticker

    async def fetch_fundamentals(self, tickers: List[str]) -> Dict[str, Any]:
        """Fetch a company profile for every ticker, one request at a time.

        Args:
            tickers: Tickers to look up.

        Returns:
            A dict mapping each ticker to the result of
            ``mcp_tools.market_get_company_profile`` for it.
        """
        logger.info(f"Fetching fundamentals for {len(tickers)} tickers")
        # Awaiting inside the comprehension keeps the requests sequential.
        return {
            symbol: await mcp_tools.market_get_company_profile(symbol)
            for symbol in tickers
        }

    async def fetch_technical_indicators(self, tickers: List[str]) -> Dict[str, Any]:
        """Fetch technical indicators for every ticker, one request at a time.

        Args:
            tickers: Tickers to look up.

        Returns:
            A dict mapping each ticker to the result of
            ``mcp_tools.technical_get_indicators`` for it.
        """
        logger.info(f"Fetching technical indicators for {len(tickers)} tickers")
        # Awaiting inside the comprehension keeps the requests sequential.
        return {
            symbol: await mcp_tools.technical_get_indicators(symbol)
            for symbol in tickers
        }

    async def fetch_macro_data(self) -> Dict[str, Any]:
        """Fetch the ``GDP``, ``UNRATE`` and ``DFF`` economic series in that order.

        Returns:
            A dict mapping each series id to the result of
            ``mcp_tools.market_get_economic_series`` for it.
        """
        logger.info("Fetching macroeconomic data")
        # Awaiting inside the comprehension keeps the requests sequential.
        return {
            series_id: await mcp_tools.market_get_economic_series(series_id)
            for series_id in ("GDP", "UNRATE", "DFF")
        }
| # Global MCP router instance: built once when this module is imported, so | |
| # MCPRouter.__init__ (which only writes a log line) runs at import time. | |
| mcp_router = MCPRouter() | |