"""
Evidence Building and Management

Creates and manages evidence references for extracted data.
Links every extraction to its visual source.
"""
|
|
import hashlib
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from ..chunks.models import (
    BoundingBox,
    DocumentChunk,
    EvidenceRef,
    TableChunk,
    ChartChunk,
)
|
|
|
|
@dataclass
class EvidenceConfig:
    """
    Settings that control how evidence references are built.

    The first group of fields governs saving cropped images of the
    evidence region; the second group governs the text snippet stored
    on each evidence reference.
    """

    # --- Crop image settings ---
    # A crop is only saved when this is True and an image is supplied.
    crop_enabled: bool = True
    # Directory that receives saved crops; while None, nothing is written.
    crop_output_dir: Optional[Path] = None
    # Used as the file extension of every saved crop.
    crop_format: str = "png"
    # NOTE(review): not read by any code visible in this module -- confirm
    # whether a caller elsewhere applies this padding.
    crop_padding_percent: float = 0.02

    # --- Snippet settings ---
    # When False, chunk-based evidence gets an empty snippet.
    include_snippet: bool = True
    # Hard upper bound on the number of characters kept in a snippet.
    max_snippet_length: int = 200
    # When True, the snippet is a window around the extracted value.
    include_context: bool = True
    # Characters kept on each side of the value inside that window.
    context_chars: int = 50
|
|
|
|
class EvidenceBuilder:
    """
    Builds evidence references for extractions.

    Creates links between extracted values and their visual sources in
    the document, optionally saving a cropped image of each source
    region to disk.
    """

    def __init__(self, config: Optional[EvidenceConfig] = None):
        """
        Initialize the builder.

        Args:
            config: Evidence settings; a default EvidenceConfig is
                created when this is omitted.
        """
        self.config = config or EvidenceConfig()
        # Retained from the original implementation; no method in this
        # class reads or updates it.
        self._crop_counter = 0

    def create_evidence(
        self,
        chunk: DocumentChunk,
        value: Any,
        field_name: Optional[str] = None,
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create an evidence reference from a chunk.

        Args:
            chunk: Source chunk
            value: Extracted value; its string form is searched for in
                the chunk text to position the snippet
            field_name: Optional field name being extracted (accepted
                for API compatibility; not used by this method)
            crop_image: Optional cropped image for this evidence

        Returns:
            EvidenceRef linking to the source
        """
        # Save the crop only when one was supplied and crops are enabled.
        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop(crop_image, chunk)

        snippet = self._create_snippet(chunk.text, str(value))

        # Table and chart chunks get fixed labels; every other chunk
        # reports the value of its own chunk_type.
        if isinstance(chunk, TableChunk):
            source_type = "table"
        elif isinstance(chunk, ChartChunk):
            source_type = "chart"
        else:
            source_type = chunk.chunk_type.value

        return EvidenceRef(
            chunk_id=chunk.chunk_id,
            doc_id=chunk.doc_id,
            page=chunk.page,
            bbox=chunk.bbox,
            source_type=source_type,
            snippet=snippet,
            confidence=chunk.confidence,
            crop_path=crop_path,
        )

    def create_evidence_from_bbox(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        source_text: str,
        confidence: float = 1.0,
        source_type: str = "region",
        crop_image: Optional[Any] = None,
    ) -> EvidenceRef:
        """
        Create evidence from a bounding box.

        Args:
            doc_id: Document ID
            page: Page number
            bbox: Bounding box of evidence
            source_text: Text content; truncated to max_snippet_length
                to form the snippet
            confidence: Confidence score
            source_type: Type of source (text, table, chart, etc.)
            crop_image: Optional cropped image

        Returns:
            EvidenceRef for the region
        """
        # There is no chunk behind this evidence, so derive an ID from
        # the region itself.
        chunk_id = self._generate_region_id(doc_id, page, bbox)

        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop_direct(
                crop_image,
                doc_id,
                page,
                chunk_id,
            )

        return EvidenceRef(
            chunk_id=chunk_id,
            doc_id=doc_id,
            page=page,
            bbox=bbox,
            source_type=source_type,
            snippet=source_text[:self.config.max_snippet_length],
            confidence=confidence,
            crop_path=crop_path,
        )

    def create_table_cell_evidence(
        self,
        table_chunk: TableChunk,
        row: int,
        col: int,
        crop_image: Optional[Any] = None,
    ) -> Optional[EvidenceRef]:
        """
        Create evidence for a specific table cell.

        Args:
            table_chunk: Source table
            row: Cell row (0-indexed)
            col: Cell column (0-indexed)
            crop_image: Optional cropped cell image

        Returns:
            EvidenceRef for the cell, or None if cell not found
        """
        cell = table_chunk.get_cell(row, col)
        if cell is None:
            return None

        cell_id = f"r{row}c{col}"

        # The crop identifier combines table and cell IDs so crops of
        # different cells of one table get different filenames.
        crop_path = None
        if crop_image is not None and self.config.crop_enabled:
            crop_path = self._save_crop_direct(
                crop_image,
                table_chunk.doc_id,
                table_chunk.page,
                f"{table_chunk.chunk_id}_{cell_id}",
            )

        return EvidenceRef(
            chunk_id=table_chunk.chunk_id,
            doc_id=table_chunk.doc_id,
            page=table_chunk.page,
            bbox=cell.bbox,
            source_type="table_cell",
            snippet=cell.text[:self.config.max_snippet_length],
            confidence=cell.confidence,
            cell_id=cell_id,
            crop_path=crop_path,
        )

    def merge_evidence(
        self,
        evidence_list: List[EvidenceRef],
    ) -> List[EvidenceRef]:
        """
        Merge overlapping evidence references.

        Grouping is greedy: the earliest not-yet-merged reference seeds
        a group, and every later unmerged reference on the same document
        and page whose bounding box has IoU > 0.5 with the seed joins it.
        Candidates are compared with the seed only, not with each other.

        Args:
            evidence_list: References to merge; the list is not modified

        Returns:
            A new list with one entry per group, in seed order
        """
        if len(evidence_list) <= 1:
            # Return a copy so the result never aliases the input,
            # matching what the multi-item path returns.
            return list(evidence_list)

        merged = []
        used = set()
        total = len(evidence_list)

        for i, seed in enumerate(evidence_list):
            if i in used:
                continue

            group = [seed]
            for j in range(i + 1, total):
                if j in used:
                    continue

                candidate = evidence_list[j]
                if (seed.doc_id == candidate.doc_id and
                        seed.page == candidate.page and
                        seed.bbox.iou(candidate.bbox) > 0.5):
                    group.append(candidate)
                    used.add(j)

            if len(group) == 1:
                merged.append(seed)
            else:
                merged.append(self._merge_evidence_group(group))

        return merged

    def _merge_evidence_group(
        self,
        group: List[EvidenceRef],
    ) -> EvidenceRef:
        """
        Merge a group of overlapping evidence into one reference.

        The merged bounding box is the smallest box covering every
        member. IDs, page, source type, cell ID, crop path and
        confidence come from the highest-confidence member. Distinct
        non-empty snippets are joined with " | " and truncated to
        max_snippet_length.
        """
        # On ties max() keeps the earliest member.
        best = max(group, key=lambda e: e.confidence)

        merged_bbox = BoundingBox(
            x_min=min(e.bbox.x_min for e in group),
            y_min=min(e.bbox.y_min for e in group),
            x_max=max(e.bbox.x_max for e in group),
            y_max=max(e.bbox.y_max for e in group),
            normalized=best.bbox.normalized,
        )

        # dict.fromkeys de-duplicates while keeping first-seen order. A
        # set would make the joined (and then truncated) snippet vary
        # between runs because of string hash randomisation.
        snippets = list(dict.fromkeys(e.snippet for e in group if e.snippet))
        combined_snippet = " | ".join(snippets)[:self.config.max_snippet_length]

        return EvidenceRef(
            chunk_id=best.chunk_id,
            doc_id=best.doc_id,
            page=best.page,
            bbox=merged_bbox,
            source_type=best.source_type,
            snippet=combined_snippet,
            confidence=best.confidence,
            cell_id=best.cell_id,
            crop_path=best.crop_path,
        )

    def _create_snippet(
        self,
        full_text: str,
        value: str,
    ) -> str:
        """
        Create a text snippet around the extracted value.

        Args:
            full_text: Text of the source chunk
            value: String form of the extracted value

        Returns:
            "" when snippets are disabled or there is no text. When
            context is enabled and the value occurs in the text
            (case-insensitively), a window of context_chars on each
            side of the first occurrence, with "..." marking each cut
            side. Otherwise the start of the text. The result never
            exceeds max_snippet_length characters.
        """
        if not self.config.include_snippet:
            return ""
        if not full_text:
            return ""

        limit = self.config.max_snippet_length

        if self.config.include_context:
            # Search the original text rather than a lower-cased copy:
            # str.lower() can change a string's length, which would
            # misalign offsets taken from the copy.
            match = re.search(re.escape(value), full_text, re.IGNORECASE)
            if match is not None:
                start = max(0, match.start() - self.config.context_chars)
                end = min(
                    len(full_text),
                    match.end() + self.config.context_chars,
                )

                snippet = full_text[start:end]
                if start > 0:
                    snippet = "..." + snippet
                if end < len(full_text):
                    snippet = snippet + "..."

                return snippet[:limit]

        # Value not found (or context disabled): fall back to the head
        # of the text.
        return full_text[:limit]

    def _generate_region_id(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
    ) -> str:
        """
        Generate a stable ID for a region.

        Returns:
            The first 16 hex characters of the MD5 digest of the
            document ID, page and bbox.xyxy.
        """
        # MD5 is used only to derive an identifier; changing the hash
        # would change every previously generated region ID.
        content = f"{doc_id}_{page}_{bbox.xyxy}"
        return hashlib.md5(content.encode()).hexdigest()[:16]

    def _save_crop(
        self,
        image: Any,
        chunk: DocumentChunk,
    ) -> Optional[str]:
        """Save a crop image for a chunk, named after the chunk's IDs."""
        return self._save_crop_direct(
            image,
            chunk.doc_id,
            chunk.page,
            chunk.chunk_id,
        )

    def _save_crop_direct(
        self,
        image: Any,
        doc_id: str,
        page: int,
        identifier: str,
    ) -> Optional[str]:
        """
        Save a crop image to the configured output directory.

        Saving is best-effort: problems are logged and reported as
        None rather than raised.

        Args:
            image: A NumPy array or a PIL image
            doc_id: Document ID, used in the filename
            page: Page number, used in the filename
            identifier: Chunk/region/cell identifier, used in the filename

        Returns:
            The written file's path as a string, or None when no output
            directory is configured, the image type is unsupported, or
            saving fails.
        """
        if self.config.crop_output_dir is None:
            return None

        logger = logging.getLogger(__name__)

        try:
            # Imported here, inside the try, so a missing Pillow/NumPy
            # install only disables crop saving.
            from PIL import Image
            import numpy as np

            if isinstance(image, np.ndarray):
                pil_image = Image.fromarray(image)
            elif isinstance(image, Image.Image):
                pil_image = image
            else:
                logger.debug(
                    "Skipping evidence crop: unsupported image type %s",
                    type(image).__name__,
                )
                return None

            output_dir = Path(self.config.crop_output_dir)
            output_dir.mkdir(parents=True, exist_ok=True)

            # NOTE(review): doc_id and identifier go into the filename
            # verbatim -- confirm they never contain path separators.
            filename = f"{doc_id}_{page}_{identifier}.{self.config.crop_format}"
            output_path = output_dir / filename

            pil_image.save(output_path)
            return str(output_path)

        except Exception:
            # Keep the best-effort contract, but leave a trace so a
            # missing crop can be diagnosed.
            logger.warning(
                "Failed to save evidence crop for doc %r page %r (%s)",
                doc_id,
                page,
                identifier,
                exc_info=True,
            )
            return None
|
|
|
|
class EvidenceTracker:
    """
    Collects evidence references produced during extraction.

    Every reference is kept in insertion order and is additionally
    indexed by chunk ID and, when a field name is supplied, by field
    name. Query methods return fresh lists, so callers may modify the
    results without affecting the tracker.
    """

    def __init__(self):
        """Start with no evidence recorded."""
        self._evidence: List[EvidenceRef] = []
        self._by_field: Dict[str, List[EvidenceRef]] = {}
        self._by_chunk: Dict[str, List[EvidenceRef]] = {}

    def add(
        self,
        evidence: EvidenceRef,
        field_name: Optional[str] = None,
    ) -> None:
        """
        Record an evidence reference.

        Args:
            evidence: Reference to store
            field_name: Field the evidence supports; when None or empty
                the reference is not added to the per-field index
        """
        self._evidence.append(evidence)
        self._by_chunk.setdefault(evidence.chunk_id, []).append(evidence)
        if field_name:
            self._by_field.setdefault(field_name, []).append(evidence)

    def get_all(self) -> List[EvidenceRef]:
        """Return every recorded reference, in insertion order."""
        return list(self._evidence)

    def get_for_field(self, field_name: str) -> List[EvidenceRef]:
        """Return the references recorded for ``field_name`` (may be empty)."""
        return list(self._by_field.get(field_name, []))

    def get_for_chunk(self, chunk_id: str) -> List[EvidenceRef]:
        """Return the references whose chunk ID is ``chunk_id`` (may be empty)."""
        return list(self._by_chunk.get(chunk_id, []))

    def get_by_page(self, page: int) -> List[EvidenceRef]:
        """Return the references located on ``page``."""
        return [ref for ref in self._evidence if ref.page == page]

    def get_high_confidence(self, threshold: float = 0.8) -> List[EvidenceRef]:
        """Return the references whose confidence is at least ``threshold``."""
        return [ref for ref in self._evidence if ref.confidence >= threshold]

    def validate_field(
        self,
        field_name: str,
        min_evidence: int = 1,
        min_confidence: float = 0.5,
    ) -> bool:
        """
        Check whether a field is backed by enough evidence.

        Args:
            field_name: Field to validate
            min_evidence: Required number of references for the field
            min_confidence: Confidence that the best reference must reach

        Returns:
            True when the field has at least ``min_evidence`` references
            and the highest confidence among them is at least
            ``min_confidence``; False otherwise
        """
        refs = self.get_for_field(field_name)
        # Only the single best reference is compared with the threshold;
        # with no references the best confidence counts as 0.
        return (
            len(refs) >= min_evidence
            and max((ref.confidence for ref in refs), default=0) >= min_confidence
        )

    def clear(self) -> None:
        """Discard all recorded evidence and both indexes."""
        self._evidence, self._by_field, self._by_chunk = [], {}, {}
|
|