"""
|
|
SerieScanner - Scans directories for anime series and missing episodes.
|
|
|
|
This module provides functionality to scan anime directories, identify
|
|
missing episodes, and report progress through callback interfaces.
|
|
|
|
The scanner supports two modes of operation:
|
|
1. File-based mode (legacy): Saves scan results to data files
|
|
2. Database mode (preferred): Saves scan results to SQLite database
|
|
|
|
Database mode is preferred for new code. File-based mode is kept for
|
|
backward compatibility with CLI usage.
|
|
"""
|
|
from __future__ import annotations

import logging
import os
import re
import traceback
import uuid
import warnings
from typing import TYPE_CHECKING, Callable, Iterable, Iterator, Optional

from src.core.entities.series import Serie
from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException
from src.core.interfaces.callbacks import (
    CallbackManager,
    CompletionContext,
    ErrorContext,
    OperationType,
    ProgressContext,
    ProgressPhase,
)
from src.core.providers.base_provider import Loader

if TYPE_CHECKING:
    from sqlalchemy.ext.asyncio import AsyncSession

    from src.server.database.models import AnimeSeries

logger = logging.getLogger(__name__)
error_logger = logging.getLogger("error")
no_key_found_logger = logging.getLogger("series.nokey")


class SerieScanner:
    """
    Scans directories for anime series and identifies missing episodes.

    Supports progress callbacks for real-time scanning updates.

    The scanner supports two modes:
    1. File-based (legacy): Set db_session=None, saves to data files
    2. Database mode: Provide db_session, saves to SQLite database

    Example:
        # File-based mode (legacy)
        scanner = SerieScanner("/path/to/anime", loader)
        scanner.scan()

        # Database mode (preferred)
        async with get_db_session() as db:
            scanner = SerieScanner("/path/to/anime", loader, db_session=db)
            await scanner.scan_async(db)
    """

    def __init__(
        self,
        basePath: str,
        loader: Loader,
        callback_manager: Optional[CallbackManager] = None,
        db_session: Optional["AsyncSession"] = None
    ) -> None:
        """
        Initialize the SerieScanner.

        Args:
            basePath: Base directory containing anime series
            loader: Loader instance for fetching series information
            callback_manager: Optional callback manager for progress updates
            db_session: Optional database session for database mode.
                If provided, scan_async() should be used instead of scan().

        Raises:
            ValueError: If basePath is invalid or doesn't exist
        """
        # Validate basePath: it must be a non-empty path to an existing
        # directory before the scanner touches the filesystem.
        if not basePath or not basePath.strip():
            raise ValueError("Base path cannot be empty")

        # Resolve to absolute path and validate it exists
        abs_path = os.path.abspath(basePath)
        if not os.path.exists(abs_path):
            raise ValueError(f"Base path does not exist: {abs_path}")
        if not os.path.isdir(abs_path):
            raise ValueError(f"Base path is not a directory: {abs_path}")

        self.directory: str = abs_path
        self.keyDict: dict[str, Serie] = {}
        self.loader: Loader = loader
        self._callback_manager: CallbackManager = (
            callback_manager or CallbackManager()
        )
        self._current_operation_id: Optional[str] = None
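        # Note: the session passed here is only stored on the instance;
        # scan_async() still receives its AsyncSession explicitly as an
        # argument, so database-mode callers must pass the session again.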
        self._db_session: Optional["AsyncSession"] = db_session

        logger.info("Initialized SerieScanner with base path: %s", abs_path)

    @property
    def callback_manager(self) -> CallbackManager:
        """Get the callback manager instance."""
        return self._callback_manager

    def reinit(self) -> None:
        """Reinitialize the series dictionary (keyed by serie.key)."""
        self.keyDict = {}

    def get_total_to_scan(self) -> int:
        """Get the total number of folders to scan.

        Returns:
            Total count of series folders that will be scanned (every
            subdirectory of the base path, including those without MP4 files)
        """
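        # Counting consumes a fresh generator from __find_mp4_files(), so the
        # directory tree is walked once here and again by scan()/scan_async()
        # when the folders are actually processed.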
        result = self.__find_mp4_files()
        return sum(1 for _ in result)

    def scan(
        self,
        callback: Optional[Callable[[str, int], None]] = None
    ) -> None:
        """
        Scan directories for anime series and missing episodes (file-based).

        This method saves results to data files. For database storage,
        use scan_async() instead.

        .. deprecated:: 2.0.0
            Use :meth:`scan_async` for database-backed storage.
            File-based storage will be removed in a future version.

        Args:
            callback: Optional legacy callback function (folder, count)

        Raises:
            Exception: If scan fails critically
        """
        warnings.warn(
            "File-based scan() is deprecated. Use scan_async() for "
            "database storage.",
            DeprecationWarning,
            stacklevel=2
        )
        # Generate unique operation ID
        self._current_operation_id = str(uuid.uuid4())

        logger.info("Starting scan for missing episodes")

        # Notify scan starting
        self._callback_manager.notify_progress(
            ProgressContext(
                operation_type=OperationType.SCAN,
                operation_id=self._current_operation_id,
                phase=ProgressPhase.STARTING,
                current=0,
                total=0,
                percentage=0.0,
                message="Initializing scan"
            )
        )

        try:
            # Get total items to process
            total_to_scan = self.get_total_to_scan()
            logger.info("Total folders to scan: %d", total_to_scan)

            # The scanner enumerates folders with mp4 files, loads existing
            # metadata, calculates the missing episodes via the provider, and
            # persists the refreshed metadata while emitting progress events.
            result = self.__find_mp4_files()
            counter = 0

            for folder, mp4_files in result:
                try:
                    counter += 1

                    # Calculate progress
                    if total_to_scan > 0:
                        percentage = (counter / total_to_scan) * 100
                    else:
                        percentage = 0.0

                    # Progress is surfaced both through the callback manager
                    # (for the web/UI layer) and, for compatibility, through a
                    # legacy callback that updates CLI progress bars.
                    self._callback_manager.notify_progress(
                        ProgressContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            phase=ProgressPhase.IN_PROGRESS,
                            current=counter,
                            total=total_to_scan,
                            percentage=percentage,
                            message=f"Scanning: {folder}",
                            details=f"Found {len(mp4_files)} episodes"
                        )
                    )

                    # Call legacy callback if provided
                    if callback:
                        callback(folder, counter)

                    serie = self.__read_data_from_file(folder)
                    if (
                        serie is not None
                        and serie.key
                        and serie.key.strip()
                    ):
                        # Ask the provider to compare local files with
                        # remote metadata, yielding missing episodes per
                        # season. Results are saved back to disk so that both
                        # CLI and API consumers see consistent state.
                        missing_episodes, _site = (
                            self.__get_missing_episodes_and_season(
                                serie.key, mp4_files
                            )
                        )
                        serie.episodeDict = missing_episodes
                        serie.folder = folder
                        data_path = os.path.join(
                            self.directory, folder, 'data'
                        )
                        serie.save_to_file(data_path)

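                        # Two different folders can resolve to the same
                        # provider key; in that case the first folder scanned
                        # wins and later duplicates are only logged.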
                        # Store by key (primary identifier), not folder
                        if serie.key in self.keyDict:
                            logger.error(
                                "Duplicate series found with key '%s' "
                                "(folder: '%s')",
                                serie.key,
                                folder
                            )
                        else:
                            self.keyDict[serie.key] = serie
                            logger.debug(
                                "Stored series with key '%s' (folder: '%s')",
                                serie.key,
                                folder
                            )
                            no_key_found_logger.info(
                                "Saved Serie: '%s'", str(serie)
                            )

                except NoKeyFoundException as nkfe:
                    # Log error and notify via callback
                    error_msg = f"Error processing folder '{folder}': {nkfe}"
                    logger.error(error_msg)

                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            error=nkfe,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )
                except Exception as e:
                    # Log error and notify via callback
                    error_msg = (
                        f"Folder: '{folder}' - "
                        f"Unexpected error: {e}"
                    )
                    error_logger.error(
                        "%s\n%s",
                        error_msg,
                        traceback.format_exc()
                    )

                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            error=e,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )
                    continue

            # Notify scan completion
            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    success=True,
                    message=f"Scan completed. Processed {counter} folders.",
                    statistics={
                        "total_folders": counter,
                        "series_found": len(self.keyDict)
                    }
                )
            )

            logger.info(
                "Scan completed. Processed %d folders, found %d series",
                counter,
                len(self.keyDict)
            )

        except Exception as e:
            # Critical error - notify and re-raise
            error_msg = f"Critical scan error: {e}"
            logger.error("%s\n%s", error_msg, traceback.format_exc())

            self._callback_manager.notify_error(
                ErrorContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    error=e,
                    message=error_msg,
                    recoverable=False
                )
            )

            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    success=False,
                    message=error_msg
                )
            )

            raise

    async def scan_async(
        self,
        db: "AsyncSession",
        callback: Optional[Callable[[str, int], None]] = None
    ) -> None:
        """
        Scan directories for anime series and save to database.

        This is the preferred method for scanning when using database
        storage. Results are saved to the database instead of files.

        Args:
            db: Database session for async operations
            callback: Optional legacy callback function (folder, count)

        Raises:
            Exception: If scan fails critically

        Example:
            async with get_db_session() as db:
                scanner = SerieScanner("/path/to/anime", loader)
                await scanner.scan_async(db)
        """
        # Generate unique operation ID
        self._current_operation_id = str(uuid.uuid4())

        logger.info("Starting async scan for missing episodes (database mode)")

        # Notify scan starting
        self._callback_manager.notify_progress(
            ProgressContext(
                operation_type=OperationType.SCAN,
                operation_id=self._current_operation_id,
                phase=ProgressPhase.STARTING,
                current=0,
                total=0,
                percentage=0.0,
                message="Initializing scan (database mode)"
            )
        )

        try:
            # Get total items to process
            total_to_scan = self.get_total_to_scan()
            logger.info("Total folders to scan: %d", total_to_scan)

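            # Same flow as scan(): walk every series folder, ask the provider
            # which episodes are missing, and emit progress events. The only
            # difference is that results are persisted via _save_serie_to_db()
            # instead of per-folder data files.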
            result = self.__find_mp4_files()
            counter = 0
            saved_to_db = 0

            for folder, mp4_files in result:
                try:
                    counter += 1

                    # Calculate progress
                    if total_to_scan > 0:
                        percentage = (counter / total_to_scan) * 100
                    else:
                        percentage = 0.0

                    # Notify progress
                    self._callback_manager.notify_progress(
                        ProgressContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            phase=ProgressPhase.IN_PROGRESS,
                            current=counter,
                            total=total_to_scan,
                            percentage=percentage,
                            message=f"Scanning: {folder}",
                            details=f"Found {len(mp4_files)} episodes"
                        )
                    )

                    # Call legacy callback if provided
                    if callback:
                        callback(folder, counter)

                    serie = self.__read_data_from_file(folder)
                    if (
                        serie is not None
                        and serie.key
                        and serie.key.strip()
                    ):
                        # Get missing episodes from provider
                        missing_episodes, _site = (
                            self.__get_missing_episodes_and_season(
                                serie.key, mp4_files
                            )
                        )
                        serie.episodeDict = missing_episodes
                        serie.folder = folder

                        # Save to database instead of file
                        await self._save_serie_to_db(serie, db)
                        saved_to_db += 1

                        # Store by key in memory cache
                        if serie.key in self.keyDict:
                            logger.error(
                                "Duplicate series found with key '%s' "
                                "(folder: '%s')",
                                serie.key,
                                folder
                            )
                        else:
                            self.keyDict[serie.key] = serie
                            logger.debug(
                                "Stored series with key '%s' (folder: '%s')",
                                serie.key,
                                folder
                            )

                except NoKeyFoundException as nkfe:
                    error_msg = f"Error processing folder '{folder}': {nkfe}"
                    logger.error(error_msg)
                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            error=nkfe,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )
                except Exception as e:
                    error_msg = (
                        f"Folder: '{folder}' - Unexpected error: {e}"
                    )
                    error_logger.error(
                        "%s\n%s",
                        error_msg,
                        traceback.format_exc()
                    )
                    self._callback_manager.notify_error(
                        ErrorContext(
                            operation_type=OperationType.SCAN,
                            operation_id=self._current_operation_id,
                            error=e,
                            message=error_msg,
                            recoverable=True,
                            metadata={"folder": folder, "key": None}
                        )
                    )
                    continue

            # Notify scan completion
            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    success=True,
                    message=f"Scan completed. Processed {counter} folders.",
                    statistics={
                        "total_folders": counter,
                        "series_found": len(self.keyDict),
                        "saved_to_db": saved_to_db
                    }
                )
            )

            logger.info(
                "Async scan completed. Processed %d folders, "
                "found %d series, saved %d to database",
                counter,
                len(self.keyDict),
                saved_to_db
            )

        except Exception as e:
            error_msg = f"Critical async scan error: {e}"
            logger.error("%s\n%s", error_msg, traceback.format_exc())

            self._callback_manager.notify_error(
                ErrorContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    error=e,
                    message=error_msg,
                    recoverable=False
                )
            )

            self._callback_manager.notify_completion(
                CompletionContext(
                    operation_type=OperationType.SCAN,
                    operation_id=self._current_operation_id,
                    success=False,
                    message=error_msg
                )
            )

            raise

    async def _save_serie_to_db(
        self,
        serie: Serie,
        db: "AsyncSession"
    ) -> Optional["AnimeSeries"]:
        """
        Save or update a series in the database.

        Creates a new record if the series doesn't exist, or updates
        the episodes if they have changed.

        Args:
            serie: Serie instance to save
            db: Database session for async operations

        Returns:
            Created or updated AnimeSeries instance, or None if unchanged
        """
        from src.server.database.service import AnimeSeriesService, EpisodeService

        # Check if series already exists
        existing = await AnimeSeriesService.get_by_key(db, serie.key)

        if existing:
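            # Updates are add-only: episodes present in the new scan but not
            # in the database are created, while rows that no longer appear
            # in serie.episodeDict are left untouched.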
            # Build existing episode dict from episodes for comparison
            existing_episodes = await EpisodeService.get_by_series(
                db, existing.id
            )
            existing_dict: dict[int, list[int]] = {}
            for ep in existing_episodes:
                if ep.season not in existing_dict:
                    existing_dict[ep.season] = []
                existing_dict[ep.season].append(ep.episode_number)
            for season in existing_dict:
                existing_dict[season].sort()

            # Update episodes if changed
            if existing_dict != serie.episodeDict:
                # Add new episodes
                new_dict = serie.episodeDict or {}
                for season, episode_numbers in new_dict.items():
                    existing_eps = set(existing_dict.get(season, []))
                    for ep_num in episode_numbers:
                        if ep_num not in existing_eps:
                            await EpisodeService.create(
                                db=db,
                                series_id=existing.id,
                                season=season,
                                episode_number=ep_num,
                            )

                # Update folder if changed
                if existing.folder != serie.folder:
                    await AnimeSeriesService.update(
                        db,
                        existing.id,
                        folder=serie.folder
                    )

                logger.info(
                    "Updated series in database: %s (key=%s)",
                    serie.name,
                    serie.key
                )
                return existing
            else:
                logger.debug(
                    "Series unchanged in database: %s (key=%s)",
                    serie.name,
                    serie.key
                )
                return None
        else:
            # Create new series
            anime_series = await AnimeSeriesService.create(
                db=db,
                key=serie.key,
                name=serie.name,
                site=serie.site,
                folder=serie.folder,
            )

            # Create Episode records
            if serie.episodeDict:
                for season, episode_numbers in serie.episodeDict.items():
                    for ep_num in episode_numbers:
                        await EpisodeService.create(
                            db=db,
                            series_id=anime_series.id,
                            season=season,
                            episode_number=ep_num,
                        )

            logger.info(
                "Created series in database: %s (key=%s)",
                serie.name,
                serie.key
            )
            return anime_series

    async def _update_serie_in_db(
        self,
        serie: Serie,
        db: "AsyncSession"
    ) -> Optional["AnimeSeries"]:
        """
        Update an existing series in the database.

        Args:
            serie: Serie instance to update
            db: Database session for async operations

        Returns:
            Updated AnimeSeries instance, or None if not found
        """
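        # Unlike _save_serie_to_db(), this helper always overwrites the basic
        # fields (name, site, folder) and then adds any episode rows that are
        # not yet stored; existing episode rows are never removed.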
        from src.server.database.service import AnimeSeriesService, EpisodeService

        existing = await AnimeSeriesService.get_by_key(db, serie.key)
        if not existing:
            logger.warning(
                "Cannot update non-existent series: %s (key=%s)",
                serie.name,
                serie.key
            )
            return None

        # Update basic fields
        await AnimeSeriesService.update(
            db,
            existing.id,
            name=serie.name,
            site=serie.site,
            folder=serie.folder,
        )

        # Update episodes - add any new ones
        if serie.episodeDict:
            existing_episodes = await EpisodeService.get_by_series(
                db, existing.id
            )
            existing_dict: dict[int, set[int]] = {}
            for ep in existing_episodes:
                if ep.season not in existing_dict:
                    existing_dict[ep.season] = set()
                existing_dict[ep.season].add(ep.episode_number)

            for season, episode_numbers in serie.episodeDict.items():
                existing_eps = existing_dict.get(season, set())
                for ep_num in episode_numbers:
                    if ep_num not in existing_eps:
                        await EpisodeService.create(
                            db=db,
                            series_id=existing.id,
                            season=season,
                            episode_number=ep_num,
                        )

        logger.info(
            "Updated series in database: %s (key=%s)",
            serie.name,
            serie.key
        )
        return existing

    def __find_mp4_files(self) -> Iterator[tuple[str, list[str]]]:
        """Find all .mp4 files in the directory structure."""
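        # Every top-level directory is yielded, even when it contains no
        # .mp4 files (the list is simply empty), which is also what
        # get_total_to_scan() counts.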
logger.info("Scanning for .mp4 files")
|
|
for anime_name in os.listdir(self.directory):
|
|
anime_path = os.path.join(self.directory, anime_name)
|
|
if os.path.isdir(anime_path):
|
|
mp4_files: list[str] = []
|
|
has_files = False
|
|
for root, _, files in os.walk(anime_path):
|
|
for file in files:
|
|
if file.endswith(".mp4"):
|
|
mp4_files.append(os.path.join(root, file))
|
|
has_files = True
|
|
yield anime_name, mp4_files if has_files else []
|
|
|
|
    def __remove_year(self, input_string: str) -> str:
        """Remove year information, e.g. "(2021)", from the input string."""
        cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
        logger.debug(
            "Removed year from '%s' -> '%s'",
            input_string,
            cleaned_string
        )
        return cleaned_string

    def __read_data_from_file(self, folder_name: str) -> Optional[Serie]:
        """Read serie data from file or key file.

        Args:
            folder_name: Filesystem folder name
                (used only to locate data files)

        Returns:
            Serie object with valid key if found, None otherwise

        Note:
            The returned Serie will have its 'key' as the primary identifier.
            The 'folder' field is metadata only.
        """
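        # A plain-text 'key' file takes precedence over the serialized 'data'
        # file: when both exist, only the key file is read and a fresh Serie
        # with an empty episode dict is returned.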
        folder_path = os.path.join(self.directory, folder_name)
        key_file = os.path.join(folder_path, 'key')
        serie_file = os.path.join(folder_path, 'data')

        if os.path.exists(key_file):
            with open(key_file, 'r', encoding='utf-8') as file:
                key = file.read().strip()
            logger.info(
                "Key found for folder '%s': %s",
                folder_name,
                key
            )
            return Serie(key, "", "aniworld.to", folder_name, dict())

        if os.path.exists(serie_file):
            logger.info(
                "Loading serie data file for folder '%s': %s",
                folder_name,
                serie_file
            )
            return Serie.load_from_file(serie_file)

        return None

    def __get_episode_and_season(self, filename: str) -> tuple[int, int]:
        """Extract season and episode numbers from filename.

        Args:
            filename: Filename to parse

        Returns:
            Tuple of (season, episode) as integers

        Raises:
            MatchNotFoundError: If pattern not found
        """
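        # Filenames are expected to carry an SxxEyy marker, e.g.
        # "Some Show S01E05.mp4" -> (1, 5).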
        pattern = r'S(\d+)E(\d+)'
        match = re.search(pattern, filename)
        if match:
            season = match.group(1)
            episode = match.group(2)
            logger.debug(
                "Extracted season %s, episode %s from '%s'",
                season,
                episode,
                filename
            )
            return int(season), int(episode)
        else:
            logger.error(
                "Failed to find season/episode pattern in '%s'",
                filename
            )
            raise MatchNotFoundError(
                "Season and episode pattern not found in the filename."
            )

    def __get_episodes_and_seasons(
        self,
        mp4_files: Iterable[str]
    ) -> dict[int, list[int]]:
        """Get episodes grouped by season from mp4 files.

        Args:
            mp4_files: List of MP4 filenames

        Returns:
            Dictionary mapping season to list of episode numbers
        """
        episodes_dict: dict[int, list[int]] = {}

        for file in mp4_files:
            season, episode = self.__get_episode_and_season(file)

            if season in episodes_dict:
                episodes_dict[season].append(episode)
            else:
                episodes_dict[season] = [episode]
        return episodes_dict

    def __get_missing_episodes_and_season(
        self,
        key: str,
        mp4_files: Iterable[str]
    ) -> tuple[dict[int, list[int]], str]:
        """Get missing episodes for a serie.

        Args:
            key: Series key
            mp4_files: List of MP4 filenames

        Returns:
            Tuple of (episodes_dict, site_name)
        """
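        # Example: if the provider reports 12 episodes for season 1 and the
        # local files cover episodes 1-3, the candidates are 4-12; each
        # candidate is then kept only if the loader's is_language() check
        # passes for it.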
        # Mapping of season -> expected episode count, as reported by the
        # provider.
        expected_dict = self.loader.get_season_episode_count(key)
        filedict = self.__get_episodes_and_seasons(mp4_files)
        episodes_dict: dict[int, list[int]] = {}
        for season, expected_count in expected_dict.items():
            existing_episodes = filedict.get(season, [])
            missing_episodes = [
                ep for ep in range(1, expected_count + 1)
                if ep not in existing_episodes
                and self.loader.is_language(season, ep, key)
            ]

            if missing_episodes:
                episodes_dict[season] = missing_episodes

        return episodes_dict, "aniworld.to"