| """ |
| Quick test script for SPARKNET FastAPI backend |
| Tests all major endpoints with a sample patent. |
| """ |
|
|
| import requests |
| import json |
| import time |
| from pathlib import Path |
| from rich.console import Console |
| from rich.table import Table |
| from rich.progress import Progress, SpinnerColumn, TextColumn |
|
|
| console = Console() |
|
|
| API_BASE = "http://localhost:8000" |
|
|
def test_health():
    """Check the /api/health endpoint and report server statistics.

    Returns:
        bool: True when the endpoint responded with HTTP 200.
    """
    console.print("\n[bold blue]1. Testing Health Endpoint[/bold blue]")

    # Timeout guards against an unresponsive server hanging the whole suite.
    response = requests.get(f"{API_BASE}/api/health", timeout=10)

    # Only parse the body on success: a non-200 error page may not be JSON,
    # and the original code crashed here instead of reporting failure.
    if response.status_code == 200:
        data = response.json()
        console.print(f"Status: [green]{data['status']}[/green]")
        console.print(f"Active Workflows: {data['statistics']['active_workflows']}")
        console.print(f"Processed Patents: {data['statistics']['processed_patents']}")

    return response.status_code == 200
|
|
def test_upload(patent_path):
    """Upload a patent PDF to the API.

    Args:
        patent_path: Path (string) to the PDF file on disk.

    Returns:
        str | None: The server-assigned patent ID, or None on failure
        (missing file or non-200 response).
    """
    console.print("\n[bold blue]2. Testing Patent Upload[/bold blue]")

    path = Path(patent_path)
    if not path.exists():
        console.print(f"[red]Patent file not found: {patent_path}[/red]")
        console.print("[yellow]Using mock upload test (no actual file)[/yellow]")
        return None

    with open(patent_path, 'rb') as f:
        files = {'file': (path.name, f, 'application/pdf')}
        # Generous timeout: uploading a large PDF can be slow, but we still
        # must not hang forever if the server stalls.
        response = requests.post(f"{API_BASE}/api/patents/upload", files=files, timeout=60)

    if response.status_code == 200:
        data = response.json()
        console.print(f"[green]✓[/green] Patent uploaded successfully")
        console.print(f"Patent ID: {data['patent_id']}")
        console.print(f"Filename: {data['filename']}")
        console.print(f"Size: {data['size']} bytes")
        return data['patent_id']
    else:
        console.print(f"[red]✗[/red] Upload failed: {response.text}")
        return None
|
|
def test_workflow(patent_id):
    """Start the 'patent_wakeup' workflow for an uploaded patent.

    Args:
        patent_id: ID previously returned by the upload endpoint.

    Returns:
        str | None: The workflow ID, or None if the request failed.
    """
    console.print("\n[bold blue]3. Testing Workflow Execution[/bold blue]")

    payload = {"patent_id": patent_id, "scenario": "patent_wakeup"}
    # Timeout guards against an unresponsive server hanging the suite.
    response = requests.post(
        f"{API_BASE}/api/workflows/execute",
        json=payload,
        timeout=30
    )

    if response.status_code == 200:
        data = response.json()
        console.print(f"[green]✓[/green] Workflow started")
        console.print(f"Workflow ID: {data['workflow_id']}")
        return data['workflow_id']
    else:
        console.print(f"[red]✗[/red] Workflow start failed: {response.text}")
        return None
|
|
def monitor_workflow(workflow_id):
    """Poll a workflow every 2s until it completes or fails, then show results.

    Args:
        workflow_id: Identifier returned by the workflow-execution endpoint.
    """
    console.print("\n[bold blue]4. Monitoring Workflow Progress[/bold blue]")

    status_url = f"{API_BASE}/api/workflows/{workflow_id}"

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:

        task = progress.add_task("Processing workflow...", total=100)

        while True:
            response = requests.get(status_url, timeout=10)

            if response.status_code != 200:
                console.print("[red]Failed to get workflow status[/red]")
                # Original fell through and re-fetched/parsed the same failing
                # endpoint; bail out instead of crashing on invalid JSON.
                return

            data = response.json()
            status = data['status']
            prog = data.get('progress', 0)
            current_step = data.get('current_step', 'initializing')

            progress.update(task, completed=prog, description=f"Step: {current_step}")

            # Terminal states end the polling loop.
            if status in ['completed', 'failed']:
                break

            time.sleep(2)

    # Fetch the terminal state once more to pick up the final result payload.
    response = requests.get(status_url, timeout=10)
    data = response.json()

    if data['status'] == 'completed':
        console.print(f"\n[green]✓ Workflow completed successfully![/green]")
        display_results(data['result'])
    else:
        console.print(f"\n[red]✗ Workflow failed: {data.get('error', 'Unknown error')}[/red]")
|
|
def display_results(result):
    """Pretty-print the analysis payload from a completed workflow.

    Args:
        result: Result dict with optional keys 'quality_score',
            'document_analysis', 'market_analysis', 'matches', 'brief'.
    """
    console.print("\n[bold]Analysis Results:[/bold]\n")

    # Overall quality score.
    console.print(f"Quality Score: [blue]{result.get('quality_score', 0):.2f}[/blue]")

    # Document-level patent analysis.
    analysis = result.get('document_analysis', {})
    if analysis:
        console.print("\n[bold]Patent Analysis:[/bold]")
        console.print(f" TRL Level: {analysis.get('trl_level', 'N/A')}/9")
        console.print(f" Key Innovations: {len(analysis.get('key_innovations', []))}")

    # Market opportunities (top five shown in a table).
    market = result.get('market_analysis', {})
    if market:
        opportunities = market.get('opportunities', [])
        console.print("\n[bold]Market Opportunities:[/bold]")
        console.print(f" Found: {len(opportunities)} opportunities")

        if opportunities:
            opp_table = Table(show_header=True)
            opp_table.add_column("Sector", style="cyan")
            opp_table.add_column("Market Size", justify="right")
            opp_table.add_column("Growth", justify="right")
            opp_table.add_column("Fit", style="green")

            for entry in opportunities[:5]:
                opp_table.add_row(
                    entry.get('sector', '')[:30],
                    f"${entry.get('market_size_usd', 0)/1e9:.1f}B",
                    f"{entry.get('growth_rate_percent', 0)}%",
                    entry.get('technology_fit', ''),
                )

            console.print(opp_table)

    # Stakeholder matches (top five shown in a table).
    matches = result.get('matches', [])
    if matches:
        console.print("\n[bold]Stakeholder Matches:[/bold]")
        console.print(f" Found: {len(matches)} potential partners")

        match_table = Table(show_header=True)
        match_table.add_column("Partner", style="cyan")
        match_table.add_column("Type")
        match_table.add_column("Location")
        match_table.add_column("Fit Score", justify="right", style="green")

        for candidate in matches[:5]:
            match_table.add_row(
                candidate.get('stakeholder_name', '')[:30],
                candidate.get('stakeholder_type', ''),
                candidate.get('location', ''),
                f"{candidate.get('overall_fit_score', 0)*100:.0f}%",
            )

        console.print(match_table)

    # Where the generated valorization brief was written, if any.
    brief = result.get('brief', {})
    if brief:
        console.print("\n[bold]Valorization Brief:[/bold]")
        console.print(f" PDF: {brief.get('pdf_path', 'Not generated')}")
|
|
def test_list_endpoints():
    """Exercise the patent and workflow listing endpoints."""
    console.print("\n[bold blue]5. Testing List Endpoints[/bold blue]")

    # Both listing endpoints share the same shape, so check them in one loop.
    listing_checks = [
        (f"{API_BASE}/api/patents/", "patents"),
        (f"{API_BASE}/api/workflows/", "workflows"),
    ]

    for url, label in listing_checks:
        response = requests.get(url)
        if response.status_code == 200:
            items = response.json()
            console.print(f"[green]✓[/green] Found {len(items)} {label}")
|
|
def main():
    """Run the full API test suite end to end against a local server."""
    console.print("\n[bold cyan]SPARKNET API Test Suite[/bold cyan]\n")

    try:
        # The health check doubles as a reachability probe.
        if not test_health():
            console.print("[red]Health check failed - is the API running?[/red]")
            console.print("Start with: [yellow]python -m api.main[/yellow]")
            return

        # Pick the first sample PDF from the Dataset/ directory, if any.
        dataset_dir = Path("Dataset")
        available = list(dataset_dir.glob("*.pdf")) if dataset_dir.exists() else []

        if not available:
            console.print("\n[yellow]No patents found in Dataset/ directory[/yellow]")
            console.print("Skipping upload and workflow tests")
            return

        sample = available[0]
        console.print(f"\nUsing test patent: [cyan]{sample.name}[/cyan]")

        # Each stage depends on the previous one's ID; bail out on failure.
        patent_id = test_upload(str(sample))
        if not patent_id:
            return

        workflow_id = test_workflow(patent_id)
        if not workflow_id:
            return

        monitor_workflow(workflow_id)

        test_list_endpoints()

        console.print("\n[bold green]✓ All tests completed successfully![/bold green]\n")

    except requests.exceptions.ConnectionError:
        console.print("\n[red]✗ Cannot connect to API[/red]")
        console.print("Make sure the API is running: [yellow]python -m api.main[/yellow]")
    except Exception as e:
        console.print(f"\n[red]✗ Test failed: {e}[/red]")
        import traceback
        traceback.print_exc()
|
|
# Script entry point: run the suite only when executed directly.
if __name__ == "__main__":
    main()
|
|