| """Cache decorators for easy integration with existing functions. | |
| This module provides decorators to automatically cache function results | |
| with configurable TTL strategies and cache key generation. | |
| """ | |
| import functools | |
| import hashlib | |
| import inspect | |
| import json | |
| import logging | |
| from typing import Any, Callable, Optional, TypeVar | |
| from backend.caching.factory import get_cache | |
| from backend.caching.redis_cache import CacheDataType, TTLStrategy | |
| logger = logging.getLogger(__name__) | |
| T = TypeVar("T") | |
def cached(
    namespace: str = "default",
    data_type: CacheDataType = CacheDataType.ANALYSIS_RESULTS,
    ttl: Optional[int] = None,
    key_prefix: Optional[str] = None,
    include_self: bool = False,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """Decorator to cache function results.

    Args:
        namespace: Cache namespace for key generation.
        data_type: Type of data being cached (for TTL strategy).
        ttl: Custom TTL override (seconds).
        key_prefix: Optional prefix for cache key.
        include_self: Include 'self' in cache key for instance methods.

    Returns:
        Decorated function with caching.

    Example:
        >>> @cached(namespace="market", data_type=CacheDataType.MARKET_DATA)
        ... def get_stock_price(ticker: str) -> float:
        ...     # Expensive API call
        ...     return fetch_price_from_api(ticker)
        >>>
        >>> # First call - fetches from the API and caches the result
        >>> price1 = get_stock_price("AAPL")
        >>>
        >>> # Second call - returns the cached value
        >>> price2 = get_stock_price("AAPL")
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            cache = get_cache()

            # Generate cache key from function arguments
            cache_key = _generate_cache_key(
                namespace,
                func.__name__,
                args,
                kwargs,
                key_prefix=key_prefix,
                include_self=include_self,
            )

            # Try to get from cache
            cached_value = cache.get(cache_key)
            if cached_value is not None:
                logger.debug(f"Cache HIT for {cache_key}")
                return cached_value

            logger.debug(f"Cache MISS for {cache_key}")

            # Call the function and cache the result
            result = func(*args, **kwargs)

            # Resolve TTL: explicit override wins, otherwise use the data type's strategy
            cache_ttl = ttl if ttl is not None else TTLStrategy.get_ttl(data_type)
            cache.set(cache_key, result, ttl=cache_ttl)

            return result

        # Expose cache control helpers on the wrapper
        wrapper.cache_key = lambda *args, **kwargs: _generate_cache_key(
            namespace,
            func.__name__,
            args,
            kwargs,
            key_prefix=key_prefix,
            include_self=include_self,
        )
        wrapper.invalidate = lambda *args, **kwargs: get_cache().delete(
            wrapper.cache_key(*args, **kwargs)
        )

        return wrapper

    return decorator
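
# Usage sketch (illustrative only): the decorated function also exposes the
# ``cache_key`` and ``invalidate`` helpers attached above. ``load_quote`` and
# ``expensive_lookup`` are hypothetical names, not part of this module.
#
#     @cached(namespace="market", data_type=CacheDataType.MARKET_DATA)
#     def load_quote(ticker: str) -> float:
#         return expensive_lookup(ticker)
#
#     load_quote("AAPL")             # MISS: computes and stores the value
#     load_quote("AAPL")             # HIT: served from the cache
#     load_quote.cache_key("AAPL")   # key the wrapper uses, e.g. "market:load_quote:<hash>"
#     load_quote.invalidate("AAPL")  # drop just this entry
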
def cached_async(
    namespace: str = "default",
    data_type: CacheDataType = CacheDataType.ANALYSIS_RESULTS,
    ttl: Optional[int] = None,
    key_prefix: Optional[str] = None,
    include_self: bool = False,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator to cache async function results.

    Args:
        namespace: Cache namespace for key generation.
        data_type: Type of data being cached (for TTL strategy).
        ttl: Custom TTL override (seconds).
        key_prefix: Optional prefix for cache key.
        include_self: Include 'self' in cache key for instance methods.

    Returns:
        Decorated async function with caching.

    Example:
        >>> @cached_async(namespace="portfolio", data_type=CacheDataType.PORTFOLIO_METRICS)
        ... async def analyse_portfolio(portfolio_id: str) -> dict:
        ...     # Expensive async analysis
        ...     return await perform_analysis(portfolio_id)
        >>>
        >>> # First call - performs the analysis and caches the result
        >>> result1 = await analyse_portfolio("portfolio-123")
        >>>
        >>> # Second call - returns the cached value
        >>> result2 = await analyse_portfolio("portfolio-123")
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            cache = get_cache()

            # Generate cache key from function arguments
            cache_key = _generate_cache_key(
                namespace,
                func.__name__,
                args,
                kwargs,
                key_prefix=key_prefix,
                include_self=include_self,
            )

            # Try to get from cache
            cached_value = cache.get(cache_key)
            if cached_value is not None:
                logger.debug(f"Cache HIT for {cache_key}")
                return cached_value

            logger.debug(f"Cache MISS for {cache_key}")

            # Await the function and cache the result
            result = await func(*args, **kwargs)

            # Resolve TTL: explicit override wins, otherwise use the data type's strategy
            cache_ttl = ttl if ttl is not None else TTLStrategy.get_ttl(data_type)
            cache.set(cache_key, result, ttl=cache_ttl)

            return result

        # Expose cache control helpers on the wrapper
        wrapper.cache_key = lambda *args, **kwargs: _generate_cache_key(
            namespace,
            func.__name__,
            args,
            kwargs,
            key_prefix=key_prefix,
            include_self=include_self,
        )
        wrapper.invalidate = lambda *args, **kwargs: get_cache().delete(
            wrapper.cache_key(*args, **kwargs)
        )

        return wrapper

    return decorator
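
# Async usage sketch (illustrative only): invalidate a stale entry after the
# underlying data changes. ``refresh_holdings`` and ``perform_analysis`` are
# hypothetical names used purely for illustration.
#
#     @cached_async(namespace="portfolio", data_type=CacheDataType.PORTFOLIO_METRICS)
#     async def analyse_portfolio(portfolio_id: str) -> dict:
#         return await perform_analysis(portfolio_id)
#
#     await analyse_portfolio("portfolio-123")        # MISS: runs the analysis
#     await refresh_holdings("portfolio-123")         # write path changes the data
#     analyse_portfolio.invalidate("portfolio-123")   # synchronous delete of the cached entry
#     await analyse_portfolio("portfolio-123")        # MISS again: recomputed and re-cached
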
def cache_invalidate(namespace: str, pattern: str = "*") -> int:
    """Invalidate cache entries matching a pattern.

    Args:
        namespace: Cache namespace.
        pattern: Pattern to match (default: all entries in the namespace).

    Returns:
        Number of entries invalidated.

    Example:
        >>> # Invalidate every cached entry for one portfolio
        >>> cache_invalidate("portfolio", "portfolio-123:*")
        >>>
        >>> # Invalidate all market data
        >>> cache_invalidate("market")
    """
    cache = get_cache()
    full_pattern = f"{namespace}:{pattern}"
    deleted = cache.delete_pattern(full_pattern)
    logger.info(f"Invalidated {deleted} cache entries matching {full_pattern}")
    return deleted
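
# Pattern sketch (illustrative only): keys produced by the decorators look like
# "<namespace>:<function_name>:<args_hash>", so a write path can clear every
# cached result of one function in a namespace. ``save_trade`` and ``persist``
# are hypothetical names.
#
#     def save_trade(trade) -> None:
#         persist(trade)                                   # hypothetical write
#         cache_invalidate("market", "get_stock_price:*")  # drop all cached prices
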
def _generate_cache_key(
    namespace: str,
    func_name: str,
    args: tuple,
    kwargs: dict,
    key_prefix: Optional[str] = None,
    include_self: bool = False,
) -> str:
    """Generate a deterministic cache key from function arguments.

    Args:
        namespace: Cache namespace.
        func_name: Function name.
        args: Positional arguments.
        kwargs: Keyword arguments.
        key_prefix: Optional prefix.
        include_self: Include 'self' parameter for instance methods.

    Returns:
        Generated cache key.
    """
    # Filter out 'self' from args unless explicitly included
    filtered_args = args
    if not include_self and args and hasattr(args[0], "__dict__"):
        # Likely an instance method - skip the first arg (self)
        filtered_args = args[1:]

    # Create a deterministic representation of the arguments
    key_data = {
        "args": [_serialise_arg(arg) for arg in filtered_args],
        "kwargs": {k: _serialise_arg(v) for k, v in sorted(kwargs.items())},
    }

    # Hash the arguments
    key_json = json.dumps(key_data, sort_keys=True)
    args_hash = hashlib.md5(key_json.encode()).hexdigest()[:16]

    # Build the cache key
    parts = [namespace, func_name, args_hash]
    if key_prefix:
        parts.insert(0, key_prefix)

    return ":".join(parts)
def _serialise_arg(arg: Any) -> Any:
    """Serialise an argument for cache key generation.

    Args:
        arg: Argument to serialise.

    Returns:
        Serialisable representation.
    """
    # Handle common scalar types
    if isinstance(arg, (str, int, float, bool, type(None))):
        return arg

    # Handle lists and tuples
    if isinstance(arg, (list, tuple)):
        return [_serialise_arg(item) for item in arg]

    # Handle dictionaries (sorted for determinism)
    if isinstance(arg, dict):
        return {k: _serialise_arg(v) for k, v in sorted(arg.items())}

    # Handle Pydantic models
    if hasattr(arg, "model_dump"):
        return arg.model_dump()

    # Handle objects with __dict__
    if hasattr(arg, "__dict__"):
        return str(arg.__dict__)

    # Fallback to string representation
    return str(arg)
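
# Serialisation sketch (illustrative only): nested containers are normalised so
# that logically equal arguments hash to the same key.
#
#     _serialise_arg({"b": 2, "a": (1, None)})
#     # -> {"a": [1, None], "b": 2}   (tuples become lists, dict keys sorted)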