| """ |
| Image Cropping Utilities |
| |
| Functions for extracting and managing region crops from document images. |
| """ |
|
|
| import hashlib |
| import logging |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
| import numpy as np |
| from PIL import Image |
|
|
| from ..chunks.models import BoundingBox, DocumentChunk |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def crop_region(
    image: Union[np.ndarray, Image.Image],
    bbox: BoundingBox,
    padding_percent: float = 0.02,
) -> np.ndarray:
    """
    Crop a region from an image.

    A normalized box is first converted to pixel coordinates via
    ``bbox.to_pixel(width, height)``. The box is then grown on every side by
    ``padding_percent`` of its own width/height and clamped to the image
    bounds before slicing.

    Args:
        image: Source image (numpy array or PIL Image)
        bbox: Bounding box to crop (can be normalized or pixel)
        padding_percent: Padding to add around the crop, as a fraction of
            the box size (0-1)

    Returns:
        Cropped image as a numpy array (a copy, not a view of ``image``).
        When the clamped region is empty a warning is logged and a single
        zero-valued pixel with the same channel layout and dtype as
        ``image`` is returned.
    """
    # Anything that is not already an ndarray (e.g. a PIL image) is
    # converted up front so the slicing below works uniformly.
    if not isinstance(image, np.ndarray):
        image = np.array(image)

    height, width = image.shape[:2]

    pixel_bbox = bbox.to_pixel(width, height) if bbox.normalized else bbox

    # Padding is relative to the box size, not the page size.
    pad_x = int(pixel_bbox.width * padding_percent)
    pad_y = int(pixel_bbox.height * padding_percent)

    # Clamp the padded box to the image bounds.
    x_min = max(0, int(pixel_bbox.x_min) - pad_x)
    y_min = max(0, int(pixel_bbox.y_min) - pad_y)
    x_max = min(width, int(pixel_bbox.x_max) + pad_x)
    y_max = min(height, int(pixel_bbox.y_max) + pad_y)

    if x_max <= x_min or y_max <= y_min:
        logger.warning(
            "Invalid crop region: (%s, %s, %s, %s)", x_min, y_min, x_max, y_max
        )
        # Keep the placeholder consistent with what a real crop of this
        # image would look like (grayscale, RGB, RGBA, any dtype) instead of
        # always assuming 3-channel uint8.
        return np.zeros((1, 1) + image.shape[2:], dtype=image.dtype)

    return image[y_min:y_max, x_min:x_max].copy()
|
|
|
|
def crop_chunk(
    image: Union[np.ndarray, Image.Image],
    chunk: DocumentChunk,
    padding_percent: float = 0.02,
) -> np.ndarray:
    """
    Crop the page area covered by a document chunk.

    Convenience wrapper that forwards the chunk's ``bbox`` to
    ``crop_region``.

    Args:
        image: Page image the chunk belongs to
        chunk: Document chunk whose ``bbox`` selects the area
        padding_percent: Padding around the crop, passed through unchanged

    Returns:
        Cropped image as returned by ``crop_region``
    """
    chunk_box = chunk.bbox
    return crop_region(
        image=image,
        bbox=chunk_box,
        padding_percent=padding_percent,
    )
|
|
|
|
def crop_multiple_regions(
    image: Union[np.ndarray, Image.Image],
    bboxes: List[BoundingBox],
    padding_percent: float = 0.02,
) -> List[np.ndarray]:
    """
    Crop multiple regions from an image.

    A source that is not already a numpy array (such as a PIL Image) is
    converted once here, so the conversion is not repeated for every box.

    Args:
        image: Source image
        bboxes: List of bounding boxes
        padding_percent: Padding around crops

    Returns:
        List of cropped images, one per entry of ``bboxes`` and in the same
        order (empty when ``bboxes`` is empty)
    """
    # Hoisted out of the loop: crop_region would otherwise redo this
    # conversion for each bounding box.
    if not isinstance(image, np.ndarray):
        image = np.array(image)

    return [crop_region(image, bbox, padding_percent) for bbox in bboxes]
|
|
|
|
class CropManager:
    """
    Manages crop extraction and storage.

    Crops are written to ``<output_dir>/<doc_id>/`` and the resulting paths
    are remembered in an in-memory cache keyed by document, page and box, so
    a repeated request for the same region returns the stored path without
    cropping or encoding again.
    """

    def __init__(
        self,
        output_dir: Union[str, Path],
        format: str = "png",
        quality: int = 95,
    ):
        """
        Create the manager and make sure the output directory exists.

        Args:
            output_dir: Root directory for saved crops (created if missing)
            format: Image format / file extension, case-insensitive
            quality: Quality setting used when saving JPEG crops
        """
        self.output_dir = Path(output_dir)
        self.format = format.lower()
        self.quality = quality
        # (doc_id, page, box coordinates) -> path of the saved crop.
        self._cache: Dict[Tuple[str, int, str], str] = {}

        self.output_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _cache_key(doc_id: str, page: int, bbox: BoundingBox) -> Tuple[str, int, str]:
        """
        Build the cache key for one region.

        A tuple keeps the document id as its own component; a flat
        ``"{doc_id}_{page}_..."`` string cannot tell document ``"a"`` apart
        from document ``"a_b"`` when matching by prefix.
        """
        return (doc_id, page, str(bbox.xyxy))

    def get_crop_path(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
    ) -> Path:
        """
        Generate the file path for a crop.

        The file name combines the document id, the page number and the first
        8 hex characters of an MD5 digest of the box coordinates (formatted
        to 4 decimals). The digest is only used to build a file name.
        """
        bbox_str = f"{bbox.x_min:.4f}_{bbox.y_min:.4f}_{bbox.x_max:.4f}_{bbox.y_max:.4f}"
        bbox_hash = hashlib.md5(bbox_str.encode()).hexdigest()[:8]

        filename = f"{doc_id}_p{page}_{bbox_hash}.{self.format}"
        return self.output_dir / doc_id / filename

    def save_crop(
        self,
        image: Union[np.ndarray, Image.Image],
        doc_id: str,
        page: int,
        bbox: BoundingBox,
        padding_percent: float = 0.02,
    ) -> str:
        """
        Crop and save a region.

        Args:
            image: Source page image
            doc_id: Document ID
            page: Page number
            bbox: Region to crop
            padding_percent: Padding around crop

        Returns:
            Path to saved crop (the cached path if this region was already
            saved through this manager)
        """
        cache_key = self._cache_key(doc_id, page, bbox)
        cached_path = self._cache.get(cache_key)
        if cached_path is not None:
            return cached_path

        crop = crop_region(image, bbox, padding_percent)
        pil_crop = Image.fromarray(crop)

        crop_path = self.get_crop_path(doc_id, page, bbox)
        crop_path.parent.mkdir(parents=True, exist_ok=True)

        if self.format in ("jpg", "jpeg"):
            # JPEG cannot store alpha (or e.g. float/integer modes); convert
            # such crops instead of letting the save call fail.
            if pil_crop.mode not in ("L", "RGB", "CMYK"):
                pil_crop = pil_crop.convert("RGB")
            pil_crop.save(crop_path, format="JPEG", quality=self.quality)
        else:
            pil_crop.save(crop_path, format=self.format.upper())

        path_str = str(crop_path)
        self._cache[cache_key] = path_str

        return path_str

    def save_chunk_crop(
        self,
        image: Union[np.ndarray, Image.Image],
        chunk: DocumentChunk,
        padding_percent: float = 0.02,
    ) -> str:
        """
        Save crop for a document chunk.

        Args:
            image: Page image
            chunk: Chunk to crop (its ``doc_id``, ``page`` and ``bbox`` are used)
            padding_percent: Padding around crop

        Returns:
            Path to saved crop
        """
        return self.save_crop(
            image=image,
            doc_id=chunk.doc_id,
            page=chunk.page,
            bbox=chunk.bbox,
            padding_percent=padding_percent,
        )

    def get_cached_crop(
        self,
        doc_id: str,
        page: int,
        bbox: BoundingBox,
    ) -> Optional[str]:
        """Return the cached path for a region, or None if it is not cached."""
        return self._cache.get(self._cache_key(doc_id, page, bbox))

    def load_crop(self, path: Union[str, Path]) -> Optional[np.ndarray]:
        """
        Load a crop from disk.

        Returns:
            The image as a numpy array, or None when the file does not exist
            or cannot be read (a warning is logged in the latter case).
        """
        path = Path(path)
        if not path.exists():
            return None

        try:
            # The context manager closes the underlying file as soon as the
            # pixels have been copied into the array.
            with Image.open(path) as img:
                return np.array(img)
        except Exception as e:
            logger.warning("Failed to load crop %s: %s", path, e)
            return None

    def clear_cache(self) -> None:
        """Clear the path cache (files on disk are left untouched)."""
        self._cache.clear()

    def cleanup_doc(self, doc_id: str) -> int:
        """
        Remove all crops for a document.

        Deletes every file with this manager's format extension from the
        document's directory, removes the directory if that leaves it empty,
        and drops the document's entries from the path cache.

        Returns number of files removed.
        """
        doc_dir = self.output_dir / doc_id
        count = 0

        if doc_dir.exists():
            for crop_file in doc_dir.glob(f"*.{self.format}"):
                try:
                    crop_file.unlink()
                except OSError as e:
                    # Best effort: keep going, but leave a trace of what
                    # could not be deleted.
                    logger.warning("Failed to remove crop %s: %s", crop_file, e)
                else:
                    count += 1

            try:
                doc_dir.rmdir()
            except OSError:
                # Directory still holds other files; leave it in place.
                pass

        # Evict by exact document id, and do so even when the directory was
        # already gone so no stale paths stay cached.
        self._cache = {
            key: path for key, path in self._cache.items()
            if key[0] != doc_id
        }

        return count
|
|
|
|
def create_annotated_image(
    image: Union[np.ndarray, Image.Image],
    bboxes: List[BoundingBox],
    labels: Optional[List[str]] = None,
    colors: Optional[List[Tuple[int, int, int]]] = None,
    line_width: int = 2,
    font_size: int = 12,
) -> np.ndarray:
    """
    Create an annotated image with bounding boxes.

    The source image is never modified; drawing happens on a copy. Images
    that are not RGB/RGBA are converted to RGB first, because the RGB colour
    tuples used for outlines and label backgrounds cannot be drawn onto
    single-band images.

    Args:
        image: Source image
        bboxes: Bounding boxes to draw (normalized boxes are scaled by the
            image size, pixel boxes are used as-is)
        labels: Optional labels for each box; boxes without a matching
            entry get no label
        colors: Optional colors for each box (RGB tuples); boxes without a
            matching entry cycle through a built-in palette
        line_width: Line width for boxes
        font_size: Font size for labels

    Returns:
        Annotated image as numpy array
    """
    from PIL import ImageDraw, ImageFont

    if isinstance(image, np.ndarray):
        pil_image = Image.fromarray(image).copy()
    else:
        pil_image = image.copy()

    if pil_image.mode not in ("RGB", "RGBA"):
        pil_image = pil_image.convert("RGB")

    draw = ImageDraw.Draw(pil_image)
    width, height = pil_image.size

    default_colors = [
        (255, 0, 0),
        (0, 255, 0),
        (0, 0, 255),
        (255, 255, 0),
        (255, 0, 255),
        (0, 255, 255),
        (255, 128, 0),
        (128, 0, 255),
    ]

    # Best effort: prefer a scalable font at the requested size, but fall
    # back to PIL's built-in font whenever it cannot be loaded.
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", font_size)
    except Exception:
        font = ImageFont.load_default()

    for i, bbox in enumerate(bboxes):
        if colors and i < len(colors):
            color = colors[i]
        else:
            color = default_colors[i % len(default_colors)]

        if bbox.normalized:
            x_min = int(bbox.x_min * width)
            y_min = int(bbox.y_min * height)
            x_max = int(bbox.x_max * width)
            y_max = int(bbox.y_max * height)
        else:
            x_min = int(bbox.x_min)
            y_min = int(bbox.y_min)
            x_max = int(bbox.x_max)
            y_max = int(bbox.y_max)

        # Order the corners so a box given with swapped corners is still
        # drawn rather than rejected by the drawing call.
        x_min, x_max = min(x_min, x_max), max(x_min, x_max)
        y_min, y_max = min(y_min, y_max), max(y_min, y_max)

        draw.rectangle(
            [(x_min, y_min), (x_max, y_max)],
            outline=color,
            width=line_width,
        )

        if labels and i < len(labels):
            label = labels[i]
            # The label normally sits just above the box. For boxes at the
            # top/left edge that position would fall outside the image, so
            # clamp it to stay visible.
            label_origin = (max(0, x_min), max(0, y_min - font_size - 4))

            text_bbox = draw.textbbox(label_origin, label, font=font)
            draw.rectangle(text_bbox, fill=color)
            draw.text(
                label_origin,
                label,
                fill=(255, 255, 255),
                font=font,
            )

    return np.array(pil_image)
|
|
|
|
def highlight_region(
    image: Union[np.ndarray, Image.Image],
    bbox: BoundingBox,
    highlight_color: Tuple[int, int, int] = (255, 255, 0),
    opacity: float = 0.3,
) -> np.ndarray:
    """
    Highlight a region in an image with semi-transparent overlay.

    The input is never modified; blending happens on a copy. The box is
    clamped to the image, and a box that lies completely outside the image
    (or has no area) leaves the copy unchanged.

    Args:
        image: Source image
        bbox: Region to highlight (normalized boxes are scaled by the image
            size, pixel boxes are used as-is)
        highlight_color: Color for highlight (RGB)
        opacity: Opacity of highlight (0-1); values outside that range are
            clamped

    Returns:
        Image with highlighted region, with the same shape and dtype as the
        source array. For images with at least three channels only the
        first three are blended (an alpha channel is left as-is); for
        images with fewer channels the first (or only) channel is blended
        with the luminance of ``highlight_color``.
    """
    # np.array() on a non-array source already yields a fresh writable array.
    if isinstance(image, np.ndarray):
        img_array = image.copy()
    else:
        img_array = np.array(image)

    height, width = img_array.shape[:2]

    if bbox.normalized:
        x_min = int(bbox.x_min * width)
        y_min = int(bbox.y_min * height)
        x_max = int(bbox.x_max * width)
        y_max = int(bbox.y_max * height)
    else:
        x_min = int(bbox.x_min)
        y_min = int(bbox.y_min)
        x_max = int(bbox.x_max)
        y_max = int(bbox.y_max)

    # Clamp to the image bounds.
    x_min = max(0, x_min)
    y_min = max(0, y_min)
    x_max = min(width, x_max)
    y_max = min(height, y_max)

    # Nothing to blend: the box is empty or entirely off the image.
    if x_max <= x_min or y_max <= y_min:
        return img_array

    opacity = min(1.0, max(0.0, opacity))
    rgb = np.asarray(highlight_color, dtype=np.float64)[:3]

    # Slicing gives a view, so writing to `target` updates img_array.
    region = img_array[y_min:y_max, x_min:x_max]
    if region.ndim == 3 and region.shape[2] >= 3:
        target = region[..., :3]
        tint = rgb
    else:
        # Fewer than three channels: fall back to the colour's luminance.
        target = region if region.ndim == 2 else region[..., 0]
        tint = np.rint(rgb @ np.array([0.299, 0.587, 0.114]))

    # The right-hand side is a new array, so the view is only written once
    # the blend has been fully computed.
    target[...] = (target * (1 - opacity) + tint * opacity).astype(img_array.dtype)

    return img_array
|
|