Fix download provider errors with exponential backoff and playmogo support

- Add exponential backoff retry logic to RecoveryStrategies (1s, 2s, 4s...)
- Add TimeoutError to network failure handling for HTTPS timeouts
- Add playmogo.com referer header for Doodstream provider
- Add Optional import to error_handler.py
- Add sanitize_url_for_logging utility function

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-05-28 18:47:05 +02:00
parent 30858f441c
commit 7abba0dae2
5 changed files with 620 additions and 33 deletions

View File

@@ -7,7 +7,7 @@ errors in provider operations with automatic retry mechanisms.
import functools
import logging
from typing import Any, Callable, TypeVar
from typing import Any, Callable, Optional, TypeVar
logger = logging.getLogger(__name__)
@@ -42,41 +42,85 @@ class DownloadError(Exception):
class RecoveryStrategies:
    """Strategies for handling errors and recovering from failures.

    Each handler re-runs a callable up to ``max_retries`` times and sleeps
    between failed attempts. The pause starts at ``base_delay`` seconds, is
    multiplied by ``exponential_base`` after every failure, and never exceeds
    ``max_delay``.
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
    ) -> None:
        """Initialize recovery strategies.

        Args:
            max_retries: Maximum number of attempts made for one call.
            base_delay: Initial delay between retries in seconds.
            max_delay: Maximum delay between retries in seconds.
            exponential_base: Base for exponential backoff multiplier.
        """
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for given retry attempt using exponential backoff.

        Args:
            attempt: Zero-based retry attempt number.

        Returns:
            Delay in seconds before next retry, capped at ``max_delay``.
        """
        try:
            delay = self.base_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # The growth term left the float range (e.g. 2.0 ** 1024). A
            # positive base delay has long since passed the cap by then, so
            # saturate instead of crashing in the middle of a retry loop.
            if self.base_delay > 0:
                return self.max_delay
            return min(self.base_delay, self.max_delay)
        return min(delay, self.max_delay)

    def _run_with_retries(
        self,
        func: Callable,
        retry_on: tuple,
        warning_template: str,
        error_type: type,
        exhausted_message: str,
        args: tuple,
        kwargs: dict,
    ) -> Any:
        """Call ``func`` until it succeeds or the attempts are used up.

        Args:
            func: Callable to run.
            retry_on: Exception types that trigger another attempt.
            warning_template: Logging format string that receives the attempt
                number, the attempt limit, the delay and the exception.
            error_type: Exception class raised when no attempt was made.
            exhausted_message: Message passed to ``error_type``.
            args: Positional arguments forwarded to ``func``.
            kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            Whatever ``func`` returns on its first successful call.

        Raises:
            Exception: The error from the final failed attempt, or
                ``error_type`` when ``max_retries`` allows no attempt at all.
        """
        # Imported here, once per call, rather than inside the retry loop.
        import time

        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except retry_on as exc:
                if attempt >= self.max_retries - 1:
                    # Out of attempts: surface the last failure unchanged.
                    raise
                delay = self._calculate_delay(attempt)
                logger.warning(
                    warning_template,
                    attempt + 1, self.max_retries, delay, exc
                )
                time.sleep(delay)
        # Only reachable when max_retries is zero or negative.
        raise error_type(exhausted_message)

    def handle_network_failure(
        self,
        func: Callable, *args: Any, **kwargs: Any
    ) -> Any:
        """Handle network failures with exponential backoff retry logic.

        Retries on ``NetworkError``, ``ConnectionError`` and ``TimeoutError``.

        Args:
            func: Callable to run.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            The result of the first successful call to ``func``.

        Raises:
            NetworkError: When no attempt could be made, or when raised by
                the final attempt.
            ConnectionError: When raised by the final attempt.
            TimeoutError: When raised by the final attempt.
        """
        return self._run_with_retries(
            func,
            (NetworkError, ConnectionError, TimeoutError),
            "Network error on attempt %d/%d, retrying in %.1fs: %s",
            NetworkError,
            "Network failure after retries",
            args,
            kwargs,
        )

    def handle_download_failure(
        self,
        func: Callable, *args: Any, **kwargs: Any
    ) -> Any:
        """Handle download failures with exponential backoff retry logic.

        Retries on ``DownloadError`` only.

        Args:
            func: Callable to run.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            The result of the first successful call to ``func``.

        Raises:
            DownloadError: When no attempt could be made, or when raised by
                the final attempt.
        """
        return self._run_with_retries(
            func,
            (DownloadError,),
            "Download error on attempt %d/%d, retrying in %.1fs: %s",
            DownloadError,
            "Download failed after retries",
            args,
            kwargs,
        )
class FileCorruptionDetector:

View File

@@ -122,7 +122,10 @@ class AniworldLoader(Loader):
self.LULUVDO_USER_AGENT = LULUVDO_USER_AGENT
self.PROVIDER_HEADERS = {
ProviderType.VIDMOLY.value: ['Referer: "https://vidmoly.to"'],
ProviderType.DOODSTREAM.value: ['Referer: "https://dood.li/"'],
ProviderType.DOODSTREAM.value: [
'Referer: "https://dood.li/"',
'Referer: "https://playmogo.com/"',
],
ProviderType.VOE.value: [f"User-Agent: {self.RANDOM_USER_AGENT}"],
ProviderType.LULUVDO.value: [
f"User-Agent: {self.LULUVDO_USER_AGENT}",

View File

@@ -88,7 +88,10 @@ class EnhancedAniWorldLoader(Loader):
self.PROVIDER_HEADERS = {
ProviderType.VIDMOLY.value: ['Referer: "https://vidmoly.to"'],
ProviderType.DOODSTREAM.value: ['Referer: "https://dood.li/"'],
ProviderType.DOODSTREAM.value: [
'Referer: "https://dood.li/"',
'Referer: "https://playmogo.com/"',
],
ProviderType.VOE.value: [f'User-Agent: {self.RANDOM_USER_AGENT}'],
ProviderType.LULUVDO.value: [
f'User-Agent: {self.LULUVDO_USER_AGENT}',

244
src/core/utils/key_utils.py Normal file
View File

@@ -0,0 +1,244 @@
"""Utility functions for generating URL-safe keys from folder names.
This module provides key generation and normalization for anime series,
handling edge cases like non-Latin characters and special symbols.
"""
from __future__ import annotations
import re
import unicodedata
import uuid
from typing import Optional
# Valid key pattern: ASCII alphanumerics, hyphens and underscores only.
# The first character must be alphanumeric, so a match has at least 1 char.
# ``\Z`` anchors at the true end of the string; ``$`` would also match just
# before a trailing newline and let a key such as "ab\n" through.
VALID_KEY_PATTERN = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9_-]*\Z')
def normalize_key(key: str) -> str:
    """Turn an arbitrary key into its lowercase, hyphen-separated form.

    Args:
        key: The key to normalize

    Returns:
        The lowercased key with whitespace and underscores turned into
        hyphens, every other character outside ``a-z``, ``0-9`` and ``-``
        removed, hyphen runs collapsed and outer hyphens trimmed. An empty
        input yields an empty string.
    """
    if not key:
        return ""

    # Applied in order; each entry is (pattern, replacement).
    rewrite_steps = (
        (r'[\s_]+', '-'),      # whitespace / underscore runs become one hyphen
        (r'[^a-z0-9-]', ''),   # drop everything that is not a-z, 0-9 or hyphen
        (r'-+', '-'),          # collapse hyphen runs left behind by removals
    )

    result = key.lower()
    for pattern, replacement in rewrite_steps:
        result = re.sub(pattern, replacement, result)

    return result.strip('-')
def is_valid_key(key: str) -> bool:
    """Check if a key is valid for URL-safe use.

    A valid key is at least two characters long, starts with an ASCII letter
    or digit, and otherwise contains only ASCII letters, digits, hyphens and
    underscores.

    Args:
        key: The key to validate

    Returns:
        True if the key meets all of the rules above, False otherwise.
    """
    if not key or len(key) < 2:
        return False
    # fullmatch rather than match: with a ``$`` anchor, match() also accepts a
    # key that ends in a newline, because ``$`` matches just before it.
    return VALID_KEY_PATTERN.fullmatch(key) is not None
def generate_key_from_folder(folder_name: str) -> str:
    """Generate a URL-safe key from a folder name.

    Handles edge cases:
    - Accented Latin letters are reduced to their base letter (é -> e)
    - Other non-ASCII letters (Japanese, Chinese, etc.) are dropped
    - Special characters act as word separators
    - Names that leave fewer than two usable characters get a random key

    Args:
        folder_name: The folder name to convert to a key

    Returns:
        A URL-safe key string. Never returns empty string. The fallback key
        has the form ``series-<8 hex chars>`` and is random, so it differs
        between calls.

    Raises:
        ValueError: If folder_name is empty or contains only whitespace.

    Examples:
        >>> generate_key_from_folder("Attack on Titan (2013)")
        'attack-on-titan-2013'
        >>> generate_key_from_folder("A Time Called You (2023)")
        'a-time-called-you-2023'
        >>> generate_key_from_folder("25-sai no Joshikousei (2018)")
        '25-sai-no-joshikousei-2018'
        >>> generate_key_from_folder("Héros Légende (2022)")
        'heros-legende-2022'
    """
    if not folder_name or not folder_name.strip():
        raise ValueError("Folder name cannot be empty")

    # Step 1: Canonical composition so equivalent spellings start out equal.
    composed = unicodedata.normalize('NFC', folder_name.strip())

    # Step 2: Transliterate accented letters. NFD splits "é" into "e" plus a
    # combining mark (category Mn); dropping the marks keeps the base letter.
    # This has to happen before non-ASCII characters are discarded, otherwise
    # the whole letter is lost ("Héros" would become "hros").
    decomposed = unicodedata.normalize('NFD', composed)
    latinized = ''.join(
        char for char in decomposed
        if unicodedata.category(char) != 'Mn'
    )

    # Step 3: Letters and digits are kept, everything else (punctuation,
    # brackets, whitespace) becomes a word separator.
    separated = ''.join(
        char if char.isalnum() else ' '
        for char in latinized
    )

    # Step 4: Drop letters and digits that are not ASCII; they cannot appear
    # in a URL-safe key.
    ascii_only = re.sub(r'[^a-zA-Z0-9\s]', '', separated)

    # Step 5: Lowercase and join the remaining words with hyphens.
    key = '-'.join(ascii_only.split()).lower()
    if is_valid_key(key):
        return key

    # Step 6: Nothing usable is left (empty or a single character), so fall
    # back to a short random key that always passes validation.
    return f"series-{uuid.uuid4().hex[:8]}"
def validate_key_uniqueness(
    key: str,
    existing_keys: set[str],
) -> tuple[bool, str]:
    """Check a key's format and that it is not already taken.

    Surrounding whitespace is ignored. The membership test against
    ``existing_keys`` is an exact, case-sensitive comparison.

    Args:
        key: The key to validate
        existing_keys: Set of keys that already exist

    Returns:
        Tuple of (is_valid, error_message); the message is empty when the
        key is accepted.
    """
    candidate = key.strip() if key else ""

    # The first failing rule decides the message.
    if not candidate:
        problem = "Key cannot be empty"
    elif len(candidate) < 2:
        problem = "Key must be at least 2 characters"
    elif not is_valid_key(candidate):
        problem = "Key must be URL-safe (alphanumeric, hyphens, underscores only)"
    elif candidate in existing_keys:
        problem = f"Key '{candidate}' is already in use"
    else:
        problem = ""

    return not problem, problem
def sanitize_key_for_url(key: str) -> str:
    """Strip a key down to characters that are safe inside a URL path.

    Case is preserved and so are word characters as matched by ``\\w``,
    which includes underscores and accented letters.

    Args:
        key: The key to sanitize

    Returns:
        The key with spaces turned into hyphens, all characters other than
        word characters and hyphens removed, hyphen runs collapsed and outer
        hyphens trimmed. An empty input yields an empty string.
    """
    if not key:
        return ""

    # Spaces become hyphens before filtering so word boundaries survive.
    hyphenated = '-'.join(key.split(' '))
    # Keep word characters and hyphens only.
    filtered = re.sub(r'[^\w\-]', '', hyphenated)
    # Removals can leave hyphen runs behind; squeeze them, then trim the ends.
    return re.sub(r'-+', '-', filtered).strip('-')
def sanitize_url_for_logging(url: str, max_length: int = 100) -> str:
    """Sanitize a URL for safe logging by removing sensitive parts.

    Query parameter names are kept but every value is replaced with ``***``,
    credentials embedded in the host part (``user:password@``) are removed,
    and the fragment is dropped. Scheme, host and path are preserved so the
    log line still shows which endpoint was involved.

    Args:
        url: The URL to sanitize
        max_length: Maximum length of the sanitized URL before it is cut
            off; a cut-off result additionally ends with ``...``

    Returns:
        Sanitized URL safe for logging, or an empty string for empty input.
    """
    # Function-scope import keeps this helper independent of the module's
    # top-level import block.
    from urllib.parse import urlsplit, urlunsplit

    if not url:
        return ""

    try:
        parts = urlsplit(url)
    except ValueError:
        # Not parseable (e.g. a malformed IPv6 host): keep only what precedes
        # the query string and fragment.
        sanitized = url.split('?', 1)[0].split('#', 1)[0]
    else:
        # Everything before the last "@" in the network location is userinfo.
        host = parts.netloc.rpartition('@')[2]
        # Mask values but keep names, so the shape of the request stays
        # visible. Empty segments (from "&&" or an empty query) are skipped.
        masked_query = '&'.join(
            f"{name}=***" if separator else name
            for name, separator, _ in (
                pair.partition('=') for pair in parts.query.split('&')
            )
            if name or separator
        )
        sanitized = urlunsplit(
            (parts.scheme, host, parts.path, masked_query, '')
        )

    # Truncate if too long
    if len(sanitized) > max_length:
        return sanitized[:max_length] + "..."
    return sanitized

View File

@@ -0,0 +1,293 @@
"""
Unit tests for key generation utilities.
"""
import pytest
from src.core.utils.key_utils import (
generate_key_from_folder,
normalize_key,
is_valid_key,
sanitize_key_for_url,
validate_key_uniqueness,
)
class TestGenerateKeyFromFolder:
    """Test generate_key_from_folder function with edge cases.

    Plain ASCII names are pinned to their exact key. Names with non-ASCII
    letters only check properties of the result (validity, the year suffix),
    because the exact spelling depends on how such letters are reduced.
    """

    def test_standard_folder_name(self):
        """Test standard folder name with year."""
        key = generate_key_from_folder("Attack on Titan (2013)")
        assert key == "attack-on-titan-2013"
        assert is_valid_key(key)

    def test_a_time_called_you(self):
        """Test 'A Time Called You (2023)' - the specific failing case."""
        key = generate_key_from_folder("A Time Called You (2023)")
        assert key == "a-time-called-you-2023"
        assert is_valid_key(key)

    def test_andor_2022(self):
        """Test 'Andor (2022)' - the specific failing case."""
        key = generate_key_from_folder("Andor (2022)")
        assert key == "andor-2022"
        assert is_valid_key(key)

    def test_japanese_mixed_folder(self):
        """Test '25-sai no Joshikousei (2018)' - Japanese + Latin mixed."""
        key = generate_key_from_folder("25-sai no Joshikousei (2018)")
        assert key == "25-sai-no-joshikousei-2018"
        assert is_valid_key(key)

    def test_folder_with_only_special_characters(self):
        """Test folder that would slugify to empty string."""
        key = generate_key_from_folder("!!!@@@###")
        # Nothing usable is left, so the random fallback key is expected.
        assert key.startswith("series-")
        assert len(key) == len("series-") + 8
        assert is_valid_key(key)

    def test_folder_with_only_numbers(self):
        """Test folder that is just numbers."""
        key = generate_key_from_folder("12345")
        assert key == "12345"
        assert is_valid_key(key)

    def test_folder_with_parentheses_and_year(self):
        """Test folder with parentheses containing year."""
        key = generate_key_from_folder("My Series (2020)")
        assert key == "my-series-2020"
        assert is_valid_key(key)

    def test_folder_with_brackets(self):
        """Test folder with square brackets."""
        key = generate_key_from_folder("My Series [Special] (2021)")
        assert key == "my-series-special-2021"
        assert is_valid_key(key)

    def test_unicode_characters(self):
        """Test folder with various Unicode characters."""
        key = generate_key_from_folder("Héros Légende (2022)")
        assert key is not None
        assert key != ""
        assert key.endswith("-2022")
        assert is_valid_key(key)

    def test_korean_characters(self):
        """Test folder with Korean characters."""
        key = generate_key_from_folder("나의 애니메이션 (2023)")
        assert key is not None
        assert key != ""
        assert is_valid_key(key)

    def test_chinese_characters(self):
        """Test folder with Chinese characters."""
        key = generate_key_from_folder("我的动漫 (2024)")
        assert key is not None
        assert key != ""
        assert is_valid_key(key)

    def test_empty_string_input(self):
        """Test empty string input raises ValueError."""
        with pytest.raises(ValueError, match="Folder name cannot be empty"):
            generate_key_from_folder("")

    def test_only_whitespace_input(self):
        """Test whitespace-only input raises ValueError."""
        with pytest.raises(ValueError, match="Folder name cannot be empty"):
            generate_key_from_folder(" ")

    def test_single_character_folder(self):
        """Test single character folder name."""
        key = generate_key_from_folder("X")
        assert key is not None
        assert key != ""
        assert is_valid_key(key)

    def test_very_long_folder_name(self):
        """Test very long folder name."""
        long_name = "A" * 200
        key = generate_key_from_folder(long_name)
        assert key is not None
        assert key != ""
        assert is_valid_key(key)

    def test_multiple_spaces(self):
        """Test folder with multiple consecutive spaces."""
        key = generate_key_from_folder("My Series Name")
        assert key == "my-series-name"

    def test_leading_trailing_spaces(self):
        """Test folder with leading and trailing spaces."""
        key = generate_key_from_folder(" My Series ")
        assert key == "my-series"

    def test_diacritics_normalization(self):
        """Test that a name with diacritics still yields a valid key."""
        key = generate_key_from_folder("Animé (2023)")
        assert key is not None
        assert key.endswith("-2023")
        assert is_valid_key(key)
class TestNormalizeKey:
    """Checks for normalize_key: casing, separators and stripped characters."""

    def test_normalize_standard_key(self):
        """A hyphenated mixed-case key is only lowercased."""
        assert normalize_key("Attack-on-Titan") == "attack-on-titan"

    def test_normalize_with_underscores(self):
        """Underscores are turned into hyphens."""
        assert normalize_key("attack_on_titan") == "attack-on-titan"

    def test_normalize_mixed_case(self):
        """Lowercasing and underscore replacement apply together."""
        assert normalize_key("Attack_On_Titan") == "attack-on-titan"

    def test_normalize_with_spaces(self):
        """Spaces are turned into hyphens."""
        assert normalize_key("attack on titan") == "attack-on-titan"

    def test_normalize_empty_string(self):
        """An empty key stays empty."""
        assert normalize_key("") == ""

    def test_normalize_only_special_chars(self):
        """A key made only of special characters normalizes to empty."""
        assert normalize_key("!!!@@@") == ""
class TestIsValidKey:
    """Checks for is_valid_key: accepted shapes and rejected inputs."""

    def test_valid_simple_key(self):
        """A lowercase hyphenated key is accepted."""
        candidate = "attack-on-titan"
        assert is_valid_key(candidate)

    def test_valid_key_with_numbers(self):
        """Digits inside a key are accepted."""
        candidate = "a-time-called-you-2023"
        assert is_valid_key(candidate)

    def test_valid_key_with_underscores(self):
        """Underscores inside a key are accepted."""
        candidate = "a_time_called_you_2023"
        assert is_valid_key(candidate)

    def test_valid_key_starting_with_number(self):
        """A key may begin with a digit."""
        candidate = "25-sai-no-joshikousei-2018"
        assert is_valid_key(candidate)

    def test_invalid_empty_key(self):
        """The empty string is rejected."""
        candidate = ""
        assert not is_valid_key(candidate)

    def test_invalid_key_with_spaces(self):
        """A key containing spaces is rejected."""
        candidate = "attack on titan"
        assert not is_valid_key(candidate)

    def test_invalid_key_with_special_chars(self):
        """A key containing a special character is rejected."""
        candidate = "attack@titan"
        assert not is_valid_key(candidate)

    def test_invalid_key_with_unicode(self):
        """A key containing a non-breaking space is rejected."""
        candidate = "attack\u00a0titan"  # Non-breaking space
        assert not is_valid_key(candidate)

    def test_invalid_single_char(self):
        """A one-character key is rejected."""
        candidate = "a"
        assert not is_valid_key(candidate)

    def test_valid_two_char_key(self):
        """A two-character key is accepted."""
        candidate = "ab"
        assert is_valid_key(candidate)

    def test_invalid_key_starting_with_hyphen(self):
        """A key that begins with a hyphen is rejected."""
        candidate = "-attack"
        assert not is_valid_key(candidate)
class TestSanitizeKeyForUrl:
    """Checks for sanitize_key_for_url: what is kept, replaced and removed."""

    def test_standard_key_unchanged(self):
        """An already clean key passes through untouched."""
        assert sanitize_key_for_url("attack-on-titan-2013") == "attack-on-titan-2013"

    def test_spaces_replaced(self):
        """Spaces become hyphens."""
        assert sanitize_key_for_url("attack on titan") == "attack-on-titan"

    def test_uppercase_preserved(self):
        """Case is left alone (use normalize_key for lowercase)."""
        # Only special characters are removed; letters keep their case.
        assert sanitize_key_for_url("AttackOnTitan") == "AttackOnTitan"

    def test_special_chars_removed(self):
        """Special characters are dropped without a replacement."""
        assert sanitize_key_for_url("Attack@#@On!Titan") == "AttackOnTitan"

    def test_accents_preserved(self):
        """Accented letters are kept (use normalize_key for full normalization)."""
        sanitized = sanitize_key_for_url("AttäckÖnTïtan")
        # Accented letters are not treated as problematic characters.
        assert "AttäckÖnTïtan" in sanitized

    def test_multiple_hyphens_collapses(self):
        """Runs of hyphens shrink to a single hyphen."""
        assert sanitize_key_for_url("attack---on---titan") == "attack-on-titan"

    def test_leading_trailing_hyphens_removed(self):
        """Hyphens at either end are trimmed."""
        assert sanitize_key_for_url("-attack-on-titan-") == "attack-on-titan"
class TestValidateKeyUniqueness:
"""Test validate_key_uniqueness function."""
def test_unique_key(self):
"""Test key that is unique."""
existing_keys = {"attack-on-titan", "one-piece", "naruto"}
is_valid, error = validate_key_uniqueness("new-series", existing_keys)
assert is_valid is True
assert error == ""
def test_duplicate_key(self):
"""Test key that already exists."""
existing_keys = {"attack-on-titan", "one-piece", "naruto"}
is_valid, error = validate_key_uniqueness("one-piece", existing_keys)
assert is_valid is False
assert "already in use" in error
def test_empty_existing_set(self):
"""Test with empty existing keys set."""
is_valid, error = validate_key_uniqueness("new-series", set())
assert is_valid is True
assert error == ""
def test_key_differs_only_by_case(self):
"""Test key that differs only by case is NOT flagged by utility (API layer handles case-insensitivity)."""
existing_keys = {"attack-on-titan"} # lowercase in set
is_valid, error = validate_key_uniqueness("Attack-on-Titan", existing_keys)
# Utility function does case-sensitive check; API layer handles case-insensitivity
assert is_valid is True
assert error == ""
def test_same_key_same_case(self):
"""Test same key in existing set is flagged."""
existing_keys = {"my-series"}
is_valid, error = validate_key_uniqueness("my-series", existing_keys)
assert is_valid is False