Spaces:
Running
on
Zero
Running
on
Zero
File size: 30,801 Bytes
cea2220 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 |
"""Unified MCP tools for Portfolio Intelligence Platform.
This module contains all MCP-compatible tool functions that can be:
1. Called directly by agents and workflows
2. Exposed as MCP tools via Gradio's mcp_server=True
3. Cached via @cached_async decorators
All tools use namespaced function names for clear organisation:
- market_*: Market data, fundamentals, and economic data
- technical_*: Technical analysis and feature extraction
- portfolio_*: Portfolio optimisation
- risk_*: Risk analysis and volatility forecasting
- ml_*: Machine learning predictions
- sentiment_*: News sentiment analysis
"""
import json
import logging
import math
from decimal import Decimal
from typing import Any, Dict, List, Literal, Optional, cast

from backend.caching.decorators import cached_async
from backend.caching.redis_cache import CacheDataType
logger = logging.getLogger(__name__)
def _convert_decimals_to_floats(obj: Any) -> Any:
    """Recursively convert Decimal values to floats in a dict/list structure.

    Pydantic v2 serializes Decimals to strings by default. This function
    converts them back to floats for backward compatibility.

    Strings are converted only when they parse to a *finite* float.
    ``float()`` also accepts spellings such as ``"nan"``, ``"inf"`` and
    ``"Infinity"`` (case-insensitively), so without this guard a label or
    ticker-like string (e.g. ``"NAN"`` or ``"INF"``) would silently become a
    non-finite float. Such strings are returned unchanged.

    Args:
        obj: Object to convert (dict, list, or value)

    Returns:
        Object with Decimals (and finite numeric strings) converted to floats.
        Dict keys are never converted; values that are not dict, list,
        Decimal, or str are returned as-is.
    """
    if isinstance(obj, dict):
        return {k: _convert_decimals_to_floats(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_convert_decimals_to_floats(item) for item in obj]
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, str):
        try:
            parsed = float(obj)
        except ValueError:
            # Not numeric at all (e.g. "AAPL"): keep the text.
            return obj
        # "nan"/"inf"/"infinity" parse successfully but are almost certainly
        # text, not serialized Decimals; leave them as strings.
        return parsed if math.isfinite(parsed) else obj
    return obj
# =============================================================================
# MARKET DATA TOOLS (Yahoo Finance) - 3 tools
# =============================================================================
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.MARKET_DATA,
)
async def market_get_quote(tickers: str) -> List[Dict[str, Any]]:
    """Get real-time quotes for multiple tickers.

    Args:
        tickers: JSON array of stock ticker symbols (e.g., '["AAPL", "NVDA"]')

    Returns:
        List of quote dictionaries with price, volume, market cap, etc.
    """
    from backend.mcp_servers.yahoo_finance_mcp import get_quote, QuoteRequest

    # A string argument is decoded as JSON; anything else is forwarded as-is.
    symbols = tickers
    if isinstance(tickers, str):
        symbols = json.loads(tickers)

    quotes = await get_quote.fn(QuoteRequest(tickers=symbols))

    payload = []
    for quote in quotes:
        # Items exposing model_dump() are dumped; other items pass through.
        if hasattr(quote, "model_dump"):
            payload.append(quote.model_dump())
        else:
            payload.append(quote)
    return payload
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.MARKET_DATA,
)
async def market_get_historical_data(
    ticker: str, period: str = "1y", interval: str = "1d"
) -> Dict[str, Any]:
    """Get historical OHLCV price data for a ticker.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
        interval: Data interval (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

    Returns:
        Dictionary with dates, OHLCV arrays, and calculated returns.
    """
    from backend.mcp_servers.yahoo_finance_mcp import (
        get_historical_data,
        HistoricalRequest,
    )

    history = await get_historical_data.fn(
        HistoricalRequest(ticker=ticker, period=period, interval=interval)
    )
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(history, "model_dump"):
        return history.model_dump()
    return history
@cached_async(
    namespace="yahoo_finance",
    data_type=CacheDataType.HISTORICAL_DATA,
)
async def market_get_fundamentals(ticker: str) -> Dict[str, Any]:
    """Get company fundamentals and key financial metrics.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')

    Returns:
        Dictionary with company name, sector, industry, P/E, market cap, etc.
    """
    from backend.mcp_servers.yahoo_finance_mcp import (
        get_fundamentals,
        FundamentalsRequest,
    )

    fundamentals = await get_fundamentals.fn(FundamentalsRequest(ticker=ticker))
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(fundamentals, "model_dump"):
        return fundamentals.model_dump()
    return fundamentals
# =============================================================================
# FUNDAMENTALS TOOLS (FMP) - 6 tools
# =============================================================================
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_company_profile(ticker: str) -> Dict[str, Any]:
    """Get company profile with business description and metadata.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')

    Returns:
        Dictionary with company name, sector, industry, description, CEO, etc.
    """
    from backend.mcp_servers.fmp_mcp import get_company_profile, CompanyProfileRequest

    profile = await get_company_profile.fn(CompanyProfileRequest(ticker=ticker))
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(profile, "model_dump"):
        return profile.model_dump()
    return profile
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_income_statement(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Get historical income statement data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of income statement dictionaries with revenue, net income, EPS, etc.
    """
    from backend.mcp_servers.fmp_mcp import (
        get_income_statement,
        FinancialStatementsRequest,
    )

    # `limit` arrives as text and is converted to an int for the request.
    statements = await get_income_statement.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=int(limit))
    )

    rows = []
    for statement in statements:
        # Items exposing model_dump() are dumped; other items pass through.
        if hasattr(statement, "model_dump"):
            rows.append(statement.model_dump())
        else:
            rows.append(statement)
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_balance_sheet(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Get historical balance sheet data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of balance sheet dictionaries with assets, liabilities, equity, etc.
    """
    from backend.mcp_servers.fmp_mcp import get_balance_sheet, FinancialStatementsRequest

    # `limit` arrives as text and is converted to an int for the request.
    sheets = await get_balance_sheet.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=int(limit))
    )

    rows = []
    for sheet in sheets:
        # Items exposing model_dump() are dumped; other items pass through.
        if hasattr(sheet, "model_dump"):
            rows.append(sheet.model_dump())
        else:
            rows.append(sheet)
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_cash_flow_statement(
    ticker: str, period: str = "annual", limit: str = "5"
) -> List[Dict[str, Any]]:
    """Get historical cash flow statement data.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Report period ('annual' or 'quarter')
        limit: Number of periods to retrieve as string (default: '5')

    Returns:
        List of cash flow statements with operating, investing, financing flows.
    """
    from backend.mcp_servers.fmp_mcp import (
        get_cash_flow_statement,
        FinancialStatementsRequest,
    )

    # `limit` arrives as text and is converted to an int for the request.
    statements = await get_cash_flow_statement.fn(
        FinancialStatementsRequest(ticker=ticker, period=period, limit=int(limit))
    )

    rows = []
    for statement in statements:
        # Items exposing model_dump() are dumped; other items pass through.
        if hasattr(statement, "model_dump"):
            rows.append(statement.model_dump())
        else:
            rows.append(statement)
    return rows
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_financial_ratios(
    ticker: str, ttm: str = "true"
) -> Dict[str, Any]:
    """Get key financial ratios.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        ttm: Use trailing twelve months as string ('true' or 'false')

    Returns:
        Dictionary with profitability, liquidity, efficiency, and leverage ratios.
    """
    from backend.mcp_servers.fmp_mcp import get_financial_ratios, FinancialRatiosRequest

    # Only the text "true" (any casing) enables TTM; every other value disables it.
    use_ttm = ttm.lower() == "true"
    ratios = await get_financial_ratios.fn(
        FinancialRatiosRequest(ticker=ticker, ttm=use_ttm)
    )
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(ratios, "model_dump"):
        return ratios.model_dump()
    return ratios
@cached_async(
    namespace="fmp",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=21600,
)
async def market_get_key_metrics(ticker: str, ttm: str = "true") -> Dict[str, Any]:
    """Get key company metrics.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        ttm: Use trailing twelve months as string ('true' or 'false')

    Returns:
        Dictionary with market cap, P/E, P/B, EV/EBITDA, per-share metrics.
    """
    from backend.mcp_servers.fmp_mcp import get_key_metrics, KeyMetricsRequest

    # Only the text "true" (any casing) enables TTM; every other value disables it.
    use_ttm = ttm.lower() == "true"
    metrics = await get_key_metrics.fn(KeyMetricsRequest(ticker=ticker, ttm=use_ttm))
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(metrics, "model_dump"):
        return metrics.model_dump()
    return metrics
# =============================================================================
# ECONOMIC DATA TOOLS (FRED) - 1 tool
# =============================================================================
@cached_async(
    namespace="fred",
    data_type=CacheDataType.HISTORICAL_DATA,
    ttl=86400,
)
async def market_get_economic_series(
    series_id: str,
    observation_start: Optional[str] = None,
    observation_end: Optional[str] = None,
) -> Dict[str, Any]:
    """Get economic data series from FRED.

    Args:
        series_id: FRED series ID (e.g., 'GDP', 'UNRATE', 'DFF', 'CPIAUCSL')
        observation_start: Start date in YYYY-MM-DD format (optional)
        observation_end: End date in YYYY-MM-DD format (optional)

    Returns:
        Dictionary with series_id, title, units, frequency, and observations.
    """
    from backend.mcp_servers.fred_mcp import get_economic_series, SeriesRequest

    # The optional date bounds are forwarded unchanged (None when omitted).
    series = await get_economic_series.fn(
        SeriesRequest(
            series_id=series_id,
            observation_start=observation_start,
            observation_end=observation_end,
        )
    )
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(series, "model_dump"):
        return series.model_dump()
    return series
# =============================================================================
# TECHNICAL ANALYSIS TOOLS - 5 tools
# =============================================================================
@cached_async(
    namespace="trading",
    data_type=CacheDataType.HISTORICAL_DATA,
)
async def technical_get_indicators(
    ticker: str, period: str = "3mo"
) -> Dict[str, Any]:
    """Get technical indicators for a ticker.

    Calculates RSI, MACD, Bollinger Bands, moving averages, and overall signal.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        period: Data period (1mo, 3mo, 6mo, 1y)

    Returns:
        Dictionary with RSI, MACD, Bollinger Bands, moving averages, volume trend,
        and overall signal (buy, sell, or hold).
    """
    from backend.mcp_servers.trading_mcp import (
        get_technical_indicators,
        TechnicalIndicatorsRequest,
    )

    indicators = await get_technical_indicators.fn(
        TechnicalIndicatorsRequest(ticker=ticker, period=period)
    )
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(indicators, "model_dump"):
        return indicators.model_dump()
    return indicators
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_extract_features(
    ticker: str,
    prices: str,
    volumes: str = "[]",
    include_momentum: str = "true",
    include_volatility: str = "true",
    include_trend: str = "true",
) -> Dict[str, Any]:
    """Extract technical features with look-ahead bias prevention.

    All features are calculated using SHIFTED data to prevent future data leakage.

    Args:
        ticker: Stock ticker symbol
        prices: JSON array of historical closing prices
        volumes: JSON array of historical volumes (optional)
        include_momentum: Include momentum indicators ('true' or 'false')
        include_volatility: Include volatility indicators ('true' or 'false')
        include_trend: Include trend indicators ('true' or 'false')

    Returns:
        Dictionary with extracted features and feature count.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        extract_technical_features,
        FeatureExtractionRequest,
    )

    # String arguments are decoded as JSON; other values are forwarded as-is.
    price_series = prices
    if isinstance(prices, str):
        price_series = json.loads(prices)
    volume_series = volumes
    if isinstance(volumes, str):
        volume_series = json.loads(volumes)

    # Each flag is enabled only by the text "true" (any casing).
    want_momentum = include_momentum.lower() == "true"
    want_volatility = include_volatility.lower() == "true"
    want_trend = include_trend.lower() == "true"

    request = FeatureExtractionRequest(
        ticker=ticker,
        prices=price_series,
        volumes=volume_series,
        include_momentum=want_momentum,
        include_volatility=want_volatility,
        include_trend=want_trend,
    )
    # The backend result is returned without any further conversion.
    return await extract_technical_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_normalise_features(
    ticker: str,
    features: str,
    historical_features: str = "[]",
    window_size: str = "100",
    method: str = "ewm",
) -> Dict[str, Any]:
    """Normalise features using adaptive rolling window statistics.

    Uses exponentially weighted mean/variance for robust time-varying normalisation.

    Args:
        ticker: Stock ticker symbol
        features: JSON object of current feature values
        historical_features: JSON array of historical feature observations
        window_size: Rolling window size as string (default: '100')
        method: Normalisation method ('ewm' or 'z_score')

    Returns:
        Dictionary with normalised features.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        normalise_features,
        NormalisationRequest,
    )

    # String arguments are decoded as JSON; other values are forwarded as-is.
    current = features
    if isinstance(features, str):
        current = json.loads(features)
    history = historical_features
    if isinstance(historical_features, str):
        history = json.loads(historical_features)

    request = NormalisationRequest(
        ticker=ticker,
        features=current,
        historical_features=history,
        window_size=int(window_size),
        method=method,
    )
    # The backend result is returned without any further conversion.
    return await normalise_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_select_features(
    ticker: str,
    feature_vector: str,
    max_features: str = "15",
    variance_threshold: str = "0.95",
) -> Dict[str, Any]:
    """Select optimal features using PCA for dimensionality reduction.

    Target: 6-15 features to balance predictive power with overfitting prevention.

    Args:
        ticker: Stock ticker symbol
        feature_vector: JSON object of full feature vector
        max_features: Maximum features to select as string (default: '15')
        variance_threshold: Variance threshold for PCA as string (default: '0.95')

    Returns:
        Dictionary with selected features and metadata.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        select_features,
        FeatureSelectionRequest,
    )

    # A string argument is decoded as JSON; anything else is forwarded as-is.
    full_vector = feature_vector
    if isinstance(feature_vector, str):
        full_vector = json.loads(feature_vector)

    request = FeatureSelectionRequest(
        ticker=ticker,
        feature_vector=full_vector,
        max_features=int(max_features),
        variance_threshold=float(variance_threshold),
    )
    # The backend result is returned without any further conversion.
    return await select_features.fn(request)
@cached_async(
    namespace="feature_extraction",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=1800,
)
async def technical_compute_feature_vector(
    ticker: str,
    technical_features: str = "{}",
    fundamental_features: str = "{}",
    sentiment_features: str = "{}",
    max_features: str = "30",
    selection_method: str = "pca",
) -> Dict[str, Any]:
    """Compute combined feature vector from multiple sources.

    Combines technical, fundamental, and sentiment features into a single vector
    suitable for ML model input.

    Args:
        ticker: Stock ticker symbol
        technical_features: JSON object of technical features
        fundamental_features: JSON object of fundamental features
        sentiment_features: JSON object of sentiment features
        max_features: Maximum features in vector as string (default: '30')
        selection_method: Selection method ('pca' or 'variance')

    Returns:
        Dictionary with combined feature vector and metadata.
    """
    from backend.mcp_servers.feature_extraction_mcp import (
        compute_feature_vector,
        FeatureVectorRequest,
    )

    # String arguments are decoded as JSON; other values are forwarded as-is.
    technical = technical_features
    if isinstance(technical_features, str):
        technical = json.loads(technical_features)
    fundamental = fundamental_features
    if isinstance(fundamental_features, str):
        fundamental = json.loads(fundamental_features)
    sentiment = sentiment_features
    if isinstance(sentiment_features, str):
        sentiment = json.loads(sentiment_features)

    request = FeatureVectorRequest(
        ticker=ticker,
        technical_features=technical,
        fundamental_features=fundamental,
        sentiment_features=sentiment,
        max_features=int(max_features),
        selection_method=selection_method,
    )
    # The backend result is returned without any further conversion.
    return await compute_feature_vector.fn(request)
# =============================================================================
# PORTFOLIO OPTIMISATION TOOLS - 3 tools
# =============================================================================
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_hrp(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise portfolio using Hierarchical Risk Parity.

    HRP uses hierarchical clustering to construct a diversified portfolio
    that balances risk across clusters of correlated assets.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
            e.g., '[{"ticker": "AAPL", "prices": [150.0, 151.5, ...], "dates": ["2024-01-01", ...]}]'
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        optimize_hrp,
        OptimizationRequest,
        MarketDataInput,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Prices go through str() before becoming Decimals.
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_hrp.fn(
        OptimizationRequest(
            market_data=assets, method="hrp", risk_tolerance=risk_tolerance
        )
    )
    # Dump when model_dump() is available, then normalise Decimals to floats.
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_black_litterman(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise portfolio using Black-Litterman model with market equilibrium.

    Black-Litterman uses market-implied equilibrium returns as the prior distribution
    when no explicit investor views are provided.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        optimize_black_litterman,
        OptimizationRequest,
        MarketDataInput,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Prices go through str() before becoming Decimals.
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_black_litterman.fn(
        OptimizationRequest(
            market_data=assets,
            method="black_litterman",
            risk_tolerance=risk_tolerance,
        )
    )
    # Dump when model_dump() is available, then normalise Decimals to floats.
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
@cached_async(
    namespace="portfolio_optimizer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def portfolio_optimize_mean_variance(
    market_data_json: str, risk_tolerance: str = "moderate"
) -> Dict[str, Any]:
    """Optimise portfolio using Mean-Variance Optimisation (Markowitz).

    Mean-Variance finds the portfolio with maximum Sharpe ratio or
    minimum volatility for a given return target.

    Args:
        market_data_json: JSON array of market data objects with ticker, prices, dates
        risk_tolerance: Risk level ('conservative', 'moderate', 'aggressive')

    Returns:
        Dictionary with optimal weights, expected return, volatility, Sharpe ratio.
    """
    from backend.mcp_servers.portfolio_optimizer_mcp import (
        optimize_mean_variance,
        OptimizationRequest,
        MarketDataInput,
    )

    assets = []
    for entry in json.loads(market_data_json):
        assets.append(
            MarketDataInput(
                ticker=entry["ticker"],
                # Prices go through str() before becoming Decimals.
                prices=[Decimal(str(price)) for price in entry["prices"]],
                dates=entry["dates"],
            )
        )

    outcome = await optimize_mean_variance.fn(
        OptimizationRequest(
            market_data=assets,
            method="mean_variance",
            risk_tolerance=risk_tolerance,
        )
    )
    # Dump when model_dump() is available, then normalise Decimals to floats.
    if hasattr(outcome, "model_dump"):
        outcome = outcome.model_dump()
    return _convert_decimals_to_floats(outcome)
# =============================================================================
# RISK ANALYSIS TOOLS - 2 tools
# =============================================================================
@cached_async(
    namespace="risk_analyzer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def risk_analyze(
    portfolio_json: str,
    portfolio_value: str,
    confidence_level: str = "0.95",
    time_horizon: str = "1",
    method: str = "historical",
    num_simulations: str = "10000",
    benchmark_json: Optional[str] = None,
) -> Dict[str, Any]:
    """Perform comprehensive risk analysis on a portfolio.

    Calculates VaR (95%, 99%), CVaR, Sharpe ratio, Sortino ratio,
    maximum drawdown, Information Ratio, Calmar Ratio, and Ulcer Index.

    Args:
        portfolio_json: JSON array of portfolio holdings with ticker, weight, prices
            e.g., '[{"ticker": "AAPL", "weight": 0.6, "prices": [150.0, ...]}]'
        portfolio_value: Total portfolio value in dollars as string
        confidence_level: VaR confidence level as string (default: '0.95')
        time_horizon: VaR time horizon in days as string (default: '1')
        method: VaR calculation method ('historical', 'parametric', 'monte_carlo')
        num_simulations: Monte Carlo simulations as string if method='monte_carlo'
        benchmark_json: Optional JSON with benchmark data for Information Ratio

    Returns:
        Dictionary with VaR, CVaR, risk metrics, and simulation percentiles.
    """
    from backend.mcp_servers.risk_analyzer_mcp import (
        analyze_risk,
        RiskAnalysisRequest,
        PortfolioInput,
        BenchmarkInput,
    )

    portfolio_list = json.loads(portfolio_json)
    portfolio = [
        PortfolioInput(
            ticker=item["ticker"],
            weight=Decimal(str(item["weight"])),
            prices=[Decimal(str(p)) for p in item["prices"]],
        )
        for item in portfolio_list
    ]

    # The benchmark is optional: None or an empty string means "no benchmark".
    benchmark = None
    if benchmark_json:
        benchmark_data = json.loads(benchmark_json)
        benchmark = BenchmarkInput(
            ticker=benchmark_data["ticker"],
            prices=[Decimal(str(p)) for p in benchmark_data["prices"]],
        )

    request = RiskAnalysisRequest(
        portfolio=portfolio,
        # Go through str() like every other Decimal built in this module:
        # Decimal(0.95) would capture the float's binary expansion
        # (0.9499999999999999555...) if a caller passes a float directly,
        # whereas str() is a no-op for the documented string inputs.
        portfolio_value=Decimal(str(portfolio_value)),
        confidence_level=Decimal(str(confidence_level)),
        time_horizon=int(time_horizon),
        method=method,
        num_simulations=int(num_simulations),
        benchmark=benchmark,
    )
    result = await analyze_risk.fn(request)
    # Dump when model_dump() is available, then normalise Decimals to floats.
    data = result.model_dump() if hasattr(result, "model_dump") else result
    return _convert_decimals_to_floats(data)
@cached_async(
    namespace="risk_analyzer",
    data_type=CacheDataType.PORTFOLIO_METRICS,
    ttl=14400,
)
async def risk_forecast_volatility_garch(
    ticker: str,
    returns_json: str,
    forecast_horizon: str = "30",
    garch_p: str = "1",
    garch_q: str = "1",
) -> Dict[str, Any]:
    """Forecast volatility using GARCH model.

    GARCH (Generalised Autoregressive Conditional Heteroskedasticity) models
    are the industry standard for financial volatility forecasting.

    Args:
        ticker: Stock ticker symbol
        returns_json: JSON array of historical returns (as percentages)
        forecast_horizon: Days to forecast as string (default: '30')
        garch_p: GARCH lag order as string (default: '1')
        garch_q: ARCH lag order as string (default: '1')

    Returns:
        Dictionary with volatility forecasts, annualised volatility, and diagnostics.
    """
    from backend.mcp_servers.risk_analyzer_mcp import (
        forecast_volatility_garch,
        GARCHForecastRequest,
    )

    raw_returns = json.loads(returns_json)
    # Returns go through str() before becoming Decimals.
    decimal_returns = [Decimal(str(value)) for value in raw_returns]

    forecast = await forecast_volatility_garch.fn(
        GARCHForecastRequest(
            ticker=ticker,
            returns=decimal_returns,
            forecast_horizon=int(forecast_horizon),
            garch_p=int(garch_p),
            garch_q=int(garch_q),
        )
    )
    # Dump when model_dump() is available, then normalise Decimals to floats.
    if hasattr(forecast, "model_dump"):
        forecast = forecast.model_dump()
    return _convert_decimals_to_floats(forecast)
# =============================================================================
# MACHINE LEARNING TOOLS - 1 tool
# =============================================================================
async def ml_forecast_ensemble(
    ticker: str,
    prices_json: str,
    dates_json: Optional[str] = None,
    forecast_horizon: str = "30",
    confidence_level: str = "0.95",
    use_returns: str = "true",
    ensemble_method: str = "mean",
) -> Dict[str, Any]:
    """Forecast stock prices using ensemble ML models.

    Combines multiple forecasting models (Chronos-Bolt, TTM, N-HiTS)
    to produce robust predictions with uncertainty quantification.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        prices_json: JSON array of historical prices (minimum 10 values)
        dates_json: Optional JSON array of corresponding dates
        forecast_horizon: Number of days to forecast as string (default: '30')
        confidence_level: Confidence level for intervals as string (default: '0.95')
        use_returns: Forecast returns instead of raw prices ('true' or 'false')
        ensemble_method: Combination method ('mean', 'median', 'weighted')

    Returns:
        Dictionary with forecasts, confidence intervals, and model metadata.
    """
    from backend.mcp_servers.ensemble_predictor_mcp import (
        forecast_ensemble,
        ForecastRequest,
    )

    raw_prices = json.loads(prices_json)
    # Dates are optional: None or an empty string means no dates are sent.
    parsed_dates = None
    if dates_json:
        parsed_dates = json.loads(dates_json)

    # cast() only informs the type checker; the value is not validated here.
    combiner = cast(Literal["mean", "median", "weighted"], ensemble_method)

    request = ForecastRequest(
        ticker=ticker,
        # Prices go through str() before becoming Decimals.
        prices=[Decimal(str(price)) for price in raw_prices],
        dates=parsed_dates,
        forecast_horizon=int(forecast_horizon),
        confidence_level=float(confidence_level),
        # Only the text "true" (any casing) selects return-based forecasting.
        use_returns=use_returns.lower() == "true",
        ensemble_method=combiner,
    )
    forecast = await forecast_ensemble.fn(request)
    # Dump when model_dump() is available, then normalise Decimals to floats.
    if hasattr(forecast, "model_dump"):
        forecast = forecast.model_dump()
    return _convert_decimals_to_floats(forecast)
# =============================================================================
# SENTIMENT ANALYSIS TOOLS - 1 tool
# =============================================================================
@cached_async(
    namespace="news_sentiment",
    data_type=CacheDataType.USER_DATA,
    ttl=7200,
)
async def sentiment_get_news(ticker: str, days_back: str = "7") -> Dict[str, Any]:
    """Fetch recent news for a ticker and analyse sentiment.

    Uses Finnhub API for news retrieval and VADER for sentiment analysis.

    Args:
        ticker: Stock ticker symbol (e.g., 'AAPL')
        days_back: Number of days of historical news as string (default: '7')

    Returns:
        Dictionary with overall sentiment, confidence, article count, and articles.
    """
    from backend.mcp_servers.news_sentiment_mcp import get_news_with_sentiment

    # `days_back` arrives as text and is converted to an int for the backend.
    lookback_days = int(days_back)
    news = await get_news_with_sentiment.fn(ticker=ticker, days_back=lookback_days)
    # Results exposing model_dump() are dumped; anything else is returned as-is.
    if hasattr(news, "model_dump"):
        return news.model_dump()
    return news
# =============================================================================
# CONVENIENCE FUNCTIONS (for internal use by agents)
# =============================================================================
async def get_quote_list(tickers: List[str]) -> List[Dict[str, Any]]:
    """Internal convenience function - accepts Python list instead of JSON string.

    Args:
        tickers: List of stock ticker symbols

    Returns:
        List of quote dictionaries
    """
    # market_get_quote takes a JSON-encoded array, so serialise the list first.
    encoded_tickers = json.dumps(tickers)
    quotes = await market_get_quote(encoded_tickers)
    return quotes
async def get_historical_prices(
    ticker: str, period: str = "1y", interval: str = "1d"
) -> Dict[str, Any]:
    """Internal convenience function - alias for market_get_historical_data.

    Args:
        ticker: Stock ticker symbol
        period: Time period
        interval: Data interval

    Returns:
        Historical price data dictionary
    """
    # Arguments are forwarded positionally, exactly as received.
    history = await market_get_historical_data(ticker, period, interval)
    return history
|