Aniworld/src/core/SerieScanner.py
2025-12-02 13:24:22 +01:00

788 lines
29 KiB
Python

"""
SerieScanner - Scans directories for anime series and missing episodes.
This module provides functionality to scan anime directories, identify
missing episodes, and report progress through callback interfaces.
The scanner supports two modes of operation:
1. File-based mode (legacy): Saves scan results to data files
2. Database mode (preferred): Saves scan results to SQLite database
Database mode is preferred for new code. File-based mode is kept for
backward compatibility with CLI usage.
"""
from __future__ import annotations
import logging
import os
import re
import traceback
import uuid
import warnings
from typing import TYPE_CHECKING, Callable, Iterable, Iterator, Optional
from src.core.entities.series import Serie
from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException
from src.core.interfaces.callbacks import (
CallbackManager,
CompletionContext,
ErrorContext,
OperationType,
ProgressContext,
ProgressPhase,
)
from src.core.providers.base_provider import Loader
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
from src.server.database.models import AnimeSeries
logger = logging.getLogger(__name__)
error_logger = logging.getLogger("error")
no_key_found_logger = logging.getLogger("series.nokey")
class SerieScanner:
"""
Scans directories for anime series and identifies missing episodes.
Supports progress callbacks for real-time scanning updates.
The scanner supports two modes:
1. File-based (legacy): Set db_session=None, saves to data files
2. Database mode: Provide db_session, saves to SQLite database
Example:
# File-based mode (legacy)
scanner = SerieScanner("/path/to/anime", loader)
scanner.scan()
# Database mode (preferred)
async with get_db_session() as db:
scanner = SerieScanner("/path/to/anime", loader, db_session=db)
await scanner.scan_async()
"""
def __init__(
self,
basePath: str,
loader: Loader,
callback_manager: Optional[CallbackManager] = None,
db_session: Optional["AsyncSession"] = None
) -> None:
"""
Initialize the SerieScanner.
Args:
basePath: Base directory containing anime series
loader: Loader instance for fetching series information
callback_manager: Optional callback manager for progress updates
db_session: Optional database session for database mode.
If provided, scan_async() should be used instead of scan().
Raises:
ValueError: If basePath is invalid or doesn't exist
"""
# Validate basePath to prevent directory traversal attacks
if not basePath or not basePath.strip():
raise ValueError("Base path cannot be empty")
# Resolve to absolute path and validate it exists
abs_path = os.path.abspath(basePath)
if not os.path.exists(abs_path):
raise ValueError(f"Base path does not exist: {abs_path}")
if not os.path.isdir(abs_path):
raise ValueError(f"Base path is not a directory: {abs_path}")
self.directory: str = abs_path
self.keyDict: dict[str, Serie] = {}
self.loader: Loader = loader
self._callback_manager: CallbackManager = (
callback_manager or CallbackManager()
)
self._current_operation_id: Optional[str] = None
self._db_session: Optional["AsyncSession"] = db_session
logger.info("Initialized SerieScanner with base path: %s", abs_path)
@property
def callback_manager(self) -> CallbackManager:
"""Get the callback manager instance."""
return self._callback_manager
def reinit(self) -> None:
"""Reinitialize the series dictionary (keyed by serie.key)."""
self.keyDict: dict[str, Serie] = {}
def get_total_to_scan(self) -> int:
"""Get the total number of folders to scan.
Returns:
Total count of folders with MP4 files
"""
result = self.__find_mp4_files()
return sum(1 for _ in result)
def scan(
self,
callback: Optional[Callable[[str, int], None]] = None
) -> None:
"""
Scan directories for anime series and missing episodes (file-based).
This method saves results to data files. For database storage,
use scan_async() instead.
.. deprecated:: 2.0.0
Use :meth:`scan_async` for database-backed storage.
File-based storage will be removed in a future version.
Args:
callback: Optional legacy callback function (folder, count)
Raises:
Exception: If scan fails critically
"""
warnings.warn(
"File-based scan() is deprecated. Use scan_async() for "
"database storage.",
DeprecationWarning,
stacklevel=2
)
# Generate unique operation ID
self._current_operation_id = str(uuid.uuid4())
logger.info("Starting scan for missing episodes")
# Notify scan starting
self._callback_manager.notify_progress(
ProgressContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
phase=ProgressPhase.STARTING,
current=0,
total=0,
percentage=0.0,
message="Initializing scan"
)
)
try:
# Get total items to process
total_to_scan = self.get_total_to_scan()
logger.info("Total folders to scan: %d", total_to_scan)
# The scanner enumerates folders with mp4 files, loads existing
# metadata, calculates the missing episodes via the provider, and
# persists the refreshed metadata while emitting progress events.
result = self.__find_mp4_files()
counter = 0
for folder, mp4_files in result:
try:
counter += 1
# Calculate progress
if total_to_scan > 0:
percentage = (counter / total_to_scan) * 100
else:
percentage = 0.0
# Progress is surfaced both through the callback manager
# (for the web/UI layer) and, for compatibility, through a
# legacy callback that updates CLI progress bars.
# Notify progress
self._callback_manager.notify_progress(
ProgressContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
phase=ProgressPhase.IN_PROGRESS,
current=counter,
total=total_to_scan,
percentage=percentage,
message=f"Scanning: {folder}",
details=f"Found {len(mp4_files)} episodes"
)
)
# Call legacy callback if provided
if callback:
callback(folder, counter)
serie = self.__read_data_from_file(folder)
if (
serie is not None
and serie.key
and serie.key.strip()
):
# Delegate the provider to compare local files with
# remote metadata, yielding missing episodes per
# season. Results are saved back to disk so that both
# CLI and API consumers see consistent state.
missing_episodes, _site = (
self.__get_missing_episodes_and_season(
serie.key, mp4_files
)
)
serie.episodeDict = missing_episodes
serie.folder = folder
data_path = os.path.join(
self.directory, folder, 'data'
)
serie.save_to_file(data_path)
# Store by key (primary identifier), not folder
if serie.key in self.keyDict:
logger.error(
"Duplicate series found with key '%s' "
"(folder: '%s')",
serie.key,
folder
)
else:
self.keyDict[serie.key] = serie
logger.debug(
"Stored series with key '%s' (folder: '%s')",
serie.key,
folder
)
no_key_found_logger.info(
"Saved Serie: '%s'", str(serie)
)
except NoKeyFoundException as nkfe:
# Log error and notify via callback
error_msg = f"Error processing folder '{folder}': {nkfe}"
logger.error(error_msg)
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=nkfe,
message=error_msg,
recoverable=True,
metadata={"folder": folder, "key": None}
)
)
except Exception as e:
# Log error and notify via callback
error_msg = (
f"Folder: '{folder}' - "
f"Unexpected error: {e}"
)
error_logger.error(
"%s\n%s",
error_msg,
traceback.format_exc()
)
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=e,
message=error_msg,
recoverable=True,
metadata={"folder": folder, "key": None}
)
)
continue
# Notify scan completion
self._callback_manager.notify_completion(
CompletionContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
success=True,
message=f"Scan completed. Processed {counter} folders.",
statistics={
"total_folders": counter,
"series_found": len(self.keyDict)
}
)
)
logger.info(
"Scan completed. Processed %d folders, found %d series",
counter,
len(self.keyDict)
)
except Exception as e:
# Critical error - notify and re-raise
error_msg = f"Critical scan error: {e}"
logger.error("%s\n%s", error_msg, traceback.format_exc())
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=e,
message=error_msg,
recoverable=False
)
)
self._callback_manager.notify_completion(
CompletionContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
success=False,
message=error_msg
)
)
raise
async def scan_async(
self,
db: "AsyncSession",
callback: Optional[Callable[[str, int], None]] = None
) -> None:
"""
Scan directories for anime series and save to database.
This is the preferred method for scanning when using database
storage. Results are saved to the database instead of files.
Args:
db: Database session for async operations
callback: Optional legacy callback function (folder, count)
Raises:
Exception: If scan fails critically
Example:
async with get_db_session() as db:
scanner = SerieScanner("/path/to/anime", loader)
await scanner.scan_async(db)
"""
# Generate unique operation ID
self._current_operation_id = str(uuid.uuid4())
logger.info("Starting async scan for missing episodes (database mode)")
# Notify scan starting
self._callback_manager.notify_progress(
ProgressContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
phase=ProgressPhase.STARTING,
current=0,
total=0,
percentage=0.0,
message="Initializing scan (database mode)"
)
)
try:
# Get total items to process
total_to_scan = self.get_total_to_scan()
logger.info("Total folders to scan: %d", total_to_scan)
result = self.__find_mp4_files()
counter = 0
saved_to_db = 0
for folder, mp4_files in result:
try:
counter += 1
# Calculate progress
if total_to_scan > 0:
percentage = (counter / total_to_scan) * 100
else:
percentage = 0.0
# Notify progress
self._callback_manager.notify_progress(
ProgressContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
phase=ProgressPhase.IN_PROGRESS,
current=counter,
total=total_to_scan,
percentage=percentage,
message=f"Scanning: {folder}",
details=f"Found {len(mp4_files)} episodes"
)
)
# Call legacy callback if provided
if callback:
callback(folder, counter)
serie = self.__read_data_from_file(folder)
if (
serie is not None
and serie.key
and serie.key.strip()
):
# Get missing episodes from provider
missing_episodes, _site = (
self.__get_missing_episodes_and_season(
serie.key, mp4_files
)
)
serie.episodeDict = missing_episodes
serie.folder = folder
# Save to database instead of file
await self._save_serie_to_db(serie, db)
saved_to_db += 1
# Store by key in memory cache
if serie.key in self.keyDict:
logger.error(
"Duplicate series found with key '%s' "
"(folder: '%s')",
serie.key,
folder
)
else:
self.keyDict[serie.key] = serie
logger.debug(
"Stored series with key '%s' (folder: '%s')",
serie.key,
folder
)
except NoKeyFoundException as nkfe:
error_msg = f"Error processing folder '{folder}': {nkfe}"
logger.error(error_msg)
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=nkfe,
message=error_msg,
recoverable=True,
metadata={"folder": folder, "key": None}
)
)
except Exception as e:
error_msg = (
f"Folder: '{folder}' - Unexpected error: {e}"
)
error_logger.error(
"%s\n%s",
error_msg,
traceback.format_exc()
)
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=e,
message=error_msg,
recoverable=True,
metadata={"folder": folder, "key": None}
)
)
continue
# Notify scan completion
self._callback_manager.notify_completion(
CompletionContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
success=True,
message=f"Scan completed. Processed {counter} folders.",
statistics={
"total_folders": counter,
"series_found": len(self.keyDict),
"saved_to_db": saved_to_db
}
)
)
logger.info(
"Async scan completed. Processed %d folders, "
"found %d series, saved %d to database",
counter,
len(self.keyDict),
saved_to_db
)
except Exception as e:
error_msg = f"Critical async scan error: {e}"
logger.error("%s\n%s", error_msg, traceback.format_exc())
self._callback_manager.notify_error(
ErrorContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
error=e,
message=error_msg,
recoverable=False
)
)
self._callback_manager.notify_completion(
CompletionContext(
operation_type=OperationType.SCAN,
operation_id=self._current_operation_id,
success=False,
message=error_msg
)
)
raise
async def _save_serie_to_db(
self,
serie: Serie,
db: "AsyncSession"
) -> Optional["AnimeSeries"]:
"""
Save or update a series in the database.
Creates a new record if the series doesn't exist, or updates
the episode_dict if it has changed.
Args:
serie: Serie instance to save
db: Database session for async operations
Returns:
Created or updated AnimeSeries instance, or None if unchanged
"""
from src.server.database.service import AnimeSeriesService
# Check if series already exists
existing = await AnimeSeriesService.get_by_key(db, serie.key)
if existing:
# Update episode_dict if changed
if existing.episode_dict != serie.episodeDict:
updated = await AnimeSeriesService.update(
db,
existing.id,
episode_dict=serie.episodeDict,
folder=serie.folder
)
logger.info(
"Updated series in database: %s (key=%s)",
serie.name,
serie.key
)
return updated
else:
logger.debug(
"Series unchanged in database: %s (key=%s)",
serie.name,
serie.key
)
return None
else:
# Create new series
anime_series = await AnimeSeriesService.create(
db=db,
key=serie.key,
name=serie.name,
site=serie.site,
folder=serie.folder,
episode_dict=serie.episodeDict,
)
logger.info(
"Created series in database: %s (key=%s)",
serie.name,
serie.key
)
return anime_series
async def _update_serie_in_db(
self,
serie: Serie,
db: "AsyncSession"
) -> Optional["AnimeSeries"]:
"""
Update an existing series in the database.
Args:
serie: Serie instance to update
db: Database session for async operations
Returns:
Updated AnimeSeries instance, or None if not found
"""
from src.server.database.service import AnimeSeriesService
existing = await AnimeSeriesService.get_by_key(db, serie.key)
if not existing:
logger.warning(
"Cannot update non-existent series: %s (key=%s)",
serie.name,
serie.key
)
return None
updated = await AnimeSeriesService.update(
db,
existing.id,
name=serie.name,
site=serie.site,
folder=serie.folder,
episode_dict=serie.episodeDict,
)
logger.info(
"Updated series in database: %s (key=%s)",
serie.name,
serie.key
)
return updated
def __find_mp4_files(self) -> Iterator[tuple[str, list[str]]]:
"""Find all .mp4 files in the directory structure."""
logger.info("Scanning for .mp4 files")
for anime_name in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_name)
if os.path.isdir(anime_path):
mp4_files: list[str] = []
has_files = False
for root, _, files in os.walk(anime_path):
for file in files:
if file.endswith(".mp4"):
mp4_files.append(os.path.join(root, file))
has_files = True
yield anime_name, mp4_files if has_files else []
def __remove_year(self, input_string: str) -> str:
"""Remove year information from input string."""
cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
logger.debug(
"Removed year from '%s' -> '%s'",
input_string,
cleaned_string
)
return cleaned_string
def __read_data_from_file(self, folder_name: str) -> Optional[Serie]:
"""Read serie data from file or key file.
Args:
folder_name: Filesystem folder name
(used only to locate data files)
Returns:
Serie object with valid key if found, None otherwise
Note:
The returned Serie will have its 'key' as the primary identifier.
The 'folder' field is metadata only.
"""
folder_path = os.path.join(self.directory, folder_name)
key = None
key_file = os.path.join(folder_path, 'key')
serie_file = os.path.join(folder_path, 'data')
if os.path.exists(key_file):
with open(key_file, 'r', encoding='utf-8') as file:
key = file.read().strip()
logger.info(
"Key found for folder '%s': %s",
folder_name,
key
)
return Serie(key, "", "aniworld.to", folder_name, dict())
if os.path.exists(serie_file):
with open(serie_file, "rb") as file:
logger.info(
"load serie_file from '%s': %s",
folder_name,
serie_file
)
return Serie.load_from_file(serie_file)
return None
def __get_episode_and_season(self, filename: str) -> tuple[int, int]:
"""Extract season and episode numbers from filename.
Args:
filename: Filename to parse
Returns:
Tuple of (season, episode) as integers
Raises:
MatchNotFoundError: If pattern not found
"""
pattern = r'S(\d+)E(\d+)'
match = re.search(pattern, filename)
if match:
season = match.group(1)
episode = match.group(2)
logger.debug(
"Extracted season %s, episode %s from '%s'",
season,
episode,
filename
)
return int(season), int(episode)
else:
logger.error(
"Failed to find season/episode pattern in '%s'",
filename
)
raise MatchNotFoundError(
"Season and episode pattern not found in the filename."
)
def __get_episodes_and_seasons(
self,
mp4_files: Iterable[str]
) -> dict[int, list[int]]:
"""Get episodes grouped by season from mp4 files.
Args:
mp4_files: List of MP4 filenames
Returns:
Dictionary mapping season to list of episode numbers
"""
episodes_dict: dict[int, list[int]] = {}
for file in mp4_files:
season, episode = self.__get_episode_and_season(file)
if season in episodes_dict:
episodes_dict[season].append(episode)
else:
episodes_dict[season] = [episode]
return episodes_dict
def __get_missing_episodes_and_season(
self,
key: str,
mp4_files: Iterable[str]
) -> tuple[dict[int, list[int]], str]:
"""Get missing episodes for a serie.
Args:
key: Series key
mp4_files: List of MP4 filenames
Returns:
Tuple of (episodes_dict, site_name)
"""
# key season , value count of episodes
expected_dict = self.loader.get_season_episode_count(key)
filedict = self.__get_episodes_and_seasons(mp4_files)
episodes_dict: dict[int, list[int]] = {}
for season, expected_count in expected_dict.items():
existing_episodes = filedict.get(season, [])
missing_episodes = [
ep for ep in range(1, expected_count + 1)
if ep not in existing_episodes
and self.loader.is_language(season, ep, key)
]
if missing_episodes:
episodes_dict[season] = missing_episodes
return episodes_dict, "aniworld.to"