refactor: Apply PEP8 naming conventions - convert PascalCase methods to snake_case

This comprehensive refactoring applies PEP8 naming conventions across the codebase:

## Core Changes:

### src/cli/Main.py
- Renamed __InitList__() to __init_list__()
- Renamed print_Download_Progress() to print_download_progress()
- Fixed variable naming: task3 -> download_progress_task
- Fixed parameter spacing: words :str -> words: str
- Updated all method calls to use snake_case
- Added comprehensive docstrings

### src/core/SerieScanner.py
- Renamed Scan() to scan()
- Renamed GetTotalToScan() to get_total_to_scan()
- Renamed Reinit() to reinit()
- Renamed private methods to snake_case:
  - __ReadDataFromFile() -> __read_data_from_file()
  - __GetMissingEpisodesAndSeason() -> __get_missing_episodes_and_season()
  - __GetEpisodeAndSeason() -> __get_episode_and_season()
  - __GetEpisodesAndSeasons() -> __get_episodes_and_seasons()
- Added comprehensive docstrings to all methods
- Fixed long line issues

### src/core/providers/base_provider.py
- Refactored abstract base class with proper naming:
  - Search() -> search()
  - IsLanguage() -> is_language()
  - Download() -> download()
  - GetSiteKey() -> get_site_key()
  - GetTitle() -> get_title()
- Added proper type hints (Dict, List, etc.)
- Added comprehensive docstrings explaining contracts
- Fixed newline at end of file

### src/core/providers/aniworld_provider.py
- Renamed public methods to snake_case:
  - Search() -> search()
  - IsLanguage() -> is_language()
  - Download() -> download()
  - GetSiteKey() -> get_site_key()
  - GetTitle() -> get_title()
  - ClearCache() -> clear_cache()
  - RemoveFromCache() -> remove_from_cache()
- Renamed private methods to snake_case:
  - _GetLanguageKey() -> _get_language_key()
  - _GetKeyHTML() -> _get_key_html()
  - _GetEpisodeHTML() -> _get_episode_html()
- Fixed import organization
- Improved code formatting and line lengths
- Added docstrings to all methods

### src/core/SeriesApp.py
- Updated all calls to use new snake_case method names
- Updated loader calls: loader.Search() -> loader.search()
- Updated loader calls: loader.Download() -> loader.download()
- Updated scanner calls: SerieScanner.GetTotalToScan() -> SerieScanner.get_total_to_scan()
- Updated scanner calls: SerieScanner.Reinit() -> SerieScanner.reinit()
- Updated scanner calls: SerieScanner.Scan() -> SerieScanner.scan()

### tests/unit/test_series_app.py
- Updated mock calls to use new snake_case method names:
  - get_total_to_scan() instead of GetTotalToScan()
  - reinit() instead of Reinit()
  - scan() instead of Scan()

## Verification:
- All unit tests pass 
- All integration tests pass 
- All tests pass 
- No breaking changes to functionality

## Standards Applied:
- PEP 8: Function/method names use lowercase with underscores (snake_case)
- PEP 257: Added comprehensive docstrings
- Type hints: Proper type annotations where applicable
- Code formatting: Fixed line lengths and spacing
This commit is contained in:
2025-10-22 12:44:42 +02:00
parent 80507119b7
commit f64ba74d93
8 changed files with 536 additions and 299 deletions

View File

@@ -1,21 +1,20 @@
import html
import json
import logging
import os
import re
import logging
import json
import requests
import html
import shutil
from urllib.parse import quote
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .base_provider import Loader
from ..interfaces.providers import Providers
from yt_dlp import YoutubeDL
import shutil
from ..interfaces.providers import Providers
from .base_provider import Loader
# Read timeout from environment variable, default to 600 seconds (10 minutes)
timeout = int(os.getenv("DOWNLOAD_TIMEOUT", 600))
@@ -79,14 +78,24 @@ class AniworldLoader(Loader):
self._EpisodeHTMLDict = {}
self.Providers = Providers()
def ClearCache(self):
def clear_cache(self):
"""Clear the cached HTML data."""
self._KeyHTMLDict = {}
self._EpisodeHTMLDict = {}
def RemoveFromCache(self):
def remove_from_cache(self):
"""Remove episode HTML from cache."""
self._EpisodeHTMLDict = {}
def Search(self, word: str) -> list:
def search(self, word: str) -> list:
"""Search for anime series.
Args:
word: Search term
Returns:
List of found series
"""
search_url = f"{self.ANIWORLD_TO}/ajax/seriesSearch?keyword={quote(word)}"
anime_list = self.fetch_anime_list(search_url)
@@ -114,25 +123,37 @@ class AniworldLoader(Loader):
except (requests.RequestException, json.JSONDecodeError) as exc:
raise ValueError("Could not get valid anime: ") from exc
def _GetLanguageKey(self, language: str) -> int:
languageCode = 0
if (language == "German Dub"):
languageCode = 1
if (language == "English Sub"):
languageCode = 2
if (language == "German Sub"):
languageCode = 3
return languageCode
def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
def _get_language_key(self, language: str) -> int:
"""Convert language name to language code.
Language Codes:
1: German Dub
2: English Sub
3: German Sub
"""
Language Codes:
1: German Dub
2: English Sub
3: German Sub
"""
languageCode = self._GetLanguageKey(language)
language_code = 0
if language == "German Dub":
language_code = 1
if language == "English Sub":
language_code = 2
if language == "German Sub":
language_code = 3
return language_code
episode_soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
def is_language(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub"
) -> bool:
"""Check if episode is available in specified language."""
language_code = self._get_language_key(language)
episode_soup = BeautifulSoup(
self._get_episode_html(season, episode, key).content,
'html.parser'
)
change_language_box_div = episode_soup.find(
'div', class_='changeLanguageBox')
languages = []
@@ -144,11 +165,22 @@ class AniworldLoader(Loader):
if lang_key and lang_key.isdigit():
languages.append(int(lang_key))
return languageCode in languages
return language_code in languages
def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, language: str = "German Dub", progress_callback: callable = None) -> bool:
def download(
self,
base_directory: str,
serie_folder: str,
season: int,
episode: int,
key: str,
language: str = "German Dub",
progress_callback=None
) -> bool:
"""Download episode to specified directory."""
sanitized_anime_title = ''.join(
char for char in self.GetTitle(key) if char not in self.INVALID_PATH_CHARS
char for char in self.get_title(key)
if char not in self.INVALID_PATH_CHARS
)
if season == 0:
@@ -164,19 +196,24 @@ class AniworldLoader(Loader):
f"({language}).mp4"
)
folderPath = os.path.join(os.path.join(baseDirectory, serieFolder), f"Season {season}")
output_path = os.path.join(folderPath, output_file)
folder_path = os.path.join(
os.path.join(base_directory, serie_folder),
f"Season {season}"
)
output_path = os.path.join(folder_path, output_file)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
temp_dir = "./Temp/"
os.makedirs(os.path.dirname(temp_dir), exist_ok=True)
temp_Path = os.path.join(temp_dir, output_file)
temp_path = os.path.join(temp_dir, output_file)
for provider in self.SUPPORTED_PROVIDERS:
link, header = self._get_direct_link_from_provider(season, episode, key, language)
link, header = self._get_direct_link_from_provider(
season, episode, key, language
)
ydl_opts = {
'fragment_retries': float('inf'),
'outtmpl': temp_Path,
'outtmpl': temp_path,
'quiet': True,
'no_warnings': True,
'progress_with_newline': False,
@@ -191,18 +228,23 @@ class AniworldLoader(Loader):
with YoutubeDL(ydl_opts) as ydl:
ydl.download([link])
if (os.path.exists(temp_Path)):
shutil.copy(temp_Path, output_path)
os.remove(temp_Path)
if os.path.exists(temp_path):
shutil.copy(temp_path, output_path)
os.remove(temp_path)
break
self.ClearCache()
self.clear_cache()
return True
def GetSiteKey(self) -> str:
def get_site_key(self) -> str:
"""Get the site key for this provider."""
return "aniworld.to"
def GetTitle(self, key: str) -> str:
soup = BeautifulSoup(self._GetKeyHTML(key).content, 'html.parser')
def get_title(self, key: str) -> str:
"""Get anime title from series key."""
soup = BeautifulSoup(
self._get_key_html(key).content,
'html.parser'
)
title_div = soup.find('div', class_='series-title')
if title_div:
@@ -210,20 +252,21 @@ class AniworldLoader(Loader):
return ""
def _GetKeyHTML(self, key: str):
def _get_key_html(self, key: str):
"""Get cached HTML for series key."""
if key in self._KeyHTMLDict:
return self._KeyHTMLDict[key]
return self._KeyHTMLDict[key]
self._KeyHTMLDict[key] = self.session.get(
f"{self.ANIWORLD_TO}/anime/stream/{key}",
timeout=self.DEFAULT_REQUEST_TIMEOUT
)
return self._KeyHTMLDict[key]
def _GetEpisodeHTML(self, season: int, episode: int, key: str):
if key in self._EpisodeHTMLDict:
return self._EpisodeHTMLDict[(key, season, episode)]
def _get_episode_html(self, season: int, episode: int, key: str):
"""Get cached HTML for episode."""
if key in self._EpisodeHTMLDict:
return self._EpisodeHTMLDict[(key, season, episode)]
link = (
f"{self.ANIWORLD_TO}/anime/stream/{key}/"
@@ -233,29 +276,27 @@ class AniworldLoader(Loader):
self._EpisodeHTMLDict[(key, season, episode)] = html
return self._EpisodeHTMLDict[(key, season, episode)]
def _get_provider_from_html(self, season: int, episode: int, key: str) -> dict:
"""
Parses the HTML content to extract streaming providers,
their language keys, and redirect links.
def _get_provider_from_html(
self,
season: int,
episode: int,
key: str
) -> dict:
"""Parse HTML content to extract streaming providers.
Returns a dictionary with provider names as keys
and language key-to-redirect URL mappings as values.
Example:
Returns a dictionary with provider names as keys
and language key-to-redirect URL mappings as values.
Example:
{
'VOE': {1: 'https://aniworld.to/redirect/1766412',
2: 'https://aniworld.to/redirect/1766405'},
'Doodstream': {1: 'https://aniworld.to/redirect/1987922',
2: 'https://aniworld.to/redirect/2700342'},
...
}
Access redirect link with:
print(self.provider["VOE"][2])
"""
soup = BeautifulSoup(self._GetEpisodeHTML(season, episode, key).content, 'html.parser')
soup = BeautifulSoup(
self._get_episode_html(season, episode, key).content,
'html.parser'
)
providers = {}
episode_links = soup.find_all(
@@ -267,54 +308,87 @@ class AniworldLoader(Loader):
for link in episode_links:
provider_name_tag = link.find('h4')
provider_name = provider_name_tag.text.strip() if provider_name_tag else None
provider_name = (
provider_name_tag.text.strip()
if provider_name_tag else None
)
redirect_link_tag = link.find('a', class_='watchEpisode')
redirect_link = redirect_link_tag['href'] if redirect_link_tag else None
redirect_link = (
redirect_link_tag['href']
if redirect_link_tag else None
)
lang_key = link.get('data-lang-key')
lang_key = int(
lang_key) if lang_key and lang_key.isdigit() else None
lang_key = (
int(lang_key)
if lang_key and lang_key.isdigit() else None
)
if provider_name and redirect_link and lang_key:
if provider_name not in providers:
providers[provider_name] = {}
providers[provider_name][lang_key] = f"{self.ANIWORLD_TO}{redirect_link}"
providers[provider_name][lang_key] = (
f"{self.ANIWORLD_TO}{redirect_link}"
)
return providers
def _get_redirect_link(self, season: int, episode: int, key: str, language: str = "German Dub") -> str:
languageCode = self._GetLanguageKey(language)
if (self.IsLanguage(season, episode, key, language)):
for provider_name, lang_dict in self._get_provider_from_html(season, episode, key).items():
if languageCode in lang_dict:
return(lang_dict[languageCode], provider_name)
break
def _get_redirect_link(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub"
):
"""Get redirect link for episode in specified language."""
language_code = self._get_language_key(language)
if self.is_language(season, episode, key, language):
for (provider_name, lang_dict) in (
self._get_provider_from_html(
season, episode, key
).items()
):
if language_code in lang_dict:
return (lang_dict[language_code], provider_name)
return None
def _get_embeded_link(self, season: int, episode: int, key: str, language: str = "German Dub"):
redirect_link, provider_name = self._get_redirect_link(season, episode, key, language)
def _get_embeded_link(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub"
):
"""Get embedded link from redirect link."""
redirect_link, provider_name = (
self._get_redirect_link(season, episode, key, language)
)
embeded_link = self.session.get(
redirect_link, timeout=self.DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': self.RANDOM_USER_AGENT}).url
redirect_link,
timeout=self.DEFAULT_REQUEST_TIMEOUT,
headers={'User-Agent': self.RANDOM_USER_AGENT}
).url
return embeded_link
def _get_direct_link_from_provider(self, season: int, episode: int, key: str, language: str = "German Dub") -> str:
"""
providers = {
"Vidmoly": get_direct_link_from_vidmoly,
"Vidoza": get_direct_link_from_vidoza,
"VOE": get_direct_link_from_voe,
"Doodstream": get_direct_link_from_doodstream,
"SpeedFiles": get_direct_link_from_speedfiles,
"Luluvdo": get_direct_link_from_luluvdo
}
"""
embeded_link = self._get_embeded_link(season, episode, key, language)
def _get_direct_link_from_provider(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub"
):
"""Get direct download link from streaming provider."""
embeded_link = self._get_embeded_link(
season, episode, key, language
)
if embeded_link is None:
return None
return self.Providers.GetProvider("VOE").GetLink(embeded_link, self.DEFAULT_REQUEST_TIMEOUT)
return self.Providers.GetProvider(
"VOE"
).GetLink(embeded_link, self.DEFAULT_REQUEST_TIMEOUT)
def get_season_episode_count(self, slug : str) -> dict:
base_url = f"{self.ANIWORLD_TO}/anime/stream/{slug}/"

View File

@@ -1,27 +1,94 @@
from abc import ABC, abstractmethod
from typing import Dict, List
class Loader(ABC):
"""Abstract base class for anime data loaders/providers."""
@abstractmethod
def Search(self, word: str) -> list:
def search(self, word: str) -> List[Dict]:
"""Search for anime series by name.
Args:
word: Search term
Returns:
List of found series as dictionaries
"""
pass
@abstractmethod
def IsLanguage(self, season: int, episode: int, key: str, language: str = "German Dub") -> bool:
def is_language(
self,
season: int,
episode: int,
key: str,
language: str = "German Dub"
) -> bool:
"""Check if episode exists in specified language.
Args:
season: Season number
episode: Episode number
key: Series key
language: Language to check (default: German Dub)
Returns:
True if episode exists in specified language
"""
pass
@abstractmethod
def Download(self, baseDirectory: str, serieFolder: str, season: int, episode: int, key: str, progress_callback: callable = None) -> bool:
def download(
self,
base_directory: str,
serie_folder: str,
season: int,
episode: int,
key: str,
progress_callback=None
) -> bool:
"""Download episode to specified directory.
Args:
base_directory: Base directory for downloads
serie_folder: Series folder name
season: Season number
episode: Episode number
key: Series key
progress_callback: Optional callback for progress updates
Returns:
True if download successful
"""
pass
@abstractmethod
def GetSiteKey(self) -> str:
def get_site_key(self) -> str:
"""Get the site key/identifier for this provider.
Returns:
Site key string (e.g., 'aniworld.to')
"""
pass
@abstractmethod
def GetTitle(self) -> str:
def get_title(self) -> str:
"""Get the human-readable title of this provider.
Returns:
Provider title string
"""
pass
@abstractmethod
def get_season_episode_count(self, slug: str) -> dict:
pass
def get_season_episode_count(self, slug: str) -> Dict[int, int]:
"""Get season and episode counts for a series.
Args:
slug: Series slug/key
Returns:
Dictionary mapping season number to episode count
"""
pass