"""Key resolution service for orphaned anime folders. Attempts to resolve provider keys for anime folders that have no key/data file and no database entry, by searching the anime provider and matching folder names to search results. This service runs after nfo_repair_service during the daily folder scan. """ from __future__ import annotations import asyncio import re from pathlib import Path from typing import Optional import structlog from src.config.settings import settings as _settings logger = structlog.get_logger(__name__) # Limit concurrent provider searches to avoid rate-limiting. _SEARCH_SEMAPHORE: asyncio.Semaphore = asyncio.Semaphore(2) def _strip_year_from_folder(folder_name: str) -> str: """Remove trailing year suffix like ' (2020)' from folder name. Args: folder_name: Folder name, e.g. 'Rent-A-Girlfriend (2020)' Returns: Name without year, e.g. 'Rent-A-Girlfriend' """ return re.sub(r"\s*\(\d{4}\)\s*$", "", folder_name).strip() def _extract_year_from_folder(folder_name: str) -> Optional[int]: """Extract year from folder name like 'Anime Name (2020)'. Returns: Year as int or None if not present. """ match = re.search(r"\((\d{4})\)$", folder_name.strip()) if match: return int(match.group(1)) return None def _extract_key_from_link(link: str) -> Optional[str]: """Extract provider key from search result link. Args: link: Link like '/anime/stream/rent-a-girlfriend' or full URL. Returns: Key slug like 'rent-a-girlfriend' or None. """ if not link: return None if "/anime/stream/" in link: parts = link.split("/anime/stream/")[-1].split("/") key = parts[0].strip() return key if key else None # If link is just a slug if "/" not in link and link.strip(): return link.strip() return None def _normalize_for_comparison(text: str) -> str: """Normalize text for case-insensitive comparison. Strips whitespace, lowercases, and removes common punctuation differences that shouldn't affect matching. Args: text: Raw text string. Returns: Normalized lowercase string. """ normalized = text.strip().lower() # Remove common punctuation that varies between sources normalized = re.sub(r"[:\-–—]", " ", normalized) # Collapse multiple spaces normalized = re.sub(r"\s+", " ", normalized) return normalized.strip() async def resolve_key_for_folder(folder_name: str) -> Optional[str]: """Attempt to resolve the provider key for a single folder. Strategy: 1. Strip year suffix from folder name to get search query. 2. Search the anime provider with that query. 3. If exactly ONE result matches the folder name (case-insensitive), return the key extracted from the result link. 4. If zero or multiple matches, return None (not confident enough). Args: folder_name: The anime folder name, e.g. 'Rent-A-Girlfriend (2020)'. Returns: The provider key string, or None if resolution is not confident. """ search_query = _strip_year_from_folder(folder_name) if not search_query: logger.debug("Empty search query after stripping year from '%s'", folder_name) return None async with _SEARCH_SEMAPHORE: try: loop = asyncio.get_running_loop() results = await loop.run_in_executor(None, _search_provider, search_query) except Exception as exc: logger.warning( "Provider search failed for '%s': %s", search_query, exc ) return None if not results: logger.debug("No search results for folder '%s'", folder_name) return None # Filter results: find exact name matches (case-insensitive) normalized_query = _normalize_for_comparison(search_query) exact_matches = [] for result in results: title = result.get("title") or result.get("name") or "" normalized_title = _normalize_for_comparison(title) if normalized_title == normalized_query: key = _extract_key_from_link(result.get("link", "")) if key: exact_matches.append((key, title)) if len(exact_matches) == 1: resolved_key, matched_title = exact_matches[0] logger.info( "Resolved key for folder '%s': key='%s' (matched title: '%s')", folder_name, resolved_key, matched_title, ) return resolved_key if len(exact_matches) > 1: logger.info( "Multiple exact matches for folder '%s' (%d matches), skipping", folder_name, len(exact_matches), ) else: logger.debug( "No exact title match for folder '%s' in %d results", folder_name, len(results), ) return None def _search_provider(query: str) -> list: """Call the anime provider search synchronously. Args: query: Search term. Returns: List of search result dicts with 'link' and 'title'/'name' fields. """ from src.core.providers.provider_factory import Loaders loader = Loaders().GetLoader("aniworld.to") return loader.search(query) async def perform_key_resolution_scan() -> dict[str, int]: """Scan all anime folders and resolve missing keys. Iterates over all subfolders of the anime directory. For each folder that has no corresponding database entry, attempts to resolve the provider key via provider search and saves it to the database. Returns: Dictionary with counts: - 'scanned': total folders checked - 'resolved': keys successfully resolved and saved - 'skipped': folders already in DB or resolution uncertain - 'errors': folders that caused errors during resolution """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService stats = {"scanned": 0, "resolved": 0, "skipped": 0, "errors": 0} if not _settings.anime_directory: logger.warning("Key resolution scan skipped — anime directory not configured") return stats anime_dir = Path(_settings.anime_directory) if not anime_dir.is_dir(): logger.warning( "Key resolution scan skipped — anime directory not found: %s", anime_dir, ) return stats # Collect folders that need resolution folders_to_resolve: list[str] = [] async with get_db_session() as db: for series_dir in sorted(anime_dir.iterdir()): if not series_dir.is_dir(): continue folder_name = series_dir.name stats["scanned"] += 1 # Check if already in database existing = await AnimeSeriesService.get_by_folder(db, folder_name) if existing: stats["skipped"] += 1 continue folders_to_resolve.append(folder_name) if not folders_to_resolve: logger.info("Key resolution scan: all folders already have DB entries") return stats logger.info( "Key resolution scan: %d folders need resolution", len(folders_to_resolve) ) # Resolve keys one by one (provider search is rate-limited) for folder_name in folders_to_resolve: try: key = await resolve_key_for_folder(folder_name) if key: # Save to database await _save_resolved_key(folder_name, key) stats["resolved"] += 1 else: stats["skipped"] += 1 except Exception as exc: logger.error( "Error resolving key for folder '%s': %s", folder_name, exc, ) stats["errors"] += 1 logger.info( "Key resolution scan complete: scanned=%d, resolved=%d, skipped=%d, errors=%d", stats["scanned"], stats["resolved"], stats["skipped"], stats["errors"], ) return stats async def _save_resolved_key(folder_name: str, key: str) -> None: """Save a resolved key to the database. Creates a new AnimeSeries entry with the resolved key and folder name. Does NOT write any key/data file to disk. Args: folder_name: The anime folder name (e.g. 'Rent-A-Girlfriend (2020)'). key: The resolved provider key (e.g. 'rent-a-girlfriend'). """ from src.server.database.connection import get_db_session from src.server.database.service import AnimeSeriesService name = _strip_year_from_folder(folder_name) year = _extract_year_from_folder(folder_name) async with get_db_session() as db: # Double-check: another task might have resolved it concurrently existing = await AnimeSeriesService.get_by_folder(db, folder_name) if existing: logger.debug( "Folder '%s' already in DB (resolved concurrently), skipping", folder_name, ) return # Also check if a series with this key already exists existing_key = await AnimeSeriesService.get_by_key(db, key) if existing_key: logger.warning( "Key '%s' already exists in DB for folder '%s', " "cannot assign to folder '%s'", key, existing_key.folder, folder_name, ) return await AnimeSeriesService.create( db, key=key, name=name, site="aniworld.to", folder=folder_name, year=year, loading_status="pending", episodes_loaded=False, ) logger.info( "Saved resolved key '%s' for folder '%s' to database", key, folder_name, )