Aniworld/Loader.py

81 lines
3.0 KiB
Python

import os
import re
import subprocess
import logging
from aniworld.models import Anime
from aniworld.config import PROVIDER_HEADERS, INVALID_PATH_CHARS
from aniworld.parser import arguments
# Per-download timeout in seconds, read from the DOWNLOAD_TIMEOUT environment
# variable; defaults to 600 seconds (10 minutes).
_DEFAULT_DOWNLOAD_TIMEOUT = 600
try:
    timeout = int(os.getenv("DOWNLOAD_TIMEOUT", _DEFAULT_DOWNLOAD_TIMEOUT))
except ValueError:
    # A malformed or empty value must not make this module un-importable.
    # A named logger is used (not the root-level logging.warning helper) so
    # that no implicit basicConfig() is triggered during import.
    logging.getLogger(__name__).warning(
        "Invalid DOWNLOAD_TIMEOUT value %r; falling back to %d seconds.",
        os.getenv("DOWNLOAD_TIMEOUT"),
        _DEFAULT_DOWNLOAD_TIMEOUT,
    )
    timeout = _DEFAULT_DOWNLOAD_TIMEOUT

# Dedicated logger whose ERROR-and-above records are written to
# "download_errors.log" in the current working directory.
download_error_logger = logging.getLogger("DownloadErrors")
# delay=True defers opening the file until the first record is emitted, so
# merely importing this module no longer creates an empty log file.
download_error_handler = logging.FileHandler("download_errors.log", delay=True)
download_error_handler.setLevel(logging.ERROR)
download_error_logger.addHandler(download_error_handler)
# Matches the temporary files yt-dlp leaves behind for an unfinished download.
_PARTIAL_DOWNLOAD_PATTERN = re.compile(r'\.(part|ytdl|part-Frag\d+)$')


def _format_command(command):
    """Render *command* as one space-separated string for log output.

    ``None`` entries are rendered as empty strings.
    """
    return ' '.join('' if item is None else str(item) for item in command)


def _remove_partial_downloads(output_dir):
    """Delete partial-download files in *output_dir*, then drop it if empty.

    Every file in the directory whose name matches the partial-file pattern
    is removed, not only those belonging to the interrupted episode.
    """
    for file_name in os.listdir(output_dir):
        if _PARTIAL_DOWNLOAD_PATTERN.search(file_name):
            os.remove(os.path.join(output_dir, file_name))

    # Re-list instead of tracking a flag: the directory is removed only when
    # nothing is left in it after the cleanup above.
    if not os.listdir(output_dir):
        os.rmdir(output_dir)
        logging.info(f"Removed empty download directory: {output_dir}")


def download(anime: Anime):  # pylint: disable=too-many-branches
    """Download every episode of *anime* by invoking ``yt-dlp``.

    For each episode an output file name is built from the sanitized title,
    the season/episode numbers (season 0 is named "Movie NN") and the
    language, and ``yt-dlp`` is run with the episode's direct link. When
    ``arguments.only_command`` is set, the command is only logged and nothing
    is executed.

    Failures are logged and do not stop the loop: a timeout, a non-zero exit
    status, or a missing direct link skips to the next episode. On
    ``KeyboardInterrupt`` the partial files in the output directory are
    removed.

    Args:
        anime: Iterable of episodes; its ``title``, ``language``, ``provider``
            and ``output_directory`` attributes are read.
    """
    # The title does not change between episodes, so sanitize it once.
    sanitized_anime_title = ''.join(
        char for char in anime.title if char not in INVALID_PATH_CHARS
    )

    for episode in anime:
        if episode.season == 0:
            episode_label = f"Movie {episode.episode:02}"
        else:
            episode_label = f"S{episode.season:02}E{episode.episode:03}"

        output_file = (
            f"{sanitized_anime_title} - "
            f"{episode_label} - "
            f"({anime.language}).mp4"
        )
        output_path = os.path.join(anime.output_directory, output_file)
        output_dir = os.path.dirname(output_path)
        os.makedirs(output_dir, exist_ok=True)

        direct_link = episode.get_direct_link(anime.provider, anime.language)
        command = [
            "yt-dlp",
            direct_link,
            "--fragment-retries", "infinite",
            "--concurrent-fragments", "4",
            "-o", output_path,
            "--quiet",
            "--no-warnings",
        ]

        # Providers listed in PROVIDER_HEADERS need extra HTTP headers.
        if anime.provider in PROVIDER_HEADERS:
            for header in PROVIDER_HEADERS[anime.provider]:
                command.extend(["--add-header", header])

        if arguments.only_command:
            logging.info(
                f"{anime.title} - S{episode.season}E{episode.episode} - ({anime.language}): "
                f"{_format_command(command)}"
            )
            continue

        # subprocess.run() raises an uncaught TypeError on a None argument,
        # which would abort the whole batch; skip just this episode instead.
        if direct_link is None:
            logging.error(
                f"No direct link found for {anime.title} - "
                f"S{episode.season}E{episode.episode} - ({anime.language}); skipping."
            )
            continue

        try:
            subprocess.run(command, check=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            logging.error(
                f"Download timed out after {timeout} seconds: {_format_command(command)}"
            )
        except subprocess.CalledProcessError:
            logging.error(f"Error running command: {_format_command(command)}")
        except KeyboardInterrupt:
            # NOTE(review): the interrupt is not re-raised, so the loop moves
            # on to the next episode after cleanup - confirm this is intended.
            logging.warning("Download interrupted by user.")
            _remove_partial_downloads(output_dir)