| """ |
| Document and Page Caching |
| |
| File-based caching for rendered pages and processing results. |
| Supports LRU eviction and configurable storage backends. |
| """ |
|
|
import functools
import hashlib
import json
import logging
import os
import pickle
import shutil
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union

import numpy as np
from PIL import Image
|
|
| logger = logging.getLogger(__name__) |
|
|
| T = TypeVar("T") |
|
|
|
|
@dataclass
class CacheConfig:
    """Settings that control the on-disk document cache.

    Attributes:
        cache_dir: Root directory where cached artifacts live.
        max_size_gb: Upper bound on total cache size before LRU eviction.
        ttl_hours: Default time-to-live for entries (168h = one week).
        enabled: Master switch; when False the cache becomes a no-op.
        compression: Whether PNG saves use the ``optimize`` flag.
    """

    cache_dir: Path = field(
        default_factory=lambda: Path.home() / ".cache" / "sparknet" / "documents"
    )
    max_size_gb: float = 10.0
    ttl_hours: float = 168.0
    enabled: bool = True
    compression: bool = True

    def __post_init__(self):
        # Callers may pass a plain string; always normalize to a Path.
        self.cache_dir = Path(self.cache_dir)
|
|
|
@dataclass
class CacheEntry:
    """Bookkeeping record for a single item stored in the cache."""

    key: str
    path: Path
    size_bytes: int
    created_at: float
    last_accessed: float
    ttl_hours: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        """True once the entry has outlived its time-to-live."""
        # Equivalent deadline form of: (now - created_at) / 3600 > ttl_hours
        expiry_deadline = self.created_at + self.ttl_hours * 3600
        return time.time() > expiry_deadline
|
|
|
|
class DocumentCache:
    """
    File-based cache for document processing results.

    Features:
    - LRU eviction when cache exceeds max size
    - TTL-based expiration
    - Separate namespaces for different data types
    - Compressed storage option

    Keys incorporate the source document's absolute path and mtime, so a
    document that changes on disk stops matching its stale entries.
    """

    # One subdirectory per namespace under config.cache_dir.
    NAMESPACES = ["pages", "ocr", "layout", "chunks", "metadata"]

    def __init__(self, config: Optional[CacheConfig] = None):
        """Initialize the cache; directories/index are touched only when enabled."""
        self.config = config or CacheConfig()
        self._index: Dict[str, CacheEntry] = {}
        self._index_path: Optional[Path] = None

        if self.config.enabled:
            self._init_cache_dir()
            self._load_index()

    def _init_cache_dir(self) -> None:
        """Create the cache root and namespace subdirectories; set the index path."""
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)

        for namespace in self.NAMESPACES:
            (self.config.cache_dir / namespace).mkdir(exist_ok=True)

        self._index_path = self.config.cache_dir / "index.json"

    def _load_index(self) -> None:
        """Load the JSON cache index from disk; a corrupt index resets to empty."""
        if self._index_path and self._index_path.exists():
            try:
                with open(self._index_path, "r") as f:
                    data = json.load(f)

                for key, entry_data in data.items():
                    entry = CacheEntry(
                        key=entry_data["key"],
                        path=Path(entry_data["path"]),
                        size_bytes=entry_data["size_bytes"],
                        created_at=entry_data["created_at"],
                        last_accessed=entry_data["last_accessed"],
                        # Indexes written before per-entry TTLs existed fall
                        # back to the configured default.
                        ttl_hours=entry_data.get("ttl_hours", self.config.ttl_hours),
                        metadata=entry_data.get("metadata", {})
                    )
                    self._index[key] = entry
            except Exception as e:
                logger.warning(f"Failed to load cache index: {e}")
                self._index = {}

    def _save_index(self) -> None:
        """Persist the in-memory index as JSON (best-effort; failures are logged)."""
        if not self._index_path:
            return

        try:
            data = {}
            for key, entry in self._index.items():
                data[key] = {
                    "key": entry.key,
                    "path": str(entry.path),
                    "size_bytes": entry.size_bytes,
                    "created_at": entry.created_at,
                    "last_accessed": entry.last_accessed,
                    "ttl_hours": entry.ttl_hours,
                    "metadata": entry.metadata
                }

            with open(self._index_path, "w") as f:
                json.dump(data, f)
        except Exception as e:
            logger.warning(f"Failed to save cache index: {e}")

    def _generate_key(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> str:
        """Build a deterministic SHA-256 key for a document/namespace/params tuple.

        The key covers the absolute path, the file's mtime (0 when the file
        cannot be stat'ed), the namespace, and any extra discriminators;
        kwargs are sorted so keyword order does not change the key.
        """
        doc_path = Path(doc_path)

        try:
            mtime = doc_path.stat().st_mtime
        except OSError:
            mtime = 0

        key_parts = [
            str(doc_path.absolute()),
            str(mtime),
            namespace,
            *[str(a) for a in args],
            *[f"{k}={v}" for k, v in sorted(kwargs.items())]
        ]

        key_str = "|".join(key_parts)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def _get_cache_path(self, key: str, namespace: str, ext: str = ".pkl") -> Path:
        """Return the on-disk location for a cache entry."""
        return self.config.cache_dir / namespace / f"{key}{ext}"

    def _get_total_size(self) -> int:
        """Total size of all indexed entries, in bytes."""
        return sum(entry.size_bytes for entry in self._index.values())

    def _evict_if_needed(self, required_bytes: int = 0) -> None:
        """Evict least-recently-used entries until ``required_bytes`` will fit."""
        max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024
        current_size = self._get_total_size()

        if current_size + required_bytes <= max_bytes:
            return

        # Oldest last_accessed first == LRU order.
        entries = sorted(
            self._index.values(),
            key=lambda e: e.last_accessed
        )

        for entry in entries:
            if current_size + required_bytes <= max_bytes:
                break

            self._delete_entry(entry.key)
            current_size -= entry.size_bytes

    def _delete_entry(self, key: str) -> None:
        """Remove an entry's file and index record (unknown keys are a no-op)."""
        if key not in self._index:
            return

        entry = self._index[key]
        try:
            if entry.path.exists():
                entry.path.unlink()
        except Exception as e:
            logger.warning(f"Failed to delete cache file: {e}")

        del self._index[key]

    def _cleanup_expired(self) -> int:
        """Remove expired entries. Returns number removed."""
        expired_keys = [
            key for key, entry in self._index.items()
            if entry.is_expired
        ]

        for key in expired_keys:
            self._delete_entry(key)

        if expired_keys:
            self._save_index()

        return len(expired_keys)

    # ------------------------------------------------------------------
    # Page image caching
    # ------------------------------------------------------------------

    def get_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        dpi: int = 200
    ) -> Optional[np.ndarray]:
        """Return the cached render of a page, or None on miss/expiry/error."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            # Context manager closes the file handle; np.array() forces the
            # lazy PIL load before the handle goes away.
            with Image.open(entry.path) as img:
                arr = np.array(img)

            # Touch the entry so LRU eviction keeps hot pages.
            entry.last_accessed = time.time()
            self._save_index()

            return arr
        except Exception as e:
            logger.warning(f"Failed to load cached page: {e}")
            self._delete_entry(key)
            return None

    def set_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        image: np.ndarray,
        dpi: int = 200
    ) -> bool:
        """Store a rendered page as PNG. Returns True on success."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)
        cache_path = self._get_cache_path(key, "pages", ".png")

        try:
            img = Image.fromarray(image)

            # Rough ~10:1 PNG compression guess for the eviction check;
            # the exact size is recorded from disk after saving.
            estimated_size = image.nbytes // 10

            self._evict_if_needed(estimated_size)

            img.save(cache_path, format="PNG", optimize=self.config.compression)

            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=cache_path.stat().st_size,
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                # "doc_path" is recorded so invalidate_document() can find
                # every entry belonging to this document.
                metadata={
                    "page": page_number,
                    "dpi": dpi,
                    "doc_path": str(Path(doc_path).absolute()),
                }
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache page image: {e}")
            return False

    def get(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """Return a cached pickled object, or None on miss/expiry/error."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, namespace, *args, **kwargs)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            # NOTE: pickle is acceptable only because the cache directory is
            # written by this process; never point cache_dir at untrusted data.
            with open(entry.path, "rb") as f:
                data = pickle.load(f)

            # Touch the entry so LRU eviction keeps hot objects.
            entry.last_accessed = time.time()
            self._save_index()

            return data
        except Exception as e:
            logger.warning(f"Failed to load cached object: {e}")
            self._delete_entry(key)
            return None

    def set(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        value: Any,
        *args,
        **kwargs
    ) -> bool:
        """Pickle and cache an arbitrary object. Returns True on success."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, namespace, *args, **kwargs)
        cache_path = self._get_cache_path(key, namespace, ".pkl")

        try:
            # Serialize first so the exact byte count drives eviction.
            data = pickle.dumps(value)
            self._evict_if_needed(len(data))

            with open(cache_path, "wb") as f:
                f.write(data)

            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=len(data),
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                # "doc_path" is recorded so invalidate_document() can match
                # this entry; previously no metadata was stored and
                # invalidation silently removed nothing.
                metadata={"doc_path": str(Path(doc_path).absolute())}
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache object: {e}")
            return False

    def invalidate_document(self, doc_path: Union[str, Path]) -> int:
        """Invalidate all cache entries for a document. Returns count removed.

        Matches the "doc_path" recorded in entry metadata by set() and
        set_page_image(). Entries written before that key existed cannot be
        matched here and are left to expire via their TTL.
        """
        doc_path = Path(doc_path).absolute()
        doc_str = str(doc_path)

        keys_to_remove = [
            key for key, entry in self._index.items()
            if entry.metadata.get("doc_path") == doc_str
        ]

        for key in keys_to_remove:
            self._delete_entry(key)

        if keys_to_remove:
            self._save_index()

        return len(keys_to_remove)

    def clear(self) -> None:
        """Delete the entire cache directory and start fresh."""
        if self.config.cache_dir.exists():
            shutil.rmtree(self.config.cache_dir)

        self._index = {}
        self._init_cache_dir()

    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics: counts, sizes, utilization, per-namespace totals."""
        total_size = self._get_total_size()
        max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024
        return {
            "enabled": self.config.enabled,
            "total_entries": len(self._index),
            "total_size_bytes": total_size,
            "total_size_mb": total_size / (1024 * 1024),
            "max_size_gb": self.config.max_size_gb,
            "utilization_percent": (total_size / max_bytes) * 100,
            "cache_dir": str(self.config.cache_dir),
            # Attribute each entry by its parent directory name; substring
            # matching against the full path misattributes entries when the
            # cache root itself contains a namespace name.
            "namespaces": {
                ns: sum(1 for e in self._index.values() if e.path.parent.name == ns)
                for ns in self.NAMESPACES
            }
        }
|
|
|
|
| |
# Process-wide cache instance, created lazily by get_document_cache().
_global_cache: Optional[DocumentCache] = None


def get_document_cache(config: Optional[CacheConfig] = None) -> DocumentCache:
    """Return the shared document cache, building it on first access.

    Supplying an explicit ``config`` always replaces the current global
    instance with a freshly constructed one.
    """
    global _global_cache

    must_build = _global_cache is None or config is not None
    if must_build:
        _global_cache = DocumentCache(config)

    return _global_cache
|
|
|
|
def cached_page(
    cache: Optional[DocumentCache] = None,
    dpi: int = 200
) -> Callable:
    """
    Decorator for caching page rendering results.

    Args:
        cache: Cache instance to use; when None the global cache is
            resolved lazily at each call.
        dpi: Rendering DPI, forwarded as part of the cache key.

    Usage:
        @cached_page(cache, dpi=200)
        def render_page(doc_path, page_number):
            # ... rendering logic
            return image_array
    """
    def decorator(func: Callable) -> Callable:
        # functools.wraps preserves the wrapped renderer's __name__ and
        # __doc__; without it every decorated function reports as "wrapper".
        @functools.wraps(func)
        def wrapper(doc_path: Union[str, Path], page_number: int, *args, **kwargs):
            _cache = cache or get_document_cache()

            # Fast path: serve a previously rendered page.
            cached = _cache.get_page_image(doc_path, page_number, dpi)
            if cached is not None:
                return cached

            # Cache miss: render, then store for next time.
            result = func(doc_path, page_number, *args, **kwargs)

            if result is not None:
                _cache.set_page_image(doc_path, page_number, result, dpi)

            return result

        return wrapper
    return decorator
|
|