BrianIsaac's picture
feat: enable native Gradio MCP server with fastmcp 2.9.1
cea2220
"""MCP Router for Portfolio Intelligence Platform.
Compatibility layer that routes to unified mcp_tools module.
This file maintains backwards compatibility with existing code that uses
mcp_router.call_*_mcp() methods.
All actual implementation is now in backend.mcp_tools module.
"""
import json
import logging
from typing import Any, Dict, List
from backend import mcp_tools
from backend.utils.serialisation import dumps_str
logger = logging.getLogger(__name__)
class MCPRouter:
"""Router for orchestrating MCP tool calls.
This is a compatibility layer - all actual implementation
is in backend.mcp_tools module with namespaced functions.
"""
def __init__(self):
"""Initialise MCP router (compatibility layer)."""
logger.info("Initialising MCP router (compatibility layer)")
# Yahoo Finance MCP methods
async def call_yahoo_finance_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Yahoo Finance MCP tool (delegates to market_* functions).
Args:
tool: Tool name (get_quote, get_historical_data, get_fundamentals)
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Yahoo Finance MCP call: {tool}")
if tool == "get_quote":
tickers = params.get("tickers", [])
return await mcp_tools.market_get_quote(dumps_str(tickers))
elif tool == "get_historical_data":
return await mcp_tools.market_get_historical_data(
ticker=params["ticker"],
period=params.get("period", "1y"),
interval=params.get("interval", "1d"),
)
elif tool == "get_fundamentals":
return await mcp_tools.market_get_fundamentals(ticker=params["ticker"])
else:
raise ValueError(f"Unknown Yahoo Finance tool: {tool}")
# FMP MCP methods
async def call_fmp_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Call Financial Modeling Prep MCP tool (delegates to market_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing FMP MCP call: {tool}")
if tool == "get_company_profile":
return await mcp_tools.market_get_company_profile(ticker=params["ticker"])
elif tool == "get_income_statement":
return await mcp_tools.market_get_income_statement(
ticker=params["ticker"],
period=params.get("period", "annual"),
limit=str(params.get("limit", 5)),
)
elif tool == "get_balance_sheet":
return await mcp_tools.market_get_balance_sheet(
ticker=params["ticker"],
period=params.get("period", "annual"),
limit=str(params.get("limit", 5)),
)
elif tool == "get_cash_flow_statement":
return await mcp_tools.market_get_cash_flow_statement(
ticker=params["ticker"],
period=params.get("period", "annual"),
limit=str(params.get("limit", 5)),
)
elif tool == "get_financial_ratios":
return await mcp_tools.market_get_financial_ratios(
ticker=params["ticker"],
ttm=str(params.get("ttm", True)).lower(),
)
elif tool == "get_key_metrics":
return await mcp_tools.market_get_key_metrics(
ticker=params["ticker"],
ttm=str(params.get("ttm", True)).lower(),
)
else:
raise ValueError(f"Unknown FMP tool: {tool}")
# Trading MCP methods
async def call_trading_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Trading MCP tool (delegates to technical_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Trading MCP call: {tool}")
if tool == "get_technical_indicators":
return await mcp_tools.technical_get_indicators(
ticker=params["ticker"],
period=params.get("period", "3mo"),
)
else:
raise ValueError(f"Unknown Trading MCP tool: {tool}")
# FRED MCP methods
async def call_fred_mcp(self, tool: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Call FRED MCP tool (delegates to market_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing FRED MCP call: {tool}")
if tool == "get_economic_series":
return await mcp_tools.market_get_economic_series(
series_id=params["series_id"],
observation_start=params.get("observation_start"),
observation_end=params.get("observation_end"),
)
else:
raise ValueError(f"Unknown FRED tool: {tool}")
# Portfolio Optimizer MCP methods
async def call_portfolio_optimizer_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Portfolio Optimizer MCP tool (delegates to portfolio_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Portfolio Optimizer MCP call: {tool}")
market_data = params.get("market_data", [])
market_data_json = dumps_str(market_data)
risk_tolerance = params.get("risk_tolerance", "moderate")
if tool == "optimize_hrp":
return await mcp_tools.portfolio_optimize_hrp(
market_data_json=market_data_json,
risk_tolerance=risk_tolerance,
)
elif tool == "optimize_black_litterman":
return await mcp_tools.portfolio_optimize_black_litterman(
market_data_json=market_data_json,
risk_tolerance=risk_tolerance,
)
elif tool == "optimize_mean_variance":
return await mcp_tools.portfolio_optimize_mean_variance(
market_data_json=market_data_json,
risk_tolerance=risk_tolerance,
)
else:
raise ValueError(f"Unknown Portfolio Optimizer tool: {tool}")
# Risk Analyzer MCP methods
async def call_risk_analyzer_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Risk Analyzer MCP tool (delegates to risk_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Risk Analyzer MCP call: {tool}")
if tool == "analyze_risk":
portfolio = params.get("portfolio", [])
benchmark = params.get("benchmark")
return await mcp_tools.risk_analyze(
portfolio_json=dumps_str(portfolio),
portfolio_value=str(params.get("portfolio_value", 100000)),
confidence_level=str(params.get("confidence_level", 0.95)),
time_horizon=str(params.get("time_horizon", 1)),
method=params.get("method", "historical"),
num_simulations=str(params.get("num_simulations", 10000)),
benchmark_json=dumps_str(benchmark) if benchmark else None,
)
elif tool == "forecast_volatility_garch":
returns = params.get("returns", [])
return await mcp_tools.risk_forecast_volatility_garch(
ticker=params["ticker"],
returns_json=dumps_str([float(r) for r in returns]),
forecast_horizon=str(params.get("forecast_horizon", 30)),
garch_p=str(params.get("garch_p", 1)),
garch_q=str(params.get("garch_q", 1)),
)
else:
raise ValueError(f"Unknown Risk Analyzer tool: {tool}")
# Ensemble Predictor MCP methods
async def call_ensemble_predictor_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Ensemble Predictor MCP tool (delegates to ml_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Ensemble Predictor MCP call: {tool}")
if tool == "forecast_ensemble":
prices = params.get("prices", [])
dates = params.get("dates")
return await mcp_tools.ml_forecast_ensemble(
ticker=params["ticker"],
prices_json=dumps_str([float(p) for p in prices]),
dates_json=dumps_str(dates) if dates else None,
forecast_horizon=str(params.get("forecast_horizon", 30)),
confidence_level=str(params.get("confidence_level", 0.95)),
use_returns=str(params.get("use_returns", True)).lower(),
ensemble_method=params.get("ensemble_method", "mean"),
)
else:
raise ValueError(f"Unknown Ensemble Predictor tool: {tool}")
# News Sentiment MCP methods
async def call_news_sentiment_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call News Sentiment MCP tool (delegates to sentiment_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing News Sentiment MCP call: {tool}")
if tool == "get_news_with_sentiment":
return await mcp_tools.sentiment_get_news(
ticker=params["ticker"],
days_back=str(params.get("days_back", 7)),
)
else:
raise ValueError(f"Unknown News Sentiment tool: {tool}")
# Feature Extraction MCP methods
async def call_feature_extraction_mcp(
self, tool: str, params: Dict[str, Any]
) -> Dict[str, Any]:
"""Call Feature Extraction MCP tool (delegates to technical_* functions).
Args:
tool: Tool name
params: Tool parameters
Returns:
Tool result
"""
logger.debug(f"Routing Feature Extraction MCP call: {tool}")
if tool == "extract_technical_features":
return await mcp_tools.technical_extract_features(
ticker=params["ticker"],
prices=dumps_str(params.get("prices", [])),
volumes=dumps_str(params.get("volumes", [])),
include_momentum=str(params.get("include_momentum", True)).lower(),
include_volatility=str(params.get("include_volatility", True)).lower(),
include_trend=str(params.get("include_trend", True)).lower(),
)
elif tool == "normalise_features":
return await mcp_tools.technical_normalise_features(
ticker=params["ticker"],
features=dumps_str(params.get("features", {})),
historical_features=dumps_str(params.get("historical_features", [])),
window_size=str(params.get("window_size", 100)),
method=params.get("method", "ewm"),
)
elif tool == "select_features":
return await mcp_tools.technical_select_features(
ticker=params["ticker"],
feature_vector=dumps_str(params.get("feature_vector", {})),
max_features=str(params.get("max_features", 15)),
variance_threshold=str(params.get("variance_threshold", 0.95)),
)
elif tool == "compute_feature_vector":
return await mcp_tools.technical_compute_feature_vector(
ticker=params["ticker"],
technical_features=dumps_str(params.get("technical_features", {})),
fundamental_features=dumps_str(params.get("fundamental_features", {})),
sentiment_features=dumps_str(params.get("sentiment_features", {})),
max_features=str(params.get("max_features", 30)),
selection_method=params.get("selection_method", "pca"),
)
else:
raise ValueError(f"Unknown Feature Extraction tool: {tool}")
# High-level helper methods
async def fetch_market_data(self, tickers: List[str]) -> Dict[str, Any]:
"""Fetch market data for given tickers.
Args:
tickers: List of stock/asset tickers
Returns:
Market data from Yahoo Finance
"""
logger.info(f"Fetching market data for {len(tickers)} tickers")
results = await mcp_tools.market_get_quote(dumps_str(tickers))
return {r.get("ticker", r.get("symbol")): r for r in results}
async def fetch_fundamentals(self, tickers: List[str]) -> Dict[str, Any]:
"""Fetch fundamental data for tickers.
Args:
tickers: List of stock tickers
Returns:
Fundamental data per ticker
"""
logger.info(f"Fetching fundamentals for {len(tickers)} tickers")
results = {}
for ticker in tickers:
results[ticker] = await mcp_tools.market_get_company_profile(ticker)
return results
async def fetch_technical_indicators(self, tickers: List[str]) -> Dict[str, Any]:
"""Fetch technical indicators for tickers.
Args:
tickers: List of stock tickers
Returns:
Technical indicators per ticker
"""
logger.info(f"Fetching technical indicators for {len(tickers)} tickers")
results = {}
for ticker in tickers:
results[ticker] = await mcp_tools.technical_get_indicators(ticker)
return results
async def fetch_macro_data(self) -> Dict[str, Any]:
"""Fetch macroeconomic data from FRED.
Returns:
Macroeconomic indicators
"""
logger.info("Fetching macroeconomic data")
results = {}
for series_id in ["GDP", "UNRATE", "DFF"]:
results[series_id] = await mcp_tools.market_get_economic_series(series_id)
return results
# Global MCP router instance
mcp_router = MCPRouter()