| """ |
| Document Intelligence Tools for Agents |
| |
| Tool implementations for DocumentAgent integration. |
| Each tool is designed for ReAct-style agent execution. |
| """ |
|
|
| import json |
| import logging |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Union |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class ToolResult: |
| """Result from a tool execution.""" |
|
|
| success: bool |
| data: Any = None |
| error: Optional[str] = None |
| evidence: List[Dict[str, Any]] = None |
|
|
| def __post_init__(self): |
| if self.evidence is None: |
| self.evidence = [] |
|
|
| def to_dict(self) -> Dict[str, Any]: |
| return { |
| "success": self.success, |
| "data": self.data, |
| "error": self.error, |
| "evidence": self.evidence, |
| } |
|
|
|
|
| class DocumentTool: |
| """Base class for document tools.""" |
|
|
| name: str = "base_tool" |
| description: str = "Base document tool" |
|
|
| def execute(self, **kwargs) -> ToolResult: |
| """Execute the tool.""" |
| raise NotImplementedError |
|
|
|
|
| class ParseDocumentTool(DocumentTool): |
| """ |
| Parse a document into semantic chunks. |
| |
| Input: |
| path: Path to document file |
| max_pages: Optional maximum pages to process |
| |
| Output: |
| ParseResult with chunks and metadata |
| """ |
|
|
| name = "parse_document" |
| description = "Parse a document into semantic chunks with OCR and layout detection" |
|
|
| def __init__(self, parser=None): |
| from ..parsing import DocumentParser |
| self.parser = parser or DocumentParser() |
|
|
| def execute( |
| self, |
| path: str, |
| max_pages: Optional[int] = None, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| |
| if max_pages: |
| self.parser.config.max_pages = max_pages |
|
|
| result = self.parser.parse(path) |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "doc_id": result.doc_id, |
| "filename": result.filename, |
| "num_pages": result.num_pages, |
| "num_chunks": len(result.chunks), |
| "chunks": [ |
| { |
| "chunk_id": c.chunk_id, |
| "type": c.chunk_type.value, |
| "text": c.text[:500], |
| "page": c.page, |
| "confidence": c.confidence, |
| } |
| for c in result.chunks[:20] |
| ], |
| "markdown_preview": result.markdown_full[:2000], |
| }, |
| ) |
| except Exception as e: |
| logger.error(f"Parse document failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| class ExtractFieldsTool(DocumentTool): |
| """ |
| Extract fields from a parsed document using a schema. |
| |
| Input: |
| parse_result: Previously parsed document |
| schema: Extraction schema (dict or ExtractionSchema) |
| fields: Optional list of specific fields to extract |
| |
| Output: |
| ExtractionResult with values and evidence |
| """ |
|
|
| name = "extract_fields" |
| description = "Extract structured fields from document using a schema" |
|
|
| def __init__(self, extractor=None): |
| from ..extraction import FieldExtractor |
| self.extractor = extractor or FieldExtractor() |
|
|
| def execute( |
| self, |
| parse_result: Any, |
| schema: Union[Dict, Any], |
| fields: Optional[List[str]] = None, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| from ..extraction import ExtractionSchema |
|
|
| |
| if isinstance(schema, dict): |
| schema = ExtractionSchema.from_json_schema(schema) |
|
|
| |
| if fields: |
| schema.fields = [f for f in schema.fields if f.name in fields] |
|
|
| result = self.extractor.extract(parse_result, schema) |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "extracted_data": result.data, |
| "confidence": result.overall_confidence, |
| "abstained_fields": result.abstained_fields, |
| }, |
| evidence=[ |
| { |
| "chunk_id": e.chunk_id, |
| "page": e.page, |
| "bbox": e.bbox.xyxy, |
| "snippet": e.snippet, |
| "confidence": e.confidence, |
| } |
| for e in result.evidence |
| ], |
| ) |
| except Exception as e: |
| logger.error(f"Extract fields failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| class SearchChunksTool(DocumentTool): |
| """ |
| Search for chunks containing specific text or matching criteria. |
| |
| Input: |
| parse_result: Parsed document |
| query: Search query |
| chunk_types: Optional list of chunk types to filter |
| top_k: Maximum results to return |
| |
| Output: |
| List of matching chunks with scores |
| """ |
|
|
| name = "search_chunks" |
| description = "Search document chunks for specific content" |
|
|
| def execute( |
| self, |
| parse_result: Any, |
| query: str, |
| chunk_types: Optional[List[str]] = None, |
| top_k: int = 10, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| from ..chunks import ChunkType |
|
|
| query_lower = query.lower() |
| results = [] |
|
|
| for chunk in parse_result.chunks: |
| |
| if chunk_types: |
| if chunk.chunk_type.value not in chunk_types: |
| continue |
|
|
| |
| text_lower = chunk.text.lower() |
| if query_lower in text_lower: |
| |
| count = text_lower.count(query_lower) |
| position = text_lower.find(query_lower) |
| score = count * 10 + (1 / (position + 1)) * 5 |
|
|
| results.append({ |
| "chunk_id": chunk.chunk_id, |
| "type": chunk.chunk_type.value, |
| "text": chunk.text[:300], |
| "page": chunk.page, |
| "score": score, |
| "bbox": chunk.bbox.xyxy, |
| }) |
|
|
| |
| results.sort(key=lambda x: x["score"], reverse=True) |
| results = results[:top_k] |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "query": query, |
| "total_matches": len(results), |
| "results": results, |
| }, |
| ) |
| except Exception as e: |
| logger.error(f"Search chunks failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| class GetChunkDetailsTool(DocumentTool): |
| """ |
| Get detailed information about a specific chunk. |
| |
| Input: |
| parse_result: Parsed document |
| chunk_id: ID of chunk to retrieve |
| |
| Output: |
| Full chunk details including content and metadata |
| """ |
|
|
| name = "get_chunk_details" |
| description = "Get detailed information about a specific chunk" |
|
|
| def execute( |
| self, |
| parse_result: Any, |
| chunk_id: str, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| from ..chunks import TableChunk, ChartChunk |
|
|
| |
| chunk = None |
| for c in parse_result.chunks: |
| if c.chunk_id == chunk_id: |
| chunk = c |
| break |
|
|
| if chunk is None: |
| return ToolResult( |
| success=False, |
| error=f"Chunk not found: {chunk_id}" |
| ) |
|
|
| data = { |
| "chunk_id": chunk.chunk_id, |
| "doc_id": chunk.doc_id, |
| "type": chunk.chunk_type.value, |
| "text": chunk.text, |
| "page": chunk.page, |
| "bbox": { |
| "x_min": chunk.bbox.x_min, |
| "y_min": chunk.bbox.y_min, |
| "x_max": chunk.bbox.x_max, |
| "y_max": chunk.bbox.y_max, |
| "normalized": chunk.bbox.normalized, |
| }, |
| "confidence": chunk.confidence, |
| "sequence_index": chunk.sequence_index, |
| } |
|
|
| |
| if isinstance(chunk, TableChunk): |
| data["table"] = { |
| "num_rows": chunk.num_rows, |
| "num_cols": chunk.num_cols, |
| "markdown": chunk.to_markdown(), |
| "csv": chunk.to_csv(), |
| } |
| elif isinstance(chunk, ChartChunk): |
| data["chart"] = { |
| "chart_type": chunk.chart_type, |
| "title": chunk.title, |
| "data_points": len(chunk.data_points), |
| "trends": chunk.trends, |
| } |
|
|
| return ToolResult(success=True, data=data) |
|
|
| except Exception as e: |
| logger.error(f"Get chunk details failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| class GetTableDataTool(DocumentTool): |
| """ |
| Get structured data from a table chunk. |
| |
| Input: |
| parse_result: Parsed document |
| chunk_id: ID of table chunk |
| format: Output format (json, csv, markdown) |
| |
| Output: |
| Table data in requested format |
| """ |
|
|
| name = "get_table_data" |
| description = "Extract structured data from a table" |
|
|
| def execute( |
| self, |
| parse_result: Any, |
| chunk_id: str, |
| format: str = "json", |
| **kwargs |
| ) -> ToolResult: |
| try: |
| from ..chunks import TableChunk |
|
|
| |
| table = None |
| for c in parse_result.chunks: |
| if c.chunk_id == chunk_id and isinstance(c, TableChunk): |
| table = c |
| break |
|
|
| if table is None: |
| return ToolResult( |
| success=False, |
| error=f"Table chunk not found: {chunk_id}" |
| ) |
|
|
| if format == "csv": |
| data = table.to_csv() |
| elif format == "markdown": |
| data = table.to_markdown() |
| else: |
| data = table.to_structured_json() |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "chunk_id": chunk_id, |
| "format": format, |
| "num_rows": table.num_rows, |
| "num_cols": table.num_cols, |
| "content": data, |
| }, |
| evidence=[{ |
| "chunk_id": chunk_id, |
| "page": table.page, |
| "bbox": table.bbox.xyxy, |
| "source_type": "table", |
| }], |
| ) |
| except Exception as e: |
| logger.error(f"Get table data failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| class AnswerQuestionTool(DocumentTool): |
| """ |
| Answer a question about the document using available chunks. |
| |
| Input: |
| parse_result: Parsed document |
| question: Question to answer |
| use_rag: Whether to use RAG for retrieval (requires indexed document) |
| document_id: Document ID for RAG retrieval (defaults to parse_result.doc_id) |
| top_k: Number of chunks to consider |
| |
| Output: |
| Answer with supporting evidence |
| """ |
|
|
| name = "answer_question" |
| description = "Answer a question about the document content" |
|
|
| def __init__(self, llm_client=None): |
| self.llm_client = llm_client |
|
|
| def execute( |
| self, |
| parse_result: Any, |
| question: str, |
| use_rag: bool = False, |
| document_id: Optional[str] = None, |
| top_k: int = 5, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| |
| if use_rag: |
| return self._answer_with_rag( |
| question=question, |
| document_id=document_id or (parse_result.doc_id if parse_result else None), |
| top_k=top_k, |
| ) |
|
|
| |
| return self._answer_with_keywords( |
| parse_result=parse_result, |
| question=question, |
| top_k=top_k, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Answer question failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
| def _answer_with_rag( |
| self, |
| question: str, |
| document_id: Optional[str], |
| top_k: int, |
| ) -> ToolResult: |
| """Answer using RAG retrieval.""" |
| try: |
| from .rag_tools import RAGAnswerTool |
| rag_tool = RAGAnswerTool(llm_client=self.llm_client) |
| return rag_tool.execute( |
| question=question, |
| document_id=document_id, |
| top_k=top_k, |
| ) |
| except ImportError: |
| return ToolResult( |
| success=False, |
| error="RAG module not available. Use use_rag=False or install chromadb." |
| ) |
|
|
| def _answer_with_keywords( |
| self, |
| parse_result: Any, |
| question: str, |
| top_k: int, |
| ) -> ToolResult: |
| """Answer using keyword-based search on parse_result.""" |
| if parse_result is None: |
| return ToolResult( |
| success=False, |
| error="parse_result is required when use_rag=False" |
| ) |
|
|
| |
| question_lower = question.lower() |
| relevant_chunks = [] |
|
|
| for chunk in parse_result.chunks: |
| text_lower = chunk.text.lower() |
| |
| keywords = [w for w in question_lower.split() if len(w) > 3] |
| matches = sum(1 for k in keywords if k in text_lower) |
| if matches > 0: |
| relevant_chunks.append((chunk, matches)) |
|
|
| |
| relevant_chunks.sort(key=lambda x: x[1], reverse=True) |
| top_chunks = relevant_chunks[:top_k] |
|
|
| if not top_chunks: |
| return ToolResult( |
| success=True, |
| data={ |
| "question": question, |
| "answer": "I could not find relevant information in the document to answer this question.", |
| "confidence": 0.0, |
| "abstained": True, |
| }, |
| ) |
|
|
| |
| context = "\n\n".join( |
| f"[Page {c.page}] {c.text}" |
| for c, _ in top_chunks |
| ) |
|
|
| |
| if self.llm_client is None: |
| return ToolResult( |
| success=True, |
| data={ |
| "question": question, |
| "answer": f"Based on the document: {top_chunks[0][0].text[:500]}", |
| "confidence": 0.6, |
| "context_chunks": len(top_chunks), |
| }, |
| evidence=[ |
| { |
| "chunk_id": c.chunk_id, |
| "page": c.page, |
| "bbox": c.bbox.xyxy, |
| "snippet": c.text[:200], |
| } |
| for c, _ in top_chunks |
| ], |
| ) |
|
|
| |
| try: |
| from ...rag import get_grounded_generator |
|
|
| generator = get_grounded_generator(llm_client=self.llm_client) |
|
|
| |
| chunk_dicts = [ |
| { |
| "chunk_id": c.chunk_id, |
| "document_id": c.doc_id, |
| "text": c.text, |
| "similarity": score / 10.0, |
| "page": c.page, |
| "chunk_type": c.chunk_type.value, |
| } |
| for c, score in top_chunks |
| ] |
|
|
| answer = generator.generate_answer( |
| question=question, |
| context=context, |
| chunks=chunk_dicts, |
| ) |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "question": question, |
| "answer": answer.text, |
| "confidence": answer.confidence, |
| "abstained": answer.abstained, |
| }, |
| evidence=[ |
| { |
| "chunk_id": c.chunk_id, |
| "page": c.page, |
| "bbox": c.bbox.xyxy, |
| "snippet": c.text[:200], |
| } |
| for c, _ in top_chunks |
| ], |
| ) |
|
|
| except ImportError: |
| |
| return ToolResult( |
| success=True, |
| data={ |
| "question": question, |
| "answer": f"Based on the document: {top_chunks[0][0].text[:500]}", |
| "confidence": 0.6, |
| "context_chunks": len(top_chunks), |
| }, |
| evidence=[ |
| { |
| "chunk_id": c.chunk_id, |
| "page": c.page, |
| "bbox": c.bbox.xyxy, |
| "snippet": c.text[:200], |
| } |
| for c, _ in top_chunks |
| ], |
| ) |
|
|
|
|
| class CropRegionTool(DocumentTool): |
| """ |
| Crop a region from a document page image. |
| |
| Input: |
| doc_path: Path to document |
| page: Page number (1-indexed) |
| bbox: Bounding box (x_min, y_min, x_max, y_max) |
| output_path: Optional path to save crop |
| |
| Output: |
| Crop image path or base64 data |
| """ |
|
|
| name = "crop_region" |
| description = "Crop a specific region from a document page" |
|
|
| def execute( |
| self, |
| doc_path: str, |
| page: int, |
| bbox: List[float], |
| output_path: Optional[str] = None, |
| **kwargs |
| ) -> ToolResult: |
| try: |
| from ..io import load_document, RenderOptions |
| from ..grounding import crop_region |
| from ..chunks import BoundingBox |
| from PIL import Image |
|
|
| |
| loader, renderer = load_document(doc_path) |
| page_image = renderer.render_page(page, RenderOptions(dpi=200)) |
| loader.close() |
|
|
| |
| bbox_obj = BoundingBox( |
| x_min=bbox[0], |
| y_min=bbox[1], |
| x_max=bbox[2], |
| y_max=bbox[3], |
| normalized=True, |
| ) |
|
|
| |
| crop = crop_region(page_image, bbox_obj) |
|
|
| |
| if output_path: |
| Image.fromarray(crop).save(output_path) |
| return ToolResult( |
| success=True, |
| data={ |
| "output_path": output_path, |
| "width": crop.shape[1], |
| "height": crop.shape[0], |
| }, |
| ) |
| else: |
| import base64 |
| import io |
|
|
| pil_img = Image.fromarray(crop) |
| buffer = io.BytesIO() |
| pil_img.save(buffer, format="PNG") |
| b64 = base64.b64encode(buffer.getvalue()).decode() |
|
|
| return ToolResult( |
| success=True, |
| data={ |
| "width": crop.shape[1], |
| "height": crop.shape[0], |
| "base64": b64[:100] + "...", |
| }, |
| ) |
|
|
| except Exception as e: |
| logger.error(f"Crop region failed: {e}") |
| return ToolResult(success=False, error=str(e)) |
|
|
|
|
| |
| DOCUMENT_TOOLS = { |
| "parse_document": ParseDocumentTool, |
| "extract_fields": ExtractFieldsTool, |
| "search_chunks": SearchChunksTool, |
| "get_chunk_details": GetChunkDetailsTool, |
| "get_table_data": GetTableDataTool, |
| "answer_question": AnswerQuestionTool, |
| "crop_region": CropRegionTool, |
| } |
|
|
|
|
| def get_tool(name: str, **kwargs) -> DocumentTool: |
| """Get a tool instance by name.""" |
| if name not in DOCUMENT_TOOLS: |
| raise ValueError(f"Unknown tool: {name}") |
| return DOCUMENT_TOOLS[name](**kwargs) |
|
|
|
|
| def list_tools() -> List[Dict[str, str]]: |
| """List all available tools.""" |
| return [ |
| {"name": name, "description": cls.description} |
| for name, cls in DOCUMENT_TOOLS.items() |
| ] |
|
|