# BrianIsaac's picture
# feat: enable native Gradio MCP server with fastmcp 2.9.1
# cea2220
"""Unified MCP tools for Portfolio Intelligence Platform.
This module contains all MCP-compatible tool functions that can be:
1. Called directly by agents and workflows
2. Exposed as MCP tools via Gradio's mcp_server=True
3. Cached via @cached_async decorators
All tools use namespaced function names for clear organisation:
- market_*: Market data, fundamentals, and economic data
- technical_*: Technical analysis and feature extraction
- portfolio_*: Portfolio optimisation
- risk_*: Risk analysis and volatility forecasting
- ml_*: Machine learning predictions
- sentiment_*: News sentiment analysis
"""
import json
import logging
from decimal import Decimal
from typing import Any, Dict, List, Literal, Optional, cast
from backend.caching.decorators import cached_async
from backend.caching.redis_cache import CacheDataType
logger = logging.getLogger(__name__)
def _convert_decimals_to_floats(obj: Any) -> Any:
    """Recursively convert Decimal values to floats in a dict/list structure.

    Decimal instances become floats. Strings that parse as a *finite* number
    are converted as well, which covers Decimals that were serialised to
    strings upstream. All other values are returned unchanged.

    Args:
        obj: Object to convert (dict, list, or value)

    Returns:
        Object with Decimals (and finite numeric strings) converted to floats
    """
    import math

    if isinstance(obj, dict):
        return {key: _convert_decimals_to_floats(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_convert_decimals_to_floats(item) for item in obj]
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, str):
        try:
            number = float(obj)
        except ValueError:
            return obj
        # float() also accepts "nan", "inf" and "infinity" (any case), so a
        # text value such as the label "INF" would otherwise be replaced by a
        # non-finite float. Keep such strings as they are.
        return number if math.isfinite(number) else obj
    return obj
# =============================================================================
# MARKET DATA TOOLS (Yahoo Finance) - 3 tools
# =============================================================================
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.MARKET_DATA,
)
async def market_get_quote(tickers: str) -> List[Dict[str, Any]]:
    """Fetch real-time quotes for several tickers in one call.

    Args:
        tickers: JSON array of stock ticker symbols (e.g., '["AAPL", "NVDA"]')

    Returns:
        List of quote dictionaries with price, volume, market cap, etc.
    """
    from backend.mcp_servers.yahoo_finance_mcp import QuoteRequest, get_quote

    # A string is decoded as JSON; any other value is forwarded unchanged.
    symbols = tickers
    if isinstance(tickers, str):
        symbols = json.loads(tickers)

    quotes = await get_quote.fn(QuoteRequest(tickers=symbols))

    # Items exposing model_dump() are dumped to dicts; others pass through.
    payload = []
    for quote in quotes:
        payload.append(quote.model_dump() if hasattr(quote, "model_dump") else quote)
    return payload
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.MARKET_DATA,
)
async def market_get_historical_data(
    ticker: str, period: str = "1y", interval: str = "1d"
) -> Dict[str, Any]:
    """Fetch historical OHLCV price data for one ticker.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
        interval: Data interval (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

    Returns:
        Dictionary with dates, OHLCV arrays, and calculated returns.
    """
    from backend.mcp_servers.yahoo_finance_mcp import (
        HistoricalRequest,
        get_historical_data,
    )

    history = await get_historical_data.fn(
        HistoricalRequest(ticker=ticker, period=period, interval=interval)
    )
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(history, "model_dump"):
        return history.model_dump()
    return history
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.HISTORICAL_DATA,
)
async def market_get_fundamentals(ticker: str) -> Dict[str, Any]:
    """Fetch company fundamentals and key financial metrics.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')

    Returns:
        Dictionary with company name, sector, industry, P/E, market cap, etc.
    """
    from backend.mcp_servers.yahoo_finance_mcp import (
        FundamentalsRequest,
        get_fundamentals,
    )

    fundamentals = await get_fundamentals.fn(FundamentalsRequest(ticker=ticker))
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(fundamentals, "model_dump"):
        return fundamentals.model_dump()
    return fundamentals
# =============================================================================
# FUNDAMENTALS TOOLS (FMP) - 6 tools
# =============================================================================
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_company_profile(ticker: str) -> Dict[str, Any]:
    """Fetch the company profile, including business description and metadata.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')

    Returns:
        Dictionary with company name, sector, industry, description, CEO, etc.
    """
    from backend.mcp_servers.fmp_mcp import CompanyProfileRequest, get_company_profile

    profile = await get_company_profile.fn(CompanyProfileRequest(ticker=ticker))
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(profile, "model_dump"):
        return profile.model_dump()
    return profile
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_income_statement(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Fetch historical income statement data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of income statement dictionaries with revenue, net income, EPS, etc.
    """
    from backend.mcp_servers.fmp_mcp import (
        FinancialStatementsRequest,
        get_income_statement,
    )

    # The limit arrives as text and is converted to an integer for the request.
    period_count = int(limit)
    statements = await get_income_statement.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=period_count)
    )

    rows = []
    for statement in statements:
        rows.append(
            statement.model_dump() if hasattr(statement, "model_dump") else statement
        )
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_balance_sheet(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Fetch historical balance sheet data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of balance sheet dictionaries with assets, liabilities, equity, etc.
    """
    from backend.mcp_servers.fmp_mcp import FinancialStatementsRequest, get_balance_sheet

    # The limit arrives as text and is converted to an integer for the request.
    period_count = int(limit)
    sheets = await get_balance_sheet.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=period_count)
    )

    rows = []
    for sheet in sheets:
        rows.append(sheet.model_dump() if hasattr(sheet, "model_dump") else sheet)
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_cash_flow_statement(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Fetch historical cash flow statement data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of cash flow statements with operating, investing, financing flows.
    """
    from backend.mcp_servers.fmp_mcp import (
        FinancialStatementsRequest,
        get_cash_flow_statement,
    )

    # The limit arrives as text and is converted to an integer for the request.
    period_count = int(limit)
    statements = await get_cash_flow_statement.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=period_count)
    )

    rows = []
    for statement in statements:
        rows.append(
            statement.model_dump() if hasattr(statement, "model_dump") else statement
        )
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_financial_ratios(
    ticker: str, ttm: str = "true"
) -> Dict[str, Any]:
    """Get key financial ratios.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        ttm: Use trailing twelve months as string ('true' or 'false');
            a native bool is accepted as well

    Returns:
        Dictionary with profitability, liquidity, efficiency, and leverage ratios.
    """
    from backend.mcp_servers.fmp_mcp import get_financial_ratios, FinancialRatiosRequest

    # str() first so a native bool from a direct Python caller does not raise
    # AttributeError on .lower(); string inputs behave exactly as before.
    use_ttm = str(ttm).lower() == "true"

    request = FinancialRatiosRequest(ticker=ticker, ttm=use_ttm)
    result = await get_financial_ratios.fn(request)
    # Dump to a plain dict when the result offers model_dump().
    return result.model_dump() if hasattr(result, "model_dump") else result
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_key_metrics(ticker: str, ttm: str = "true") -> Dict[str, Any]:
    """Get key company metrics.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        ttm: Use trailing twelve months as string ('true' or 'false');
            a native bool is accepted as well

    Returns:
        Dictionary with market cap, P/E, P/B, EV/EBITDA, per-share metrics.
    """
    from backend.mcp_servers.fmp_mcp import get_key_metrics, KeyMetricsRequest

    # str() first so a native bool from a direct Python caller does not raise
    # AttributeError on .lower(); string inputs behave exactly as before.
    use_ttm = str(ttm).lower() == "true"

    request = KeyMetricsRequest(ticker=ticker, ttm=use_ttm)
    result = await get_key_metrics.fn(request)
    # Dump to a plain dict when the result offers model_dump().
    return result.model_dump() if hasattr(result, "model_dump") else result
# =============================================================================
# ECONOMIC DATA TOOLS (FRED) - 1 tool
# =============================================================================
@cached_async(
    namespace="fred",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=86400,
)
async def market_get_economic_series(
    series_id: str,
    observation_start: Optional[str] = None,
    observation_end: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch an economic data series from FRED.

    Args:
        series_id: FRED series ID (e.g., 'GDP', 'UNRATE', 'DFF', 'CPIAUCSL')
        observation_start: Start date in YYYY-MM-DD format (optional)
        observation_end: End date in YYYY-MM-DD format (optional)

    Returns:
        Dictionary with series_id, title, units, frequency, and observations.
    """
    from backend.mcp_servers.fred_mcp import SeriesRequest, get_economic_series

    series = await get_economic_series.fn(
        SeriesRequest(
            series_id=series_id,
            observation_start=observation_start,
            observation_end=observation_end,
        )
    )
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(series, "model_dump"):
        return series.model_dump()
    return series
# =============================================================================
# TECHNICAL ANALYSIS TOOLS - 5 tools
# =============================================================================
@cached_async(
    namespace="trading",
    data_type=CacheDataType.HISTORICAL_DATA,
)
async def technical_get_indicators(
    ticker: str, period: str = "3mo"
) -> Dict[str, Any]:
    """Fetch technical indicators for a ticker.

    Covers RSI, MACD, Bollinger Bands, moving averages, and an overall signal.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Data period (1mo, 3mo, 6mo, 1y)

    Returns:
        Dictionary with RSI, MACD, Bollinger Bands, moving averages, volume trend,
        and overall signal (buy, sell, or hold).
    """
    from backend.mcp_servers.trading_mcp import (
        TechnicalIndicatorsRequest,
        get_technical_indicators,
    )

    indicators = await get_technical_indicators.fn(
        TechnicalIndicatorsRequest(ticker=ticker, period=period)
    )
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(indicators, "model_dump"):
        return indicators.model_dump()
    return indicators
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_extract_features(
    ticker: str,
    prices: str,
    volumes: str = "[]",
    include_momentum: str = "true",
    include_volatility: str = "true",
    include_trend: str = "true",
) -> Dict[str, Any]:
    """Extract technical features with look-ahead bias prevention.

    All features are calculated using SHIFTED data to prevent future data leakage.

    Args:
        ticker: Stock ticker symbol
        prices: JSON array of historical closing prices
        volumes: JSON array of historical volumes (optional)
        include_momentum: Include momentum indicators ('true' or 'false')
        include_volatility: Include volatility indicators ('true' or 'false')
        include_trend: Include trend indicators ('true' or 'false')

    Returns:
        Dictionary with extracted features and feature count.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        extract_technical_features,
        FeatureExtractionRequest,
    )

    def _is_true(flag: Any) -> bool:
        # str() first so a native bool is accepted like 'true'/'false' text,
        # matching how prices/volumes below accept already-parsed values.
        return str(flag).lower() == "true"

    # JSON text is decoded; already-parsed values are forwarded unchanged.
    prices_list = json.loads(prices) if isinstance(prices, str) else prices
    volumes_list = json.loads(volumes) if isinstance(volumes, str) else volumes

    request = FeatureExtractionRequest(
        ticker=ticker,
        prices=prices_list,
        volumes=volumes_list,
        include_momentum=_is_true(include_momentum),
        include_volatility=_is_true(include_volatility),
        include_trend=_is_true(include_trend),
    )
    # The tool's result is returned as-is (no model_dump step here).
    return await extract_technical_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_normalise_features(
    ticker: str,
    features: str,
    historical_features: str = "[]",
    window_size: str = "100",
    method: str = "ewm",
) -> Dict[str, Any]:
    """Normalise features with adaptive rolling window statistics.

    Uses exponentially weighted mean/variance for robust time-varying normalisation.

    Args:
        ticker: Stock ticker symbol
        features: JSON object of current feature values
        historical_features: JSON array of historical feature observations
        window_size: Rolling window size as string (default: '100')
        method: Normalisation method ('ewm' or 'z_score')

    Returns:
        Dictionary with normalised features.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        NormalisationRequest,
        normalise_features,
    )

    # JSON text is decoded; already-parsed values are forwarded unchanged.
    current = features
    if isinstance(features, str):
        current = json.loads(features)

    history = historical_features
    if isinstance(historical_features, str):
        history = json.loads(historical_features)

    request = NormalisationRequest(
        ticker=ticker,
        features=current,
        historical_features=history,
        window_size=int(window_size),
        method=method,
    )
    # The tool's result is returned as-is (no model_dump step here).
    return await normalise_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_select_features(
    ticker: str,
    feature_vector: str,
    max_features: str = "15",
    variance_threshold: str = "0.95",
) -> Dict[str, Any]:
    """Select optimal features via PCA-based dimensionality reduction.

    Target: 6-15 features to balance predictive power with overfitting prevention.

    Args:
        ticker: Stock ticker symbol
        feature_vector: JSON object of full feature vector
        max_features: Maximum features to select as string (default: '15')
        variance_threshold: Variance threshold for PCA as string (default: '0.95')

    Returns:
        Dictionary with selected features and metadata.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        FeatureSelectionRequest,
        select_features,
    )

    # JSON text is decoded; an already-parsed value is forwarded unchanged.
    vector = feature_vector
    if isinstance(feature_vector, str):
        vector = json.loads(feature_vector)

    request = FeatureSelectionRequest(
        ticker=ticker,
        feature_vector=vector,
        max_features=int(max_features),
        variance_threshold=float(variance_threshold),
    )
    # The tool's result is returned as-is (no model_dump step here).
    return await select_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_compute_feature_vector(
    ticker: str,
    technical_features: str = "{}",
    fundamental_features: str = "{}",
    sentiment_features: str = "{}",
    max_features: str = "30",
    selection_method: str = "pca",
) -> Dict[str, Any]:
    """Build a combined feature vector from several feature sources.

    Merges technical, fundamental, and sentiment features into a single vector
    suitable for ML model input.

    Args:
        ticker: Stock ticker symbol
        technical_features: JSON object of technical features
        fundamental_features: JSON object of fundamental features
        sentiment_features: JSON object of sentiment features
        max_features: Maximum features in vector as string (default: '30')
        selection_method: Selection method ('pca' or 'variance')

    Returns:
        Dictionary with combined feature vector and metadata.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        FeatureVectorRequest,
        compute_feature_vector,
    )

    def _decode(raw: Any) -> Any:
        # JSON text is decoded; an already-parsed value is forwarded unchanged.
        if isinstance(raw, str):
            return json.loads(raw)
        return raw

    technical = _decode(technical_features)
    fundamental = _decode(fundamental_features)
    sentiment = _decode(sentiment_features)

    request = FeatureVectorRequest(
        ticker=ticker,
        technical_features=technical,
        fundamental_features=fundamental,
        sentiment_features=sentiment,
        max_features=int(max_features),
        selection_method=selection_method,
    )
    # The tool's result is returned as-is (no model_dump step here).
    return await compute_feature_vector.fn(request)
# =============================================================================
# PORTFOLIO OPTIMISATION TOOLS - 3 tools
# =============================================================================
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_hrp(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise a portfolio with Hierarchical Risk Parity.

    HRP uses hierarchical clustering to construct a diversified portfolio
    that balances risk across clusters of correlated assets.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
            e.g., '[{"ticker": "AAPL", "prices": [150.0, 151.5, ...], "dates": ["2024-01-01", ...]}]'
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        MarketDataInput,
        OptimizationRequest,
        optimize_hrp,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Each price goes through str() before Decimal().
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_hrp.fn(
        OptimizationRequest(
            market_data=assets, method="hrp", risk_tolerance=risk_tolerance
        )
    )
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_black_litterman(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise a portfolio with the Black-Litterman model and market equilibrium.

    Black-Litterman uses market-implied equilibrium returns as the prior distribution
    when no explicit investor views are provided.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        MarketDataInput,
        OptimizationRequest,
        optimize_black_litterman,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Each price goes through str() before Decimal().
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_black_litterman.fn(
        OptimizationRequest(
            market_data=assets,
            method="black_litterman",
            risk_tolerance=risk_tolerance,
        )
    )
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_mean_variance(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise a portfolio with Mean-Variance Optimisation (Markowitz).

    Mean-Variance finds the portfolio with maximum Sharpe ratio or
    minimum volatility for a given return target.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        MarketDataInput,
        OptimizationRequest,
        optimize_mean_variance,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Each price goes through str() before Decimal().
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_mean_variance.fn(
        OptimizationRequest(
            market_data=assets,
            method="mean_variance",
            risk_tolerance=risk_tolerance,
        )
    )
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
# =============================================================================
# RISK ANALYSIS TOOLS - 2 tools
# =============================================================================
@cached_async(
    namespace="risk_analyzer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def risk_analyze(
    portfolio_json: str,
    portfolio_value: str,
    confidence_level: str = "0.95",
    time_horizon: str = "1",
    method: str = "historical",
    num_simulations: str = "10000",
    benchmark_json: Optional[str] = None,
) -> Dict[str, Any]:
    """Perform comprehensive risk analysis on a portfolio.

    Calculates VaR (95%, 99%), CVaR, Sharpe ratio, Sortino ratio,
    maximum drawdown, Information Ratio, Calmar Ratio, and Ulcer Index.

    Args:
        portfolio_json: JSON array of portfolio holdings with ticker, weight, prices
            e.g., '[{"ticker": "AAPL", "weight": 0.6, "prices": [150.0, ...]}]'
        portfolio_value: Total portfolio value in dollars as string
        confidence_level: VaR confidence level as string (default: '0.95')
        time_horizon: VaR time horizon in days as string (default: '1')
        method: VaR calculation method ('historical', 'parametric', 'monte_carlo')
        num_simulations: Monte Carlo simulations as string if method='monte_carlo'
        benchmark_json: Optional JSON with benchmark data for Information Ratio

    Returns:
        Dictionary with VaR, CVaR, risk metrics, and simulation percentiles.
    """
    from backend.mcp_servers.risk_analyzer_mcp import (
        analyze_risk,
        RiskAnalysisRequest,
        PortfolioInput,
        BenchmarkInput,
    )

    portfolio_list = json.loads(portfolio_json)
    portfolio = [
        PortfolioInput(
            ticker=item["ticker"],
            weight=Decimal(str(item["weight"])),
            prices=[Decimal(str(p)) for p in item["prices"]],
        )
        for item in portfolio_list
    ]

    # The benchmark is only built when a non-empty JSON string is supplied.
    benchmark = None
    if benchmark_json:
        benchmark_data = json.loads(benchmark_json)
        benchmark = BenchmarkInput(
            ticker=benchmark_data["ticker"],
            prices=[Decimal(str(p)) for p in benchmark_data["prices"]],
        )

    request = RiskAnalysisRequest(
        portfolio=portfolio,
        # Go through str() like every other numeric above: Decimal(0.95) built
        # from a float carries binary representation error, whereas
        # Decimal(str(0.95)) is exactly Decimal('0.95'). String inputs are
        # unaffected.
        portfolio_value=Decimal(str(portfolio_value)),
        confidence_level=Decimal(str(confidence_level)),
        time_horizon=int(time_horizon),
        method=method,
        num_simulations=int(num_simulations),
        benchmark=benchmark,
    )
    result = await analyze_risk.fn(request)
    data = result.model_dump() if hasattr(result, "model_dump") else result
    return _convert_decimals_to_floats(data)
@cached_async(
    namespace="risk_analyzer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def risk_forecast_volatility_garch(
    ticker: str,
    returns_json: str,
    forecast_horizon: str = "30",
    garch_p: str = "1",
    garch_q: str = "1",
) -> Dict[str, Any]:
    """Forecast volatility with a GARCH model.

    GARCH (Generalised Autoregressive Conditional Heteroskedasticity) models
    are the industry standard for financial volatility forecasting.

    Args:
        ticker: Stock ticker symbol
        returns_json: JSON array of historical returns (as percentages)
        forecast_horizon: Days to forecast as string (default: '30')
        garch_p: GARCH lag order as string (default: '1')
        garch_q: ARCH lag order as string (default: '1')

    Returns:
        Dictionary with volatility forecasts, annualised volatility, and diagnostics.
    """
    from backend.mcp_servers.risk_analyzer_mcp import (
        GARCHForecastRequest,
        forecast_volatility_garch,
    )

    # Each return goes through str() before Decimal().
    return_series = []
    for value in json.loads(returns_json):
        return_series.append(Decimal(str(value)))

    request = GARCHForecastRequest(
        ticker=ticker,
        returns=return_series,
        forecast_horizon=int(forecast_horizon),
        garch_p=int(garch_p),
        garch_q=int(garch_q),
    )
    forecast = await forecast_volatility_garch.fn(request)
    if hasattr(forecast, "model_dump"):
        forecast = forecast.model_dump()
    return _convert_decimals_to_floats(forecast)
# =============================================================================
# MACHINE LEARNING TOOLS - 1 tool
# =============================================================================
async def ml_forecast_ensemble(
    ticker: str,
    prices_json: str,
    dates_json: Optional[str] = None,
    forecast_horizon: str = "30",
    confidence_level: str = "0.95",
    use_returns: str = "true",
    ensemble_method: str = "mean",
) -> Dict[str, Any]:
    """Forecast stock prices using ensemble ML models.

    Combines multiple forecasting models (Chronos-Bolt, TTM, N-HiTS)
    to produce robust predictions with uncertainty quantification.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        prices_json: JSON array of historical prices (minimum 10 values)
        dates_json: Optional JSON array of corresponding dates
        forecast_horizon: Number of days to forecast as string (default: '30')
        confidence_level: Confidence level for intervals as string (default: '0.95')
        use_returns: Forecast returns instead of raw prices ('true' or 'false')
        ensemble_method: Combination method ('mean', 'median', 'weighted')

    Returns:
        Dictionary with forecasts, confidence intervals, and model metadata.
    """
    from backend.mcp_servers.ensemble_predictor_mcp import (
        forecast_ensemble,
        ForecastRequest,
    )

    prices_list = json.loads(prices_json)
    # Dates are optional: only decoded when a non-empty string is supplied.
    dates_list = json.loads(dates_json) if dates_json else None

    # cast() only informs the type checker; it does not validate the value.
    ensemble_method_literal = cast(
        Literal["mean", "median", "weighted"], ensemble_method
    )

    # str() first so a native bool from a direct Python caller does not raise
    # AttributeError on .lower(); string inputs behave exactly as before.
    forecast_returns = str(use_returns).lower() == "true"

    request = ForecastRequest(
        ticker=ticker,
        prices=[Decimal(str(p)) for p in prices_list],
        dates=dates_list,
        forecast_horizon=int(forecast_horizon),
        confidence_level=float(confidence_level),
        use_returns=forecast_returns,
        ensemble_method=ensemble_method_literal,
    )
    result = await forecast_ensemble.fn(request)
    data = result.model_dump() if hasattr(result, "model_dump") else result
    return _convert_decimals_to_floats(data)
# =============================================================================
# SENTIMENT ANALYSIS TOOLS - 1 tool
# =============================================================================
@cached_async(
    namespace="news_sentiment",
    data_type=CacheDataType.USER_DATA,
    ttl=7200,
)
async def sentiment_get_news(ticker: str, days_back: str = "7") -> Dict[str, Any]:
    """Retrieve recent news for a ticker together with its sentiment analysis.

    Uses Finnhub API for news retrieval and VADER for sentiment analysis.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        days_back: Number of days of historical news as string (default: '7')

    Returns:
        Dictionary with overall sentiment, confidence, article count, and articles.
    """
    from backend.mcp_servers.news_sentiment_mcp import get_news_with_sentiment

    # The look-back window arrives as text and is passed on as an integer.
    lookback_days = int(days_back)
    analysis = await get_news_with_sentiment.fn(ticker=ticker, days_back=lookback_days)
    # Dump to a plain dict when the result offers model_dump().
    if hasattr(analysis, "model_dump"):
        return analysis.model_dump()
    return analysis
# =============================================================================
# CONVENIENCE FUNCTIONS (for internal use by agents)
# =============================================================================
async def get_quote_list(tickers: List[str]) -> List[Dict[str, Any]]:
    """Internal helper that takes a Python list rather than a JSON string.

    The list is encoded to JSON and handed to market_get_quote.

    Args:
        tickers: List of stock ticker symbols

    Returns:
        List of quote dictionaries
    """
    encoded_tickers = json.dumps(tickers)
    quotes = await market_get_quote(encoded_tickers)
    return quotes
async def get_historical_prices(
    ticker: str, period: str = "1y", interval: str = "1d"
) -> Dict[str, Any]:
    """Internal helper that forwards to market_get_historical_data.

    Args:
        ticker: Stock ticker symbol
        period: Time period
        interval: Data interval

    Returns:
        Historical price data dictionary
    """
    # Arguments are forwarded positionally, in the same order as received.
    history = await market_get_historical_data(ticker, period, interval)
    return history