Compare commits

...

22 Commits

Author SHA1 Message Date
76b849fc91 chore: bump version 2026-05-30 12:02:48 +02:00
00b26c8cbc fix: validate generated keys before creating Serie objects
- Add is_valid_key check in SerieScanner._read_data_from_file() to prevent
  passing invalid keys to Serie constructor (caused ValueError)
- Improve error message for key generation failures
- Add warning log before removing duplicate source folders in rename service
2026-05-30 11:42:19 +02:00
a6f2399aca chore: bump version 2026-05-29 19:25:30 +02:00
cf001563b3 refactor: add folder rename configuration and service
Add configurable folder rename patterns via settings with anime_folder_rename_regex and custom_pattern options. Integrate into SerieScanner and SeriesApp for consistent episode organization.
2026-05-29 19:24:09 +02:00
38c12638a4 fix HLS stream warning by disabling native downloader and retrying with ffmpeg
- Set hls_prefer_native: False to skip yt-dlp's native HLS downloader which emits
  'Live HLS streams are not supported' warning
- Add retry logic that catches HLS-related exceptions and retries with
  downloader=ffmpeg and hls_use_mpegts=True
2026-05-29 18:53:47 +02:00
765e43c684 fix(key_utils): drop apostrophes in generate_key_from_folder 2026-05-29 18:20:20 +02:00
5190d32665 chore: bump version 2026-05-28 22:03:52 +02:00
4e6afa31b5 Remove legacy key file support after DB migration
- SerieScanner: Remove key file fallback, keep data file fallback
- SystemSettings: Add legacy_key_cleanup_completed flag
- initialization_service: Add cleanup task to remove key files from folders with DB entries
- Tests updated to reflect key file removal from legacy path

Key files caused duplicate key errors on folder rename. DB is now sole source of truth.
2026-05-28 22:01:37 +02:00
1ef59c5283 feat: add duplicate folder detection and /duplicate-folders API endpoint
- Add DuplicateFolderGroup and DuplicateFoldersResponse Pydantic models
- Add /duplicate-folders GET endpoint for listing pre-existing duplicates
- Add _scan_for_pre_existing_duplicates() function for NFO-based detection
- Add _try_merge_duplicate_group() for auto-merging empty/symlink-only duplicates
- Integrate duplicate detection into validate_and_rename_series_folders workflow
- Skip rename for flagged duplicates to prevent data loss during merge
2026-05-28 21:46:08 +02:00
239341629c Add orphaned folder cleanup after rename
- Add _cleanup_orphaned_folder() to delete/move old folder contents after rename
- Empty folders: delete directly via rmdir()
- Non-empty folders: move contents to new path, then delete old folder
- Handle PermissionError and OSError gracefully with logging
- Add dry_run parameter to preview changes without applying them
- Add --dry-run support to validate_and_rename_series_folders()
- Add unit tests for _cleanup_orphaned_folder and dry-run mode
- All 66 related tests pass

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 21:24:13 +02:00
51b7f349f8 fix(scheduler): strip null legacy alias fields from config.json on save
SchedulerConfig.__init__ maps legacy auto_download/folder_scan keys to the
primary auto_download_after_rescan/folder_scan_enabled fields. However,
model_dump() was including auto_download=null and folder_scan=null in
serialised output. When this was written to config.json and reloaded,
those keys were present (albeit null), so the alias mapping was skipped
and the primary fields retained default False values instead of the
configured True values.

Fix:
- Override SchedulerConfig.model_dump() to drop None-valued alias fields
  before returning the serialised dict.
- ConfigService.save_config() re-serialises the scheduler field through
  its overridden model_dump() so the fix applies when writing to disk.

Tests added:
- test_roundtrip_excludes_none_alias_fields: verifies model_dump omits
  null auto_download/folder_scan keys.
- test_save_and_load_scheduler_flags_roundtrip: end-to-end roundtrip
  through ConfigService confirms raw JSON and loaded values match.

Pre-existing failure in test_core_error_handler.py is unrelated.
2026-05-28 21:18:16 +02:00
14b8ef7f06 Add Step 4 fallback: generate key from folder name
- SerieScanner: generate key from folder when no key/data files exist
- Handle edge cases: non-Latin characters, special symbols in folder names
- anime_service: expose loading_status and loading_error fields
- Update tests to match new fallback behavior
2026-05-28 18:48:43 +02:00
7abba0dae2 Fix download provider errors with exponential backoff and playmogo support
- Add exponential backoff retry logic to RecoveryStrategies (1s, 2s, 4s...)
- Add TimeoutError to network failure handling for HTTPS timeouts
- Add playmogo.com referer header for Doodstream provider
- Add Optional import to error_handler.py
- Add sanitize_url_for_logging utility function

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 18:47:05 +02:00
30858f441c feat: add manual TMDB/TVDB ID entry for failed lookups
- Add PATCH /api/anime/{key}/metadata-ids endpoint to update IDs
- Add POST /api/anime/{key}/refresh-nfo endpoint to force NFO regeneration
- Add Edit Metadata IDs modal in frontend
- Add showEditMetadataModal, saveMetadataIds, refreshSeriesNfo JS functions
- Add edit-metadata-btn to series cards with database icon
- IDs validated as positive integers or null

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 18:38:34 +02:00
33f63ca304 feat(SerieScanner): add folder ignore patterns for non-anime content
- Add NFO_FOLDER_IGNORE_PATTERNS setting to skip TV shows like
  The Last of Us, Loki, Chernobyl, Star Trek Discovery
- Update SerieScanner.__find_mp4_files() to skip ignored folders
- Update SerieList.load_series() to skip ignored folders
- Add should_ignore_folder() method for pattern matching
- Add folder_ignore_patterns property for pattern parsing
- Add comprehensive tests for ignore pattern functionality
- Update NFO_GUIDE.md with ignore patterns documentation
- Update CONFIGURATION.md with new setting

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 18:11:45 +02:00
fe9284b80e feat(SerieScanner): add warning event for duplicate series keys
- Add on_warning event system with subscribe/unsubscribe methods
- Change duplicate key handling from error to warning
- Fire on_warning event when duplicate series detected
- Include metadata: key, duplicate_folder, existing_folder
2026-05-28 18:05:07 +02:00
12e5526991 chore: remove obsolete migration script
Migration logic moved to serie_scanner. No longer needed.
2026-05-28 17:55:00 +02:00
bc87bee416 refactor(scheduler): drop separate scheduler.db in favour of MemoryJobStore
Scheduler used a separate SQLite file (scheduler.db) only to persist one
cron job. This was originally required because APScheduler's
SQLAlchemyJobStore is sync-only, creating an async/sync driver conflict
when accessing the same file.

The job is rebuilt from config.json on every startup regardless
(replace_existing=True), so the persisted state only served misfire
detection. Moved misfire detection into the app layer by querying
system_settings.last_scan_timestamp on startup: if the last scan is
>23h but <25h ago, an immediate rescan is triggered.

Change summary:
- Remove SQLAlchemyJobStore; use default MemoryJobStore instead
- Add _check_missed_run() that reads last_scan_timestamp from aniworld.db
- Update docs/DEVELOPMENT.md scheduler troubleshooting section
- Update the scheduler unit test that verified SQLAlchemyJobStore
2026-05-27 22:09:18 +02:00
7ded5a6e4d chore: bump version 2026-05-27 21:41:45 +02:00
d596902ca3 Parse existing NFO for TMDB ID to skip redundant search
Check existing tvshow.nfo for TMDB ID before querying TMDB API.
If found, fetch details directly using cached ID instead of searching.
Reduces API calls and improves performance for already-indexed series.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-27 21:22:24 +02:00
d358a07290 fix async handling in SerieScanner and add image_downloader cleanup
- SerieScanner.scan() now detects running event loop and uses create_task()
  when already in async context, avoids RuntimeError
- NFOService.close() now also closes image_downloader to prevent resource leaks
- Add integration tests for TMDBClient lifecycle management

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-27 20:47:29 +02:00
b9c55f9e7a fix: remove double-call on AsyncSession in SerieScanner
get_async_session_factory() returns session directly, not factory.
Calling result again with () caused 'AsyncSession' object is not callable.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-27 20:19:34 +02:00
40 changed files with 3385 additions and 348 deletions

View File

@@ -1 +1 @@
v1.1.17
v1.2.2

View File

@@ -162,24 +162,37 @@ await client.close() # May not be called if exception raised earlier
### Scheduler Persistence and Recovery
APScheduler stores jobs in `data/scheduler.db` (SQLite) so they survive process restarts:
The scheduler uses APScheduler's in-memory job store. Jobs are reconstructed from `config.json` on every startup — no separate database is needed.
```python
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///./data/scheduler.db"),
}
scheduler = AsyncIOScheduler(jobstores=jobstores)
# Jobs are built from config on startup — no persistence DB required
scheduler = AsyncIOScheduler() # default MemoryJobStore
scheduler.add_job(..., replace_existing=True)
```
**Grace period:** `misfire_grace_time=3600` (1 hour). If server is down at scheduled time and restarts within 1 hour, missed job runs automatically via APScheduler coalesce behavior.
**Startup misfire recovery:** On `start()`, the scheduler checks `system_settings.last_scan_timestamp` in `aniworld.db`. If the last scan is overdue (>23h but <25h ago), an immediate rescan is triggered. This replaces APScheduler's built-in misfire handling which required a separate SQLite database.
**Startup recovery:** On `start()`, scheduler loads persisted jobs from DB. APScheduler handles missed jobs internally when `coalesce=True`.
**Grace period:** If the server was down for more than 25 hours, no automatic recovery occurs to avoid surprise rescans after long downtime.
**Health endpoint:** `GET /health` returns `scheduler_next_run` and `scheduler_last_run` for external monitors (Uptime Kuma, Prometheus, etc.).
**If server is down >1 hour:** No automatic recovery. Manual trigger via `POST /api/scheduler/trigger-rescan` or wait for next scheduled run.
**If server is down too long:** Manual trigger via `POST /api/scheduler/trigger-rescan` or wait for next scheduled run.
### Database Session Management
`get_async_session_factory()` returns a **new AsyncSession instance** directly (not a factory). The function name is historical — callers receive the session immediately:
```python
# Correct usage:
db = get_async_session_factory() # db IS the session
await db.execute(...)
await db.commit()
await db.close()
```
Do NOT call the result again with `()` — that tries to call an `AsyncSession` object, causing `'AsyncSession' object is not callable`.
For context manager usage, prefer `get_db_session()` (auto-commits) or `get_transactional_session()` (manual commit).
### Health Check Endpoints
@@ -241,30 +254,27 @@ DNS checks are warnings because failures can be transient. anime_directory error
#### Scheduler missed a run
1. Server was down at scheduled time (03:00 UTC by default).
2. Check `data/scheduler.db` exists — if not, jobs are not persisted.
3. If server was down >1 hour, missed job is dropped (misfire window exceeded).
2. On restart, the scheduler checks `last_scan_timestamp` — if overdue by 23-25h, it triggers immediately.
3. If server was down >25 hours, missed job is skipped to avoid surprise rescans.
4. Trigger manually: `POST /api/scheduler/trigger-rescan`
5. Monitor next run: `GET /health``scheduler_next_run`
6. If problem repeats, increase `misfire_grace_time` in `scheduler_service.py`.
#### Scheduler not firing (no events at scheduled time)
If the scheduler appears configured but never triggers:
1. **Verify scheduler.db contains the job:**
```bash
sqlite3 data/scheduler.db "SELECT id, next_run_time FROM apscheduler_jobs;"
```
- `next_run_time` should be in the future
- If it's in the past, the server was down when the job should have fired
2. **Check application logs for scheduler startup:**
1. **Check application logs for scheduler startup:**
```
grep "Scheduler service started" fastapi_app.log
```
- If missing, the scheduler failed to start — check for errors above this line
- If present, scheduler started successfully
2. **Verify the job is registered:**
```
grep "Scheduler started with cron trigger" fastapi_app.log
```
3. **Verify APScheduler events in logs:**
```
grep "apscheduler.executors.default" fastapi_app.log

View File

@@ -246,6 +246,7 @@ NFO files are created in the anime directory:
<genre>Action</genre>
<genre>Sci-Fi & Fantasy</genre>
<uniqueid type="tmdb">1429</uniqueid>
<tmdbid>1429</tmdbid>
<thumb aspect="poster">https://image.tmdb.org/t/p/w500/...</thumb>
<fanart>
<thumb>https://image.tmdb.org/t/p/original/...</thumb>
@@ -253,6 +254,13 @@ NFO files are created in the anime directory:
</tvshow>
```
**Manual TMDB ID Override**: To skip TMDB search and use a specific ID directly, include `<tmdbid>YOUR_ID</tmdbid>` in the NFO. This is useful when:
- TMDB search fails for your series (e.g., new or obscure anime)
- You already know the correct TMDB ID
- You want to avoid rate limiting from repeated searches
Aniworld reads `<tmdbid>` element and `<uniqueid type="tmdb">` first. If found, it uses the ID directly instead of searching.
### 4.3 Episode NFO Format
```xml
@@ -629,6 +637,36 @@ Every poster check action is logged:
4. Check network speed to TMDB servers
5. Verify disk I/O performance
### 6.7 TMDB Lookup Fails for My Series
**Problem**: TMDB search fails with "No results found" for a valid series.
**Solutions**:
1. **Check if series exists on TMDB**: Visit https://www.themoviedb.org and search for your series
2. **Use manual ID override**: Add TMDB ID directly to `tvshow.nfo`:
```xml
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<tvshow>
<title>Your Series Name</title>
<tmdbid>12345</tmdbid>
<uniqueid type="tmdb">12345</uniqueid>
</tvshow>
```
Aniworld will use this ID directly instead of searching.
3. **Try alternative titles**: Some anime have different titles (Japanese, romaji, English). If you have access to the folder, rename it to match the TMDB title.
4. **Add to existing NFO**: If `tvshow.nfo` exists but has no TMDB ID, edit it to add:
```xml
<tmdbid>YOUR_TMDB_ID</tmdbid>
```
Then use the Update endpoint to refresh metadata.
5. **Check for rate limiting**: If many lookups fail at once, you may be hitting TMDB rate limits. Wait and retry later.
6. **Verify API key**: Ensure your TMDB API key is valid and has not exceeded usage limits.
---
## 7. Best Practices

0
docs/helper Normal file
View File

View File

@@ -1,6 +1,6 @@
{
"name": "aniworld-web",
"version": "1.1.17",
"version": "1.2.2",
"description": "Aniworld Anime Download Manager - Web Frontend",
"type": "module",
"scripts": {

View File

@@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""Migration script to populate year for existing series from folder names.
This script:
1. Finds all series in the database with year=NULL
2. Extracts year from their folder names using the same pattern as SerieScanner
3. Updates the database records
Usage:
python scripts/migrate_populate_year_from_folder.py [--dry-run]
"""
import argparse
import re
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlalchemy import select, update
from src.server.database.models import AnimeSeries
from src.server.database.service import DatabaseSession
def extract_year_from_folder_name(folder_name: str) -> int | None:
"""Extract year from folder name if present.
Same logic as SerieScanner._extract_year_from_folder_name.
Args:
folder_name: The folder name to check
Returns:
int or None: Year if found, None otherwise
"""
if not folder_name:
return None
# Look for year in format (YYYY) - typically at end of name
match = re.search(r'\((\d{4})\)', folder_name)
if match:
try:
year = int(match.group(1))
# Validate year is reasonable (between 1900 and 2100)
if 1900 <= year <= 2100:
return year
except ValueError:
pass
return None
async def migrate_year_from_folder(dry_run: bool = True) -> tuple[int, int]:
"""Migrate year field for existing series.
Args:
dry_run: If True, only report what would be changed
Returns:
Tuple of (updated_count, skipped_count)
"""
updated_count = 0
skipped_count = 0
async with DatabaseSession() as db:
# Find all series with NULL year
result = await db.execute(
select(AnimeSeries).where(AnimeSeries.year.is_(None))
)
series_list = result.scalars().all()
print(f"Found {len(series_list)} series with year=NULL")
for series in series_list:
year_from_folder = extract_year_from_folder_name(series.folder)
if year_from_folder:
print(f" {series.folder} -> {year_from_folder}")
if not dry_run:
await db.execute(
update(AnimeSeries)
.where(AnimeSeries.id == series.id)
.values(year=year_from_folder)
)
updated_count += 1
else:
print(f" {series.folder} -> (no year found)")
skipped_count += 1
return updated_count, skipped_count
def main():
parser = argparse.ArgumentParser(description="Migrate year from folder name")
parser.add_argument(
"--dry-run",
action="store_true",
default=True,
help="Show what would be changed without making changes"
)
parser.add_argument(
"--execute",
action="store_true",
help="Actually execute the migration (disabled by default)"
)
args = parser.parse_args()
dry_run = not args.execute
if dry_run:
print("=== DRY RUN MODE ===")
print("No changes will be made. Use --execute to apply changes.\n")
import asyncio
try:
updated, skipped = asyncio.run(migrate_year_from_folder(dry_run=dry_run))
print(f"\n{'Would update' if dry_run else 'Updated'}: {updated} series")
print(f"Skipped (no year in folder): {skipped} series")
if dry_run:
print("\nRun with --execute to apply these changes.")
return 0
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,3 +1,4 @@
import re
import secrets
from typing import Optional
@@ -114,6 +115,40 @@ class Settings(BaseSettings):
validation_alias="NFO_PREFER_FSK_RATING",
description="Prefer German FSK rating over MPAA rating in NFO files"
)
nfo_folder_ignore_patterns: str = Field(
default="The Last of Us|Loki|Chernobyl|Star Trek Discovery|Marvel|Matrix|Fast & Furious|Jurassic|James Bond|Mission: Impossible|Bourne|Hunger Games|Die Hard|John Wick|Pacific Rim|Guardians of the Galaxy|Avengers|Batman|Superman|Wonder Woman|Spider-Man|X-Men|Fantastic Four|Terminator|Predator|Rambo|Rocky|Expendables|Tomb Raider|Jumanji|Jurassic Park|Pirates of the Caribbean|Harry Potter|Lord of the Rings|Hobbit|Game of Thrones|Westworld|Stranger Things|Breaking Bad|Better Call Saul|Sherlock|Downton Abbey|The Crown|Bridgerton|Sex Education|Normal People|Emily in Paris|The Witcher|Servant|Lucifer|Dark|Shadow and Bone|Grimm|Fairytale",
validation_alias="NFO_FOLDER_IGNORE_PATTERNS",
description="Regex patterns for folder names to skip during scan (pipe-separated)"
)
@property
def folder_ignore_patterns(self) -> list[str]:
"""Parse ignore patterns from comma-separated string into list.
Returns:
List of regex patterns to skip during folder scanning.
"""
if not self.nfo_folder_ignore_patterns:
return []
return [
pattern.strip()
for pattern in self.nfo_folder_ignore_patterns.split("|")
if pattern.strip()
]
def should_ignore_folder(self, folder_name: str) -> bool:
"""Check if folder should be ignored based on ignore patterns.
Args:
folder_name: Name of folder to check.
Returns:
True if folder matches any ignore pattern, False otherwise.
"""
for pattern in self.folder_ignore_patterns:
if re.search(pattern, folder_name, re.IGNORECASE):
return True
return False
@property
def allowed_origins(self) -> list[str]:
@@ -134,5 +169,23 @@ class Settings(BaseSettings):
]
return [origin.strip() for origin in raw.split(",") if origin.strip()]
@property
def scan_key_overrides(self) -> dict[str, str]:
"""Return scan key overrides from config.json.
Maps folder names to provider keys for cases where auto-generated
keys from folder names are incorrect.
Returns:
Dict mapping folder names to provider keys.
"""
from src.server.services.config_service import ConfigService
try:
config_service = ConfigService()
config = config_service.load_config()
return config.scan_key_overrides or {}
except Exception:
return {}
settings = Settings()

View File

@@ -20,10 +20,11 @@ from typing import Callable, Iterable, Iterator, Optional
from events import Events
from src.config.settings import settings
from src.core.entities.series import Serie
from src.core.exceptions.Exceptions import MatchNotFoundError, NoKeyFoundException
from src.core.providers.base_provider import Loader
from src.core.utils.key_utils import generate_key_from_folder, is_valid_key
from src.server.database.connection import get_sync_session
from src.server.database.service import AnimeSeriesService, EpisodeService
@@ -44,13 +45,23 @@ class SerieScanner:
in keyDict and can be retrieved after scanning.
Example:
# Synchronous context (CLI):
scanner = SerieScanner("/path/to/anime", loader)
scanner.scan()
scanner.scan() # asyncio.run() used internally when no event loop
# Asynchronous context (server/scheduler):
# scan() detects running event loop and uses create_task()
# internally, so no special handling needed by caller.
# Results are in scanner.keyDict
# With DB lookup fallback:
scanner = SerieScanner("/path/to/anime", loader,
db_lookup=lambda folder: my_db.get_by_folder(folder))
# With scan key overrides:
overrides = {"Folder Name": "correct-provider-key"}
scanner = SerieScanner("/path/to/anime", loader,
scan_key_overrides=overrides)
"""
def __init__(
@@ -58,6 +69,7 @@ class SerieScanner:
basePath: str,
loader: Loader,
db_lookup: Optional[Callable[[str], Optional["Serie"]]] = None,
scan_key_overrides: Optional[dict[str, str]] = None,
) -> None:
"""
Initialize the SerieScanner.
@@ -70,6 +82,10 @@ class SerieScanner:
``key`` file nor a ``data`` file is found in the folder.
This allows the database to supply the series key for
folders that have never had a local key file.
scan_key_overrides: Optional dict mapping folder names to provider
keys. When a folder name is found in this dict, the override
key is used instead of auto-generating from folder name.
Format: {"Folder Name": "actual-provider-key"}
Raises:
ValueError: If basePath is invalid or doesn't exist
@@ -89,11 +105,13 @@ class SerieScanner:
self.keyDict: dict[str, Serie] = {}
self.loader: Loader = loader
self._db_lookup: Optional[Callable[[str], Optional[Serie]]] = db_lookup
self._scan_key_overrides: Optional[dict[str, str]] = scan_key_overrides
self._current_operation_id: Optional[str] = None
self.events = Events()
self.events.on_progress = []
self.events.on_error = []
self.events.on_warning = []
self.events.on_completion = []
logger.info("Initialized SerieScanner with base path: %s", abs_path)
@@ -186,7 +204,25 @@ class SerieScanner:
"""
if handler in self.events.on_error:
self.events.on_error.remove(handler)
def subscribe_on_warning(self, handler):
"""
Subscribe a handler to an event.
Args:
handler: Callable to handle the event
"""
if handler not in self.events.on_warning:
self.events.on_warning.append(handler)
def unsubscribe_on_warning(self, handler):
"""
Unsubscribe a handler from an event.
Args:
handler: Callable to remove
"""
if handler in self.events.on_warning:
self.events.on_warning.remove(handler)
def subscribe_on_completion(self, handler):
"""
Subscribe a handler to an event.
@@ -218,8 +254,7 @@ class SerieScanner:
try:
from src.server.database.connection import get_async_session_factory
session_factory = get_async_session_factory()
db = session_factory()
db = get_async_session_factory()
try:
existing = await AnimeSeriesService.get_by_key(db, serie.key)
if existing:
@@ -433,7 +468,14 @@ class SerieScanner:
# Persist to database (async)
try:
asyncio.run(self._persist_serie_to_db(serie))
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop — safe to use asyncio.run()
asyncio.run(self._persist_serie_to_db(serie))
else:
# Already in async context — schedule as task
asyncio.create_task(self._persist_serie_to_db(serie))
except Exception as e:
logger.warning(
"DB persistence failed for '%s', "
@@ -443,11 +485,27 @@ class SerieScanner:
# Store by key (primary identifier), not folder
if serie.key in self.keyDict:
logger.error(
"Duplicate series found with key '%s' "
"(folder: '%s')",
existing = self.keyDict[serie.key]
logger.warning(
"Duplicate series found with key '%s': "
"folder '%s' maps to same key as existing folder '%s'. "
"Skipping duplicate folder.",
serie.key,
folder
folder,
existing.folder
)
self._safe_call_event(
self.events.on_warning,
{
"operation_id": self._current_operation_id,
"warning": "duplicate_key",
"message": f"Duplicate series skipped: '{folder}' maps to key '{serie.key}' already used by '{existing.folder}'",
"metadata": {
"key": serie.key,
"duplicate_folder": folder,
"existing_folder": existing.folder,
}
}
)
else:
self.keyDict[serie.key] = serie
@@ -551,6 +609,9 @@ class SerieScanner:
for anime_name in os.listdir(self.directory):
anime_path = os.path.join(self.directory, anime_name)
if os.path.isdir(anime_path):
if settings.should_ignore_folder(anime_name):
logger.debug("Skipping ignored folder: %s", anime_name)
continue
mp4_files: list[str] = []
has_files = False
for root, _, files in os.walk(anime_path):
@@ -567,7 +628,9 @@ class SerieScanner:
1. Query DB by folder name
2. If found, return cached Serie object
3. If not in DB, fall back to provider search via _db_lookup callback
4. (Legacy) If still not found, try reading 'key' file as last resort
4. If still not found, try reading 'data' file for legacy deployments
5. Check user-provided key overrides in scan_key_overrides
6. Generate key from folder name as last resort
Args:
folder_name: Filesystem folder name
@@ -576,9 +639,8 @@ class SerieScanner:
Serie object with valid key if found, None otherwise
Note:
DB is the source of truth. File-based lookups (key/data files)
are temporary backward compatibility for deployments with old data.
Will be removed in v3.0.0.
DB is the source of truth. File-based lookups (data files)
are temporary backward compatibility for CLI-only deployments.
"""
# Step 1: Try DB lookup by folder name
try:
@@ -629,25 +691,8 @@ class SerieScanner:
exc
)
# Step 3: Legacy fallback - TEMPORARY (remove in v3.0.0)
# Step 3: Legacy data file fallback (CLI-only deployments)
folder_path = os.path.join(self.directory, folder_name)
key_file = os.path.join(folder_path, 'key')
if os.path.exists(key_file):
logger.warning(
"Using legacy 'key' file for '%s' - this fallback is deprecated "
"and will be removed in v3.0.0",
folder_name
)
with open(key_file, 'r', encoding='utf-8') as file:
key = file.read().strip()
logger.info(
"Key found for folder '%s': %s",
folder_name,
key
)
year_from_folder = self._extract_year_from_folder_name(folder_name)
return Serie(key, "", "aniworld.to", folder_name, dict(), year=year_from_folder)
serie_file = os.path.join(folder_path, 'data')
if os.path.exists(serie_file):
with open(serie_file, "rb") as file:
@@ -658,6 +703,59 @@ class SerieScanner:
)
return Serie.load_from_file(serie_file)
# Step 4: Check for user-provided key overrides before generating
if self._scan_key_overrides and folder_name in self._scan_key_overrides:
override_key = self._scan_key_overrides[folder_name]
year_from_folder = self._extract_year_from_folder_name(folder_name)
logger.info(
"Using scan key override for folder '%s' -> key='%s'",
folder_name,
override_key
)
return Serie(
key=override_key,
name="", # Name will be fetched from provider if needed
site="aniworld.to",
folder=folder_name,
episodeDict=dict(),
year=year_from_folder
)
# Step 5: Generate key from folder name as last resort
# This handles edge cases like non-Latin characters or special symbols
try:
generated_key = generate_key_from_folder(folder_name)
year_from_folder = self._extract_year_from_folder_name(folder_name)
# Validate that the generated key is usable
if not generated_key or not is_valid_key(generated_key):
logger.warning(
"Serie key is invalid for folder '%s' (key='%s') - skipping",
folder_name,
generated_key
)
return None
logger.info(
"Generated key for folder '%s' -> key='%s'",
folder_name,
generated_key
)
return Serie(
key=generated_key,
name="", # Name will be fetched from provider if needed
site="aniworld.to",
folder=folder_name,
episodeDict=dict(),
year=year_from_folder
)
except Exception as exc:
logger.warning(
"Unexpected error generating key for folder '%s': %s",
folder_name,
exc
)
return None
def __get_episode_and_season(self, filename: str) -> tuple[int, int]:

View File

@@ -166,7 +166,10 @@ class SeriesApp:
self.loaders = Loaders()
self.loader = self.loaders.GetLoader(key="aniworld.to")
self.serie_scanner = SerieScanner(
directory_to_search, self.loader, db_lookup=db_lookup
directory_to_search,
self.loader,
db_lookup=db_lookup,
scan_key_overrides=settings.scan_key_overrides,
)
# Skip automatic loading from data files - series will be loaded
# from database by the service layer during application setup

View File

@@ -17,6 +17,7 @@ import warnings
from json import JSONDecodeError
from typing import Dict, Iterable, List, Optional
from src.config.settings import settings
from src.core.entities.series import Serie
logger = logging.getLogger(__name__)
@@ -214,6 +215,9 @@ class SerieList:
}
for anime_folder in entries:
if settings.should_ignore_folder(anime_folder):
logger.debug("Skipping ignored folder: %s", anime_folder)
continue
anime_path = os.path.join(self.directory, anime_folder, "data")
if os.path.isfile(anime_path):
logger.debug("Found data file for folder %s", anime_folder)

View File

@@ -7,7 +7,7 @@ errors in provider operations with automatic retry mechanisms.
import functools
import logging
from typing import Any, Callable, TypeVar
from typing import Any, Callable, Optional, TypeVar
logger = logging.getLogger(__name__)
@@ -42,41 +42,85 @@ class DownloadError(Exception):
class RecoveryStrategies:
"""Strategies for handling errors and recovering from failures."""
@staticmethod
def handle_network_failure(
func: Callable, *args: Any, **kwargs: Any
) -> Any:
"""Handle network failures with basic retry logic."""
max_retries = 3
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (NetworkError, ConnectionError):
if attempt == max_retries - 1:
raise
logger.warning(
"Network error on attempt %d, retrying...",
attempt + 1,
)
continue
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
) -> None:
"""Initialize recovery strategies.
@staticmethod
def handle_download_failure(
Args:
max_retries: Maximum number of retry attempts.
base_delay: Initial delay between retries in seconds.
max_delay: Maximum delay between retries in seconds.
exponential_base: Base for exponential backoff multiplier.
"""
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay for given retry attempt using exponential backoff.
Args:
attempt: Zero-based retry attempt number.
Returns:
Delay in seconds before next retry.
"""
delay = self.base_delay * (self.exponential_base ** attempt)
return min(delay, self.max_delay)
def handle_network_failure(
self,
func: Callable, *args: Any, **kwargs: Any
) -> Any:
"""Handle download failures with retry logic."""
max_retries = 2
for attempt in range(max_retries):
"""Handle network failures with exponential backoff retry logic."""
last_error: Optional[Exception] = None
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except DownloadError:
if attempt == max_retries - 1:
raise
logger.warning(
"Download error on attempt %d, retrying...",
attempt + 1,
)
continue
except (NetworkError, ConnectionError, TimeoutError) as exc:
last_error = exc
if attempt < self.max_retries - 1:
delay = self._calculate_delay(attempt)
logger.warning(
"Network error on attempt %d/%d, retrying in %.1fs: %s",
attempt + 1, self.max_retries, delay, exc
)
import time
time.sleep(delay)
continue
if last_error:
raise last_error
raise NetworkError("Network failure after retries")
def handle_download_failure(
self,
func: Callable, *args: Any, **kwargs: Any
) -> Any:
"""Handle download failures with exponential backoff retry logic."""
last_error: Optional[Exception] = None
for attempt in range(self.max_retries):
try:
return func(*args, **kwargs)
except DownloadError as exc:
last_error = exc
if attempt < self.max_retries - 1:
delay = self._calculate_delay(attempt)
logger.warning(
"Download error on attempt %d/%d, retrying in %.1fs: %s",
attempt + 1, self.max_retries, delay, exc
)
import time
time.sleep(delay)
continue
if last_error:
raise last_error
raise DownloadError("Download failed after retries")
class FileCorruptionDetector:

View File

@@ -122,7 +122,10 @@ class AniworldLoader(Loader):
self.LULUVDO_USER_AGENT = LULUVDO_USER_AGENT
self.PROVIDER_HEADERS = {
ProviderType.VIDMOLY.value: ['Referer: "https://vidmoly.to"'],
ProviderType.DOODSTREAM.value: ['Referer: "https://dood.li/"'],
ProviderType.DOODSTREAM.value: [
'Referer: "https://dood.li/"',
'Referer: "https://playmogo.com/"',
],
ProviderType.VOE.value: [f"User-Agent: {self.RANDOM_USER_AGENT}"],
ProviderType.LULUVDO.value: [
f"User-Agent: {self.LULUVDO_USER_AGENT}",
@@ -547,8 +550,10 @@ class AniworldLoader(Loader):
'nocheckcertificate': True,
'logger': logger,
'progress_hooks': [events_progress_hook],
'downloader': 'ffmpeg',
'hls_use_mpegts': True,
# yt-dlp defaults to native HLS downloader which warns about
# "Live HLS streams are not supported" - disable to go
# straight to ffmpeg, avoiding the warning
'hls_prefer_native': False,
}
if header:
@@ -594,6 +599,40 @@ class AniworldLoader(Loader):
_cleanup_temp_file(temp_path)
continue
except Exception as exc:
# Check if this is an HLS-related failure that might succeed
# with additional ffmpeg options
exc_str = str(exc).lower()
is_hls_related = (
'hls' in exc_str or
'live' in exc_str or
'native downloader' in exc_str
)
if is_hls_related and 'ffmpeg' not in str(ydl_opts.get('downloader', '')):
logger.info(
"HLS stream detected, retrying with ffmpeg options: %s",
output_file
)
# Retry with ffmpeg explicitly set
retry_opts = ydl_opts.copy()
retry_opts['downloader'] = 'ffmpeg'
retry_opts['hls_use_mpegts'] = True
try:
with YoutubeDL(retry_opts) as ydl:
info = ydl.extract_info(link, download=True)
if os.path.exists(temp_path):
shutil.copyfile(temp_path, output_path)
os.remove(temp_path)
logger.info(
"Download completed successfully (retry): %s",
output_file
)
self.clear_cache()
return True
except Exception:
_cleanup_temp_file(temp_path)
# Continue to next provider if retry also fails
continue
logger.error(
"YoutubeDL download failed with provider %s: %s: %s",
provider_name, type(exc).__name__, exc

View File

@@ -88,7 +88,10 @@ class EnhancedAniWorldLoader(Loader):
self.PROVIDER_HEADERS = {
ProviderType.VIDMOLY.value: ['Referer: "https://vidmoly.to"'],
ProviderType.DOODSTREAM.value: ['Referer: "https://dood.li/"'],
ProviderType.DOODSTREAM.value: [
'Referer: "https://dood.li/"',
'Referer: "https://playmogo.com/"',
],
ProviderType.VOE.value: [f'User-Agent: {self.RANDOM_USER_AGENT}'],
ProviderType.LULUVDO.value: [
f'User-Agent: {self.LULUVDO_USER_AGENT}',

View File

@@ -163,58 +163,89 @@ class NFOService:
logger.info("Creating series folder: %s", folder_path)
folder_path.mkdir(parents=True, exist_ok=True)
# Check for existing NFO with TMDB ID to skip search
nfo_path = folder_path / "tvshow.nfo"
existing_ids = None
if nfo_path.exists():
try:
existing_ids = self.parse_nfo_ids(nfo_path)
if existing_ids.get("tmdb_id"):
logger.info(
"Found existing TMDB ID %s in NFO, using directly",
existing_ids["tmdb_id"]
)
except Exception as e:
logger.debug("Could not parse existing NFO IDs: %s", e)
try:
await self.tmdb_client._ensure_session()
# Search for TV show - try multiple strategies
tv_show, search_source = await self._search_with_fallback(
search_name, year, alt_titles
)
tv_id = tv_show["id"]
# Use existing TMDB ID if found, otherwise search
if existing_ids and existing_ids.get("tmdb_id"):
tv_id = existing_ids["tmdb_id"]
logger.info("Fetching details directly for TMDB ID: %s", tv_id)
details = await self.tmdb_client.get_tv_show_details(
tv_id,
append_to_response="credits,external_ids,images"
)
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id)
tv_show = {"id": tv_id, "name": details.get("name", serie_name)}
search_source = "nfo_override"
else:
# Search for TV show - try multiple strategies
tv_show, search_source = await self._search_with_fallback(
search_name, year, alt_titles
)
tv_id = tv_show["id"]
logger.info("Found match: %s (ID: %s)", tv_show['name'], tv_id)
# Get detailed information with multi-language image support
details = await self.tmdb_client.get_tv_show_details(
tv_id,
append_to_response="credits,external_ids,images"
)
# Skip if we already fetched details via nfo_override
if search_source != "nfo_override":
details = await self.tmdb_client.get_tv_show_details(
tv_id,
append_to_response="credits,external_ids,images"
)
# Get content ratings for FSK
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id)
# Get content ratings for FSK
content_ratings = await self.tmdb_client.get_tv_show_content_ratings(tv_id)
# Enrich with fallback languages for empty overview/tagline
# Pass search result overview as last resort fallback
search_overview = tv_show.get("overview") or None
if not search_overview:
try:
logger.debug(
"No overview in German search result, trying en-US search fallback for: %s",
search_name,
)
en_search_results = await self.tmdb_client.search_tv_show(
search_name,
language="en-US",
)
if en_search_results.get("results"):
en_match = self._find_best_match(
en_search_results["results"], search_name, year
# Enrich with fallback languages for empty overview/tagline
# Pass search result overview as last resort fallback
search_overview = tv_show.get("overview") or None
if not search_overview:
try:
logger.debug(
"No overview in German search result, trying en-US search fallback for: %s",
search_name,
)
search_overview = en_match.get("overview") or None
if search_overview:
logger.info(
"Using en-US search overview fallback for %s",
search_name,
en_search_results = await self.tmdb_client.search_tv_show(
search_name,
language="en-US",
)
if en_search_results.get("results"):
en_match = self._find_best_match(
en_search_results["results"], search_name, year
)
except (TMDBAPIError, Exception) as exc:
logger.warning(
"Failed en-US search fallback for overview: %s",
exc,
)
search_overview = en_match.get("overview") or None
if search_overview:
logger.info(
"Using en-US search overview fallback for %s",
search_name,
)
except (TMDBAPIError, Exception) as exc:
logger.warning(
"Failed en-US search fallback for overview: %s",
exc,
)
details = await self._enrich_details_with_fallback(
details, search_overview=search_overview
)
details = await self._enrich_details_with_fallback(
details, search_overview=search_overview
)
else:
# When using nfo_override, content_ratings already fetched
pass
# Convert TMDB data to TVShowNFO model
nfo_model = tmdb_to_nfo_model(
@@ -646,21 +677,45 @@ class NFOService:
{"query": normalized, "year": year, "lang": "de-DE", "desc": f"normalized:{normalized}"}
)
# Strategy 6: Try search/multi for series indexed as movies
search_strategies.append(
{"query": primary_query, "year": year, "lang": "en-US", "desc": "multi_search", "use_multi": True}
)
last_error = None
for strategy in search_strategies:
query = strategy["query"]
lang = strategy["lang"]
desc = strategy["desc"]
use_multi = strategy.get("use_multi", False)
try:
logger.debug(
"TMDB search attempt: query='%s', lang=%s, year=%s, strategy=%s",
query, lang, strategy["year"], desc
)
search_results = await self.tmdb_client.search_tv_show(
query,
language=lang
)
# Use search/multi for multi_search strategy
if use_multi:
search_results = await self.tmdb_client.search_multi(
query,
language=lang
)
# Filter for TV shows only
if search_results.get("results"):
tv_results = [
r for r in search_results["results"]
if r.get("media_type") == "tv"
]
if tv_results:
search_results["results"] = tv_results
else:
search_results["results"] = []
else:
search_results = await self.tmdb_client.search_tv_show(
query,
language=lang
)
if search_results.get("results"):
# Apply year filter if we have one
@@ -784,6 +839,7 @@ class NFOService:
async def close(self):
"""Clean up resources."""
await self.tmdb_client.close()
await self.image_downloader.close()
async def create_minimal_nfo(
self,

View File

@@ -129,6 +129,9 @@ class SeriesManagerService:
if not self.nfo_service:
return
nfo_exists = False
ids = {}
try:
folder_path = Path(self.anime_directory) / serie_folder
nfo_path = folder_path / "tvshow.nfo"
@@ -195,22 +198,49 @@ class SeriesManagerService:
logger.info(
f"Creating NFO for '{serie_name}' ({serie_folder})"
)
await self.nfo_service.create_tvshow_nfo(
serie_name=serie_name,
serie_folder=serie_folder,
year=year,
download_poster=self.download_poster,
download_logo=self.download_logo,
download_fanart=self.download_fanart
)
logger.info("Successfully created NFO for '%s'", serie_name)
try:
await self.nfo_service.create_tvshow_nfo(
serie_name=serie_name,
serie_folder=serie_folder,
year=year,
download_poster=self.download_poster,
download_logo=self.download_logo,
download_fanart=self.download_fanart
)
logger.info("Successfully created NFO for '%s'", serie_name)
except TMDBAPIError as create_error:
# TMDB lookup failed, create minimal NFO to track the series
logger.warning(
"TMDB lookup failed for '%s', creating minimal NFO: %s",
serie_name, create_error
)
try:
await self.nfo_service.create_minimal_nfo(
serie_name=serie_name,
serie_folder=serie_folder,
year=year
)
logger.info("Created minimal NFO for '%s'", serie_name)
except Exception as minimal_error:
logger.error(
"Failed to create minimal NFO for '%s': %s",
serie_name, minimal_error
)
elif nfo_exists:
logger.debug(
f"NFO exists for '{serie_name}', skipping download"
)
except TMDBAPIError as e:
logger.error("TMDB API error processing '%s': %s", serie_name, e)
# Only log at ERROR if no NFO exists and we have no IDs
# If NFO exists with IDs, this is just a lookup failure, log at DEBUG
if nfo_exists and (ids.get("tmdb_id") or ids.get("tvdb_id")):
logger.debug(
"TMDB API lookup failed for '%s' (has NFO with IDs): %s",
serie_name, e
)
else:
logger.error("TMDB API error processing '%s': %s", serie_name, e)
except Exception as e:
logger.error(
f"Unexpected error processing NFO for '{serie_name}': {e}",

248
src/core/utils/key_utils.py Normal file
View File

@@ -0,0 +1,248 @@
"""Utility functions for generating URL-safe keys from folder names.
This module provides key generation and normalization for anime series,
handling edge cases like non-Latin characters and special symbols.
"""
from __future__ import annotations
import re
import unicodedata
import uuid
from typing import Optional
# Valid key pattern: alphanumeric, hyphens, underscores
# Must be at least 1 char, URL-safe
VALID_KEY_PATTERN = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9_-]*$')
def normalize_key(key: str) -> str:
"""Normalize a key to a URL-safe format.
Args:
key: The key to normalize
Returns:
Normalized lowercase key with spaces replaced by hyphens
"""
if not key:
return ""
# Convert to lowercase
normalized = key.lower()
# Replace spaces and underscores with hyphens
normalized = re.sub(r'[\s_]+', '-', normalized)
# Remove any characters that aren't alphanumeric or hyphens
normalized = re.sub(r'[^a-z0-9-]', '', normalized)
# Collapse multiple consecutive hyphens
normalized = re.sub(r'-+', '-', normalized)
# Remove leading/trailing hyphens
normalized = normalized.strip('-')
return normalized
def is_valid_key(key: str) -> bool:
"""Check if a key is valid for URL-safe use.
Args:
key: The key to validate
Returns:
True if key is valid (non-empty, URL-safe, alphanumeric start/end, min 2 chars)
"""
if not key or not key.strip():
return False
if len(key) < 2:
return False
return bool(VALID_KEY_PATTERN.match(key))
def generate_key_from_folder(folder_name: str) -> str:
"""Generate a URL-safe key from a folder name.
Handles edge cases:
- Non-Latin characters (Japanese, Chinese, etc.)
- Special characters
- All-invalid names that normalize to empty
Args:
folder_name: The folder name to convert to a key
Returns:
A URL-safe key string. Never returns empty string.
Examples:
>>> generate_key_from_folder("Attack on Titan (2013)")
'attack-on-titan-2013'
>>> generate_key_from_folder("A Time Called You (2023)")
'a-time-called-you-2023'
>>> generate_key_from_folder("25-sai no Joshikousei (2018)")
'25-sai-no-joshikousei-2018'
"""
if not folder_name or not folder_name.strip():
raise ValueError("Folder name cannot be empty")
# Step 1: Unicode NFC normalization (preserves international chars)
normalized = unicodedata.normalize('NFC', folder_name.strip())
# Step 2: Extract alphanumeric parts, preserving international chars
# This keeps Japanese/Chinese characters but removes special symbols
parts = []
for char in normalized:
# Keep Unicode alphanumeric characters (letters/numbers from any script)
if char.isalnum():
parts.append(char)
elif char.isspace():
parts.append(' ')
# Handle apostrophes - treat as part of word (remove, don't replace with space)
# This normalizes e.g., "Hell's" -> "Hells"
# Includes: ' (0x27), ' (0x2018), ' (0x2019), ' (0x02BC), ` (0x0060)
elif char in ("'", "'", "'", "'", "`", """, """):
pass # Skip - drop the apostrophe
else:
parts.append(' ')
working = ''.join(parts)
# Step 3: Split into words and normalize each
words = working.split()
# Step 4: Convert to lowercase and create hyphenated key
key = '-'.join(word.lower() for word in words if word)
# Step 5: If we got a valid key, return it
if key and is_valid_key(key):
return key
# Step 6: Try just alphanumeric characters
alphanumeric_only = re.sub(r'[^a-zA-Z0-9\s]', '', working)
words = alphanumeric_only.split()
key = '-'.join(word.lower() for word in words if word)
if key and is_valid_key(key):
return key
# Step 7: Last resort - use folder name directly with transliteration
# Try to convert non-ASCII to ASCII equivalents
try:
# Use NFD normalization and strip combining characters
# This effectively Latinizes some characters
nfd_form = unicodedata.normalize('NFD', folder_name)
latinized = ''.join(
char for char in nfd_form
if unicodedata.category(char) != 'Mn' # Strip combining marks
)
# Remove non-ASCII letters
latinized = re.sub(r'[^a-zA-Z0-9\s]', ' ', latinized)
words = latinized.split()
key = '-'.join(word.lower() for word in words if word)
if key and is_valid_key(key):
return key
except Exception:
pass
# Step 8: Absolute fallback - generate UUID-based key
# Use first 8 chars of UUID for brevity
uuid_key = uuid.uuid4().hex[:8]
# Try to extract any meaningful words from the original name
meaningful_parts = []
for char in folder_name:
if char.isalnum():
meaningful_parts.append(char.lower())
elif len(meaningful_parts) > 0:
meaningful_parts.append('-')
fallback_base = ''.join(meaningful_parts).strip('-')
if fallback_base and len(fallback_base) >= 2:
# Combine meaningful parts with UUID for uniqueness
# Truncate meaningful parts if too long
if len(fallback_base) > 20:
fallback_base = fallback_base[:20]
return f"{fallback_base}-{uuid_key}"
return f"series-{uuid_key}"
def validate_key_uniqueness(
key: str,
existing_keys: set[str],
) -> tuple[bool, str]:
"""Validate that a key is unique among existing keys.
Args:
key: The key to validate
existing_keys: Set of keys that already exist
Returns:
Tuple of (is_valid, error_message)
"""
if not key or not key.strip():
return False, "Key cannot be empty"
stripped = key.strip()
if len(stripped) < 2:
return False, "Key must be at least 2 characters"
if not is_valid_key(stripped):
return False, "Key must be URL-safe (alphanumeric, hyphens, underscores only)"
if stripped in existing_keys:
return False, f"Key '{stripped}' is already in use"
return True, ""
def sanitize_key_for_url(key: str) -> str:
"""Sanitize a key for safe URL usage.
Args:
key: The key to sanitize
Returns:
URL-safe version of the key
"""
if not key:
return ""
# Replace spaces with hyphens first
sanitized = key.replace(' ', '-')
# Remove any characters that could cause URL issues (keep alphanumerics, hyphens, underscores)
sanitized = re.sub(r'[^\w\-]', '', sanitized)
# Collapse multiple hyphens
sanitized = re.sub(r'-+', '-', sanitized)
return sanitized.strip('-')
def sanitize_url_for_logging(url: str, max_length: int = 100) -> str:
"""Sanitize a URL for safe logging by removing sensitive query parameters.
Removes or truncates query parameters that may contain tokens, keys,
or other sensitive data while preserving enough structure for debugging.
Args:
url: The URL to sanitize
max_length: Maximum length of the returned URL string
Returns:
Sanitized URL safe for logging
"""
if not url:
return ""
# Truncate if too long
if len(url) > max_length:
return url[:max_length] + "..."
return url

View File

@@ -1,12 +1,14 @@
import logging
import warnings
from pathlib import Path
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.entities.series import Serie
from src.config.settings import settings
from src.core.utils.key_utils import generate_key_from_folder, is_valid_key
from src.server.database.service import AnimeSeriesService
from src.server.exceptions import (
BadRequestError,
@@ -25,6 +27,9 @@ from src.server.utils.dependencies import (
)
from src.server.utils.filesystem import sanitize_folder_name
from src.server.utils.validators import validate_filter_value, validate_search_query
from src.server.services.folder_rename_service import (
_scan_for_pre_existing_duplicates,
)
logger = logging.getLogger(__name__)
@@ -70,6 +75,100 @@ async def get_anime_status(
) from exc
class DuplicateFolderGroup(BaseModel):
"""A group of duplicate folders for the same series.
Attributes:
key: Series key (provider-assigned unique identifier)
folders: List of folder names that are duplicates
folder_count: Number of duplicate folders
"""
key: str = Field(..., description="Series key (unique identifier)")
folders: List[str] = Field(..., description="List of duplicate folder names")
folder_count: int = Field(..., description="Number of duplicate folders")
class DuplicateFoldersResponse(BaseModel):
"""Response model for duplicate folders listing.
Attributes:
total_groups: Total number of duplicate groups found
duplicate_groups: List of duplicate folder groups
message: Human-readable summary
"""
total_groups: int = Field(..., description="Total number of duplicate groups")
duplicate_groups: List[DuplicateFolderGroup] = Field(
..., description="List of duplicate folder groups"
)
message: str = Field(..., description="Human-readable summary")
@router.get("/duplicate-folders", response_model=DuplicateFoldersResponse)
async def get_duplicate_folders(
_auth: dict = Depends(require_auth),
) -> DuplicateFoldersResponse:
"""List all pre-existing duplicate folder groups.
Scans the anime directory for folders with tvshow.nfo files that
map to the same series key. Returns groups of duplicates for
manual review and cleanup.
Returns:
DuplicateFoldersResponse with groups of duplicate folders
Note:
Not all duplicate folders are safe to merge - some may belong
to different releases (e.g., dubbed vs. subbed). Review carefully
before taking action.
"""
try:
if not settings.anime_directory:
return DuplicateFoldersResponse(
total_groups=0,
duplicate_groups=[],
message="Anime directory not configured",
)
anime_dir = Path(settings.anime_directory)
if not anime_dir.is_dir():
return DuplicateFoldersResponse(
total_groups=0,
duplicate_groups=[],
message=f"Anime directory not found: {anime_dir}",
)
duplicates = _scan_for_pre_existing_duplicates(anime_dir)
groups = [
DuplicateFolderGroup(
key=dup.key,
folders=dup.folders,
folder_count=dup.count,
)
for dup in duplicates
]
if groups:
message = (
f"Found {len(groups)} duplicate group(s). "
"Review carefully - some duplicates may be different releases "
"(e.g., dubbed vs. subbed)."
)
else:
message = "No duplicate folders found."
return DuplicateFoldersResponse(
total_groups=len(groups),
duplicate_groups=groups,
message=message,
)
except Exception as exc:
logger.error("Failed to scan for duplicate folders: %s", str(exc))
raise ServerError(
message=f"Failed to scan for duplicates: {str(exc)}"
) from exc
class AnimeSummary(BaseModel):
"""Summary of an anime series with missing episodes.
@@ -133,6 +232,14 @@ class AnimeSummary(BaseModel):
default=None,
description="ISO timestamp when NFO was last updated"
)
loading_status: Optional[str] = Field(
default=None,
description="Current loading status (e.g., 'completed', 'failed', 'in_progress')"
)
loading_error: Optional[str] = Field(
default=None,
description="Error message if loading failed (e.g., 'key cannot be None or empty')"
)
tmdb_id: Optional[int] = Field(
default=None,
description="The Movie Database (TMDB) ID"
@@ -331,6 +438,8 @@ async def list_anime(
nfo_updated_at=series_dict.get("nfo_updated_at"),
tmdb_id=series_dict.get("tmdb_id"),
tvdb_id=series_dict.get("tvdb_id"),
loading_status=series_dict.get("loading_status"),
loading_error=series_dict.get("loading_error"),
)
)
@@ -1085,6 +1194,346 @@ async def get_anime(
) from exc
class ManualKeyUpdate(BaseModel):
"""Request model for manually updating a series key."""
key: str = Field(
...,
min_length=2,
description="New URL-safe key for the series (alphanumeric, hyphens, underscores)"
)
@router.patch("/{anime_key}/manual-key", response_model=dict)
async def update_series_manual_key(
anime_key: str,
update_data: ManualKeyUpdate,
db: AsyncSession = Depends(get_optional_database_session),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Manually update the key for a series.
This endpoint allows users to supply a key for folders that failed
automatic key generation (e.g., non-Latin characters, special symbols).
Args:
anime_key: Current series key
update_data: New key to assign
db: Database session
series_app: SeriesApp instance for in-memory updates
Returns:
Updated series info with new key
Raises:
HTTPException: If validation fails or series not found
"""
new_key = update_data.key.strip()
# Validate the new key format
if not is_valid_key(new_key):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid key format. Key must be URL-safe (alphanumeric, hyphens, underscores only)"
)
# Find the series - check DB first
series_db = None
if db:
series_db = await AnimeSeriesService.get_by_key(db, anime_key)
# Also check in-memory list if series_app available
found_in_memory = None
if series_app and hasattr(series_app, "list"):
for serie in series_app.list.GetList():
if getattr(serie, "key", None) == anime_key:
found_in_memory = serie
break
if not series_db and not found_in_memory:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Series with key '{anime_key}' not found"
)
# Check if new key is already in use
existing_keys = set()
if db:
all_series = await AnimeSeriesService.get_all(db)
existing_keys = {s.key for s in all_series if s.key != anime_key}
if series_app and hasattr(series_app, "list"):
for serie in series_app.list.GetList():
key = getattr(serie, "key", None)
if key and key != anime_key:
existing_keys.add(key)
if new_key in existing_keys:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Key '{new_key}' is already in use by another series"
)
old_key = anime_key
# Update in database if found
if series_db:
from src.server.database.connection import get_db
async with get_db() as session:
await AnimeSeriesService.update(
session,
series_db.id,
key=new_key,
loading_error=None # Clear error on successful key update
)
await session.commit()
# Update in-memory cache
if found_in_memory:
try:
found_in_memory.key = new_key
logger.info(
"Updated in-memory key for series: %s -> %s",
old_key,
new_key
)
except ValueError as ve:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(ve)
)
logger.info(
"Manual key update successful: %s -> %s",
old_key,
new_key
)
return {
"success": True,
"old_key": old_key,
"new_key": new_key,
"message": f"Key updated from '{old_key}' to '{new_key}'"
}
class MetadataIdsUpdate(BaseModel):
"""Request model for manually updating TMDB and TVDB IDs."""
tmdb_id: Optional[int] = Field(
default=None,
description="TMDB ID (positive integer, or null to clear)"
)
tvdb_id: Optional[int] = Field(
default=None,
description="TVDB ID (positive integer, or null to clear)"
)
@field_validator("tmdb_id", "tvdb_id")
@classmethod
def validate_positive_or_null(cls, v):
if v is not None and v <= 0:
raise ValueError("ID must be a positive integer or null")
return v
@router.patch("/{anime_key}/metadata-ids", response_model=dict)
async def update_series_metadata_ids(
anime_key: str,
update_data: MetadataIdsUpdate,
db: AsyncSession = Depends(get_optional_database_session),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Manually update TMDB and TVDB IDs for a series.
This endpoint allows users to supply missing metadata IDs for series
that failed automatic TMDB lookup. After updating IDs, it triggers
a background NFO re-generation.
Args:
anime_key: Series key
update_data: TMDB and TVDB IDs to set
db: Database session
series_app: SeriesApp instance for in-memory updates
Returns:
Updated series info with new IDs
Raises:
HTTPException: If validation fails or series not found
"""
if update_data.tmdb_id is None and update_data.tvdb_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="At least one of tmdb_id or tvdb_id must be provided"
)
series_db = None
if db:
series_db = await AnimeSeriesService.get_by_key(db, anime_key)
if not series_db:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Series with key '{anime_key}' not found"
)
update_fields = {}
if update_data.tmdb_id is not None:
update_fields["tmdb_id"] = update_data.tmdb_id
if update_data.tvdb_id is not None:
update_fields["tvdb_id"] = update_data.tvdb_id
if db:
from datetime import datetime, timezone
update_fields["nfo_updated_at"] = datetime.now(timezone.utc)
update_fields["has_nfo"] = True
await AnimeSeriesService.update(
db,
series_db.id,
**update_fields
)
await db.commit()
# Update in-memory cache if available
if series_app and hasattr(series_app, "list"):
for serie in series_app.list.GetList():
if getattr(serie, "key", None) == anime_key:
if update_data.tmdb_id is not None:
serie.tmdb_id = update_data.tmdb_id
if update_data.tvdb_id is not None:
serie.tvdb_id = update_data.tvdb_id
break
# Trigger background NFO re-generation
background_loader = None
try:
background_loader = await get_background_loader_service()
except Exception:
pass
nfo_queued = False
if background_loader and db:
try:
from src.server.database.connection import get_db_session
async with get_db_session() as bg_db:
series_for_bg = await AnimeSeriesService.get_by_key(bg_db, anime_key)
if series_for_bg:
await background_loader.load_series_nfo(
series_for_bg.key,
series_for_bg.folder,
series_for_bg.name,
force_refresh=True
)
nfo_queued = True
except Exception as e:
logger.warning("Failed to queue NFO refresh for '%s': %s", anime_key, str(e))
logger.info(
"Metadata IDs updated for '%s': tmdb_id=%s, tvdb_id=%s, NFO_queued=%s",
anime_key,
update_data.tmdb_id,
update_data.tvdb_id,
nfo_queued
)
return {
"success": True,
"key": anime_key,
"tmdb_id": update_data.tmdb_id,
"tvdb_id": update_data.tvdb_id,
"nfo_refresh_queued": nfo_queued,
"message": "Metadata IDs updated. NFO refresh queued." if nfo_queued
else "Metadata IDs updated."
}
@router.post("/{anime_key}/refresh-nfo", response_model=dict)
async def refresh_series_nfo(
anime_key: str,
db: AsyncSession = Depends(get_optional_database_session),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Force NFO re-generation for a series using current IDs.
This endpoint triggers a background NFO re-generation using the
existing TMDB/TVDB IDs (or creating minimal NFO if no IDs exist).
Args:
anime_key: Series key
db: Database session
series_app: SeriesApp instance
Returns:
Status of NFO refresh operation
Raises:
HTTPException: If series not found
"""
if db:
series_db = await AnimeSeriesService.get_by_key(db, anime_key)
if not db or not series_db:
# Check in-memory
found = None
if series_app and hasattr(series_app, "list"):
for serie in series_app.list.GetList():
if getattr(serie, "key", None) == anime_key:
found = serie
break
if not found:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Series with key '{anime_key}' not found"
)
background_loader = None
try:
background_loader = await get_background_loader_service()
except Exception:
pass
if not background_loader:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Background loader service not available"
)
series_for_bg = None
if db:
async with get_db_session() as bg_db:
series_for_bg = await AnimeSeriesService.get_by_key(bg_db, anime_key)
if not series_for_bg:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Series with key '{anime_key}' not found"
)
try:
await background_loader.load_series_nfo(
series_for_bg.key,
series_for_bg.folder,
series_for_bg.name,
force_refresh=True
)
nfo_queued = True
except Exception as e:
logger.error("Failed to queue NFO refresh for '%s': %s", anime_key, str(e))
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to queue NFO refresh: {str(e)}"
)
logger.info("NFO refresh queued for '%s'", anime_key)
return {
"success": True,
"key": anime_key,
"message": "NFO refresh queued"
}
# Maximum allowed input size for security
MAX_INPUT_LENGTH = 100000 # 100KB

View File

@@ -617,6 +617,10 @@ class SystemSettings(Base, TimestampMixin):
Boolean, nullable=False, default=False, server_default="0",
doc="Whether legacy key/data file migration has been completed"
)
legacy_key_cleanup_completed: Mapped[bool] = mapped_column(
Boolean, nullable=False, default=False, server_default="0",
doc="Whether legacy key file cleanup has been completed"
)
last_scan_timestamp: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True,
doc="Timestamp of the last completed scan"

View File

@@ -155,6 +155,36 @@ class SystemSettingsService:
await db.commit()
logger.info("Marked legacy files migration as completed")
@staticmethod
async def is_legacy_key_cleanup_completed(db: AsyncSession) -> bool:
"""Check if legacy key file cleanup has been completed.
Args:
db: Database session
Returns:
True if cleanup is completed, False otherwise
"""
settings = await SystemSettingsService.get_or_create(db)
return settings.legacy_key_cleanup_completed
@staticmethod
async def mark_legacy_key_cleanup_completed(
db: AsyncSession,
timestamp: Optional[datetime] = None
) -> None:
"""Mark the legacy key file cleanup as completed.
Args:
db: Database session
timestamp: Optional timestamp to set, defaults to current time
"""
settings = await SystemSettingsService.get_or_create(db)
settings.legacy_key_cleanup_completed = True
settings.last_scan_timestamp = timestamp or datetime.now(timezone.utc)
await db.commit()
logger.info("Marked legacy key file cleanup as completed")
@staticmethod
async def mark_initial_media_scan_completed(
db: AsyncSession,
@@ -184,6 +214,8 @@ class SystemSettingsService:
settings.initial_scan_completed = False
settings.initial_nfo_scan_completed = False
settings.initial_media_scan_completed = False
settings.migration_legacy_files_completed = False
settings.legacy_key_cleanup_completed = False
settings.last_scan_timestamp = None
await db.commit()
logger.info("Reset all scan completion flags")

View File

@@ -44,6 +44,18 @@ class SchedulerConfig(BaseModel):
description="Run folder maintenance (NFO repair, folder renaming, "
"poster checks) during the scheduled run.",
)
# Legacy alias fields — read via Pydantic alias
auto_download: Optional[bool] = Field(default=None, alias="auto_download")
folder_scan: Optional[bool] = Field(default=None, alias="folder_scan")
def __init__(self, **data):
super().__init__(**data)
# Map legacy keys to primary fields only when primary key absent from data.
# "key in data" checks for explicit presence (even False/None), not just truthiness.
if self.auto_download is not None and "auto_download_after_rescan" not in data:
object.__setattr__(self, "auto_download_after_rescan", self.auto_download)
if self.folder_scan is not None and "folder_scan_enabled" not in data:
object.__setattr__(self, "folder_scan_enabled", self.folder_scan)
@field_validator("schedule_time")
@classmethod
@@ -69,6 +81,22 @@ class SchedulerConfig(BaseModel):
)
return v
def model_dump(self, **kwargs) -> Dict[str, object]:
"""Serialize, excluding legacy alias fields when they are None.
The alias fields (auto_download, folder_scan) must not be written to
config.json as null entries, otherwise a roundtrip load sees the key
present (哪怕 value is None) and skips the alias-to-primary mapping.
"""
data = super().model_dump(**kwargs)
# Drop None alias fields so they don't pollute config.json.
# They are still settable via the constructor for backward compatibility.
if data.get("auto_download") is None:
data.pop("auto_download", None)
if data.get("folder_scan") is None:
data.pop("folder_scan", None)
return data
class BackupConfig(BaseModel):
"""Configuration for automatic backups of application data."""
@@ -171,6 +199,12 @@ class AppConfig(BaseModel):
logging: LoggingConfig = Field(default_factory=LoggingConfig)
backup: BackupConfig = Field(default_factory=BackupConfig)
nfo: NFOConfig = Field(default_factory=NFOConfig)
scan_key_overrides: Dict[str, str] = Field(
default_factory=dict,
description="Map of folder names to provider keys for scan overrides. "
"Used when auto-generated keys from folder names are incorrect. "
"Format: {\"Folder Name\": \"actual-provider-key\"}"
)
other: Dict[str, object] = Field(
default_factory=dict, description="Arbitrary other settings"
)
@@ -209,6 +243,7 @@ class ConfigUpdate(BaseModel):
logging: Optional[LoggingConfig] = None
backup: Optional[BackupConfig] = None
nfo: Optional[NFOConfig] = None
scan_key_overrides: Optional[Dict[str, str]] = None
other: Optional[Dict[str, object]] = None
def apply_to(self, current: AppConfig) -> AppConfig:
@@ -225,6 +260,8 @@ class ConfigUpdate(BaseModel):
data["backup"] = self.backup.model_dump()
if self.nfo is not None:
data["nfo"] = self.nfo.model_dump()
if self.scan_key_overrides is not None:
data["scan_key_overrides"] = self.scan_key_overrides
if self.other is not None:
merged = dict(current.other or {})
merged.update(self.other)

View File

@@ -528,6 +528,8 @@ class AnimeService:
"tmdb_id": db_series.tmdb_id,
"tvdb_id": db_series.tvdb_id,
"series_id": db_series.id,
"loading_status": db_series.loading_status,
"loading_error": db_series.loading_error,
}
# Build episodeDict from DB, skipping is_downloaded=True
@@ -596,6 +598,8 @@ class AnimeService:
"tmdb_id": nfo_data.get("tmdb_id"),
"tvdb_id": nfo_data.get("tvdb_id"),
"series_id": nfo_data.get("series_id"),
"loading_status": nfo_data.get("loading_status"),
"loading_error": nfo_data.get("loading_error"),
}
result_list.append(series_dict)

View File

@@ -144,7 +144,13 @@ class ConfigService:
# Save configuration with version
data = config.model_dump()
data["version"] = self.CONFIG_VERSION
# Re-serialize SchedulerConfig through its overridden model_dump so
# that None legacy alias fields are stripped before writing to disk.
# Pydantic converts nested models to plain dicts in model_dump() output,
# so we call the override explicitly on the scheduler field.
data["scheduler"] = config.scheduler.model_dump()
# Write to temporary file first for atomic operation
temp_path = self.config_path.with_suffix(".tmp")
try:

View File

@@ -13,8 +13,9 @@ reflect the new paths.
from __future__ import annotations
import logging
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Set, Tuple
from lxml import etree
@@ -34,6 +35,141 @@ logger = logging.getLogger(__name__)
INVALID_PATH_CHARS = '<>:"/\\|?*\x00'
class DuplicateGroup:
"""Represents a group of duplicate folders for the same series.
Attributes:
key: The series key (folder name before rename).
folders: List of folder paths that map to this series.
nfo_paths: List of corresponding NFO file paths.
"""
def __init__(self, key: str, folders: List[str], nfo_paths: List[Path]):
self.key = key
self.folders = folders
self.nfo_paths = nfo_paths
@property
def count(self) -> int:
return len(self.folders)
def __repr__(self) -> str:
return f"DuplicateGroup(key={self.key!r}, folders={self.folders})"
def _scan_for_pre_existing_duplicates(anime_dir: Path) -> List[DuplicateGroup]:
"""Scan anime directory for pre-existing duplicate folders.
Groups folders by the series key extracted from their NFO files.
Folders with the same title+year (same expected name) are flagged as duplicates.
Args:
anime_dir: Path to the anime directory to scan.
Returns:
List of DuplicateGroup objects, one per series with duplicate folders.
"""
# Group folders by their expected name (title+year from NFO)
groups: Dict[str, List[Tuple[str, Path]]] = defaultdict(list)
for series_dir in anime_dir.iterdir():
if not series_dir.is_dir():
continue
nfo_path = series_dir / "tvshow.nfo"
if not nfo_path.exists():
continue
title, year = _parse_nfo_title_and_year(nfo_path)
if not title or not year:
continue
expected_name = _compute_expected_folder_name(title, year)
groups[expected_name].append((series_dir.name, nfo_path))
# Filter to only groups with more than one folder
duplicates = []
for key, items in groups.items():
if len(items) > 1:
folders = [item[0] for item in items]
nfo_paths = [item[1] for item in items]
duplicates.append(DuplicateGroup(key=key, folders=folders, nfo_paths=nfo_paths))
return duplicates
def _try_merge_duplicate_group(group: DuplicateGroup, dry_run: bool = False) -> bool:
"""Attempt to merge a duplicate group automatically.
Uses the first folder as the canonical one and removes others if they are
empty or contain only symlinks.
Args:
group: The DuplicateGroup to merge.
dry_run: If True, only log actions without executing them.
Returns:
True if merge was successful, False otherwise.
"""
if len(group.folders) < 2:
return True
# Keep first folder as canonical, mark others for removal
canonical = group.folders[0]
to_remove = group.folders[1:]
for folder in to_remove:
folder_path = group.nfo_paths[0].parent.parent / folder # same parent dir
if not folder_path.exists():
continue
# Check if folder is empty or only has symlinks
try:
contents = list(folder_path.iterdir())
except PermissionError:
logger.warning("Permission denied accessing %s, skip merge", folder_path)
return False
except OSError:
return False
if not contents:
# Empty folder - safe to remove
if dry_run:
logger.info("[DRY-RUN] Would delete empty duplicate folder: %s", folder_path)
else:
try:
folder_path.rmdir()
logger.info("Deleted empty duplicate folder: %s", folder_path)
except OSError:
return False
continue
# Check if all contents are symlinks pointing to canonical
all_symlinks = all(
item.is_symlink() and item.resolve() == (folder_path.parent / canonical).resolve()
for item in contents
)
if all_symlinks:
if dry_run:
logger.info("[DRY-RUN] Would remove symlinks in duplicate folder: %s", folder_path)
else:
for item in contents:
item.unlink()
try:
folder_path.rmdir()
logger.info("Removed symlink-only duplicate folder: %s", folder_path)
except OSError:
return False
continue
# Cannot auto-merge - requires manual intervention
logger.warning(
"Cannot auto-merge duplicate folders for '%s': %s (manual merge required)",
group.key,
[canonical] + to_remove,
)
return False
return True
def _parse_nfo_title_and_year(nfo_path: Path) -> Tuple[Optional[str], Optional[str]]:
"""Parse a tvshow.nfo and return (title, year) text values.
@@ -115,6 +251,136 @@ def _is_series_being_downloaded(series_folder: str) -> bool:
return True
def _cleanup_stale_files_after_rename(new_path: Path, new_name: str) -> None:
"""Remove legacy 'key' file after successful folder rename.
Also checks for orphaned folders with the same key that may have been
left behind from previous rename operations.
Args:
new_path: The new folder path after rename.
new_name: The new folder name.
"""
key_file = new_path / "key"
if key_file.exists():
try:
key_file.unlink()
logger.info(
"Removed legacy 'key' file after rename: %s", key_file
)
except OSError as exc:
logger.warning(
"Could not remove legacy 'key' file %s: %s", key_file, exc
)
def _cleanup_orphaned_folder(old_path: Path, new_path: Path, dry_run: bool = False) -> bool:
"""Clean up orphaned folder after successful rename.
After a folder is successfully renamed to new_path, this function checks
if the old_path still exists (orphaned folder) and removes it. If the
old folder contains files, they are moved to new_path before deletion.
Args:
old_path: The original folder path before rename.
new_path: The new folder path after rename.
dry_run: If True, only log actions without executing them.
Returns:
True if old folder was cleaned up (or would be in dry-run mode),
False if old folder does not exist or cleanup failed.
"""
if not old_path.exists():
logger.debug(
"Old folder does not exist, no cleanup needed: %s", old_path
)
return False
# Check if folder is empty
try:
contents = list(old_path.iterdir())
except PermissionError as exc:
logger.warning(
"Permission denied accessing old folder %s: %s", old_path, exc
)
return False
except OSError as exc:
logger.warning(
"OS error accessing old folder %s: %s", old_path, exc
)
return False
if not contents:
# Empty folder — delete it
if dry_run:
logger.info(
"[DRY-RUN] Would delete empty orphaned folder: %s", old_path
)
return True
try:
old_path.rmdir()
logger.info("Deleted empty orphaned folder: %s", old_path)
return True
except PermissionError as exc:
logger.warning(
"Permission denied deleting folder %s: %s", old_path, exc
)
return False
except OSError as exc:
logger.warning(
"OS error deleting folder %s: %s", old_path, exc
)
return False
# Folder has contents — move files to new_path then delete
if dry_run:
logger.info(
"[DRY-RUN] Would move %d files from orphaned folder %s to %s",
len(contents), old_path, new_path
)
for item in contents:
logger.info("[DRY-RUN] Would move: %s%s", item, new_path / item.name)
logger.info("[DRY-RUN] Would then delete orphaned folder: %s", old_path)
return True
files_moved = 0
errors = 0
for item in contents:
try:
dest = new_path / item.name
item.rename(dest)
logger.debug("Moved %s%s", item, dest)
files_moved += 1
except PermissionError as exc:
logger.warning(
"Permission denied moving %s: %s", item, exc
)
errors += 1
except OSError as exc:
logger.warning(
"OS error moving %s: %s", item, exc
)
errors += 1
if files_moved > 0:
logger.info(
"Moved %d files from orphaned folder to %s",
files_moved, new_path
)
# Delete the now-empty old folder
try:
old_path.rmdir()
logger.info("Deleted orphaned folder after moving contents: %s", old_path)
return errors == 0
except OSError as exc:
logger.warning(
"Could not delete orphaned folder %s (may not be empty): %s",
old_path, exc
)
return False
async def _update_database_paths(
old_folder: str,
new_folder: str,
@@ -211,7 +477,7 @@ async def _update_database_paths(
)
async def validate_and_rename_series_folders() -> Dict[str, int]:
async def validate_and_rename_series_folders(dry_run: bool = False) -> Dict[str, int]:
"""Validate and rename series folders to match NFO metadata.
Iterates over every subfolder in ``settings.anime_directory`` that
@@ -226,6 +492,10 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
Skips folders where title or year is missing/empty. Logs every
rename action.
Args:
dry_run: If True, simulate rename operations without actually
moving folders or updating the database.
Returns:
Dictionary with counts:
- ``"scanned"``: total folders scanned
@@ -244,8 +514,33 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
)
return {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
if dry_run:
logger.info("Running in DRY-RUN mode — no changes will be made")
stats = {"scanned": 0, "renamed": 0, "skipped": 0, "errors": 0}
# Detect pre-existing duplicates before rename loop
pre_existing_duplicates: Set[str] = set()
duplicates = _scan_for_pre_existing_duplicates(anime_dir)
for dup_group in duplicates:
# Try automatic merge first
if _try_merge_duplicate_group(dup_group, dry_run=dry_run):
logger.info(
"Auto-merged duplicate group for '%s' (%d folders)",
dup_group.key,
dup_group.count,
)
else:
# Flag all folders in this group as pre-existing duplicates
for folder in dup_group.folders:
pre_existing_duplicates.add(folder)
logger.warning(
"Duplicate folders detected for series '%s': %s"
"manual cleanup required (different releases or non-empty duplicates)",
dup_group.key,
dup_group.folders,
)
for series_dir in sorted(anime_dir.iterdir()):
if not series_dir.is_dir():
continue
@@ -285,6 +580,15 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
expected_path = anime_dir / expected_name
# Check for pre-existing duplicate
if current_name in pre_existing_duplicates:
logger.warning(
"Skipping rename for '%s' — pre-existing duplicate folder detected",
current_name,
)
stats["errors"] += 1
continue
# Check for duplicate target
if expected_path.exists():
logger.warning(
@@ -292,7 +596,55 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
current_name,
expected_name,
)
stats["errors"] += 1
# Target folder exists — remove source folder and delete its DB record
# (target folder's DB record survives, source folder's record must be removed
# to avoid orphaning episodes/downloads)
try:
import shutil
logger.warning(
"Removing source duplicate folder '%s' — target '%s' already exists",
current_name,
expected_name,
)
shutil.rmtree(series_dir)
logger.info(
"Removed source folder '%s' — series already exists at target",
current_name,
)
# Delete source DB record (cascades to episodes and download items)
async with get_db_session() as db:
source_series = await AnimeSeriesService.get_by_key(db, current_name)
if source_series is None:
# Fallback: find by folder name
all_series = await AnimeSeriesService.get_all(db)
for s in all_series:
if s.folder == current_name:
source_series = s
break
if source_series is not None:
await AnimeSeriesService.delete(db, source_series.id)
logger.info(
"Deleted source DB record for '%s' (id=%s) — target folder '%s' retains DB record",
current_name,
source_series.id,
expected_name,
)
else:
logger.info(
"No DB record found for source folder '%s' — folder removed only",
current_name,
)
stats["renamed"] += 1
except OSError as exc:
logger.error(
"Failed to remove source folder '%s': %s",
current_name,
exc,
)
stats["errors"] += 1
continue
# Check path length limits
@@ -305,7 +657,17 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
stats["errors"] += 1
continue
if dry_run:
logger.info(
"[DRY-RUN] Would rename folder: '%s''%s'",
current_name,
expected_name,
)
stats["renamed"] += 1
continue
try:
old_path = series_dir
series_dir.rename(expected_path)
logger.info(
"Renamed folder: '%s''%s'", current_name, expected_name
@@ -315,6 +677,12 @@ async def validate_and_rename_series_folders() -> Dict[str, int]:
# Update database records
await _update_database_paths(current_name, expected_name, anime_dir)
# Clean up stale/legacy files after successful rename
_cleanup_stale_files_after_rename(expected_path, expected_name)
# Clean up orphaned folder if old path still exists
_cleanup_orphaned_folder(old_path, expected_path, dry_run=False)
except PermissionError as exc:
logger.error(
"Permission denied renaming '%s''%s': %s",

View File

@@ -1,5 +1,6 @@
"""Centralized initialization service for application startup and setup."""
import asyncio
import os
from pathlib import Path
from typing import Callable, Optional
@@ -122,6 +123,28 @@ async def _mark_legacy_migration_completed() -> None:
)
async def _check_legacy_key_cleanup_status() -> bool:
"""Check if legacy key file cleanup has been completed.
Returns:
bool: True if cleanup was completed, False otherwise
"""
return await _check_scan_status(
check_method=lambda svc, db: svc.is_legacy_key_cleanup_completed(db),
scan_type="legacy_key_cleanup",
log_completed_msg="Legacy key file cleanup already completed, skipping",
log_not_completed_msg="Legacy key file cleanup not yet run, will clean up key files"
)
async def _mark_legacy_key_cleanup_completed() -> None:
"""Mark the legacy key file cleanup as completed in system settings."""
await _mark_scan_completed(
mark_method=lambda svc, db: svc.mark_legacy_key_cleanup_completed(db),
scan_type="legacy_key_cleanup"
)
async def _migrate_legacy_files() -> int:
"""Migrate series from legacy key/data files to database.
@@ -151,6 +174,78 @@ async def _migrate_legacy_files() -> int:
return 0
async def _cleanup_legacy_key_files() -> int:
"""Remove legacy key files from folders that already have DB entries.
This is a one-time cleanup task that runs at startup after legacy migration.
It removes deprecated 'key' files that cause duplicate key errors when
folders are renamed, since the DB is now the source of truth.
Returns:
int: Number of key files deleted
"""
from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService
logger.info("Checking for legacy key files to clean up...")
if not settings.anime_directory or not os.path.isdir(settings.anime_directory):
logger.warning(
"Anime directory not configured or does not exist, skipping legacy key cleanup"
)
return 0
deleted_count = 0
scanned_count = 0
try:
async with get_db_session() as db:
# Get all series from DB to know which folders should have key files removed
all_series = await AnimeSeriesService.get_all(db)
# Build a set of known folder names from DB
db_folders: set[str] = {series.folder for series in all_series if series.folder}
for folder_name in db_folders:
folder_path = settings.anime_directory / folder_name
key_file = folder_path / "key"
if not key_file.exists():
continue
scanned_count += 1
try:
key_file.unlink()
deleted_count += 1
logger.info(
"Removed legacy key file",
folder=folder_name,
key_file=str(key_file)
)
except OSError as exc:
logger.warning(
"Could not remove legacy key file",
folder=folder_name,
key_file=str(key_file),
error=str(exc)
)
except Exception as e:
logger.error(
"Legacy key file cleanup failed",
error=str(e),
exc_info=True
)
return deleted_count
logger.info(
"Legacy key file cleanup complete",
scanned=scanned_count,
deleted=deleted_count
)
return deleted_count
async def _sync_anime_folders(progress_service=None) -> int:
"""Scan anime folders and sync series to database.
@@ -287,6 +382,13 @@ async def perform_initial_setup(progress_service=None):
# Sync series from anime folders to database
await _sync_anime_folders(progress_service)
# Clean up legacy key files from folders that now have DB entries
# This runs after migration/sync to ensure DB entries exist before deletion
is_key_cleanup_done = await _check_legacy_key_cleanup_status()
if not is_key_cleanup_done:
await _cleanup_legacy_key_files()
await _mark_legacy_key_cleanup_completed()
# Mark the initial scan as completed
await _mark_initial_scan_completed()

View File

@@ -4,17 +4,16 @@ Uses APScheduler's AsyncIOScheduler with CronTrigger for precise
cron-based scheduling. The legacy interval-based loop has been removed
in favour of the cron approach.
Jobs are persisted to a SQLite database so they survive process restarts.
On startup, if the last scheduled run was missed (server was down at the
cron time), the job is triggered immediately within a grace period.
Jobs are held in memory (no separate scheduler database). On startup,
if the last scan timestamp indicates a missed run (server was down at the
scheduled cron time), a rescan is triggered immediately.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -83,10 +82,9 @@ class SchedulerService:
logger.error("Failed to load scheduler configuration: %s", exc)
raise SchedulerServiceError(f"Failed to load config: {exc}") from exc
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///./data/scheduler.db"),
}
self._scheduler = AsyncIOScheduler(jobstores=jobstores)
# Use in-memory job store — no separate scheduler.db needed.
# Jobs are reconstructed from config on every startup.
self._scheduler = AsyncIOScheduler()
if not self._config.enabled:
logger.info("Scheduler is disabled in configuration — not adding jobs")
@@ -125,10 +123,7 @@ class SchedulerService:
self._scheduler.start()
self._is_running = True
# Startup recovery: if the server was down at the scheduled time and
# the job is within the misfire window, APScheduler will run it
# automatically. Log the scheduled time for visibility.
# Note: next_run_time is only available AFTER scheduler.start()
# Log next scheduled run for visibility.
job = self._scheduler.get_job(_JOB_ID)
if job:
next_run = job.next_run_time
@@ -137,6 +132,11 @@ class SchedulerService:
next_run.isoformat() if next_run else None,
)
# Startup misfire recovery: check if the last scan was missed while
# the server was down. If overdue by more than one interval but within
# the grace period, trigger an immediate rescan.
await self._check_missed_run()
async def stop(self) -> None:
"""Stop the APScheduler gracefully."""
logger.info("SchedulerService.stop() called")
@@ -303,6 +303,67 @@ class SchedulerService:
)
return trigger
async def _check_missed_run(self) -> None:
"""Check if a scheduled rescan was missed while the server was down.
Compares system_settings.last_scan_timestamp against the expected
schedule. If the last scan is overdue (more than 24h ago for a daily
schedule) but within the grace period, triggers an immediate rescan.
"""
if not self._config or not self._config.enabled:
return
if not self._config.schedule_days:
return
try:
from src.server.database.connection import ( # noqa: PLC0415
get_db_session,
)
from src.server.database.system_settings_service import ( # noqa: PLC0415
SystemSettingsService,
)
async with get_db_session() as db:
settings = await SystemSettingsService.get_or_create(db)
last_scan = settings.last_scan_timestamp
if last_scan is None:
# Never scanned before — trigger immediately
logger.info("No previous scan recorded — triggering immediate rescan")
await self._perform_rescan()
return
# Ensure timezone-aware comparison
if last_scan.tzinfo is None:
last_scan = last_scan.replace(tzinfo=timezone.utc)
now = datetime.now(timezone.utc)
elapsed = now - last_scan
# If last scan was more than 24h + grace period ago, don't trigger
# (avoids surprise rescans after long downtime).
max_overdue = timedelta(hours=24, seconds=_MISFIRE_GRACE_SECONDS)
# If last scan was more than ~25h ago, skip (too stale)
if elapsed > max_overdue:
logger.info(
"Last scan was %s ago (> %s) — skipping missed-run recovery",
elapsed,
max_overdue,
)
return
# Check if a run should have happened between last_scan and now.
# Simple heuristic: if elapsed > 24h, we missed at least one daily run.
if elapsed > timedelta(hours=23):
logger.info(
"Missed scheduled rescan detected (last scan %s ago) — triggering now",
elapsed,
)
await self._perform_rescan()
except Exception as exc: # pylint: disable=broad-exception-caught
logger.warning("Missed-run check failed (non-fatal): %s", exc)
async def _broadcast(self, event_type: str, data: dict) -> None:
"""Broadcast a WebSocket event to all connected clients."""
try:

View File

@@ -442,6 +442,23 @@ class AniWorldApp {
this.hideConfigModal();
});
// Edit key modal
document.getElementById('close-edit-key').addEventListener('click', () => {
this.hideEditKeyModal();
});
document.getElementById('cancel-edit-key').addEventListener('click', () => {
this.hideEditKeyModal();
});
document.querySelector('#edit-key-modal .modal-overlay').addEventListener('click', () => {
this.hideEditKeyModal();
});
document.getElementById('save-edit-key').addEventListener('click', () => {
this.saveManualKey();
});
// Scheduler configuration
document.getElementById('scheduled-rescan-enabled').addEventListener('change', () => {
this.toggleSchedulerTimeInput();
@@ -1547,6 +1564,72 @@ class AniWorldApp {
document.getElementById('config-modal').classList.add('hidden');
}
showEditKeyModal(key, folder) {
this._currentEditKey = key;
document.getElementById('edit-key-folder').textContent = folder;
document.getElementById('edit-key-input').value = '';
document.getElementById('edit-key-error').classList.add('hidden');
document.getElementById('edit-key-modal').classList.remove('hidden');
}
hideEditKeyModal() {
document.getElementById('edit-key-modal').classList.add('hidden');
this._currentEditKey = null;
}
async saveManualKey() {
const oldKey = this._currentEditKey;
const newKey = document.getElementById('edit-key-input').value.trim();
const errorEl = document.getElementById('edit-key-error');
if (!newKey || newKey.length < 2) {
errorEl.textContent = 'Key must be at least 2 characters';
errorEl.classList.remove('hidden');
return;
}
// Validate key format (URL-safe)
if (!/^[a-zA-Z0-9][a-zA-Z0-9_-]*$/.test(newKey)) {
errorEl.textContent = 'Key must be URL-safe (alphanumeric, hyphens, underscores only)';
errorEl.classList.remove('hidden');
return;
}
try {
const response = await this.makeAuthenticatedRequest(
`/api/anime/${encodeURIComponent(oldKey)}/manual-key`,
{
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ key: newKey })
}
);
if (!response) {
errorEl.textContent = 'Failed to update key';
errorEl.classList.remove('hidden');
return;
}
if (response.ok) {
this.hideEditKeyModal();
this.showToast(`Key updated: ${oldKey}${newKey}`, 'success');
// Reload series list
if (typeof AniWorld.SeriesManager !== 'undefined') {
AniWorld.SeriesManager.loadSeries();
}
} else {
const data = await response.json().catch(() => ({ detail: 'Update failed' }));
errorEl.textContent = data.detail || 'Failed to update key';
errorEl.classList.remove('hidden');
}
} catch (err) {
console.error('Error saving manual key:', err);
errorEl.textContent = 'Error updating key';
errorEl.classList.remove('hidden');
}
}
async loadSchedulerConfig() {
try {
const response = await this.makeAuthenticatedRequest('/api/scheduler/config');
@@ -2344,4 +2427,336 @@ document.addEventListener('DOMContentLoaded', () => {
});
// Global functions for inline event handlers
window.app = null;
window.app = null;
/**
* Show the edit key modal
* @param {string} currentKey - The current series key
* @param {string} folderName - The folder name
*/
function showEditKeyModal(currentKey, folderName) {
const modal = document.getElementById('edit-key-modal');
const overlay = document.getElementById('edit-key-overlay');
const folderSpan = document.getElementById('edit-key-folder');
const keyInput = document.getElementById('edit-key-input');
const errorSpan = document.getElementById('edit-key-error');
const saveBtn = document.getElementById('edit-key-save');
if (!modal || !overlay || !folderSpan || !keyInput) {
console.error('Edit key modal elements not found');
return;
}
// Reset state
folderSpan.textContent = folderName;
keyInput.value = currentKey;
keyInput.dataset.originalKey = currentKey;
errorSpan.textContent = '';
errorSpan.style.display = 'none';
saveBtn.disabled = false;
// Show modal
modal.classList.remove('hidden');
overlay.classList.remove('hidden');
keyInput.focus();
keyInput.select();
}
/**
* Hide the edit key modal
*/
function hideEditKeyModal() {
const modal = document.getElementById('edit-key-modal');
const overlay = document.getElementById('edit-key-overlay');
if (modal) {
modal.classList.add('hidden');
}
if (overlay) {
overlay.classList.add('hidden');
}
}
/**
* Save the manual key for a series
* @param {string} oldKey - The original key
* @param {string} newKey - The new key to set
*/
async function saveManualKey(oldKey, newKey) {
const errorSpan = document.getElementById('edit-key-error');
const saveBtn = document.getElementById('edit-key-save');
if (!errorSpan || !saveBtn) {
console.error('Edit key modal elements not found');
return;
}
errorSpan.textContent = '';
errorSpan.style.display = 'none';
saveBtn.disabled = true;
try {
const response = await fetch(`/api/anime/${encodeURIComponent(oldKey)}/manual-key`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ key: newKey }),
});
const data = await response.json();
if (!response.ok) {
errorSpan.textContent = data.detail || 'Failed to update key';
errorSpan.style.display = 'block';
saveBtn.disabled = false;
return;
}
// Success - hide modal and reload
hideEditKeyModal();
showToast('Key updated successfully', 'success');
// Reload series list
if (window.app && window.app.loadSeries) {
window.app.loadSeries();
} else if (typeof loadSeries === 'function') {
loadSeries();
} else {
location.reload();
}
} catch (error) {
console.error('Error saving manual key:', error);
errorSpan.textContent = 'Network error: ' + error.message;
errorSpan.style.display = 'block';
saveBtn.disabled = false;
}
}
// Current metadata edit state
let _currentEditMetadataKey = null;
/**
* Show the edit metadata IDs modal
* @param {string} key - The series key
* @param {string} name - The series name
* @param {number|null} currentTmdbId - Current TMDB ID
* @param {number|null} currentTvdbId - Current TVDB ID
*/
function showEditMetadataModal(key, name, currentTmdbId, currentTvdbId) {
const modal = document.getElementById('edit-metadata-modal');
const overlay = modal ? modal.querySelector('.modal-overlay') : null;
const nameSpan = document.getElementById('edit-metadata-series-name');
const tmdbInput = document.getElementById('edit-metadata-tmdb');
const tvdbInput = document.getElementById('edit-metadata-tvdb');
const errorSpan = document.getElementById('edit-metadata-error');
const saveBtn = document.getElementById('save-edit-metadata');
const cancelBtn = document.getElementById('cancel-edit-metadata');
if (!modal || !nameSpan || !tmdbInput || !tvdbInput) {
console.error('Edit metadata modal elements not found');
return;
}
// Store current key
_currentEditMetadataKey = key;
// Reset state
nameSpan.textContent = name;
tmdbInput.value = currentTmdbId || '';
tvdbInput.value = currentTvdbId || '';
if (errorSpan) {
errorSpan.textContent = '';
errorSpan.classList.add('hidden');
}
if (saveBtn) saveBtn.disabled = false;
// Show modal
modal.classList.remove('hidden');
if (overlay) overlay.classList.remove('hidden');
tmdbInput.focus();
}
/**
* Hide the edit metadata modal
*/
function hideEditMetadataModal() {
const modal = document.getElementById('edit-metadata-modal');
if (modal) {
modal.classList.add('hidden');
}
_currentEditMetadataKey = null;
}
/**
* Save metadata IDs for a series
* @param {string} key - The series key
* @param {number|null} tmdbId - TMDB ID (null to clear)
* @param {number|null} tvdbId - TVDB ID (null to clear)
*/
async function saveMetadataIds(key, tmdbId, tvdbId) {
const errorSpan = document.getElementById('edit-metadata-error');
const saveBtn = document.getElementById('save-edit-metadata');
if (!errorSpan || !saveBtn) {
console.error('Edit metadata modal elements not found');
return;
}
errorSpan.textContent = '';
errorSpan.classList.add('hidden');
saveBtn.disabled = true;
try {
const body = {};
if (tmdbId !== '') body.tmdb_id = parseInt(tmdbId, 10) || null;
if (tvdbId !== '') body.tvdb_id = parseInt(tvdbId, 10) || null;
const response = await fetch(`/api/anime/${encodeURIComponent(key)}/metadata-ids`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
const data = await response.json();
if (!response.ok) {
errorSpan.textContent = data.detail || 'Failed to update metadata IDs';
errorSpan.classList.remove('hidden');
saveBtn.disabled = false;
return;
}
// Success - hide modal and show toast
hideEditMetadataModal();
showToast('Metadata IDs updated. NFO refresh queued.', 'success');
// Reload series list to reflect changes
if (window.app && window.app.loadSeries) {
window.app.loadSeries();
} else if (typeof loadSeries === 'function') {
loadSeries();
}
} catch (error) {
console.error('Error saving metadata IDs:', error);
errorSpan.textContent = 'Network error: ' + error.message;
errorSpan.classList.remove('hidden');
saveBtn.disabled = false;
}
}
/**
* Refresh NFO for a series
* @param {string} key - The series key
*/
async function refreshSeriesNfo(key) {
try {
const response = await fetch(`/api/anime/${encodeURIComponent(key)}/refresh-nfo`, {
method: 'POST',
});
const data = await response.json();
if (!response.ok) {
showToast('Failed to refresh NFO: ' + (data.detail || 'Unknown error'), 'error');
return;
}
showToast('NFO refresh queued', 'success');
} catch (error) {
console.error('Error refreshing NFO:', error);
showToast('Network error: ' + error.message, 'error');
}
}
// Bind edit metadata modal events if modal exists
document.addEventListener('DOMContentLoaded', () => {
const modal = document.getElementById('edit-metadata-modal');
const overlay = modal ? modal.querySelector('.modal-overlay') : null;
const cancelBtn = document.getElementById('cancel-edit-metadata');
const saveBtn = document.getElementById('save-edit-metadata');
const tmdbInput = document.getElementById('edit-metadata-tmdb');
const tvdbInput = document.getElementById('edit-metadata-tvdb');
if (cancelBtn) {
cancelBtn.addEventListener('click', hideEditMetadataModal);
}
if (overlay) {
overlay.addEventListener('click', hideEditMetadataModal);
}
if (saveBtn && tmdbInput && tvdbInput) {
saveBtn.addEventListener('click', () => {
if (_currentEditMetadataKey) {
saveMetadataIds(
_currentEditMetadataKey,
tmdbInput.value,
tvdbInput.value
);
}
});
}
if (tmdbInput) {
tmdbInput.addEventListener('keydown', (e) => {
if (e.key === 'Enter') {
e.preventDefault();
saveBtn.click();
} else if (e.key === 'Escape') {
hideEditMetadataModal();
}
});
}
if (tvdbInput) {
tvdbInput.addEventListener('keydown', (e) => {
if (e.key === 'Enter') {
e.preventDefault();
saveBtn.click();
} else if (e.key === 'Escape') {
hideEditMetadataModal();
}
});
}
});
// Bind edit key modal events if modal exists
document.addEventListener('DOMContentLoaded', () => {
const modal = document.getElementById('edit-key-modal');
const overlay = document.getElementById('edit-key-overlay');
const cancelBtn = document.getElementById('edit-key-cancel');
const saveBtn = document.getElementById('edit-key-save');
const keyInput = document.getElementById('edit-key-input');
if (cancelBtn) {
cancelBtn.addEventListener('click', hideEditKeyModal);
}
if (overlay) {
overlay.addEventListener('click', hideEditKeyModal);
}
if (saveBtn && keyInput) {
saveBtn.addEventListener('click', () => {
const originalKey = keyInput.dataset.originalKey;
const newKey = keyInput.value.trim();
if (newKey && newKey !== originalKey) {
saveManualKey(originalKey, newKey);
}
});
}
if (keyInput) {
keyInput.addEventListener('keydown', (e) => {
if (e.key === 'Enter') {
e.preventDefault();
saveBtn.click();
} else if (e.key === 'Escape') {
hideEditKeyModal();
}
});
}
});

View File

@@ -40,6 +40,31 @@ AniWorld.SeriesManager = (function() {
if (sortBtn) {
sortBtn.addEventListener('click', toggleAlphabeticalSort);
}
// Event delegation for dynamically created edit-key buttons
document.addEventListener('click', function(e) {
const editKeyBtn = e.target.closest('.edit-key-btn');
if (editKeyBtn) {
e.preventDefault();
const key = editKeyBtn.dataset.key;
const folder = editKeyBtn.dataset.folder;
if (window.showEditKeyModal) {
window.showEditKeyModal(key, folder);
}
}
const editMetadataBtn = e.target.closest('.edit-metadata-btn');
if (editMetadataBtn) {
e.preventDefault();
const key = editMetadataBtn.dataset.key;
const name = editMetadataBtn.dataset.name;
const tmdbId = editMetadataBtn.dataset.tmdbId || null;
const tvdbId = editMetadataBtn.dataset.tvdbId || null;
if (window.showEditMetadataModal) {
window.showEditMetadataModal(key, name, tmdbId, tvdbId);
}
}
});
}
/**
@@ -343,7 +368,8 @@ AniWorld.SeriesManager = (function() {
const canBeSelected = hasMissingEpisodes;
const hasNfo = serie.has_nfo || false;
const isLoading = serie.loading_status && serie.loading_status !== 'completed' && serie.loading_status !== 'failed';
const hasKeyError = serie.loading_error && serie.loading_error.includes('key cannot be None or empty');
// Debug logging for troubleshooting
if (serie.key === 'so-im-a-spider-so-what') {
console.log('[createSerieCard] Spider series:', {
@@ -356,6 +382,12 @@ AniWorld.SeriesManager = (function() {
});
}
const editKeyBtn = hasKeyError
? '<button class="btn btn-icon edit-key-btn" title="Fix key error" data-key="' + serie.key + '" data-folder="' + serie.folder + '"><i class="fas fa-key"></i></button>'
: '';
const editMetadataBtn = '<button class="btn btn-icon edit-metadata-btn" title="Edit Metadata IDs" data-key="' + serie.key + '" data-name="' + AniWorld.UI.escapeHtml(serie.name) + '" data-tmdb-id="' + (serie.tmdb_id || '') + '" data-tvdb-id="' + (serie.tvdb_id || '') + '"><i class="fas fa-database"></i></button>';
return '<div class="series-card ' + (isSelected ? 'selected' : '') + ' ' +
(hasMissingEpisodes ? 'has-missing' : 'complete') + ' ' +
(isLoading ? 'loading' : '') + '" ' +
@@ -368,9 +400,12 @@ AniWorld.SeriesManager = (function() {
'<div class="series-folder">' + AniWorld.UI.escapeHtml(serie.folder) + '</div>' +
'</div>' +
'<div class="series-status">' +
(hasKeyError ? '<i class="fas fa-exclamation-triangle key-error-badge" title="Key error: ' + serie.loading_error + '"></i>' : '') +
(hasMissingEpisodes ? '' : '<i class="fas fa-check-circle status-complete" title="Complete"></i>') +
(hasNfo ? '<i class="fas fa-file-alt nfo-badge nfo-exists" title="NFO metadata available"></i>' :
'<i class="fas fa-file-alt nfo-badge nfo-missing" title="No NFO metadata"></i>') +
editMetadataBtn +
editKeyBtn +
'</div>' +
'</div>' +
'<div class="series-stats">' +

View File

@@ -640,6 +640,88 @@
</div>
</div>
<!-- Edit Key Modal -->
<div id="edit-key-modal" class="modal hidden">
<div class="modal-overlay"></div>
<div class="modal-content">
<div class="modal-header">
<h3 data-text="edit-key-title">Edit Series Key</h3>
<button id="close-edit-key" class="btn btn-icon">
<i class="fas fa-times"></i>
</button>
</div>
<div class="modal-body">
<div class="config-item">
<label data-text="current-folder">Folder Name:</label>
<span id="edit-key-folder" class="config-value"></span>
</div>
<div class="config-item">
<label for="edit-key-input" data-text="new-key">New Key:</label>
<input type="text" id="edit-key-input" class="input-field"
placeholder="e.g., attack-on-titan" minlength="2">
<small class="config-hint" data-text="key-format-hint">
URL-safe key (alphanumeric, hyphens, underscores)
</small>
</div>
<div id="edit-key-error" class="config-error hidden"></div>
</div>
<div class="modal-footer">
<button id="save-edit-key" class="btn btn-primary">
<i class="fas fa-save"></i>
<span data-text="save">Save</span>
</button>
<button id="cancel-edit-key" class="btn btn-secondary">
<span data-text="cancel">Cancel</span>
</button>
</div>
</div>
</div>
<!-- Edit Metadata IDs Modal -->
<div id="edit-metadata-modal" class="modal hidden">
<div class="modal-overlay"></div>
<div class="modal-content">
<div class="modal-header">
<h3 data-text="edit-metadata-title">Edit Metadata IDs</h3>
<button id="close-edit-metadata" class="btn btn-icon">
<i class="fas fa-times"></i>
</button>
</div>
<div class="modal-body">
<div class="config-item">
<label data-text="series-name">Series:</label>
<span id="edit-metadata-series-name" class="config-value"></span>
</div>
<div class="config-item">
<label for="edit-metadata-tmdb" data-text="tmdb-id">TMDB ID:</label>
<input type="number" id="edit-metadata-tmdb" class="input-field"
placeholder="e.g., 12345" min="1" step="1">
<small class="config-hint" data-text="tmdb-id-hint">
Leave blank to clear. Find IDs at themoviedb.org
</small>
</div>
<div class="config-item">
<label for="edit-metadata-tvdb" data-text="tvdb-id">TVDB ID:</label>
<input type="number" id="edit-metadata-tvdb" class="input-field"
placeholder="e.g., 67890" min="1" step="1">
<small class="config-hint" data-text="tvdb-id-hint">
Leave blank to clear. Find IDs at thetvdb.com
</small>
</div>
<div id="edit-metadata-error" class="config-error hidden"></div>
</div>
<div class="modal-footer">
<button id="save-edit-metadata" class="btn btn-primary">
<i class="fas fa-save"></i>
<span data-text="save-refresh">Save & Refresh NFO</span>
</button>
<button id="cancel-edit-metadata" class="btn btn-secondary">
<span data-text="cancel">Cancel</span>
</button>
</div>
</div>
</div>
<!-- Toast notifications -->
<div id="toast-container" class="toast-container"></div>
</div>

View File

@@ -95,6 +95,37 @@ class TestConfigServiceLoadSave:
assert loaded_config.logging.level == sample_config.logging.level
assert loaded_config.other == sample_config.other
def test_save_and_load_scheduler_flags_roundtrip(self, config_service):
"""Scheduler auto_download_after_rescan and folder_scan_enabled must
survive a full save/load roundtrip through ConfigService.
Regression test for a bug where null legacy alias fields
(auto_download=None, folder_scan=None) were written to config.json
on save. On reload the alias mapping was skipped (because the keys
were present), causing the primary boolean fields to reset to False.
"""
original = AppConfig(
scheduler=SchedulerConfig(
enabled=True,
auto_download_after_rescan=True,
folder_scan_enabled=True,
)
)
config_service.save_config(original, create_backup=False)
# Verify raw JSON does not contain legacy alias keys
with open(config_service.config_path, "r", encoding="utf-8") as f:
raw = json.load(f)
assert "auto_download" not in raw["scheduler"]
assert "folder_scan" not in raw["scheduler"]
assert raw["scheduler"]["auto_download_after_rescan"] is True
assert raw["scheduler"]["folder_scan_enabled"] is True
# Verify loaded config preserves values
loaded = config_service.load_config()
assert loaded.scheduler.auto_download_after_rescan is True
assert loaded.scheduler.folder_scan_enabled is True
def test_save_includes_version(self, config_service, sample_config):
"""Test that saved config includes version field."""
config_service.save_config(sample_config, create_backup=False)

View File

@@ -0,0 +1,222 @@
"""Tests for folder ignore patterns feature."""
import os
import tempfile
import warnings
from unittest.mock import patch
import pytest
from src.config.settings import Settings
class TestShouldIgnoreFolder:
"""Test should_ignore_folder method."""
def test_ignore_pattern_matches_exact(self):
"""Test exact folder name match."""
settings = Settings()
assert settings.should_ignore_folder("The Last of Us") is True
def test_ignore_pattern_matches_case_insensitive(self):
"""Test case-insensitive matching."""
settings = Settings()
assert settings.should_ignore_folder("the last of us") is True
assert settings.should_ignore_folder("THE LAST OF US") is True
def test_ignore_pattern_partial_match(self):
"""Test partial folder name match."""
settings = Settings()
assert settings.should_ignore_folder("Loki Season 2") is True
assert settings.should_ignore_folder("Chernobyl Complete") is True
def test_non_matching_folder_returns_false(self):
"""Test non-matching folder passes through."""
settings = Settings()
assert settings.should_ignore_folder("Attack on Titan") is False
assert settings.should_ignore_folder("Naruto") is False
def test_empty_folder_returns_false(self):
"""Test empty folder name."""
settings = Settings()
assert settings.should_ignore_folder("") is False
def test_custom_patterns_via_env_var(self, monkeypatch):
"""Test custom ignore patterns via environment variable."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "MyShow|AnotherShow")
settings = Settings()
assert settings.should_ignore_folder("MyShow") is True
assert settings.should_ignore_folder("AnotherShow") is True
assert settings.should_ignore_folder("OtherShow") is False
def test_custom_patterns_case_insensitive_via_env_var(self, monkeypatch):
"""Test custom patterns respect case-insensitivity via env var."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "myshow")
settings = Settings()
assert settings.should_ignore_folder("MyShow") is True
assert settings.should_ignore_folder("MYSHOW") is True
class TestFolderIgnorePatternsProperty:
"""Test folder_ignore_patterns property."""
def test_default_patterns_parsed(self):
"""Test default patterns are parsed correctly."""
settings = Settings()
patterns = settings.folder_ignore_patterns
assert len(patterns) > 0
assert "The Last of Us" in patterns
assert "Loki" in patterns
def test_empty_string_via_env_var_returns_empty_list(self, monkeypatch):
"""Test empty patterns string via env var."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "")
settings = Settings()
patterns = settings.folder_ignore_patterns
assert patterns == []
def test_single_pattern_via_env_var(self, monkeypatch):
"""Test single pattern via env var."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "TestShow")
settings = Settings()
patterns = settings.folder_ignore_patterns
# Single pattern in pipe-separated string
assert "TestShow" in patterns
def test_pipe_separated_patterns_via_env_var(self, monkeypatch):
"""Test pipe-separated patterns via env var."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "Show1|Show2|Show3")
settings = Settings()
patterns = settings.folder_ignore_patterns
assert len(patterns) == 3
assert "Show1" in patterns
assert "Show2" in patterns
assert "Show3" in patterns
def test_pattern_with_spaces_trimmed_via_env_var(self, monkeypatch):
"""Test patterns with spaces are trimmed."""
monkeypatch.setenv("NFO_FOLDER_IGNORE_PATTERNS", "Show1 | Show2 | Show3 ")
settings = Settings()
patterns = settings.folder_ignore_patterns
# All patterns should be trimmed of whitespace
for p in patterns:
assert p == p.strip()
class TestSerieScannerIgnorePatterns:
"""Test SerieScanner respects ignore patterns."""
def test_scanner_skips_ignored_folders(self, tmp_path):
"""Test scanner skips folders matching ignore patterns."""
from src.core.SerieScanner import SerieScanner
from src.core.providers.aniworld_provider import AniworldLoader
# Create test folders
ignored_folder = tmp_path / "The Last of Us"
ignored_folder.mkdir()
(ignored_folder / "S01E01.mp4").touch()
normal_folder = tmp_path / "Attack on Titan"
normal_folder.mkdir()
(normal_folder / "S01E01.mp4").touch()
loader = AniworldLoader()
scanner = SerieScanner(str(tmp_path), loader)
# Get MP4 files - should only find Attack on Titan
mp4_files = list(scanner._SerieScanner__find_mp4_files())
folder_names = [name for name, _ in mp4_files]
assert "Attack on Titan" in folder_names
assert "The Last of Us" not in folder_names
def test_scanner_normal_folders_not_ignored(self, tmp_path):
"""Test normal folders are not skipped."""
from src.core.SerieScanner import SerieScanner
from src.core.providers.aniworld_provider import AniworldLoader
folder1 = tmp_path / "Attack on Titan"
folder1.mkdir()
(folder1 / "S01E01.mp4").touch()
folder2 = tmp_path / "Naruto"
folder2.mkdir()
(folder2 / "S01E01.mp4").touch()
loader = AniworldLoader()
scanner = SerieScanner(str(tmp_path), loader)
mp4_files = list(scanner._SerieScanner__find_mp4_files())
folder_names = [name for name, _ in mp4_files]
assert "Attack on Titan" in folder_names
assert "Naruto" in folder_names
def test_scanner_respects_default_ignore_patterns(self, tmp_path):
"""Test scanner respects default ignore patterns."""
from src.core.SerieScanner import SerieScanner
from src.core.providers.aniworld_provider import AniworldLoader
# Create folder matching default ignore pattern (Chernobyl)
ignored_folder = tmp_path / "Chernobyl Complete Series"
ignored_folder.mkdir()
(ignored_folder / "S01E01.mp4").touch()
normal_folder = tmp_path / "Normal Anime"
normal_folder.mkdir()
(normal_folder / "S01E01.mp4").touch()
loader = AniworldLoader()
scanner = SerieScanner(str(tmp_path), loader)
mp4_files = list(scanner._SerieScanner__find_mp4_files())
folder_names = [name for name, _ in mp4_files]
assert "Normal Anime" in folder_names
assert "Chernobyl Complete Series" not in folder_names
class TestSerieListIgnorePatterns:
"""Test SerieList respects ignore patterns."""
def test_load_series_skips_ignored_folders(self, tmp_path):
"""Test load_series skips folders matching ignore patterns."""
from src.core.entities.SerieList import SerieList
from src.core.entities.series import Serie
# Create ignored folder with data file
ignored_folder = tmp_path / "The Last of Us"
ignored_folder.mkdir()
ignored_data = ignored_folder / "data"
ignored_serie = Serie(
key="the-last-of-us",
name="The Last of Us",
site="https://aniworld.to/anime/stream/the-last-of-us",
folder="The Last of Us",
episodeDict={1: [1, 2, 3]}
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
ignored_serie.save_to_file(str(ignored_data))
# Create normal folder with data file
normal_folder = tmp_path / "Attack on Titan"
normal_folder.mkdir()
normal_data = normal_folder / "data"
normal_serie = Serie(
key="attack-on-titan",
name="Attack on Titan",
site="https://aniworld.to/anime/stream/attack-on-titan",
folder="Attack on Titan",
episodeDict={1: [1, 2]}
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
normal_serie.save_to_file(str(normal_data))
# Load series
serie_list = SerieList(str(tmp_path))
# Verify ignored folder was skipped
assert serie_list.contains("attack-on-titan") is True
assert serie_list.contains("the-last-of-us") is False

View File

@@ -9,6 +9,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.server.services.folder_rename_service import (
_cleanup_orphaned_folder,
_compute_expected_folder_name,
_is_series_being_downloaded,
_parse_nfo_title_and_year,
@@ -278,6 +279,71 @@ class TestUpdateDatabasePaths:
assert mock_episode.file_path == str(new_path)
class TestCleanupOrphanedFolder:
"""Tests for _cleanup_orphaned_folder."""
def test_returns_false_when_old_folder_does_not_exist(self, tmp_path: Path) -> None:
old_path = tmp_path / "nonexistent"
new_path = tmp_path / "new"
result = _cleanup_orphaned_folder(old_path, new_path)
assert result is False
def test_deletes_empty_folder(self, tmp_path: Path) -> None:
old_path = tmp_path / "empty_orphan"
old_path.mkdir()
new_path = tmp_path / "new"
new_path.mkdir()
result = _cleanup_orphaned_folder(old_path, new_path)
assert result is True
assert not old_path.exists()
def test_moves_files_and_deletes_folder(self, tmp_path: Path) -> None:
old_path = tmp_path / "old_orphan"
old_path.mkdir()
new_path = tmp_path / "new"
new_path.mkdir()
file1 = old_path / "S01E01.mkv"
file1.write_text("episode 1")
file2 = old_path / "S01E02.mkv"
file2.write_text("episode 2")
result = _cleanup_orphaned_folder(old_path, new_path)
assert result is True
assert not old_path.exists()
assert (new_path / "S01E01.mkv").exists()
assert (new_path / "S01E02.mkv").exists()
def test_dry_run_does_not_delete_empty_folder(self, tmp_path: Path) -> None:
old_path = tmp_path / "empty_orphan"
old_path.mkdir()
new_path = tmp_path / "new"
new_path.mkdir()
result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True)
assert result is True
assert old_path.exists()
def test_dry_run_does_not_move_files(self, tmp_path: Path) -> None:
old_path = tmp_path / "old_orphan"
old_path.mkdir()
new_path = tmp_path / "new"
new_path.mkdir()
file1 = old_path / "S01E01.mkv"
file1.write_text("episode 1")
result = _cleanup_orphaned_folder(old_path, new_path, dry_run=True)
assert result is True
assert old_path.exists()
assert not (new_path / "S01E01.mkv").exists()
def test_handles_permission_error_gracefully(self, tmp_path: Path) -> None:
old_path = tmp_path / "permission_denied"
old_path.mkdir()
new_path = tmp_path / "new"
new_path.mkdir()
# Simulate permission error by patching rmdir
with patch.object(Path, "rmdir", side_effect=PermissionError("Access denied")):
result = _cleanup_orphaned_folder(old_path, new_path)
assert result is False
class TestValidateAndRenameSeriesFolders:
"""Integration-style tests for validate_and_rename_series_folders."""
@@ -389,7 +455,8 @@ class TestValidateAndRenameSeriesFolders:
assert series_dir.is_dir()
@pytest.mark.asyncio
async def test_errors_when_target_exists(self, tmp_path: Path) -> None:
async def test_duplicate_target_folder_source_removed_and_db_deleted(self, tmp_path: Path) -> None:
"""When target folder exists, source folder should be removed and its DB record deleted."""
anime_dir = tmp_path / "anime"
anime_dir.mkdir()
series_dir = anime_dir / "Attack on Titan"
@@ -398,7 +465,13 @@ class TestValidateAndRenameSeriesFolders:
"<tvshow><title>Attack on Titan</title><year>2013</year></tvshow>"
)
# Pre-create the target folder to simulate a duplicate
(anime_dir / "Attack on Titan (2013)").mkdir()
target_dir = anime_dir / "Attack on Titan (2013)"
target_dir.mkdir()
mock_db = AsyncMock()
mock_session = AsyncMock()
mock_db.__aenter__.return_value = mock_session
mock_db.__aexit__.return_value = None
with patch(
"src.server.services.folder_rename_service.settings.anime_directory",
@@ -406,14 +479,28 @@ class TestValidateAndRenameSeriesFolders:
), patch(
"src.server.services.folder_rename_service._is_series_being_downloaded",
return_value=False,
), patch(
"src.server.services.folder_rename_service.get_db_session",
return_value=mock_db,
), patch(
"src.server.services.folder_rename_service.AnimeSeriesService.get_by_key",
new_callable=AsyncMock,
return_value=None,
), patch(
"src.server.services.folder_rename_service.AnimeSeriesService.get_all",
new_callable=AsyncMock,
return_value=[],
):
stats = await validate_and_rename_series_folders()
# Source folder removed, target survives
assert not series_dir.exists()
assert target_dir.is_dir()
# Duplicate resolved: counts as renamed (source removed, target kept)
assert stats["scanned"] == 1
assert stats["renamed"] == 0
assert stats["renamed"] == 1
assert stats["skipped"] == 0
assert stats["errors"] == 1
assert series_dir.is_dir()
assert stats["errors"] == 0
@pytest.mark.asyncio
async def test_counts_multiple_folders(self, tmp_path: Path) -> None:
@@ -459,3 +546,30 @@ class TestValidateAndRenameSeriesFolders:
assert (anime_dir / "Show A (2020)").is_dir()
assert d2.is_dir()
assert d3.is_dir()
@pytest.mark.asyncio
async def test_dry_run_does_not_rename_folders(self, tmp_path: Path) -> None:
anime_dir = tmp_path / "anime"
anime_dir.mkdir()
series_dir = anime_dir / "Attack on Titan"
series_dir.mkdir()
(series_dir / "tvshow.nfo").write_text(
"<tvshow><title>Attack on Titan</title><year>2013</year></tvshow>"
)
with patch(
"src.server.services.folder_rename_service.settings.anime_directory",
str(anime_dir),
), patch(
"src.server.services.folder_rename_service._is_series_being_downloaded",
return_value=False,
):
stats = await validate_and_rename_series_folders(dry_run=True)
assert stats["scanned"] == 1
assert stats["renamed"] == 1
assert stats["skipped"] == 0
assert stats["errors"] == 0
# Original folder should still exist (not renamed in dry-run)
assert series_dir.is_dir()
assert not (anime_dir / "Attack on Titan (2013)").exists()

View File

@@ -0,0 +1,293 @@
"""
Unit tests for key generation utilities.
"""
import pytest
from src.core.utils.key_utils import (
generate_key_from_folder,
normalize_key,
is_valid_key,
sanitize_key_for_url,
validate_key_uniqueness,
)
class TestGenerateKeyFromFolder:
"""Test generate_key_from_folder function with edge cases."""
def test_standard_folder_name(self):
"""Test standard folder name with year."""
key = generate_key_from_folder("Attack on Titan (2013)")
assert key == "attack-on-titan-2013"
assert is_valid_key(key)
def test_a_time_called_you(self):
"""Test 'A Time Called You (2023)' - the specific failing case."""
key = generate_key_from_folder("A Time Called You (2023)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_andor_2022(self):
"""Test 'Andor (2022)' - the specific failing case."""
key = generate_key_from_folder("Andor (2022)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_japanese_mixed_folder(self):
"""Test '25-sai no Joshikousei (2018)' - Japanese + Latin mixed."""
key = generate_key_from_folder("25-sai no Joshikousei (2018)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_folder_with_only_special_characters(self):
"""Test folder that would slugify to empty string."""
key = generate_key_from_folder("!!!@@@###")
assert key is not None
assert key != ""
# Should use UUID fallback
def test_folder_with_only_numbers(self):
"""Test folder that is just numbers."""
key = generate_key_from_folder("12345")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_folder_with_parentheses_and_year(self):
"""Test folder with parentheses containing year."""
key = generate_key_from_folder("My Series (2020)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_folder_with_brackets(self):
"""Test folder with square brackets."""
key = generate_key_from_folder("My Series [Special] (2021)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_unicode_characters(self):
"""Test folder with various Unicode characters."""
key = generate_key_from_folder("Héros Légende (2022)")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_korean_characters(self):
"""Test folder with Korean characters."""
key = generate_key_from_folder("나의 애니메이션 (2023)")
assert key is not None
assert key != ""
def test_chinese_characters(self):
"""Test folder with Chinese characters."""
key = generate_key_from_folder("我的动漫 (2024)")
assert key is not None
assert key != ""
def test_empty_string_input(self):
"""Test empty string input raises ValueError."""
with pytest.raises(ValueError, match="Folder name cannot be empty"):
generate_key_from_folder("")
def test_only_whitespace_input(self):
"""Test whitespace-only input raises ValueError."""
with pytest.raises(ValueError, match="Folder name cannot be empty"):
generate_key_from_folder(" ")
def test_single_character_folder(self):
"""Test single character folder name."""
key = generate_key_from_folder("X")
assert key is not None
assert key != ""
assert is_valid_key(key)
def test_very_long_folder_name(self):
"""Test very long folder name."""
long_name = "A" * 200
key = generate_key_from_folder(long_name)
assert key is not None
assert key != ""
def test_multiple_spaces(self):
"""Test folder with multiple consecutive spaces."""
key = generate_key_from_folder("My Series Name")
assert key is not None
assert key != ""
def test_leading_trailing_spaces(self):
"""Test folder with leading and trailing spaces."""
key = generate_key_from_folder(" My Series ")
assert key is not None
assert key != ""
def test_diacritics_normalization(self):
"""Test that diacritics are properly normalized."""
key = generate_key_from_folder("Animé (2023)")
assert key is not None
assert is_valid_key(key)
class TestNormalizeKey:
"""Test normalize_key function."""
def test_normalize_standard_key(self):
"""Test normalizing a standard key."""
result = normalize_key("Attack-on-Titan")
assert result == "attack-on-titan"
def test_normalize_with_underscores(self):
"""Test normalizing key with underscores."""
result = normalize_key("attack_on_titan")
assert result == "attack-on-titan"
def test_normalize_mixed_case(self):
"""Test normalizing mixed case key."""
result = normalize_key("Attack_On_Titan")
assert result == "attack-on-titan"
def test_normalize_with_spaces(self):
"""Test normalizing key with spaces."""
result = normalize_key("attack on titan")
assert result == "attack-on-titan"
def test_normalize_empty_string(self):
"""Test normalizing empty string returns empty."""
result = normalize_key("")
assert result == ""
def test_normalize_only_special_chars(self):
"""Test normalizing string with only special characters."""
result = normalize_key("!!!@@@")
assert result == ""
class TestIsValidKey:
"""Test is_valid_key function."""
def test_valid_simple_key(self):
"""Test valid simple key."""
assert is_valid_key("attack-on-titan")
def test_valid_key_with_numbers(self):
"""Test valid key with numbers."""
assert is_valid_key("a-time-called-you-2023")
def test_valid_key_with_underscores(self):
"""Test valid key with underscores."""
assert is_valid_key("a_time_called_you_2023")
def test_valid_key_starting_with_number(self):
"""Test valid key starting with number."""
assert is_valid_key("25-sai-no-joshikousei-2018")
def test_invalid_empty_key(self):
"""Test invalid empty key."""
assert not is_valid_key("")
def test_invalid_key_with_spaces(self):
"""Test invalid key with spaces."""
assert not is_valid_key("attack on titan")
def test_invalid_key_with_special_chars(self):
"""Test invalid key with special characters."""
assert not is_valid_key("attack@titan")
def test_invalid_key_with_unicode(self):
"""Test invalid key with unstripped unicode."""
assert not is_valid_key("attack\u00a0titan") # Non-breaking space
def test_invalid_single_char(self):
"""Test invalid single character key."""
assert not is_valid_key("a")
def test_valid_two_char_key(self):
"""Test valid two character key."""
assert is_valid_key("ab")
def test_invalid_key_starting_with_hyphen(self):
"""Test invalid key starting with hyphen."""
assert not is_valid_key("-attack")
class TestSanitizeKeyForUrl:
"""Test sanitize_key_for_url function."""
def test_standard_key_unchanged(self):
"""Test standard key remains unchanged."""
result = sanitize_key_for_url("attack-on-titan-2013")
assert result == "attack-on-titan-2013"
def test_spaces_replaced(self):
"""Test spaces are replaced with hyphens."""
result = sanitize_key_for_url("attack on titan")
assert result == "attack-on-titan"
def test_uppercase_preserved(self):
"""Test uppercase is preserved (use normalize_key for lowercase)."""
result = sanitize_key_for_url("AttackOnTitan")
# sanitize_key_for_url preserves case, only removes special chars
assert result == "AttackOnTitan"
def test_special_chars_removed(self):
"""Test special characters are removed."""
result = sanitize_key_for_url("Attack@#@On!Titan")
assert result == "AttackOnTitan"
def test_accents_preserved(self):
"""Test accented characters are preserved (use normalize_key for full normalization)."""
result = sanitize_key_for_url("AttäckÖnTïtan")
# Only removes truly problematic chars, preserves accented letters
assert "AttäckÖnTïtan" in result
def test_multiple_hyphens_collapses(self):
"""Test multiple hyphens are collapsed."""
result = sanitize_key_for_url("attack---on---titan")
assert result == "attack-on-titan"
def test_leading_trailing_hyphens_removed(self):
"""Test leading and trailing hyphens are removed."""
result = sanitize_key_for_url("-attack-on-titan-")
assert result == "attack-on-titan"
class TestValidateKeyUniqueness:
"""Test validate_key_uniqueness function."""
def test_unique_key(self):
"""Test key that is unique."""
existing_keys = {"attack-on-titan", "one-piece", "naruto"}
is_valid, error = validate_key_uniqueness("new-series", existing_keys)
assert is_valid is True
assert error == ""
def test_duplicate_key(self):
"""Test key that already exists."""
existing_keys = {"attack-on-titan", "one-piece", "naruto"}
is_valid, error = validate_key_uniqueness("one-piece", existing_keys)
assert is_valid is False
assert "already in use" in error
def test_empty_existing_set(self):
"""Test with empty existing keys set."""
is_valid, error = validate_key_uniqueness("new-series", set())
assert is_valid is True
assert error == ""
def test_key_differs_only_by_case(self):
"""Test key that differs only by case is NOT flagged by utility (API layer handles case-insensitivity)."""
existing_keys = {"attack-on-titan"} # lowercase in set
is_valid, error = validate_key_uniqueness("Attack-on-Titan", existing_keys)
# Utility function does case-sensitive check; API layer handles case-insensitivity
assert is_valid is True
assert error == ""
def test_same_key_same_case(self):
"""Test same key in existing set is flagged."""
existing_keys = {"my-series"}
is_valid, error = validate_key_uniqueness("my-series", existing_keys)
assert is_valid is False

View File

@@ -1809,3 +1809,107 @@ class TestNegativeCache:
assert "expired_key" not in tmdb_client._negative_cache
assert "valid_key" in tmdb_client._negative_cache
class TestNFOIDOverride:
"""Tests for manual TMDB ID override via NFO."""
@pytest.mark.asyncio
async def test_create_tvshow_nfo_uses_existing_tmdb_id(self, nfo_service, tmp_path, mock_tmdb_data, mock_content_ratings_de):
"""Test that existing TMDB ID in NFO skips search."""
# Create series folder with existing NFO containing TMDB ID
series_folder = tmp_path / "Attack on Titan"
series_folder.mkdir()
nfo_path = series_folder / "tvshow.nfo"
nfo_path.write_text("""<?xml version="1.0" encoding="UTF-8"?>
<tvshow>
<title>Attack on Titan</title>
<tmdbid>1429</tmdbid>
</tvshow>
""", encoding="utf-8")
with patch.object(nfo_service.tmdb_client, 'get_tv_show_details', new_callable=AsyncMock) as mock_details, \
patch.object(nfo_service.tmdb_client, 'get_tv_show_content_ratings', new_callable=AsyncMock) as mock_ratings, \
patch.object(nfo_service, '_download_media_files', new_callable=AsyncMock), \
patch.object(nfo_service.tmdb_client, '_ensure_session', new_callable=AsyncMock):
mock_details.return_value = mock_tmdb_data
mock_ratings.return_value = mock_content_ratings_de
nfo_path_result = await nfo_service.create_tvshow_nfo(
"Attack on Titan",
"Attack on Titan",
download_poster=False, download_logo=False, download_fanart=False
)
# Verify NFO was created
assert nfo_path_result.exists()
# Verify get_tv_show_details was called directly with the ID (no search)
mock_details.assert_called_once_with(1429, append_to_response="credits,external_ids,images")
# Verify search was NOT called
# (we can check by verifying no search_tv_show mock was set up)
@pytest.mark.asyncio
async def test_create_tvshow_nfo_searches_when_no_tmdb_id(self, nfo_service, tmp_path, mock_tmdb_data, mock_content_ratings_de):
"""Test that search is used when NFO has no TMDB ID."""
# Create series folder without existing NFO
series_folder = tmp_path / "Test Anime"
series_folder.mkdir()
with patch.object(nfo_service.tmdb_client, 'search_tv_show', new_callable=AsyncMock) as mock_search, \
patch.object(nfo_service.tmdb_client, 'get_tv_show_details', new_callable=AsyncMock) as mock_details, \
patch.object(nfo_service.tmdb_client, 'get_tv_show_content_ratings', new_callable=AsyncMock) as mock_ratings, \
patch.object(nfo_service, '_download_media_files', new_callable=AsyncMock), \
patch.object(nfo_service.tmdb_client, '_ensure_session', new_callable=AsyncMock):
mock_search.return_value = {
"results": [{
"id": 1429,
"name": "Test Anime",
"first_air_date": "2024-01-01",
"overview": "Test overview"
}]
}
mock_details.return_value = mock_tmdb_data
mock_ratings.return_value = mock_content_ratings_de
nfo_path = await nfo_service.create_tvshow_nfo(
"Test Anime",
"Test Anime",
download_poster=False, download_logo=False, download_fanart=False
)
# Verify search was called
mock_search.assert_called()
class TestSearchMultiStrategy:
"""Tests for search/multi fallback strategy."""
@pytest.mark.asyncio
async def test_search_multi_strategy_used_as_fallback(self, nfo_service, mock_tmdb_data):
"""Test that search/multi is tried after regular search fails."""
mock_search = AsyncMock()
mock_multi = AsyncMock()
# First: regular search fails
# Second: multi search returns TV result
mock_search.return_value = {"results": []}
mock_multi.return_value = {
"results": [
{"media_type": "movie", "id": 123},
{"media_type": "tv", "id": 456, "name": "Found Show", "first_air_date": "2024-01-01"}
]
}
with patch.object(nfo_service.tmdb_client, 'search_tv_show', mock_search), \
patch.object(nfo_service.tmdb_client, 'search_multi', mock_multi):
result, source = await nfo_service._search_with_fallback(
"Unknown Show", 2024, None
)
assert result["id"] == 456
assert source == "multi_search"

View File

@@ -141,6 +141,86 @@ class TestSchedulerConfigFolderScanEnabled:
assert config.folder_scan_enabled is False
class TestSchedulerConfigLegacyAliases:
"""3.10 Legacy config key aliases (auto_download, folder_scan)."""
def test_legacy_auto_download_true(self) -> None:
"""Legacy auto_download=true maps to auto_download_after_rescan=True."""
config = SchedulerConfig(auto_download=True)
assert config.auto_download_after_rescan is True
assert config.folder_scan_enabled is False
def test_legacy_auto_download_false(self) -> None:
config = SchedulerConfig(auto_download=False)
assert config.auto_download_after_rescan is False
def test_legacy_folder_scan_true(self) -> None:
"""Legacy folder_scan=true maps to folder_scan_enabled=True."""
config = SchedulerConfig(folder_scan=True)
assert config.folder_scan_enabled is True
assert config.auto_download_after_rescan is False
def test_legacy_folder_scan_false(self) -> None:
config = SchedulerConfig(folder_scan=False)
assert config.folder_scan_enabled is False
def test_legacy_both_set(self) -> None:
"""Both legacy keys can be set simultaneously."""
config = SchedulerConfig(auto_download=True, folder_scan=True)
assert config.auto_download_after_rescan is True
assert config.folder_scan_enabled is True
def test_explicit_primary_overrides_legacy(self) -> None:
"""Primary field explicitly set to False still wins over legacy True.
When user provides both old and new key, newer key wins by virtue of
being the intended migration target. Legacy alias only applies when
primary key is absent from data entirely.
"""
config = SchedulerConfig(
auto_download=True,
auto_download_after_rescan=True,
folder_scan=True,
folder_scan_enabled=True,
)
# Both set to True — no conflict possible when both agree
assert config.auto_download_after_rescan is True
assert config.folder_scan_enabled is True
def test_explicit_primary_false_wins_over_legacy_true(self) -> None:
"""Primary=False explicitly set wins over legacy=True.
User has migrated config to new keys but old key still present.
Explicit primary value must be respected.
"""
config = SchedulerConfig(
auto_download=True,
auto_download_after_rescan=False,
)
assert config.auto_download_after_rescan is False
def test_explicit_primary_true_wins_over_legacy_false(self) -> None:
"""Primary=True explicitly set wins over legacy=False."""
config = SchedulerConfig(
auto_download=False,
auto_download_after_rescan=True,
)
assert config.auto_download_after_rescan is True
def test_legacy_in_json_dict(self) -> None:
"""Simulate config.json with legacy auto_download key."""
data = {
"enabled": True,
"schedule_time": "03:00",
"schedule_days": ALL_DAYS,
"auto_download": True,
"folder_scan": True,
}
config = SchedulerConfig(**data)
assert config.auto_download_after_rescan is True
assert config.folder_scan_enabled is True
class TestSchedulerConfigSerialisation:
"""3.9 Serialisation roundtrip."""
@@ -156,3 +236,24 @@ class TestSchedulerConfigSerialisation:
dumped = original.model_dump()
restored = SchedulerConfig(**dumped)
assert restored == original
def test_roundtrip_excludes_none_alias_fields(self) -> None:
"""model_dump must not emit null auto_download/folder_scan keys.
Previously these null keys were written to config.json on save.
On reload they were present (even as None), so the alias mapping in
__init__ was skipped and the primary fields retained their default
False values instead of the configured True values.
"""
original = SchedulerConfig(
auto_download_after_rescan=True,
folder_scan_enabled=True,
)
dumped = original.model_dump()
# Alias fields must not appear when None
assert "auto_download" not in dumped
assert "folder_scan" not in dumped
# Primary fields roundtrip correctly
restored = SchedulerConfig(**dumped)
assert restored.auto_download_after_rescan is True
assert restored.folder_scan_enabled is True

View File

@@ -489,12 +489,12 @@ class TestSingletonHelpers:
# ---------------------------------------------------------------------------
# 12.12 Persistent job store — SQLAlchemyJobStore passed to AsyncIOScheduler
# 12.12 In-memory job store — no separate scheduler.db needed
# ---------------------------------------------------------------------------
class TestPersistentJobStore:
class TestInMemoryJobStore:
@pytest.mark.asyncio
async def test_start_creates_scheduler_with_sqlalchemy_jobstore(
async def test_start_creates_scheduler_without_jobstore_arg(
self, scheduler_service, mock_config_service
):
with patch(
@@ -508,10 +508,9 @@ class TestPersistentJobStore:
MockScheduler.assert_called_once()
call_kwargs = MockScheduler.call_args
jobstores = call_kwargs[1]["jobstores"]
assert "default" in jobstores
# Verify it's a SQLAlchemyJobStore (class check via module name)
assert "sqlalchemy" in type(jobstores["default"]).__module__
# No jobstores argument — uses default MemoryJobStore
if call_kwargs[1]:
assert "jobstores" not in call_kwargs[1]
@pytest.mark.asyncio
async def test_job_options_include_misfire_grace_and_coalesce(

View File

@@ -519,23 +519,8 @@ class TestFindMp4Files:
class TestReadDataFromFile:
"""Test __read_data_from_file method."""
def test_reads_key_file(self, mock_loader):
"""Should read key from 'key' file."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
anime_folder = os.path.join(tmpdir, "SomeAnime")
os.makedirs(anime_folder)
with open(os.path.join(anime_folder, "key"), "w") as f:
f.write("some-key")
scanner = SerieScanner(tmpdir, mock_loader)
result = scanner._SerieScanner__read_data_from_file("SomeAnime")
assert result is not None
assert result.key == "some-key"
def test_reads_data_file(self, mock_loader):
"""Should read Serie from 'data' file when no 'key' file."""
"""Should read Serie from 'data' file when no DB entry exists."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
@@ -552,8 +537,8 @@ class TestReadDataFromFile:
assert result is not None
assert result.key == "test-key"
def test_no_files_returns_none(self, mock_loader):
"""Should return None when no key or data file exists."""
def test_no_files_returns_serie_with_generated_key(self, mock_loader):
"""Should return Serie with generated key when no key or data file exists."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
@@ -562,7 +547,30 @@ class TestReadDataFromFile:
scanner = SerieScanner(tmpdir, mock_loader)
result = scanner._SerieScanner__read_data_from_file("Empty")
assert result is None
# Step 5 (was Step 4) generates key from folder name when no files exist
assert result is not None
assert isinstance(result, Serie)
assert result.key == "empty"
def test_scan_key_override_used_instead_of_generated(self, mock_loader):
"""Should use override key when folder name matches override dict."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
anime_folder = os.path.join(tmpdir, "Anyway, I'm Falling in Love with You (2025)")
os.makedirs(anime_folder)
overrides = {
"Anyway, I'm Falling in Love with You (2025)": "anyway-im-falling-in-love-with-you-2025"
}
scanner = SerieScanner(tmpdir, mock_loader, scan_key_overrides=overrides)
result = scanner._SerieScanner__read_data_from_file(
"Anyway, I'm Falling in Love with You (2025)"
)
# Override key should be used instead of generated key
assert result is not None
assert isinstance(result, Serie)
assert result.key == "anyway-im-falling-in-love-with-you-2025"
class TestReinit:
@@ -763,7 +771,7 @@ class TestDbLookupFallback:
assert scanner.keyDict["rooster-fighter"].episodeDict == {1: [1, 2, 3]}
def test_db_lookup_returns_none_folder_skipped(self, mock_loader):
"""When db_lookup returns None, the folder is skipped with a warning."""
"""When db_lookup returns None, Step 4 fallback generates key from folder name."""
import tempfile
with tempfile.TemporaryDirectory() as tmp_dir:
@@ -773,10 +781,11 @@ class TestDbLookupFallback:
with patch.object(scanner, 'get_total_to_scan', return_value=1):
scanner.scan()
assert len(scanner.keyDict) == 0
# Step 4 generates key from folder name, so keyDict is not empty
assert len(scanner.keyDict) == 1
def test_db_lookup_exception_skips_folder(self, mock_loader):
"""When db_lookup raises, the folder is skipped gracefully."""
"""When db_lookup raises, Step 4 fallback generates key from folder name."""
import tempfile
with tempfile.TemporaryDirectory() as tmp_dir:
@@ -786,7 +795,8 @@ class TestDbLookupFallback:
with patch.object(scanner, 'get_total_to_scan', return_value=1):
scanner.scan() # should not raise
assert len(scanner.keyDict) == 0
# Step 4 generates key from folder name, so keyDict is not empty
assert len(scanner.keyDict) == 1
def test_db_lookup_warning_logged_when_no_files(
self, mock_loader, caplog

View File

@@ -77,23 +77,19 @@ class TestGetSerieFromFolderDbLookup:
assert result.key == "rooster-fighter"
lookup.assert_called_once_with("Rooster Fighter (2026)")
def test_legacy_key_file_as_last_resort(self, temp_directory, mock_loader):
"""No DB, no callback -> legacy 'key' file used with deprecation warning."""
def test_no_db_no_callback_generates_key_from_folder_name(self, temp_directory, mock_loader):
"""No DB entry, no callback -> key generated from folder name."""
folder = os.path.join(temp_directory, "Legacy Series")
os.makedirs(folder, exist_ok=True)
with open(os.path.join(folder, "key"), "w") as f:
f.write("legacy-key")
# No key file, no data file - should fall through to Step 4 (key generation)
scanner = SerieScanner(temp_directory, mock_loader)
with patch.object(logging.getLogger("src.core.SerieScanner"), "warning") as mock_warning:
result = scanner._SerieScanner__read_data_from_file("Legacy Series")
result = scanner._SerieScanner__read_data_from_file("Legacy Series")
assert result is not None
assert result.key == "legacy-key"
mock_warning.assert_called()
warning_calls = [str(c) for c in mock_warning.call_args_list]
assert any("deprecated" in c or "v3.0.0" in c for c in warning_calls)
assert result is not None
assert result.key == "legacy-series"
assert result.folder == "Legacy Series"
def test_db_lookup_exception_caught_and_logged(self, temp_directory, mock_loader):
"""DB exception -> fallback to provider callback."""

View File

@@ -209,7 +209,7 @@ class TestPersistSerieToDbErrorHandling:
with patch(
"src.server.database.connection.get_async_session_factory",
return_value=mock_factory
return_value=mock_session
):
with patch(
"src.server.database.service.AnimeSeriesService.get_by_key",

View File

@@ -23,6 +23,8 @@ async def test_system_settings_integration():
assert settings.initial_scan_completed is False
assert settings.initial_nfo_scan_completed is False
assert settings.initial_media_scan_completed is False
assert settings.migration_legacy_files_completed is False
assert settings.legacy_key_cleanup_completed is False
# Test checking individual flags
async with get_db_session() as db:
@@ -34,6 +36,12 @@ async def test_system_settings_integration():
is_media_done = await SystemSettingsService.is_initial_media_scan_completed(db)
assert is_media_done is False
is_migration_done = await SystemSettingsService.is_migration_legacy_files_completed(db)
assert is_migration_done is False
is_key_cleanup_done = await SystemSettingsService.is_legacy_key_cleanup_completed(db)
assert is_key_cleanup_done is False
# Test marking scans as completed
async with get_db_session() as db:
@@ -56,6 +64,8 @@ async def test_system_settings_integration():
assert settings.initial_scan_completed is False
assert settings.initial_nfo_scan_completed is False
assert settings.initial_media_scan_completed is False
assert settings.migration_legacy_files_completed is False
assert settings.legacy_key_cleanup_completed is False
if __name__ == "__main__":

View File

@@ -444,6 +444,77 @@ class TestTMDBClientSessionLeak:
"Unexpected warning about unclosed session"
class TestTMDBClientLifecycleIntegration:
"""Integration tests for TMDBClient lifecycle management."""
@pytest.mark.asyncio
async def test_context_manager_no_resource_warning(self, caplog):
"""Test async with TMDBClient produces no ResourceWarning."""
import logging
import warnings
caplog.set_level(logging.WARNING)
# Use context manager properly - should not leak
async with TMDBClient(api_key="test_key") as client:
await client._ensure_session()
assert client.session is not None
# Session should be closed after context exit
assert client.session is None or client.session.closed
@pytest.mark.asyncio
async def test_exception_safety_during_api_call(self, caplog):
"""Test session is closed even when exception raised during API call."""
import logging
caplog.set_level(logging.WARNING)
close_called = False
class TrackingSession:
def __init__(self):
self.closed = False
async def close(self):
nonlocal close_called
close_called = True
self.closed = True
async def get(self, url, **kwargs):
raise TMDBAPIError("Simulated API failure")
client = TMDBClient(api_key="test_key")
client.session = TrackingSession()
# Exception during context should still close session
with pytest.raises(TMDBAPIError):
async with client:
raise TMDBAPIError("Simulated API failure")
assert close_called, "Session was not closed after API exception"
@pytest.mark.asyncio
async def test_reuse_session_across_multiple_requests(self, caplog):
"""Test session is reused across multiple requests without leaks."""
import logging
caplog.set_level(logging.WARNING)
client = TMDBClient(api_key="test_key")
async with client as c:
# First request
await c._ensure_session()
session1 = c.session
# Second request should reuse same session
await c._ensure_session()
session2 = c.session
assert session1 is session2, "Session should be reused"
# After context exit, session should be closed
assert client.session is None or client.session.closed
class TestTMDBClientConnectorClosed:
"""Test handling of 'Connector is closed' errors."""