BrianIsaac's picture
feat: implement 7 production enhancements for portfolio analysis platform
9f411df
"""Market Data MCP Server (Provider-Agnostic).
This MCP server provides real-time and historical market data using a configurable
data provider (YFinance, FMP, EODHD, etc.).
Default: YFinance (educational use only)
Production: Set MARKET_DATA_PROVIDER=fmp to use Financial Modeling Prep
Environment Variables:
MARKET_DATA_PROVIDER: Provider type (yfinance, fmp, eodhd)
FMP_API_KEY: API key for FMP (if using FMP provider)
FMP_TIER: FMP subscription tier (free, starter, premium, ultimate)
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any
from decimal import Decimal
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from backend.data_providers import get_provider, ProviderType
from backend.data_providers.base import MarketDataProvider
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# MCP server instance registered under the name "market-data"; the tool
# functions defined below are attached to it via @mcp.tool().
mcp = FastMCP("market-data")
# Cached provider instance (provider selection is configured via environment
# variables). Starts as None and is populated lazily by get_data_provider().
_provider: Optional[MarketDataProvider] = None
def get_data_provider() -> MarketDataProvider:
    """Return the shared market data provider, building it on first call.

    The instance is cached in the module-level ``_provider`` variable, so
    ``get_provider()`` is invoked only while that cache is still ``None``.

    Returns:
        MarketDataProvider: The cached (or newly created) provider instance.
    """
    global _provider

    # Fast path: a provider was already created by an earlier call.
    if _provider is not None:
        return _provider

    _provider = get_provider()
    logger.info(f"Initialized market data provider: {_provider.name}")
    return _provider
class QuoteRequest(BaseModel):
    """Input payload for a multi-ticker quote lookup.

    Attributes:
        tickers: Symbols to fetch quotes for. Required; declared with
            ``min_length=1`` and ``max_length=50``.
    """

    tickers: List[str] = Field(..., min_length=1, max_length=50)
class QuoteResponse(BaseModel):
    """Quote data returned for a single ticker.

    Only ``ticker`` and ``price`` are required; every other market field
    defaults to ``None`` and ``timestamp`` defaults to the current UTC time
    at construction.
    """

    # Identity and last price (required).
    ticker: str
    price: Decimal

    # Session price levels.
    previous_close: Optional[Decimal] = None
    open_price: Optional[Decimal] = None
    high: Optional[Decimal] = None
    low: Optional[Decimal] = None

    # Size and valuation figures.
    volume: Optional[int] = None
    market_cap: Optional[Decimal] = None
    pe_ratio: Optional[Decimal] = None
    dividend_yield: Optional[Decimal] = None

    # Timezone-aware creation time of this response object.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class HistoricalRequest(BaseModel):
    """Input payload for a historical price lookup.

    Attributes:
        ticker: Symbol to fetch history for (required).
        period: Look-back window; defaults to ``"1y"``.
        interval: Bar size; defaults to ``"1d"``.
    """

    ticker: str
    period: str = Field(
        default="1y",
        description="Period: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max",
    )
    interval: str = Field(
        default="1d",
        description="Interval: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo",
    )
class HistoricalResponse(BaseModel):
    """Historical price series returned for a single ticker.

    All series fields except ``returns`` are required, so a response with
    no data must still pass an empty list for each of them.
    """

    ticker: str

    # Date labels for the data points, as strings.
    dates: List[str]

    # OHLC price series.
    open_prices: List[Decimal]
    high_prices: List[Decimal]
    low_prices: List[Decimal]
    close_prices: List[Decimal]

    volumes: List[int]

    # Optional returns series; left as None when not computed.
    returns: Optional[List[Decimal]] = None
class FundamentalsRequest(BaseModel):
    """Input payload for a company fundamentals lookup.

    Attributes:
        ticker: Symbol of the company to look up (required).
    """

    ticker: str
class FundamentalsResponse(BaseModel):
    """Company fundamentals returned for a single ticker.

    Only ``ticker`` is required; every metric defaults to ``None`` so a
    response can be built even when no data could be fetched.
    """

    ticker: str

    # Company identity.
    company_name: Optional[str] = None
    sector: Optional[str] = None
    industry: Optional[str] = None

    # Valuation.
    market_cap: Optional[Decimal] = None
    pe_ratio: Optional[Decimal] = None
    forward_pe: Optional[Decimal] = None
    peg_ratio: Optional[Decimal] = None
    price_to_book: Optional[Decimal] = None
    dividend_yield: Optional[Decimal] = None

    # Profitability and growth.
    profit_margin: Optional[Decimal] = None
    operating_margin: Optional[Decimal] = None
    return_on_equity: Optional[Decimal] = None
    revenue_growth: Optional[Decimal] = None
    earnings_growth: Optional[Decimal] = None

    # Risk and trading range.
    beta: Optional[Decimal] = None
    fifty_two_week_high: Optional[Decimal] = None
    fifty_two_week_low: Optional[Decimal] = None
# NOTE(review): decorators apply bottom-up, so ``retry`` wraps whatever
# ``mcp.tool()`` returns rather than the function that gets registered as the
# tool. Confirm against FastMCP whether tool invocations actually pass through
# the retry wrapper. ``Exception`` in the tuple already subsumes the other two
# entries.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def get_quote(request: QuoteRequest) -> List[QuoteResponse]:
    """Get real-time quotes for multiple tickers.

    Each ticker is fetched independently. A failure for one ticker does not
    abort the batch: that ticker is reported with a price of ``0`` as a
    failure marker. PE ratio and dividend yield are best-effort enrichment;
    if the ratios lookup fails they are left as ``None`` and the quote is
    still returned.

    Args:
        request: Quote request with list of tickers

    Returns:
        List of quote responses, one per requested ticker, in request order

    Example:
        >>> await get_quote(QuoteRequest(tickers=["AAPL", "GOOGL"]))
    """
    provider = get_data_provider()
    logger.info(
        "Fetching quotes for %s tickers using %s",
        len(request.tickers),
        provider.name,
    )

    quotes: List[QuoteResponse] = []
    for ticker in request.tickers:
        try:
            # Use provider abstraction
            quote_data = provider.get_quote(ticker)

            # Fetch additional fundamentals for PE ratio and dividend yield.
            # Catch Exception (not a bare except) so interpreter-level
            # signals such as KeyboardInterrupt or task cancellation are
            # never swallowed here.
            try:
                ratios = provider.get_financial_ratios(ticker)
                pe_ratio = ratios.pe_ratio
                dividend_yield = ratios.dividend_yield
            except Exception as ratio_error:
                logger.debug(
                    "Financial ratios unavailable for %s: %s", ticker, ratio_error
                )
                pe_ratio = None
                dividend_yield = None

            quote = QuoteResponse(
                ticker=ticker,
                price=quote_data.price,
                previous_close=quote_data.previous_close,
                open_price=quote_data.open,
                high=quote_data.high,
                low=quote_data.low,
                volume=quote_data.volume,
                market_cap=quote_data.market_cap,
                pe_ratio=pe_ratio,
                dividend_yield=dividend_yield,
            )
            quotes.append(quote)
            logger.debug("Successfully fetched quote for %s: $%s", ticker, quote.price)
        except Exception as e:
            logger.error("Error fetching quote for %s: %s", ticker, e)
            # Return a quote with zero price to indicate failure
            quotes.append(QuoteResponse(ticker=ticker, price=Decimal("0")))

    return quotes
# Approximate look-back length in days for each supported period string.
# "ytd" is handled separately because its start date is January 1st of the
# current year rather than a fixed offset.
_PERIOD_DAYS: Dict[str, int] = {
    "1d": 1,
    "5d": 5,
    "1mo": 30,
    "3mo": 90,
    "6mo": 180,
    "1y": 365,
    "2y": 730,
    "5y": 1825,
    "10y": 3650,
    "max": 7300,
}


def _empty_historical_response(ticker: str) -> HistoricalResponse:
    """Build a HistoricalResponse for ``ticker`` that carries no data points."""
    # Every required series field must be supplied, otherwise model
    # validation fails.
    return HistoricalResponse(
        ticker=ticker,
        dates=[],
        open_prices=[],
        high_prices=[],
        low_prices=[],
        close_prices=[],
        volumes=[],
    )


# NOTE(review): decorators apply bottom-up, so ``retry`` wraps whatever
# ``mcp.tool()`` returns rather than the function that gets registered as the
# tool. Confirm against FastMCP whether tool invocations actually pass through
# the retry wrapper.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def get_historical_data(request: HistoricalRequest) -> HistoricalResponse:
    """Get historical price data for a ticker.

    The requested period is converted to a start/end date range ending now.
    Unknown period strings fall back to 365 days. If the provider returns no
    rows, or any error occurs while fetching or converting the data, an empty
    response (all series empty, ``returns`` unset) is returned instead of
    raising.

    Args:
        request: Historical data request

    Returns:
        Historical price data. ``returns`` holds simple period-over-period
        returns of the close price with a leading 0, and is ``None`` when
        fewer than two data points are available.

    Example:
        >>> await get_historical_data(HistoricalRequest(ticker="AAPL", period="1y"))
    """
    provider = get_data_provider()
    logger.info(
        "Fetching historical data for %s, period=%s using %s",
        request.ticker,
        request.period,
        provider.name,
    )

    try:
        # Convert period to date range.
        # NOTE(review): this uses the server's local (naive) clock, unlike
        # QuoteResponse.timestamp which is UTC - confirm which is intended.
        end_date = datetime.now()
        if request.period == "ytd":
            start_date = datetime(end_date.year, 1, 1)
        else:
            start_date = end_date - timedelta(days=_PERIOD_DAYS.get(request.period, 365))

        # Fetch historical data using provider
        hist_df = provider.get_historical_prices(
            request.ticker,
            start_date.strftime("%Y-%m-%d"),
            end_date.strftime("%Y-%m-%d"),
            interval=request.interval,
        )

        if hist_df.empty:
            logger.warning("No historical data found for %s", request.ticker)
            return _empty_historical_response(request.ticker)

        # Calculate simple returns between consecutive closes. A zero
        # previous close raises ZeroDivisionError, which is handled below
        # like any other failure.
        returns = None
        if len(hist_df) > 1:
            closes = [float(value) for value in hist_df["close"].values]
            returns = [Decimal("0")]  # First return is 0
            returns.extend(
                Decimal(str((current - previous) / previous))
                for previous, current in zip(closes, closes[1:])
            )

        response = HistoricalResponse(
            ticker=request.ticker,
            dates=[stamp.strftime("%Y-%m-%d") for stamp in hist_df.index],
            open_prices=list(hist_df["open"]),
            high_prices=list(hist_df["high"]),
            low_prices=list(hist_df["low"]),
            close_prices=list(hist_df["close"]),
            volumes=[int(val) for val in hist_df["volume"].values],
            returns=returns,
        )

        logger.info("Fetched %s data points for %s", len(response.dates), request.ticker)
        return response

    except Exception as e:
        logger.error("Error fetching historical data for %s: %s", request.ticker, e)
        # Previously this fallback omitted the required close_prices field,
        # so building it raised a validation error from inside the handler.
        return _empty_historical_response(request.ticker)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)),
)
@mcp.tool()
async def get_fundamentals(request: FundamentalsRequest) -> FundamentalsResponse:
    """Fetch company profile data and key ratios for one ticker.

    Combines the provider's company profile, financial ratios and quote into
    a single response. Metrics the provider abstraction does not expose are
    explicitly set to ``None``. On any error a response containing only the
    ticker is returned instead of raising.

    Args:
        request: Fundamentals request naming the ticker

    Returns:
        Company fundamentals for the requested ticker

    Example:
        >>> await get_fundamentals(FundamentalsRequest(ticker="AAPL"))
    """
    provider = get_data_provider()
    logger.info(f"Fetching fundamentals for {request.ticker} using {provider.name}")

    # Metrics that are not available in all providers (or not yet part of the
    # ratios model); passed explicitly as None.
    unavailable = dict.fromkeys(
        (
            "forward_pe",
            "peg_ratio",
            "profit_margin",
            "operating_margin",
            "revenue_growth",
            "earnings_growth",
            "beta",
            "fifty_two_week_high",
            "fifty_two_week_low",
        )
    )

    try:
        symbol = request.ticker
        company = provider.get_company_profile(symbol)
        key_ratios = provider.get_financial_ratios(symbol)
        latest_quote = provider.get_quote(symbol)

        # Prefer the profile's market cap; fall back to the quote's value.
        market_cap = company.market_cap or latest_quote.market_cap

        response = FundamentalsResponse(
            ticker=request.ticker,
            company_name=company.company_name,
            sector=company.sector,
            industry=company.industry,
            market_cap=market_cap,
            pe_ratio=key_ratios.pe_ratio,
            price_to_book=key_ratios.pb_ratio,
            dividend_yield=key_ratios.dividend_yield,
            return_on_equity=key_ratios.roe,
            **unavailable,
        )
        logger.info(f"Successfully fetched fundamentals for {request.ticker}: {response.company_name}")
        return response
    except Exception as e:
        logger.error(f"Error fetching fundamentals for {request.ticker}: {e}")
        return FundamentalsResponse(ticker=request.ticker)
# Start the MCP server when this module is executed directly as a script.
if __name__ == "__main__":
    mcp.run()