# Aniworld/src/server/services/cache_service.py (Python, 732 lines, 19 KiB)

"""
Cache Service for AniWorld.
This module provides caching functionality with support for both
in-memory and Redis backends to improve application performance.
"""
import asyncio
import hashlib
import logging
import pickle
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CacheBackend(ABC):
    """Interface that every cache storage backend has to implement.

    All operations are declared as coroutines and are abstract, so a
    concrete backend must override each one before it can be instantiated.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Look up a single entry.

        Args:
            key: Cache key to read.

        Returns:
            The stored value, or None when nothing is stored under ``key``.
        """

    @abstractmethod
    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> bool:
        """Store a single entry.

        Args:
            key: Cache key to write.
            value: Object to store.
            ttl: Lifetime of the entry in seconds.

        Returns:
            True when the value was stored.
        """

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Remove a single entry.

        Args:
            key: Cache key to remove.

        Returns:
            True when an entry was actually removed.
        """

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Report whether an entry is present.

        Args:
            key: Cache key to test.

        Returns:
            True when the key is present in the cache.
        """

    @abstractmethod
    async def clear(self) -> bool:
        """Drop every entry held by the backend.

        Returns:
            True when the cache was cleared.
        """

    @abstractmethod
    async def get_many(self, keys: List[str]) -> Dict[str, Any]:
        """Look up several entries at once.

        Args:
            keys: Cache keys to read.

        Returns:
            Mapping from key to stored value.
        """

    @abstractmethod
    async def set_many(
        self, items: Dict[str, Any], ttl: Optional[int] = None
    ) -> bool:
        """Store several entries at once.

        Args:
            items: Mapping of cache key to value.
            ttl: Lifetime in seconds applied to the stored entries.

        Returns:
            True when the entries were stored.
        """

    @abstractmethod
    async def delete_pattern(self, pattern: str) -> int:
        """Remove every entry whose key matches a wildcard pattern.

        Args:
            pattern: Key pattern; wildcards are supported.

        Returns:
            How many keys were removed.
        """
class InMemoryCacheBackend(CacheBackend):
    """In-memory cache backend using a dictionary.

    Every entry is stored in ``self.cache`` as a dict with the keys
    ``"value"``, ``"expiry"`` and ``"created"``. ``expiry`` and ``created``
    are naive UTC datetimes; ``expiry`` is None for entries that never
    expire. Expired entries are removed lazily when they are read and in
    bulk when room has to be made for a new key.
    """

    def __init__(self, max_size: int = 1000):
        """
        Initialize in-memory cache.

        Args:
            max_size: Maximum number of items to cache. Values below 1
                behave like a capacity of one entry.
        """
        self.cache: Dict[str, Dict[str, Any]] = {}
        self.max_size = max_size
        self._lock = asyncio.Lock()

    @staticmethod
    def _utcnow() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same value as ``datetime.utcnow()`` (deprecated since
        Python 3.12) so the stored timestamp format is unchanged.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _is_expired(self, item: Dict[str, Any]) -> bool:
        """
        Check if cache item is expired.

        Args:
            item: Cache item with expiry

        Returns:
            True if the item has an expiry that lies in the past
        """
        expiry = item.get("expiry")
        return expiry is not None and self._utcnow() > expiry

    def _evict_oldest(self) -> None:
        """
        Make room for one new key when the cache is full.

        Expired entries are purged first; only if the cache is still full
        afterwards is the entry with the oldest ``created`` timestamp
        removed. Must be called with ``self._lock`` held.
        """
        # An empty cache has nothing to evict (also covers max_size <= 0,
        # where min() over no keys would raise ValueError).
        if not self.cache or len(self.cache) < self.max_size:
            return

        now = self._utcnow()
        expired_keys = [
            key
            for key, item in self.cache.items()
            if item.get("expiry") is not None and now > item["expiry"]
        ]
        for key in expired_keys:
            del self.cache[key]
        if not self.cache or len(self.cache) < self.max_size:
            return

        oldest_key = min(
            self.cache,
            key=lambda k: self.cache[k].get("created", now),
        )
        del self.cache[oldest_key]

    async def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None if the key is missing or expired
        """
        async with self._lock:
            item = self.cache.get(key)
            if item is None:
                logger.debug("Cache miss for key: %s", key)
                return None
            if self._is_expired(item):
                logger.debug("Cache expired for key: %s", key)
                del self.cache[key]
                return None
            logger.debug("Cache hit for key: %s", key)
            return item["value"]

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> bool:
        """
        Set value in cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds; None or 0 stores the value
                without an expiry

        Returns:
            True (the operation always succeeds)
        """
        async with self._lock:
            # Overwriting an existing key does not grow the cache, so it
            # must not push another live entry out.
            if key not in self.cache:
                self._evict_oldest()
            now = self._utcnow()
            expiry = now + timedelta(seconds=ttl) if ttl else None
            self.cache[key] = {
                "value": value,
                "expiry": expiry,
                "created": now,
            }
            logger.debug("Cached key: %s (ttl=%s)", key, ttl)
            return True

    async def delete(self, key: str) -> bool:
        """
        Delete value from cache.

        Args:
            key: Cache key

        Returns:
            True if the key was present and has been removed
        """
        async with self._lock:
            if key in self.cache:
                del self.cache[key]
                logger.debug("Deleted cache key: %s", key)
                return True
            logger.debug("Cache delete skipped; key not found: %s", key)
            return False

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key

        Returns:
            True if the key is present and not expired
        """
        async with self._lock:
            item = self.cache.get(key)
            if item is None:
                return False
            if self._is_expired(item):
                del self.cache[key]
                return False
            return True

    async def clear(self) -> bool:
        """
        Clear all cached values.

        Returns:
            True (the operation always succeeds)
        """
        async with self._lock:
            self.cache.clear()
            logger.debug("Cleared in-memory cache")
            return True

    async def get_many(self, keys: List[str]) -> Dict[str, Any]:
        """
        Get multiple values from cache.

        Args:
            keys: List of cache keys

        Returns:
            Dictionary with an entry for every key that yielded a
            non-None value; missing and expired keys are left out
        """
        result = {}
        for key in keys:
            value = await self.get(key)
            if value is not None:
                result[key] = value
        return result

    async def set_many(
        self, items: Dict[str, Any], ttl: Optional[int] = None
    ) -> bool:
        """
        Set multiple values in cache.

        Args:
            items: Dictionary of key-value pairs
            ttl: Time to live in seconds, applied to every item

        Returns:
            True (the operation always succeeds)
        """
        for key, value in items.items():
            await self.set(key, value, ttl)
        return True

    async def delete_pattern(self, pattern: str) -> int:
        """
        Delete all keys matching pattern.

        Args:
            pattern: Shell-style wildcard pattern, matched case-sensitively

        Returns:
            Number of keys deleted
        """
        import fnmatch

        async with self._lock:
            # fnmatchcase keeps matching identical on every platform;
            # fnmatch.fnmatch normalises case and separators per OS.
            keys_to_delete = [
                key for key in self.cache if fnmatch.fnmatchcase(key, pattern)
            ]
            for key in keys_to_delete:
                del self.cache[key]
            return len(keys_to_delete)
class RedisCacheBackend(CacheBackend):
    """Redis cache backend.

    Values are serialised with ``pickle`` and stored under
    ``prefix + key``. Apart from connection setup, every operation catches
    its own errors, logs them and returns a neutral result (None, False,
    0 or an empty dict) instead of raising.

    SECURITY NOTE: values read from Redis are passed to ``pickle.loads``,
    which can execute arbitrary code. Only use this backend with a Redis
    instance that untrusted parties cannot write to.

    NOTE(review): the calls used here (``create_redis_pool``,
    ``wait_closed``) look like the aioredis 1.x API; confirm the pinned
    aioredis version before upgrading.
    """

    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        prefix: str = "aniworld:",
    ):
        """
        Initialize Redis cache.

        Args:
            redis_url: Redis connection URL
            prefix: Key prefix for namespacing
        """
        self.redis_url = redis_url
        self.prefix = prefix
        self._redis = None
        # Created on first use so the lock is bound to the running loop.
        self._connect_lock: Optional[asyncio.Lock] = None

    async def _get_redis(self):
        """
        Get the Redis connection, creating it on first use.

        Returns:
            The shared connection pool object

        Raises:
            ImportError: If aioredis is not installed
            Exception: Whatever the connection attempt raised
        """
        if self._redis is not None:
            return self._redis
        if self._connect_lock is None:
            self._connect_lock = asyncio.Lock()
        # Serialise the first connection so concurrent callers do not each
        # open (and leak) their own pool.
        async with self._connect_lock:
            if self._redis is None:
                try:
                    import aioredis

                    self._redis = await aioredis.create_redis_pool(
                        self.redis_url
                    )
                    # NOTE(review): the URL may embed credentials.
                    logger.debug("Connected to Redis at %s", self.redis_url)
                except ImportError:
                    logger.error(
                        "aioredis not installed. Install with: pip install aioredis"
                    )
                    raise
                except Exception as e:
                    logger.error("Failed to connect to Redis: %s", e)
                    raise
        return self._redis

    def _make_key(self, key: str) -> str:
        """Add prefix to key."""
        return f"{self.prefix}{key}"

    async def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key (without prefix)

        Returns:
            Unpickled value, or None if the key is missing or an error
            occurred
        """
        try:
            redis = await self._get_redis()
            data = await redis.get(self._make_key(key))
            if data is None:
                return None
            # See the class-level security note about pickle.
            return pickle.loads(data)
        except Exception as e:
            logger.error("Redis get error: %s", e)
            return None

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> bool:
        """
        Set value in cache.

        Args:
            key: Cache key (without prefix)
            value: Value to cache; must be picklable
            ttl: Time to live in seconds; None or 0 stores the value
                without an expiry

        Returns:
            True if stored, False if pickling or the Redis call failed
        """
        try:
            redis = await self._get_redis()
            data = pickle.dumps(value)
            if ttl:
                await redis.setex(self._make_key(key), ttl, data)
            else:
                await redis.set(self._make_key(key), data)
            return True
        except Exception as e:
            logger.error("Redis set error: %s", e)
            return False

    async def delete(self, key: str) -> bool:
        """
        Delete value from cache.

        Args:
            key: Cache key (without prefix)

        Returns:
            True if at least one key was removed, False otherwise or on
            error
        """
        try:
            redis = await self._get_redis()
            result = await redis.delete(self._make_key(key))
            return result > 0
        except Exception as e:
            logger.error("Redis delete error: %s", e)
            return False

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key (without prefix)

        Returns:
            True if the key exists, False otherwise or on error
        """
        try:
            redis = await self._get_redis()
            # The client may answer with a count; normalise to bool.
            return bool(await redis.exists(self._make_key(key)))
        except Exception as e:
            logger.error("Redis exists error: %s", e)
            return False

    async def clear(self) -> bool:
        """
        Clear all cached values with prefix.

        Returns:
            True if the keys were removed, False on error
        """
        try:
            redis = await self._get_redis()
            # NOTE(review): KEYS walks the whole keyspace; consider SCAN
            # if the Redis instance is large or shared.
            keys = await redis.keys(f"{self.prefix}*")
            if keys:
                await redis.delete(*keys)
            return True
        except Exception as e:
            logger.error("Redis clear error: %s", e)
            return False

    async def get_many(self, keys: List[str]) -> Dict[str, Any]:
        """
        Get multiple values from cache.

        Args:
            keys: List of cache keys (without prefix)

        Returns:
            Dictionary of the keys that were found mapped to their
            unpickled values; empty on error or when ``keys`` is empty
        """
        if not keys:
            # MGET needs at least one key; nothing to look up.
            return {}
        try:
            redis = await self._get_redis()
            prefixed_keys = [self._make_key(k) for k in keys]
            values = await redis.mget(*prefixed_keys)
            return {
                key: pickle.loads(value)
                for key, value in zip(keys, values)
                if value is not None
            }
        except Exception as e:
            logger.error("Redis get_many error: %s", e)
            return {}

    async def set_many(
        self, items: Dict[str, Any], ttl: Optional[int] = None
    ) -> bool:
        """
        Set multiple values in cache.

        Every item is attempted even if an earlier one fails.

        Args:
            items: Dictionary of key-value pairs
            ttl: Time to live in seconds, applied to every item

        Returns:
            True only if every item was stored successfully
        """
        try:
            # set() reports failure through its return value rather than
            # by raising, so the individual results must be collected.
            outcomes = [
                await self.set(key, value, ttl)
                for key, value in items.items()
            ]
            return all(outcomes)
        except Exception as e:
            logger.error("Redis set_many error: %s", e)
            return False

    async def delete_pattern(self, pattern: str) -> int:
        """
        Delete all keys matching pattern.

        Args:
            pattern: Redis glob pattern, without the backend prefix

        Returns:
            Number of keys that matched and were submitted for deletion;
            0 on error
        """
        try:
            redis = await self._get_redis()
            full_pattern = f"{self.prefix}{pattern}"
            keys = await redis.keys(full_pattern)
            if keys:
                await redis.delete(*keys)
                return len(keys)
            return 0
        except Exception as e:
            logger.error("Redis delete_pattern error: %s", e)
            return 0

    async def close(self) -> None:
        """
        Close Redis connection.

        The stored connection is dropped first, so a later operation
        opens a fresh connection instead of reusing the closed one.
        """
        redis, self._redis = self._redis, None
        if redis is not None:
            redis.close()
            await redis.wait_closed()
class CacheService:
    """Main cache service with automatic key generation and TTL management.

    Thin facade over a cache backend: it fills in a default TTL, builds
    prefixed keys for the anime and configuration helpers, and offers a
    get-or-compute helper.
    """

    def __init__(
        self,
        backend: Optional[CacheBackend] = None,
        default_ttl: int = 3600,
        key_prefix: str = "",
    ):
        """
        Initialize cache service.

        Args:
            backend: Cache backend to use; an InMemoryCacheBackend is
                created when None is given
            default_ttl: Default time to live in seconds
            key_prefix: Prefix for all keys built by ``_make_key``
        """
        # Compare against None explicitly so a supplied backend that
        # happens to be falsy (e.g. defines __len__) is not discarded.
        self.backend = (
            backend if backend is not None else InMemoryCacheBackend()
        )
        self.default_ttl = default_ttl
        self.key_prefix = key_prefix

    def _make_key(self, *args: Any, **kwargs: Any) -> str:
        """
        Generate cache key from arguments.

        Positional arguments are stringified in order, keyword arguments
        are appended as ``name=value`` sorted by name, and the parts are
        joined with ``:``. Keys longer than 200 characters are replaced by
        their MD5 hex digest (used only to shorten the key).

        Args:
            *args: Positional arguments
            **kwargs: Keyword arguments

        Returns:
            Cache key string, including ``key_prefix``
        """
        key_parts = [str(arg) for arg in args]
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        key_str = ":".join(key_parts)
        if len(key_str) > 200:
            # NOTE: a hashed key no longer starts with its first part, so
            # prefix patterns such as "anime*" will not match it.
            key_hash = hashlib.md5(key_str.encode()).hexdigest()
            return f"{self.key_prefix}{key_hash}"
        return f"{self.key_prefix}{key_str}"

    async def get(
        self, key: str, default: Optional[Any] = None
    ) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key, passed to the backend unchanged
            default: Value returned when the backend yields None

        Returns:
            Cached value or default
        """
        value = await self.backend.get(key)
        return value if value is not None else default

    async def set(
        self, key: str, value: Any, ttl: Optional[int] = None
    ) -> bool:
        """
        Set value in cache.

        Args:
            key: Cache key, passed to the backend unchanged
            value: Value to cache
            ttl: Time to live in seconds (uses default_ttl if None). Other
                values, including 0, are forwarded to the backend as-is.

        Returns:
            Result reported by the backend (True if successful)
        """
        if ttl is None:
            ttl = self.default_ttl
        return await self.backend.set(key, value, ttl)

    async def delete(self, key: str) -> bool:
        """
        Delete value from cache.

        Args:
            key: Cache key

        Returns:
            True if deleted
        """
        return await self.backend.delete(key)

    async def exists(self, key: str) -> bool:
        """
        Check if key exists in cache.

        Args:
            key: Cache key

        Returns:
            True if exists
        """
        return await self.backend.exists(key)

    async def clear(self) -> bool:
        """
        Clear all cached values.

        Returns:
            True if successful
        """
        return await self.backend.clear()

    async def get_or_set(
        self,
        key: str,
        factory,
        ttl: Optional[int] = None,
    ) -> Any:
        """
        Get value from cache or compute and cache it.

        Args:
            key: Cache key
            factory: Zero-argument callable that computes the value. It
                may be a coroutine function or a plain callable; if the
                call returns an awaitable, it is awaited.
            ttl: Time to live in seconds (uses default_ttl if None)

        Returns:
            Cached or computed value. A cached None counts as a miss, so
            a factory that returns None runs on every call.
        """
        import inspect

        value = await self.get(key)
        if value is not None:
            return value

        value = factory()
        # Inspect the result rather than the callable so lambdas, partials
        # and callable objects that return a coroutine are awaited too.
        if inspect.isawaitable(value):
            value = await value
        await self.set(key, value, ttl)
        return value

    async def invalidate_pattern(self, pattern: str) -> int:
        """
        Invalidate all keys matching pattern.

        Args:
            pattern: Pattern to match, passed to the backend unchanged

        Returns:
            Number of keys invalidated
        """
        return await self.backend.delete_pattern(pattern)

    async def cache_anime_list(
        self, anime_list: List[Dict[str, Any]], ttl: Optional[int] = None
    ) -> bool:
        """
        Cache anime list.

        Args:
            anime_list: List of anime data
            ttl: Time to live in seconds

        Returns:
            True if successful
        """
        key = self._make_key("anime", "list")
        return await self.set(key, anime_list, ttl)

    async def get_anime_list(self) -> Optional[List[Dict[str, Any]]]:
        """
        Get cached anime list.

        Returns:
            Cached anime list or None
        """
        key = self._make_key("anime", "list")
        return await self.get(key)

    async def cache_anime_detail(
        self, anime_id: str, data: Dict[str, Any], ttl: Optional[int] = None
    ) -> bool:
        """
        Cache anime detail.

        Args:
            anime_id: Anime identifier
            data: Anime data
            ttl: Time to live in seconds

        Returns:
            True if successful
        """
        key = self._make_key("anime", "detail", anime_id)
        return await self.set(key, data, ttl)

    async def get_anime_detail(self, anime_id: str) -> Optional[Dict[str, Any]]:
        """
        Get cached anime detail.

        Args:
            anime_id: Anime identifier

        Returns:
            Cached anime data or None
        """
        key = self._make_key("anime", "detail", anime_id)
        return await self.get(key)

    async def invalidate_anime_cache(self) -> int:
        """
        Invalidate all anime-related cache.

        Deletes every key that matches ``<key_prefix>anime*``.

        Returns:
            Number of keys invalidated
        """
        return await self.invalidate_pattern(f"{self.key_prefix}anime*")

    async def cache_config(
        self, config: Dict[str, Any], ttl: Optional[int] = None
    ) -> bool:
        """
        Cache configuration.

        Args:
            config: Configuration data
            ttl: Time to live in seconds

        Returns:
            True if successful
        """
        key = self._make_key("config")
        return await self.set(key, config, ttl)

    async def get_config(self) -> Optional[Dict[str, Any]]:
        """
        Get cached configuration.

        Returns:
            Cached configuration or None
        """
        key = self._make_key("config")
        return await self.get(key)

    async def invalidate_config_cache(self) -> bool:
        """
        Invalidate configuration cache.

        Returns:
            True if a cached configuration was deleted
        """
        key = self._make_key("config")
        return await self.delete(key)
# Global cache service instance. Stays None until get_cache_service() or
# configure_cache_service() assigns it.
_cache_service: Optional[CacheService] = None
def get_cache_service() -> CacheService:
    """
    Return the shared cache service, creating it on first use.

    When no service has been set up yet, a CacheService with default
    settings is created, stored as the global instance and returned.

    Returns:
        The global CacheService instance
    """
    global _cache_service
    if _cache_service is not None:
        return _cache_service
    _cache_service = CacheService()
    return _cache_service
def configure_cache_service(
    backend_type: str = "memory",
    redis_url: str = "redis://localhost:6379",
    default_ttl: int = 3600,
    max_size: int = 1000,
) -> CacheService:
    """
    Configure the global cache service.

    Builds a new backend and CacheService and installs it as the global
    instance, replacing any previous one.

    NOTE(review): the previous service is replaced without being closed;
    if it used a Redis backend, closing that connection is left to the
    caller.

    Args:
        backend_type: Type of backend ("memory" or "redis"). Any other
            value falls back to the in-memory backend and logs a warning.
        redis_url: Redis connection URL (for redis backend)
        default_ttl: Default time to live in seconds
        max_size: Maximum cache size (for memory backend)

    Returns:
        Configured CacheService instance
    """
    global _cache_service
    if backend_type == "redis":
        backend = RedisCacheBackend(redis_url=redis_url)
    else:
        if backend_type != "memory":
            # Keep the historical fallback, but make a misconfigured
            # value (typo, wrong case) visible instead of silent.
            logger.warning(
                "Unknown cache backend type %r; falling back to in-memory cache",
                backend_type,
            )
        backend = InMemoryCacheBackend(max_size=max_size)
    _cache_service = CacheService(
        backend=backend, default_ttl=default_ttl, key_prefix="aniworld:"
    )
    return _cache_service