Spaces:
Running
on
Zero
Running
on
Zero
| """Market Data MCP Server (Provider-Agnostic). | |
| This MCP server provides real-time and historical market data using a configurable | |
| data provider (YFinance, FMP, EODHD, etc.). | |
| Default: YFinance (educational use only) | |
| Production: Set MARKET_DATA_PROVIDER=fmp to use Financial Modeling Prep | |
| Environment Variables: | |
| MARKET_DATA_PROVIDER: Provider type (yfinance, fmp, eodhd) | |
| FMP_API_KEY: API key for FMP (if using FMP provider) | |
| FMP_TIER: FMP subscription tier (free, starter, premium, ultimate) | |
| """ | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Dict, List, Optional, Any | |
| from decimal import Decimal | |
| from fastmcp import FastMCP | |
| from pydantic import BaseModel, Field | |
| from tenacity import ( | |
| retry, | |
| stop_after_attempt, | |
| wait_exponential, | |
| retry_if_exception_type, | |
| ) | |
| from backend.data_providers import get_provider, ProviderType | |
| from backend.data_providers.base import MarketDataProvider | |
# Logger named after this module; every function below logs through it.
logger = logging.getLogger(__name__)

# FastMCP server instance registered under the name "market-data".
mcp = FastMCP("market-data")

# Cached provider instance. It stays None until get_data_provider() creates
# the provider on first use.
_provider: Optional[MarketDataProvider] = None
def get_data_provider() -> MarketDataProvider:
    """Return the shared market data provider, creating it on first use.

    The instance is stored in the module-level ``_provider`` variable, so
    later calls reuse it instead of calling ``get_provider()`` again.

    Returns:
        MarketDataProvider: Configured provider instance
    """
    global _provider

    # Fast path: a provider was already created by an earlier call.
    if _provider is not None:
        return _provider

    _provider = get_provider()
    logger.info(f"Initialized market data provider: {_provider.name}")
    return _provider
class QuoteRequest(BaseModel):
    """Request for stock quote."""

    # Symbols to quote. The Field constraints require between 1 and 50
    # entries; get_quote() returns one QuoteResponse per entry.
    tickers: List[str] = Field(..., min_length=1, max_length=50)
class QuoteResponse(BaseModel):
    """Stock quote response."""

    # Symbol this quote belongs to.
    ticker: str

    # The only required numeric field. get_quote() stores Decimal("0") here
    # when the lookup for the ticker failed.
    price: Decimal

    # Session price and volume data copied from the provider quote; each one
    # defaults to None when it is not supplied.
    previous_close: Optional[Decimal] = None
    open_price: Optional[Decimal] = None
    high: Optional[Decimal] = None
    low: Optional[Decimal] = None
    volume: Optional[int] = None
    market_cap: Optional[Decimal] = None

    # Filled by get_quote() from the provider's financial ratios; left as
    # None when that lookup fails.
    pe_ratio: Optional[Decimal] = None
    dividend_yield: Optional[Decimal] = None

    # Defaults to the current UTC time at the moment the model is created.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class HistoricalRequest(BaseModel):
    """Request for historical data."""

    # Symbol whose price history is requested.
    ticker: str

    # Lookback window. The value is not validated here; get_historical_data()
    # falls back to 365 days for a period string it does not recognise.
    period: str = Field(default="1y", description="Period: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max")

    # Bar size, passed to the provider unchanged as its ``interval`` argument.
    interval: str = Field(default="1d", description="Interval: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo")
class HistoricalResponse(BaseModel):
    """Historical data response."""

    # Symbol the series belongs to.
    ticker: str

    # Row dates formatted as "YYYY-MM-DD" by get_historical_data().
    dates: List[str]

    # Price and volume columns. All of them are required (no default), so an
    # "empty" response must pass an empty list for every one of them.
    open_prices: List[Decimal]
    high_prices: List[Decimal]
    low_prices: List[Decimal]
    close_prices: List[Decimal]
    volumes: List[int]

    # Simple close-to-close returns. get_historical_data() puts 0 in the first
    # slot and leaves the field as None when there are fewer than two rows.
    returns: Optional[List[Decimal]] = None
class FundamentalsRequest(BaseModel):
    """Request for company fundamentals."""

    # Symbol of the company to look up.
    ticker: str
class FundamentalsResponse(BaseModel):
    """Company fundamentals response."""

    # Only the ticker is required. Every other field defaults to None, which
    # is also what get_fundamentals() returns when the lookup fails.
    ticker: str

    # Company profile.
    company_name: Optional[str] = None
    sector: Optional[str] = None
    industry: Optional[str] = None
    market_cap: Optional[Decimal] = None

    # Valuation metrics.
    pe_ratio: Optional[Decimal] = None
    forward_pe: Optional[Decimal] = None
    peg_ratio: Optional[Decimal] = None
    price_to_book: Optional[Decimal] = None
    dividend_yield: Optional[Decimal] = None

    # Profitability and growth metrics.
    profit_margin: Optional[Decimal] = None
    operating_margin: Optional[Decimal] = None
    return_on_equity: Optional[Decimal] = None
    revenue_growth: Optional[Decimal] = None
    earnings_growth: Optional[Decimal] = None

    # Risk and price-range metrics.
    beta: Optional[Decimal] = None
    fifty_two_week_high: Optional[Decimal] = None
    fifty_two_week_low: Optional[Decimal] = None
def _fetch_quote_ratios(provider: MarketDataProvider, ticker: str):
    """Best-effort lookup of ``(pe_ratio, dividend_yield)`` for a ticker.

    Returns ``(None, None)`` when the provider cannot supply financial ratios,
    so a ratios failure never prevents the quote itself from being returned.
    """
    try:
        ratios = provider.get_financial_ratios(ticker)
        return ratios.pe_ratio, ratios.dividend_yield
    except Exception as e:
        # Catch Exception rather than using a bare ``except``: a bare clause
        # would also swallow asyncio.CancelledError, KeyboardInterrupt and
        # SystemExit, which must propagate to the caller.
        logger.debug(f"Financial ratios unavailable for {ticker}: {e}")
        return None, None


async def get_quote(request: QuoteRequest) -> List[QuoteResponse]:
    """Get real-time quotes for multiple tickers.

    Tickers are fetched one after another. A failure for one ticker does not
    abort the others: that ticker is reported with a price of ``Decimal("0")``
    and all optional fields left at their defaults.

    Args:
        request: Quote request with list of tickers

    Returns:
        List of quote responses, one per requested ticker, in request order

    Example:
        >>> await get_quote(QuoteRequest(tickers=["AAPL", "GOOGL"]))
    """
    provider = get_data_provider()
    logger.info(f"Fetching quotes for {len(request.tickers)} tickers using {provider.name}")

    quotes: List[QuoteResponse] = []
    for ticker in request.tickers:
        try:
            # NOTE(review): the provider calls are not awaited, so they appear
            # to be synchronous and run on the event loop thread - confirm.
            quote_data = provider.get_quote(ticker)

            # PE ratio and dividend yield come from a separate, optional lookup.
            pe_ratio, dividend_yield = _fetch_quote_ratios(provider, ticker)

            quote = QuoteResponse(
                ticker=ticker,
                price=quote_data.price,
                previous_close=quote_data.previous_close,
                open_price=quote_data.open,
                high=quote_data.high,
                low=quote_data.low,
                volume=quote_data.volume,
                market_cap=quote_data.market_cap,
                pe_ratio=pe_ratio,
                dividend_yield=dividend_yield,
            )
            quotes.append(quote)
            logger.debug(f"Successfully fetched quote for {ticker}: ${quote.price}")
        except Exception as e:
            logger.error(f"Error fetching quote for {ticker}: {e}")
            # Return a quote with zero price to indicate failure
            quotes.append(QuoteResponse(ticker=ticker, price=Decimal("0")))

    return quotes
# Calendar-day lookback for each supported period string. "ytd" is absent
# because its length depends on the current date; see _period_start().
_PERIOD_DAYS: Dict[str, int] = {
    "1d": 1, "5d": 5, "1mo": 30, "3mo": 90, "6mo": 180,
    "1y": 365, "2y": 730, "5y": 1825, "10y": 3650, "max": 7300,
}

# Lookback used when the requested period string is not recognised.
_DEFAULT_PERIOD_DAYS = 365


def _period_start(period: str, end_date: datetime) -> datetime:
    """Return the start of the lookback window that ends at ``end_date``."""
    if period == "ytd":
        return datetime(end_date.year, 1, 1)
    return end_date - timedelta(days=_PERIOD_DAYS.get(period, _DEFAULT_PERIOD_DAYS))


def _empty_historical_response(ticker: str) -> HistoricalResponse:
    """Build a HistoricalResponse with every required series present but empty."""
    return HistoricalResponse(
        ticker=ticker,
        dates=[],
        open_prices=[],
        high_prices=[],
        low_prices=[],
        close_prices=[],
        volumes=[],
    )


async def get_historical_data(request: HistoricalRequest) -> HistoricalResponse:
    """Get historical price data for a ticker.

    The requested period is converted to a start/end date range ending now,
    and the provider is queried for that range. Any failure is logged and
    reported as an empty response rather than raised.

    Args:
        request: Historical data request

    Returns:
        Historical price data; every series is empty when the provider
        returned no rows or an error occurred

    Example:
        >>> await get_historical_data(HistoricalRequest(ticker="AAPL", period="1y"))
    """
    provider = get_data_provider()
    logger.info(f"Fetching historical data for {request.ticker}, period={request.period} using {provider.name}")

    try:
        # Convert period to date range
        end_date = datetime.now()
        start_date = _period_start(request.period, end_date)

        # Fetch historical data using provider
        # NOTE(review): hist_df is presumably a pandas DataFrame with a
        # datetime index (it is used via .empty, .index and strftime) - confirm.
        hist_df = provider.get_historical_prices(
            request.ticker,
            start_date.strftime("%Y-%m-%d"),
            end_date.strftime("%Y-%m-%d"),
            interval=request.interval,
        )

        if hist_df.empty:
            logger.warning(f"No historical data found for {request.ticker}")
            return _empty_historical_response(request.ticker)

        # Calculate simple close-to-close returns; the first row has no
        # predecessor, so its return is reported as 0.
        returns = None
        if len(hist_df) > 1:
            closes = [float(value) for value in hist_df["close"].values]
            returns = [Decimal("0")]
            returns.extend(
                Decimal(str((current - previous) / previous))
                for previous, current in zip(closes, closes[1:])
            )

        response = HistoricalResponse(
            ticker=request.ticker,
            dates=[date.strftime("%Y-%m-%d") for date in hist_df.index],
            open_prices=list(hist_df["open"]),
            high_prices=list(hist_df["high"]),
            low_prices=list(hist_df["low"]),
            close_prices=list(hist_df["close"]),
            volumes=[int(val) for val in hist_df["volume"].values],
            returns=returns,
        )

        logger.info(f"Fetched {len(response.dates)} data points for {request.ticker}")
        return response

    except Exception as e:
        logger.error(f"Error fetching historical data for {request.ticker}: {e}")
        # The fallback must supply every required series, including
        # close_prices; otherwise building it raises a validation error and
        # the original failure escapes this handler.
        return _empty_historical_response(request.ticker)
async def get_fundamentals(request: FundamentalsRequest) -> FundamentalsResponse:
    """Get company fundamentals and key metrics.

    Combines the provider's company profile, financial ratios and quote into
    one response. If any of those lookups fails, the error is logged and a
    response carrying only the ticker is returned.

    Args:
        request: Fundamentals request

    Returns:
        Company fundamentals

    Example:
        >>> await get_fundamentals(FundamentalsRequest(ticker="AAPL"))
    """
    source = get_data_provider()
    symbol = request.ticker
    logger.info(f"Fetching fundamentals for {symbol} using {source.name}")

    try:
        # Three provider lookups feed the response.
        company = source.get_company_profile(symbol)
        metrics = source.get_financial_ratios(symbol)
        latest_quote = source.get_quote(symbol)

        # Fields the provider abstraction does not supply are passed as an
        # explicit None, in the same order as the model declares them.
        fields = {
            "ticker": symbol,
            "company_name": company.company_name,
            "sector": company.sector,
            "industry": company.industry,
            # Prefer the profile's market cap; fall back to the quote's.
            "market_cap": company.market_cap or latest_quote.market_cap,
            "pe_ratio": metrics.pe_ratio,
            "forward_pe": None,  # Not available in all providers
            "peg_ratio": None,  # Not available in all providers
            "price_to_book": metrics.pb_ratio,
            "dividend_yield": metrics.dividend_yield,
            "profit_margin": None,  # Could be added to FinancialRatios model if needed
            "operating_margin": None,
            "return_on_equity": metrics.roe,
            "revenue_growth": None,
            "earnings_growth": None,
            "beta": None,
            "fifty_two_week_high": None,
            "fifty_two_week_low": None,
        }
        result = FundamentalsResponse(**fields)

        logger.info(f"Successfully fetched fundamentals for {symbol}: {result.company_name}")
        return result

    except Exception as exc:
        logger.error(f"Error fetching fundamentals for {symbol}: {exc}")
        return FundamentalsResponse(ticker=symbol)
# Start the MCP server when this file is executed as a script.
if __name__ == "__main__":
    mcp.run()