| """ |
| RAG CLI Commands |
| |
| Commands: |
| sparknet rag index <file> - Index document for retrieval |
| sparknet rag search <query> - Search indexed documents |
| sparknet rag ask <question> - Answer question using RAG |
| sparknet rag status - Show index status |
| sparknet rag delete <document_id> - Delete document from index |
| """ |
|
|
| import typer |
| from typing import Optional, List |
| from pathlib import Path |
| import json |
| import sys |
|
|
| |
# Typer application object; the commands in this module register themselves on it.
rag_app = typer.Typer(name="rag", help="RAG and retrieval commands")
|
|
|
|
@rag_app.command("index")
def index_document(
    files: List[Path] = typer.Argument(..., help="Document file(s) to index"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    embedding_model: str = typer.Option("nomic-embed-text", "--model", "-m", help="Embedding model"),
):
    """
    Index document(s) for RAG retrieval.

    Paths that do not exist are skipped with a warning. The command exits
    with status 1 when none of the given paths exists or when indexing fails.

    Example:
        sparknet rag index document.pdf
        sparknet rag index *.pdf --collection contracts
    """
    # Keep only the paths that exist; each missing one is reported on stderr.
    valid_files = []
    for f in files:
        if f.exists():
            valid_files.append(f)
        else:
            typer.echo(f"Warning: File not found, skipping: {f}", err=True)

    if not valid_files:
        typer.echo("Error: No valid files to index", err=True)
        raise typer.Exit(1)

    typer.echo(f"Indexing {len(valid_files)} document(s)...")

    try:
        # Imported inside the try so that a failing import is reported through
        # the "Missing dependency" handler below instead of a raw traceback.
        from ..rag import (
            VectorStoreConfig,
            EmbeddingConfig,
            get_document_indexer,
        )

        # NOTE(review): both configs are built from --collection / --model but are
        # never handed to get_document_indexer(), so as far as this function shows
        # the two options do not influence indexing. TODO: pass them through once
        # the indexer factory's signature is confirmed.
        store_config = VectorStoreConfig(collection_name=collection)
        embed_config = EmbeddingConfig(ollama_model=embedding_model)

        indexer = get_document_indexer()

        # The indexer is given plain string paths rather than Path objects.
        results = indexer.index_batch([str(f) for f in valid_files])

        # Summary across the whole batch.
        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        typer.echo("\nIndexing complete:")
        typer.echo(f" Documents: {successful}/{len(results)} successful")
        typer.echo(f" Chunks indexed: {total_chunks}")

        # One line per document, followed by its error message when it has one.
        for r in results:
            status = "✓" if r.success else "✗"
            typer.echo(f" [{status}] {r.source_path}: {r.num_chunks_indexed} chunks")
            if r.error:
                typer.echo(f" Error: {r.error}")

    except ImportError as e:
        typer.echo(f"Error: Missing dependency - {e}", err=True)
        raise typer.Exit(1)
    except Exception as e:
        typer.echo(f"Error indexing documents: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
@rag_app.command("search")
def search_documents(
    query: str = typer.Argument(..., help="Search query"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of results"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    chunk_type: Optional[str] = typer.Option(None, "--type", "-t", help="Filter by chunk type"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
):
    """
    Search indexed documents.

    Matches are printed to the terminal, or written as JSON when --output is
    given. The command exits with status 1 when the search fails.

    Example:
        sparknet rag search "payment terms" --top 10
        sparknet rag search "table data" --type table
    """
    typer.echo(f"Searching: {query}")

    try:
        from ..rag import get_document_retriever, RetrieverConfig

        # NOTE(review): `collection` is accepted on the command line but is not
        # passed to the retriever anywhere in this function - confirm whether
        # it should be.
        config = RetrieverConfig(default_top_k=top_k)
        retriever = get_document_retriever(config)

        # Only the filters the user actually supplied are sent along.
        filters = {}
        if document_id:
            filters["document_id"] = document_id
        if chunk_type:
            filters["chunk_type"] = chunk_type

        # An empty filter dict is replaced by None.
        chunks = retriever.retrieve(query, top_k=top_k, filters=filters or None)

        if not chunks:
            typer.echo("No results found.")
            return

        # JSON-friendly view of the results; long chunk texts are cut to 500
        # characters and marked with an ellipsis.
        output_data = {
            "query": query,
            "num_results": len(chunks),
            "results": [
                {
                    "chunk_id": c.chunk_id,
                    "document_id": c.document_id,
                    "page": c.page,
                    "chunk_type": c.chunk_type,
                    "similarity": c.similarity,
                    "text": c.text[:500] + "..." if len(c.text) > 500 else c.text,
                }
                for c in chunks
            ],
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nFound {len(chunks)} results:\n")
            for i, c in enumerate(chunks, 1):
                typer.echo(f"[{i}] Similarity: {c.similarity:.3f}")
                if c.page is not None:
                    # Shown as page + 1; presumably the stored index is 0-based.
                    typer.echo(f" Page: {c.page + 1}, Type: {c.chunk_type or 'text'}")
                # Add the ellipsis only when the preview really was cut short,
                # matching how the JSON output above treats long texts.
                preview = c.text[:200] + "..." if len(c.text) > 200 else c.text
                typer.echo(f" {preview}")
                typer.echo()

    except Exception as e:
        typer.echo(f"Error searching: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
@rag_app.command("ask")
def ask_question(
    question: str = typer.Argument(..., help="Question to answer"),
    top_k: int = typer.Option(5, "--top", "-k", help="Number of context chunks"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    document_id: Optional[str] = typer.Option(None, "--document", "-d", help="Filter by document ID"),
    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output JSON file"),
    show_evidence: bool = typer.Option(True, "--evidence/--no-evidence", help="Show evidence sources"),
):
    """
    Answer a question using RAG.

    Prints the answer with its confidence and, unless --no-evidence is given,
    the cited sources. With --output the result is written as JSON instead.
    The command exits with status 1 when answering fails.

    Example:
        sparknet rag ask "What are the payment terms?"
        sparknet rag ask "What is the contract value?" --document contract123
    """
    typer.echo(f"Question: {question}")
    typer.echo("Processing...")

    try:
        from ..rag import get_grounded_generator, GeneratorConfig

        # NOTE(review): `collection` is accepted on the command line but is not
        # passed to the generator anywhere in this function - confirm whether
        # it should be.
        config = GeneratorConfig()
        generator = get_grounded_generator(config)

        # Restrict to a single document only when one was requested.
        filters = {"document_id": document_id} if document_id else None

        result = generator.answer_question(question, top_k=top_k, filters=filters)

        # JSON-friendly view of the answer and its citations.
        output_data = {
            "question": question,
            "answer": result.answer,
            "confidence": result.confidence,
            "abstained": result.abstained,
            "abstain_reason": result.abstain_reason,
            "citations": [
                {
                    "index": c.index,
                    "page": c.page,
                    "snippet": c.text_snippet,
                    "confidence": c.confidence,
                }
                for c in result.citations
            ],
            "num_chunks_used": result.num_chunks_used,
        }

        if output:
            with open(output, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)
            typer.echo(f"Results written to: {output}")
        else:
            typer.echo(f"\nAnswer: {result.answer}")
            typer.echo(f"\nConfidence: {result.confidence:.2f}")

            if result.abstained:
                typer.echo(f"Note: {result.abstain_reason}")

            if show_evidence and result.citations:
                typer.echo(f"\nSources ({len(result.citations)}):")
                for c in result.citations:
                    # Mark the snippet with an ellipsis only when it was cut.
                    snippet = c.text_snippet
                    if len(snippet) > 80:
                        snippet = snippet[:80] + "..."
                    # Drop the whole "Page N: " label when the page is unknown,
                    # so the line never shows a dangling colon. Shown as
                    # page + 1; presumably the stored index is 0-based.
                    page_label = f"Page {c.page + 1}: " if c.page is not None else ""
                    typer.echo(f" [{c.index}] {page_label}{snippet}")

    except Exception as e:
        typer.echo(f"Error generating answer: {e}", err=True)
        raise typer.Exit(1)
|
|
|
|
@rag_app.command("status")
def show_status(
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
):
    """
    Show RAG index status.

    Example:
        sparknet rag status
        sparknet rag status --collection contracts
    """
    # The banner goes out first, before any step that can fail.
    typer.echo("RAG Index Status")
    typer.echo("=" * 40)

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store = get_vector_store(VectorStoreConfig(collection_name=collection))

        # Fetch the count before printing anything about the collection, so a
        # failing store produces only the error message below.
        chunk_total = store.count()
        typer.echo(f"Collection: {collection}")
        typer.echo(f"Total chunks: {chunk_total}")

        # The per-document breakdown is shown only for stores that expose
        # list_documents().
        if not hasattr(store, 'list_documents'):
            return

        documents = store.list_documents()
        typer.echo(f"Documents indexed: {len(documents)}")
        if not documents:
            return

        # List at most `preview_limit` documents and summarise the remainder.
        preview_limit = 10
        typer.echo("\nDocuments:")
        for doc in documents[:preview_limit]:
            typer.echo(f" - {doc}: {store.count(doc)} chunks")

        not_shown = len(documents) - preview_limit
        if not_shown > 0:
            typer.echo(f" ... and {not_shown} more")

    except Exception as exc:
        typer.echo(f"Error getting status: {exc}", err=True)
        raise typer.Exit(1)
|
|
|
|
@rag_app.command("delete")
def delete_document(
    document_id: str = typer.Argument(..., help="Document ID to delete"),
    collection: str = typer.Option("sparknet_documents", "--collection", "-c", help="Collection name"),
    force: bool = typer.Option(False, "--force", "-f", help="Skip confirmation"),
):
    """
    Delete a document from the index.

    Example:
        sparknet rag delete doc123
        sparknet rag delete doc123 --force
    """
    # With --force the prompt is never shown: `and` short-circuits before
    # typer.confirm() is reached.
    declined = not force and not typer.confirm(f"Delete document '{document_id}' from index?")
    if declined:
        typer.echo("Cancelled.")
        return

    try:
        from ..rag import get_vector_store, VectorStoreConfig

        store = get_vector_store(VectorStoreConfig(collection_name=collection))

        # delete_document()'s return value is reported as the number of chunks removed.
        removed = store.delete_document(document_id)
        typer.echo(f"Deleted {removed} chunks for document: {document_id}")

    except Exception as exc:
        typer.echo(f"Error deleting document: {exc}", err=True)
        raise typer.Exit(1)
|
|