added console app

This commit is contained in:
Lukas Pupka-Lipinski 2025-06-07 21:14:45 +02:00
parent aeed2df7d0
commit 3faa6f9a40
9 changed files with 349 additions and 314 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
/.idea/*
/aniworld/bin/*
/aniworld/lib/*
/src/__pycache__/*
/src/__pycache__/

@ -1 +1 @@
Subproject commit 03f08a86338e7b1e97bdf1164809698d9ab18ab7
Subproject commit a267efa72a12af19df6641ab2ab39e494a1d0ee8

View File

@ -2,11 +2,16 @@ import os
import re
import subprocess
import logging
from aniworld.models import Anime
from aniworld.config import PROVIDER_HEADERS, INVALID_PATH_CHARS
import json
import requests
import html
from urllib.parse import quote
from Serie import Serie
from aniworld.models import Anime, Episode, NoMachingLanguage
from aniworld.config import PROVIDER_HEADERS, INVALID_PATH_CHARS, ANIWORLD_TO, session, DEFAULT_REQUEST_TIMEOUT
from aniworld.parser import arguments
# Read timeout from environment variable, default to 600 seconds (10 minutes)
timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))
@ -15,6 +20,57 @@ download_error_handler = logging.FileHandler("../download_errors.log")
download_error_handler.setLevel(logging.ERROR)
download_error_logger.addHandler(download_error_handler)
def CreateSerie(searchEntry):
return Serie(searchEntry["link"], searchEntry["name"], "aniworld.to", searchEntry["link"], {})
def search_anime(keyword: str = None) -> str:
search_url = f"{ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(keyword)}"
anime_list = fetch_anime_list(search_url)
return anime_list
def fetch_anime_list(url: str) -> list:
response = session.get(url, timeout=DEFAULT_REQUEST_TIMEOUT)
response.raise_for_status()
clean_text = response.text.strip()
try:
decoded_data = json.loads(html.unescape(clean_text))
return decoded_data if isinstance(decoded_data, list) else []
except json.JSONDecodeError:
try:
# Remove BOM and problematic characters
clean_text = clean_text.encode('utf-8').decode('utf-8-sig')
# Remove problematic characters
clean_text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', clean_text)
# Parse the new text
decoded_data = json.loads(clean_text)
return decoded_data if isinstance(decoded_data, list) else []
except (requests.RequestException, json.JSONDecodeError) as exc:
raise ValueError("Could not get valid anime: ") from exc
def AniWorld_download_episode(directory, folder, season, episode, key):
"""Helper function to download an individual episode."""
try:
folder_path = os.path.join(directory, folder, f"Season {season}")
anime = Anime(
episode_list=[Episode(slug=key, season=season, episode=episode)],
language="German Dub",
output_directory=folder_path
)
logging.info(f"Downloading anime {key} season {season} episode {episode}")
download(anime)
logging.info(f"Downloading completed anime {key} season {season} episode {episode}")
except KeyError as keye:
download_error_logger.error(f"Language not found for anime: {key} season: {season} episode: {episode}")
except NoMachingLanguage as ee:
download_error_logger.error(f"Language not found for anime: {key} season: {season} episode: {episode}")
except Exception as e:
logging.error(f"Error downloading episode {episode} of season {season} for anime {key}: {e}")
def download(anime: Anime): # pylint: disable=too-many-branches
for episode in anime:
sanitized_anime_title = ''.join(
@ -43,7 +99,8 @@ def download(anime: Anime): # pylint: disable=too-many-branches
#"--concurrent-fragments", "4",
"-o", output_path,
"--quiet",
"--no-warnings"
"--no-warnings",
"--progress"
]
if anime.provider in PROVIDER_HEADERS:

184
src/Main.py Normal file
View File

@ -0,0 +1,184 @@
import sys
import os
import logging
import AniWorldLoader
import SerieList
import SerieScanner
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(funcName)s - %(message)s")
)
logging.getLogger().addHandler(console_handler)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
logging.getLogger('charset_normalizer').setLevel(logging.INFO)
logging.getLogger().setLevel(logging.INFO)
error_logger = logging.getLogger("ErrorLog")
error_handler = logging.FileHandler("../errors.log")
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)
noGerFound_logger = logging.getLogger("noGerFound")
noGerFound_handler = logging.FileHandler("../noGerFound.log")
noGerFound_handler.setLevel(logging.ERROR)
noGerFound_logger.addHandler(noGerFound_handler)
class NoKeyFoundException(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class MatchNotFoundError(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class SeriesApp:
def __init__(self, directory_to_search: str):
self.directory_to_search = directory_to_search
self.SerieScanner = SerieScanner.SerieScanner(directory_to_search)
self.List = SerieList.SerieList(self.directory_to_search)
print("Please wait while initializing...")
self.__InitList__()
def __InitList__(self):
self.series_list = self.List.GetMissingEpisode()
def display_series(self):
"""Print all series with assigned numbers."""
print("\nCurrent result:")
for i, serie in enumerate(self.series_list, 1):
print(f"{i}. {serie}")
def search(self, words :str) -> list:
return AniWorldLoader.search_anime(words)
def get_user_selection(self):
"""Handle user input for selecting series."""
self.display_series()
while True:
selection = input(
"\nSelect series by number (e.g. '1', '1,2' or 'all') or type 'rescan' to refresh: ").strip().lower()
if selection == "rescan":
self.rescan()
return None
selected_series = []
if selection == "all":
selected_series = self.series_list
else:
try:
indexes = [int(num) - 1 for num in selection.split(",")]
selected_series = [self.series_list[i] for i in indexes if 0 <= i < len(self.series_list)]
except ValueError:
print("Invalid selection. Going back to the result display.")
self.display_series()
continue
if selected_series:
return selected_series
else:
print("No valid series selected. Going back to the result display.")
return None
def print_progress_bar(self, current, total, length=20):
"""Generate progress bar string"""
filled_length = int(length * current // total)
bar = "@" * filled_length + "-" * (length - filled_length)
return f"[{bar}] {current} / {total}"
def download_series(self, series):
"""Simulate the downloading process with a progress bar."""
total_downloaded = 0
total_episodes = sum(sum(len(ep) for ep in serie.episodeDict.values()) for serie in series)
for serie in series:
serie_episodes = sum(len(ep) for ep in serie.episodeDict.values())
downloaded = 0
print(f"\nStarting download for {serie.folder}...\n")
for season, episodes in serie.episodeDict.items():
for episode in episodes:
if serie.site == "aniworld.to":
AniWorldLoader.AniWorld_download_episode(self.directory_to_search, serie.folder, season, episode, serie.key)
downloaded += 1
total_downloaded += 1
progress = self.print_progress_bar(downloaded, serie_episodes)
total_progress = self.print_progress_bar(total_downloaded, total_episodes)
sys.stdout.write(f"\r{serie.name}: {progress} (Total: {total_progress})")
sys.stdout.flush()
print("\nDownload complete!\n")
def rescan(self):
"""Rescan and refresh the series list."""
print("\nRescanning series...\n")
self.SerieScanner.Scan()
self.__InitList__()
def search_mode(self):
"""Search for a series and allow user to select an option."""
search_string = input("Enter search string: ").strip()
results = self.search(search_string)
if not results:
print("No results found. Returning to start.")
return
print("\nSearch results:")
for i, result in enumerate(results, 1):
print(f"{i}. {result}")
while True:
selection = input("\nSelect an option by number or type 'start' to return: ").strip().lower()
if selection == "start":
return
try:
index = int(selection) - 1
if 0 <= index < len(results):
chosen_name = results[index]
self.List.add(AniWorldLoader.CreateSerie(chosen_name))
return
else:
print("Invalid selection. Try again.")
except ValueError:
print("Invalid input. Try again.")
def run(self):
"""Main function to run the app."""
while True:
action = input("\nChoose action ('search' or 'local'): ").strip().lower()
if action == "search":
self.search_mode()
elif action == "local":
selected_series = self.get_user_selection()
if selected_series:
self.download_series(selected_series)
print("\nProgress: [@@@@@@@@@@@@@@@@@@@@] Complete")
break
else:
print("Invalid action. Please enter 'search' or 'local'.")
# Run the app
if __name__ == "__main__":
# Read the base directory from an environment variable
directory_to_search = os.getenv("ANIME_DIRECTORY", "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien")
app = SeriesApp(directory_to_search)
app.run()

View File

@ -1,24 +1,16 @@
import json
class Serie:
def __init__(self, key: str, name: str, site: str, folder: str):
def __init__(self, key: str, name: str, site: str, folder: str, episodeDict: dict[int, list[int]]):
self._key = key
self._name = name
self._site = site
self._folder = folder
self._episodeDict = dict[int, list[int]]
self._episodeDict = episodeDict
def __str__(self):
"""String representation of Serie object"""
return f"Serie(key='{self.key}', name='{self.name}', site='{self.site}', folder='{self.folder}', episodeDict={self.episodeDict})"
def to_dict(self):
"""Convert Serie object to dictionary"""
return {
"key": self.key,
"name": self.name,
"site": self.site,
"folder": self.folder,
"episodeDict": {str(k): v for k, v in self._episodeDict.items()} # Convert int keys to str
}
@property
def key(self) -> str:
return self._key
@ -58,3 +50,33 @@ class Serie:
@episodeDict.setter
def episodeDict(self, value: dict[int, list[int]]):
self._episodeDict = value
def to_dict(self):
"""Convert Serie object to dictionary for JSON serialization."""
return {
"key": self.key,
"name": self.name,
"site": self.site,
"folder": self.folder,
"episodeDict": {str(k): list(v) for k, v in self.episodeDict.items()}
}
@staticmethod
def from_dict(data: dict):
"""Create a Serie object from dictionary."""
episode_dict = {int(k): v for k, v in data["episodeDict"].items()} # Convert keys to int
return Serie(data["key"], data["name"], data["site"], data["folder"], episode_dict)
def save_to_file(self, filename: str):
"""Save Serie object to JSON file."""
with open(filename, "w") as file:
json.dump(self.to_dict(), file, indent=4)
@classmethod
def load_from_file(cls, filename: str) -> "Serie":
"""Load Serie object from JSON file."""
with open(filename, "r") as file:
data = json.load(file)
return cls.from_dict(data)

45
src/SerieList.py Normal file
View File

@ -0,0 +1,45 @@
import os
import json
import logging
from Serie import Serie
class SerieList:
def __init__(self, basePath: str):
self.directory = basePath
self.folderDict: dict[str, Serie] = {} # Proper initialization
self.load_series()
def add(self, serie:Serie):
dataPath = os.path.join(self.directory, serie.folder, "data")
animePath = os.path.join(self.directory, serie.folder)
os.makedirs(animePath, exist_ok=True)
if not os.path.isfile(dataPath):
serie.save_to_file(dataPath)
self.folderDict[serie.folder] = serie;
def load_series(self):
""" Scan folders and load data files """
logging.info(f"Scanning anime folders in: {self.directory}")
for anime_folder in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_folder, "data")
if os.path.isfile(anime_path):
logging.info(f"Found data folder: {anime_path}")
self.load_data(anime_folder, anime_path)
else:
logging.warning(f"Skipping {anime_folder} - No data folder found")
def load_data(self, anime_folder, data_path):
""" Load pickle files from the data folder """
try:
self.folderDict[anime_folder] = Serie.load_from_file(data_path)
logging.info(f"Successfully loaded {data_path} for {anime_folder}")
except Exception as e:
logging.error(f"Failed to load {data_path} in {anime_folder}: {e}")
def GetMissingEpisode(self):
"""Find all series with a non-empty episodeDict"""
return [serie for serie in self.folderDict.values() if len(serie.episodeDict) > 0]
#k = AnimeList("\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien")
#bbabab = k.GetMissingEpisode()
#print(bbabab)

View File

@ -2,7 +2,6 @@ import os
import re
import logging
from collections import defaultdict
import pickle
from Serie import Serie
import json
import traceback
@ -11,16 +10,15 @@ from Exceptions import NoKeyFoundException, MatchNotFoundError
import requests
from aniworld.common import get_season_episode_count
class FolderLookup:
class SerieScanner:
def __init__(self, basePath: str):
self.directory = basePath
self.folderDict: dict[str, list[Serie]] = {} # Proper initialization
self.folderDict: dict[str, Serie] = {} # Proper initialization
logging.info(f"Initialized Loader with base path: {self.directory}")
self.__init()
def is_null_or_whitespace(self, s):
return s is None or s.strip() == ""
def __init(self):
def Scan(self):
logging.info("Starting process to load missing episodes")
result = self.__find_mp4_files()
@ -28,16 +26,15 @@ class FolderLookup:
try:
serie = self.__ReadDataFromFile(folder)
if (serie != None and not self.is_null_or_whitespace(serie.key)):
continue
missings, site = self.__GetMissingEpisodesAndSeason(serie.key, mp4_files)
serie.episodeDict = missings
self.__SaveData(serie, folder)
serie.save_to_file(os.path.join(os.path.join(self.directory, folder), 'data'))
if folder not in self.folderDict:
self.folderDict[folder] = []
self.folderDict[folder].append(serie)
self.folderDict[folder] = serie
noKeyFound_logger.info(f"Saved Serie: '{str(serie)}'")
except NoKeyFoundException as nkfe:
noKeyFound_logger.error(f"Error processing folder '{folder}': {nkfe}")
NoKeyFoundException.error(f"Error processing folder '{folder}': {nkfe}")
except Exception as e:
error_logger.error(f"Folder: '{folder}' - Unexpected error processing folder '{folder}': {e} \n {traceback.format_exc()}")
continue
@ -45,35 +42,17 @@ class FolderLookup:
def __find_mp4_files(self):
logging.info("Scanning for .mp4 files")
for root_folder_name in os.listdir(self.directory):
folder_data = defaultdict(list) # Dictionary to store MP4 files per folder
folder = os.path.join(self.directory, root_folder_name)
logging.info(f"Processing folder: {root_folder_name}")
# First pass: Scan all folders and collect MP4 file data
for root, dirs, files in os.walk(folder):
mp4_files = [file for file in files if file.endswith('.mp4')]
if mp4_files:
folder_data[root_folder_name].extend(mp4_files)
yield root_folder_name, folder_data[root_folder_name]
for dir in self.__find_empty_folders():
logging.info(f"Found no .mp4 files in {dir}")
yield dir, []
def __find_empty_folders(self):
"""Yield folder names that do not contain any mp4 files in a given directory."""
for folder in os.listdir(self.directory):
folder_path = os.path.join(self.directory, folder)
if os.path.isdir(folder_path): # Ensure it's a directory
has_mp4 = any(file.endswith(".mp4") for file in os.listdir(folder_path))
if not has_mp4:
yield folder # Yield the folder name if no mp4 files found
for anime_name in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_name)
if os.path.isdir(anime_path):
mp4_files = []
has_files = False
for root, _, files in os.walk(anime_path):
for file in files:
if file.endswith(".mp4"):
mp4_files.append(os.path.join(root, file))
has_files = True
yield anime_name, mp4_files if has_files else []
def __remove_year(self, input_string: str):
cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
@ -88,25 +67,17 @@ class FolderLookup:
if os.path.exists(key_file):
with open(key_file, 'r') as file:
logging.info(f"Key found for folder '{folder_name}': {key}")
key = file.read().strip()
return Serie(key, "", "aniworld.to" ,folder_name)
logging.info(f"Key found for folder '{folder_name}': {key}")
return Serie(key, "", "aniworld.to", folder_name, dict())
if os.path.exists(serie_file):
with open(serie_file, "rb") as file:
logging.info(f"load serie_file from '{folder_name}': {serie_file}")
return pickle.load(file)
return Serie.load_from_file(serie_file)
return None
def __SaveData(self, serie: Serie, folder_name: str):
"""Saves a Serie object to a file using JSON."""
folder_path = os.path.join(self.directory, folder_name)
serie_file = os.path.join(folder_path, 'data')
with open(serie_file, "w", encoding="utf-8") as file:
json.dump(serie.to_dict(), file, indent=4)
def __GetEpisodeAndSeason(self, filename: str):
pattern = r'S(\d+)E(\d+)'
@ -143,8 +114,8 @@ class FolderLookup:
missing_episodes = [ep for ep in range(1, expected_count + 1) if ep not in existing_episodes]
if missing_episodes:
episodes_dict[season] = [missing_episodes]
episodes_dict[season] = missing_episodes
return episodes_dict, "aniworld.to"
gg = FolderLookup("\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien")

View File

@ -1,59 +0,0 @@
import sys
import os
import traceback
import re
import logging
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from aniworld.models import Anime, Episode
from aniworld.common import get_season_episode_count, get_movie_episode_count
from aniworld.search import search_anime
from Loader import download
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(funcName)s - %(message)s")
)
logging.getLogger().addHandler(console_handler)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
logging.getLogger('charset_normalizer').setLevel(logging.INFO)
logging.getLogger().setLevel(logging.INFO)
error_logger = logging.getLogger("ErrorLog")
error_handler = logging.FileHandler("../errors.log")
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)
noGerFound_logger = logging.getLogger("noGerFound")
noGerFound_handler = logging.FileHandler("../noGerFound.log")
noGerFound_handler.setLevel(logging.ERROR)
noGerFound_logger.addHandler(noGerFound_handler)
class NoKeyFoundException(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class MatchNotFoundError(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class ConsoleLoader:
def __init__(self, basePath: str):
self.directory = basePath
logging.info(f"Initialized Loader with base path: {self.directory}")
# Read the base directory from an environment variable
directory_to_search = os.getenv("ANIME_DIRECTORY", "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien")
loader = Loader(directory_to_search)
loader.LoadMissing()

View File

@ -1,187 +0,0 @@
import os
import traceback
import re
import logging
from collections import defaultdict
from aniworld.models import Anime, Episode
from aniworld.common import get_season_episode_count
from aniworld.search import search_anime
from src.Loader import download
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter(
"%(asctime)s - %(levelname)s - %(funcName)s - %(message)s")
)
logging.getLogger().addHandler(console_handler)
logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO)
logging.getLogger('charset_normalizer').setLevel(logging.INFO)
logging.getLogger().setLevel(logging.INFO)
error_logger = logging.getLogger("ErrorLog")
error_handler = logging.FileHandler("../errors.log")
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
noKeyFound_logger = logging.getLogger("NoKeyFound")
noKeyFound_handler = logging.FileHandler("../NoKeyFound.log")
noKeyFound_handler.setLevel(logging.ERROR)
noKeyFound_logger.addHandler(noKeyFound_handler)
noGerFound_logger = logging.getLogger("noGerFound")
noGerFound_handler = logging.FileHandler("../noGerFound.log")
noGerFound_handler.setLevel(logging.ERROR)
noGerFound_logger.addHandler(noGerFound_handler)
class NoKeyFoundException(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class MatchNotFoundError(Exception):
"""Exception raised when an anime key cannot be found."""
pass
class Loader:
def __init__(self, basePath: str):
self.directory = basePath
logging.info(f"Initialized Loader with base path: {self.directory}")
def __find_mp4_files(self):
logging.info("Scanning for .mp4 files")
for root_folder_name in os.listdir(self.directory):
folder_data = defaultdict(list) # Dictionary to store MP4 files per folder
folder = os.path.join(self.directory, root_folder_name)
logging.info(f"Processing folder: {root_folder_name}")
# First pass: Scan all folders and collect MP4 file data
for root, dirs, files in os.walk(folder):
mp4_files = [file for file in files if file.endswith('.mp4')]
if mp4_files:
folder_data[root_folder_name].extend(mp4_files)
yield root_folder_name, folder_data[root_folder_name]
for dir in self.__find_empty_folders():
logging.info(f"Found no .mp4 files in {dir}")
yield dir, []
def __find_empty_folders(self):
"""Yield folder names that do not contain any mp4 files in a given directory."""
for folder in os.listdir(self.directory):
folder_path = os.path.join(self.directory, folder)
if os.path.isdir(folder_path): # Ensure it's a directory
has_mp4 = any(file.endswith(".mp4") for file in os.listdir(folder_path))
if not has_mp4:
yield folder # Yield the folder name if no mp4 files found
def __remove_year(self, input_string: str):
cleaned_string = re.sub(r'\(\d{4}\)', '', input_string).strip()
logging.debug(f"Removed year from '{input_string}' -> '{cleaned_string}'")
return cleaned_string
def __check_and_generate_key(self, folder_name: str):
folder_path = os.path.join(self.directory, folder_name)
key_file = os.path.join(folder_path, 'key')
if os.path.exists(key_file):
with open(key_file, 'r') as file:
key = file.read().strip()
logging.info(f"Key found for folder '{folder_name}': {key}")
return key
else:
try:
key = search_anime(folder_name, True)
if key:
key = key[0]['link']
with open(key_file, 'w') as file:
file.write(key)
logging.info(f"Generated new key for folder '{folder_name}': {key}")
return key
else:
raise NoKeyFoundException(f"No key found for folder '{folder_name}'")
except Exception as e:
raise NoKeyFoundException(f"Failed to retrieve key for folder '{folder_name}'") from e
def __GetEpisodeAndSeason(self, filename: str):
pattern = r'S(\d+)E(\d+)'
match = re.search(pattern, filename)
if match:
season = match.group(1)
episode = match.group(2)
logging.debug(f"Extracted season {season}, episode {episode} from '{filename}'")
return int(season), int(episode)
else:
logging.error(f"Failed to find season/episode pattern in '{filename}'")
raise MatchNotFoundError("Season and episode pattern not found in the filename.")
def __GetEpisodesAndSeasons(self, mp4_files: []):
episodes_dict = {}
for file in mp4_files:
season, episode = self.__GetEpisodeAndSeason(file)
if season in episodes_dict:
episodes_dict[season].append(episode)
else:
episodes_dict[season] = [episode]
return episodes_dict
def __GetMissingEpisodesAndSeason(self, key: str, mp4_files: []):
expected_dict = get_season_episode_count(key) # key season , value count of episodes
filedict = self.__GetEpisodesAndSeasons(mp4_files)
for season, expected_count in expected_dict.items():
existing_episodes = filedict.get(season, [])
missing_episodes = [ep for ep in range(1, expected_count + 1) if ep not in existing_episodes]
if missing_episodes:
yield season, missing_episodes
def LoadMissing(self):
logging.info("Starting process to load missing episodes")
result = self.__find_mp4_files()
def download_episode(folder, season, episode, key):
"""Helper function to download an individual episode."""
try:
folder_path = os.path.join(self.directory, folder, f"Season {season}")
anime = Anime(
episode_list=[Episode(slug=key, season=season, episode=episode)],
language="German Dub",
output_directory=folder_path
)
logging.info(f"Downloading anime {key} season {season} episode {episode}")
download(anime)
logging.info(f"Downloading completed anime {key} season {season} episode {episode}")
except KeyError as keye:
noGerFound_logger.error(f"Language not found for anime: {key} season: {season} episode: {episode}")
except Exception as e:
logging.error(f"Error downloading episode {episode} of season {season} for anime {key}: {e}")
for folder, mp4_files in result:
try:
key = self.__check_and_generate_key(folder)
missings = self.__GetMissingEpisodesAndSeason(key, mp4_files)
for season, missing_episodes in missings:
logging.info(f"Missing episodes for {key}\nSeason {str(season)}: Episodes: " + ",".join(f"{''.join(str(v))}" for v in missing_episodes))
for episode in missing_episodes:
download_episode(folder, season, episode, key)
except NoKeyFoundException as nkfe:
noKeyFound_logger.error(f"Error processing folder '{folder}': {nkfe}")
except Exception as e:
error_logger.error(f"Unexpected error processing folder '{folder}': {e} \n {traceback.format_exc()}")
continue
# Read the base directory from an environment variable
directory_to_search = os.getenv("ANIME_DIRECTORY", "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien")
#directory_to_search = os.getenv("ANIME_DIRECTORY", "D:\\sss")
loader = Loader(directory_to_search)
loader.LoadMissing()