tsinviking commited on
Commit
ffd6fdf
Β·
verified Β·
1 Parent(s): bfe3cee

corrected prev commit

Browse files
Files changed (2) hide show
  1. app.py +226 -248
  2. server.py +73 -156
app.py CHANGED
@@ -1,267 +1,245 @@
1
  import os
2
- import math
3
- from typing import Literal, Optional, Dict, Any, List, Annotated
4
- from datetime import datetime, timezone
5
- import requests
6
- from sgp4.api import Satrec, jday
7
- from math import sqrt
8
-
9
  from fastmcp import FastMCP
10
- # from mcp.server.fastmcp import FastMCP
11
- from langchain_chroma import Chroma
12
- from langchain_community.embeddings import HuggingFaceEmbeddings
13
- from langchain_core.vectorstores import VectorStore
14
-
15
- # ==============================================================================
16
- # 🧠 MCP SERVER INITIALIZATION
17
- # ==============================================================================
18
-
19
- mcp = FastMCP("FalconPrep", stateless_http=True)
20
-
21
- # ==============================================================================
22
- # 🧠 CORE CONSTANTS (The Engineering Brain)
23
- # ==============================================================================
24
-
25
- ENVELOPES: Dict[str, Dict[str, Any]] = {
26
- "1U_CubeSat": {"L": 10, "W": 10, "H": 11.35, "max_mass": 2.0},
27
- "3U_CubeSat": {"L": 10, "W": 10, "H": 34.05, "max_mass": 5.0},
28
- "15_Inch_ESPA": {"L": 61.0, "W": 71.0, "H": 71.0, "max_mass": 220.0},
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  }
30
-
31
- PRICING_MODEL: Dict[str, int] = {
32
- "base_rate_per_kg": 6500,
33
- "min_plate_fee": 300000
 
 
 
34
  }
 
 
 
 
 
 
 
 
 
 
35
 
36
- # ==============================================================================
37
- # πŸ—„οΈ RESOURCE: Knowledge Base (VectorStore Connection)
38
- # ==============================================================================
39
 
40
- DB_PATH = "./falcon_db"
41
 
42
- @mcp.resource("knowledge://rideshare/spacex-manuals-v1")
43
- def get_knowledge_base_resource() -> Any:
44
- print(f"Attempting to load ChromaDB client from {DB_PATH}...")
45
- try:
46
- embedding_model = HuggingFaceEmbeddings(
47
- model_name="all-MiniLM-L6-v2",
48
- model_kwargs={'device': 'cpu'}
49
- )
50
- vectorstore = Chroma(
51
- persist_directory=DB_PATH,
52
- embedding_function=embedding_model
53
- )
54
- print("βœ… KnowledgeBaseResource loaded successfully.")
55
- return vectorstore
56
- except Exception as e:
57
- print(f"❌ ERROR: Did you run 'python ingest.py'? Error: {e}")
58
- return None
59
 
60
- # ==============================================================================
61
- # πŸ—£οΈ PROMPT TEMPLATE
62
- # ==============================================================================
63
 
64
- @mcp.prompt()
65
- def launch_readiness_summary_prompt(
66
- payload_name: str,
67
- fit_check_result: str,
68
- hazard_classification_result: Dict[str, Any],
69
- cost_estimate: str,
70
- required_documents_list: str,
71
- timeline_summary: str,
72
- ) -> str:
73
- hazard_level = hazard_classification_result.get('level', 'N/A')
74
- return f"""
75
- You are the **FalconPrep Launch Readiness Assistant**, an expert in SpaceX rideshare compliance.
76
- Your task is to synthesize the following tool outputs for payload **'{payload_name}'**.
77
-
78
- **Guidelines**
79
- 1. Lead with Fit + Hazard.
80
- 2. Use bullet points.
81
- 3. Use professional but friendly compliance language.
82
-
83
- --- OUTPUTS ---
84
- Fit Check:
85
- {fit_check_result}
86
-
87
- Hazard: {hazard_level}
88
- Flags: {hazard_classification_result.get('risk_flags', [])}
89
-
90
- Cost Estimate:
91
- {cost_estimate}
92
-
93
- Required Documents:
94
- {required_documents_list}
95
-
96
- Timeline:
97
- {timeline_summary}
98
- ---
99
-
100
- **Produce a final summary now.**
101
- """
102
 
103
- # ==============================================================================
104
- # πŸ› οΈ PAYLOAD TOOLS
105
- # ==============================================================================
106
-
107
- @mcp.tool()
108
- def get_launch_requirements(
109
- knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None,
110
- payload_type: str = "",
111
- orbit: str = "",
112
- ) -> str:
113
- query = f"requirements for {payload_type} in {orbit} orbit mechanical electrical communication"
114
- try:
115
- results = knowledge_base.similarity_search(query, k=3)
116
- context = "\n\n".join([doc.page_content for doc in results])
117
- except Exception as e:
118
- context = f"ERROR during knowledge query: {e}"
119
- return f"πŸ“„ RAG REQUIREMENTS (Based on manuals):\n{context}"
120
-
121
- @mcp.tool()
122
- def check_plate_fit(
123
- length_cm: float,
124
- width_cm: float,
125
- height_cm: float,
126
- mass_kg: float,
127
- ) -> str:
128
- fits, fails = [], []
129
- user_dims = sorted([length_cm, width_cm, height_cm])
130
- for name, specs in ENVELOPES.items():
131
- env_dims = sorted([specs["L"], specs["W"], specs["H"]])
132
- mass_ok = mass_kg <= specs["max_mass"]
133
- geo_ok = all(u <= e for u, e in zip(user_dims, env_dims))
134
- if mass_ok and geo_ok:
135
- fits.append(name)
136
- else:
137
- reasons = []
138
- if not mass_ok: reasons.append(f"Overweight (Limit: {specs['max_mass']}kg)")
139
- if not geo_ok: reasons.append("Geometry Violation")
140
- fails.append(f"{name}: {' + '.join(reasons)}")
141
- if fits:
142
- return f"βœ… FIT SUCCESS: Fits {', '.join(fits)} (Recommend {fits[0]})"
143
- return f"❌ FIT FAILURE: No fit. Issues: {chr(10).join(fails)}"
144
-
145
- @mcp.tool()
146
- def classify_hazard(
147
- propellant_type: str,
148
- battery_wh: float,
149
- pressure_psi: float,
150
- ) -> Dict[str, Any]:
151
- classification, flags = "Standard", []
152
- if propellant_type.lower() not in ["none", "n/a", "green", "water", "xenon"]:
153
- classification, flags = "Hazardous", [f"High-Risk Propellant: {propellant_type}"]
154
- if battery_wh > 1000:
155
- classification, flags = "Hazardous", flags + [f"Battery > 1kWh ({battery_wh}Wh)"]
156
- if pressure_psi > 150:
157
- classification, flags = "Hazardous", flags + [f"Pressure > 150 PSI"]
158
- if not flags:
159
- flags.append("Payload appears standard/benign.")
160
- return {
161
- "level": classification,
162
- "risk_flags": flags,
163
- "implication": (
164
- "Full multi-phase Safety Review Required"
165
- if classification == "Hazardous"
166
- else "Standard single-phase review"
167
- ),
168
  }
169
 
170
- @mcp.tool()
171
- def calculate_launch_cost(mass_kg: float) -> str:
172
- mass_cost = mass_kg * PRICING_MODEL["base_rate_per_kg"]
173
- final_cost = max(mass_cost, PRICING_MODEL["min_plate_fee"])
174
- return f"πŸ’° ESTIMATED COST\nMass Charge: ${mass_cost:,.0f}\nMinimum Plate Fee: ${PRICING_MODEL['min_plate_fee']:,}\n**TOTAL ESTIMATE: ${final_cost:,.0f} USD**"
175
-
176
- @mcp.tool()
177
- def required_documents(
178
- knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None,
179
- payload_type: str = "",
180
- hazard_level: Literal["Standard", "Hazardous"] = "Standard",
181
- ) -> str:
182
- query = f"required documents for {payload_type} deliverables ICD"
183
- try:
184
- results = knowledge_base.similarity_search(query, k=3)
185
- base_docs = "\n\n".join([doc.page_content for doc in results])
186
- except Exception as e:
187
- base_docs = f"ERROR during knowledge query: {e}"
188
- extra = ""
189
- if hazard_level == "Hazardous":
190
- extra = "\n⚠️ EXTRA HAZARDOUS DOCUMENTS:\n* MSDS for all fluids\n* Burst Test Certificate\n* Propellant Handling Plan\n* Full Safety Review Package (SRDP)"
191
- return f"πŸ“‘ REQUIRED DOCUMENTS ({payload_type}, {hazard_level})\n{extra}\n---\nSTANDARD (from manual):\n{base_docs}"
192
-
193
- @mcp.tool()
194
- def timeline_check(
195
- knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None,
196
- hazard_level: Literal["Standard", "Hazardous"] = "Standard",
197
- ) -> str:
198
- query = "launch campaign schedule L-minus integration deadlines"
199
- try:
200
- results = knowledge_base.similarity_search(query, k=3)
201
- base_timeline = "\n\n".join([doc.page_content for doc in results])
202
- except Exception as e:
203
- base_timeline = f"ERROR during knowledge query: {e}"
204
- safety = "βœ… Standard Review ~L-4 Months" if hazard_level == "Standard" else "\nπŸ›‘ HAZARDOUS EARLY REVIEWS:\n* L-12m: Phase 0\n* L-9m: Phase 1\n* L-6m: Phase 2\n* L-3m: Phase 3"
205
- return f"πŸ•’ TIMELINE ({hazard_level})\n{safety}\n---\nSTANDARD MILESTONES:\n{base_timeline}"
206
 
207
- # ==============================================================================
208
- # πŸ›°οΈ ORBITAL TOOLS
209
- # ==============================================================================
210
 
211
- @mcp.tool()
212
- def fetch_gp_data_tool(
213
- query_type: str = "CATNR",
214
- query_value: str = "",
215
- format: Literal["TLE", "JSON", "JSON-PRETTY", "CSV", "XML", "KVN"] = "JSON"
216
- ) -> Dict[str, Any]:
217
- base_url = "https://celestrak.org/NORAD/elements/gp.php"
218
- params = {"FORMAT": format, query_type.upper(): query_value}
219
- try:
220
- resp = requests.get(base_url, params=params, timeout=10)
221
- resp.raise_for_status()
222
- if "JSON" in format:
223
- return resp.json()
224
- else:
225
- return {"raw": resp.text}
226
- except Exception as e:
227
- return {"error": str(e)}
228
 
229
- @mcp.tool()
230
- def propagate_orbit_tool(
231
- tle_lines: List[str],
232
- target_time: Optional[datetime] = None
233
- ) -> Dict[str, Any]:
234
- try:
235
- sat = Satrec.twoline2rv(tle_lines[0], tle_lines[1])
236
- target_time = target_time or datetime.now(timezone.utc)
237
- jd, fr = jday(target_time.year, target_time.month, target_time.day,
238
- target_time.hour, target_time.minute, target_time.second + target_time.microsecond*1e-6)
239
- e, r, v = sat.sgp4(jd, fr)
240
- if e != 0:
241
- raise RuntimeError(f"SGP4 error code {e}")
242
- return {"position_km": tuple(r), "velocity_kms": tuple(v), "timestamp": target_time.isoformat()}
243
- except Exception as e:
244
- return {"error": str(e)}
245
 
246
- @mcp.tool()
247
- def collision_check_tool(
248
- sat1_tle: List[str],
249
- sat2_tle: List[str],
250
- threshold_km: float = 5.0,
251
- target_time: Optional[datetime] = None
252
- ) -> Dict[str, Any]:
253
- try:
254
- pos1 = propagate_orbit_tool(sat1_tle, target_time)["position_km"]
255
- pos2 = propagate_orbit_tool(sat2_tle, target_time)["position_km"]
256
- distance = sqrt(sum((a - b) ** 2 for a, b in zip(pos1, pos2)))
257
- warning = distance <= threshold_km
258
- return {"distance_km": distance, "collision_warning": warning, "threshold_km": threshold_km, "timestamp": (target_time or datetime.now(timezone.utc)).isoformat()}
259
- except Exception as e:
260
- return {"error": str(e)}
261
 
262
- # ==============================================================================
263
- # 🏁 RUN MCP SERVER
264
- # ==============================================================================
265
 
266
  if __name__ == "__main__":
267
- mcp.run()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
 
 
 
 
 
 
 
2
  from fastmcp import FastMCP
3
+ from fastapi.responses import HTMLResponse
4
+ from fastapi.middleware.cors import CORSMiddleware
5
+
6
+
7
+ from server import mcp
8
+
9
+ # app = mcp.app
10
+
11
+ # app.add_middleware(
12
+ # CORSMiddleware,
13
+ # allow_origins=["*"],
14
+ # allow_credentials=True,
15
+ # allow_methods=["*"],
16
+ # allow_headers=["*"],
17
+ # )
18
+
19
+ # -----------------------------
20
+ # Landing Page (Optional UI)
21
+ # -----------------------------
22
+ @mcp.custom_route("/", methods=["GET"])
23
+ async def index(_):
24
+ return HTMLResponse("""
25
+ <!DOCTYPE html>
26
+ <html lang="en">
27
+ <head>
28
+ <meta charset="UTF-8">
29
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
30
+ <title>FalconPrep MCP Server</title>
31
+ <style>
32
+ :root {
33
+ --accent: #22d3ee;
34
+ --bg: #020b14;
35
+ --panel: rgba(0,0,0,0.45);
36
+ --border: #1e293b;
37
+ --font: "Roboto Mono", monospace;
38
+ --text-dim: #9ac7e0;
39
+ }
40
+ body {
41
+ margin: 0;
42
+ background: var(--bg);
43
+ font-family: var(--font);
44
+ color: white;
45
+ overflow-x: hidden;
46
+ overflow-y: auto;
47
+ min-height: 100vh;
48
+ position: relative;
49
+ }
50
+ #starfield {
51
+ position: fixed;
52
+ inset: 0;
53
+ width: 100vw;
54
+ height: 100vh;
55
+ z-index: -1;
56
+ display: block;
57
+ }
58
+ .container {
59
+ width: 85vw;
60
+ max-width: 900px;
61
+ margin: 0 auto;
62
+ padding-top: 10vh;
63
+ padding-bottom: 10vh;
64
+ display: flex;
65
+ flex-direction: column;
66
+ gap: 2.5vh;
67
+ }
68
+ h1 {
69
+ font-size: 3.2rem;
70
+ text-transform: uppercase;
71
+ letter-spacing: 0.18rem;
72
+ margin: 0 0 0.5rem 0;
73
+ color: var(--accent);
74
+ font-weight: 900;
75
+ text-align: center;
76
+ }
77
+ .subtitle {
78
+ opacity: 0.75;
79
+ letter-spacing: 0.12rem;
80
+ text-align: center;
81
+ margin-bottom: 2vh;
82
+ }
83
+ .panel {
84
+ background: var(--panel);
85
+ border: 1px solid var(--border);
86
+ border-radius: 8px;
87
+ padding: 1.4rem;
88
+ box-shadow: 0px 0px 15px rgba(34,211,238,0.08);
89
+ backdrop-filter: blur(4px);
90
+ }
91
+ .panel h2 {
92
+ margin-top: 0;
93
+ margin-bottom: 0.8rem;
94
+ font-size: 1.25rem;
95
+ color: var(--accent);
96
+ font-weight: 700;
97
+ text-transform: uppercase;
98
+ letter-spacing: 0.12rem;
99
+ }
100
+ p {
101
+ font-size: 1rem;
102
+ line-height: 1.55;
103
+ color: var(--text-dim);
104
+ }
105
+ ul {
106
+ padding-left: 1.2rem;
107
+ margin: 0;
108
+ }
109
+ li {
110
+ color: var(--text-dim);
111
+ margin-bottom: 0.5rem;
112
+ font-size: 1rem;
113
+ }
114
+ code {
115
+ color: var(--accent);
116
+ }
117
+ </style>
118
+ </head>
119
+
120
+ <body>
121
+ <canvas id="starfield"></canvas>
122
+
123
+ <div class="container">
124
+
125
+ <h1>FalconPrep</h1>
126
+ <p class="subtitle">AI Launch Integration Assistant</p>
127
+
128
+ <div class="panel"><h2>Problem</h2>
129
+ <p>Launch teams must navigate 200–400+ pages of SpaceX rideshare manuals, unclear pricing, and slow email back-and-forth.</p>
130
+ </div>
131
+
132
+ <div class="panel"><h2>Pain Points</h2>
133
+ <ul>
134
+ <li>Complex requirements & engineering jargon</li>
135
+ <li>Slow cost estimation</li>
136
+ <li>Unclear safety & propellant rules</li>
137
+ <li>Scattered planning tools (PDFs, Excel, emails)</li>
138
+ </ul>
139
+ </div>
140
+
141
+ <div class="panel"><h2>Solution</h2>
142
+ <p>FalconPrep turns the documentation into structured MCP tools so AI agents can compute requirements, costs, hazards, and timelines.</p>
143
+ </div>
144
+
145
+ <div class="panel"><h2>Core Tools</h2>
146
+ <ul>
147
+ <li>check_plate_fit</li>
148
+ <li>classify_hazard</li>
149
+ <li>estimate_cost</li>
150
+ <li>lookup_standard</li>
151
+ <li>generate_report</li>
152
+ </ul>
153
+ </div>
154
+
155
+ <div class="panel"><h2>How to Use</h2>
156
+ <ul>
157
+ <li>Connect with an MCP client</li>
158
+ <li>SSE endpoint: <code>/sse</code></li>
159
+ <li>Supports all tool calls via JSON</li>
160
+ </ul>
161
+ </div>
162
+
163
+ </div>
164
+
165
+ <script>
166
+ /* starfield js animation preserved */
167
+ const canvas = document.getElementById("starfield");
168
+ const ctx = canvas.getContext("2d");
169
+ let stars = [];
170
+
171
+ function resizeCanvas() {
172
+ canvas.width = window.innerWidth;
173
+ canvas.height = window.innerHeight;
174
  }
175
+ function initStars() {
176
+ resizeCanvas();
177
+ stars = Array.from({ length: 450 }, () => ({
178
+ x: Math.random() * canvas.width,
179
+ y: Math.random() * canvas.height,
180
+ speed: Math.random() * 0.7 + 0.2
181
+ }));
182
  }
183
+ let lastScrollY = 0;
184
+ let scrollSpeed = 0;
185
+ window.addEventListener("scroll", () => {
186
+ const current = window.scrollY;
187
+ scrollSpeed = current - lastScrollY;
188
+ lastScrollY = current;
189
+ });
190
+ let smoothScroll = 0;
191
+ function animateStars() {
192
+ smoothScroll += (scrollSpeed - smoothScroll) * 0.05;
193
 
194
+ ctx.fillStyle = "#020b14";
195
+ ctx.fillRect(0, 0, canvas.width, canvas.height);
 
196
 
197
+ ctx.fillStyle = "white";
198
 
199
+ for (const s of stars) {
200
+ ctx.fillRect(s.x, s.y, 1.5, 1.5);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
201
 
202
+ const scrollBoost = smoothScroll * 0.05;
 
 
203
 
204
+ s.y += s.speed + scrollBoost;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205
 
206
+ if (s.y > canvas.height) s.y -= canvas.height;
207
+ if (s.y < 0) s.y += canvas.height;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
  }
209
 
210
+ requestAnimationFrame(animateStars);
211
+ }
212
+ window.addEventListener("resize", resizeCanvas);
213
+ initStars();
214
+ animateStars();
215
+ </script>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
217
+ </body>
218
+ </html>
219
+ """)
220
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
 
222
+ # -----------------------------
223
+ # Run FastMCP (with SSE)
224
+ # -----------------------------
225
+ HF_SPACE_PORT = int(os.getenv("PORT", 7860))
226
+ HF_SPACE_HOST = os.getenv("HOST", "0.0.0.0")
 
 
 
 
 
 
 
 
 
 
 
227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
 
 
 
 
229
 
230
  if __name__ == "__main__":
231
+ print("πŸš€ FalconPrep MCP Server starting...")
232
+ if hasattr(mcp, '_app'):
233
+ mcp._app.add_middleware(
234
+ CORSMiddleware,
235
+ allow_origins=["*"],
236
+ allow_credentials=True,
237
+ allow_methods=["*"],
238
+ allow_headers=["*"],
239
+ )
240
+
241
+ mcp.run(
242
+ transport="sse",
243
+ host=HF_SPACE_HOST,
244
+ port=HF_SPACE_PORT,
245
+ )
server.py CHANGED
@@ -1,6 +1,10 @@
1
  import os
2
  import math
3
  from typing import Literal, Optional, Dict, Any, List, Annotated
 
 
 
 
4
 
5
  from fastmcp import FastMCP
6
  # from mcp.server.fastmcp import FastMCP
@@ -8,7 +12,10 @@ from langchain_chroma import Chroma
8
  from langchain_community.embeddings import HuggingFaceEmbeddings
9
  from langchain_core.vectorstores import VectorStore
10
 
11
- # Initialize MCP Server
 
 
 
12
  mcp = FastMCP("FalconPrep", stateless_http=True)
13
 
14
  # ==============================================================================
@@ -34,16 +41,6 @@ DB_PATH = "./falcon_db"
34
 
35
  @mcp.resource("knowledge://rideshare/spacex-manuals-v1")
36
  def get_knowledge_base_resource() -> Any:
37
- """
38
- Initializes and returns the query client for the persistent ChromaDB vector store.
39
-
40
- This resource loads the SpaceX Payload User Guides and uses a local
41
- HuggingFace embedding model for zero-cost operation. The return type is
42
- Any to prevent Pydantic serialization errors during FastMCP startup.
43
-
44
- Returns:
45
- Any (Chroma VectorStore instance): The connected vector store object.
46
- """
47
  print(f"Attempting to load ChromaDB client from {DB_PATH}...")
48
  try:
49
  embedding_model = HuggingFaceEmbeddings(
@@ -54,7 +51,6 @@ def get_knowledge_base_resource() -> Any:
54
  persist_directory=DB_PATH,
55
  embedding_function=embedding_model
56
  )
57
-
58
  print("βœ… KnowledgeBaseResource loaded successfully.")
59
  return vectorstore
60
  except Exception as e:
@@ -74,21 +70,6 @@ def launch_readiness_summary_prompt(
74
  required_documents_list: str,
75
  timeline_summary: str,
76
  ) -> str:
77
- """
78
- Instructs the AI Agent on how to synthesize the structured tool outputs
79
- into a comprehensive, final launch readiness report for the user.
80
-
81
- Args:
82
- payload_name (str): The name of the satellite/payload.
83
- fit_check_result (str): Output from the check_plate_fit tool.
84
- hazard_classification_result (Dict[str, Any]): Output from the classify_hazard tool.
85
- cost_estimate (str): Output from the calculate_launch_cost tool.
86
- required_documents_list (str): Output from the required_documents tool.
87
- timeline_summary (str): Output from the timeline_check tool.
88
-
89
- Returns:
90
- str: The full system prompt instructing the LLM on synthesis.
91
- """
92
  hazard_level = hazard_classification_result.get('level', 'N/A')
93
  return f"""
94
  You are the **FalconPrep Launch Readiness Assistant**, an expert in SpaceX rideshare compliance.
@@ -120,7 +101,7 @@ def launch_readiness_summary_prompt(
120
  """
121
 
122
  # ==============================================================================
123
- # πŸ› οΈ TOOLS
124
  # ==============================================================================
125
 
126
  @mcp.tool()
@@ -129,30 +110,13 @@ def get_launch_requirements(
129
  payload_type: str = "",
130
  orbit: str = "",
131
  ) -> str:
132
- """
133
- RAG-Based: Retrieves qualitative launch requirements from the ingested SpaceX payload manuals.
134
-
135
- Queries the vector store for detailed technical and integration requirements
136
- based on the payload type and target orbit.
137
-
138
- Args:
139
- knowledge_base (Any): The injected Chroma VectorStore client.
140
- payload_type (str): The type of payload (e.g., 'CubeSat', 'Cake Topper').
141
- orbit (str): The intended orbit (e.g., 'SSO', 'LEO').
142
-
143
- Returns:
144
- str: A string containing the relevant context found in the manuals.
145
- """
146
  query = f"requirements for {payload_type} in {orbit} orbit mechanical electrical communication"
147
  try:
148
  results = knowledge_base.similarity_search(query, k=3)
149
  context = "\n\n".join([doc.page_content for doc in results])
150
  except Exception as e:
151
  context = f"ERROR during knowledge query: {e}"
152
- return f"""
153
- πŸ“„ RAG REQUIREMENTS (Based on manuals):
154
- {context}
155
- """
156
 
157
  @mcp.tool()
158
  def check_plate_fit(
@@ -161,29 +125,12 @@ def check_plate_fit(
161
  height_cm: float,
162
  mass_kg: float,
163
  ) -> str:
164
- """
165
- Logic Tool: Checks if a payload's bounding box and mass fit standard rideshare interfaces.
166
-
167
- Compares the user-provided dimensions and mass against predefined standards
168
- (1U, 3U, ESPA-class).
169
-
170
- Args:
171
- length_cm (float): Payload length in centimeters.
172
- width_cm (float): Payload width in centimeters.
173
- height_cm (float): Payload height in centimeters.
174
- mass_kg (float): Payload mass in kilograms.
175
-
176
- Returns:
177
- str: A string indicating success and compatible interfaces, or a failure detail.
178
- """
179
  fits, fails = [], []
180
  user_dims = sorted([length_cm, width_cm, height_cm])
181
-
182
  for name, specs in ENVELOPES.items():
183
  env_dims = sorted([specs["L"], specs["W"], specs["H"]])
184
  mass_ok = mass_kg <= specs["max_mass"]
185
  geo_ok = all(u <= e for u, e in zip(user_dims, env_dims))
186
-
187
  if mass_ok and geo_ok:
188
  fits.append(name)
189
  else:
@@ -191,7 +138,6 @@ def check_plate_fit(
191
  if not mass_ok: reasons.append(f"Overweight (Limit: {specs['max_mass']}kg)")
192
  if not geo_ok: reasons.append("Geometry Violation")
193
  fails.append(f"{name}: {' + '.join(reasons)}")
194
-
195
  if fits:
196
  return f"βœ… FIT SUCCESS: Fits {', '.join(fits)} (Recommend {fits[0]})"
197
  return f"❌ FIT FAILURE: No fit. Issues: {chr(10).join(fails)}"
@@ -202,32 +148,15 @@ def classify_hazard(
202
  battery_wh: float,
203
  pressure_psi: float,
204
  ) -> Dict[str, Any]:
205
- """
206
- Logic Tool: Determines the payload's Hazard Classification (Standard/Hazardous).
207
-
208
- Classification is based on critical thresholds for propulsion, battery capacity
209
- (>1 kWh or 1000 Wh), and pressure vessels (>150 PSI).
210
-
211
- Args:
212
- propellant_type (str): Type of propellant used (e.g., Hydrazine, Xenon, None).
213
- battery_wh (float): Total energy capacity of batteries in Watt-hours.
214
- pressure_psi (float): Max operating pressure of any vessel in PSI.
215
-
216
- Returns:
217
- Dict[str, Any]: Classification level, risk flags, and implication.
218
- """
219
  classification, flags = "Standard", []
220
-
221
  if propellant_type.lower() not in ["none", "n/a", "green", "water", "xenon"]:
222
  classification, flags = "Hazardous", [f"High-Risk Propellant: {propellant_type}"]
223
  if battery_wh > 1000:
224
  classification, flags = "Hazardous", flags + [f"Battery > 1kWh ({battery_wh}Wh)"]
225
  if pressure_psi > 150:
226
  classification, flags = "Hazardous", flags + [f"Pressure > 150 PSI"]
227
-
228
  if not flags:
229
  flags.append("Payload appears standard/benign.")
230
-
231
  return {
232
  "level": classification,
233
  "risk_flags": flags,
@@ -240,26 +169,9 @@ def classify_hazard(
240
 
241
  @mcp.tool()
242
  def calculate_launch_cost(mass_kg: float) -> str:
243
- """
244
- Logic Tool: Calculates the estimated launch cost using a linear cost model.
245
-
246
- Applies the base rate per kg, but enforces a minimum plate fee defined in
247
- the CORE CONSTANTS.
248
-
249
- Args:
250
- mass_kg (float): Payload mass in kilograms.
251
-
252
- Returns:
253
- str: A formatted string detailing the mass cost, minimum fee, and total estimate.
254
- """
255
  mass_cost = mass_kg * PRICING_MODEL["base_rate_per_kg"]
256
  final_cost = max(mass_cost, PRICING_MODEL["min_plate_fee"])
257
- return f"""
258
- πŸ’° ESTIMATED COST
259
- Mass Charge: ${mass_cost:,.0f}
260
- Minimum Plate Fee: ${PRICING_MODEL['min_plate_fee']:,}
261
- **TOTAL ESTIMATE: ${final_cost:,.0f} USD**
262
- """
263
 
264
  @mcp.tool()
265
  def required_documents(
@@ -267,84 +179,89 @@ def required_documents(
267
  payload_type: str = "",
268
  hazard_level: Literal["Standard", "Hazardous"] = "Standard",
269
  ) -> str:
270
- """
271
- RAG & Logic Hybrid: Generates a document checklist, dynamically adding
272
- specific safety documents if the payload is classified as Hazardous.
273
-
274
- Args:
275
- knowledge_base (Any): The injected Chroma VectorStore client.
276
- payload_type (str): The payload type for RAG querying.
277
- hazard_level (Literal["Standard", "Hazardous"]): The classification from classify_hazard.
278
-
279
- Returns:
280
- str: The combined list of standard and hazard-specific documentation.
281
- """
282
  query = f"required documents for {payload_type} deliverables ICD"
283
  try:
284
  results = knowledge_base.similarity_search(query, k=3)
285
  base_docs = "\n\n".join([doc.page_content for doc in results])
286
  except Exception as e:
287
  base_docs = f"ERROR during knowledge query: {e}"
288
-
289
  extra = ""
290
  if hazard_level == "Hazardous":
291
- extra = """
292
- ⚠️ EXTRA HAZARDOUS DOCUMENTS:
293
- * MSDS for all fluids
294
- * Burst Test Certificate
295
- * Propellant Handling Plan
296
- * Full Safety Review Package (SRDP)
297
- """
298
-
299
- return f"""
300
- πŸ“‘ REQUIRED DOCUMENTS ({payload_type}, {hazard_level})
301
- {extra}
302
- ---
303
- STANDARD (from manual):
304
- {base_docs}
305
- """
306
 
307
  @mcp.tool()
308
  def timeline_check(
309
  knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None,
310
  hazard_level: Literal["Standard", "Hazardous"] = "Standard",
311
  ) -> str:
312
- """
313
- RAG & Logic Hybrid: Generates a timeline summary, enforcing stricter safety
314
- review deadlines for Hazardous payloads based on internal logic.
315
-
316
- Args:
317
- knowledge_base (Any): The injected Chroma VectorStore client.
318
- hazard_level (Literal["Standard", "Hazardous"]): The classification from classify_hazard.
319
-
320
- Returns:
321
- str: The summary including safety milestones and standard integration dates.
322
- """
323
  query = "launch campaign schedule L-minus integration deadlines"
324
  try:
325
  results = knowledge_base.similarity_search(query, k=3)
326
  base_timeline = "\n\n".join([doc.page_content for doc in results])
327
  except Exception as e:
328
  base_timeline = f"ERROR during knowledge query: {e}"
 
 
329
 
330
- if hazard_level == "Hazardous":
331
- safety = """
332
- πŸ›‘ HAZARDOUS EARLY REVIEWS:
333
- * L-12m: Phase 0
334
- * L-9m: Phase 1
335
- * L-6m: Phase 2
336
- * L-3m: Phase 3
337
- """
338
- else:
339
- safety = "βœ… Standard Review ~L-4 Months"
340
 
341
- return f"""
342
- πŸ•’ TIMELINE ({hazard_level})
343
- {safety}
344
- ---
345
- STANDARD MILESTONES:
346
- {base_timeline}
347
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
348
 
349
  if __name__ == "__main__":
350
- mcp.run()
 
1
  import os
2
  import math
3
  from typing import Literal, Optional, Dict, Any, List, Annotated
4
+ from datetime import datetime, timezone
5
+ import requests
6
+ from sgp4.api import Satrec, jday
7
+ from math import sqrt
8
 
9
  from fastmcp import FastMCP
10
  # from mcp.server.fastmcp import FastMCP
 
12
  from langchain_community.embeddings import HuggingFaceEmbeddings
13
  from langchain_core.vectorstores import VectorStore
14
 
15
+ # ==============================================================================
16
+ # 🧠 MCP SERVER INITIALIZATION
17
+ # ==============================================================================
18
+
19
  mcp = FastMCP("FalconPrep", stateless_http=True)
20
 
21
  # ==============================================================================
 
41
 
42
  @mcp.resource("knowledge://rideshare/spacex-manuals-v1")
43
  def get_knowledge_base_resource() -> Any:
 
 
 
 
 
 
 
 
 
 
44
  print(f"Attempting to load ChromaDB client from {DB_PATH}...")
45
  try:
46
  embedding_model = HuggingFaceEmbeddings(
 
51
  persist_directory=DB_PATH,
52
  embedding_function=embedding_model
53
  )
 
54
  print("βœ… KnowledgeBaseResource loaded successfully.")
55
  return vectorstore
56
  except Exception as e:
 
70
  required_documents_list: str,
71
  timeline_summary: str,
72
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  hazard_level = hazard_classification_result.get('level', 'N/A')
74
  return f"""
75
  You are the **FalconPrep Launch Readiness Assistant**, an expert in SpaceX rideshare compliance.
 
101
  """
102
 
103
  # ==============================================================================
104
+ # πŸ› οΈ PAYLOAD TOOLS
105
  # ==============================================================================
106
 
107
  @mcp.tool()
 
110
  payload_type: str = "",
111
  orbit: str = "",
112
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  query = f"requirements for {payload_type} in {orbit} orbit mechanical electrical communication"
114
  try:
115
  results = knowledge_base.similarity_search(query, k=3)
116
  context = "\n\n".join([doc.page_content for doc in results])
117
  except Exception as e:
118
  context = f"ERROR during knowledge query: {e}"
119
+ return f"πŸ“„ RAG REQUIREMENTS (Based on manuals):\n{context}"
 
 
 
120
 
121
  @mcp.tool()
122
  def check_plate_fit(
 
125
  height_cm: float,
126
  mass_kg: float,
127
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  fits, fails = [], []
129
  user_dims = sorted([length_cm, width_cm, height_cm])
 
130
  for name, specs in ENVELOPES.items():
131
  env_dims = sorted([specs["L"], specs["W"], specs["H"]])
132
  mass_ok = mass_kg <= specs["max_mass"]
133
  geo_ok = all(u <= e for u, e in zip(user_dims, env_dims))
 
134
  if mass_ok and geo_ok:
135
  fits.append(name)
136
  else:
 
138
  if not mass_ok: reasons.append(f"Overweight (Limit: {specs['max_mass']}kg)")
139
  if not geo_ok: reasons.append("Geometry Violation")
140
  fails.append(f"{name}: {' + '.join(reasons)}")
 
141
  if fits:
142
  return f"βœ… FIT SUCCESS: Fits {', '.join(fits)} (Recommend {fits[0]})"
143
  return f"❌ FIT FAILURE: No fit. Issues: {chr(10).join(fails)}"
 
148
  battery_wh: float,
149
  pressure_psi: float,
150
  ) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
151
  classification, flags = "Standard", []
 
152
  if propellant_type.lower() not in ["none", "n/a", "green", "water", "xenon"]:
153
  classification, flags = "Hazardous", [f"High-Risk Propellant: {propellant_type}"]
154
  if battery_wh > 1000:
155
  classification, flags = "Hazardous", flags + [f"Battery > 1kWh ({battery_wh}Wh)"]
156
  if pressure_psi > 150:
157
  classification, flags = "Hazardous", flags + [f"Pressure > 150 PSI"]
 
158
  if not flags:
159
  flags.append("Payload appears standard/benign.")
 
160
  return {
161
  "level": classification,
162
  "risk_flags": flags,
 
169
 
170
  @mcp.tool()
171
  def calculate_launch_cost(mass_kg: float) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
172
  mass_cost = mass_kg * PRICING_MODEL["base_rate_per_kg"]
173
  final_cost = max(mass_cost, PRICING_MODEL["min_plate_fee"])
174
+ return f"πŸ’° ESTIMATED COST\nMass Charge: ${mass_cost:,.0f}\nMinimum Plate Fee: ${PRICING_MODEL['min_plate_fee']:,}\n**TOTAL ESTIMATE: ${final_cost:,.0f} USD**"
 
 
 
 
 
175
 
176
  @mcp.tool()
177
  def required_documents(
 
179
  payload_type: str = "",
180
  hazard_level: Literal["Standard", "Hazardous"] = "Standard",
181
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
182
  query = f"required documents for {payload_type} deliverables ICD"
183
  try:
184
  results = knowledge_base.similarity_search(query, k=3)
185
  base_docs = "\n\n".join([doc.page_content for doc in results])
186
  except Exception as e:
187
  base_docs = f"ERROR during knowledge query: {e}"
 
188
  extra = ""
189
  if hazard_level == "Hazardous":
190
+ extra = "\n⚠️ EXTRA HAZARDOUS DOCUMENTS:\n* MSDS for all fluids\n* Burst Test Certificate\n* Propellant Handling Plan\n* Full Safety Review Package (SRDP)"
191
+ return f"πŸ“‘ REQUIRED DOCUMENTS ({payload_type}, {hazard_level})\n{extra}\n---\nSTANDARD (from manual):\n{base_docs}"
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
  @mcp.tool()
194
  def timeline_check(
195
  knowledge_base: Annotated[Any, mcp.resource("knowledge://rideshare/spacex-manuals-v1")] = None,
196
  hazard_level: Literal["Standard", "Hazardous"] = "Standard",
197
  ) -> str:
 
 
 
 
 
 
 
 
 
 
 
198
  query = "launch campaign schedule L-minus integration deadlines"
199
  try:
200
  results = knowledge_base.similarity_search(query, k=3)
201
  base_timeline = "\n\n".join([doc.page_content for doc in results])
202
  except Exception as e:
203
  base_timeline = f"ERROR during knowledge query: {e}"
204
+ safety = "βœ… Standard Review ~L-4 Months" if hazard_level == "Standard" else "\nπŸ›‘ HAZARDOUS EARLY REVIEWS:\n* L-12m: Phase 0\n* L-9m: Phase 1\n* L-6m: Phase 2\n* L-3m: Phase 3"
205
+ return f"πŸ•’ TIMELINE ({hazard_level})\n{safety}\n---\nSTANDARD MILESTONES:\n{base_timeline}"
206
 
207
+ # ==============================================================================
208
+ # πŸ›°οΈ ORBITAL TOOLS
209
+ # ==============================================================================
 
 
 
 
 
 
 
210
 
211
+ @mcp.tool()
212
+ def fetch_gp_data_tool(
213
+ query_type: str = "CATNR",
214
+ query_value: str = "",
215
+ format: Literal["TLE", "JSON", "JSON-PRETTY", "CSV", "XML", "KVN"] = "JSON"
216
+ ) -> Dict[str, Any]:
217
+ base_url = "https://celestrak.org/NORAD/elements/gp.php"
218
+ params = {"FORMAT": format, query_type.upper(): query_value}
219
+ try:
220
+ resp = requests.get(base_url, params=params, timeout=10)
221
+ resp.raise_for_status()
222
+ if "JSON" in format:
223
+ return resp.json()
224
+ else:
225
+ return {"raw": resp.text}
226
+ except Exception as e:
227
+ return {"error": str(e)}
228
+
229
+ @mcp.tool()
230
+ def propagate_orbit_tool(
231
+ tle_lines: List[str],
232
+ target_time: Optional[datetime] = None
233
+ ) -> Dict[str, Any]:
234
+ try:
235
+ sat = Satrec.twoline2rv(tle_lines[0], tle_lines[1])
236
+ target_time = target_time or datetime.now(timezone.utc)
237
+ jd, fr = jday(target_time.year, target_time.month, target_time.day,
238
+ target_time.hour, target_time.minute, target_time.second + target_time.microsecond*1e-6)
239
+ e, r, v = sat.sgp4(jd, fr)
240
+ if e != 0:
241
+ raise RuntimeError(f"SGP4 error code {e}")
242
+ return {"position_km": tuple(r), "velocity_kms": tuple(v), "timestamp": target_time.isoformat()}
243
+ except Exception as e:
244
+ return {"error": str(e)}
245
+
246
+ @mcp.tool()
247
+ def collision_check_tool(
248
+ sat1_tle: List[str],
249
+ sat2_tle: List[str],
250
+ threshold_km: float = 5.0,
251
+ target_time: Optional[datetime] = None
252
+ ) -> Dict[str, Any]:
253
+ try:
254
+ pos1 = propagate_orbit_tool(sat1_tle, target_time)["position_km"]
255
+ pos2 = propagate_orbit_tool(sat2_tle, target_time)["position_km"]
256
+ distance = sqrt(sum((a - b) ** 2 for a, b in zip(pos1, pos2)))
257
+ warning = distance <= threshold_km
258
+ return {"distance_km": distance, "collision_warning": warning, "threshold_km": threshold_km, "timestamp": (target_time or datetime.now(timezone.utc)).isoformat()}
259
+ except Exception as e:
260
+ return {"error": str(e)}
261
+
262
+ # ==============================================================================
263
+ # 🏁 RUN MCP SERVER
264
+ # ==============================================================================
265
 
266
  if __name__ == "__main__":
267
+ mcp.run()