"""Trading MCP Server. This MCP server provides technical analysis indicators and signals. Uses manual implementations for technical indicator calculations. """ import logging from typing import Dict, List, Optional from decimal import Decimal import pandas as pd import yfinance as yf from fastmcp import FastMCP from pydantic import BaseModel, Field from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) logger = logging.getLogger(__name__) # Initialize MCP server mcp = FastMCP("trading") class TechnicalIndicatorsRequest(BaseModel): """Request for technical indicators.""" ticker: str period: str = Field(default="3mo", description="Data period: 1mo, 3mo, 6mo, 1y") class RSI(BaseModel): """Relative Strength Index.""" value: Decimal signal: str = Field(..., description="overbought, oversold, or neutral") class MACD(BaseModel): """Moving Average Convergence Divergence.""" macd: Decimal signal: Decimal histogram: Decimal trend: str = Field(..., description="bullish, bearish, or neutral") class BollingerBands(BaseModel): """Bollinger Bands.""" upper: Decimal middle: Decimal lower: Decimal current_price: Decimal position: str = Field(..., description="above, within, or below bands") class MovingAverages(BaseModel): """Moving averages.""" sma_20: Optional[Decimal] = None sma_50: Optional[Decimal] = None sma_200: Optional[Decimal] = None ema_12: Optional[Decimal] = None ema_26: Optional[Decimal] = None current_price: Decimal trend: str = Field(..., description="bullish, bearish, or neutral") class TechnicalIndicators(BaseModel): """Complete set of technical indicators.""" ticker: str rsi: Optional[RSI] = None macd: Optional[MACD] = None bollinger_bands: Optional[BollingerBands] = None moving_averages: Optional[MovingAverages] = None volume_trend: Optional[str] = None overall_signal: str = Field(..., description="buy, sell, or hold") def calculate_rsi(prices: pd.Series, period: int = 14) -> float: """Calculate RSI.""" delta = prices.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi.iloc[-1] def calculate_macd(prices: pd.Series) -> Dict[str, float]: """Calculate MACD.""" ema_12 = prices.ewm(span=12, adjust=False).mean() ema_26 = prices.ewm(span=26, adjust=False).mean() macd_line = ema_12 - ema_26 signal_line = macd_line.ewm(span=9, adjust=False).mean() histogram = macd_line - signal_line return { "macd": macd_line.iloc[-1], "signal": signal_line.iloc[-1], "histogram": histogram.iloc[-1], } def calculate_bollinger_bands(prices: pd.Series, period: int = 20) -> Dict[str, float]: """Calculate Bollinger Bands.""" sma = prices.rolling(window=period).mean() std = prices.rolling(window=period).std() upper = sma + (2 * std) lower = sma - (2 * std) return { "upper": upper.iloc[-1], "middle": sma.iloc[-1], "lower": lower.iloc[-1], } @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((TimeoutError, ConnectionError, Exception)), ) @mcp.tool() async def get_technical_indicators(request: TechnicalIndicatorsRequest) -> TechnicalIndicators: """Get technical indicators for a ticker. Args: request: Technical indicators request Returns: Technical indicators Example: >>> await get_technical_indicators(TechnicalIndicatorsRequest(ticker="AAPL")) """ logger.info(f"Calculating technical indicators for {request.ticker}") try: # Fetch historical data stock = yf.Ticker(request.ticker) hist = stock.history(period=request.period) if hist.empty: logger.warning(f"No data found for {request.ticker}") return TechnicalIndicators(ticker=request.ticker, overall_signal="hold") close_prices = hist["Close"] volumes = hist["Volume"] current_price = Decimal(str(close_prices.iloc[-1])) # RSI rsi_value = calculate_rsi(close_prices) rsi_signal = "overbought" if rsi_value > 70 else "oversold" if rsi_value < 30 else "neutral" rsi = RSI(value=Decimal(str(rsi_value)), signal=rsi_signal) # MACD macd_data = calculate_macd(close_prices) macd_trend = "bullish" if macd_data["histogram"] > 0 else "bearish" if macd_data["histogram"] < 0 else "neutral" macd = MACD( macd=Decimal(str(macd_data["macd"])), signal=Decimal(str(macd_data["signal"])), histogram=Decimal(str(macd_data["histogram"])), trend=macd_trend, ) # Bollinger Bands bb_data = calculate_bollinger_bands(close_prices) bb_position = ( "above" if current_price > Decimal(str(bb_data["upper"])) else "below" if current_price < Decimal(str(bb_data["lower"])) else "within" ) bollinger_bands = BollingerBands( upper=Decimal(str(bb_data["upper"])), middle=Decimal(str(bb_data["middle"])), lower=Decimal(str(bb_data["lower"])), current_price=current_price, position=bb_position, ) # Moving Averages sma_20 = close_prices.rolling(window=20).mean().iloc[-1] if len(close_prices) >= 20 else None sma_50 = close_prices.rolling(window=50).mean().iloc[-1] if len(close_prices) >= 50 else None sma_200 = close_prices.rolling(window=200).mean().iloc[-1] if len(close_prices) >= 200 else None ema_12 = close_prices.ewm(span=12).mean().iloc[-1] ema_26 = close_prices.ewm(span=26).mean().iloc[-1] ma_trend = "bullish" if current_price > Decimal(str(sma_20 or 0)) else "bearish" if sma_20 else "neutral" moving_averages = MovingAverages( sma_20=Decimal(str(sma_20)) if sma_20 else None, sma_50=Decimal(str(sma_50)) if sma_50 else None, sma_200=Decimal(str(sma_200)) if sma_200 else None, ema_12=Decimal(str(ema_12)), ema_26=Decimal(str(ema_26)), current_price=current_price, trend=ma_trend, ) # Volume trend avg_volume = volumes.mean() current_volume = volumes.iloc[-1] volume_trend = "high" if current_volume > avg_volume * 1.5 else "low" if current_volume < avg_volume * 0.5 else "normal" # Overall signal signals = [] if rsi_signal == "oversold": signals.append("buy") elif rsi_signal == "overbought": signals.append("sell") if macd_trend == "bullish": signals.append("buy") elif macd_trend == "bearish": signals.append("sell") if ma_trend == "bullish": signals.append("buy") elif ma_trend == "bearish": signals.append("sell") buy_count = signals.count("buy") sell_count = signals.count("sell") overall_signal = "buy" if buy_count > sell_count else "sell" if sell_count > buy_count else "hold" indicators = TechnicalIndicators( ticker=request.ticker, rsi=rsi, macd=macd, bollinger_bands=bollinger_bands, moving_averages=moving_averages, volume_trend=volume_trend, overall_signal=overall_signal, ) logger.info(f"Successfully calculated indicators for {request.ticker}: {overall_signal}") return indicators except Exception as e: logger.error(f"Error calculating technical indicators for {request.ticker}: {e}") return TechnicalIndicators(ticker=request.ticker, overall_signal="hold") # Export the MCP server if __name__ == "__main__": mcp.run()