Spaces:
Running
Running
| import os | |
| import math | |
| from typing import Literal, Optional, Dict, Any, List, Annotated | |
| from datetime import datetime, timezone | |
| import requests | |
| from sgp4.api import Satrec, jday | |
| from math import sqrt | |
| from fastmcp import FastMCP | |
| # from mcp.server.fastmcp import FastMCP | |
| from langchain_chroma import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_core.vectorstores import VectorStore | |
| # ============================================================================== | |
| # π§ MCP SERVER INITIALIZATION | |
| # ============================================================================== | |
| mcp = FastMCP("FalconPrep", stateless_http=True) | |
| # ============================================================================== | |
| # π§ CORE CONSTANTS (The Engineering Brain) | |
| # ============================================================================== | |
| ENVELOPES: Dict[str, Dict[str, Any]] = { | |
| "1U_CubeSat": {"L": 10, "W": 10, "H": 11.35, "max_mass": 2.0}, | |
| "3U_CubeSat": {"L": 10, "W": 10, "H": 34.05, "max_mass": 5.0}, | |
| "15_Inch_ESPA": {"L": 61.0, "W": 71.0, "H": 71.0, "max_mass": 220.0}, | |
| } | |
| PRICING_MODEL: Dict[str, int] = { | |
| "base_rate_per_kg": 6500, | |
| "min_plate_fee": 300000 | |
| } | |
| # ============================================================================== | |
| # ποΈ RESOURCE: Knowledge Base (VectorStore Connection) | |
| # ============================================================================== | |
| DB_PATH = "./falcon_db" | |
| def get_knowledge_base_resource() -> Any: | |
| print(f"Attempting to load ChromaDB client from {DB_PATH}...") | |
| try: | |
| embedding_model = HuggingFaceEmbeddings( | |
| model_name="all-MiniLM-L6-v2", | |
| model_kwargs={'device': 'cpu'} | |
| ) | |
| vectorstore = Chroma( | |
| persist_directory=DB_PATH, | |
| embedding_function=embedding_model | |
| ) | |
| print("β KnowledgeBaseResource loaded successfully.") | |
| return vectorstore | |
| except Exception as e: | |
| print(f"β ERROR: Did you run 'python ingest.py'? Error: {e}") | |
| return None | |
| # ============================================================================== | |
| # π£οΈ PROMPT TEMPLATE | |
| # ============================================================================== | |
| def launch_readiness_summary_prompt( | |
| payload_name: str, | |
| fit_check_result: str, | |
| hazard_classification_result: Dict[str, Any], | |
| cost_estimate: str, | |
| required_documents_list: str, | |
| timeline_summary: str, | |
| ) -> str: | |
| hazard_level = hazard_classification_result.get('level', 'N/A') | |
| return f""" | |
| You are the **FalconPrep Launch Readiness Assistant**, an expert in SpaceX rideshare compliance. | |
| Your task is to synthesize the following tool outputs for payload **'{payload_name}'**. | |
| **Guidelines** | |
| 1. Lead with Fit + Hazard. | |
| 2. Use bullet points. | |
| 3. Use professional but friendly compliance language. | |
| --- OUTPUTS --- | |
| Fit Check: | |
| {fit_check_result} | |
| Hazard: {hazard_level} | |
| Flags: {hazard_classification_result.get('risk_flags', [])} | |
| Cost Estimate: | |
| {cost_estimate} | |
| Required Documents: | |
| {required_documents_list} | |
| Timeline: | |
| {timeline_summary} | |
| --- | |
| **Produce a final summary now.** | |
| """ | |
| # ============================================================================== | |
| # π οΈ PAYLOAD TOOLS | |
| # ============================================================================== | |
| def get_launch_requirements( | |
| knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None, | |
| payload_type: str = "", | |
| orbit: str = "", | |
| ) -> str: | |
| query = f"requirements for {payload_type} in {orbit} orbit mechanical electrical communication" | |
| try: | |
| results = knowledge_base.similarity_search(query, k=3) | |
| context = "\n\n".join([doc.page_content for doc in results]) | |
| except Exception as e: | |
| context = f"ERROR during knowledge query: {e}" | |
| return f"π RAG REQUIREMENTS (Based on manuals):\n{context}" | |
| def check_plate_fit( | |
| length_cm: float, | |
| width_cm: float, | |
| height_cm: float, | |
| mass_kg: float, | |
| ) -> str: | |
| fits, fails = [], [] | |
| user_dims = sorted([length_cm, width_cm, height_cm]) | |
| for name, specs in ENVELOPES.items(): | |
| env_dims = sorted([specs["L"], specs["W"], specs["H"]]) | |
| mass_ok = mass_kg <= specs["max_mass"] | |
| geo_ok = all(u <= e for u, e in zip(user_dims, env_dims)) | |
| if mass_ok and geo_ok: | |
| fits.append(name) | |
| else: | |
| reasons = [] | |
| if not mass_ok: reasons.append(f"Overweight (Limit: {specs['max_mass']}kg)") | |
| if not geo_ok: reasons.append("Geometry Violation") | |
| fails.append(f"{name}: {' + '.join(reasons)}") | |
| if fits: | |
| return f"β FIT SUCCESS: Fits {', '.join(fits)} (Recommend {fits[0]})" | |
| return f"β FIT FAILURE: No fit. Issues: {chr(10).join(fails)}" | |
| def classify_hazard( | |
| propellant_type: str, | |
| battery_wh: float, | |
| pressure_psi: float, | |
| ) -> Dict[str, Any]: | |
| classification, flags = "Standard", [] | |
| if propellant_type.lower() not in ["none", "n/a", "green", "water", "xenon"]: | |
| classification, flags = "Hazardous", [f"High-Risk Propellant: {propellant_type}"] | |
| if battery_wh > 1000: | |
| classification, flags = "Hazardous", flags + [f"Battery > 1kWh ({battery_wh}Wh)"] | |
| if pressure_psi > 150: | |
| classification, flags = "Hazardous", flags + [f"Pressure > 150 PSI"] | |
| if not flags: | |
| flags.append("Payload appears standard/benign.") | |
| return { | |
| "level": classification, | |
| "risk_flags": flags, | |
| "implication": ( | |
| "Full multi-phase Safety Review Required" | |
| if classification == "Hazardous" | |
| else "Standard single-phase review" | |
| ), | |
| } | |
| def calculate_launch_cost(mass_kg: float) -> str: | |
| mass_cost = mass_kg * PRICING_MODEL["base_rate_per_kg"] | |
| final_cost = max(mass_cost, PRICING_MODEL["min_plate_fee"]) | |
| return f"π° ESTIMATED COST\nMass Charge: ${mass_cost:,.0f}\nMinimum Plate Fee: ${PRICING_MODEL['min_plate_fee']:,}\n**TOTAL ESTIMATE: ${final_cost:,.0f} USD**" | |
| def required_documents( | |
| knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None, | |
| payload_type: str = "", | |
| hazard_level: Literal["Standard", "Hazardous"] = "Standard", | |
| ) -> str: | |
| query = f"required documents for {payload_type} deliverables ICD" | |
| try: | |
| results = knowledge_base.similarity_search(query, k=3) | |
| base_docs = "\n\n".join([doc.page_content for doc in results]) | |
| except Exception as e: | |
| base_docs = f"ERROR during knowledge query: {e}" | |
| extra = "" | |
| if hazard_level == "Hazardous": | |
| extra = "\nβ οΈ EXTRA HAZARDOUS DOCUMENTS:\n* MSDS for all fluids\n* Burst Test Certificate\n* Propellant Handling Plan\n* Full Safety Review Package (SRDP)" | |
| return f"π REQUIRED DOCUMENTS ({payload_type}, {hazard_level})\n{extra}\n---\nSTANDARD (from manual):\n{base_docs}" | |
| def timeline_check( | |
| knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None, | |
| hazard_level: Literal["Standard", "Hazardous"] = "Standard", | |
| ) -> str: | |
| query = "launch campaign schedule L-minus integration deadlines" | |
| try: | |
| results = knowledge_base.similarity_search(query, k=3) | |
| base_timeline = "\n\n".join([doc.page_content for doc in results]) | |
| except Exception as e: | |
| base_timeline = f"ERROR during knowledge query: {e}" | |
| safety = "β Standard Review ~L-4 Months" if hazard_level == "Standard" else "\nπ HAZARDOUS EARLY REVIEWS:\n* L-12m: Phase 0\n* L-9m: Phase 1\n* L-6m: Phase 2\n* L-3m: Phase 3" | |
| return f"π TIMELINE ({hazard_level})\n{safety}\n---\nSTANDARD MILESTONES:\n{base_timeline}" | |
| # ============================================================================== | |
| # π°οΈ ORBITAL TOOLS | |
| # ============================================================================== | |
| def fetch_gp_data_tool( | |
| query_type: str = "CATNR", | |
| query_value: str = "", | |
| format: Literal["TLE", "JSON", "JSON-PRETTY", "CSV", "XML", "KVN"] = "JSON" | |
| ) -> Dict[str, Any]: | |
| base_url = "https://celestrak.org/NORAD/elements/gp.php" | |
| params = {"FORMAT": format, query_type.upper(): query_value} | |
| try: | |
| resp = requests.get(base_url, params=params, timeout=10) | |
| resp.raise_for_status() | |
| if "JSON" in format: | |
| return resp.json() | |
| else: | |
| return {"raw": resp.text} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def propagate_orbit_tool( | |
| tle_lines: List[str], | |
| target_time: Optional[datetime] = None | |
| ) -> Dict[str, Any]: | |
| try: | |
| sat = Satrec.twoline2rv(tle_lines[0], tle_lines[1]) | |
| target_time = target_time or datetime.now(timezone.utc) | |
| jd, fr = jday(target_time.year, target_time.month, target_time.day, | |
| target_time.hour, target_time.minute, target_time.second + target_time.microsecond*1e-6) | |
| e, r, v = sat.sgp4(jd, fr) | |
| if e != 0: | |
| raise RuntimeError(f"SGP4 error code {e}") | |
| return {"position_km": tuple(r), "velocity_kms": tuple(v), "timestamp": target_time.isoformat()} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| def collision_check_tool( | |
| sat1_tle: List[str], | |
| sat2_tle: List[str], | |
| threshold_km: float = 5.0, | |
| target_time: Optional[datetime] = None | |
| ) -> Dict[str, Any]: | |
| try: | |
| pos1 = propagate_orbit_tool(sat1_tle, target_time)["position_km"] | |
| pos2 = propagate_orbit_tool(sat2_tle, target_time)["position_km"] | |
| distance = sqrt(sum((a - b) ** 2 for a, b in zip(pos1, pos2))) | |
| warning = distance <= threshold_km | |
| return {"distance_km": distance, "collision_warning": warning, "threshold_km": threshold_km, "timestamp": (target_time or datetime.now(timezone.utc)).isoformat()} | |
| except Exception as e: | |
| return {"error": str(e)} | |
| # ============================================================================== | |
| # π RUN MCP SERVER | |
| # ============================================================================== | |
| if __name__ == "__main__": | |
| mcp.run() | |