Aniworld/main.py

153 lines
6.3 KiB
Python

import sys
import os
import traceback
import re
import logging
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from aniworld.models import Anime, Episode
from aniworld.common import get_season_episode_count, get_movie_episode_count
from aniworld.search import search_anime
from Loader import download
# Root logger setup: emit every record from DEBUG upwards, each line prefixed
# with its timestamp and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
class MatchNotFoundError(Exception):
    """Raised when a file name carries no ``S<season>E<episode>`` marker."""
class Loader:
    """Download the episodes that are missing from a local series library.

    The base directory is expected to hold one folder per series. Every
    series folder is scanned recursively for ``.mp4`` files whose names carry
    an ``S<season>E<episode>`` marker. The episodes found are compared with
    the per-season counts returned by ``get_season_episode_count`` and every
    gap is handed to ``download``.
    """

    # Matches e.g. "S01E07" anywhere in a file name (season 1, episode 7).
    _EPISODE_PATTERN = re.compile(r'S(\d+)E(\d+)')
    # Matches a parenthesised four-digit year such as "(2019)".
    _YEAR_PATTERN = re.compile(r'\(\d{4}\)')
    # Language passed to every ``Anime`` download request.
    _LANGUAGE = "German Dub"

    def __init__(self, basePath: str):
        """Remember *basePath* as the directory that holds the series folders."""
        self.directory = basePath
        logging.warning("Initialized Loader with base path: %s", self.directory)

    def __find_mp4_files(self):
        """Yield ``(series_folder_name, mp4_file_names)`` once per series folder.

        Only directories directly below the base directory are treated as
        series folders; plain files next to them are ignored. Each folder is
        walked recursively, so files inside ``Season N`` subfolders are
        included. A folder without any ``.mp4`` file is still yielded, with an
        empty list, so that all of its episodes count as missing.
        """
        logging.warning("Scanning for .mp4 files")
        for root_folder_name in os.listdir(self.directory):
            folder = os.path.join(self.directory, root_folder_name)
            if not os.path.isdir(folder):
                continue
            mp4_files = [
                file_name
                for _root, _dirs, files in os.walk(folder)
                for file_name in files
                # Compare case-insensitively so "EPISODE.MP4" is not re-downloaded.
                if file_name.lower().endswith('.mp4')
            ]
            if not mp4_files:
                logging.debug("Found no .mp4 files in %s", root_folder_name)
            yield root_folder_name, mp4_files

    def __remove_year(self, input_string: str):
        """Return *input_string* without ``(YYYY)`` groups, stripped of outer whitespace."""
        cleaned_string = self._YEAR_PATTERN.sub('', input_string).strip()
        logging.debug("Removed year from '%s' -> '%s'", input_string, cleaned_string)
        return cleaned_string

    def __check_and_generate_key(self, folder_name: str):
        """Return the series key for *folder_name*, creating it if necessary.

        The key is cached in a file named ``key`` inside the series folder.
        When that file is missing or empty, the folder name (without its
        year) is passed to ``search_anime`` and the ``'link'`` entry of the
        first result is stored and returned.

        Raises:
            LookupError: if the search returns no result.
        """
        key_file = os.path.join(self.directory, folder_name, 'key')
        if os.path.exists(key_file):
            with open(key_file, 'r') as file:
                key = file.read().strip()
            if key:
                logging.debug("Key found for folder '%s': %s", folder_name, key)
                return key
            # An empty key file is useless downstream; fall through and
            # regenerate it instead of returning "".

        results = search_anime(self.__remove_year(folder_name), True)
        if not results:
            raise LookupError(f"No search result found for folder '{folder_name}'")
        key = results[0]['link']
        with open(key_file, 'w') as file:
            file.write(key)
        logging.warning("Generated new key for folder '%s': %s", folder_name, key)
        return key

    def __GetEpisodeAndSeason(self, filename: str):
        """Return ``(season, episode)`` as integers parsed from *filename*.

        Raises:
            MatchNotFoundError: if *filename* has no ``S<digits>E<digits>`` part.
        """
        match = self._EPISODE_PATTERN.search(filename)
        if match is None:
            logging.error("Failed to find season/episode pattern in '%s'", filename)
            raise MatchNotFoundError("Season and episode pattern not found in the filename.")
        season, episode = match.group(1), match.group(2)
        logging.debug("Extracted season %s, episode %s from '%s'", season, episode, filename)
        return int(season), int(episode)

    def __GetEpisodesAndSeasons(self, mp4_files: list):
        """Group the episode numbers found in *mp4_files* by season.

        Returns a dict mapping season number to the list of episode numbers,
        in the order the files were given.

        Raises:
            MatchNotFoundError: if any file name cannot be parsed.
        """
        episodes_dict = {}
        for file in mp4_files:
            season, episode = self.__GetEpisodeAndSeason(file)
            episodes_dict.setdefault(season, []).append(episode)
        return episodes_dict

    @staticmethod
    def _missing_by_season(expected_counts: dict, present_episodes: dict):
        """Yield ``(season, missing_episode_numbers)`` for each incomplete season.

        *expected_counts* maps season to its episode count; *present_episodes*
        maps season to the episode numbers already on disk. Episodes are
        assumed to be numbered ``1..count``. Complete seasons are skipped.
        """
        for season, expected_count in expected_counts.items():
            # A set keeps the membership test cheap for long seasons.
            present = set(present_episodes.get(season, ()))
            missing = [ep for ep in range(1, expected_count + 1) if ep not in present]
            if missing:
                yield season, missing

    def __GetMissingEpisodesAndSeason(self, key: str, mp4_files: list):
        """Yield ``(season, missing_episode_numbers)`` for the series *key*.

        The expected episode count per season comes from
        ``get_season_episode_count(key)``; the episodes already present are
        parsed from *mp4_files*.
        """
        expected_dict = get_season_episode_count(key)  # season -> episode count
        filedict = self.__GetEpisodesAndSeasons(mp4_files)
        yield from self._missing_by_season(expected_dict, filedict)

    def __download_episode(self, folder: str, season: int, episode: int, key: str):
        """Download one episode into ``<base>/<folder>/Season <season>``.

        Runs on a worker thread. Any error is logged and swallowed so that a
        single failed download does not affect the others.
        """
        try:
            folder_path = os.path.join(self.directory, folder, f"Season {season}")
            anime = Anime(
                episode_list=[Episode(slug=key, season=season, episode=episode)],
                language=self._LANGUAGE,
                output_directory=folder_path
            )
            logging.warning(
                "Downloading episode %s of season %s for anime %s", episode, season, key
            )
            download(anime)
        except Exception as e:
            logging.error(
                "Error downloading episode %s of season %s for anime %s: %s",
                episode, season, key, e,
            )

    def LoadMissing(self, max_workers: int = 5):
        """Queue a download for every episode missing from the library.

        Downloads run in parallel on a thread pool of *max_workers* threads;
        the call returns once all queued downloads have finished. A series
        folder that cannot be processed (no search result, unparsable file
        name, ...) is logged with its traceback and skipped.
        """
        logging.warning("Starting process to load missing episodes")
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for folder, mp4_files in self.__find_mp4_files():
                try:
                    key = self.__check_and_generate_key(folder)
                    missings = self.__GetMissingEpisodesAndSeason(key, mp4_files)
                    for season, missing_episodes in missings:
                        for episode in missing_episodes:
                            executor.submit(
                                self.__download_episode, folder, season, episode, key
                            )
                except Exception as e:
                    # logging.exception records the message and the traceback.
                    logging.exception("Error processing folder '%s': %s", folder, e)
                    continue
# The library root is taken from the ANIME_DIRECTORY environment variable;
# when it is unset, the UNC path below is used instead.
directory_to_search = os.environ.get(
    "ANIME_DIRECTORY",
    "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien",
)
# Alternative default kept for reference:
#directory_to_search = os.getenv("ANIME_DIRECTORY", "D:\sss")
loader = Loader(directory_to_search)
loader.LoadMissing()