| """ |
| Document Loader |
| |
| Loads and renders PDF and image documents for processing. |
| Supports page-by-page rendering with configurable DPI. |
| """ |
|
|
| import os |
| import hashlib |
| from pathlib import Path |
| from typing import List, Tuple, Optional, Union, BinaryIO |
| from dataclasses import dataclass |
| from loguru import logger |
|
|
| import numpy as np |
| from PIL import Image |
|
|
| |
| try: |
| import fitz |
| HAS_PYMUPDF = True |
| except ImportError: |
| HAS_PYMUPDF = False |
| logger.warning("PyMuPDF not installed. PDF support disabled. Install with: pip install pymupdf") |
|
|
| |
| try: |
| from pdf2image import convert_from_path, convert_from_bytes |
| HAS_PDF2IMAGE = True |
| except ImportError: |
| HAS_PDF2IMAGE = False |
|
|
|
|
@dataclass
class PageInfo:
    """Per-page metadata record attached to a loaded document.

    Attributes:
        page_number: Index of the page within its document. The loaders in
            this module number pages starting at 0.
        width: Page width, as an integer, in the units implied by ``dpi``.
        height: Page height, as an integer, in the units implied by ``dpi``.
        dpi: Resolution that ``width`` and ``height`` are expressed at.
        has_text: True when the loader found extractable text on the page.
        rotation: Page rotation as reported by the backend; 0 when unset.
    """

    # Required fields (positional order is part of the constructor contract).
    page_number: int
    width: int
    height: int
    dpi: int

    # Optional fields with defaults.
    has_text: bool = False
    rotation: int = 0
|
|
|
|
@dataclass
class LoadedDocument:
    """
    A loaded document ready for processing.

    Base record shared by every document kind. It carries identifying
    metadata plus per-page information, and defines the two operations
    subclasses provide: rendering a page and releasing resources.

    Instances are context managers, so the handle is released even when
    processing raises::

        with load_document(path) as doc:
            image = doc.get_page_image(0)

    Attributes:
        document_id: Identifier for this document.
        source_path: Path of the file the document was loaded from.
        filename: File name component of the source.
        file_type: Document kind string (e.g. "pdf" or "image").
        file_size_bytes: Size of the source file in bytes.
        num_pages: Number of pages in the document.
        pages_info: One PageInfo entry per page.
    """
    document_id: str
    source_path: str
    filename: str
    file_type: str
    file_size_bytes: int
    num_pages: int
    pages_info: List[PageInfo]

    # Backend-specific open handle (if any); subclasses decide what it holds
    # and are responsible for releasing it in close().
    _doc_handle: Optional[object] = None

    def get_page_image(self, page_number: int, dpi: int = 300) -> np.ndarray:
        """Render a specific page as an image.

        Args:
            page_number: Index of the page to render.
            dpi: Requested rendering resolution.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError("Subclasses must implement get_page_image")

    def close(self):
        """Close document handle and free resources.

        The base implementation does nothing; subclasses override it.
        """
        pass

    def __enter__(self):
        """Enter a ``with`` block; returns the document itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave a ``with`` block, always calling close().

        Returns False so an exception raised inside the block propagates.
        """
        self.close()
        return False
|
|
|
|
class PDFDocument(LoadedDocument):
    """Loaded PDF document with PyMuPDF backend.

    ``_doc_handle`` holds the open PyMuPDF document; it is released by
    close(), after which rendering raises and text extraction returns "".
    """

    def get_page_image(self, page_number: int, dpi: int = 300) -> np.ndarray:
        """Render PDF page as numpy array.

        Args:
            page_number: Zero-based page index.
            dpi: Target resolution; must be positive.

        Returns:
            A writable ``uint8`` array of shape ``(height, width, 3)``
            (rendered without an alpha channel).

        Raises:
            RuntimeError: If PyMuPDF is unavailable or the document is closed.
            ValueError: If ``page_number`` is out of range or ``dpi`` is not
                positive.
        """
        if not HAS_PYMUPDF or self._doc_handle is None:
            raise RuntimeError("PyMuPDF not available or document not loaded")

        if page_number < 0 or page_number >= self.num_pages:
            raise ValueError(f"Page {page_number} out of range (0-{self.num_pages - 1})")

        # A zero or negative scale factor cannot produce a meaningful raster;
        # fail here with a clear message instead of inside the backend.
        if dpi <= 0:
            raise ValueError(f"dpi must be positive, got {dpi}")

        page = self._doc_handle[page_number]

        # PDF page units are 1/72 inch, so dpi / 72 is the scale factor.
        zoom = dpi / 72.0
        matrix = fitz.Matrix(zoom, zoom)

        pixmap = page.get_pixmap(matrix=matrix, alpha=False)

        img_array = np.frombuffer(pixmap.samples, dtype=np.uint8).reshape(
            pixmap.height, pixmap.width, 3
        )

        # frombuffer over an immutable bytes buffer yields a read-only view.
        # Copy so callers get an independent, writable array, matching what
        # ImageDocument.get_page_image returns.
        return img_array.copy()

    def get_page_text(self, page_number: int) -> str:
        """Extract text from PDF page using PyMuPDF.

        Unlike get_page_image, this method does not raise on bad input: it
        returns an empty string when PyMuPDF is unavailable, the document is
        closed, or ``page_number`` is out of range.

        Args:
            page_number: Zero-based page index.

        Returns:
            The page text, or "" in the cases described above.
        """
        if not HAS_PYMUPDF or self._doc_handle is None:
            return ""

        if page_number < 0 or page_number >= self.num_pages:
            return ""

        page = self._doc_handle[page_number]
        return page.get_text()

    def close(self):
        """Close PDF document.

        Safe to call more than once: the handle is cleared after closing.
        """
        if self._doc_handle is not None:
            self._doc_handle.close()
            self._doc_handle = None
|
|
|
|
class ImageDocument(LoadedDocument):
    """Single-page document backed by an image file on disk."""

    # Decoded RGB pixels; filled in on first request and dropped by close().
    _image: Optional[np.ndarray] = None

    def get_page_image(self, page_number: int = 0, dpi: int = 300) -> np.ndarray:
        """Return the decoded image as an RGB array.

        The file at ``source_path`` is read on the first call and the result
        is kept on the instance; later calls return that same array object.

        Args:
            page_number: Must be 0, since an image has exactly one page.
            dpi: Accepted for interface compatibility; not used here.

        Raises:
            ValueError: If ``page_number`` is anything other than 0.
        """
        if page_number != 0:
            raise ValueError("Image documents have only one page (page 0)")

        cached = self._image
        if cached is not None:
            return cached

        with Image.open(self.source_path) as handle:
            # Normalise every other mode (grayscale, palette, RGBA, ...) to RGB.
            rgb = handle if handle.mode == "RGB" else handle.convert("RGB")
            cached = np.array(rgb)

        self._image = cached
        return cached

    def close(self):
        """Drop the cached pixel data so it can be garbage collected."""
        self._image = None
|
|
|
|
class DocumentLoader:
    """
    Document loader with support for PDF and image files.

    The file kind is chosen from the (case-insensitive) file extension via
    ``SUPPORTED_EXTENSIONS``. PDFs are opened with PyMuPDF; images are opened
    with Pillow.
    """

    # Maps a lower-cased file suffix to the document kind used for dispatch.
    SUPPORTED_EXTENSIONS = {
        ".pdf": "pdf",
        ".png": "image",
        ".jpg": "image",
        ".jpeg": "image",
        ".tiff": "image",
        ".tif": "image",
        ".bmp": "image",
        ".webp": "image",
    }

    def __init__(self, default_dpi: int = 300, cache_enabled: bool = True):
        """
        Initialize document loader.

        Args:
            default_dpi: Default DPI. Within this class it is recorded as the
                ``dpi`` of image pages in their PageInfo.
            cache_enabled: Whether to cache rendered pages. Stored on the
                instance; nothing in this class reads it.
        """
        self.default_dpi = default_dpi
        self.cache_enabled = cache_enabled

        if not HAS_PYMUPDF and not HAS_PDF2IMAGE:
            logger.warning("No PDF backend available. PDF loading will fail.")

    def load(
        self,
        source: Union[str, Path, BinaryIO],
        document_id: Optional[str] = None,
    ) -> LoadedDocument:
        """
        Load a document from a file path.

        Args:
            source: File path (``str`` or ``Path``). File-like objects are
                accepted by the signature but rejected at runtime.
            document_id: Optional document ID (generated from hash if not provided)

        Returns:
            LoadedDocument instance (a PDFDocument or an ImageDocument).

        Raises:
            ValueError: If ``source`` is not a path, or its extension is not
                in ``SUPPORTED_EXTENSIONS``.
            FileNotFoundError: If the path does not exist.
            RuntimeError: If a PDF is requested but PyMuPDF is not installed.
        """
        if not isinstance(source, (str, Path)):
            raise ValueError("File-like objects not yet supported. Please provide a file path.")

        path = Path(source)
        if not path.exists():
            raise FileNotFoundError(f"Document not found: {path}")

        source_path = str(path.absolute())
        filename = path.name
        file_size = path.stat().st_size
        ext = path.suffix.lower()

        if document_id is None:
            document_id = self._generate_doc_id(source_path)

        if ext not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(f"Unsupported file type: {ext}")

        file_type = self.SUPPORTED_EXTENSIONS[ext]

        if file_type == "pdf":
            return self._load_pdf(source_path, filename, file_size, document_id)
        return self._load_image(source_path, filename, file_size, document_id)

    def _load_pdf(
        self,
        source_path: str,
        filename: str,
        file_size: int,
        document_id: str,
    ) -> PDFDocument:
        """Load a PDF document.

        Opens the file with PyMuPDF and scans every page once to collect its
        size, rotation and whether it has extractable text. The open handle
        is passed to the returned PDFDocument, which must be closed by the
        caller.

        Raises:
            RuntimeError: If PyMuPDF is not installed.
        """
        if not HAS_PYMUPDF:
            raise RuntimeError("PyMuPDF required for PDF loading. Install with: pip install pymupdf")

        logger.info(f"Loading PDF: {filename}")

        doc = fitz.open(source_path)
        try:
            num_pages = len(doc)

            pages_info = []
            for i in range(num_pages):
                page = doc[i]
                rect = page.rect
                has_text = len(page.get_text().strip()) > 0

                pages_info.append(PageInfo(
                    page_number=i,
                    # Page rectangle is in PDF points; truncated to int and
                    # recorded at 72 dpi.
                    width=int(rect.width),
                    height=int(rect.height),
                    dpi=72,
                    has_text=has_text,
                    rotation=page.rotation,
                ))

            return PDFDocument(
                document_id=document_id,
                source_path=source_path,
                filename=filename,
                file_type="pdf",
                file_size_bytes=file_size,
                num_pages=num_pages,
                pages_info=pages_info,
                _doc_handle=doc,
            )
        except Exception:
            # Ownership of the handle was never passed to a PDFDocument, so
            # nobody else can close it; release it here before propagating.
            doc.close()
            raise

    def _load_image(
        self,
        source_path: str,
        filename: str,
        file_size: int,
        document_id: str,
    ) -> ImageDocument:
        """Load an image document.

        Only the image size is read here; the file is closed again before
        the ImageDocument is returned.
        """
        logger.info(f"Loading image: {filename}")

        with Image.open(source_path) as img:
            width, height = img.size

        pages_info = [PageInfo(
            page_number=0,
            width=width,
            height=height,
            dpi=self.default_dpi,
            has_text=False,
        )]

        return ImageDocument(
            document_id=document_id,
            source_path=source_path,
            filename=filename,
            file_type="image",
            file_size_bytes=file_size,
            num_pages=1,
            pages_info=pages_info,
        )

    def _generate_doc_id(self, source_path: str) -> str:
        """Generate document ID from file path, modification time and size.

        Returns:
            The first 16 hex characters of the SHA-256 digest of
            ``"<path>:<mtime>:<size>"``.
        """
        stat = os.stat(source_path)
        content = f"{source_path}:{stat.st_mtime}:{stat.st_size}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]
|
|
|
|
| |
# Module-wide loader instance; stays None until get_loader() is first called.
_default_loader: Optional[DocumentLoader] = None


def get_loader() -> DocumentLoader:
    """Return the shared default DocumentLoader, building it on first use."""
    global _default_loader
    loader = _default_loader
    if loader is None:
        loader = _default_loader = DocumentLoader()
    return loader
|
|
|
|
def load_document(
    source: Union[str, Path, BinaryIO],
    document_id: Optional[str] = None,
) -> LoadedDocument:
    """Load ``source`` through the module's default loader.

    Args:
        source: Document source, forwarded unchanged to the loader.
        document_id: Optional identifier, forwarded unchanged to the loader.

    Returns:
        Whatever the default loader's ``load`` returns for this source.
    """
    loader = get_loader()
    return loader.load(source, document_id)
|
|
|
|
def load_pdf(source: Union[str, Path], document_id: Optional[str] = None) -> PDFDocument:
    """Load ``source`` and require the result to be a PDF.

    Args:
        source: Path of the file to load.
        document_id: Optional identifier for the document.

    Returns:
        The loaded PDFDocument.

    Raises:
        ValueError: If the loaded document is not a PDFDocument.
    """
    loaded = load_document(source, document_id)
    if isinstance(loaded, PDFDocument):
        return loaded
    raise ValueError(f"Expected PDF, got {loaded.file_type}")
|
|
|
|
def load_image(source: Union[str, Path], document_id: Optional[str] = None) -> ImageDocument:
    """Load an image document.

    Args:
        source: Path of the file to load.
        document_id: Optional identifier for the document.

    Returns:
        The loaded ImageDocument.

    Raises:
        ValueError: If the loaded document is not an ImageDocument. The
            document is closed before the error is raised.
    """
    doc = load_document(source, document_id)
    if not isinstance(doc, ImageDocument):
        file_type = doc.file_type
        # A wrongly-typed result (e.g. a PDF) may hold an open backend handle
        # that the caller never receives; release it before raising.
        doc.close()
        raise ValueError(f"Expected image, got {file_type}")
    return doc
|
|
|
|
def render_page(
    document: LoadedDocument,
    page_number: int,
    dpi: int = 300,
) -> np.ndarray:
    """Render one page of ``document`` by calling its ``get_page_image``.

    Args:
        document: The loaded document to render from.
        page_number: Index of the page to render.
        dpi: Requested resolution, passed through to the document.

    Returns:
        The array produced by ``document.get_page_image``.
    """
    rendered = document.get_page_image(page_number, dpi)
    return rendered
|
|