| """ |
| Caching utilities for SPARKNET |
| Provides LRU caching for LLM responses and embeddings |
| Following FAANG best practices for performance optimization |
| """ |
|
|
| import hashlib |
| import json |
| from typing import Any, Optional, Dict, Callable |
| from functools import wraps |
| from datetime import datetime, timedelta |
| from cachetools import TTLCache, LRUCache |
| from loguru import logger |
|
|
|
|
class LLMResponseCache:
    """
    Cache for LLM responses to reduce API calls and latency.

    Features:
        - TTL-based expiration
        - LRU eviction policy
        - Content-based hashing
        - Statistics tracking

    Example:
        cache = LLMResponseCache(maxsize=1000, ttl=3600)

        # Check cache
        cached = cache.get(prompt, model)
        if cached:
            return cached

        # Store result
        cache.set(prompt, model, response)
    """

    def __init__(
        self,
        maxsize: int = 1000,
        ttl: int = 3600,
        enabled: bool = True,
    ):
        """
        Initialize LLM response cache.

        Args:
            maxsize: Maximum number of cached responses
            ttl: Time-to-live in seconds
            enabled: Whether caching is enabled
        """
        self.maxsize = maxsize
        self.ttl = ttl
        self.enabled = enabled
        self._cache: TTLCache = TTLCache(maxsize=maxsize, ttl=ttl)

        # Hit/miss counters backing the stats property
        self._hits = 0
        self._misses = 0

        logger.info(f"Initialized LLMResponseCache (maxsize={maxsize}, ttl={ttl}s)")

    def _hash_key(self, prompt: str, model: str, **kwargs) -> str:
        """Generate a deterministic cache key from the prompt and parameters."""
        key_data = {
            "prompt": prompt,
            "model": model,
            **kwargs,
        }
        # sort_keys keeps the key independent of argument order; default=str
        # guards against non-JSON-serializable parameter values.
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, prompt: str, model: str, **kwargs) -> Optional[str]:
        """
        Get cached response if available.

        Args:
            prompt: The prompt sent to the LLM
            model: Model identifier
            **kwargs: Additional parameters

        Returns:
            Cached response or None
        """
        if not self.enabled:
            return None

        key = self._hash_key(prompt, model, **kwargs)
        result = self._cache.get(key)

        if result is not None:
            self._hits += 1
            logger.debug(f"Cache HIT for model={model}")
        else:
            self._misses += 1

        return result

    def set(self, prompt: str, model: str, response: str, **kwargs) -> None:
        """
        Store response in cache.

        Args:
            prompt: The prompt sent to the LLM
            model: Model identifier
            response: The LLM response
            **kwargs: Additional parameters
        """
        if not self.enabled:
            return

        key = self._hash_key(prompt, model, **kwargs)
        self._cache[key] = response
        logger.debug(f"Cached response for model={model}")

    def invalidate(self, prompt: str, model: str, **kwargs) -> None:
        """Invalidate a specific cache entry."""
        key = self._hash_key(prompt, model, **kwargs)
        self._cache.pop(key, None)

    def clear(self) -> None:
        """Clear all cached entries."""
        self._cache.clear()
        logger.info("LLM response cache cleared")

    @property
    def stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total = self._hits + self._misses
        hit_rate = (self._hits / total * 100) if total > 0 else 0.0

        return {
            "hits": self._hits,
            "misses": self._misses,
            "total": total,
            "hit_rate": f"{hit_rate:.1f}%",
            "size": len(self._cache),
            "maxsize": self.maxsize,
            "enabled": self.enabled,
        }


class EmbeddingCache:
    """
    Cache for text embeddings to avoid recomputation.

    Uses an LRU eviction policy with a configurable size.
    Embeddings are stored as lists of floats.
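
    Example:
        cache = EmbeddingCache(maxsize=10000)

        # Fetch any cached embeddings; compute only the misses
        cached, uncached_indices = cache.get_batch(texts, model)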
| """ |
|
|
| def __init__(self, maxsize: int = 10000, enabled: bool = True): |
| """ |
| Initialize embedding cache. |
| |
| Args: |
| maxsize: Maximum number of cached embeddings |
| enabled: Whether caching is enabled |
| """ |
| self.maxsize = maxsize |
| self.enabled = enabled |
| self._cache: LRUCache = LRUCache(maxsize=maxsize) |
|
|
| self._hits = 0 |
| self._misses = 0 |
|
|
| logger.info(f"Initialized EmbeddingCache (maxsize={maxsize})") |
|
|
    def _hash_key(self, text: str, model: str) -> str:
        """Generate cache key from text and model."""
        key_str = f"{model}:{text}"
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, text: str, model: str) -> Optional[list]:
        """Get cached embedding if available."""
        if not self.enabled:
            return None

        key = self._hash_key(text, model)
        result = self._cache.get(key)

        if result is not None:
            self._hits += 1
        else:
            self._misses += 1

        return result

    def set(self, text: str, model: str, embedding: list) -> None:
        """Store embedding in cache."""
        if not self.enabled:
            return

        key = self._hash_key(text, model)
        self._cache[key] = embedding

    def get_batch(self, texts: list, model: str) -> tuple:
        """
        Get cached embeddings for a batch of texts.

        Returns:
            Tuple of (results, uncached), where results maps each text's
            index to its cached embedding and uncached lists the indices
            whose embeddings still need to be computed.
        """
        results = {}
        uncached = []

        for i, text in enumerate(texts):
            cached = self.get(text, model)
            if cached is not None:
                results[i] = cached
            else:
                uncached.append(i)

        return results, uncached

    def set_batch(self, texts: list, model: str, embeddings: list) -> None:
        """Store a batch of embeddings."""
        for text, embedding in zip(texts, embeddings):
            self.set(text, model, embedding)

    @property
    def stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total = self._hits + self._misses
        hit_rate = (self._hits / total * 100) if total > 0 else 0.0

        return {
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": f"{hit_rate:.1f}%",
            "size": len(self._cache),
            "maxsize": self.maxsize,
        }


def cached_llm_call(cache: LLMResponseCache):
    """
    Decorator for caching LLM function calls.

    Works with both sync and async functions.

    Example:
        @cached_llm_call(llm_cache)
        async def generate_response(prompt: str, model: str) -> str:
            ...
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(prompt: str, model: str, **kwargs):
            # Return the cached response if one exists
            cached = cache.get(prompt, model, **kwargs)
            if cached is not None:
                return cached

            # Cache miss: call the wrapped function
            result = await func(prompt, model, **kwargs)

            # Store the fresh result for future calls
            cache.set(prompt, model, result, **kwargs)
            return result

        @wraps(func)
        def sync_wrapper(prompt: str, model: str, **kwargs):
            # Return the cached response if one exists
            cached = cache.get(prompt, model, **kwargs)
            if cached is not None:
                return cached

            # Cache miss: call the wrapped function
            result = func(prompt, model, **kwargs)

            # Store the fresh result for future calls
            cache.set(prompt, model, result, **kwargs)
            return result

        # Choose the wrapper matching the wrapped function's kind
        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
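

# Example wiring (a sketch: `llm_client.complete` is a hypothetical async
# client call, not part of this module):
#
#     llm_cache = get_llm_cache()
#
#     @cached_llm_call(llm_cache)
#     async def generate(prompt: str, model: str, **kwargs) -> str:
#         return await llm_client.complete(prompt=prompt, model=model, **kwargs)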


# Module-level singletons, created lazily by the accessors below
_llm_cache: Optional[LLMResponseCache] = None
_embedding_cache: Optional[EmbeddingCache] = None


def get_llm_cache() -> LLMResponseCache:
    """Get or create the global LLM response cache."""
    global _llm_cache
    if _llm_cache is None:
        _llm_cache = LLMResponseCache()
    return _llm_cache


def get_embedding_cache() -> EmbeddingCache:
    """Get or create the global embedding cache."""
    global _embedding_cache
    if _embedding_cache is None:
        _embedding_cache = EmbeddingCache()
    return _embedding_cache
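

# Minimal smoke-test sketch (illustrative only; "example-model" and the
# embedding vectors below are placeholder values, not a real model or data).
if __name__ == "__main__":
    llm_cache = get_llm_cache()

    @cached_llm_call(llm_cache)
    def fake_llm(prompt: str, model: str) -> str:
        # Stand-in for a real LLM client call
        return f"response to: {prompt}"

    fake_llm("hello", "example-model")  # first call: cache miss
    fake_llm("hello", "example-model")  # second call: cache hit
    print(llm_cache.stats)

    emb_cache = get_embedding_cache()
    emb_cache.set_batch(["a", "b"], "example-model", [[0.1], [0.2]])
    cached, uncached = emb_cache.get_batch(["a", "b", "c"], "example-model")
    print(f"cached indices={sorted(cached)}, uncached indices={uncached}")
    print(emb_cache.stats)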