|
|
"""Shared utilities for the PoseBusters MCP wrapper."""
|
|
|
|
|
|
|
|
|
import csv
|
|
|
import mimetypes
|
|
|
from pathlib import Path
|
|
|
from typing import Any, Dict, List
|
|
|
import json
|
|
|
|
|
|
|
|
|
from fastapi import UploadFile, HTTPException
|
|
|
|
|
|
|
|
|
# Accepted Content-Type values per file extension (extension includes the dot).
# Besides the chemistry-specific type, each extension also accepts the generic
# "application/octet-stream" and "text/plain" types.
ALLOWED_MIME_TYPES = {
    ".sdf": [
        "chemical/x-mdl-sdfile",
        "application/octet-stream",
        "text/plain",
    ],
    ".pdb": [
        "chemical/x-pdb",
        "application/octet-stream",
        "text/plain",
    ],
}
|
|
|
|
|
|
def validate_mime_type(file: UploadFile, expected_ext: str) -> None:
    """Validate an uploaded file's extension and MIME type.

    Args:
        file: The uploaded file; only ``filename`` and ``content_type`` are read.
        expected_ext: Required file extension including the leading dot
            (e.g. ``".sdf"``). It is compared against the lower-cased suffix
            of ``file.filename`` and used as the key into ``ALLOWED_MIME_TYPES``.

    Raises:
        HTTPException: With status 400 when the filename is missing, the
            extension differs from ``expected_ext``, or the media type is not
            listed in ``ALLOWED_MIME_TYPES`` for that extension.
        KeyError: If ``expected_ext`` matches the file's extension but has no
            entry in ``ALLOWED_MIME_TYPES``.
    """
    if not file.filename:
        raise HTTPException(
            status_code=400,
            detail=f"Missing filename for {expected_ext} file",
        )

    file_ext = Path(file.filename).suffix.lower()
    if file_ext != expected_ext:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file extension. Expected {expected_ext}, got {file_ext}",
        )

    # Prefer the client-declared type, then a guess from the filename, then the
    # generic binary type, so the value below is never empty or None.
    declared_type = (
        file.content_type
        or mimetypes.guess_type(file.filename)[0]
        or "application/octet-stream"
    )

    # A Content-Type value may carry parameters ("text/plain; charset=utf-8")
    # and media type names are case-insensitive; compare only the normalized
    # "type/subtype" part against the allow-list.
    media_type = declared_type.split(";", 1)[0].strip().lower()

    if media_type not in ALLOWED_MIME_TYPES[expected_ext]:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid MIME type for {expected_ext} file",
        )
|
|
|
|
|
|
def format_results_to_mcp(csv_text: str, stderr_text: str = "", ligand_id: str = "unknown") -> Dict[str, Any]:
    """Convert PoseBusters CSV output into the MCP tabular response.

    Args:
        csv_text: CSV text whose first line is the header. Every column other
            than "file" and "molecule" is treated as a test; a cell equal to
            the string "True" counts as passed, "False" as failed, and any
            other value is ignored.
        stderr_text: Only used when ``csv_text`` is blank; its first non-blank
            stripped line becomes the error detail.
        ligand_id: Label of the single error row emitted when ``csv_text`` is
            blank. Rows parsed from the CSV use the "molecule" column instead
            ("?" when that column is absent).

    Returns:
        A dict with "object_id" set to "validation_results" and "data" holding
        the "columns" list and one "rows" entry per CSV record (or one error
        row when there is no CSV content).

    Raises:
        ValueError: If any row's length differs from the number of columns.
    """
    columns = ["ligand_id", "status", "passed/total", "details"]
    non_test_columns = {"file", "molecule"}
    table: List[List[str]] = []

    csv_body = csv_text.strip()
    if csv_body:
        for record in csv.DictReader(csv_body.splitlines()):
            checks = [
                (name, cell)
                for name, cell in record.items()
                if name not in non_test_columns
            ]
            failures = [name for name, cell in checks if cell == "False"]
            n_passed = sum(1 for _, cell in checks if cell == "True")
            n_total = n_passed + len(failures)

            table.append([
                record.get("molecule", "?"),
                "❌" if failures else "✅",
                f"{n_passed}/{n_total}",
                ", ".join(failures) if failures else "All tests passed",
            ])
    else:
        # No CSV at all: report a single failing row, using stderr if present.
        stderr_body = stderr_text.strip()
        if stderr_body:
            reason = stderr_body.splitlines()[0]
        else:
            reason = "PoseBusters output is empty."
        table.append([ligand_id, "❌", "0/0", reason])

    # Sanity check: every row must line up with the column headers.
    expected_cols = len(columns)
    for entry in table:
        if len(entry) != expected_cols:
            raise ValueError(f"Row length {len(entry)} does not match columns length {expected_cols}")

    return {
        "object_id": "validation_results",
        "data": {
            "columns": columns,
            "rows": table,
        },
    }
|
|
|
|
|
|
def get_mcp_context() -> Dict[str, Any]:
    """Load and return the MCP context from ``mcp_context.json``.

    The file is looked up next to this module first and, if absent there,
    in the parent directory.

    Returns:
        The parsed JSON content of the context file.

    Raises:
        HTTPException: With status 500 when the file cannot be found in
            either location, or when its content is not valid UTF-8 JSON.
    """
    current_dir = Path(__file__).parent
    context_path = current_dir / 'mcp_context.json'

    if not context_path.exists():
        # Fall back to the parent directory; resolve so the error message
        # below shows a clean absolute path.
        context_path = (current_dir / '..' / 'mcp_context.json').resolve()

    try:
        if not context_path.exists():
            raise FileNotFoundError(f"MCP context file not found at {context_path}")

        # JSON is read as UTF-8 explicitly so decoding does not depend on the
        # platform's default locale encoding.
        with open(context_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=500,
            detail=str(e)
        ) from e
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable bytes are reported the same way as malformed JSON.
        raise HTTPException(
            status_code=500,
            detail="Invalid MCP context JSON format"
        ) from e