# File size: 4,085 Bytes
# a1e520f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Shared utilities for the PoseBusters MCP wrapper."""

# Standard library imports
import csv
import mimetypes
from pathlib import Path
from typing import Any, Dict, List
import json

# Third-party imports
from fastapi import UploadFile, HTTPException

# Constants
# Maps each supported upload extension to the content types accepted for it:
# the format-specific chemical type plus the generic binary and plain-text types.
ALLOWED_MIME_TYPES = {
    ".sdf": [
        "chemical/x-mdl-sdfile",
        "application/octet-stream",
        "text/plain",
    ],
    ".pdb": [
        "chemical/x-pdb",
        "application/octet-stream",
        "text/plain",
    ],
}

def validate_mime_type(file: UploadFile, expected_ext: str) -> None:
    """Validate an uploaded file's extension and MIME type.

    Args:
        file: The uploaded file; its ``filename`` and ``content_type`` are read.
        expected_ext: Required extension including the leading dot. It is
            compared against the lower-cased suffix of the filename and used
            as the lookup key into ``ALLOWED_MIME_TYPES``.

    Raises:
        HTTPException: With status 400 when the filename is missing, the
            extension differs from ``expected_ext``, or the content type is
            not one of the types allowed for that extension.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail=f"Missing filename for {expected_ext} file")

    file_ext = Path(file.filename).suffix.lower()
    if file_ext != expected_ext:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file extension. Expected {expected_ext}, got {file_ext}"
        )

    # Prefer the declared content type; otherwise guess from the filename and
    # finally fall back to the generic binary type.
    content_type = file.content_type or mimetypes.guess_type(file.filename)[0] or "application/octet-stream"

    # Media types are case-insensitive and may carry parameters
    # (e.g. "text/plain; charset=utf-8"), so compare only the bare type.
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type not in ALLOWED_MIME_TYPES[expected_ext]:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid MIME type for {expected_ext} file"
        )

def format_results_to_mcp(csv_text: str, stderr_text: str = "", ligand_id: str = "unknown") -> Dict[str, Any]:
    """Convert PoseBusters CSV output into the MCP tabular response.

    Each CSV record becomes one row of ``[ligand_id, status, passed/total,
    details]``. Only cells equal to ``"True"`` or ``"False"`` count as test
    outcomes; the ``file`` and ``molecule`` columns are never counted.

    Args:
        csv_text: CSV text with a header line. Blank text produces a single
            failure row instead of parsed results.
        stderr_text: Error output; its first line is used as the detail of
            the failure row when ``csv_text`` is blank.
        ligand_id: Identifier used for the failure row when ``csv_text`` is
            blank. Parsed records take their id from the ``molecule`` column
            (``"?"`` when that column is absent).

    Returns:
        A dict with ``object_id`` and ``data`` (``columns`` and ``rows``).

    Raises:
        ValueError: If a produced row does not have one cell per column.
    """
    columns = ["ligand_id", "status", "passed/total", "details"]
    non_test_columns = ("file", "molecule")
    table: List[List[str]] = []

    body = csv_text.strip()
    if body:
        for record in csv.DictReader(body.splitlines()):
            outcomes = [
                (name, value)
                for name, value in record.items()
                if name not in non_test_columns
            ]
            failures = [name for name, value in outcomes if value == "False"]
            n_passed = sum(1 for _, value in outcomes if value == "True")

            if failures:
                status, details = "❌", ", ".join(failures)
            else:
                status, details = "✅", "All tests passed"

            table.append([
                record.get("molecule", "?"),
                status,
                f"{n_passed}/{n_passed + len(failures)}",
                details,
            ])
    else:
        # No CSV at all: report the first line of stderr, if there is one.
        stderr_clean = stderr_text.strip()
        if stderr_clean:
            message = stderr_clean.splitlines()[0]
        else:
            message = "PoseBusters output is empty."
        table.append([ligand_id, "❌", "0/0", message])

    # Sanity check: every row must supply exactly one cell per column.
    width = len(columns)
    for entry in table:
        if len(entry) != width:
            raise ValueError(f"Row length {len(entry)} does not match columns length {width}")

    return {
        "object_id": "validation_results",
        "data": {
            "columns": columns,
            "rows": table,
        },
    }

def get_mcp_context() -> Dict[str, Any]:
    """Load and return the MCP context from ``mcp_context.json``.

    The file is looked up in the directory containing this module first and,
    if it is not there, one directory above it.

    Returns:
        The parsed JSON content of the context file.

    Raises:
        HTTPException: With status 500 when the file cannot be found in
            either location, or when its content is not valid UTF-8 JSON.
    """
    current_dir = Path(__file__).parent
    context_path = current_dir / 'mcp_context.json'

    # If not found next to this module, try the parent directory
    # (the app root, for Spaces).
    if not context_path.exists():
        context_path = (current_dir / '..' / 'mcp_context.json').resolve()

    try:
        # JSON is read as UTF-8 explicitly rather than in the platform's
        # locale encoding, so non-ASCII content loads the same everywhere.
        with open(context_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=500,
            detail=f"MCP context file not found at {context_path}"
        ) from e
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable bytes are reported like any other malformed context file.
        raise HTTPException(
            status_code=500,
            detail="Invalid MCP context JSON format"
        ) from e