Add database loading methods to SerieList

- Add load_all_from_db() for bulk loading series from DB
- Add _load_single_series_from_db() for loading single series by folder
- Add invalidate_cache() to clear in-memory cache
- Add tests for all new methods

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-05-26 18:26:25 +02:00
parent 102d83e947
commit 53d6da5dac
2 changed files with 818 additions and 392 deletions

View File

@@ -1,392 +1,527 @@
"""Utilities for loading and managing stored anime series metadata. """Utilities for loading and managing stored anime series metadata.
This module provides the SerieList class for managing collections of anime This module provides the SerieList class for managing collections of anime
series metadata. It uses file-based storage as fallback when database series metadata. It supports loading from both filesystem (legacy) and
is not available. database (primary).
Note: Note:
This module is part of the core domain layer. Database operations This module is part of the core domain layer. Database operations
are handled by the service layer via add_to_db(). are handled by the service layer via add_to_db().
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import logging
import logging import os
import os import warnings
import warnings from json import JSONDecodeError
from json import JSONDecodeError from typing import Dict, Iterable, List, Optional
from typing import Dict, Iterable, List, Optional
from src.core.entities.series import Serie
from src.core.entities.series import Serie
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class SerieList:
class SerieList: """
""" Represents the collection of cached series stored on disk.
Represents the collection of cached series stored on disk.
Series are identified by their unique 'key' (provider identifier).
Series are identified by their unique 'key' (provider identifier). The 'folder' is metadata only and not used for lookups.
The 'folder' is metadata only and not used for lookups.
This class manages in-memory series data loaded from filesystem.
This class manages in-memory series data loaded from filesystem. It has no database dependencies - all persistence is handled by
It has no database dependencies - all persistence is handled by the service layer.
the service layer.
Example:
Example: # File-based mode
# File-based mode serie_list = SerieList("/path/to/anime")
serie_list = SerieList("/path/to/anime") series = serie_list.get_all()
series = serie_list.get_all()
Attributes:
Attributes: directory: Path to the anime directory
directory: Path to the anime directory keyDict: Internal dictionary mapping serie.key to Serie objects
keyDict: Internal dictionary mapping serie.key to Serie objects """
"""
def __init__(
def __init__( self,
self, base_path: str,
base_path: str, skip_load: bool = False
skip_load: bool = False ) -> None:
) -> None: """Initialize the SerieList.
"""Initialize the SerieList.
Args:
Args: base_path: Path to the anime directory
base_path: Path to the anime directory skip_load: If True, skip automatic loading of series from files.
skip_load: If True, skip automatic loading of series from files. Useful when planning to load from database instead.
Useful when planning to load from database instead. """
""" self.directory: str = base_path
self.directory: str = base_path # Internal storage using serie.key as the dictionary key
# Internal storage using serie.key as the dictionary key self.keyDict: Dict[str, Serie] = {}
self.keyDict: Dict[str, Serie] = {}
# Only auto-load from files if not skipping
# Only auto-load from files if not skipping if not skip_load:
if not skip_load: self.load_series()
self.load_series()
def add(self, serie: Serie, use_sanitized_folder: bool = True) -> str:
def add(self, serie: Serie, use_sanitized_folder: bool = True) -> str: """
""" Persist a new series if it is not already present (file-based mode).
Persist a new series if it is not already present (file-based mode).
Uses serie.key for identification. Creates the filesystem folder
Uses serie.key for identification. Creates the filesystem folder using either the sanitized display name (default) or the existing
using either the sanitized display name (default) or the existing folder property.
folder property.
Args:
Args: serie: The Serie instance to add
serie: The Serie instance to add use_sanitized_folder: If True (default), use serie.sanitized_folder
use_sanitized_folder: If True (default), use serie.sanitized_folder for the filesystem folder name based on display name.
for the filesystem folder name based on display name. If False, use serie.folder as-is for backward compatibility.
If False, use serie.folder as-is for backward compatibility.
Returns:
Returns: str: The folder path that was created/used
str: The folder path that was created/used
Note:
Note: This method creates data files on disk. For database storage,
This method creates data files on disk. For database storage, use add_to_db() instead.
use add_to_db() instead. """
""" if self.contains(serie.key):
if self.contains(serie.key): # Return existing folder path
# Return existing folder path existing = self.keyDict[serie.key]
existing = self.keyDict[serie.key] return os.path.join(self.directory, existing.folder)
return os.path.join(self.directory, existing.folder)
# Determine folder name to use
# Determine folder name to use if use_sanitized_folder:
if use_sanitized_folder: folder_name = serie.sanitized_folder
folder_name = serie.sanitized_folder # Update the serie's folder property to match what we create
# Update the serie's folder property to match what we create serie.folder = folder_name
serie.folder = folder_name else:
else: folder_name = serie.folder
folder_name = serie.folder
data_path = os.path.join(self.directory, folder_name, "data")
data_path = os.path.join(self.directory, folder_name, "data") anime_path = os.path.join(self.directory, folder_name)
anime_path = os.path.join(self.directory, folder_name) os.makedirs(anime_path, exist_ok=True)
os.makedirs(anime_path, exist_ok=True) if not os.path.isfile(data_path):
if not os.path.isfile(data_path): serie.save_to_file(data_path)
serie.save_to_file(data_path) # Store by key, not folder
# Store by key, not folder self.keyDict[serie.key] = serie
self.keyDict[serie.key] = serie
return anime_path
return anime_path
async def add_to_db(self, serie: Serie) -> bool:
async def add_to_db(self, serie: Serie) -> bool: """Persist a new series to the database.
"""Persist a new series to the database.
Creates the filesystem folder using serie.folder, then persists
Creates the filesystem folder using serie.folder, then persists the series metadata to the database.
the series metadata to the database.
Args:
Args: serie: The Serie instance to add
serie: The Serie instance to add
Returns:
Returns: True if successful, False otherwise
True if successful, False otherwise """
""" try:
try: from src.server.database.connection import get_async_session_factory
from src.server.database.connection import get_async_session_factory from src.server.database.service import AnimeSeriesService, EpisodeService
from src.server.database.service import AnimeSeriesService, EpisodeService
folder_name = serie.folder
folder_name = serie.folder anime_path = os.path.join(self.directory, folder_name)
anime_path = os.path.join(self.directory, folder_name) os.makedirs(anime_path, exist_ok=True)
os.makedirs(anime_path, exist_ok=True)
session_factory = get_async_session_factory()
session_factory = get_async_session_factory() db = session_factory()
db = session_factory() try:
try: existing = await AnimeSeriesService.get_by_key(db, serie.key)
existing = await AnimeSeriesService.get_by_key(db, serie.key) if existing:
if existing: logger.debug(
logger.debug( "Series '%s' (key=%s) already exists in DB, skipping",
"Series '%s' (key=%s) already exists in DB, skipping", serie.name, serie.key
serie.name, serie.key )
) return True
return True
anime_series = await AnimeSeriesService.create(
anime_series = await AnimeSeriesService.create( db=db,
db=db, key=serie.key,
key=serie.key, name=serie.name,
name=serie.name, site=serie.site,
site=serie.site, folder=folder_name,
folder=folder_name, year=serie.year
year=serie.year )
) for season, eps in serie.episodeDict.items():
for season, eps in serie.episodeDict.items(): for ep in eps:
for ep in eps: await EpisodeService.create(
await EpisodeService.create( db=db,
db=db, series_id=anime_series.id,
series_id=anime_series.id, season=season,
season=season, episode_number=ep
episode_number=ep )
) await db.commit()
await db.commit() self.keyDict[serie.key] = serie
self.keyDict[serie.key] = serie logger.info(
logger.info( "Persisted series '%s' to database",
"Persisted series '%s' to database", serie.name
serie.name )
) return True
return True except Exception as e:
except Exception as e: await db.rollback()
await db.rollback() logger.error(
logger.error( "Failed to persist series '%s' to DB: %s",
"Failed to persist series '%s' to DB: %s", serie.key, e, exc_info=True
serie.key, e, exc_info=True )
) return False
return False finally:
finally: await db.close()
await db.close() except Exception as e:
except Exception as e: logger.error(
logger.error( "Could not add series '%s' to DB (DB unavailable?): %s",
"Could not add series '%s' to DB (DB unavailable?): %s", serie.key, e
serie.key, e )
) return False
return False
def contains(self, key: str) -> bool:
def contains(self, key: str) -> bool: """
""" Return True when a series identified by ``key`` already exists.
Return True when a series identified by ``key`` already exists.
Args:
Args: key: The unique provider identifier for the series
key: The unique provider identifier for the series
Returns:
Returns: True if the series exists in the collection
True if the series exists in the collection """
""" return key in self.keyDict
return key in self.keyDict
def load_series(self) -> None:
def load_series(self) -> None: """Populate the in-memory map with metadata discovered on disk."""
"""Populate the in-memory map with metadata discovered on disk."""
logger.info("Scanning anime folders in %s", self.directory)
logger.info("Scanning anime folders in %s", self.directory) try:
try: entries: Iterable[str] = os.listdir(self.directory)
entries: Iterable[str] = os.listdir(self.directory) except OSError as error:
except OSError as error: logger.error(
logger.error( "Unable to scan directory %s: %s",
"Unable to scan directory %s: %s", self.directory,
self.directory, error,
error, )
) return
return
nfo_stats = {"total": 0, "with_nfo": 0, "without_nfo": 0}
nfo_stats = {"total": 0, "with_nfo": 0, "without_nfo": 0} media_stats = {
media_stats = { "with_poster": 0,
"with_poster": 0, "without_poster": 0,
"without_poster": 0, "with_logo": 0,
"with_logo": 0, "without_logo": 0,
"without_logo": 0, "with_fanart": 0,
"with_fanart": 0, "without_fanart": 0
"without_fanart": 0 }
}
for anime_folder in entries:
for anime_folder in entries: anime_path = os.path.join(self.directory, anime_folder, "data")
anime_path = os.path.join(self.directory, anime_folder, "data") if os.path.isfile(anime_path):
if os.path.isfile(anime_path): logger.debug("Found data file for folder %s", anime_folder)
logger.debug("Found data file for folder %s", anime_folder) serie = self._load_data(anime_folder, anime_path)
serie = self._load_data(anime_folder, anime_path)
if serie:
if serie: nfo_stats["total"] += 1
nfo_stats["total"] += 1 # Check for NFO file
# Check for NFO file nfo_file_path = os.path.join(
nfo_file_path = os.path.join( self.directory, anime_folder, "tvshow.nfo"
self.directory, anime_folder, "tvshow.nfo" )
) if os.path.isfile(nfo_file_path):
if os.path.isfile(nfo_file_path): serie.nfo_path = nfo_file_path
serie.nfo_path = nfo_file_path nfo_stats["with_nfo"] += 1
nfo_stats["with_nfo"] += 1 else:
else: nfo_stats["without_nfo"] += 1
nfo_stats["without_nfo"] += 1 logger.debug(
logger.debug( "Series '%s' (key: %s) is missing tvshow.nfo",
"Series '%s' (key: %s) is missing tvshow.nfo", serie.name,
serie.name, serie.key
serie.key )
)
# Check for media files
# Check for media files folder_path = os.path.join(self.directory, anime_folder)
folder_path = os.path.join(self.directory, anime_folder)
poster_path = os.path.join(folder_path, "poster.jpg")
poster_path = os.path.join(folder_path, "poster.jpg") if os.path.isfile(poster_path):
if os.path.isfile(poster_path): media_stats["with_poster"] += 1
media_stats["with_poster"] += 1 else:
else: media_stats["without_poster"] += 1
media_stats["without_poster"] += 1 logger.debug(
logger.debug( "Series '%s' (key: %s) is missing poster.jpg",
"Series '%s' (key: %s) is missing poster.jpg", serie.name,
serie.name, serie.key
serie.key )
)
logo_path = os.path.join(folder_path, "logo.png")
logo_path = os.path.join(folder_path, "logo.png") if os.path.isfile(logo_path):
if os.path.isfile(logo_path): media_stats["with_logo"] += 1
media_stats["with_logo"] += 1 else:
else: media_stats["without_logo"] += 1
media_stats["without_logo"] += 1 logger.debug(
logger.debug( "Series '%s' (key: %s) is missing logo.png",
"Series '%s' (key: %s) is missing logo.png", serie.name,
serie.name, serie.key
serie.key )
)
fanart_path = os.path.join(folder_path, "fanart.jpg")
fanart_path = os.path.join(folder_path, "fanart.jpg") if os.path.isfile(fanart_path):
if os.path.isfile(fanart_path): media_stats["with_fanart"] += 1
media_stats["with_fanart"] += 1 else:
else: media_stats["without_fanart"] += 1
media_stats["without_fanart"] += 1 logger.debug(
logger.debug( "Series '%s' (key: %s) is missing fanart.jpg",
"Series '%s' (key: %s) is missing fanart.jpg", serie.name,
serie.name, serie.key
serie.key )
)
continue
continue
logger.warning(
logger.warning( "Skipping folder %s because no metadata file was found",
"Skipping folder %s because no metadata file was found", anime_folder,
anime_folder, )
)
# Log summary statistics
# Log summary statistics if nfo_stats["total"] > 0:
if nfo_stats["total"] > 0: logger.info(
logger.info( "NFO scan complete: %d series total, %d with NFO, %d without NFO",
"NFO scan complete: %d series total, %d with NFO, %d without NFO", nfo_stats["total"],
nfo_stats["total"], nfo_stats["with_nfo"],
nfo_stats["with_nfo"], nfo_stats["without_nfo"]
nfo_stats["without_nfo"] )
) logger.info(
logger.info( "Media scan complete: Poster (%d/%d), Logo (%d/%d), Fanart (%d/%d)",
"Media scan complete: Poster (%d/%d), Logo (%d/%d), Fanart (%d/%d)", media_stats["with_poster"],
media_stats["with_poster"], nfo_stats["total"],
nfo_stats["total"], media_stats["with_logo"],
media_stats["with_logo"], nfo_stats["total"],
nfo_stats["total"], media_stats["with_fanart"],
media_stats["with_fanart"], nfo_stats["total"]
nfo_stats["total"] )
)
def _load_data(self, anime_folder: str, data_path: str) -> Optional[Serie]:
def _load_data(self, anime_folder: str, data_path: str) -> Optional[Serie]: """
""" Load a single series metadata file into the in-memory collection.
Load a single series metadata file into the in-memory collection.
Args:
Args: anime_folder: The folder name (for logging only)
anime_folder: The folder name (for logging only) data_path: Path to the metadata file
data_path: Path to the metadata file
Returns:
Returns: Serie: The loaded Serie object, or None if loading failed
Serie: The loaded Serie object, or None if loading failed """
""" try:
try: serie = Serie.load_from_file(data_path)
serie = Serie.load_from_file(data_path) # Store by key, not folder
# Store by key, not folder self.keyDict[serie.key] = serie
self.keyDict[serie.key] = serie logger.debug(
logger.debug( "Successfully loaded metadata for %s (key: %s)",
"Successfully loaded metadata for %s (key: %s)", anime_folder,
anime_folder, serie.key
serie.key )
) return serie
return serie except (OSError, JSONDecodeError, KeyError, ValueError) as error:
except (OSError, JSONDecodeError, KeyError, ValueError) as error: logger.error(
logger.error( "Failed to load metadata for folder %s from %s: %s",
"Failed to load metadata for folder %s from %s: %s", anime_folder,
anime_folder, data_path,
data_path, error,
error, )
) return None
return None
def GetMissingEpisode(self) -> List[Serie]:
def GetMissingEpisode(self) -> List[Serie]: """Return all series that still contain missing episodes."""
"""Return all series that still contain missing episodes.""" return [
return [ serie
serie for serie in self.keyDict.values()
for serie in self.keyDict.values() if serie.episodeDict
if serie.episodeDict ]
]
def get_missing_episodes(self) -> List[Serie]:
def get_missing_episodes(self) -> List[Serie]: """PEP8-friendly alias for :meth:`GetMissingEpisode`."""
"""PEP8-friendly alias for :meth:`GetMissingEpisode`.""" return self.GetMissingEpisode()
return self.GetMissingEpisode()
def GetList(self) -> List[Serie]:
def GetList(self) -> List[Serie]: """Return all series instances stored in the list."""
"""Return all series instances stored in the list.""" return list(self.keyDict.values())
return list(self.keyDict.values())
def get_all(self) -> List[Serie]:
def get_all(self) -> List[Serie]: """PEP8-friendly alias for :meth:`GetList`."""
"""PEP8-friendly alias for :meth:`GetList`.""" return self.GetList()
return self.GetList()
def get_by_key(self, key: str) -> Optional[Serie]:
def get_by_key(self, key: str) -> Optional[Serie]: """
""" Get a series by its unique provider key.
Get a series by its unique provider key.
This is the primary method for series lookup.
This is the primary method for series lookup.
Args:
Args: key: The unique provider identifier (e.g., "attack-on-titan")
key: The unique provider identifier (e.g., "attack-on-titan")
Returns:
Returns: The Serie instance if found, None otherwise
The Serie instance if found, None otherwise """
""" return self.keyDict.get(key)
return self.keyDict.get(key)
def get_by_folder(self, folder: str) -> Optional[Serie]:
def get_by_folder(self, folder: str) -> Optional[Serie]: """
""" Get a series by its folder name.
Get a series by its folder name.
.. deprecated:: 2.0.0
.. deprecated:: 2.0.0 Use :meth:`get_by_key` instead. Folder-based lookups will be
Use :meth:`get_by_key` instead. Folder-based lookups will be removed in version 3.0.0. The `folder` field is metadata only
removed in version 3.0.0. The `folder` field is metadata only and should not be used for identification.
and should not be used for identification.
This method is provided for backward compatibility only.
This method is provided for backward compatibility only. Prefer using get_by_key() for new code.
Prefer using get_by_key() for new code.
Args:
Args: folder: The filesystem folder name (e.g., "Attack on Titan (2013)")
folder: The filesystem folder name (e.g., "Attack on Titan (2013)")
Returns:
Returns: The Serie instance if found, None otherwise
The Serie instance if found, None otherwise """
""" warnings.warn(
warnings.warn( "get_by_folder() is deprecated and will be removed in v3.0.0. "
"get_by_folder() is deprecated and will be removed in v3.0.0. " "Use get_by_key() instead. The 'folder' field is metadata only.",
"Use get_by_key() instead. The 'folder' field is metadata only.", DeprecationWarning,
DeprecationWarning, stacklevel=2
stacklevel=2 )
) for serie in self.keyDict.values():
for serie in self.keyDict.values(): if serie.folder == folder:
if serie.folder == folder: return serie
return serie return None
return None
async def load_all_from_db(self) -> int:
"""Load all series from database into in-memory cache.
Retrieves all anime series from the database with their episodes
and populates the in-memory keyDict for fast access.
This method replaces file-based loading. Use after initialization
when database is ready.
Returns:
int: Number of series loaded into cache
"""
from src.server.database.connection import get_async_session_factory
from src.server.database.service import AnimeSeriesService
try:
session_factory = get_async_session_factory()
db = session_factory()
try:
anime_series_list = await AnimeSeriesService.get_all(
db, with_episodes=True
)
count = 0
for anime_series in anime_series_list:
episode_dict: Dict[int, List[int]] = {}
if anime_series.episodes:
for ep in anime_series.episodes:
if ep.season not in episode_dict:
episode_dict[ep.season] = []
episode_dict[ep.season].append(ep.episode_number)
serie = Serie(
key=anime_series.key,
name=anime_series.name,
site=anime_series.site,
folder=anime_series.folder,
episodeDict=episode_dict,
year=anime_series.year
)
self.keyDict[serie.key] = serie
count += 1
logger.info(
"Loaded %d series from database into in-memory cache",
count
)
return count
finally:
await db.close()
except RuntimeError:
logger.warning(
"Database not available, skipping DB load"
)
return 0
async def _load_single_series_from_db(
self,
anime_folder: str
) -> Optional[Serie]:
"""Load a single series from database by folder name.
Looks up a series in the database by its folder name and adds
it to the in-memory cache.
Args:
anime_folder: The filesystem folder name to look up
Returns:
Serie if found and loaded, None otherwise
"""
from src.server.database.connection import get_async_session_factory
from src.server.database.service import AnimeSeriesService
try:
session_factory = get_async_session_factory()
db = session_factory()
try:
anime_series = await AnimeSeriesService.get_by_folder(
db, anime_folder
)
if not anime_series:
logger.debug(
"Series with folder '%s' not found in DB",
anime_folder
)
return None
episode_dict: Dict[int, List[int]] = {}
if anime_series.episodes:
for ep in anime_series.episodes:
if ep.season not in episode_dict:
episode_dict[ep.season] = []
episode_dict[ep.season].append(ep.episode_number)
serie = Serie(
key=anime_series.key,
name=anime_series.name,
site=anime_series.site,
folder=anime_series.folder,
episodeDict=episode_dict,
year=anime_series.year
)
self.keyDict[serie.key] = serie
logger.debug(
"Loaded series '%s' (key=%s) from DB",
serie.name, serie.key
)
return serie
finally:
await db.close()
except RuntimeError:
logger.warning(
"Database not available, cannot load series '%s'",
anime_folder
)
return None
def invalidate_cache(self) -> None:
"""Clear the in-memory cache.
Use after database modifications to force reload from DB
on next access.
"""
self.keyDict.clear()
logger.debug("SerieList in-memory cache invalidated")
def reload(self) -> None:
"""Reload series from filesystem (legacy mode).
Warning:
This method uses file-based loading and should only be
used as fallback when database is not available.
"""
self.load_series()

View File

@@ -0,0 +1,291 @@
"""Tests for SerieList database loading functionality."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.core.entities.series import Serie
@pytest.fixture
def mock_session_factory():
"""Create a mock async session factory."""
mock_session = AsyncMock()
mock_session_factory = MagicMock(return_value=mock_session)
return mock_session_factory, mock_session
@pytest.fixture
def sample_anime_series():
"""Create a sample AnimeSeries DB model for testing."""
mock = MagicMock()
mock.key = "attack-on-titan"
mock.name = "Attack on Titan"
mock.site = "aniworld.to"
mock.folder = "Attack on Titan (2013)"
mock.year = 2013
mock.episodes = [
MagicMock(season=1, episode_number=1),
MagicMock(season=1, episode_number=2),
MagicMock(season=1, episode_number=3),
MagicMock(season=2, episode_number=1),
MagicMock(season=2, episode_number=2),
]
return mock
@pytest.fixture
def sample_serie():
"""Create a sample Serie for testing."""
return Serie(
key="attack-on-titan",
name="Attack on Titan",
site="aniworld.to",
folder="Attack on Titan (2013)",
episodeDict={1: [1, 2, 3], 2: [1, 2]},
year=2013
)
class TestLoadAllFromDb:
"""Test load_all_from_db method."""
@pytest.mark.asyncio
async def test_load_all_from_db(self, mock_session_factory, sample_anime_series):
"""Verify SerieList loads all series from DB."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
return_value=[sample_anime_series]
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
count = await serie_list.load_all_from_db()
assert count == 1
assert "attack-on-titan" in serie_list.keyDict
serie = serie_list.keyDict["attack-on-titan"]
assert serie.name == "Attack on Titan"
assert serie.key == "attack-on-titan"
assert serie.year == 2013
@pytest.mark.asyncio
async def test_load_all_from_db_multiple_series(
self, mock_session_factory, sample_anime_series
):
"""Verify SerieList loads multiple series from DB."""
mock_factory, mock_session = mock_session_factory
mock_series2 = MagicMock()
mock_series2.key = "one-piece"
mock_series2.name = "One Piece"
mock_series2.site = "aniworld.to"
mock_series2.folder = "One Piece"
mock_series2.year = 1999
mock_series2.episodes = []
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
return_value=[sample_anime_series, mock_series2]
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
count = await serie_list.load_all_from_db()
assert count == 2
assert "attack-on-titan" in serie_list.keyDict
assert "one-piece" in serie_list.keyDict
@pytest.mark.asyncio
async def test_load_all_from_db_rebuilds_episode_dict(
self, mock_session_factory, sample_anime_series
):
"""Verify episode_dict is correctly built from Episode records."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
return_value=[sample_anime_series]
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
await serie_list.load_all_from_db()
serie = serie_list.keyDict["attack-on-titan"]
assert 1 in serie.episodeDict
assert 2 in serie.episodeDict
assert sorted(serie.episodeDict[1]) == [1, 2, 3]
assert sorted(serie.episodeDict[2]) == [1, 2]
@pytest.mark.asyncio
async def test_load_all_from_db_no_series(self, mock_session_factory):
"""Verify SerieList handles empty DB."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
return_value=[]
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
count = await serie_list.load_all_from_db()
assert count == 0
assert len(serie_list.keyDict) == 0
@pytest.mark.asyncio
async def test_load_all_from_db_db_not_initialized(self, mock_session_factory):
"""Verify SerieList handles uninitialized DB gracefully."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
side_effect=RuntimeError("Database not initialized")
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
count = await serie_list.load_all_from_db()
assert count == 0
assert len(serie_list.keyDict) == 0
class TestLoadSingleSeriesFromDb:
"""Test _load_single_series_from_db method."""
@pytest.mark.asyncio
async def test_load_single_series_from_db(
self, mock_session_factory, sample_anime_series
):
"""Verify SerieList loads a single series from DB by folder."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_by_folder",
return_value=sample_anime_series
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
serie = await serie_list._load_single_series_from_db("Attack on Titan (2013)")
assert serie is not None
assert serie.key == "attack-on-titan"
assert "attack-on-titan" in serie_list.keyDict
@pytest.mark.asyncio
async def test_load_single_series_from_db_not_found(
self, mock_session_factory
):
"""Verify SerieList handles series not found in DB."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_by_folder",
return_value=None
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
serie = await serie_list._load_single_series_from_db("Unknown Series")
assert serie is None
assert len(serie_list.keyDict) == 0
@pytest.mark.asyncio
async def test_load_single_series_from_db_db_not_initialized(
self, mock_session_factory
):
"""Verify SerieList handles uninitialized DB gracefully."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_by_folder",
side_effect=RuntimeError("Database not initialized")
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
serie = await serie_list._load_single_series_from_db("Some Folder")
assert serie is None
class TestInvalidateCache:
"""Test invalidate_cache method."""
def test_invalidate_cache_clears_keydict(self, sample_serie):
"""Verify invalidate_cache clears the in-memory cache."""
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
serie_list.keyDict["attack-on-titan"] = sample_serie
assert len(serie_list.keyDict) == 1
serie_list.invalidate_cache()
assert len(serie_list.keyDict) == 0
def test_invalidate_cache_allows_reload(self, mock_session_factory, sample_anime_series):
"""Verify cache can be reloaded after invalidation."""
mock_factory, mock_session = mock_session_factory
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
):
with patch(
"src.server.database.service.AnimeSeriesService.get_all",
return_value=[sample_anime_series]
):
from src.core.entities.SerieList import SerieList
serie_list = SerieList("/tmp", skip_load=True)
serie_list.keyDict["some-key"] = MagicMock()
serie_list.invalidate_cache()
assert len(serie_list.keyDict) == 0
# Reload
import asyncio
asyncio.get_event_loop().run_until_complete(serie_list.load_all_from_db())
assert len(serie_list.keyDict) == 1