refactor: remove temp/legacy code artifacts

- Delete src/server/core_utils_temp/: nfo_generator.py, nfo_mapper.py
- Delete src/server/services_nfo_temp/: tmdb_client.py
- Remove legacy_file_migration.py from services/
- Remove test_legacy_migration.py integration test

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
2026-06-05 16:25:06 +02:00
parent 25dc66fec3
commit 7d9f80a0c6
6 changed files with 0 additions and 1493 deletions

View File

@@ -1,213 +0,0 @@
"""NFO XML generator for Kodi/XBMC format.
This module provides functions to generate tvshow.nfo XML files from
TVShowNFO Pydantic models, adapted from the scraper project.
Example:
>>> from src.server.nfo.nfo_models import TVShowNFO
>>> nfo = TVShowNFO(title="Test Show", year=2020, tmdbid=12345)
>>> xml_string = generate_tvshow_nfo(nfo)
"""
import logging
from typing import Optional
from lxml import etree
from src.config.settings import settings
from src.server.nfo.nfo_models import TVShowNFO
logger = logging.getLogger(__name__)
def generate_tvshow_nfo(tvshow: TVShowNFO, pretty_print: bool = True) -> str:
"""Generate tvshow.nfo XML content from TVShowNFO model.
Args:
tvshow: TVShowNFO Pydantic model with metadata
pretty_print: Whether to format XML with indentation
Returns:
XML string in Kodi/XBMC tvshow.nfo format
Example:
>>> nfo = TVShowNFO(title="Attack on Titan", year=2013)
>>> xml = generate_tvshow_nfo(nfo)
"""
root = etree.Element("tvshow")
# Basic information
_add_element(root, "title", tvshow.title)
_add_element(root, "originaltitle", tvshow.originaltitle)
_add_element(root, "showtitle", tvshow.showtitle)
_add_element(root, "sorttitle", tvshow.sorttitle)
_add_element(root, "year", str(tvshow.year) if tvshow.year else None)
# Plot and description always write <plot> even when empty so that
# all NFO files have a consistent set of tags regardless of whether they
# were produced by create or update.
_add_element(root, "plot", tvshow.plot, always_write=True)
_add_element(root, "outline", tvshow.outline)
_add_element(root, "tagline", tvshow.tagline)
# Technical details
_add_element(root, "runtime", str(tvshow.runtime) if tvshow.runtime else None)
# Content rating - prefer FSK if available and configured
if getattr(settings, 'nfo_prefer_fsk_rating', True) and tvshow.fsk:
_add_element(root, "mpaa", tvshow.fsk)
else:
_add_element(root, "mpaa", tvshow.mpaa)
_add_element(root, "certification", tvshow.certification)
# Status and dates
_add_element(root, "premiered", tvshow.premiered)
_add_element(root, "status", tvshow.status)
_add_element(root, "dateadded", tvshow.dateadded)
# Ratings
if tvshow.ratings:
ratings_elem = etree.SubElement(root, "ratings")
for rating in tvshow.ratings:
rating_elem = etree.SubElement(ratings_elem, "rating")
if rating.name:
rating_elem.set("name", rating.name)
if rating.max_rating:
rating_elem.set("max", str(rating.max_rating))
if rating.default:
rating_elem.set("default", "true")
_add_element(rating_elem, "value", str(rating.value))
if rating.votes is not None:
_add_element(rating_elem, "votes", str(rating.votes))
_add_element(root, "userrating", str(tvshow.userrating) if tvshow.userrating is not None else None)
# IDs
_add_element(root, "tmdbid", str(tvshow.tmdbid) if tvshow.tmdbid else None)
_add_element(root, "imdbid", tvshow.imdbid)
_add_element(root, "tvdbid", str(tvshow.tvdbid) if tvshow.tvdbid else None)
# Legacy ID fields for compatibility
_add_element(root, "id", str(tvshow.tvdbid) if tvshow.tvdbid else None)
_add_element(root, "imdb_id", tvshow.imdbid)
# Unique IDs
for uid in tvshow.uniqueid:
uid_elem = etree.SubElement(root, "uniqueid")
uid_elem.set("type", uid.type)
if uid.default:
uid_elem.set("default", "true")
uid_elem.text = uid.value
# Multi-value fields
for genre in tvshow.genre:
_add_element(root, "genre", genre)
for studio in tvshow.studio:
_add_element(root, "studio", studio)
for country in tvshow.country:
_add_element(root, "country", country)
for tag in tvshow.tag:
_add_element(root, "tag", tag)
# Thumbnails (posters, logos)
for thumb in tvshow.thumb:
thumb_elem = etree.SubElement(root, "thumb")
if thumb.aspect:
thumb_elem.set("aspect", thumb.aspect)
if thumb.season is not None:
thumb_elem.set("season", str(thumb.season))
if thumb.type:
thumb_elem.set("type", thumb.type)
thumb_elem.text = str(thumb.url)
# Fanart
if tvshow.fanart:
fanart_elem = etree.SubElement(root, "fanart")
for fanart in tvshow.fanart:
fanart_thumb = etree.SubElement(fanart_elem, "thumb")
fanart_thumb.text = str(fanart.url)
# Named seasons
for named_season in tvshow.namedseason:
season_elem = etree.SubElement(root, "namedseason")
season_elem.set("number", str(named_season.number))
season_elem.text = named_season.name
# Actors
for actor in tvshow.actors:
actor_elem = etree.SubElement(root, "actor")
_add_element(actor_elem, "name", actor.name)
_add_element(actor_elem, "role", actor.role)
_add_element(actor_elem, "thumb", str(actor.thumb) if actor.thumb else None)
_add_element(actor_elem, "profile", str(actor.profile) if actor.profile else None)
_add_element(actor_elem, "tmdbid", str(actor.tmdbid) if actor.tmdbid else None)
# Additional fields
_add_element(root, "trailer", str(tvshow.trailer) if tvshow.trailer else None)
_add_element(root, "watched", "true" if tvshow.watched else "false")
if tvshow.playcount is not None:
_add_element(root, "playcount", str(tvshow.playcount))
# Generate XML string
xml_str = etree.tostring(
root,
pretty_print=pretty_print,
encoding="unicode",
xml_declaration=False
)
# Add XML declaration
xml_declaration = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n'
return xml_declaration + xml_str
def _add_element(
parent: etree.Element,
tag: str,
text: Optional[str],
always_write: bool = False,
) -> Optional[etree.Element]:
"""Add a child element to parent if text is not None or empty.
Args:
parent: Parent XML element
tag: Tag name for child element
text: Text content (None or empty strings are skipped
unless *always_write* is True)
always_write: When True the element is created even when
*text* is None/empty (the element will have
no text content). Useful for tags like
``<plot>`` that should always be present.
Returns:
Created element or None if skipped
"""
if text is not None and text != "":
elem = etree.SubElement(parent, tag)
elem.text = text
return elem
if always_write:
return etree.SubElement(parent, tag)
return None
def validate_nfo_xml(xml_string: str) -> bool:
"""Validate NFO XML structure.
Args:
xml_string: XML content to validate
Returns:
True if valid XML, False otherwise
"""
try:
etree.fromstring(xml_string.encode('utf-8'))
return True
except etree.XMLSyntaxError as e:
logger.error("Invalid NFO XML: %s", e)
return False

View File

@@ -1,234 +0,0 @@
"""TMDB to NFO model mapper.
This module converts TMDB API data to TVShowNFO Pydantic models,
keeping the mapping logic separate from the service orchestration.
Example:
>>> model = tmdb_to_nfo_model(tmdb_data, content_ratings, get_image_url, "original")
"""
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
from src.server.nfo.nfo_models import (
ActorInfo,
ImageInfo,
NamedSeason,
RatingInfo,
TVShowNFO,
UniqueID,
)
logger = logging.getLogger(__name__)
def _extract_rating_by_country(
content_ratings: Dict[str, Any],
country_code: str,
) -> Optional[str]:
"""Extract content rating for a specific country from TMDB content ratings.
Args:
content_ratings: TMDB content ratings response dict with "results" list.
country_code: ISO 3166-1 alpha-2 country code (e.g., "DE", "US").
Returns:
Raw rating string for the requested country, or None if not found.
Example:
>>> _extract_rating_by_country({"results": [{"iso_3166_1": "US", "rating": "TV-14"}]}, "US")
'TV-14'
"""
if not content_ratings or "results" not in content_ratings:
return None
for rating in content_ratings["results"]:
if rating.get("iso_3166_1") == country_code:
return rating.get("rating") or None
return None
def _extract_fsk_rating(content_ratings: Dict[str, Any]) -> Optional[str]:
"""Extract German FSK rating from TMDB content ratings.
Delegates to :func:`_extract_rating_by_country` and then normalises the
raw TMDB string into the 'FSK XX' format expected by Kodi/Jellyfin.
Args:
content_ratings: TMDB content ratings response.
Returns:
Formatted FSK string (e.g., 'FSK 12') or None.
"""
raw = _extract_rating_by_country(content_ratings, "DE")
if raw is None:
return None
fsk_mapping: Dict[str, str] = {
"0": "FSK 0",
"6": "FSK 6",
"12": "FSK 12",
"16": "FSK 16",
"18": "FSK 18",
}
if raw in fsk_mapping:
return fsk_mapping[raw]
# Try to extract numeric part (ordered high→low to avoid partial matches)
for key in ["18", "16", "12", "6", "0"]:
if key in raw:
return fsk_mapping[key]
if raw.startswith("FSK"):
return raw
logger.debug("Unmapped German rating: %s", raw)
return None
def tmdb_to_nfo_model(
tmdb_data: Dict[str, Any],
content_ratings: Optional[Dict[str, Any]],
get_image_url: Callable[[str, str], str],
image_size: str = "original",
) -> TVShowNFO:
"""Convert TMDB API data to a fully-populated TVShowNFO model.
All required NFO tags are explicitly set in this function so that newly
created files are complete without a subsequent repair pass.
Args:
tmdb_data: TMDB TV show details (with credits, external_ids, images
appended via ``append_to_response``).
content_ratings: TMDB content ratings response, or None.
get_image_url: Callable ``(path, size) -> url`` for TMDB images.
image_size: TMDB image size parameter (e.g., ``"original"``, ``"w500"``).
Returns:
TVShowNFO Pydantic model with all available fields populated.
"""
title: str = tmdb_data["name"]
original_title: str = tmdb_data.get("original_name") or title
# --- Year and dates ---
first_air_date: Optional[str] = tmdb_data.get("first_air_date") or None
year: Optional[int] = int(first_air_date[:4]) if first_air_date else None
# --- Ratings ---
ratings: List[RatingInfo] = []
if tmdb_data.get("vote_average"):
ratings.append(RatingInfo(
name="themoviedb",
value=float(tmdb_data["vote_average"]),
votes=tmdb_data.get("vote_count", 0),
max_rating=10,
default=True,
))
# --- External IDs ---
external_ids: Dict[str, Any] = tmdb_data.get("external_ids", {})
imdb_id: Optional[str] = external_ids.get("imdb_id")
tvdb_id: Optional[int] = external_ids.get("tvdb_id")
# --- Images ---
thumb_images: List[ImageInfo] = []
fanart_images: List[ImageInfo] = []
if tmdb_data.get("poster_path"):
thumb_images.append(ImageInfo(
url=get_image_url(tmdb_data["poster_path"], image_size),
aspect="poster",
))
if tmdb_data.get("backdrop_path"):
fanart_images.append(ImageInfo(
url=get_image_url(tmdb_data["backdrop_path"], image_size),
))
logos: List[Dict[str, Any]] = tmdb_data.get("images", {}).get("logos", [])
if logos:
thumb_images.append(ImageInfo(
url=get_image_url(logos[0]["file_path"], image_size),
aspect="clearlogo",
))
# --- Cast (top 10) ---
actors: List[ActorInfo] = []
for member in tmdb_data.get("credits", {}).get("cast", [])[:10]:
actor_thumb: Optional[str] = None
if member.get("profile_path"):
actor_thumb = get_image_url(member["profile_path"], "h632")
actors.append(ActorInfo(
name=member["name"],
role=member.get("character"),
thumb=actor_thumb,
tmdbid=member["id"],
))
# --- Named seasons ---
named_seasons: List[NamedSeason] = []
for season_info in tmdb_data.get("seasons", []):
season_name = season_info.get("name")
season_number = season_info.get("season_number")
if season_name and season_number is not None:
named_seasons.append(NamedSeason(
number=season_number,
name=season_name,
))
# --- Unique IDs ---
unique_ids: List[UniqueID] = []
if tmdb_data.get("id"):
unique_ids.append(UniqueID(type="tmdb", value=str(tmdb_data["id"]), default=False))
if imdb_id:
unique_ids.append(UniqueID(type="imdb", value=imdb_id, default=False))
if tvdb_id:
unique_ids.append(UniqueID(type="tvdb", value=str(tvdb_id), default=True))
# --- Content ratings ---
fsk_rating: Optional[str] = _extract_fsk_rating(content_ratings) if content_ratings else None
mpaa_rating: Optional[str] = (
_extract_rating_by_country(content_ratings, "US") if content_ratings else None
)
# --- Country: prefer origin_country codes; fall back to production_countries names ---
country_list: List[str] = list(tmdb_data.get("origin_country", []))
if not country_list:
country_list = [c["name"] for c in tmdb_data.get("production_countries", [])]
# --- Runtime ---
runtime_list: List[int] = tmdb_data.get("episode_run_time", [])
runtime: Optional[int] = runtime_list[0] if runtime_list else None
return TVShowNFO(
title=title,
originaltitle=original_title,
showtitle=title,
sorttitle=title,
year=year,
plot=tmdb_data.get("overview") or None,
outline=tmdb_data.get("overview") or None,
tagline=tmdb_data.get("tagline") or None,
runtime=runtime,
premiered=first_air_date,
status=tmdb_data.get("status"),
genre=[g["name"] for g in tmdb_data.get("genres", [])],
studio=[n["name"] for n in tmdb_data.get("networks", [])],
country=country_list,
ratings=ratings,
fsk=fsk_rating,
mpaa=mpaa_rating,
tmdbid=tmdb_data.get("id"),
imdbid=imdb_id,
tvdbid=tvdb_id,
uniqueid=unique_ids,
thumb=thumb_images,
fanart=fanart_images,
actors=actors,
namedseason=named_seasons,
watched=False,
dateadded=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
)

View File

@@ -10,7 +10,6 @@ import structlog
from src.config.settings import settings
from src.server.database.service import AnimeSeriesService
from src.server.services.anime_service import sync_legacy_series_to_db
from src.server.services.legacy_file_migration import migrate_series_from_files_to_db
logger = structlog.get_logger(__name__)
@@ -106,26 +105,6 @@ async def _mark_initial_scan_completed() -> None:
)
async def _check_legacy_migration_status() -> bool:
"""Check if legacy key/data file migration has been completed.
Returns:
bool: True if migration was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_migration_legacy_files_completed(db),
scan_type="legacy_migration",
log_completed_msg="Legacy file migration already completed, skipping",
log_not_completed_msg="Legacy file migration not yet run, will check for files"
)
async def _mark_legacy_migration_completed() -> None:
"""Mark the legacy file migration as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_migration_legacy_files_completed(db),
scan_type="legacy_migration"
)
async def _check_legacy_key_cleanup_status() -> bool:
@@ -150,33 +129,6 @@ async def _mark_legacy_key_cleanup_completed() -> None:
)
async def _migrate_legacy_files() -> int:
"""Migrate series from legacy key/data files to database.
Returns:
int: Number of series migrated
"""
from src.server.database.connection import get_db_session
logger.info("Checking for legacy key/data files to migrate...")
try:
async with get_db_session() as db:
migrated_count = await migrate_series_from_files_to_db(
settings.anime_directory,
db
)
if migrated_count > 0:
logger.info("Migrated %d series from legacy files", migrated_count)
else:
logger.info("No series found in legacy files to migrate")
return migrated_count
except Exception as e:
logger.warning("Failed to migrate legacy files: %s", e)
return 0
async def _cleanup_legacy_key_files() -> int:
@@ -487,12 +439,6 @@ async def perform_initial_setup(progress_service=None):
if folder_scan_count > 0:
logger.info("Created %d series from anime folders", folder_scan_count)
# First, run legacy file migration if needed (independent of initial scan)
is_legacy_migration_done = await _check_legacy_migration_status()
if not is_legacy_migration_done:
await _migrate_legacy_files()
await _mark_legacy_migration_completed()
# Sync series from anime folders to database
await _sync_anime_folders(progress_service)

View File

@@ -1,233 +0,0 @@
"""One-time migration service for legacy key and data files.
This module provides functionality to migrate series data from legacy
file-based storage (key/data files) to the database. The migration is
designed to be idempotent and run only once per environment.
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Optional
import structlog
from sqlalchemy.ext.asyncio import AsyncSession
logger = structlog.get_logger(__name__)
async def migrate_series_from_files_to_db(
anime_dir: str,
db: AsyncSession,
) -> int:
"""Migrate series from legacy key/data files to database.
Scans for folders containing legacy 'key' or 'data' files and imports
any series not already in the database. The DB version wins if a series
exists in both places.
Args:
anime_dir: Path to the anime directory
db: Database session
Returns:
Number of series imported
"""
from src.server.database.service import AnimeSeriesService, EpisodeService
if not anime_dir or not os.path.isdir(anime_dir):
logger.warning(
"Anime directory does not exist, skipping legacy migration",
anime_dir=anime_dir
)
return 0
migrated_count = 0
scanned_count = 0
try:
for folder_name in os.listdir(anime_dir):
folder_path = os.path.join(anime_dir, folder_name)
if not os.path.isdir(folder_path):
continue
scanned_count += 1
# Check for 'key' file (single line with series key)
key_file = os.path.join(folder_path, "key")
# Check for 'data' file (JSON with series metadata)
data_file = os.path.join(folder_path, "data")
series_data: Optional[dict] = None
# Try to load from 'data' file first (more complete)
if os.path.isfile(data_file):
series_data = _load_data_file(data_file)
elif os.path.isfile(key_file):
# Fall back to 'key' file - just the key, need to infer other data
series_data = _load_key_file(key_file, folder_name)
if series_data is None:
continue
key = series_data.get("key")
if not key:
logger.warning(
"Skipping folder with no valid key",
folder=folder_name
)
continue
# Check if already in DB
existing = await AnimeSeriesService.get_by_key(db, key)
if existing:
logger.debug(
"Series already in database, skipping",
key=key,
folder=folder_name
)
continue
# Create the series in DB
try:
name = series_data.get("name") or folder_name
site = series_data.get("site", "https://aniworld.to")
folder = series_data.get("folder", folder_name)
year = series_data.get("year")
anime_series = await AnimeSeriesService.create(
db=db,
key=key,
name=name,
site=site,
folder=folder,
year=year,
)
# Create episodes if present
episode_dict = series_data.get("episodeDict", {})
if episode_dict:
for season, episode_numbers in episode_dict.items():
for episode_number in episode_numbers:
await EpisodeService.create(
db=db,
series_id=anime_series.id,
season=season,
episode_number=episode_number,
)
migrated_count += 1
logger.info(
"Migrated series from legacy file",
key=key,
name=name,
folder=folder_name
)
except Exception as e:
logger.warning(
"Failed to migrate series from legacy file",
key=key,
folder=folder_name,
error=str(e)
)
except Exception as e:
logger.error(
"Legacy migration failed",
anime_dir=anime_dir,
error=str(e),
exc_info=True
)
logger.info(
"Legacy file migration complete",
scanned_folders=scanned_count,
migrated=migrated_count
)
return migrated_count
def _load_data_file(data_file_path: str) -> Optional[dict]:
"""Load and parse a legacy 'data' file (JSON).
Args:
data_file_path: Path to the data file
Returns:
Parsed data dict or None if parsing fails
"""
try:
with open(data_file_path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
logger.warning(
"Data file is not a dictionary",
file=data_file_path
)
return None
# Ensure episodeDict has int keys
if "episodeDict" in data and isinstance(data["episodeDict"], dict):
data["episodeDict"] = {
int(k): v for k, v in data["episodeDict"].items()
}
return data
except json.JSONDecodeError as e:
logger.warning(
"Failed to parse legacy data file (JSON error)",
file=data_file_path,
error=str(e)
)
return None
except Exception as e:
logger.warning(
"Failed to read legacy data file",
file=data_file_path,
error=str(e)
)
return None
def _load_key_file(key_file_path: str, folder_name: str) -> Optional[dict]:
"""Load a legacy 'key' file (single line with series key).
Args:
key_file_path: Path to the key file
folder_name: Folder name to use as fallback name
Returns:
Data dict with key and inferred fields, or None if loading fails
"""
try:
with open(key_file_path, "r", encoding="utf-8") as f:
key = f.read().strip()
if not key:
logger.warning(
"Key file is empty",
file=key_file_path
)
return None
# Infer basic data from key file
return {
"key": key,
"name": folder_name,
"site": "https://aniworld.to",
"folder": folder_name,
"episodeDict": {},
}
except Exception as e:
logger.warning(
"Failed to read legacy key file",
file=key_file_path,
error=str(e)
)
return None

View File

@@ -1,424 +0,0 @@
"""TMDB API client for fetching TV show metadata.
This module provides an async client for The Movie Database (TMDB) API,
adapted from the scraper project to fit the AniworldMain architecture.
Example:
>>> async with TMDBClient(api_key="your_key") as client:
... results = await client.search_tv_show("Attack on Titan")
... show_id = results["results"][0]["id"]
... details = await client.get_tv_show_details(show_id)
"""
import asyncio
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
import aiohttp
logger = logging.getLogger(__name__)
class TMDBAPIError(Exception):
"""Exception raised for TMDB API errors."""
pass
class TMDBClient:
"""Async TMDB API client for TV show metadata.
Attributes:
api_key: TMDB API key for authentication
base_url: Base URL for TMDB API
image_base_url: Base URL for TMDB images
max_connections: Maximum concurrent connections
session: aiohttp ClientSession for requests
"""
DEFAULT_BASE_URL = "https://api.themoviedb.org/3"
DEFAULT_IMAGE_BASE_URL = "https://image.tmdb.org/t/p"
NEGATIVE_CACHE_TTL = 86400 # 24 hours
def __init__(
self,
api_key: str,
base_url: str = DEFAULT_BASE_URL,
image_base_url: str = DEFAULT_IMAGE_BASE_URL,
max_connections: int = 10
):
"""Initialize TMDB client.
Args:
api_key: TMDB API key
base_url: TMDB API base URL
image_base_url: TMDB image base URL
max_connections: Maximum concurrent connections
"""
if not api_key:
raise ValueError("TMDB API key is required")
self.api_key = api_key
self.base_url = base_url.rstrip('/')
self.image_base_url = image_base_url.rstrip('/')
self.max_connections = max_connections
self.session: Optional[aiohttp.ClientSession] = None
self._cache: Dict[str, Any] = {}
self._negative_cache: Dict[str, float] = {} # query -> timestamp when cached
# TMDB allows ~40 req/s; use 30 concurrent + per-second throttle to stay safe
self._semaphore = asyncio.Semaphore(30)
self._rate_limit_lock = asyncio.Lock()
self._request_timestamps: List[float] = []
self._max_requests_per_second = 35 # Stay under TMDB's ~40/s limit
async def __aenter__(self):
"""Async context manager entry."""
await self._ensure_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.close()
async def _ensure_session(self):
"""Ensure aiohttp session is created."""
if self.session is None or self.session.closed:
connector = aiohttp.TCPConnector(limit=self.max_connections)
self.session = aiohttp.ClientSession(connector=connector)
async def _request(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
max_retries: int = 5
) -> Dict[str, Any]:
"""Make an async request to TMDB API with retries.
Args:
endpoint: API endpoint (e.g., 'search/tv')
params: Query parameters
max_retries: Maximum retry attempts
Returns:
API response as dictionary
Raises:
TMDBAPIError: If request fails after retries
"""
await self._ensure_session()
url = f"{self.base_url}/{endpoint}"
params = params or {}
params["api_key"] = self.api_key
# Cache key for deduplication
cache_key = f"{endpoint}:{str(sorted(params.items()))}"
if cache_key in self._cache:
logger.debug("Cache hit for %s", endpoint)
return self._cache[cache_key]
# Check negative cache (cached empty results)
negative_cache_key = f"{endpoint}:{str(sorted(params.items()))}"
if negative_cache_key in self._negative_cache:
if time.monotonic() - self._negative_cache[negative_cache_key] < self.NEGATIVE_CACHE_TTL:
logger.debug("Negative cache hit for %s (cached empty result)", endpoint)
return {"results": []}
else:
# Expired negative cache entry
del self._negative_cache[negative_cache_key]
delay = 1
last_error = None
# Rate limiting: ensure we don't exceed ~35 requests/second
async with self._rate_limit_lock:
now = time.monotonic()
# Remove timestamps older than 1 second
self._request_timestamps = [
ts for ts in self._request_timestamps if now - ts < 1.0
]
if len(self._request_timestamps) >= self._max_requests_per_second:
sleep_time = 1.0 - (now - self._request_timestamps[0])
if sleep_time > 0:
logger.debug("Rate throttling: waiting %.2fs", sleep_time)
await asyncio.sleep(sleep_time)
self._request_timestamps.append(time.monotonic())
async with self._semaphore:
for attempt in range(max_retries):
try:
# Re-ensure session before each attempt in case it was closed
await self._ensure_session()
if self.session is None:
raise TMDBAPIError("Session is not available")
logger.debug("TMDB API request: %s (attempt %s)", endpoint, attempt + 1)
async with self.session.get(url, params=params, timeout=aiohttp.ClientTimeout(total=60)) as resp:
if resp.status == 401:
raise TMDBAPIError("Invalid TMDB API key")
elif resp.status == 404:
raise TMDBAPIError(f"Resource not found: {endpoint}")
elif resp.status == 429:
# Rate limit - wait longer with exponential backoff
retry_after = int(resp.headers.get('Retry-After', max(delay * 2, 2)))
logger.warning("Rate limited, waiting %ss", retry_after)
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
data = await resp.json()
self._cache[cache_key] = data
# Cache negative result if empty
if endpoint.startswith("search/") and not data.get("results"):
self._negative_cache[negative_cache_key] = time.monotonic()
logger.debug("Cached negative result for %s", endpoint)
return data
except asyncio.TimeoutError as e:
last_error = e
if attempt < max_retries - 1:
logger.warning("Request timeout (attempt %s), retrying in %ss", attempt + 1, delay)
await asyncio.sleep(delay)
delay *= 2
else:
logger.error("Request timed out after %s attempts", max_retries)
except (aiohttp.ClientError, AttributeError) as e:
last_error = e
# If connector/session was closed, try to recreate it
if "Connector is closed" in str(e) or isinstance(e, AttributeError):
logger.warning(
"Session issue detected, recreating session: %s",
e,
exc_info=True,
)
self.session = None
await self._ensure_session()
# DNS / host-unreachable errors are not transient — abort immediately
error_str = str(e)
if "name resolution" in error_str.lower() or (
isinstance(e, aiohttp.ClientConnectorError) and
"Cannot connect to host" in error_str
):
logger.error("Non-transient connection error, aborting retries: %s", e)
raise TMDBAPIError(f"Request failed after {attempt + 1} attempts: {e}") from e
if attempt < max_retries - 1:
logger.warning("Request failed (attempt %s): %s, retrying in %ss", attempt + 1, e, delay)
await asyncio.sleep(delay)
delay *= 2
else:
logger.error("Request failed after %s attempts: %s", max_retries, e)
raise TMDBAPIError(f"Request failed after {max_retries} attempts: {last_error}")
async def search_tv_show(
self,
query: str,
language: str = "de-DE",
page: int = 1
) -> Dict[str, Any]:
"""Search for TV shows by name.
Args:
query: Search query (show name)
language: Language for results (default: German)
page: Page number for pagination
Returns:
Search results with list of shows
Example:
>>> results = await client.search_tv_show("Attack on Titan")
>>> shows = results["results"]
"""
return await self._request(
"search/tv",
{"query": query, "language": language, "page": page}
)
async def search_multi(
self,
query: str,
language: str = "en-US",
page: int = 1
) -> Dict[str, Any]:
"""Search for movies and TV shows by name using TMDB multi search.
Multi search returns both movies and TV shows, useful for anime
that might be indexed as movies on TMDB.
Args:
query: Search query (show name)
language: Language for results (default: English)
page: Page number for pagination
Returns:
Search results with list of movies and TV shows
Example:
>>> results = await client.search_multi("Suzume no Tojimari")
>>> shows = [r for r in results["results"] if r["media_type"] == "tv"]
"""
return await self._request(
"search/multi",
{"query": query, "language": language, "page": page}
)
async def get_tv_show_details(
self,
tv_id: int,
language: str = "de-DE",
append_to_response: Optional[str] = None
) -> Dict[str, Any]:
"""Get detailed information about a TV show.
Args:
tv_id: TMDB TV show ID
language: Language for metadata
append_to_response: Additional data to include (e.g., "credits,images")
Returns:
TV show details including metadata, cast, etc.
"""
params = {"language": language}
if append_to_response:
params["append_to_response"] = append_to_response
return await self._request(f"tv/{tv_id}", params)
async def get_tv_show_content_ratings(self, tv_id: int) -> Dict[str, Any]:
"""Get content ratings for a TV show.
Args:
tv_id: TMDB TV show ID
Returns:
Content ratings by country
"""
return await self._request(f"tv/{tv_id}/content_ratings")
async def get_tv_show_external_ids(self, tv_id: int) -> Dict[str, Any]:
"""Get external IDs (IMDB, TVDB) for a TV show.
Args:
tv_id: TMDB TV show ID
Returns:
Dictionary with external IDs (imdb_id, tvdb_id, etc.)
"""
return await self._request(f"tv/{tv_id}/external_ids")
async def get_tv_show_images(
self,
tv_id: int,
language: Optional[str] = None
) -> Dict[str, Any]:
"""Get images (posters, backdrops, logos) for a TV show.
Args:
tv_id: TMDB TV show ID
language: Language filter for images (None = all languages)
Returns:
Dictionary with poster, backdrop, and logo lists
"""
params = {}
if language:
params["language"] = language
return await self._request(f"tv/{tv_id}/images", params)
async def download_image(
self,
image_path: str,
local_path: Path,
size: str = "original"
) -> None:
"""Download an image from TMDB.
Args:
image_path: Image path from TMDB API (e.g., "/abc123.jpg")
local_path: Local file path to save image
size: Image size (w500, original, etc.)
Raises:
TMDBAPIError: If download fails
"""
await self._ensure_session()
url = f"{self.image_base_url}/{size}{image_path}"
try:
logger.debug("Downloading image from %s", url)
async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=60)) as resp:
resp.raise_for_status()
# Ensure parent directory exists
local_path.parent.mkdir(parents=True, exist_ok=True)
# Write image data
with open(local_path, "wb") as f:
f.write(await resp.read())
logger.info("Downloaded image to %s", local_path)
except aiohttp.ClientError as e:
raise TMDBAPIError(f"Failed to download image: {e}")
def get_image_url(self, image_path: str, size: str = "original") -> str:
"""Get full URL for an image.
Args:
image_path: Image path from TMDB API
size: Image size (w500, original, etc.)
Returns:
Full image URL
"""
return f"{self.image_base_url}/{size}{image_path}"
async def close(self):
"""Close the aiohttp session and clean up resources."""
if self.session and not self.session.closed:
await self.session.close()
self.session = None
logger.debug("TMDB client session closed")
def __del__(self):
"""Warn if session is unclosed during garbage collection."""
if self.session is not None and not self.session.closed:
logger.warning(
"TMDBClient: unclosed session detected. "
"Use 'async with TMDBClient(...)' or call close() explicitly."
)
def clear_cache(self):
"""Clear the request cache."""
self._cache.clear()
logger.debug("TMDB client cache cleared")
def clear_negative_cache(self):
"""Clear the negative result cache."""
self._negative_cache.clear()
logger.debug("TMDB negative cache cleared")
def cleanup_expired_negative_cache(self) -> int:
"""Remove expired entries from negative cache.
Returns:
Number of entries removed
"""
now = time.monotonic()
expired_keys = [
key for key, timestamp in self._negative_cache.items()
if now - timestamp >= self.NEGATIVE_CACHE_TTL
]
for key in expired_keys:
del self._negative_cache[key]
if expired_keys:
logger.debug("Removed %d expired negative cache entries", len(expired_keys))
return len(expired_keys)

View File

@@ -1,335 +0,0 @@
"""Integration tests for legacy key/data file migration.
Tests the one-time migration safety net that imports series from
legacy key and data files into the database.
"""
import json
import os
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.server.services.legacy_file_migration import (
_load_data_file,
_load_key_file,
migrate_series_from_files_to_db,
)
class TestLoadLegacyFiles:
"""Test helper functions for loading legacy files."""
def test_load_data_file_valid_json(self):
"""Test loading a valid JSON data file."""
with tempfile.TemporaryDirectory() as tmp_dir:
data_file = os.path.join(tmp_dir, "data")
test_data = {
"key": "test-anime",
"name": "Test Anime",
"site": "https://aniworld.to",
"folder": "Test Anime",
"episodeDict": {"1": [1, 2, 3]}
}
with open(data_file, "w", encoding="utf-8") as f:
json.dump(test_data, f)
result = _load_data_file(data_file)
assert result is not None
assert result["key"] == "test-anime"
assert result["name"] == "Test Anime"
# episodeDict keys should be converted to int
assert 1 in result["episodeDict"]
def test_load_data_file_invalid_json(self):
"""Test handling of corrupt JSON data file."""
with tempfile.TemporaryDirectory() as tmp_dir:
data_file = os.path.join(tmp_dir, "data")
with open(data_file, "w", encoding="utf-8") as f:
f.write("this is not valid json {{{")
result = _load_data_file(data_file)
assert result is None
def test_load_data_file_not_dict(self):
"""Test handling of JSON file that is not a dict."""
with tempfile.TemporaryDirectory() as tmp_dir:
data_file = os.path.join(tmp_dir, "data")
with open(data_file, "w", encoding="utf-8") as f:
json.dump(["not", "a", "dict"], f)
result = _load_data_file(data_file)
assert result is None
def test_load_key_file_valid(self):
"""Test loading a key file with valid content."""
with tempfile.TemporaryDirectory() as tmp_dir:
key_file = os.path.join(tmp_dir, "key")
with open(key_file, "w", encoding="utf-8") as f:
f.write("my-anime-key")
result = _load_key_file(key_file, "My Anime")
assert result is not None
assert result["key"] == "my-anime-key"
assert result["name"] == "My Anime"
assert result["site"] == "https://aniworld.to"
assert result["episodeDict"] == {}
def test_load_key_file_empty(self):
"""Test handling of empty key file."""
with tempfile.TemporaryDirectory() as tmp_dir:
key_file = os.path.join(tmp_dir, "key")
with open(key_file, "w", encoding="utf-8") as f:
f.write("")
result = _load_key_file(key_file, "My Anime")
assert result is None
class TestMigrateLegacyFiles:
"""Test the main migration function with database."""
@pytest.mark.asyncio
async def test_migrate_series_from_files_to_db_no_files(self):
"""Test migration with empty directory returns 0."""
mock_db = AsyncMock()
mock_db.execute = AsyncMock()
with tempfile.TemporaryDirectory() as tmp_dir:
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 0
@pytest.mark.asyncio
async def test_migrate_data_file_to_db(self):
"""Test migration of a legacy data file."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a folder with a data file
anime_folder = os.path.join(tmp_dir, "Test Anime")
os.makedirs(anime_folder, exist_ok=True)
data_file = os.path.join(anime_folder, "data")
test_data = {
"key": "migrate-test-anime",
"name": "Migrate Test Anime",
"site": "https://aniworld.to",
"folder": "Test Anime",
"episodeDict": {"1": [1, 2]}
}
with open(data_file, "w", encoding="utf-8") as f:
json.dump(test_data, f)
# Mock the DB session and services
mock_db = AsyncMock()
mock_series_service = AsyncMock()
mock_episode_service = AsyncMock()
# Mock get_by_key returning None (not in DB)
mock_series_service.get_by_key = AsyncMock(return_value=None)
# Mock AnimeSeriesService.create returning a mock with id=1
mock_created_series = MagicMock()
mock_created_series.id = 1
mock_series_service.create = AsyncMock(return_value=mock_created_series)
with patch.dict('sys.modules', {
'src.server.database.service': MagicMock(
AnimeSeriesService=mock_series_service,
EpisodeService=mock_episode_service
)
}):
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 1
@pytest.mark.asyncio
async def test_migrate_key_file_to_db(self):
"""Test migration of a legacy key file."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a folder with only a key file
anime_folder = os.path.join(tmp_dir, "Key Only Anime")
os.makedirs(anime_folder, exist_ok=True)
key_file = os.path.join(anime_folder, "key")
with open(key_file, "w", encoding="utf-8") as f:
f.write("key-only-anime")
# Mock the DB session and services
mock_db = AsyncMock()
mock_series_service = AsyncMock()
mock_episode_service = AsyncMock()
# Mock get_by_key returning None (not in DB)
mock_series_service.get_by_key = AsyncMock(return_value=None)
# Mock AnimeSeriesService.create returning a mock with id=1
mock_created_series = MagicMock()
mock_created_series.id = 1
mock_series_service.create = AsyncMock(return_value=mock_created_series)
with patch.dict('sys.modules', {
'src.server.database.service': MagicMock(
AnimeSeriesService=mock_series_service,
EpisodeService=mock_episode_service
)
}):
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 1
@pytest.mark.asyncio
async def test_migration_skips_already_migrated(self):
"""Test that migration skips series already in DB."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a folder with a data file
anime_folder = os.path.join(tmp_dir, "Already Migrated")
os.makedirs(anime_folder, exist_ok=True)
data_file = os.path.join(anime_folder, "data")
test_data = {
"key": "already-migrated",
"name": "Already Migrated",
"site": "https://aniworld.to",
"folder": "Already Migrated",
"episodeDict": {"1": [1]}
}
with open(data_file, "w", encoding="utf-8") as f:
json.dump(test_data, f)
# Mock the DB session and services
mock_db = AsyncMock()
mock_series_service = AsyncMock()
mock_episode_service = AsyncMock()
# Mock get_by_key returning existing series (already migrated)
mock_existing_series = MagicMock()
mock_existing_series.name = "Modified Name"
mock_series_service.get_by_key = AsyncMock(return_value=mock_existing_series)
with patch.dict('sys.modules', {
'src.server.database.service': MagicMock(
AnimeSeriesService=mock_series_service,
EpisodeService=mock_episode_service
)
}):
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 0 # No new series migrated
@pytest.mark.asyncio
async def test_migration_handles_corrupt_data_file(self):
"""Test that corrupt data files don't crash migration."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a folder with a corrupt data file
corrupt_folder = os.path.join(tmp_dir, "Corrupt Anime")
os.makedirs(corrupt_folder, exist_ok=True)
corrupt_file = os.path.join(corrupt_folder, "data")
with open(corrupt_file, "w", encoding="utf-8") as f:
f.write("not valid json {{{")
# Create a valid folder
valid_folder = os.path.join(tmp_dir, "Valid Anime")
os.makedirs(valid_folder, exist_ok=True)
valid_file = os.path.join(valid_folder, "data")
valid_data = {
"key": "valid-anime",
"name": "Valid Anime",
"site": "https://aniworld.to",
"folder": "Valid Anime",
"episodeDict": {"1": [1]}
}
with open(valid_file, "w", encoding="utf-8") as f:
json.dump(valid_data, f)
# Mock the DB session and services
mock_db = AsyncMock()
mock_series_service = AsyncMock()
mock_episode_service = AsyncMock()
# Mock get_by_key returning None (not in DB)
mock_series_service.get_by_key = AsyncMock(return_value=None)
# Mock AnimeSeriesService.create returning a mock with id=1
mock_created_series = MagicMock()
mock_created_series.id = 1
mock_series_service.create = AsyncMock(return_value=mock_created_series)
with patch.dict('sys.modules', {
'src.server.database.service': MagicMock(
AnimeSeriesService=mock_series_service,
EpisodeService=mock_episode_service
)
}):
# Migration should succeed despite corrupt file
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 1 # Only the valid one
@pytest.mark.asyncio
async def test_migration_idempotent(self):
"""Test that running migration twice doesn't change DB state."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create a folder with a data file
anime_folder = os.path.join(tmp_dir, "Idempotent Test")
os.makedirs(anime_folder, exist_ok=True)
data_file = os.path.join(anime_folder, "data")
test_data = {
"key": "idempotent-test",
"name": "Idempotent Test",
"site": "https://aniworld.to",
"folder": "Idempotent Test",
"episodeDict": {"1": [1, 2]}
}
with open(data_file, "w", encoding="utf-8") as f:
json.dump(test_data, f)
# Mock the DB session and services
mock_db = AsyncMock()
mock_series_service = AsyncMock()
mock_episode_service = AsyncMock()
# First call returns None (not in DB), second call returns the series
mock_existing_series = MagicMock()
mock_existing_series.id = 1
mock_series_service.get_by_key = AsyncMock(side_effect=[None, mock_existing_series])
# Mock AnimeSeriesService.create returning a mock with id=1
mock_created_series = MagicMock()
mock_created_series.id = 1
mock_series_service.create = AsyncMock(return_value=mock_created_series)
with patch.dict('sys.modules', {
'src.server.database.service': MagicMock(
AnimeSeriesService=mock_series_service,
EpisodeService=mock_episode_service
)
}):
# First migration
count1 = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count1 == 1
# Second migration
count2 = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count2 == 0 # Already migrated
@pytest.mark.asyncio
async def test_migration_skips_folders_without_files(self):
"""Test that folders without key/data files are skipped."""
with tempfile.TemporaryDirectory() as tmp_dir:
# Create an empty folder (no key or data file)
empty_folder = os.path.join(tmp_dir, "Empty Folder")
os.makedirs(empty_folder, exist_ok=True)
# Create a folder with only a video file
video_folder = os.path.join(tmp_dir, "Video Folder")
os.makedirs(video_folder, exist_ok=True)
with open(os.path.join(video_folder, "episode1.mp4"), "w") as f:
f.write("fake video content")
mock_db = AsyncMock()
count = await migrate_series_from_files_to_db(tmp_dir, mock_db)
assert count == 0