Files
Aniworld/src/server/SeriesApp.py
Lukas 2be7b692b9 Fix get_all_series_from_data_files to read data files directly
Previously, the method created a SerieList instance which only loads
from database, not from data files. Now reads data files directly
and parses JSON to create AnimeSeries objects.

Also added _load_data_file helper function and fixed logger.warning
calls to use proper format strings instead of keyword arguments.

Updated unit tests to use real temp directories instead of mocks.
2026-06-04 22:04:46 +02:00

818 lines
26 KiB
Python

"""
SeriesApp - Core application logic for anime series management.
This module provides the main application interface for searching,
downloading, and managing anime series with support for async callbacks,
progress reporting, and error handling.
Note:
This module is pure domain logic with no database dependencies.
Database operations are handled by the service layer (AnimeService).
"""
import asyncio
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional
from events import Events
from src.config.settings import settings
from src.server.database.SerieList import SerieList
from src.server.database.models import AnimeSeries
from src.server.providers.provider_factory import Loaders
from src.server.SerieScanner import SerieScanner
logger = logging.getLogger(__name__)
class DownloadStatusEventArgs:
    """Payload handed to ``download_status`` event subscribers.

    A plain attribute container: each constructor argument is stored,
    unchanged, on an attribute with the same name.
    """

    def __init__(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
        eta: Optional[int] = None,
        mbper_sec: Optional[float] = None,
        item_id: Optional[str] = None,
    ):
        """
        Capture the state of one download at the moment an event fires.

        Args:
            serie_folder: Folder name of the series; used only to build
                file paths, never as an identifier.
            season: Season number of the episode.
            episode: Episode number within the season.
            status: Lifecycle label such as "started", "progress",
                "completed", "failed" or "cancelled".
            key: Provider key of the series (its primary identifier).
            progress: Progress value. Documented as 0.0-1.0.
                NOTE(review): SeriesApp.download sends a 0-100
                percentage for "progress" events but 1.0 for
                "completed"; confirm which scale consumers expect.
            message: Optional human-readable status text.
            error: Exception that caused a "failed" status, if any.
            eta: Estimated seconds remaining, if known.
            mbper_sec: Transfer speed in MB/s, if known.
            item_id: Download-queue item ID, for correlating events.
        """
        # Which episode / series this event is about.
        self.serie_folder = serie_folder
        self.key = key
        self.season = season
        self.episode = episode
        # Lifecycle state plus progress and diagnostics.
        self.status = status
        self.progress = progress
        self.message = message
        self.error = error
        # Transfer metrics and queue correlation.
        self.eta = eta
        self.mbper_sec = mbper_sec
        self.item_id = item_id
class ScanStatusEventArgs:
    """Payload handed to ``scan_status`` event subscribers.

    A plain attribute container: each constructor argument is stored,
    unchanged, on an attribute with the same name.
    """

    def __init__(
        self,
        current: int,
        total: int,
        folder: str,
        status: str,
        key: Optional[str] = None,
        progress: float = 0.0,
        message: Optional[str] = None,
        error: Optional[Exception] = None,
    ):
        """
        Capture the state of a directory scan at the moment an event fires.

        Args:
            current: Number of items processed so far.
            total: Number of items the scan expects to process.
            folder: Folder currently being scanned (informational only).
            status: Lifecycle label such as "started", "progress",
                "completed", "failed" or "cancelled".
            key: Provider key of the series involved, when applicable.
            progress: Scan progress as a fraction from 0.0 to 1.0.
            message: Optional human-readable status text.
            error: Exception that caused a "failed" status, if any.
        """
        # Position within the scan.
        self.current = current
        self.total = total
        self.folder = folder
        self.key = key
        # Lifecycle state plus progress and diagnostics.
        self.status = status
        self.progress = progress
        self.message = message
        self.error = error
class SeriesApp:
    """
    Main application class for anime series management.

    Provides functionality for:
    - Searching anime series
    - Downloading episodes
    - Scanning directories for missing episodes
    - Managing series lists

    Blocking provider and scanner calls are offloaded to a thread pool so
    the async methods do not block the event loop.

    Note:
        This class is pure domain logic with no database dependencies.
        Database operations are handled by the service layer
        (AnimeService).

    Events:
        download_status: Raised when download status changes.
            Handler signature: def handler(args: DownloadStatusEventArgs)
        scan_status: Raised when scan status changes.
            Handler signature: def handler(args: ScanStatusEventArgs)
    """

    def __init__(
        self,
        directory_to_search: str,
    ):
        """
        Initialize SeriesApp.

        Creates a 3-worker thread pool, the event hub, the "aniworld.to"
        loader, a SerieScanner bound to that loader and directory, and a
        SerieList for the directory.

        Args:
            directory_to_search: Base directory for anime series
        """
        self.directory_to_search = directory_to_search
        # Blocking loader/scanner work runs here, off the event loop.
        self.executor = ThreadPoolExecutor(max_workers=3)
        self._events = Events()
        self.loaders = Loaders()
        self.loader = self.loaders.GetLoader(key="aniworld.to")
        self.serie_scanner = SerieScanner(
            directory_to_search,
            self.loader,
        )
        # Series are supplied later by the service layer through
        # load_series_from_list(), or refreshed by rescan().
        self.list = SerieList(self.directory_to_search)
        # Cached result of self.list.GetMissingEpisode().
        self.series_list: List[Any] = []
        # NFO/metadata handling lives in the server layer.
        self.nfo_service = None
        logger.info(
            "SeriesApp initialized for directory: %s",
            directory_to_search,
        )

    @property
    def download_status(self):
        """
        Event raised when download status changes.

        Subscribe using:
            app.download_status += handler
        """
        return self._events.download_status

    @download_status.setter
    def download_status(self, value):
        """Set download_status event handler."""
        self._events.download_status = value

    @property
    def scan_status(self):
        """
        Event raised when scan status changes.

        Subscribe using:
            app.scan_status += handler
        """
        return self._events.scan_status

    @scan_status.setter
    def scan_status(self, value):
        """Set scan_status event handler."""
        self._events.scan_status = value

    def load_series_from_list(self, series: list) -> None:
        """
        Replace the in-memory series with ``series``.

        Called by the service layer after it loads series from the
        database. Rebuilds the missing-episodes cache afterwards.

        Args:
            series: List of Serie objects to load (each must expose
                ``.key``)
        """
        self.list.keyDict.clear()
        for serie in series:
            self.list.keyDict[serie.key] = serie
        self.series_list = self.list.GetMissingEpisode()
        logger.debug(
            "Loaded %d series with %d having missing episodes",
            len(series),
            len(self.series_list)
        )

    async def search(self, words: str) -> List[Dict[str, Any]]:
        """
        Search for anime series (async).

        Runs the loader's blocking search in the thread pool.

        Args:
            words: Search query

        Returns:
            List of search results as returned by the loader

        Raises:
            Any exception raised by the loader's search propagates.
        """
        logger.info("Searching for: %s", words)
        loop = asyncio.get_running_loop()
        results = await loop.run_in_executor(
            self.executor,
            self.loader.search,
            words
        )
        logger.info("Found %d results", len(results))
        return results

    async def download(
        self,
        serie_folder: str,
        season: int,
        episode: int,
        key: str,
        language: str = "German Dub",
        item_id: Optional[str] = None,
    ) -> bool:
        """
        Download an episode (async).

        Fires a "started" download_status event, ensures the series folder
        exists, runs the loader's blocking download in the thread pool
        while forwarding its progress callbacks as "progress" events, then
        fires "completed" or "failed".

        Args:
            serie_folder: Serie folder name (metadata only, used for
                file path construction)
            season: Season number
            episode: Episode number
            key: Serie unique identifier (provider key, primary
                identifier for lookups)
            language: Language preference
            item_id: Optional download queue item ID for progress
                tracking

        Returns:
            True if download succeeded, False otherwise

        Raises:
            InterruptedError: Re-raised after a "cancelled" event when the
                loader signals cancellation.

        Note:
            The 'key' parameter is the primary identifier for series
            lookups. The 'serie_folder' parameter is only used for
            filesystem operations.
        """
        logger.info(
            "Starting download: %s (key: %s) S%02dE%02d",
            serie_folder,
            key,
            season,
            episode
        )
        self._events.download_status(
            DownloadStatusEventArgs(
                serie_folder=serie_folder,
                key=key,
                season=season,
                episode=episode,
                status="started",
                message="Download started",
                item_id=item_id,
            )
        )

        # Create series folder if it doesn't exist
        folder_path = os.path.join(self.directory_to_search, serie_folder)
        if not os.path.exists(folder_path):
            try:
                os.makedirs(folder_path, exist_ok=True)
                logger.info(
                    "Created series folder: %s (key: %s)",
                    folder_path,
                    key
                )
            except OSError as e:
                logger.error(
                    "Failed to create series folder %s: %s",
                    folder_path,
                    str(e)
                )
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="failed",
                        message=f"Failed to create folder: {str(e)}",
                        item_id=item_id,
                    )
                )
                return False

        try:
            def download_progress_handler(progress_info):
                """Forward one loader progress callback as a "progress" event.

                Presumably invoked from the executor thread running the
                download; receives a dict of transfer statistics.
                """
                status = progress_info.get("status", "")
                # Only log the interesting states; every callback still
                # produces an event below.
                if status in ("downloading", "finished"):
                    logger.debug(
                        "download_progress_handler called with: %s",
                        progress_info
                    )
                # Keys may be present with a None value, so fall back with
                # `or` instead of relying on dict.get defaults.
                downloaded = progress_info.get('downloaded_bytes') or 0
                total_bytes = (
                    progress_info.get('total_bytes')
                    or progress_info.get('total_bytes_estimate')
                    or 0
                )
                speed = progress_info.get('speed') or 0  # bytes/sec
                eta = progress_info.get('eta')  # seconds
                mbper_sec = speed / (1024 * 1024) if speed else None
                # NOTE(review): progress is a 0-100 percentage here, while
                # the "completed" event uses 1.0 and DownloadStatusEventArgs
                # documents 0.0-1.0. Left unchanged because consumers may
                # depend on the current scale — confirm.
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="progress",
                        message="Download progress",
                        progress=(
                            (downloaded / total_bytes) * 100
                            if total_bytes else 0
                        ),
                        eta=eta,
                        mbper_sec=mbper_sec,
                        item_id=item_id,
                    )
                )

            self.loader.subscribe_download_progress(download_progress_handler)
            try:
                # Perform download in thread to avoid blocking event loop
                loop = asyncio.get_running_loop()
                download_success = await loop.run_in_executor(
                    self.executor,
                    self.loader.download,
                    self.directory_to_search,
                    serie_folder,
                    season,
                    episode,
                    key,
                    language
                )
            finally:
                # Always unsubscribe so handlers from finished downloads
                # never receive later progress callbacks.
                self.loader.unsubscribe_download_progress(
                    download_progress_handler
                )

            if download_success:
                logger.info(
                    "Download completed: %s (key: %s) S%02dE%02d",
                    serie_folder,
                    key,
                    season,
                    episode
                )
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="completed",
                        progress=1.0,
                        message="Download completed successfully",
                        item_id=item_id,
                    )
                )
            else:
                logger.warning(
                    "Download failed: %s (key: %s) S%02dE%02d",
                    serie_folder,
                    key,
                    season,
                    episode
                )
                self._events.download_status(
                    DownloadStatusEventArgs(
                        serie_folder=serie_folder,
                        key=key,
                        season=season,
                        episode=episode,
                        status="failed",
                        message="Download failed",
                        item_id=item_id,
                    )
                )
            return download_success
        except InterruptedError:
            # Download was cancelled - propagate the cancellation
            logger.info(
                "Download cancelled: %s (key: %s) S%02dE%02d",
                serie_folder,
                key,
                season,
                episode,
            )
            self._events.download_status(
                DownloadStatusEventArgs(
                    serie_folder=serie_folder,
                    key=key,
                    season=season,
                    episode=episode,
                    status="cancelled",
                    message="Download cancelled by user",
                    item_id=item_id,
                )
            )
            raise  # Re-raise to propagate cancellation
        except Exception as e:  # pylint: disable=broad-except
            logger.error(
                "Download error: %s (key: %s) S%02dE%02d - %s",
                serie_folder,
                key,
                season,
                episode,
                str(e),
                exc_info=True,
            )
            self._events.download_status(
                DownloadStatusEventArgs(
                    serie_folder=serie_folder,
                    key=key,
                    season=season,
                    episode=episode,
                    status="failed",
                    error=e,
                    message=f"Download error: {str(e)}",
                    item_id=item_id,
                )
            )
            return False

    async def rescan(self) -> list:
        """
        Rescan directory for missing episodes (async).

        Performs a file-based scan in the thread pool, forwarding scanner
        progress as scan_status events, then replaces the in-memory list
        with the scan results. Database persistence is handled by the
        service layer (AnimeService).

        Returns:
            List of Serie objects found during scan with their
            missing episodes.

        Raises:
            InterruptedError: Re-raised after a "cancelled" event.
            Exception: Any other scan error is re-raised after a
                "failed" event.

        Note:
            This method no longer saves to database directly. The returned
            list should be persisted by the caller (AnimeService).
        """
        logger.info("Starting directory rescan")
        # Defined before the try so the except handlers can report it.
        total_to_scan = 0
        try:
            logger.info("Getting total items to scan...")
            loop = asyncio.get_running_loop()
            total_to_scan = await loop.run_in_executor(
                self.executor,
                self.serie_scanner.get_total_to_scan
            )
            logger.info("Total folders to scan: %d", total_to_scan)

            logger.info(
                "Firing scan_status 'started' event, handler=%s",
                self._events.scan_status
            )
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="started",
                    progress=0.0,
                    message="Scan started",
                )
            )

            await loop.run_in_executor(
                self.executor,
                self.serie_scanner.reinit
            )

            def scan_progress_handler(progress_data):
                """Forward one scanner progress callback as a scan event."""
                message = progress_data.get('message', '')
                folder = message.replace('Scanning: ', '')
                self._events.scan_status(
                    ScanStatusEventArgs(
                        current=progress_data.get('current', 0),
                        total=progress_data.get('total', total_to_scan),
                        folder=folder,
                        status="progress",
                        # Scanner reports a percentage; events use 0.0-1.0.
                        progress=(
                            progress_data.get('percentage', 0.0) / 100.0
                        ),
                        message=message,
                    )
                )

            self.serie_scanner.subscribe_on_progress(scan_progress_handler)
            try:
                # Perform scan (file-based, returns results in scanner.keyDict)
                await loop.run_in_executor(
                    self.executor,
                    self.serie_scanner.scan
                )
            finally:
                # Always unsubscribe after scan completes or fails
                self.serie_scanner.unsubscribe_on_progress(
                    scan_progress_handler
                )

            scanned_series = list(self.serie_scanner.keyDict.values())
            self.list.keyDict.clear()
            for serie in scanned_series:
                self.list.keyDict[serie.key] = serie
            self.series_list = self.list.GetMissingEpisode()
            logger.info("Directory rescan completed successfully")

            logger.info(
                "Firing scan_status 'completed' event, handler=%s",
                self._events.scan_status
            )
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=total_to_scan,
                    total=total_to_scan,
                    folder="",
                    status="completed",
                    progress=1.0,
                    message=(
                        f"Scan completed. Found {len(self.series_list)} "
                        "series with missing episodes."
                    ),
                )
            )
            return scanned_series
        except InterruptedError:
            logger.warning("Scan cancelled by user")
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="cancelled",
                    message="Scan cancelled by user",
                )
            )
            raise
        except Exception as e:
            logger.error("Scan error: %s", str(e), exc_info=True)
            self._events.scan_status(
                ScanStatusEventArgs(
                    current=0,
                    total=total_to_scan,
                    folder="",
                    status="failed",
                    error=e,
                    message=f"Scan error: {str(e)}",
                )
            )
            raise

    async def get_series_list(self) -> List[Any]:
        """
        Get the current series list (async).

        Returns:
            List of series with missing episodes (the cached value; not
            recomputed by this call)
        """
        return self.series_list

    async def refresh_series_list(self) -> None:
        """
        Recompute the cached missing-episodes list (async).

        Rebuilds ``self.series_list`` from the series currently held in
        ``self.list``. The series themselves are supplied by the service
        layer via ``load_series_from_list()`` or refreshed by
        ``rescan()``.
        """
        # The former implementation awaited self._init_list(), which no
        # longer exists on this class and raised AttributeError.
        self.series_list = self.list.GetMissingEpisode()
        logger.debug(
            "Refreshed series list: %d series with missing episodes",
            len(self.series_list)
        )

    def _get_serie_by_key(self, key: str) -> Optional[AnimeSeries]:
        """
        Get a series by its unique provider key.

        This is the primary method for series lookups within SeriesApp.

        Args:
            key: The unique provider identifier (e.g.,
                "attack-on-titan")

        Returns:
            The AnimeSeries instance if found, None otherwise

        Note:
            This method uses the SerieList.get_by_key() method which
            looks up series by their unique key, not by folder name.
        """
        return self.list.get_by_key(key)

    def get_all_series_from_data_files(self) -> List[AnimeSeries]:
        """
        Get all series from data files in the anime directory.

        Looks for a file named 'data' in each immediate sub-directory of
        directory_to_search and builds an AnimeSeries from each one. A
        malformed or unreadable data file is logged and skipped without
        affecting the other series. This method is synchronous and can be
        wrapped with asyncio.to_thread if needed for async contexts.

        Returns:
            List of AnimeSeries objects found in data files, in sorted
            folder-name order. Returns an empty list if no data files are
            found or if the directory doesn't exist or can't be listed.

        Example:
            series_app = SeriesApp("/path/to/anime")
            all_series = series_app.get_all_series_from_data_files()
            for anime in all_series:
                print(f"Found: {anime.name} (key={anime.key})")
        """
        logger.info(
            "Scanning for data files in directory: %s",
            self.directory_to_search
        )
        if not os.path.isdir(self.directory_to_search):
            logger.warning(
                "Directory does not exist: %s",
                self.directory_to_search
            )
            return []

        try:
            # Sorted so the result order is deterministic across platforms.
            folder_names = sorted(os.listdir(self.directory_to_search))
        except (OSError, ValueError) as e:
            logger.error(
                "Failed to scan directory for data files: %s",
                str(e),
                exc_info=True
            )
            return []

        all_series: List[AnimeSeries] = []
        for folder_name in folder_names:
            folder_path = os.path.join(self.directory_to_search, folder_name)
            if not os.path.isdir(folder_path):
                continue
            data_file = os.path.join(folder_path, "data")
            if not os.path.isfile(data_file):
                continue
            anime = self._series_from_data_file(folder_name, data_file)
            if anime is not None:
                all_series.append(anime)

        logger.info(
            "Found %d series from data files in %s",
            len(all_series),
            self.directory_to_search
        )
        return all_series

    @staticmethod
    def _series_from_data_file(
        folder_name: str, data_file: str
    ) -> Optional[AnimeSeries]:
        """
        Build one AnimeSeries from a legacy data file, or None to skip it.

        Args:
            folder_name: Name of the folder containing the data file; used
                as the fallback for missing/empty 'name' and 'folder'.
            data_file: Path of the data file.

        Returns:
            The AnimeSeries, or None when the file cannot be parsed, has
            no 'key', or has a malformed 'episodeDict'.
        """
        series_data = _load_data_file(data_file)
        if series_data is None:
            return None

        key = series_data.get("key")
        if not key:
            logger.warning(
                "Data file missing key, skipping: %s",
                data_file
            )
            return None

        # Season keys are JSON strings; convert them to ints. A bad entry
        # only skips this series instead of aborting the whole scan.
        episode_dict = series_data.get("episodeDict") or {}
        try:
            episode_cache = {
                int(season): episodes
                for season, episodes in episode_dict.items()
            }
        except (AttributeError, TypeError, ValueError) as e:
            logger.warning(
                "Data file has malformed episodeDict, skipping: %s - %s",
                data_file, str(e)
            )
            return None

        # `or` fallbacks also cover keys that are present but null/empty.
        anime = AnimeSeries(
            key=key,
            name=series_data.get("name") or folder_name,
            site=series_data.get("site") or "https://aniworld.to",
            folder=series_data.get("folder") or folder_name,
            year=series_data.get("year"),
        )
        if episode_cache:
            anime._episode_dict_cache = episode_cache  # pylint: disable=protected-access
        return anime

    def shutdown(self) -> None:
        """
        Shutdown the thread pool executor.

        Should be called when the SeriesApp instance is no longer needed
        to properly clean up resources. Blocks until queued work finishes.
        """
        # hasattr guard: __init__ may have failed before creating it.
        if hasattr(self, 'executor'):
            self.executor.shutdown(wait=True)
            logger.info("ThreadPoolExecutor shut down successfully")
def _load_data_file(data_file_path: str) -> Optional[dict]:
    """Read a legacy ``data`` file and return its top-level JSON object.

    Best-effort: every failure is logged as a warning and reported by
    returning None; nothing is raised.

    Args:
        data_file_path: Filesystem path of the legacy data file.

    Returns:
        The decoded JSON object, or None when the file cannot be read,
        is not valid JSON, or its top level is not a JSON object.
    """
    import json

    try:
        with open(data_file_path, "r", encoding="utf-8") as handle:
            parsed = json.load(handle)
    except json.JSONDecodeError as err:
        logger.warning(
            "Failed to parse legacy data file (JSON error): %s - %s",
            data_file_path, str(err)
        )
        return None
    except Exception as err:  # pylint: disable=broad-except
        # Deliberately broad: one unreadable file must not abort a scan.
        logger.warning(
            "Failed to read legacy data file: %s - %s",
            data_file_path, str(err)
        )
        return None

    if isinstance(parsed, dict):
        return parsed
    logger.warning("Data file is not a dictionary: %s", data_file_path)
    return None