import os
import re
import logging
from .entities.series import Serie
import traceback
from ..infrastructure.logging.GlobalLogger import error_logger, noKeyFound_logger
from .exceptions.Exceptions import NoKeyFoundException, MatchNotFoundError
from .providers.base_provider import Loader


class SerieScanner:
    """Scan a base directory of series folders and record missing episodes.

    Every direct sub-directory of the base path is treated as one series.
    The ``.mp4`` files found below it are compared against the episode
    counts reported by the loader; the missing episodes are stored on the
    ``Serie`` object, which is written to a ``data`` file in the series
    folder and kept in ``folderDict`` under the series key.
    """

    def __init__(self, basePath: str, loader: Loader):
        """Store the base directory and the loader used for lookups."""
        self.directory = basePath
        self.folderDict: dict[str, Serie] = {}  # series key -> Serie
        self.loader = loader
        logging.info(f"Initialized Loader with base path: {self.directory}")

    def Reinit(self):
        """Discard all series collected by previous scans."""
        self.folderDict: dict[str, Serie] = {}

    def is_null_or_whitespace(self, s):
        """Return True when ``s`` is None, empty, or only whitespace."""
        return s is None or s.strip() == ""

    def GetTotalToScan(self):
        """Return the number of series folders below the base directory.

        This walks every folder just like ``Scan`` does, so it costs a full
        directory traversal.
        """
        return sum(1 for _ in self.__find_mp4_files())

    def Scan(self, callback):
        """Process every series folder and collect its missing episodes.

        ``callback(folder, counter)`` is invoked once per folder before it
        is processed; ``counter`` starts at 1. Errors raised while handling
        a folder are logged and the scan continues with the next folder.
        """
        logging.info("Starting process to load missing episodes")
        for counter, (folder, mp4_files) in enumerate(self.__find_mp4_files(), start=1):
            try:
                callback(folder, counter)
                serie = self.__ReadDataFromFile(folder)
                if serie is not None and not self.is_null_or_whitespace(serie.key):
                    missings, _site = self.__GetMissingEpisodesAndSeason(serie.key, mp4_files)
                    serie.episodeDict = missings
                    serie.folder = folder
                    serie.save_to_file(os.path.join(self.directory, folder, 'data'))
                    if serie.key in self.folderDict:
                        # logging.ERROR is the integer level constant and is
                        # not callable; logging.error() is the function.
                        logging.error(f"dublication found: {serie.key}")
                    # A duplicate key overwrites the earlier entry.
                    self.folderDict[serie.key] = serie
                    noKeyFound_logger.info(f"Saved Serie: '{str(serie)}'")
            except NoKeyFoundException as nkfe:
                # Report through the dedicated logger instead of calling a
                # method on the exception class.
                noKeyFound_logger.error(f"Error processing folder '{folder}': {nkfe}")
            except Exception as e:
                # Boundary handler: one broken folder must not stop the scan.
                error_logger.error(f"Folder: '{folder}' - Unexpected error processing folder '{folder}': {e} \n {traceback.format_exc()}")

    def __find_mp4_files(self):
        """Yield ``(folder_name, mp4_paths)`` for each sub-directory.

        ``mp4_paths`` contains the joined paths of all files ending in
        ``.mp4`` anywhere below the folder; it is empty when there are none.
        Plain files directly inside the base directory are skipped.
        """
        logging.info("Scanning for .mp4 files")
        for anime_name in os.listdir(self.directory):
            anime_path = os.path.join(self.directory, anime_name)
            if not os.path.isdir(anime_path):
                continue
            mp4_files = []
            for root, _, files in os.walk(anime_path):
                mp4_files.extend(
                    os.path.join(root, file) for file in files if file.endswith(".mp4")
                )
            yield anime_name, mp4_files

    def __remove_year(self, input_string: str):
        """Return ``input_string`` without ``(YYYY)`` groups, stripped."""
        cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
        logging.debug(f"Removed year from '{input_string}' -> '{cleaned_string}'")
        return cleaned_string

    def __ReadDataFromFile(self, folder_name: str):
        """Build the ``Serie`` for a folder from its ``key`` or ``data`` file.

        A ``key`` file takes precedence: its stripped content becomes the key
        of a fresh ``Serie``. Otherwise an existing ``data`` file is loaded
        via ``Serie.load_from_file``. Returns None when neither file exists.
        """
        folder_path = os.path.join(self.directory, folder_name)
        key_file = os.path.join(folder_path, 'key')
        serie_file = os.path.join(folder_path, 'data')

        if os.path.exists(key_file):
            with open(key_file, 'r') as file:
                key = file.read().strip()
            logging.info(f"Key found for folder '{folder_name}': {key}")
            return Serie(key, "", "aniworld.to", folder_name, dict())

        if os.path.exists(serie_file):
            # Serie.load_from_file receives the path, so no handle is opened
            # here.
            logging.info(f"load serie_file from '{folder_name}': {serie_file}")
            return Serie.load_from_file(serie_file)

        return None

    def __GetEpisodeAndSeason(self, filename: str):
        """Extract ``(season, episode)`` as ints from an ``S<n>E<n>`` marker.

        The base name is searched first so that a matching sequence in a
        parent directory cannot shadow the marker in the file name; the full
        string is used as a fallback.

        Raises:
            MatchNotFoundError: if no ``S<n>E<n>`` marker is present.
        """
        pattern = r'S(\d+)E(\d+)'
        match = re.search(pattern, os.path.basename(filename)) or re.search(pattern, filename)
        if not match:
            logging.error(f"Failed to find season/episode pattern in '{filename}'")
            raise MatchNotFoundError("Season and episode pattern not found in the filename.")
        season = match.group(1)
        episode = match.group(2)
        logging.debug(f"Extracted season {season}, episode {episode} from '{filename}'")
        return int(season), int(episode)

    def __GetEpisodesAndSeasons(self, mp4_files: list[str]):
        """Group the episode numbers found in ``mp4_files`` by season.

        Returns a dict mapping season number to the list of episode numbers,
        in the order the files were given.
        """
        episodes_dict = {}
        for file in mp4_files:
            season, episode = self.__GetEpisodeAndSeason(file)
            episodes_dict.setdefault(season, []).append(episode)
        return episodes_dict

    def __GetMissingEpisodesAndSeason(self, key: str, mp4_files: list[str]):
        """Return the episodes of series ``key`` that are not on disk.

        The loader supplies the expected episode count per season. An episode
        counts as missing when no file exists for it and
        ``loader.IsLanguage(season, ep, key)`` is truthy. Seasons without
        missing episodes are left out.

        Returns:
            A tuple ``(missing, site)`` where ``missing`` maps season to the
            list of missing episode numbers and ``site`` is "aniworld.to".
        """
        expected_dict = self.loader.get_season_episode_count(key)  # season -> episode count
        filedict = self.__GetEpisodesAndSeasons(mp4_files)
        episodes_dict = {}
        for season, expected_count in expected_dict.items():
            existing_episodes = set(filedict.get(season, []))
            # The membership test comes first so the loader is only asked
            # about episodes that are actually absent.
            missing_episodes = [
                ep for ep in range(1, expected_count + 1)
                if ep not in existing_episodes and self.loader.IsLanguage(season, ep, key)
            ]
            if missing_episodes:
                episodes_dict[season] = missing_episodes
        return episodes_dict, "aniworld.to"