Aniworld/src/Main.py

222 lines
8.1 KiB
Python

import sys
import os
import logging
from Loaders import AniWorldLoader
from rich.progress import Progress
import SerieList
import SerieScanner
from Loaders.Loaders import Loaders
from Serie import Serie
import time
# Configure logging: the root logger and two third-party loggers are capped at
# ERROR, and every handler is detached from the root logger.
logging.basicConfig(
    level=logging.FATAL,
    format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s',
)

# NOTE(review): this handler is configured here but never attached to any
# logger in this file - confirm whether it was meant to be added to the root
# logger after the existing handlers are removed.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR)
console_handler.setFormatter(logging.Formatter(
    "%(asctime)s - %(levelname)s - %(funcName)s - %(message)s")
)

# Detach every handler from the root logger. Iterate over a copy: removing
# entries from the live list while looping over it skips every other handler,
# which could leave some of them attached.
for h in list(logging.root.handlers):
    logging.root.removeHandler(h)

logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger('charset_normalizer').setLevel(logging.ERROR)
# logging.getLogger() with no name is the root logger (the same object as
# logging.root), so its handlers were already removed by the loop above.
logging.getLogger().setLevel(logging.ERROR)
class NoKeyFoundException(Exception):
    """Signal that no key could be found for an anime."""
class MatchNotFoundError(Exception):
    """Signal that a lookup produced no match.

    NOTE(review): the previous docstring was a verbatim copy of the one on
    NoKeyFoundException. Nothing in this file raises this error, so confirm
    the exact condition against the code that does.
    """
class SeriesApp:
    """Interactive console front-end for searching, rescanning and downloading series.

    The app works on one base directory. It keeps a ``SerieList`` for that
    directory, a ``SerieScanner`` to rescan it, and a ``Loaders`` registry from
    which the "aniworld.to" loader is fetched for searching and downloading.
    """

    # Key used for every loader lookup in this class.
    _LOADER_KEY = "aniworld.to"

    def __init__(self, directory_to_search: str):
        """Create the loader registry, the scanner and the series list.

        Args:
            directory_to_search: Base directory handed to the scanner, the
                series list and the download calls.
        """
        print("Please wait while initializing...")
        # Holds the active rich Progress while a scan or download is running.
        self.progress = None
        self.directory_to_search = directory_to_search
        self.Loaders = Loaders()
        loader = self.Loaders.GetLoader(key=self._LOADER_KEY)
        self.SerieScanner = SerieScanner.SerieScanner(directory_to_search, loader)
        self.List = SerieList.SerieList(self.directory_to_search)
        self.__InitList__()

    def __InitList__(self):
        """Refresh ``self.series_list`` from ``self.List.GetMissingEpisode()``."""
        self.series_list = self.List.GetMissingEpisode()

    def display_series(self):
        """Print all entries of ``self.series_list`` with 1-based numbers."""
        print("\nCurrent result:")
        for i, serie in enumerate(self.series_list, 1):
            # Fall back to the folder when the entry has no name. The check has
            # to look at the loop item, not at the Serie class itself.
            label = serie.name if serie.name else serie.folder
            print(f"{i}. {label}")

    def search(self, words: str) -> list:
        """Return the loader's search results for *words*."""
        loader = self.Loaders.GetLoader(key=self._LOADER_KEY)
        return loader.Search(words)

    def get_user_selection(self):
        """Ask the user which listed series to use.

        Returns:
            The selected entries of ``self.series_list``, or ``None`` when the
            user types 'exit' or the selection contains no valid index.
        """
        self.display_series()
        while True:
            selection = input(
                "\nSelect series by number (e.g. '1', '1,2' or 'all') or type 'exit' to return: "
            ).strip().lower()

            if selection == "exit":
                return None

            if selection == "all":
                selected_series = self.series_list
            else:
                try:
                    indexes = [int(num) - 1 for num in selection.split(",")]
                except ValueError:
                    print("Invalid selection. Going back to the result display.")
                    self.display_series()
                    continue
                # Out-of-range numbers are dropped silently.
                selected_series = [
                    self.series_list[i] for i in indexes if 0 <= i < len(self.series_list)
                ]

            if selected_series:
                return selected_series

            print("No valid series selected. Going back to the result display.")
            return None

    def retry(self, func, max_retries=3, delay=2, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` until it succeeds or retries run out.

        Every exception raised by *func* is printed. The method waits *delay*
        seconds between attempts, but not after the final failed attempt.

        Returns:
            True as soon as one call completes without raising, otherwise False.
        """
        for attempt in range(1, max_retries + 1):
            try:
                func(*args, **kwargs)
                return True
            except Exception as e:
                print(e)
                # No point in waiting when there is no further attempt.
                if attempt < max_retries:
                    time.sleep(delay)
        return False

    def download_series(self, series):
        """Download the listed episodes of every given series with progress bars.

        Three progress tasks are shown: overall episodes, episodes of the
        current series, and the percentage of the current download, which is
        fed through ``print_Download_Progress``.

        Args:
            series: Iterable of series objects exposing ``episodeDict``
                (season -> episodes), ``folder`` and ``key``.
        """
        total_episodes = sum(
            len(episodes)
            for serie in series
            for episodes in serie.episodeDict.values()
        )
        self.progress = Progress()
        task1 = self.progress.add_task("[red]Processing...", total=total_episodes)
        task2 = self.progress.add_task("[green]...", total=0)
        # total is 100 so the task can be driven as a percentage.
        self.task3 = self.progress.add_task("[Gray]...", total=100)
        self.progress.start()
        try:
            for serie in series:
                serie_episodes = sum(len(ep) for ep in serie.episodeDict.values())
                # Reset the per-series bar; otherwise the count from the
                # previous series would carry over into this one.
                self.progress.update(
                    task2,
                    description=f"[green]{serie.folder}",
                    total=serie_episodes,
                    completed=0,
                )
                for season, episodes in serie.episodeDict.items():
                    for episode in episodes:
                        loader = self.Loaders.GetLoader(key=self._LOADER_KEY)
                        if loader.IsLanguage(season, episode, serie.key):
                            self.retry(
                                loader.Download, 3, 1,
                                self.directory_to_search, serie.folder, season,
                                episode, serie.key, "German Dub",
                                self.print_Download_Progress,
                            )
                        self.progress.update(task1, advance=1)
                        self.progress.update(task2, advance=1)
                        time.sleep(0.02)
        finally:
            # Always stop the live display, even when a download step raises.
            self.progress.stop()
            self.progress = None

    def print_Download_Progress(self, d):
        """Mirror a download status dict onto the per-download progress task.

        Args:
            d: Status dict with a 'status' entry and, while downloading,
                'downloaded_bytes' plus 'total_bytes' or 'total_bytes_estimate'.
                NOTE(review): presumably the progress-hook dict of the
                downloader used by the loader - confirm against the loader.
        """
        # Uses self.progress and self.task3 for the display; nothing to do
        # when no download progress is active.
        if self.progress is None or not hasattr(self, 'task3'):
            return

        status = d.get('status')
        if status == 'downloading':
            total = d.get('total_bytes') or d.get('total_bytes_estimate')
            downloaded = d.get('downloaded_bytes', 0)
            if total:
                percent = downloaded / total * 100
                self.progress.update(self.task3, completed=percent, description=f"[gray]Download: {percent:.1f}%")
            else:
                # Size unknown: show the downloaded amount in MB instead.
                self.progress.update(self.task3, description=f"[gray]{downloaded/1024/1024:.2f}MB geladen")
        elif status == 'finished':
            self.progress.update(self.task3, completed=100, description="[gray]Download abgeschlossen.")

    def search_mode(self):
        """Search for a series and add the result chosen by the user to the list."""
        search_string = input("Enter search string: ").strip()
        results = self.search(search_string)

        if not results:
            print("No results found. Returning to start.")
            return

        print("\nSearch results:")
        for i, result in enumerate(results, 1):
            print(f"{i}. {result.get('name')}")

        while True:
            selection = input("\nSelect an option by number or type '<enter>' to return: ").strip().lower()

            if selection == "":
                return

            try:
                index = int(selection) - 1
            except ValueError:
                print("Invalid input. Try again.")
                continue

            if 0 <= index < len(results):
                chosen = results[index]
                # The result's link is passed as both the first and the fourth
                # argument; the meaning of each position is defined by Serie
                # (presumably key and folder - confirm against Serie).
                self.List.add(Serie(chosen["link"], chosen["name"], "aniworld.to", chosen["link"], {}))
                return
            print("Invalid selection. Try again.")

    def updateFromReinit(self, folder, counter):
        """Scan callback: advance the rescan progress task by one step.

        Both arguments are accepted for the callback signature and not used.
        """
        # Ignore callbacks that arrive while no progress display is active.
        if self.progress is None:
            return
        self.progress.update(self.task1, advance=1)

    def run(self):
        """Run the interactive main loop: search, rescan or download."""
        while True:
            action = input("\nChoose action ('s' for search, 'i' for init or 'd' for download): ").strip().lower()

            if action == "s":
                self.search_mode()
            elif action == "i":
                print("\nRescanning series...\n")
                self.progress = Progress()
                # NOTE(review): 300 is a fixed placeholder total, not a count
                # taken from the scanner.
                self.task1 = self.progress.add_task("[red]items processed...", total=300)
                self.progress.start()
                try:
                    self.SerieScanner.Reinit()
                    self.SerieScanner.Scan(self.updateFromReinit)
                    # Reload the list so it reflects the fresh scan.
                    self.List = SerieList.SerieList(self.directory_to_search)
                    self.__InitList__()
                finally:
                    # Always stop the live display, even when the scan raises.
                    self.progress.stop()
                    self.progress = None
            elif action == "d":
                selected_series = self.get_user_selection()
                if selected_series:
                    self.download_series(selected_series)
# Script entry point
if __name__ == "__main__":
    # The base directory is taken from the ANIME_DIRECTORY environment
    # variable; the literal below is the fallback used when it is not set.
    directory_to_search = os.environ.get(
        "ANIME_DIRECTORY",
        "\\\\sshfs.r\\ubuntu@192.168.178.43\\media\\serien\\Serien",
    )
    SeriesApp(directory_to_search).run()