- Add sanitize_folder_name utility for filesystem-safe folder names
- Add sanitized_folder property to Serie entity
- Update SerieList.add() to use sanitized display names for folders
- Add scan_single_series() method for targeted episode scanning
- Enhance add_series endpoint: DB save -> folder create -> targeted scan
- Update response to include missing_episodes and total_missing
- Add comprehensive unit tests for new functionality
- Update API tests with proper mock support
649 lines
24 KiB
Python
649 lines
24 KiB
Python
"""
|
|
SerieScanner - Scans directories for anime series and missing episodes.
|
|
|
|
This module provides functionality to scan anime directories, identify
|
|
missing episodes, and report progress through callback interfaces.
|
|
|
|
Note:
|
|
This module is pure domain logic. Database operations are handled
|
|
by the service layer (AnimeService).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import traceback
|
|
import uuid
|
|
from typing import Callable, Iterable, Iterator, Optional
|
|
|
|
from src.core.entities.series import Serie
|
|
from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException
|
|
from src.core.interfaces.callbacks import (
|
|
CallbackManager,
|
|
CompletionContext,
|
|
ErrorContext,
|
|
OperationType,
|
|
ProgressContext,
|
|
ProgressPhase,
|
|
)
|
|
from src.core.providers.base_provider import Loader
|
|
|
|
# Module-wide loggers.
# - logger: general diagnostics for this module.
# - error_logger: unexpected per-folder failures, logged with tracebacks.
# - no_key_found_logger: the "series.nokey" channel, written to after a
#   series' data file has been saved during a scan.
logger = logging.getLogger(__name__)
error_logger = logging.getLogger("error")
no_key_found_logger = logging.getLogger("series.nokey")
|
|
|
|
|
|
class SerieScanner:
    """
    Scans directories for anime series and identifies missing episodes.

    Supports progress callbacks for real-time scanning updates.

    Note:
        This class is pure domain logic. Database operations are handled
        by the service layer (AnimeService). Scan results are stored
        in keyDict and can be retrieved after scanning.

    Example:
        scanner = SerieScanner("/path/to/anime", loader)
        scanner.scan()
        # Results are in scanner.keyDict
    """

    # Season/episode marker expected in episode file names, e.g. "S01E05".
    _SEASON_EPISODE_RE = re.compile(r'S(\d+)E(\d+)')

    def __init__(
        self,
        basePath: str,
        loader: Loader,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """
        Initialize the SerieScanner.

        Args:
            basePath: Base directory containing anime series
            loader: Loader instance for fetching series information
            callback_manager: Optional callback manager for progress
                updates; a fresh CallbackManager is created when omitted

        Raises:
            ValueError: If basePath is empty, does not exist, or is not
                a directory
        """
        # Reject empty/blank paths up front; os.path.abspath("") would
        # silently resolve to the current working directory.
        if not basePath or not basePath.strip():
            raise ValueError("Base path cannot be empty")

        # Resolve to an absolute path and make sure it is an existing
        # directory. All folder names are later joined onto this path.
        abs_path = os.path.abspath(basePath)
        if not os.path.exists(abs_path):
            raise ValueError(f"Base path does not exist: {abs_path}")
        if not os.path.isdir(abs_path):
            raise ValueError(f"Base path is not a directory: {abs_path}")

        self.directory: str = abs_path
        # Scan results, keyed by serie.key (not by folder name).
        self.keyDict: dict[str, Serie] = {}
        self.loader: Loader = loader
        self._callback_manager: CallbackManager = (
            callback_manager or CallbackManager()
        )
        # Set by scan(); identifies the most recent full-scan operation.
        self._current_operation_id: Optional[str] = None

        logger.info("Initialized SerieScanner with base path: %s", abs_path)

    @property
    def callback_manager(self) -> CallbackManager:
        """Get the callback manager instance."""
        return self._callback_manager

    def reinit(self) -> None:
        """Reinitialize the series dictionary (keyed by serie.key)."""
        self.keyDict: dict[str, Serie] = {}

    def get_total_to_scan(self) -> int:
        """Get the total number of folders to scan.

        Returns:
            Number of sub-directories directly below the base directory.
            Folders without any .mp4 file are counted as well, because
            they are still yielded (with an empty file list) for scanning.
        """
        return sum(1 for _ in self.__find_mp4_files())

    def scan(
        self,
        callback: Optional[Callable[[str, int], None]] = None
    ) -> None:
        """
        Scan directories for anime series and missing episodes.

        Results are stored in self.keyDict and can be retrieved after
        scanning. Data files are also saved to disk for persistence.
        Existing keyDict entries are not cleared; call reinit() first
        for a fresh result set.

        Args:
            callback: Optional legacy callback function (folder, count)
                invoked once per folder for progress updates

        Raises:
            Exception: If scan fails critically (the error is reported
                through the callback manager before being re-raised)
        """
        # Generate unique operation ID
        self._current_operation_id = str(uuid.uuid4())
        operation_id = self._current_operation_id

        logger.info("Starting scan for missing episodes")

        # Notify scan starting
        self._callback_manager.notify_progress(
            ProgressContext(
                operation_type=OperationType.SCAN,
                operation_id=operation_id,
                phase=ProgressPhase.STARTING,
                current=0,
                total=0,
                percentage=0.0,
                message="Initializing scan"
            )
        )

        try:
            # Walk the library once and reuse the listing for both the
            # total and the iteration. Walking twice doubles the disk
            # I/O and can report a total that no longer matches the
            # folders actually processed.
            folders = list(self.__find_mp4_files())
            total_to_scan = len(folders)
            logger.info("Total folders to scan: %d", total_to_scan)

            counter = 0
            for counter, (folder, mp4_files) in enumerate(folders, start=1):
                try:
                    # total_to_scan >= 1 whenever the loop body runs.
                    percentage = (counter / total_to_scan) * 100

                    # Progress is surfaced both through the callback
                    # manager (for the web/UI layer) and, for
                    # compatibility, through a legacy callback that
                    # updates CLI progress bars.
                    self._callback_manager.notify_progress(
                        ProgressContext(
                            operation_type=OperationType.SCAN,
                            operation_id=operation_id,
                            phase=ProgressPhase.IN_PROGRESS,
                            current=counter,
                            total=total_to_scan,
                            percentage=percentage,
                            message=f"Scanning: {folder}",
                            details=f"Found {len(mp4_files)} episodes"
                        )
                    )

                    # Call legacy callback if provided
                    if callback:
                        callback(folder, counter)

                    serie = self.__read_data_from_file(folder)
                    if (
                        serie is None
                        or not serie.key
                        or not serie.key.strip()
                    ):
                        # Without a key the provider cannot be queried.
                        logger.debug(
                            "No usable series key for folder '%s'; "
                            "skipping",
                            folder
                        )
                        continue

                    # Delegate to the provider to compare local files
                    # with remote metadata, yielding missing episodes per
                    # season. Results are saved back to disk so that both
                    # CLI and API consumers see consistent state.
                    missing_episodes, _site = (
                        self.__get_missing_episodes_and_season(
                            serie.key, mp4_files
                        )
                    )
                    serie.episodeDict = missing_episodes
                    serie.folder = folder
                    data_path = os.path.join(
                        self.directory, folder, 'data'
                    )
                    serie.save_to_file(data_path)

                    # Store by key (primary identifier), not folder.
                    # The first folder seen for a key wins.
                    if serie.key in self.keyDict:
                        logger.error(
                            "Duplicate series found with key '%s' "
                            "(folder: '%s')",
                            serie.key,
                            folder
                        )
                    else:
                        self.keyDict[serie.key] = serie
                        logger.debug(
                            "Stored series with key '%s' (folder: '%s')",
                            serie.key,
                            folder
                        )
                    no_key_found_logger.info(
                        "Saved Serie: '%s'", str(serie)
                    )

                except NoKeyFoundException as nkfe:
                    # Log error and notify via callback; the scan goes
                    # on with the next folder.
                    error_msg = f"Error processing folder '{folder}': {nkfe}"
                    logger.error(error_msg)

                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=operation_id,
                            error=nkfe,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )
                except Exception as e:
                    # Any other per-folder failure is logged with its
                    # traceback and reported as recoverable so that one
                    # bad folder does not abort the whole scan.
                    error_msg = (
                        f"Folder: '{folder}' - "
                        f"Unexpected error: {e}"
                    )
                    error_logger.error(
                        "%s\n%s",
                        error_msg,
                        traceback.format_exc()
                    )

                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=operation_id,
                            error=e,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )

            # Notify scan completion
            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    success=True,
                    message=f"Scan completed. Processed {counter} folders.",
                    statistics={
                        "total_folders": counter,
                        "series_found": len(self.keyDict)
                    }
                )
            )

            logger.info(
                "Scan completed. Processed %d folders, found %d series",
                counter,
                len(self.keyDict)
            )

        except Exception as e:
            # Critical error - notify and re-raise
            error_msg = f"Critical scan error: {e}"
            logger.error("%s\n%s", error_msg, traceback.format_exc())

            self._callback_manager.notify_error(
                ErrorContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    error=e,
                    message=error_msg,
                    recoverable=False
                )
            )

            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    success=False,
                    message=error_msg
                )
            )

            raise

    @staticmethod
    def __collect_mp4_files(root_path: str) -> list[str]:
        """Return the paths of all .mp4 files found below root_path."""
        mp4_files: list[str] = []
        for root, _, files in os.walk(root_path):
            for name in files:
                if name.endswith(".mp4"):
                    mp4_files.append(os.path.join(root, name))
        return mp4_files

    def __find_mp4_files(self) -> Iterator[tuple[str, list[str]]]:
        """Find all .mp4 files in the directory structure.

        Yields:
            (folder_name, mp4_paths) for every sub-directory directly
            below the base directory. Folders without .mp4 files are
            yielded with an empty list.
        """
        logger.info("Scanning for .mp4 files")
        for anime_name in os.listdir(self.directory):
            anime_path = os.path.join(self.directory, anime_name)
            if os.path.isdir(anime_path):
                yield anime_name, self.__collect_mp4_files(anime_path)

    def __remove_year(self, input_string: str) -> str:
        """Remove year information such as "(2013)" from input string."""
        cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
        logger.debug(
            "Removed year from '%s' -> '%s'",
            input_string,
            cleaned_string
        )
        return cleaned_string

    def __read_data_from_file(self, folder_name: str) -> Optional[Serie]:
        """Read serie data from file or key file.

        A 'key' file takes precedence over a 'data' file: when it exists
        a new Serie is built from the key alone. Otherwise the Serie is
        loaded from the 'data' file, if present.

        Args:
            folder_name: Filesystem folder name
                (used only to locate data files)

        Returns:
            Serie object if a key or data file was found, None otherwise

        Note:
            The returned Serie will have its 'key' as the primary
            identifier. The 'folder' field is metadata only.
        """
        folder_path = os.path.join(self.directory, folder_name)
        key_file = os.path.join(folder_path, 'key')
        serie_file = os.path.join(folder_path, 'data')

        if os.path.exists(key_file):
            with open(key_file, 'r', encoding='utf-8') as file:
                key = file.read().strip()
            logger.info(
                "Key found for folder '%s': %s",
                folder_name,
                key
            )
            return Serie(key, "", "aniworld.to", folder_name, dict())

        if os.path.exists(serie_file):
            # Serie.load_from_file receives the path, so no file handle
            # is opened here.
            logger.info(
                "load serie_file from '%s': %s",
                folder_name,
                serie_file
            )
            return Serie.load_from_file(serie_file)

        return None

    def __get_episode_and_season(self, filename: str) -> tuple[int, int]:
        """Extract season and episode numbers from filename.

        The file's own name is searched first so that an "SxxEyy" token
        in a parent directory name cannot shadow the marker in the file
        name. The complete path is only searched as a fallback.

        Args:
            filename: File name or path to parse

        Returns:
            Tuple of (season, episode) as integers

        Raises:
            MatchNotFoundError: If pattern not found
        """
        match = (
            self._SEASON_EPISODE_RE.search(os.path.basename(filename))
            or self._SEASON_EPISODE_RE.search(filename)
        )
        if match is None:
            logger.error(
                "Failed to find season/episode pattern in '%s'",
                filename
            )
            raise MatchNotFoundError(
                "Season and episode pattern not found in the filename."
            )

        season, episode = match.group(1), match.group(2)
        logger.debug(
            "Extracted season %s, episode %s from '%s'",
            season,
            episode,
            filename
        )
        return int(season), int(episode)

    def __get_episodes_and_seasons(
        self,
        mp4_files: Iterable[str]
    ) -> dict[int, list[int]]:
        """Get episodes grouped by season from mp4 files.

        Args:
            mp4_files: MP4 file names or paths

        Returns:
            Dictionary mapping season to list of episode numbers

        Raises:
            MatchNotFoundError: If any file lacks the season/episode
                pattern
        """
        episodes_dict: dict[int, list[int]] = {}
        for file in mp4_files:
            season, episode = self.__get_episode_and_season(file)
            episodes_dict.setdefault(season, []).append(episode)
        return episodes_dict

    def __get_missing_episodes_and_season(
        self,
        key: str,
        mp4_files: Iterable[str]
    ) -> tuple[dict[int, list[int]], str]:
        """Get missing episodes for a serie.

        An episode counts as missing when the provider lists it, no
        local file carries its season/episode marker, and the loader's
        is_language check accepts it.

        Args:
            key: Series key
            mp4_files: MP4 file names or paths already on disk

        Returns:
            Tuple of (episodes_dict, site_name); seasons without missing
            episodes are omitted from episodes_dict
        """
        # key season , value count of episodes
        expected_dict = self.loader.get_season_episode_count(key)
        filedict = self.__get_episodes_and_seasons(mp4_files)
        episodes_dict: dict[int, list[int]] = {}
        for season, expected_count in expected_dict.items():
            # Set lookup keeps the membership test O(1) per episode.
            existing_episodes = set(filedict.get(season, ()))
            missing_episodes = [
                ep for ep in range(1, expected_count + 1)
                if ep not in existing_episodes
                and self.loader.is_language(season, ep, key)
            ]

            if missing_episodes:
                episodes_dict[season] = missing_episodes

        return episodes_dict, "aniworld.to"

    def scan_single_series(
        self,
        key: str,
        folder: str,
    ) -> dict[int, list[int]]:
        """
        Scan a single series for missing episodes.

        This method performs a targeted scan for only the specified
        series, without triggering a full library rescan. It fetches
        available episodes from the provider and compares with local
        files. The result is also stored in keyDict under the given key.

        Args:
            key: The unique provider key for the series
            folder: The filesystem folder name where the series is stored

        Returns:
            dict[int, list[int]]: Dictionary mapping season numbers to
                lists of missing episode numbers. Empty dict if no
                missing episodes, or if the scan failed (failures are
                reported through the callback manager, not raised).

        Raises:
            ValueError: If key or folder is empty

        Example:
            >>> scanner = SerieScanner("/path/to/anime", loader)
            >>> missing = scanner.scan_single_series(
            ...     "attack-on-titan",
            ...     "Attack on Titan"
            ... )
            >>> print(missing)
            {1: [5, 6, 7], 2: [1, 2]}
        """
        if not key or not key.strip():
            raise ValueError("Series key cannot be empty")
        if not folder or not folder.strip():
            raise ValueError("Series folder cannot be empty")

        logger.info(
            "Starting targeted scan for series: %s (folder: %s)",
            key,
            folder
        )

        # Generate unique operation ID for this targeted scan
        operation_id = str(uuid.uuid4())

        # Notify scan starting
        self._callback_manager.notify_progress(
            ProgressContext(
                operation_type=OperationType.SCAN,
                operation_id=operation_id,
                phase=ProgressPhase.STARTING,
                current=0,
                total=1,
                percentage=0.0,
                message=f"Scanning series: {folder}",
                details=f"Key: {key}"
            )
        )

        try:
            folder_path = os.path.join(self.directory, folder)

            if os.path.isdir(folder_path):
                # Find existing MP4 files in the folder
                mp4_files = self.__collect_mp4_files(folder_path)
                logger.debug(
                    "Found %d existing MP4 files in folder %s",
                    len(mp4_files),
                    folder
                )
            else:
                # A missing folder is not an error: with no local files
                # every provider episode is reported as missing.
                logger.info(
                    "Series folder does not exist yet: %s - "
                    "will scan for available episodes from provider",
                    folder_path
                )
                mp4_files = []

            # Get missing episodes from provider
            missing_episodes, site = self.__get_missing_episodes_and_season(
                key, mp4_files
            )
            total_missing = sum(
                len(eps) for eps in missing_episodes.values()
            )

            # Update progress
            self._callback_manager.notify_progress(
                ProgressContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    phase=ProgressPhase.IN_PROGRESS,
                    current=1,
                    total=1,
                    percentage=100.0,
                    message=f"Scanned: {folder}",
                    details=f"Found {total_missing} missing episodes"
                )
            )

            # Create or update Serie in keyDict
            if key in self.keyDict:
                # Update existing serie (only its episode dictionary)
                self.keyDict[key].episodeDict = missing_episodes
                logger.debug(
                    "Updated existing series %s with %d missing episodes",
                    key,
                    total_missing
                )
            else:
                # Create new serie entry
                serie = Serie(
                    key=key,
                    name="",  # Will be populated by caller if needed
                    site=site,
                    folder=folder,
                    episodeDict=missing_episodes
                )
                self.keyDict[key] = serie
                logger.debug(
                    "Created new series entry for %s with %d missing "
                    "episodes",
                    key,
                    total_missing
                )

            # Notify completion
            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    success=True,
                    message=f"Scan completed for {folder}",
                    statistics={
                        "missing_episodes": total_missing,
                        "seasons_with_missing": len(missing_episodes)
                    }
                )
            )

            logger.info(
                "Targeted scan completed for %s: %d missing episodes "
                "across %d seasons",
                key,
                total_missing,
                len(missing_episodes)
            )

            return missing_episodes

        except Exception as e:
            error_msg = f"Failed to scan series {key}: {e}"
            logger.error(error_msg, exc_info=True)

            # Notify error
            self._callback_manager.notify_error(
                ErrorContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    error=e,
                    message=error_msg,
                    recoverable=True,
                    metadata={"key": key, "folder": folder}
                )
            )

            # Notify completion with failure
            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=operation_id,
                    success=False,
                    message=error_msg
                )
            )

            # Return empty dict on error (scan failed but not critical)
            return {}
|
|
|