"""MCP Router for Portfolio Intelligence Platform. Compatibility layer that routes to unified mcp_tools module. This file maintains backwards compatibility with existing code that uses mcp_router.call_*_mcp() methods. All actual implementation is now in backend.mcp_tools module. """ import json import logging from typing import Any, Dict, List from backend import mcp_tools from backend.utils.serialisation import dumps_str logger = logging.getLogger(__name__) class MCPRouter: """Router for orchestrating MCP tool calls. This is a compatibility layer - all actual implementation is in backend.mcp_tools module with namespaced functions. """ def __init__(self): """Initialise MCP router (compatibility layer).""" logger.info("Initialising MCP router (compatibility layer)") # Yahoo Finance MCP methods async def call_yahoo_finance_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Yahoo Finance MCP tool (delegates to market_* functions). Args: tool: Tool name (get_quote, get_historical_data, get_fundamentals) params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Yahoo Finance MCP call: {tool}") if tool == "get_quote": tickers = params.get("tickers", []) return await mcp_tools.market_get_quote(dumps_str(tickers)) elif tool == "get_historical_data": return await mcp_tools.market_get_historical_data( ticker=params["ticker"], period=params.get("period", "1y"), interval=params.get("interval", "1d"), ) elif tool == "get_fundamentals": return await mcp_tools.market_get_fundamentals(ticker=params["ticker"]) else: raise ValueError(f"Unknown Yahoo Finance tool: {tool}") # FMP MCP methods async def call_fmp_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]: """Call Financial Modeling Prep MCP tool (delegates to market_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing FMP MCP call: {tool}") if tool == "get_company_profile": return await mcp_tools.market_get_company_profile(ticker=params["ticker"]) elif tool == "get_income_statement": return await mcp_tools.market_get_income_statement( ticker=params["ticker"], period=params.get("period", "annual"), limit=str(params.get("limit", 5)), ) elif tool == "get_balance_sheet": return await mcp_tools.market_get_balance_sheet( ticker=params["ticker"], period=params.get("period", "annual"), limit=str(params.get("limit", 5)), ) elif tool == "get_cash_flow_statement": return await mcp_tools.market_get_cash_flow_statement( ticker=params["ticker"], period=params.get("period", "annual"), limit=str(params.get("limit", 5)), ) elif tool == "get_financial_ratios": return await mcp_tools.market_get_financial_ratios( ticker=params["ticker"], ttm=str(params.get("ttm", True)).lower(), ) elif tool == "get_key_metrics": return await mcp_tools.market_get_key_metrics( ticker=params["ticker"], ttm=str(params.get("ttm", True)).lower(), ) else: raise ValueError(f"Unknown FMP tool: {tool}") # Trading MCP methods async def call_trading_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Trading MCP tool (delegates to technical_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Trading MCP call: {tool}") if tool == "get_technical_indicators": return await mcp_tools.technical_get_indicators( ticker=params["ticker"], period=params.get("period", "3mo"), ) else: raise ValueError(f"Unknown Trading MCP tool: {tool}") # FRED MCP methods async def call_fred_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]: """Call FRED MCP tool (delegates to market_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing FRED MCP call: {tool}") if tool == "get_economic_series": return await mcp_tools.market_get_economic_series( series_id=params["series_id"], observation_start=params.get("observation_start"), observation_end=params.get("observation_end"), ) else: raise ValueError(f"Unknown FRED tool: {tool}") # Portfolio Optimizer MCP methods async def call_portfolio_optimizer_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Portfolio Optimizer MCP tool (delegates to portfolio_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Portfolio Optimizer MCP call: {tool}") market_data = params.get("market_data", []) market_data_json = dumps_str(market_data) risk_tolerance = params.get("risk_tolerance", "moderate") if tool == "optimize_hrp": return await mcp_tools.portfolio_optimize_hrp( market_data_json=market_data_json, risk_tolerance=risk_tolerance, ) elif tool == "optimize_black_litterman": return await mcp_tools.portfolio_optimize_black_litterman( market_data_json=market_data_json, risk_tolerance=risk_tolerance, ) elif tool == "optimize_mean_variance": return await mcp_tools.portfolio_optimize_mean_variance( market_data_json=market_data_json, risk_tolerance=risk_tolerance, ) else: raise ValueError(f"Unknown Portfolio Optimizer tool: {tool}") # Risk Analyzer MCP methods async def call_risk_analyzer_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Risk Analyzer MCP tool (delegates to risk_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Risk Analyzer MCP call: {tool}") if tool == "analyze_risk": portfolio = params.get("portfolio", []) benchmark = params.get("benchmark") return await mcp_tools.risk_analyze( portfolio_json=dumps_str(portfolio), portfolio_value=str(params.get("portfolio_value", 100000)), confidence_level=str(params.get("confidence_level", 0.95)), time_horizon=str(params.get("time_horizon", 1)), method=params.get("method", "historical"), num_simulations=str(params.get("num_simulations", 10000)), benchmark_json=dumps_str(benchmark) if benchmark else None, ) elif tool == "forecast_volatility_garch": returns = params.get("returns", []) return await mcp_tools.risk_forecast_volatility_garch( ticker=params["ticker"], returns_json=dumps_str([float(r) for r in returns]), forecast_horizon=str(params.get("forecast_horizon", 30)), garch_p=str(params.get("garch_p", 1)), garch_q=str(params.get("garch_q", 1)), ) else: raise ValueError(f"Unknown Risk Analyzer tool: {tool}") # Ensemble Predictor MCP methods async def call_ensemble_predictor_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Ensemble Predictor MCP tool (delegates to ml_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Ensemble Predictor MCP call: {tool}") if tool == "forecast_ensemble": prices = params.get("prices", []) dates = params.get("dates") return await mcp_tools.ml_forecast_ensemble( ticker=params["ticker"], prices_json=dumps_str([float(p) for p in prices]), dates_json=dumps_str(dates) if dates else None, forecast_horizon=str(params.get("forecast_horizon", 30)), confidence_level=str(params.get("confidence_level", 0.95)), use_returns=str(params.get("use_returns", True)).lower(), ensemble_method=params.get("ensemble_method", "mean"), ) else: raise ValueError(f"Unknown Ensemble Predictor tool: {tool}") # News Sentiment MCP methods async def call_news_sentiment_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call News Sentiment MCP tool (delegates to sentiment_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing News Sentiment MCP call: {tool}") if tool == "get_news_with_sentiment": return await mcp_tools.sentiment_get_news( ticker=params["ticker"], days_back=str(params.get("days_back", 7)), ) else: raise ValueError(f"Unknown News Sentiment tool: {tool}") # Feature Extraction MCP methods async def call_feature_extraction_mcp( self, tool: str, params: Dict[str, Any] ) -> Dict[str, Any]: """Call Feature Extraction MCP tool (delegates to technical_* functions). Args: tool: Tool name params: Tool parameters Returns: Tool result """ logger.debug(f"Routing Feature Extraction MCP call: {tool}") if tool == "extract_technical_features": return await mcp_tools.technical_extract_features( ticker=params["ticker"], prices=dumps_str(params.get("prices", [])), volumes=dumps_str(params.get("volumes", [])), include_momentum=str(params.get("include_momentum", True)).lower(), include_volatility=str(params.get("include_volatility", True)).lower(), include_trend=str(params.get("include_trend", True)).lower(), ) elif tool == "normalise_features": return await mcp_tools.technical_normalise_features( ticker=params["ticker"], features=dumps_str(params.get("features", {})), historical_features=dumps_str(params.get("historical_features", [])), window_size=str(params.get("window_size", 100)), method=params.get("method", "ewm"), ) elif tool == "select_features": return await mcp_tools.technical_select_features( ticker=params["ticker"], feature_vector=dumps_str(params.get("feature_vector", {})), max_features=str(params.get("max_features", 15)), variance_threshold=str(params.get("variance_threshold", 0.95)), ) elif tool == "compute_feature_vector": return await mcp_tools.technical_compute_feature_vector( ticker=params["ticker"], technical_features=dumps_str(params.get("technical_features", {})), fundamental_features=dumps_str(params.get("fundamental_features", {})), sentiment_features=dumps_str(params.get("sentiment_features", {})), max_features=str(params.get("max_features", 30)), selection_method=params.get("selection_method", "pca"), ) else: raise ValueError(f"Unknown Feature Extraction tool: {tool}") # High-level helper methods async def fetch_market_data(self, tickers: List[str]) -> Dict[str, Any]: """Fetch market data for given tickers. Args: tickers: List of stock/asset tickers Returns: Market data from Yahoo Finance """ logger.info(f"Fetching market data for {len(tickers)} tickers") results = await mcp_tools.market_get_quote(dumps_str(tickers)) return {r.get("ticker", r.get("symbol")): r for r in results} async def fetch_fundamentals(self, tickers: List[str]) -> Dict[str, Any]: """Fetch fundamental data for tickers. Args: tickers: List of stock tickers Returns: Fundamental data per ticker """ logger.info(f"Fetching fundamentals for {len(tickers)} tickers") results = {} for ticker in tickers: results[ticker] = await mcp_tools.market_get_company_profile(ticker) return results async def fetch_technical_indicators(self, tickers: List[str]) -> Dict[str, Any]: """Fetch technical indicators for tickers. Args: tickers: List of stock tickers Returns: Technical indicators per ticker """ logger.info(f"Fetching technical indicators for {len(tickers)} tickers") results = {} for ticker in tickers: results[ticker] = await mcp_tools.technical_get_indicators(ticker) return results async def fetch_macro_data(self) -> Dict[str, Any]: """Fetch macroeconomic data from FRED. Returns: Macroeconomic indicators """ logger.info("Fetching macroeconomic data") results = {} for series_id in ["GDP", "UNRATE", "DFF"]: results[series_id] = await mcp_tools.market_get_economic_series(series_id) return results # Global MCP router instance mcp_router = MCPRouter()