| """ |
| Agent Adapter for Document Intelligence |
| |
| Bridges the DocumentAgent with the new document_intelligence subsystem. |
| Provides enhanced tools and capabilities. |
| """ |
|
|
| import logging |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
| from .chunks.models import ( |
| DocumentChunk, |
| EvidenceRef, |
| ParseResult, |
| ExtractionResult, |
| ClassificationResult, |
| DocumentType, |
| ) |
| from .parsing import DocumentParser, ParserConfig |
| from .extraction import ( |
| ExtractionSchema, |
| FieldExtractor, |
| ExtractionConfig, |
| ExtractionValidator, |
| ) |
| from .grounding import EvidenceBuilder, EvidenceTracker, CropManager |
| from .tools import get_tool, list_tools, ToolResult |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
@dataclass
class AgentConfig:
    """Configuration for the document agent adapter.

    Every field has a default, so ``AgentConfig()`` is a usable
    configuration on its own.
    """

    # Parsing settings.
    render_dpi: int = 200
    max_pages: Optional[int] = None
    # None means "not specified"; __post_init__ swaps it for ["en"].
    # A None sentinel (rather than a list literal) keeps instances from
    # sharing one mutable default list.
    ocr_languages: Optional[List[str]] = None

    # Extraction settings.
    min_confidence: float = 0.5
    abstain_on_low_confidence: bool = True

    # Grounding settings.
    enable_crops: bool = True
    crop_output_dir: Optional[Path] = None

    # Agent settings.
    max_iterations: int = 10
    verbose: bool = False

    def __post_init__(self):
        """Fill in the default OCR language list when none was given."""
        if self.ocr_languages is None:
            # A new list per instance, so mutating one config cannot
            # affect another.
            self.ocr_languages = ["en"]
|
|
|
|
class DocumentIntelligenceAdapter:
    """
    Adapter connecting DocumentAgent with document_intelligence subsystem.

    Provides:
    - Document loading and parsing
    - Schema-driven extraction
    - Evidence-grounded results
    - Tool execution

    The adapter tracks a single "current" document: ``load_document``
    replaces the stored parse result and the rendered page images, and the
    remaining methods operate on whatever was loaded last.
    """

    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        llm_client: Optional[Any] = None,
    ):
        """
        Build the parser, extractor, validator and grounding helpers.

        Args:
            config: Adapter settings; a default AgentConfig is used if omitted
            llm_client: Optional client handed to tools that accept one
        """
        self.config = config or AgentConfig()
        self.llm_client = llm_client

        # Parsing
        self.parser = DocumentParser(
            config=ParserConfig(
                render_dpi=self.config.render_dpi,
                max_pages=self.config.max_pages,
                ocr_languages=self.config.ocr_languages,
            )
        )

        # Extraction and validation share the same confidence threshold.
        self.extractor = FieldExtractor(
            config=ExtractionConfig(
                min_field_confidence=self.config.min_confidence,
                abstain_on_low_confidence=self.config.abstain_on_low_confidence,
            )
        )
        self.validator = ExtractionValidator(
            min_confidence=self.config.min_confidence,
        )

        # Grounding
        self.evidence_builder = EvidenceBuilder()

        # A crop manager is only created when crops are enabled AND an
        # output directory was configured.
        if self.config.enable_crops and self.config.crop_output_dir:
            self.crop_manager = CropManager(self.config.crop_output_dir)
        else:
            self.crop_manager = None

        # State for the currently loaded document.
        self._current_parse_result: Optional[ParseResult] = None
        self._page_images: Dict[int, Any] = {}

        logger.info("Initialized DocumentIntelligenceAdapter")

    def load_document(
        self,
        path: Union[str, Path],
        render_pages: bool = True,
    ) -> ParseResult:
        """
        Load and parse a document.

        The adapter's state (parse result and page images) is replaced only
        after parsing and rendering have both succeeded, so a failure leaves
        the previously loaded document intact.

        Args:
            path: Path to document file
            render_pages: Whether to keep rendered page images

        Returns:
            ParseResult with chunks and metadata
        """
        path = Path(path)
        logger.info(f"Loading document: {path}")

        parse_result = self.parser.parse(path)

        # Collect into a fresh dict so images rendered for a previously
        # loaded document can never be served for this one.
        page_images: Dict[int, Any] = {}
        if render_pages:
            from .io import load_document as open_document, RenderOptions

            loader, renderer = open_document(path)
            try:
                # Pages are 1-indexed.
                for page_num in range(1, parse_result.num_pages + 1):
                    page_images[page_num] = renderer.render_page(
                        page_num,
                        RenderOptions(dpi=self.config.render_dpi),
                    )
            finally:
                # Release the loader even if rendering a page raised.
                loader.close()

        self._current_parse_result = parse_result
        self._page_images = page_images
        return parse_result

    def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict[str, Any]],
        validate: bool = True,
    ) -> ExtractionResult:
        """
        Extract fields from the loaded document.

        Args:
            schema: Extraction schema, or a JSON-schema dict that is
                converted with ``ExtractionSchema.from_json_schema``
            validate: Whether to validate results

        Returns:
            ExtractionResult with values and evidence. When validation
            fails, the issues are recorded under
            ``metadata["validation_issues"]``.

        Raises:
            RuntimeError: If no document has been loaded
        """
        if self._current_parse_result is None:
            raise RuntimeError("No document loaded. Call load_document() first.")

        if isinstance(schema, dict):
            schema = ExtractionSchema.from_json_schema(schema)

        result = self.extractor.extract(self._current_parse_result, schema)

        if validate:
            validation = self.validator.validate(result, schema)
            if not validation.is_valid:
                logger.warning(
                    "Extraction validation failed: %s errors",
                    validation.error_count,
                )
                # Attach the issues to the result instead of raising, so
                # callers still receive the (partial) extraction.
                result.metadata = result.metadata or {}
                result.metadata["validation_issues"] = [
                    {
                        "field": issue.field_name,
                        "type": issue.issue_type,
                        "message": issue.message,
                    }
                    for issue in validation.issues
                ]

        return result

    def answer_question(
        self,
        question: str,
        use_llm: bool = True,
    ) -> Tuple[str, List[EvidenceRef], float]:
        """
        Answer a question about the document.

        Args:
            question: Question to answer
            use_llm: Whether to use LLM for generation. When False the
                tool is created without the adapter's LLM client.

        Returns:
            Tuple of (answer, evidence, confidence). If the tool reports
            failure the tuple is ("Error: <message>", [], 0.0).

        Raises:
            RuntimeError: If no document has been loaded
        """
        if self._current_parse_result is None:
            raise RuntimeError("No document loaded")

        # NOTE(review): presumably the tool falls back to non-LLM answering
        # when it receives no client -- confirm against the tool itself.
        tool = get_tool(
            "answer_question",
            llm_client=self.llm_client if use_llm else None,
        )
        result = tool.execute(
            parse_result=self._current_parse_result,
            question=question,
            use_rag=False,
        )

        if not result.success:
            return f"Error: {result.error}", [], 0.0

        # Tolerate a successful result that carries no payload/evidence.
        data = result.data or {}
        answer = data.get("answer", "")
        confidence = data.get("confidence", 0.5)

        from .chunks.models import BoundingBox

        # Convert the tool's evidence dicts into EvidenceRef objects; each
        # bbox is read as [x_min, y_min, x_max, y_max].
        doc_id = self._current_parse_result.doc_id
        evidence = []
        for ev_dict in result.evidence or []:
            box = ev_dict["bbox"]
            evidence.append(EvidenceRef(
                chunk_id=ev_dict["chunk_id"],
                doc_id=doc_id,
                page=ev_dict["page"],
                bbox=BoundingBox(
                    x_min=box[0],
                    y_min=box[1],
                    x_max=box[2],
                    y_max=box[3],
                    normalized=True,
                ),
                source_type="text",
                snippet=ev_dict.get("snippet", ""),
                # Every evidence item inherits the answer-level confidence.
                confidence=confidence,
            ))

        return answer, evidence, confidence

    def search_chunks(
        self,
        query: str,
        chunk_types: Optional[List[str]] = None,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """
        Search for chunks matching a query.

        Args:
            query: Search query
            chunk_types: Optional chunk type filter
            top_k: Maximum results

        Returns:
            List of matching chunks with scores; empty if the search tool
            reports failure.

        Raises:
            RuntimeError: If no document has been loaded
        """
        if self._current_parse_result is None:
            raise RuntimeError("No document loaded")

        tool = get_tool("search_chunks")
        result = tool.execute(
            parse_result=self._current_parse_result,
            query=query,
            chunk_types=chunk_types,
            top_k=top_k,
        )

        if not result.success:
            # Still best-effort (empty list), but leave a trace of why.
            logger.warning("Chunk search failed: %s", result.error)
            return []

        return (result.data or {}).get("results", [])

    def get_chunk(self, chunk_id: str) -> Optional[DocumentChunk]:
        """Get a chunk by ID, or None if absent or no document is loaded."""
        if self._current_parse_result is None:
            return None

        return next(
            (
                chunk
                for chunk in self._current_parse_result.chunks
                if chunk.chunk_id == chunk_id
            ),
            None,
        )

    def get_page_image(self, page: int) -> Optional[Any]:
        """Get rendered page image, or None if that page was not rendered."""
        return self._page_images.get(page)

    def crop_chunk(
        self,
        chunk: DocumentChunk,
        padding_percent: float = 0.02,
    ) -> Optional[Any]:
        """
        Crop the region of a chunk from its page.

        Args:
            chunk: Chunk whose ``page`` and ``bbox`` locate the region
            padding_percent: Padding forwarded to ``crop_region``

        Returns:
            The cropped region, or None when the chunk's page has no
            rendered image.
        """
        page_image = self.get_page_image(chunk.page)
        if page_image is None:
            return None

        from .grounding import crop_region

        return crop_region(page_image, chunk.bbox, padding_percent)

    def get_tools_description(self) -> str:
        """Get description of available tools for agent prompts."""
        return "\n".join(
            f"- {tool['name']}: {tool['description']}"
            for tool in list_tools()
        )

    def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> ToolResult:
        """
        Execute a document tool.

        The current parse result is supplied as ``parse_result`` unless the
        caller passed one explicitly or no document is loaded.

        Args:
            tool_name: Name of tool to execute
            **kwargs: Tool arguments

        Returns:
            ToolResult
        """
        if self._current_parse_result is not None:
            kwargs.setdefault("parse_result", self._current_parse_result)

        tool = get_tool(tool_name, llm_client=self.llm_client)
        return tool.execute(**kwargs)

    @property
    def parse_result(self) -> Optional[ParseResult]:
        """Get current parse result."""
        return self._current_parse_result

    @property
    def document_id(self) -> Optional[str]:
        """Get current document ID, or None if no document is loaded."""
        if self._current_parse_result is None:
            return None
        return self._current_parse_result.doc_id
|
|
|
|
def create_enhanced_document_agent(
    llm_client: Any,
    config: Optional[AgentConfig] = None,
) -> "EnhancedDocumentAgent":
    """
    Factory for an EnhancedDocumentAgent backed by document_intelligence.

    Args:
        llm_client: LLM client the agent uses for reasoning
        config: Agent configuration, passed through unchanged (may be None)

    Returns:
        A newly constructed EnhancedDocumentAgent
    """
    agent = EnhancedDocumentAgent(
        config=config,
        llm_client=llm_client,
    )
    return agent
|
|
|
|
class EnhancedDocumentAgent:
    """
    Enhanced DocumentAgent using document_intelligence subsystem.

    Extends the ReAct-style agent with:
    - Better parsing and chunking
    - Schema-driven extraction
    - Visual grounding
    - Evidence tracking

    The ``async`` methods call the synchronous adapter directly and contain
    no ``await``, so each one runs to completion without yielding to the
    event loop.
    """

    def __init__(
        self,
        llm_client: Any,
        config: Optional[AgentConfig] = None,
    ):
        """
        Create the agent and its underlying adapter.

        Args:
            llm_client: LLM client, also handed to the adapter
            config: Agent configuration; a default AgentConfig if omitted
        """
        self.llm_client = llm_client
        # Resolve the config once so the agent and its adapter share the
        # same object instead of each building a separate default.
        self.config = config if config is not None else AgentConfig()
        self.adapter = DocumentIntelligenceAdapter(
            config=self.config,
            llm_client=llm_client,
        )

    async def load_document(self, path: Union[str, Path]) -> ParseResult:
        """Load a document for processing (page images are rendered)."""
        return self.adapter.load_document(path, render_pages=True)

    async def extract_fields(
        self,
        schema: Union[ExtractionSchema, Dict],
    ) -> ExtractionResult:
        """Extract fields using schema, with validation enabled."""
        return self.adapter.extract_fields(schema, validate=True)

    async def answer_question(
        self,
        question: str,
    ) -> Tuple[str, List[EvidenceRef]]:
        """
        Answer a question about the document.

        Returns:
            Tuple of (answer, evidence); the adapter's confidence value is
            dropped.
        """
        answer, evidence, _confidence = self.adapter.answer_question(question)
        return answer, evidence

    @staticmethod
    def _best_keyword_match(content, keywords_by_type):
        """
        Pick the type whose keywords occur most often in ``content``.

        Matching is plain substring containment. Ties go to the type that
        appears first in ``keywords_by_type``.

        Returns:
            Tuple of (type, match_count), or (None, 0) when nothing matches.
        """
        best_type, best_matches = None, 0
        for dtype, keywords in keywords_by_type.items():
            matches = sum(1 for keyword in keywords if keyword in content)
            # Strictly greater: an earlier type keeps priority on a tie.
            if matches > best_matches:
                best_type, best_matches = dtype, matches
        return best_type, best_matches

    async def classify(self) -> ClassificationResult:
        """
        Classify the document type with a keyword heuristic.

        Only the first 200 characters of up to five chunks from page 1 are
        inspected. The type with the most keyword hits wins; with no hits
        the result is DocumentType.OTHER at confidence 0.5.

        Raises:
            RuntimeError: If no document has been loaded
        """
        parse_result = self.adapter.parse_result
        if parse_result is None:
            raise RuntimeError("No document loaded")

        first_page_chunks = [
            c for c in parse_result.chunks
            if c.page == 1
        ][:5]

        # Lower-cased once; all keywords below are lower-case.
        content = " ".join(
            (c.text or "")[:200] for c in first_page_chunks
        ).lower()

        type_keywords = {
            DocumentType.INVOICE: ["invoice", "bill", "payment due", "amount due"],
            DocumentType.CONTRACT: ["agreement", "contract", "party", "whereas"],
            DocumentType.RECEIPT: ["receipt", "paid", "transaction", "thank you"],
            DocumentType.FORM: ["form", "fill in", "checkbox", "signature line"],
            DocumentType.LETTER: ["dear", "sincerely", "regards"],
            DocumentType.REPORT: ["report", "findings", "conclusion", "summary"],
            DocumentType.PATENT: ["patent", "claims", "invention", "embodiment"],
        }

        best_type, matches = self._best_keyword_match(content, type_keywords)
        if best_type is None:
            doc_type = DocumentType.OTHER
            confidence = 0.5
        else:
            doc_type = best_type
            # 0.15 per matching keyword on top of the 0.5 base, capped at 0.9.
            confidence = min(0.9, 0.5 + matches * 0.15)

        return ClassificationResult(
            doc_id=self.adapter.document_id,
            document_type=doc_type,
            confidence=confidence,
            secondary_types=[],
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> List[Dict[str, Any]]:
        """Search document content (synchronous, unlike the methods above)."""
        return self.adapter.search_chunks(query, top_k=top_k)

    @property
    def current_document(self) -> Optional[ParseResult]:
        """Get current document, or None if nothing is loaded."""
        return self.adapter.parse_result
|
|