| """ |
| Document Processor Pipeline |
| |
| Main pipeline that orchestrates document processing: |
| 1. Load document |
| 2. OCR (PaddleOCR or Tesseract) |
| 3. Layout detection |
| 4. Reading order reconstruction |
| 5. Semantic chunking |
| 6. Grounding evidence |
| |
| Outputs ProcessedDocument with all extracted information. |
| """ |
|
|
| import time |
| from pathlib import Path |
| from typing import List, Optional, Dict, Any, Union |
| from datetime import datetime |
| from pydantic import BaseModel, Field |
| from loguru import logger |
| import numpy as np |
|
|
| from ..schemas.core import ( |
| ProcessedDocument, |
| DocumentMetadata, |
| DocumentChunk, |
| OCRRegion, |
| LayoutRegion, |
| ) |
| from ..io.loader import load_document, LoadedDocument |
| from ..io.cache import get_document_cache |
| from ..ocr import get_ocr_engine, OCRConfig, OCRResult |
| from ..layout import get_layout_detector, LayoutConfig, LayoutResult |
| from ..reading_order import get_reading_order_reconstructor, ReadingOrderConfig |
| from ..chunking import get_document_chunker, ChunkerConfig |
|
|
|
|
| class PipelineConfig(BaseModel): |
| """Configuration for the document processing pipeline.""" |
| |
| ocr: OCRConfig = Field(default_factory=OCRConfig) |
| layout: LayoutConfig = Field(default_factory=LayoutConfig) |
| reading_order: ReadingOrderConfig = Field(default_factory=ReadingOrderConfig) |
| chunking: ChunkerConfig = Field(default_factory=ChunkerConfig) |
|
|
| |
| render_dpi: int = Field(default=300, ge=72, description="DPI for PDF rendering") |
| enable_caching: bool = Field(default=True, description="Cache rendered pages") |
| parallel_pages: bool = Field(default=False, description="Process pages in parallel") |
| max_pages: Optional[int] = Field(default=None, description="Max pages to process") |
|
|
| |
| include_ocr_regions: bool = Field(default=True) |
| include_layout_regions: bool = Field(default=True) |
| generate_full_text: bool = Field(default=True) |
|
|
|
|
| class DocumentProcessor: |
| """ |
| Main document processing pipeline. |
| |
| Provides end-to-end document processing with: |
| - Multi-format support (PDF, images) |
| - Pluggable OCR engines |
| - Layout detection |
| - Reading order reconstruction |
| - Semantic chunking |
| """ |
|
|
| def __init__(self, config: Optional[PipelineConfig] = None): |
| """ |
| Initialize document processor. |
| |
| Args: |
| config: Pipeline configuration |
| """ |
| self.config = config or PipelineConfig() |
| self._initialized = False |
|
|
| |
| self._ocr_engine = None |
| self._layout_detector = None |
| self._reading_order = None |
| self._chunker = None |
|
|
| def initialize(self): |
| """Initialize all pipeline components.""" |
| if self._initialized: |
| return |
|
|
| logger.info("Initializing document processing pipeline...") |
|
|
| |
| self._ocr_engine = get_ocr_engine( |
| engine_type=self.config.ocr.engine, |
| config=self.config.ocr, |
| ) |
|
|
| |
| from ..layout.detector import create_layout_detector |
| self._layout_detector = create_layout_detector(self.config.layout, initialize=True) |
|
|
| |
| self._reading_order = get_reading_order_reconstructor(self.config.reading_order) |
|
|
| |
| self._chunker = get_document_chunker(self.config.chunking) |
|
|
| self._initialized = True |
| logger.info("Document processing pipeline initialized") |
|
|
| def process( |
| self, |
| source: Union[str, Path], |
| document_id: Optional[str] = None, |
| ) -> ProcessedDocument: |
| """ |
| Process a document through the full pipeline. |
| |
| Args: |
| source: Path to document |
| document_id: Optional document ID |
| |
| Returns: |
| ProcessedDocument with all extracted information |
| """ |
| if not self._initialized: |
| self.initialize() |
|
|
| start_time = time.time() |
| source_path = str(Path(source).absolute()) |
|
|
| logger.info(f"Processing document: {source_path}") |
|
|
| try: |
| |
| loaded_doc = load_document(source_path, document_id) |
| document_id = loaded_doc.document_id |
|
|
| |
| num_pages = loaded_doc.num_pages |
| if self.config.max_pages: |
| num_pages = min(num_pages, self.config.max_pages) |
|
|
| logger.info(f"Document loaded: {num_pages} pages") |
|
|
| |
| all_ocr_regions: List[OCRRegion] = [] |
| all_layout_regions: List[LayoutRegion] = [] |
| page_dimensions = [] |
|
|
| for page_num in range(num_pages): |
| logger.debug(f"Processing page {page_num + 1}/{num_pages}") |
|
|
| |
| page_image = self._get_page_image(loaded_doc, page_num) |
| height, width = page_image.shape[:2] |
| page_dimensions.append((width, height)) |
|
|
| |
| ocr_result = self._ocr_engine.recognize(page_image, page_num) |
| if ocr_result.success: |
| all_ocr_regions.extend(ocr_result.regions) |
|
|
| |
| layout_result = self._layout_detector.detect( |
| page_image, |
| page_num, |
| ocr_result.regions if ocr_result.success else None, |
| ) |
| if layout_result.success: |
| all_layout_regions.extend(layout_result.regions) |
|
|
| |
| if all_ocr_regions: |
| reading_result = self._reading_order.reconstruct( |
| all_ocr_regions, |
| all_layout_regions, |
| page_width=page_dimensions[0][0] if page_dimensions else None, |
| page_height=page_dimensions[0][1] if page_dimensions else None, |
| ) |
|
|
| |
| if reading_result.success and reading_result.order: |
| all_ocr_regions = [all_ocr_regions[i] for i in reading_result.order] |
|
|
| |
| chunks = self._chunker.create_chunks( |
| all_ocr_regions, |
| all_layout_regions if self.config.include_layout_regions else None, |
| document_id, |
| source_path, |
| ) |
|
|
| |
| full_text = "" |
| if self.config.generate_full_text and all_ocr_regions: |
| full_text = self._generate_full_text(all_ocr_regions) |
|
|
| |
| ocr_confidence_avg = None |
| if all_ocr_regions: |
| ocr_confidence_avg = sum(r.confidence for r in all_ocr_regions) / len(all_ocr_regions) |
|
|
| layout_confidence_avg = None |
| if all_layout_regions: |
| layout_confidence_avg = sum(r.confidence for r in all_layout_regions) / len(all_layout_regions) |
|
|
| |
| metadata = DocumentMetadata( |
| document_id=document_id, |
| source_path=source_path, |
| filename=loaded_doc.filename, |
| file_type=loaded_doc.file_type, |
| file_size_bytes=loaded_doc.file_size_bytes, |
| num_pages=loaded_doc.num_pages, |
| page_dimensions=page_dimensions, |
| processed_at=datetime.utcnow(), |
| total_chunks=len(chunks), |
| total_characters=len(full_text), |
| ocr_confidence_avg=ocr_confidence_avg, |
| layout_confidence_avg=layout_confidence_avg, |
| ) |
|
|
| |
| result = ProcessedDocument( |
| metadata=metadata, |
| ocr_regions=all_ocr_regions if self.config.include_ocr_regions else [], |
| layout_regions=all_layout_regions if self.config.include_layout_regions else [], |
| chunks=chunks, |
| full_text=full_text, |
| status="completed", |
| ) |
|
|
| processing_time = time.time() - start_time |
| logger.info( |
| f"Document processed in {processing_time:.2f}s: " |
| f"{len(all_ocr_regions)} OCR regions, " |
| f"{len(all_layout_regions)} layout regions, " |
| f"{len(chunks)} chunks" |
| ) |
|
|
| return result |
|
|
| except Exception as e: |
| logger.error(f"Document processing failed: {e}") |
| raise |
|
|
| finally: |
| |
| if 'loaded_doc' in locals(): |
| loaded_doc.close() |
|
|
| def _get_page_image( |
| self, |
| doc: LoadedDocument, |
| page_num: int, |
| ) -> np.ndarray: |
| """Get page image, using cache if enabled.""" |
| if self.config.enable_caching: |
| cache = get_document_cache() |
| cached = cache.get(doc.document_id, page_num, self.config.render_dpi) |
| if cached is not None: |
| return cached |
|
|
| |
| image = doc.get_page_image(page_num, self.config.render_dpi) |
|
|
| |
| if self.config.enable_caching: |
| cache = get_document_cache() |
| cache.put(doc.document_id, page_num, self.config.render_dpi, image) |
|
|
| return image |
|
|
| def _generate_full_text(self, ocr_regions: List[OCRRegion]) -> str: |
| """Generate full text from OCR regions in reading order.""" |
| |
| by_page: Dict[int, List[OCRRegion]] = {} |
| for r in ocr_regions: |
| if r.page not in by_page: |
| by_page[r.page] = [] |
| by_page[r.page].append(r) |
|
|
| |
| pages_text = [] |
| for page_num in sorted(by_page.keys()): |
| page_regions = by_page[page_num] |
| page_text = " ".join(r.text for r in page_regions) |
| pages_text.append(page_text) |
|
|
| return "\n\n".join(pages_text) |
|
|
| def process_batch( |
| self, |
| sources: List[Union[str, Path]], |
| ) -> List[ProcessedDocument]: |
| """ |
| Process multiple documents. |
| |
| Args: |
| sources: List of document paths |
| |
| Returns: |
| List of ProcessedDocument |
| """ |
| results = [] |
| for source in sources: |
| try: |
| result = self.process(source) |
| results.append(result) |
| except Exception as e: |
| logger.error(f"Failed to process {source}: {e}") |
| |
|
|
| return results |
|
|
|
|
| |
| _document_processor: Optional[DocumentProcessor] = None |
|
|
|
|
| def get_document_processor( |
| config: Optional[PipelineConfig] = None, |
| ) -> DocumentProcessor: |
| """Get or create singleton document processor.""" |
| global _document_processor |
| if _document_processor is None: |
| _document_processor = DocumentProcessor(config) |
| _document_processor.initialize() |
| return _document_processor |
|
|
|
|
| def process_document( |
| source: Union[str, Path], |
| document_id: Optional[str] = None, |
| config: Optional[PipelineConfig] = None, |
| ) -> ProcessedDocument: |
| """ |
| Convenience function to process a document. |
| |
| Args: |
| source: Document path |
| document_id: Optional document ID |
| config: Optional pipeline configuration |
| |
| Returns: |
| ProcessedDocument |
| """ |
| processor = get_document_processor(config) |
| return processor.process(source, document_id) |
|
|