Aniworld/src/cli/Main.py
2025-10-22 15:54:36 +02:00

317 lines
11 KiB
Python

"""Command-line interface for the Aniworld anime download manager."""
import logging
import os
from typing import Optional, Sequence
from rich.progress import Progress
from src.core.entities.series import Serie
from src.core.SeriesApp import SeriesApp as CoreSeriesApp
# Record layout passed to logging.basicConfig: timestamp, level, logger name, message.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class SeriesCLI:
    """Thin wrapper around :class:`SeriesApp` providing an interactive CLI.

    The class owns a core application instance (``series_app``) and, while a
    download or rescan is running, a ``rich`` progress display together with
    the ids of the tasks shown in it. Outside of those operations all
    progress attributes are ``None``.
    """

    def __init__(self, directory_to_search: str) -> None:
        """Create the core application for ``directory_to_search``.

        Args:
            directory_to_search: Directory handed to the core ``SeriesApp``.
        """
        print("Please wait while initializing...")
        self.directory_to_search = directory_to_search
        self.series_app = CoreSeriesApp(directory_to_search)
        # Progress state; populated only while download_series()/rescan() run.
        self._progress: Optional[Progress] = None
        self._overall_task_id: Optional[int] = None
        self._series_task_id: Optional[int] = None
        self._episode_task_id: Optional[int] = None
        self._scan_task_id: Optional[int] = None

    # ------------------------------------------------------------------
    # Utility helpers
    # ------------------------------------------------------------------
    def _get_series_list(self) -> Sequence[Serie]:
        """Return the currently cached series with missing episodes."""
        return self.series_app.get_series_list()

    def _reset_progress_state(self) -> None:
        """Drop the active progress display and every task id tied to it."""
        self._progress = None
        self._overall_task_id = None
        self._series_task_id = None
        self._episode_task_id = None
        self._scan_task_id = None

    # ------------------------------------------------------------------
    # Display & selection
    # ------------------------------------------------------------------
    def display_series(self) -> None:
        """Print all series with assigned numbers (starting at 1).

        The stripped ``name`` is shown when it is non-empty; otherwise the
        series ``folder`` is used as the label.
        """
        series = self._get_series_list()
        if not series:
            print("\nNo series with missing episodes were found.")
            return
        print("\nCurrent result:")
        for index, serie in enumerate(series, start=1):
            name = (serie.name or "").strip()
            label = name if name else serie.folder
            print(f"{index}. {label}")

    @staticmethod
    def _parse_selection(selection: str, count: int) -> Sequence[int]:
        """Turn a comma-separated list of 1-based numbers into 0-based indexes.

        Args:
            selection: Raw user input such as ``"1, 3"``.
            count: Number of selectable entries; indexes outside
                ``0 <= i < count`` are dropped.

        Returns:
            Valid indexes in first-seen order, without repeats.

        Raises:
            ValueError: If any comma-separated token is not an integer.
        """
        positions = [int(token.strip()) - 1 for token in selection.split(",")]
        # dict.fromkeys drops repeats while keeping the order the user typed,
        # so "1,1" does not queue the same series twice.
        return [i for i in dict.fromkeys(positions) if 0 <= i < count]

    def get_user_selection(self) -> Optional[Sequence[Serie]]:
        """Prompt the user to select one or more series for download.

        Returns:
            The chosen series, the full list for ``all``, or ``None`` when
            nothing is available, the user cancels (``exit`` or empty input),
            the input is not numeric, or no number maps to a listed series.
        """
        series = list(self._get_series_list())
        if not series:
            print("No series available for download.")
            return None
        self.display_series()
        prompt = (
            "\nSelect series by number (e.g. '1', '1,2' or 'all') "
            "or type 'exit' to return: "
        )
        selection = input(prompt).strip().lower()
        if selection in {"exit", ""}:
            return None
        if selection == "all":
            return series
        try:
            indexes = self._parse_selection(selection, len(series))
        except ValueError:
            print("Invalid selection. Returning to main menu.")
            return None
        chosen = [series[i] for i in indexes]
        if not chosen:
            print("No valid series selected.")
            return None
        return chosen

    # ------------------------------------------------------------------
    # Download logic
    # ------------------------------------------------------------------
    def download_series(self, series: Sequence[Serie]) -> None:
        """Download all missing episodes for the provided series list.

        Three progress tasks are shown: overall (one step per episode across
        all series), current series, and the running episode download (0-100).
        Episodes skipped by the language check still advance the overall and
        series tasks, because they are included in those tasks' totals.

        The progress state is cleared even if a download raises; the series
        list is refreshed only after the loop completes normally.

        Args:
            series: Series whose ``episodeDict`` maps a season to its
                missing episodes.
        """
        total_episodes = sum(
            len(episodes)
            for serie in series
            for episodes in serie.episodeDict.values()
        )
        if total_episodes == 0:
            print("Selected series do not contain missing episodes.")
            return

        progress = Progress()
        self._progress = progress
        try:
            with progress:
                self._overall_task_id = progress.add_task(
                    "[red]Processing...", total=total_episodes
                )
                self._series_task_id = progress.add_task(
                    "[green]Current series", total=1
                )
                self._episode_task_id = progress.add_task(
                    "[gray]Download", total=100
                )
                for serie in series:
                    self._download_serie(serie)
        finally:
            # Never leave a stale display/task id behind for the callbacks.
            self._reset_progress_state()
        self.series_app.refresh_series_list()

    def _download_serie(self, serie: Serie) -> None:
        """Process every missing episode of one series and update the bars."""
        progress = self._progress
        serie_total = sum(len(eps) for eps in serie.episodeDict.values())
        progress.update(
            self._series_task_id,
            # A series without episodes still gets a non-zero total.
            total=max(serie_total, 1),
            completed=0,
            description=f"[green]{serie.folder}",
        )
        for season, episodes in serie.episodeDict.items():
            for episode in episodes:
                self._download_episode(serie, season, episode)
                # Advance for downloaded, failed and skipped episodes alike;
                # all of them were counted in the totals.
                progress.advance(self._overall_task_id)
                progress.advance(self._series_task_id)
                progress.update(
                    self._episode_task_id,
                    completed=0,
                    description="[gray]Waiting...",
                )

    def _download_episode(self, serie: Serie, season, episode) -> None:
        """Download one episode unless the loader's language check rejects it.

        A failed download (``result.success`` falsy) is logged, not raised.
        """
        if not self.series_app.loader.is_language(season, episode, serie.key):
            logger.info(
                "Skipping %s S%02dE%02d because the desired language is unavailable",
                serie.folder,
                season,
                episode,
            )
            return
        result = self.series_app.download(
            serieFolder=serie.folder,
            season=season,
            episode=episode,
            key=serie.key,
            callback=self._update_download_progress,
        )
        if not result.success:
            logger.error("Download failed: %s", result.message)

    def _update_download_progress(self, percent: float) -> None:
        """Update the episode progress bar based on download progress.

        Does nothing when no progress display or episode task is active.

        Args:
            percent: Download progress; clamped to 0-100 because the episode
                task is created with ``total=100``.
        """
        if not self._progress or self._episode_task_id is None:
            return
        clamped = min(max(percent, 0.0), 100.0)
        self._progress.update(
            self._episode_task_id,
            completed=clamped,
            description=f"[gray]Download: {clamped:.1f}%",
        )

    # ------------------------------------------------------------------
    # Rescan logic
    # ------------------------------------------------------------------
    def rescan(self) -> None:
        """Trigger a rescan of the anime directory using the core app.

        Shows a single progress task sized by the scanner's reported total
        (at least 1) and prints the result message afterwards. The progress
        state is cleared even if the scan raises.
        """
        total_to_scan = max(self.series_app.SerieScanner.get_total_to_scan(), 1)
        progress = Progress()
        self._progress = progress
        try:
            with progress:
                self._scan_task_id = progress.add_task(
                    "[red]Scanning folders...",
                    total=total_to_scan,
                )
                result = self.series_app.ReScan(
                    callback=self._wrap_scan_callback(total_to_scan)
                )
        finally:
            self._reset_progress_state()
        if result.success:
            print(result.message)
        else:
            print(f"Scan failed: {result.message}")

    def _wrap_scan_callback(self, total: int):
        """Create a callback that updates the scan progress bar.

        Args:
            total: Upper bound for the reported ``current`` value.

        Returns:
            A ``callback(folder, current)`` function that is a no-op when no
            progress display or scan task is active.
        """

        def _callback(folder: str, current: int) -> None:
            if not self._progress or self._scan_task_id is None:
                return
            self._progress.update(
                self._scan_task_id,
                # Never report more than the task's total.
                completed=min(current, total),
                description=f"[green]{folder}",
            )

        return _callback

    # ------------------------------------------------------------------
    # Search & add logic
    # ------------------------------------------------------------------
    def search_mode(self) -> None:
        """Search for a series and add it to the local list if chosen.

        Returns to the caller without changes on an empty query, no results,
        an empty/non-numeric answer, or a number outside the result list.
        """
        query = input("Enter search string: ").strip()
        if not query:
            return
        results = self.series_app.search(query)
        if not results:
            print("No results found. Returning to main menu.")
            return
        print("\nSearch results:")
        for index, result in enumerate(results, start=1):
            print(f"{index}. {result.get('name', 'Unknown')}")
        selection = input(
            "\nSelect an option by number or press <enter> to cancel: "
        ).strip()
        if selection == "":
            return
        try:
            chosen_index = int(selection) - 1
        except ValueError:
            print("Invalid input. Returning to main menu.")
            return
        if not (0 <= chosen_index < len(results)):
            print("Invalid selection. Returning to main menu.")
            return
        chosen = results[chosen_index]
        # The result's "link" is passed as both the first and the fourth
        # positional argument of Serie.
        # NOTE(review): presumably these are key and folder - confirm against
        # the Serie constructor.
        serie = Serie(
            chosen.get("link", ""),
            chosen.get("name", "Unknown"),
            "aniworld.to",
            chosen.get("link", ""),
            {},
        )
        self.series_app.List.add(serie)
        self.series_app.refresh_series_list()
        print(f"Added '{serie.name}' to the local catalogue.")

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------
    def run(self) -> None:
        """Run the interactive CLI loop until the user quits.

        ``s`` searches, ``i`` rescans, ``d`` downloads a selection and
        ``q``/``quit``/``exit`` leaves. A closed stdin or Ctrl+C at the main
        prompt also ends the loop instead of surfacing a traceback.
        """
        while True:
            try:
                action = input(
                    "\nChoose action ('s' for search, 'i' for rescan, 'd' for download, 'q' to quit): "
                ).strip().lower()
            except (EOFError, KeyboardInterrupt):
                print("\nGoodbye!")
                break
            if action == "s":
                self.search_mode()
            elif action == "i":
                print("\nRescanning series...\n")
                self.rescan()
            elif action == "d":
                selected_series = self.get_user_selection()
                if selected_series:
                    self.download_series(selected_series)
            elif action in {"q", "quit", "exit"}:
                print("Goodbye!")
                break
            else:
                print("Unknown command. Please choose 's', 'i', 'd', or 'q'.")
def configure_logging() -> None:
    """Configure root logging for the CLI and quieten chatty libraries.

    The root logger is set to INFO with ``LOG_FORMAT``; the
    ``urllib3.connectionpool`` and ``charset_normalizer`` loggers are raised
    to ERROR.
    """
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    noisy_loggers = ("urllib3.connectionpool", "charset_normalizer")
    for logger_name in noisy_loggers:
        logging.getLogger(logger_name).setLevel(logging.ERROR)
def main() -> None:
    """Entry point for the CLI application.

    Configures logging, reads the base directory from the ``ANIME_DIRECTORY``
    environment variable and starts the interactive loop. When the variable
    is unset or empty, a hint is printed and the function returns.
    """
    configure_logging()
    anime_directory = os.environ.get("ANIME_DIRECTORY")
    if anime_directory:
        cli = SeriesCLI(anime_directory)
        cli.run()
        return
    print(
        "Environment variable ANIME_DIRECTORY is not set. Please configure it to the base anime directory."
    )
# Start the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()