This commit is contained in:
2025-10-22 13:38:46 +02:00
parent 1f39f07c5d
commit 04799633b4
9 changed files with 411 additions and 571 deletions

View File

@@ -10,7 +10,7 @@ import os
import re
import traceback
import uuid
from typing import Callable, Optional
from typing import Callable, Iterable, Iterator, Optional
from src.core.entities.series import Serie
from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException
@@ -40,7 +40,7 @@ class SerieScanner:
basePath: str,
loader: Loader,
callback_manager: Optional[CallbackManager] = None
):
) -> None:
"""
Initialize the SerieScanner.
@@ -49,10 +49,12 @@ class SerieScanner:
loader: Loader instance for fetching series information
callback_manager: Optional callback manager for progress updates
"""
self.directory = basePath
self.directory: str = basePath
self.folderDict: dict[str, Serie] = {}
self.loader = loader
self._callback_manager = callback_manager or CallbackManager()
self.loader: Loader = loader
self._callback_manager: CallbackManager = (
callback_manager or CallbackManager()
)
self._current_operation_id: Optional[str] = None
logger.info("Initialized SerieScanner with base path: %s", basePath)
@@ -62,22 +64,22 @@ class SerieScanner:
"""Get the callback manager instance."""
return self._callback_manager
def reinit(self):
def reinit(self) -> None:
"""Reinitialize the folder dictionary."""
self.folderDict: dict[str, Serie] = {}
def is_null_or_whitespace(self, s):
def is_null_or_whitespace(self, value: Optional[str]) -> bool:
"""Check if a string is None or whitespace.
Args:
s: String value to check
value: String value to check
Returns:
True if string is None or contains only whitespace
"""
return s is None or s.strip() == ""
return value is None or value.strip() == ""
def get_total_to_scan(self):
def get_total_to_scan(self) -> int:
"""Get the total number of folders to scan.
Returns:
@@ -86,7 +88,10 @@ class SerieScanner:
result = self.__find_mp4_files()
return sum(1 for _ in result)
def scan(self, callback: Optional[Callable[[str, int], None]] = None):
def scan(
self,
callback: Optional[Callable[[str, int], None]] = None
) -> None:
"""
Scan directories for anime series and missing episodes.
@@ -127,10 +132,10 @@ class SerieScanner:
counter += 1
# Calculate progress
percentage = (
(counter / total_to_scan * 100)
if total_to_scan > 0 else 0
)
if total_to_scan > 0:
percentage = (counter / total_to_scan) * 100
else:
percentage = 0.0
# Notify progress
self._callback_manager.notify_progress(
@@ -262,13 +267,13 @@ class SerieScanner:
raise
def __find_mp4_files(self):
def __find_mp4_files(self) -> Iterator[tuple[str, list[str]]]:
"""Find all .mp4 files in the directory structure."""
logger.info("Scanning for .mp4 files")
for anime_name in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_name)
if os.path.isdir(anime_path):
mp4_files = []
mp4_files: list[str] = []
has_files = False
for root, _, files in os.walk(anime_path):
for file in files:
@@ -277,7 +282,7 @@ class SerieScanner:
has_files = True
yield anime_name, mp4_files if has_files else []
def __remove_year(self, input_string: str):
def __remove_year(self, input_string: str) -> str:
"""Remove year information from input string."""
cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
logger.debug(
@@ -287,7 +292,7 @@ class SerieScanner:
)
return cleaned_string
def __read_data_from_file(self, folder_name: str):
def __read_data_from_file(self, folder_name: str) -> Optional[Serie]:
"""Read serie data from file or key file.
Args:
@@ -322,7 +327,7 @@ class SerieScanner:
return None
def __get_episode_and_season(self, filename: str):
def __get_episode_and_season(self, filename: str) -> tuple[int, int]:
"""Extract season and episode numbers from filename.
Args:
@@ -355,7 +360,10 @@ class SerieScanner:
"Season and episode pattern not found in the filename."
)
def __get_episodes_and_seasons(self, mp4_files: list):
def __get_episodes_and_seasons(
self,
mp4_files: Iterable[str]
) -> dict[int, list[int]]:
"""Get episodes grouped by season from mp4 files.
Args:
@@ -364,7 +372,7 @@ class SerieScanner:
Returns:
Dictionary mapping season to list of episode numbers
"""
episodes_dict = {}
episodes_dict: dict[int, list[int]] = {}
for file in mp4_files:
season, episode = self.__get_episode_and_season(file)
@@ -375,7 +383,11 @@ class SerieScanner:
episodes_dict[season] = [episode]
return episodes_dict
def __get_missing_episodes_and_season(self, key: str, mp4_files: list):
def __get_missing_episodes_and_season(
self,
key: str,
mp4_files: Iterable[str]
) -> tuple[dict[int, list[int]], str]:
"""Get missing episodes for a serie.
Args:
@@ -388,7 +400,7 @@ class SerieScanner:
# key season , value count of episodes
expected_dict = self.loader.get_season_episode_count(key)
filedict = self.__get_episodes_and_seasons(mp4_files)
episodes_dict = {}
episodes_dict: dict[int, list[int]] = {}
for season, expected_count in expected_dict.items():
existing_episodes = filedict.get(season, [])
missing_episodes = [