"""Unit tests for Cache Service.

Tests cover:
- In-memory cache operations (get, set, delete, exists)
- TTL expiration and eviction
- Cache key generation
- Pattern matching and deletion
- Get-or-set logic
- Multiple values operations
- Error handling
"""
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from src.server.services.cache_service import (
|
|
CacheService,
|
|
InMemoryCacheBackend,
|
|
RedisCacheBackend,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def in_memory_cache():
    """Provide a fresh InMemoryCacheBackend built with ``max_size=100``."""
    backend = InMemoryCacheBackend(max_size=100)
    return backend
|
|
|
|
|
|
@pytest.fixture
def cache_service(in_memory_cache):
    """Provide a CacheService wrapping the in-memory fixture (``default_ttl=300``)."""
    service = CacheService(backend=in_memory_cache, default_ttl=300)
    return service
|
|
|
|
|
|
class TestInMemoryCacheBackend:
    """Exercise InMemoryCacheBackend directly, without the service layer."""

    @pytest.mark.asyncio
    async def test_get_set_basic(self, in_memory_cache):
        """A value written with set() is returned by get()."""
        await in_memory_cache.set("key1", "value1")
        assert await in_memory_cache.get("key1") == "value1"

    @pytest.mark.asyncio
    async def test_get_nonexistent_key(self, in_memory_cache):
        """get() yields None for a key that was never stored."""
        missing = await in_memory_cache.get("nonexistent")
        assert missing is None

    @pytest.mark.asyncio
    async def test_delete_key(self, in_memory_cache):
        """delete() reports True and the key is no longer readable."""
        await in_memory_cache.set("key1", "value1")
        was_deleted = await in_memory_cache.delete("key1")
        assert was_deleted is True
        assert await in_memory_cache.get("key1") is None

    @pytest.mark.asyncio
    async def test_delete_nonexistent_key(self, in_memory_cache):
        """delete() reports False when there is nothing to remove."""
        was_deleted = await in_memory_cache.delete("nonexistent")
        assert was_deleted is False

    @pytest.mark.asyncio
    async def test_exists_key(self, in_memory_cache):
        """exists() is True for a stored key and False for an unknown one."""
        await in_memory_cache.set("key1", "value1")
        assert await in_memory_cache.exists("key1") is True
        assert await in_memory_cache.exists("nonexistent") is False

    @pytest.mark.asyncio
    async def test_clear_cache(self, in_memory_cache):
        """clear() reports True and drops every stored key."""
        await in_memory_cache.set("key1", "value1")
        await in_memory_cache.set("key2", "value2")
        cleared = await in_memory_cache.clear()
        assert cleared is True
        assert await in_memory_cache.get("key1") is None
        assert await in_memory_cache.get("key2") is None

    @pytest.mark.asyncio
    async def test_get_many(self, in_memory_cache):
        """get_many() returns only the keys that are present."""
        await in_memory_cache.set("key1", "value1")
        await in_memory_cache.set("key2", "value2")

        # "key3" was never stored, so it must be absent from the mapping.
        found = await in_memory_cache.get_many(["key1", "key2", "key3"])
        assert found == {"key1": "value1", "key2": "value2"}

    @pytest.mark.asyncio
    async def test_set_many(self, in_memory_cache):
        """set_many() reports True and stores each item individually."""
        payload = {"key1": "value1", "key2": "value2"}
        stored = await in_memory_cache.set_many(payload)
        assert stored is True

        assert await in_memory_cache.get("key1") == "value1"
        assert await in_memory_cache.get("key2") == "value2"

    @pytest.mark.asyncio
    async def test_ttl_expiration(self, in_memory_cache):
        """An entry stored with ttl=1 is readable at once and gone after 1.1 s."""
        await in_memory_cache.set("key1", "value1", ttl=1)
        assert await in_memory_cache.get("key1") == "value1"

        # Sleep slightly longer than the TTL so the entry has expired.
        await asyncio.sleep(1.1)
        assert await in_memory_cache.get("key1") is None

    @pytest.mark.asyncio
    async def test_ttl_none(self, in_memory_cache):
        """An entry stored with ttl=None is still readable after a short wait."""
        await in_memory_cache.set("key1", "value1", ttl=None)
        await asyncio.sleep(0.1)
        assert await in_memory_cache.get("key1") == "value1"

    @pytest.mark.asyncio
    async def test_delete_pattern(self, in_memory_cache):
        """delete_pattern("user:*") removes both user keys and nothing else."""
        await in_memory_cache.set("user:1", "data1")
        await in_memory_cache.set("user:2", "data2")
        await in_memory_cache.set("post:1", "data3")

        removed = await in_memory_cache.delete_pattern("user:*")
        assert removed == 2

        assert await in_memory_cache.get("user:1") is None
        assert await in_memory_cache.get("post:1") == "data3"

    @pytest.mark.asyncio
    async def test_max_size_eviction(self, in_memory_cache):
        """Writing a third key into a max_size=2 cache evicts the first key."""
        # A dedicated two-slot backend is used instead of the fixture.
        small_cache = InMemoryCacheBackend(max_size=2)

        # Short pauses between writes — presumably so the entries get
        # distinguishable ages; confirm against the backend's eviction logic.
        await small_cache.set("key1", "value1")
        await asyncio.sleep(0.01)
        await small_cache.set("key2", "value2")
        await asyncio.sleep(0.01)
        await small_cache.set("key3", "value3")  # expected to push out key1

        # The oldest entry is gone; the two newer ones survive.
        assert await small_cache.get("key1") is None
        assert await small_cache.get("key2") == "value2"
        assert await small_cache.get("key3") == "value3"

    @pytest.mark.asyncio
    async def test_concurrent_operations(self, in_memory_cache):
        """Ten set() calls gathered concurrently all land in the cache."""
        pending_writes = [
            in_memory_cache.set(f"key{i}", f"value{i}") for i in range(10)
        ]
        await asyncio.gather(*pending_writes)

        # Every key written above must be readable with its own value.
        for i in range(10):
            assert await in_memory_cache.get(f"key{i}") == f"value{i}"
|
|
|
|
|
|
class TestCacheService:
    """Exercise the CacheService facade on top of the in-memory backend."""

    @pytest.mark.asyncio
    async def test_get_set(self, cache_service):
        """A value written through the service can be read back."""
        await cache_service.set("key1", "value1")
        assert await cache_service.get("key1") == "value1"

    @pytest.mark.asyncio
    async def test_get_with_default(self, cache_service):
        """get() falls back to the supplied default for an unknown key."""
        fallback = await cache_service.get("nonexistent", "default")
        assert fallback == "default"

    @pytest.mark.asyncio
    async def test_exists(self, cache_service):
        """exists() is True for a stored key and False for an unknown one."""
        await cache_service.set("key1", "value1")
        assert await cache_service.exists("key1") is True
        assert await cache_service.exists("nonexistent") is False

    @pytest.mark.asyncio
    async def test_delete(self, cache_service):
        """delete() reports True and the key is no longer readable."""
        await cache_service.set("key1", "value1")
        was_deleted = await cache_service.delete("key1")
        assert was_deleted is True
        assert await cache_service.get("key1") is None

    @pytest.mark.asyncio
    async def test_clear(self, cache_service):
        """clear() reports True and a previously stored key is gone."""
        await cache_service.set("key1", "value1")
        await cache_service.set("key2", "value2")
        cleared = await cache_service.clear()
        assert cleared is True
        assert await cache_service.get("key1") is None

    @pytest.mark.asyncio
    async def test_make_key_simple(self, cache_service):
        """_make_key() embeds positional and keyword arguments in the key."""
        generated = cache_service._make_key("user", 123, type="profile")
        assert "user" in generated
        assert "123" in generated
        assert "type=profile" in generated

    @pytest.mark.asyncio
    async def test_make_key_long_hashing(self, cache_service):
        """_make_key() returns a key shorter than a 300-character argument."""
        oversized = "x" * 300
        generated = cache_service._make_key(oversized)
        # A shorter result indicates the long input was condensed (hashed).
        assert len(generated) < len(oversized)

    @pytest.mark.asyncio
    async def test_get_or_set_exists(self, cache_service):
        """get_or_set() returns the cached value and never calls the factory."""
        await cache_service.set("key1", "cached")
        producer = MagicMock(return_value="computed")

        value = await cache_service.get_or_set("key1", producer)
        assert value == "cached"
        producer.assert_not_called()

    @pytest.mark.asyncio
    async def test_get_or_set_compute_sync(self, cache_service):
        """get_or_set() calls a sync factory once and stores its result."""
        producer = MagicMock(return_value="computed")

        value = await cache_service.get_or_set("key1", producer)
        assert value == "computed"
        producer.assert_called_once()

        # The computed value must now be served from the cache.
        assert await cache_service.get("key1") == "computed"

    @pytest.mark.asyncio
    async def test_get_or_set_compute_async(self, cache_service):
        """get_or_set() accepts a coroutine function as the factory."""

        async def produce():
            return "async_computed"

        value = await cache_service.get_or_set("key1", produce)
        assert value == "async_computed"

        assert await cache_service.get("key1") == "async_computed"

    @pytest.mark.asyncio
    async def test_invalidate_pattern(self, cache_service):
        """invalidate_pattern("user:*") removes both user keys only."""
        await cache_service.set("user:1", "data1")
        await cache_service.set("user:2", "data2")
        await cache_service.set("post:1", "data3")

        removed = await cache_service.invalidate_pattern("user:*")
        assert removed == 2

        assert await cache_service.get("user:1") is None
        assert await cache_service.get("post:1") == "data3"

    @pytest.mark.asyncio
    async def test_cache_anime_list(self, cache_service):
        """cache_anime_list() reports True and get_anime_list() returns the list."""
        anime_list = [
            {"id": 1, "name": "Anime 1"},
            {"id": 2, "name": "Anime 2"},
        ]

        stored = await cache_service.cache_anime_list(anime_list)
        assert stored is True

        assert await cache_service.get_anime_list() == anime_list

    @pytest.mark.asyncio
    async def test_get_anime_list_empty(self, cache_service):
        """get_anime_list() yields None when nothing has been cached."""
        nothing = await cache_service.get_anime_list()
        assert nothing is None

    @pytest.mark.asyncio
    async def test_ttl_default(self, cache_service):
        """set() without a ttl uses the service's default_ttl (here 1)."""
        # Built without an explicit backend; the fixture argument is unused.
        short_lived = CacheService(default_ttl=1)
        await short_lived.set("key1", "value1")  # no per-call TTL

        assert await short_lived.get("key1") == "value1"
        await asyncio.sleep(1.1)
        assert await short_lived.get("key1") is None

    @pytest.mark.asyncio
    async def test_ttl_override(self, cache_service):
        """A per-call ttl=1 wins over default_ttl=10."""
        long_default = CacheService(default_ttl=10)
        await long_default.set("key1", "value1", ttl=1)  # per-call override

        await asyncio.sleep(1.1)
        assert await long_default.get("key1") is None

    @pytest.mark.asyncio
    async def test_key_prefix(self, cache_service):
        """_make_key() output starts with the configured key_prefix."""
        prefixed = CacheService(key_prefix="app:")
        generated = prefixed._make_key("test")
        assert generated.startswith("app:")

    @pytest.mark.asyncio
    async def test_complex_value_types(self, cache_service):
        """dict, list, tuple and set values survive a set()/get() round trip."""
        samples = {
            "dict": {"a": 1, "b": 2},
            "list": [1, 2, 3],
            "tuple": (1, 2, 3),
            "set": {1, 2, 3},
        }

        for name, original in samples.items():
            await cache_service.set(name, original)
            fetched = await cache_service.get(name)
            # Sets are compared after coercion, so the cached container type
            # does not matter for them.
            comparable = set(fetched) if isinstance(original, set) else fetched
            assert comparable == original
|
|
|
|
|
|
class TestRedisCacheBackendMock:
    """Exercise RedisCacheBackend with its connection mocked out."""

    @staticmethod
    def _failing_connection(backend):
        """Return a patcher that makes ``backend._get_redis`` raise when called."""
        return patch.object(
            backend, "_get_redis", side_effect=Exception("Connection failed")
        )

    @pytest.mark.asyncio
    async def test_redis_prefix(self):
        """_make_key() prepends the configured prefix to the key."""
        backend = RedisCacheBackend(prefix="test:")
        assert backend._make_key("mykey") == "test:mykey"

    @pytest.mark.asyncio
    async def test_redis_get_error_handling(self):
        """get() returns None when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.get("key1")
            assert outcome is None

    @pytest.mark.asyncio
    async def test_redis_delete_error_handling(self):
        """delete() returns False when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.delete("key1")
            assert outcome is False

    @pytest.mark.asyncio
    async def test_redis_set_error_handling(self):
        """set() returns False when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.set("key1", "value1")
            assert outcome is False

    @pytest.mark.asyncio
    async def test_redis_exists_error_handling(self):
        """exists() returns False when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.exists("key1")
            assert outcome is False

    @pytest.mark.asyncio
    async def test_redis_clear_error_handling(self):
        """clear() returns False when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.clear()
            assert outcome is False

    @pytest.mark.asyncio
    async def test_redis_get_many_error_handling(self):
        """get_many() returns an empty dict when the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.get_many(["key1", "key2"])
            assert outcome == {}

    @pytest.mark.asyncio
    async def test_redis_set_many_error_handling(self):
        """set_many() still reports True when the patched set() returns False."""
        backend = RedisCacheBackend()

        # Here set() itself is stubbed to report failure rather than breaking
        # the connection; the expectation is that set_many() does not inspect
        # the individual results.
        with patch.object(backend, "set", return_value=False):
            outcome = await backend.set_many({"key1": "value1"})
            assert outcome is True

    @pytest.mark.asyncio
    async def test_redis_delete_pattern_error_handling(self):
        """delete_pattern() returns 0 when obtaining the connection fails."""
        backend = RedisCacheBackend()

        with self._failing_connection(backend):
            outcome = await backend.delete_pattern("key*")
            assert outcome == 0

    @pytest.mark.asyncio
    async def test_redis_close(self):
        """close() calls both close() and wait_closed() on the connection."""
        backend = RedisCacheBackend()

        # Stand-in connection: close() is a plain call, wait_closed() is async.
        fake_connection = AsyncMock()
        fake_connection.close = MagicMock()
        fake_connection.wait_closed = AsyncMock()
        backend._redis = fake_connection

        await backend.close()

        fake_connection.close.assert_called_once()
        fake_connection.wait_closed.assert_called_once()

    @pytest.mark.asyncio
    async def test_redis_close_no_connection(self):
        """close() completes without raising when no connection is set."""
        backend = RedisCacheBackend()
        backend._redis = None

        # Reaching the end of the test without an exception is the assertion.
        await backend.close()
|
|
|
|
|
|
class TestCacheEdgeCases:
    """Cover boundary values and less common access patterns."""

    @pytest.mark.asyncio
    async def test_cache_none_value(self, cache_service):
        """A stored None is indistinguishable from a miss: get() returns the default."""
        await cache_service.set("key1", None)
        fallback = await cache_service.get("key1", "default")
        assert fallback == "default"

    @pytest.mark.asyncio
    async def test_cache_zero_and_false(self, cache_service):
        """Falsy values other than None (0, False, "") are returned as stored."""
        await cache_service.set("zero", 0)
        await cache_service.set("false", False)
        await cache_service.set("empty", "")

        assert await cache_service.get("zero") == 0
        assert await cache_service.get("false") is False
        assert await cache_service.get("empty") == ""

    @pytest.mark.asyncio
    async def test_cache_overwrite(self, cache_service):
        """A second set() on the same key replaces the first value."""
        await cache_service.set("key1", "value1")
        await cache_service.set("key1", "value2")

        assert await cache_service.get("key1") == "value2"

    @pytest.mark.asyncio
    async def test_thread_safety(self, in_memory_cache):
        """Five concurrent asyncio workers each read back their own writes.

        Despite the test name, concurrency here comes from asyncio.gather on
        one event loop, not from OS threads.
        """

        async def worker(worker_id):
            for step in range(10):
                cache_key = f"worker:{worker_id}:key{step}"
                await in_memory_cache.set(cache_key, step)
                observed = await in_memory_cache.get(cache_key)
                assert observed == step

        await asyncio.gather(*(worker(i) for i in range(5)))

    @pytest.mark.asyncio
    async def test_get_many_partial(self, in_memory_cache):
        """get_many() omits keys that are missing instead of mapping them to None."""
        await in_memory_cache.set("key1", "value1")
        await in_memory_cache.set("key3", "value3")

        found = await in_memory_cache.get_many(["key1", "key2", "key3"])
        assert len(found) == 2
        assert found["key1"] == "value1"
        assert found["key3"] == "value3"
        assert "key2" not in found

    @pytest.mark.asyncio
    async def test_expired_on_exists_check(self, in_memory_cache):
        """exists() on an expired key returns False and purges the entry."""
        await in_memory_cache.set("key1", "value1", ttl=1)

        # Before the TTL elapses the key is reported as present.
        assert await in_memory_cache.exists("key1") is True

        # Sleep slightly longer than the TTL.
        await asyncio.sleep(1.1)

        # After expiry the key is reported as absent...
        assert await in_memory_cache.exists("key1") is False

        # ...and the backend's internal ``cache`` mapping no longer holds it.
        assert "key1" not in in_memory_cache.cache
|
|
|
|
|
|
class TestCacheBackendAbstraction:
    """Check that services built on separate backends do not share data."""

    @pytest.mark.asyncio
    async def test_backend_switching(self):
        """Two services with their own in-memory backends are isolated."""
        # First service, backed by its own in-memory store.
        first_service = CacheService(backend=InMemoryCacheBackend())

        await first_service.set("key1", "value1")
        assert await first_service.get("key1") == "value1"

        # Second service, backed by a separate in-memory store.
        second_service = CacheService(backend=InMemoryCacheBackend())

        # The second store starts out without the first one's key.
        assert await second_service.get("key1") is None

        # The first store is unaffected by the second one's existence.
        assert await first_service.get("key1") == "value1"
|
|
|
|
|
|
class TestAnimeSpecificCaching:
    """Exercise the anime- and config-specific helpers on CacheService."""

    @pytest.mark.asyncio
    async def test_cache_anime_detail(self, cache_service):
        """cache_anime_detail() reports True and the detail can be read back."""
        detail = {"id": "123", "name": "Test Anime", "episodes": 12}

        stored = await cache_service.cache_anime_detail("123", detail)
        assert stored is True

        assert await cache_service.get_anime_detail("123") == detail

    @pytest.mark.asyncio
    async def test_get_anime_detail_not_cached(self, cache_service):
        """get_anime_detail() yields None for an id that was never cached."""
        nothing = await cache_service.get_anime_detail("nonexistent")
        assert nothing is None

    @pytest.mark.asyncio
    async def test_invalidate_anime_cache(self, cache_service):
        """invalidate_anime_cache() removes both the list and detail entries."""
        await cache_service.cache_anime_list([{"id": 1}])
        await cache_service.cache_anime_detail("123", {"name": "Test"})

        # Precondition: both entries are present.
        assert await cache_service.get_anime_list() is not None
        assert await cache_service.get_anime_detail("123") is not None

        # At least the two entries written above must be counted as removed.
        removed = await cache_service.invalidate_anime_cache()
        assert removed >= 2

        # Postcondition: neither entry can be read any more.
        assert await cache_service.get_anime_list() is None
        assert await cache_service.get_anime_detail("123") is None

    @pytest.mark.asyncio
    async def test_cache_config(self, cache_service):
        """cache_config() reports True and get_config() returns the same dict."""
        settings = {"setting1": "value1", "setting2": "value2"}

        stored = await cache_service.cache_config(settings)
        assert stored is True

        assert await cache_service.get_config() == settings

    @pytest.mark.asyncio
    async def test_get_config_not_cached(self, cache_service):
        """get_config() yields None when no config has been cached."""
        nothing = await cache_service.get_config()
        assert nothing is None

    @pytest.mark.asyncio
    async def test_invalidate_config_cache(self, cache_service):
        """invalidate_config_cache() reports True and the config is gone."""
        await cache_service.cache_config({"setting": "value"})

        assert await cache_service.get_config() is not None

        invalidated = await cache_service.invalidate_config_cache()
        assert invalidated is True

        assert await cache_service.get_config() is None

    @pytest.mark.asyncio
    async def test_cache_with_custom_ttl(self, cache_service):
        """cache_anime_list(ttl=1) expires the list after about a second."""
        anime_list = [{"id": 1}, {"id": 2}]

        stored = await cache_service.cache_anime_list(anime_list, ttl=1)
        assert stored is True

        # Readable immediately after caching.
        assert await cache_service.get_anime_list() == anime_list

        # Sleep slightly longer than the TTL.
        await asyncio.sleep(1.1)

        # No longer readable once the TTL has elapsed.
        assert await cache_service.get_anime_list() is None
|
|
|
|
|
|
class TestGlobalCacheService:
    """Tests for the module-level cache service accessors.

    These tests touch the ``_cache_service`` module global, so an autouse
    fixture snapshots it before each test and restores it afterwards to keep
    the changes from leaking into other tests.
    """

    @pytest.fixture(autouse=True)
    def _restore_global_service(self, monkeypatch):
        """Register the current global with monkeypatch so it is restored."""
        import src.server.services.cache_service as cache_module

        # Setting the attribute to its current value changes nothing now, but
        # makes monkeypatch put this value back at teardown regardless of what
        # the test (or the code under test) assigns in the meantime.
        monkeypatch.setattr(
            cache_module, "_cache_service", cache_module._cache_service
        )

    def test_get_cache_service_singleton(self):
        """get_cache_service() returns the same object on repeated calls."""
        import src.server.services.cache_service as cache_module
        from src.server.services.cache_service import get_cache_service

        # Start from a clean slate so the first call has to create the service.
        cache_module._cache_service = None

        service1 = get_cache_service()
        service2 = get_cache_service()

        assert service1 is service2

    def test_configure_cache_service_memory(self):
        """configure_cache_service("memory") builds an in-memory backed service."""
        from src.server.services.cache_service import configure_cache_service

        service = configure_cache_service(
            backend_type="memory",
            default_ttl=600,
            max_size=500,
        )

        assert service is not None
        assert isinstance(service.backend, InMemoryCacheBackend)
        assert service.default_ttl == 600
        # No prefix is passed above, so this pins the service's default prefix.
        assert service.key_prefix == "aniworld:"

    def test_configure_cache_service_redis(self):
        """configure_cache_service("redis") builds a Redis backed service."""
        from src.server.services.cache_service import configure_cache_service

        # NOTE(review): presumably no connection is opened at construction
        # time, since the test expects to pass without a Redis server — confirm.
        service = configure_cache_service(
            backend_type="redis",
            redis_url="redis://localhost:6379",
            default_ttl=1200,
        )

        assert service is not None
        assert isinstance(service.backend, RedisCacheBackend)
        assert service.default_ttl == 1200
|
|
|
|
|
|
class TestSetManyWithTTL:
    """Exercise set_many() on the in-memory backend with and without a TTL."""

    @pytest.mark.asyncio
    async def test_set_many_with_ttl(self, in_memory_cache):
        """Items stored via set_many(ttl=1) are readable, then expire together."""
        payload = {"key1": "value1", "key2": "value2"}

        stored = await in_memory_cache.set_many(payload, ttl=1)
        assert stored is True

        # Readable immediately after the bulk write.
        assert await in_memory_cache.get("key1") == "value1"
        assert await in_memory_cache.get("key2") == "value2"

        # Sleep slightly longer than the TTL.
        await asyncio.sleep(1.1)

        # Neither item is readable once the TTL has elapsed.
        assert await in_memory_cache.get("key1") is None
        assert await in_memory_cache.get("key2") is None

    @pytest.mark.asyncio
    async def test_set_many_no_ttl(self, in_memory_cache):
        """Items stored via set_many() without a ttl survive a short wait."""
        payload = {"key1": "value1", "key2": "value2"}

        stored = await in_memory_cache.set_many(payload)
        assert stored is True

        await asyncio.sleep(0.1)

        # Still present after the pause.
        assert await in_memory_cache.get("key1") == "value1"
        assert await in_memory_cache.get("key2") == "value2"
|
|
|
|
|
|
class TestDeletePatternEdgeCases:
    """Exercise delete_pattern() on the in-memory backend with unusual patterns."""

    @pytest.mark.asyncio
    async def test_delete_pattern_no_matches(self, in_memory_cache):
        """A pattern matching nothing removes 0 keys and leaves data intact."""
        await in_memory_cache.set("key1", "value1")

        removed = await in_memory_cache.delete_pattern("nomatch:*")
        assert removed == 0

        # The unrelated key is untouched.
        assert await in_memory_cache.get("key1") == "value1"

    @pytest.mark.asyncio
    async def test_delete_pattern_complex(self, in_memory_cache):
        """"user:1:*" removes only the two keys under the user:1 namespace."""
        await in_memory_cache.set("user:1:profile", "data1")
        await in_memory_cache.set("user:1:settings", "data2")
        await in_memory_cache.set("user:2:profile", "data3")
        await in_memory_cache.set("post:1", "data4")

        removed = await in_memory_cache.delete_pattern("user:1:*")
        assert removed == 2

        # Both user:1 entries are gone.
        assert await in_memory_cache.get("user:1:profile") is None
        assert await in_memory_cache.get("user:1:settings") is None

        # Entries outside that namespace are still there.
        assert await in_memory_cache.get("user:2:profile") == "data3"
        assert await in_memory_cache.get("post:1") == "data4"

    @pytest.mark.asyncio
    async def test_delete_pattern_exact_match(self, in_memory_cache):
        """A wildcard-free pattern removes exactly the key it names."""
        await in_memory_cache.set("exact_key", "value")

        removed = await in_memory_cache.delete_pattern("exact_key")
        assert removed == 1

        assert await in_memory_cache.get("exact_key") is None
|
|
|
|
|
|
class TestCacheServiceWithDifferentTTL:
    """Exercise CacheService instances built with non-default settings."""

    @pytest.mark.asyncio
    async def test_service_default_ttl_applied(self):
        """With default_ttl=1, a value set without a ttl expires after ~1 s."""
        service = CacheService(backend=InMemoryCacheBackend(), default_ttl=1)

        await service.set("key1", "value1")

        assert await service.get("key1") == "value1"

        await asyncio.sleep(1.1)

        assert await service.get("key1") is None

    @pytest.mark.asyncio
    async def test_service_ttl_override(self):
        """A per-call ttl=1 takes precedence over default_ttl=10."""
        service = CacheService(backend=InMemoryCacheBackend(), default_ttl=10)

        # The explicit TTL is shorter than the service default.
        await service.set("key1", "value1", ttl=1)

        await asyncio.sleep(1.1)

        assert await service.get("key1") is None

    @pytest.mark.asyncio
    async def test_service_with_key_prefix(self):
        """key_prefix shapes _make_key() output; set()/get() round-trip a raw key."""
        service = CacheService(backend=InMemoryCacheBackend(), key_prefix="test:")

        generated = service._make_key("mykey")
        assert generated.startswith("test:")

        await service.set("mykey", "value")

        # The same unprefixed key is used for writing and reading.
        assert await service.get("mykey") == "value"
|