Aniworld/src/core/SeriesApp.py
Lukas 684337fd0c Add data file to database sync functionality
- Add get_all_series_from_data_files() to SeriesApp
- Sync series from data files to DB on startup
- Add unit tests for new SeriesApp method
- Add integration tests for sync functionality
- Update documentation
2025-12-13 09:32:57 +01:00

655 lines
20 KiB
Python

"""
SeriesApp - Core application logic for anime series management.
This module provides the main application interface for searching,
downloading, and managing anime series with support for async callbacks,
progress reporting, and error handling.
"""
import asyncio
import logging
import warnings
from typing import Any, Dict, List, Optional
from events import Events
try:
from sqlalchemy.ext.asyncio import AsyncSession
except ImportError: # pragma: no cover - optional dependency
AsyncSession = object # type: ignore
from src.core.entities.SerieList import SerieList
from src.core.entities.series import Serie
from src.core.providers.provider_factory import Loaders
from src.core.SerieScanner import SerieScanner
logger = logging.getLogger(__name__)
class DownloadStatusEventArgs:
    """Payload handed to ``download_status`` event handlers.

    A plain attribute container: every constructor argument is stored
    unchanged on the instance under the same name. No validation or
    normalisation is performed.
    """

    def __init__(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
        eta: Optional[int] = None,
        mbper_sec: Optional[float] = None,
        item_id: Optional[str] = None,
    ):
        """
        Store the details of a single download status change.

        Args:
            serie_folder: Serie folder name (metadata only, used for
                file paths).
            season: Season number.
            episode: Episode number.
            status: Status keyword, e.g. "started", "progress",
                "completed" or "failed".
            key: Serie unique identifier (provider key, primary
                identifier).
            progress: Download progress. The range is not enforced
                here. NOTE(review): SeriesApp.download sends a
                percentage for "progress" events but 1.0 for
                "completed" - confirm which scale consumers expect.
            message: Optional human-readable status message.
            error: Optional exception describing a failure.
            eta: Estimated time remaining in seconds.
            mbper_sec: Download speed in MB/s.
            item_id: Optional download queue item ID for tracking.
        """
        # Identity of the episode this event refers to.
        self.serie_folder, self.key = serie_folder, key
        self.season, self.episode = season, episode

        # State of the download at the time the event was created.
        self.status, self.progress = status, progress
        self.message, self.error = message, error

        # Transfer statistics and queue correlation.
        self.eta, self.mbper_sec = eta, mbper_sec
        self.item_id = item_id
class ScanStatusEventArgs:
    """Payload handed to ``scan_status`` event handlers.

    A plain attribute container: every constructor argument is stored
    unchanged on the instance under the same name. No validation or
    normalisation is performed.
    """

    def __init__(
        self,
        current: int,
        total: int,
        folder: str,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
    ):
        """
        Store the details of a single scan status change.

        Args:
            current: Index/count of the item currently being scanned.
            total: Total number of items to scan.
            folder: Folder currently being scanned (metadata only).
            status: Status keyword, e.g. "started", "progress",
                "completed", "failed" or "cancelled".
            key: Serie unique identifier if applicable (provider key,
                primary identifier).
            progress: Scan progress (0.0 to 1.0); not range-checked
                here.
            message: Optional human-readable status message.
            error: Optional exception describing a failure.
        """
        # Position within the scan.
        self.current, self.total = current, total
        self.folder, self.key = folder, key

        # State of the scan at the time the event was created.
        self.status, self.progress = status, progress
        self.message, self.error = message, error
class SeriesApp:
    """
    Main application class for anime series management.

    Provides functionality for:
    - Searching anime series
    - Downloading episodes
    - Scanning directories for missing episodes
    - Managing series lists

    Blocking loader/scanner calls are pushed to worker threads with
    ``asyncio.to_thread`` so the public coroutines do not block the
    event loop.

    Events:
        download_status: Raised when download status changes.
            Handler signature: def handler(args: DownloadStatusEventArgs)
        scan_status: Raised when scan status changes.
            Handler signature: def handler(args: ScanStatusEventArgs)

    Both event slots are set to ``None`` in ``__init__``. Emitting an
    event while its slot is still ``None`` is a silent no-op.
    """

    def __init__(
        self,
        directory_to_search: str,
        db_session: Optional[AsyncSession] = None,
    ):
        """
        Initialize SeriesApp.

        Args:
            directory_to_search: Base directory for anime series
            db_session: Optional database session. It is stored on
                this instance and forwarded to SerieList and
                SerieScanner.
        """
        self.directory_to_search = directory_to_search
        self._db_session = db_session

        # Event slots start out empty; see _emit_download_status /
        # _emit_scan_status for how an empty slot is handled.
        self._events = Events()
        self._events.download_status = None
        self._events.scan_status = None

        self.loaders = Loaders()
        self.loader = self.loaders.GetLoader(key="aniworld.to")
        self.serie_scanner = SerieScanner(
            directory_to_search, self.loader, db_session=db_session
        )
        self.list = SerieList(
            self.directory_to_search, db_session=db_session
        )

        # Synchronous init used during constructor to avoid awaiting
        # in __init__
        self._init_list_sync()

        logger.info(
            "SeriesApp initialized for directory: %s (db_session: %s)",
            directory_to_search,
            "provided" if db_session else "none"
        )

    # ------------------------------------------------------------------
    # Events
    # ------------------------------------------------------------------

    @property
    def download_status(self):
        """
        Event raised when download status changes.

        Returns whatever is currently stored in the slot (``None``
        until something has been assigned).

        NOTE(review): the slot is initialised to ``None``, so
        ``app.download_status += handler`` presumably only works after
        a handler/slot object has been assigned - confirm against the
        events library and the subscribing code.
        """
        return self._events.download_status

    @download_status.setter
    def download_status(self, value):
        """Set download_status event handler."""
        self._events.download_status = value

    @property
    def scan_status(self):
        """
        Event raised when scan status changes.

        Returns whatever is currently stored in the slot (``None``
        until something has been assigned).

        NOTE(review): the slot is initialised to ``None``, so
        ``app.scan_status += handler`` presumably only works after a
        handler/slot object has been assigned - confirm against the
        events library and the subscribing code.
        """
        return self._events.scan_status

    @scan_status.setter
    def scan_status(self, value):
        """Set scan_status event handler."""
        self._events.scan_status = value

    def _emit_download_status(self, args: DownloadStatusEventArgs) -> None:
        """Invoke the download_status handler if one is attached."""
        handler = self._events.download_status
        # The slot is None until a subscriber is attached; calling it
        # blindly would raise "TypeError: 'NoneType' object is not
        # callable" and abort the download.
        if handler is not None:
            handler(args)

    def _emit_scan_status(self, args: ScanStatusEventArgs) -> None:
        """Invoke the scan_status handler if one is attached."""
        handler = self._events.scan_status
        # Same guard as for download events: no subscriber, no call.
        if handler is not None:
            handler(args)

    # ------------------------------------------------------------------
    # Database session handling
    # ------------------------------------------------------------------

    @property
    def db_session(self) -> Optional[AsyncSession]:
        """
        Get the database session.

        Returns:
            AsyncSession or None: The database session if configured
        """
        return self._db_session

    def set_db_session(self, session: Optional[AsyncSession]) -> None:
        """
        Update the database session.

        Also updates the db_session on SerieList and SerieScanner by
        writing their ``_db_session`` attribute directly.

        Args:
            session: The new database session or None
        """
        self._db_session = session
        self.list._db_session = session
        self.serie_scanner._db_session = session
        logger.debug(
            "Database session updated: %s",
            "provided" if session else "none"
        )

    async def init_from_db_async(self) -> None:
        """
        Initialize series list from database (async).

        Loads the series through ``SerieList.load_series_from_db`` and
        refreshes ``series_list``. Emits a ``UserWarning`` and does
        nothing else when no database session is configured.
        """
        if not self._db_session:
            warnings.warn(
                "init_from_db_async called without db_session configured",
                UserWarning
            )
            return

        await self.list.load_series_from_db(self._db_session)
        self.series_list = self.list.GetMissingEpisode()
        logger.debug(
            "Loaded %d series with missing episodes from database",
            len(self.series_list)
        )

    # ------------------------------------------------------------------
    # Series list initialisation
    # ------------------------------------------------------------------

    def _init_list_sync(self) -> None:
        """Synchronous initialization helper for constructor."""
        self.series_list = self.list.GetMissingEpisode()
        logger.debug(
            "Loaded %d series with missing episodes",
            len(self.series_list)
        )

    async def _init_list(self) -> None:
        """Initialize the series list with missing episodes (async).

        Same as ``_init_list_sync`` but runs ``GetMissingEpisode`` in a
        worker thread.
        """
        self.series_list = await asyncio.to_thread(
            self.list.GetMissingEpisode
        )
        logger.debug(
            "Loaded %d series with missing episodes",
            len(self.series_list)
        )

    # ------------------------------------------------------------------
    # Public operations
    # ------------------------------------------------------------------

    async def search(self, words: str) -> List[Dict[str, Any]]:
        """
        Search for anime series (async).

        Runs the loader's ``search`` in a worker thread. Exceptions
        raised by the loader are not caught here and propagate to the
        caller.

        Args:
            words: Search query

        Returns:
            List of search results
        """
        logger.info("Searching for: %s", words)
        results = await asyncio.to_thread(self.loader.search, words)
        logger.info("Found %d results", len(results))
        return results

    async def download(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        key: str,
        language: str = "German Dub",
        item_id: Optional[str] = None,
    ) -> bool:
        """
        Download an episode (async).

        Emits ``download_status`` events with status "started",
        "progress", "completed" or "failed" along the way.

        Args:
            serie_folder: Serie folder name (metadata only, used for
                file path construction)
            season: Season number
            episode: Episode number
            key: Serie unique identifier (provider key, primary
                identifier for lookups)
            language: Language preference
            item_id: Optional download queue item ID for progress
                tracking

        Returns:
            True if download succeeded, False otherwise. Exceptions
            raised while downloading are logged, reported through a
            "failed" event and turned into a False return value.

        Note:
            The 'key' parameter is the primary identifier for series
            lookups. The 'serie_folder' parameter is only used for
            filesystem operations.
        """
        logger.info(
            "Starting download: %s (key: %s) S%02dE%02d",
            serie_folder,
            key,
            season,
            episode
        )

        def notify(status: str, message: str, **extra: Any) -> None:
            # Every event of this download shares the same identity
            # fields; only status/message and the extras differ.
            self._emit_download_status(
                DownloadStatusEventArgs(
                    serie_folder=serie_folder,
                    key=key,
                    season=season,
                    episode=episode,
                    status=status,
                    message=message,
                    item_id=item_id,
                    **extra,
                )
            )

        def download_callback(progress_info):
            # Handed to the loader, which runs inside asyncio.to_thread;
            # handlers are therefore presumably invoked from that
            # worker thread - verify against the loader.
            logger.debug(
                "wrapped_callback called with: %s", progress_info
            )
            downloaded = progress_info.get('downloaded_bytes', 0)
            total_bytes = (
                progress_info.get('total_bytes')
                or progress_info.get('total_bytes_estimate', 0)
            )
            speed = progress_info.get('speed', 0)  # bytes/sec
            # NOTE(review): progress is reported as a percentage
            # (0-100) here while the "completed" event below sends 1.0
            # and DownloadStatusEventArgs documents 0.0-1.0. Kept as is
            # because consumers may rely on it - confirm the scale.
            notify(
                "progress",
                "Download progress",
                progress=(
                    (downloaded / total_bytes) * 100
                    if total_bytes else 0
                ),
                eta=progress_info.get('eta'),  # seconds
                mbper_sec=speed / (1024 * 1024) if speed else None,
            )

        # Fire download started event
        notify("started", "Download started")

        try:
            # Perform download in thread to avoid blocking event loop
            download_success = await asyncio.to_thread(
                self.loader.download,
                self.directory_to_search,
                serie_folder,
                season,
                episode,
                key,
                language,
                download_callback
            )

            if download_success:
                logger.info(
                    "Download completed: %s (key: %s) S%02dE%02d",
                    serie_folder,
                    key,
                    season,
                    episode
                )
                notify(
                    "completed",
                    "Download completed successfully",
                    progress=1.0,
                )
            else:
                logger.warning(
                    "Download failed: %s (key: %s) S%02dE%02d",
                    serie_folder,
                    key,
                    season,
                    episode
                )
                notify("failed", "Download failed")

            return download_success
        except Exception as e:
            # Boundary handler: the failure is reported through the
            # event and the return value instead of being re-raised.
            logger.error(
                "Download error: %s (key: %s) S%02dE%02d - %s",
                serie_folder,
                key,
                season,
                episode,
                str(e),
                exc_info=True,
            )
            notify("failed", f"Download error: {str(e)}", error=e)
            return False

    async def rescan(self) -> int:
        """
        Rescan directory for missing episodes (async).

        Emits ``scan_status`` events with status "started",
        "progress", "completed", "cancelled" or "failed".

        Returns:
            Number of series with missing episodes after rescan.

        Raises:
            InterruptedError: Re-raised after a "cancelled" event has
                been emitted.
            Exception: Any other error is re-raised after a "failed"
                event has been emitted.
        """
        logger.info("Starting directory rescan")

        # Initialised up front so the error handlers can report it even
        # when counting the folders itself fails.
        total_to_scan = 0
        try:
            # Get total items to scan
            total_to_scan = await asyncio.to_thread(
                self.serie_scanner.get_total_to_scan
            )
            logger.info("Total folders to scan: %d", total_to_scan)

            # Fire scan started event
            self._emit_scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="started",
                    progress=0.0,
                    message="Scan started",
                )
            )

            # Reinitialize scanner
            await asyncio.to_thread(self.serie_scanner.reinit)

            def scan_callback(folder: str, current: int):
                # Guard against division by zero for an empty directory.
                progress = (
                    current / total_to_scan if total_to_scan > 0 else 0.0
                )
                self._emit_scan_status(
                    ScanStatusEventArgs(
                        current=current,
                        total=total_to_scan,
                        folder=folder,
                        status="progress",
                        progress=progress,
                        message=f"Scanning: {folder}",
                    )
                )

            # Perform scan
            await asyncio.to_thread(self.serie_scanner.scan, scan_callback)

            # Reinitialize list
            # NOTE(review): unlike __init__, no db_session is passed
            # here, so the rebuilt list is not bound to the configured
            # session - confirm this is intentional.
            self.list = SerieList(self.directory_to_search)
            await self._init_list()

            logger.info("Directory rescan completed successfully")

            # Fire scan completed event
            self._emit_scan_status(
                ScanStatusEventArgs(
                    current=total_to_scan,
                    total=total_to_scan,
                    folder="",
                    status="completed",
                    progress=1.0,
                    message=(
                        f"Scan completed. Found {len(self.series_list)} "
                        "series with missing episodes."
                    ),
                )
            )
            return len(self.series_list)
        except InterruptedError:
            logger.warning("Scan cancelled by user")
            # Fire scan cancelled event
            self._emit_scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="cancelled",
                    message="Scan cancelled by user",
                )
            )
            raise
        except Exception as e:
            logger.error("Scan error: %s", str(e), exc_info=True)
            # Fire scan failed event
            self._emit_scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="failed",
                    error=e,
                    message=f"Scan error: {str(e)}",
                )
            )
            raise

    async def get_series_list(self) -> List[Any]:
        """
        Get the current series list (async).

        Returns the cached ``series_list`` attribute; nothing is
        reloaded.

        Returns:
            List of series with missing episodes
        """
        return self.series_list

    async def refresh_series_list(self) -> None:
        """
        Reload the cached series list from the underlying data store.

        This is an async operation.
        """
        await self._init_list()

    def _get_serie_by_key(self, key: str) -> Optional[Serie]:
        """
        Get a series by its unique provider key.

        Thin wrapper around ``SerieList.get_by_key``.

        Args:
            key: The unique provider identifier (e.g.,
                "attack-on-titan")

        Returns:
            Whatever ``SerieList.get_by_key`` returns for ``key``
            (annotated as a Serie or None).
        """
        return self.list.get_by_key(key)

    def get_all_series_from_data_files(self) -> List[Serie]:
        """
        Get all series from data files in the anime directory.

        Builds a separate, file-based SerieList for
        ``directory_to_search`` and returns everything it loaded. This
        method is synchronous and can be wrapped with
        asyncio.to_thread if needed for async contexts.

        Returns:
            List of Serie objects reported by the temporary list's
            ``get_all()``. An empty list is returned when creating
            that list raises; the error is logged.

        Example:
            series_app = SeriesApp("/path/to/anime")
            all_series = series_app.get_all_series_from_data_files()
            for serie in all_series:
                print(f"Found: {serie.name} (key={serie.key})")
        """
        logger.info(
            "Scanning for data files in directory: %s",
            self.directory_to_search
        )

        # Create a fresh SerieList instance for file-based loading
        # This ensures we get all series from data files without
        # interfering with the main instance's state
        try:
            temp_list = SerieList(
                self.directory_to_search,
                db_session=None,  # Force file-based loading
                skip_load=False  # Allow automatic loading
            )
        except Exception as e:
            # Deliberate best effort: a broken directory must not take
            # the caller down, so report and fall back to "no series".
            logger.error(
                "Failed to scan directory for data files: %s",
                str(e),
                exc_info=True
            )
            return []

        # Get all series from the temporary list
        all_series = temp_list.get_all()
        logger.info(
            "Found %d series from data files in %s",
            len(all_series),
            self.directory_to_search
        )
        return all_series