"""Market Data MCP Server (Provider-Agnostic). This MCP server provides real-time and historical market data using a configurable data provider (YFinance, FMP, EODHD, etc.). Default: YFinance (educational use only) Production: Set MARKET_DATA_PROVIDER=fmp to use Financial Modeling Prep Environment Variables: MARKET_DATA_PROVIDER: Provider type (yfinance, fmp, eodhd) FMP_API_KEY: API key for FMP (if using FMP provider) FMP_TIER: FMP subscription tier (free, starter, premium, ultimate) """ import logging from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional, Any from decimal import Decimal from fastmcp import FastMCP from pydantic import BaseModel, Field from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) from backend.data_providers import get_provider, ProviderType from backend.data_providers.base import MarketDataProvider logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("market-data") # Initialize provider (configured via environment variables) _provider: Optional[MarketDataProvider] = None def get_data_provider() -> MarketDataProvider: """Get or create the market data provider instance. Returns: MarketDataProvider: Configured provider instance """ global _provider if _provider is None: _provider = get_provider() logger.info(f"Initialized market data provider: {_provider.name}") return _provider class QuoteRequest(BaseModel): """Request for stock quote.""" tickers: List[str] = Field(..., min_length=1, max_length=50) class QuoteResponse(BaseModel): """Stock quote response.""" ticker: str price: Decimal previous_close: Optional[Decimal] = None open_price: Optional[Decimal] = None high: Optional[Decimal] = None low: Optional[Decimal] = None volume: Optional[int] = None market_cap: Optional[Decimal] = None pe_ratio: Optional[Decimal] = None dividend_yield: Optional[Decimal] = None timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) class HistoricalRequest(BaseModel): """Request for historical data.""" ticker: str period: str = Field(default="1y", description="Period: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max") interval: str = Field(default="1d", description="Interval: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo") class HistoricalResponse(BaseModel): """Historical data response.""" ticker: str dates: List[str] open_prices: List[Decimal] high_prices: List[Decimal] low_prices: List[Decimal] close_prices: List[Decimal] volumes: List[int] returns: Optional[List[Decimal]] = None class FundamentalsRequest(BaseModel): """Request for company fundamentals.""" ticker: str class FundamentalsResponse(BaseModel): """Company fundamentals response.""" ticker: str company_name: Optional[str] = None sector: Optional[str] = None industry: Optional[str] = None market_cap: Optional[Decimal] = None pe_ratio: Optional[Decimal] = None forward_pe: Optional[Decimal] = None peg_ratio: Optional[Decimal] = None price_to_book: Optional[Decimal] = None dividend_yield: Optional[Decimal] = None profit_margin: Optional[Decimal] = None operating_margin: Optional[Decimal] = None return_on_equity: Optional[Decimal] = None revenue_growth: Optional[Decimal] = None earnings_growth: Optional[Decimal] = None beta: Optional[Decimal] = None fifty_two_week_high: Optional[Decimal] = None fifty_two_week_low: Optional[Decimal] = None @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def get_quote(request: QuoteRequest) -> List[QuoteResponse]: """Get real-time quotes for multiple tickers. Args: request: Quote request with list of tickers Returns: List of quote responses Example: >>> await get_quote(QuoteRequest(tickers=["AAPL", "GOOGL"])) """ provider = get_data_provider() logger.info(f"Fetching quotes for {len(request.tickers)} tickers using {provider.name}") quotes = [] for ticker in request.tickers: try: # Use provider abstraction quote_data = provider.get_quote(ticker) # Fetch additional fundamentals for PE ratio and dividend yield try: ratios = provider.get_financial_ratios(ticker) pe_ratio = ratios.pe_ratio dividend_yield = ratios.dividend_yield except: pe_ratio = None dividend_yield = None quote = QuoteResponse( ticker=ticker, price=quote_data.price, previous_close=quote_data.previous_close, open_price=quote_data.open, high=quote_data.high, low=quote_data.low, volume=quote_data.volume, market_cap=quote_data.market_cap, pe_ratio=pe_ratio, dividend_yield=dividend_yield, ) quotes.append(quote) logger.debug(f"Successfully fetched quote for {ticker}: ${quote.price}") except Exception as e: logger.error(f"Error fetching quote for {ticker}: {e}") # Return a quote with zero price to indicate failure quotes.append(QuoteResponse(ticker=ticker, price=Decimal("0"))) return quotes @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def get_historical_data(request: HistoricalRequest) -> HistoricalResponse: """Get historical price data for a ticker. Args: request: Historical data request Returns: Historical price data Example: >>> await get_historical_data(HistoricalRequest(ticker="AAPL", period="1y")) """ provider = get_data_provider() logger.info(f"Fetching historical data for {request.ticker}, period={request.period} using {provider.name}") try: # Convert period to date range from datetime import datetime, timedelta # Map period to days period_map = { "1d": 1, "5d": 5, "1mo": 30, "3mo": 90, "6mo": 180, "1y": 365, "2y": 730, "5y": 1825, "10y": 3650, "ytd": -1, "max": 7300 } days = period_map.get(request.period, 365) end_date = datetime.now() if request.period == "ytd": start_date = datetime(end_date.year, 1, 1) else: start_date = end_date - timedelta(days=days) # Fetch historical data using provider hist_df = provider.get_historical_prices( request.ticker, start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"), interval=request.interval ) if hist_df.empty: logger.warning(f"No historical data found for {request.ticker}") return HistoricalResponse( ticker=request.ticker, dates=[], open_prices=[], high_prices=[], low_prices=[], close_prices=[], volumes=[], ) # Calculate returns returns = None if len(hist_df) > 1: close_prices = hist_df["close"].values returns = [ Decimal(str((float(close_prices[i]) - float(close_prices[i - 1])) / float(close_prices[i - 1]))) for i in range(1, len(close_prices)) ] returns.insert(0, Decimal("0")) # First return is 0 response = HistoricalResponse( ticker=request.ticker, dates=[date.strftime("%Y-%m-%d") for date in hist_df.index], open_prices=list(hist_df["open"]), high_prices=list(hist_df["high"]), low_prices=list(hist_df["low"]), close_prices=list(hist_df["close"]), volumes=[int(val) for val in hist_df["volume"].values], returns=returns, ) logger.info(f"Fetched {len(response.dates)} data points for {request.ticker}") return response except Exception as e: logger.error(f"Error fetching historical data for {request.ticker}: {e}") return HistoricalResponse( ticker=request.ticker, dates=[], open_prices=[], high_prices=[], low_prices=[], volumes=[], ) @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def get_fundamentals(request: FundamentalsRequest) -> FundamentalsResponse: """Get company fundamentals and key metrics. Args: request: Fundamentals request Returns: Company fundamentals Example: >>> await get_fundamentals(FundamentalsRequest(ticker="AAPL")) """ provider = get_data_provider() logger.info(f"Fetching fundamentals for {request.ticker} using {provider.name}") try: # Fetch company profile and financial ratios using provider profile = provider.get_company_profile(request.ticker) ratios = provider.get_financial_ratios(request.ticker) quote = provider.get_quote(request.ticker) response = FundamentalsResponse( ticker=request.ticker, company_name=profile.company_name, sector=profile.sector, industry=profile.industry, market_cap=profile.market_cap or quote.market_cap, pe_ratio=ratios.pe_ratio, forward_pe=None, # Not available in all providers peg_ratio=None, # Not available in all providers price_to_book=ratios.pb_ratio, dividend_yield=ratios.dividend_yield, profit_margin=None, # Could be added to FinancialRatios model if needed operating_margin=None, return_on_equity=ratios.roe, revenue_growth=None, earnings_growth=None, beta=None, fifty_two_week_high=None, fifty_two_week_low=None, ) logger.info(f"Successfully fetched fundamentals for {request.ticker}: {response.company_name}") return response except Exception as e: logger.error(f"Error fetching fundamentals for {request.ticker}: {e}") return FundamentalsResponse(ticker=request.ticker) # Export the MCP server if __name__ == "__main__": mcp.run()