Compare commits

...

2 Commits

Author SHA1 Message Date
5f66a28bed Mark migration task as complete, remove old task list 2025-12-01 17:46:42 +01:00
17754a86f0 Add database migration from legacy data files
- Create DataMigrationService for migrating data files to SQLite
- Add sync database methods to AnimeSeriesService
- Update SerieScanner to save to database with file fallback
- Update anime API endpoints to use database with fallback
- Add delete endpoint for anime series
- Add automatic migration on startup in fastapi_app.py lifespan
- Add 28 unit tests for migration service
- Add 14 integration tests for migration flow
- Update infrastructure.md and database README docs

Migration runs automatically on startup, legacy data files preserved.
2025-12-01 17:42:09 +01:00
10 changed files with 2235 additions and 45 deletions

View File

@ -162,7 +162,83 @@ All series-related WebSocket events include `key` as the primary identifier in t
- `AnimeSeriesService.get_by_key(key)` - **Primary lookup method**
- `AnimeSeriesService.get_by_id(id)` - Internal lookup by database ID
- No `get_by_folder()` method exists - folder is never used for lookups
- `AnimeSeriesService.get_all(db)` - Get all series from database
- `AnimeSeriesService.create(db, key, name, site, folder, episode_dict)` - Create new series
- `AnimeSeriesService.update(db, id, **kwargs)` - Update existing series
- `AnimeSeriesService.delete(db, id)` - Delete series by ID
- `AnimeSeriesService.upsert_sync(db, key, name, site, folder, episode_dict)` - Sync upsert for scanner
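A minimal async lookup sketch, assuming the `get_db_session` context manager from `src.server.database.connection` (the same one the startup migration uses):
```python
from typing import Optional

from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService


async def find_series_name(key: str) -> Optional[str]:
    """Resolve a series' display name from its primary key."""
    async with get_db_session() as db:
        series = await AnimeSeriesService.get_by_key(db, key)
        return series.name if series else None
```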
## Data Storage Migration
### Background
The application has migrated from file-based storage to SQLite database storage for anime series metadata.
**Previous Storage (Deprecated)**:
- Individual `data` files (no extension) in each anime folder
- Example: `/anime-directory/Attack on Titan (2013)/data`
**Current Storage (Database)**:
- SQLite database at `data/aniworld.db`
- Managed by `AnimeSeriesService` using SQLAlchemy
### Migration Service
The `DataMigrationService` handles automatic migration of legacy data files to the database:
```python
from src.server.services.data_migration_service import DataMigrationService
# Check for legacy files
service = DataMigrationService()
files = await service.check_for_legacy_data_files(anime_directory)
# Migrate all to database
result = await service.migrate_all_legacy_data(anime_directory, db_session)
print(result)  # Migration Result: 10 migrated, 2 skipped, 0 failed (total: 12)
# Optional: cleanup old files with backup
await service.cleanup_migrated_files(files, backup=True)
```
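Note that `cleanup_migrated_files()` is a deliberately separate, opt-in step: the automatic startup migration never deletes legacy files, so the originals can be checked against the database before removal.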
### Automatic Migration on Startup
Migration runs automatically during application startup:
1. Database is initialized (`init_db()`)
2. Legacy data files are detected
3. Files are migrated to database
4. Results are logged (no files are deleted automatically)
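A condensed sketch of that startup sequence (the full implementation, with logging and error handling, lives in the `fastapi_app.py` lifespan):
```python
from src.server.database.connection import get_db_session, init_db
from src.server.services.data_migration_service import DataMigrationService


async def run_startup_migration(anime_directory: str) -> None:
    """Initialize the database, then migrate any legacy data files found."""
    await init_db()
    service = DataMigrationService()
    legacy_files = await service.check_for_legacy_data_files(anime_directory)
    if not legacy_files:
        return  # nothing to migrate
    async with get_db_session() as db:
        result = await service.migrate_all_legacy_data(anime_directory, db)
    print(result)  # counts only; legacy files stay on disk
```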
### Migration Result
```python
@dataclass
class MigrationResult:
total_found: int # Total legacy files found
migrated: int # Successfully migrated
failed: int # Failed to migrate
skipped: int # Already in database
errors: List[str] # Error messages
```
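Callers can branch on these counters; a minimal sketch that surfaces failures:
```python
import logging

from src.server.services.data_migration_service import MigrationResult

logger = logging.getLogger(__name__)


def report_migration(result: MigrationResult) -> None:
    """Log the summary line, then one line per failed file."""
    logger.info("%s", result)
    for message in result.errors:
        logger.error("Migration error: %s", message)
```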
### Deprecation Notes
- **Legacy file-based storage is deprecated** - Do not create new `data` files
- **SerieScanner**: Updated to save to database (with file fallback for CLI)
- **API endpoints**: Now use database as primary source
- **CLI**: Still uses file-based storage for backward compatibility
### Related Files
| File | Purpose |
| ---- | ------- |
| `src/server/services/data_migration_service.py` | Migration service |
| `src/server/database/service.py` | Database CRUD operations |
| `src/server/database/models.py` | SQLAlchemy models |
| `src/core/SerieScanner.py` | Scanner with DB support |
## Core Services

View File

@ -120,3 +120,29 @@ For each task completed:
- Good foundation for future enhancements if needed
---
## ✅ COMPLETED: Migration Task - File-Based Storage to Database Storage
**Status**: All tasks completed on December 1, 2025
**Summary**:
- Created `DataMigrationService` for migrating legacy `data` files to SQLite
- Automatic migration on application startup
- Updated API endpoints to use database with fallback
- Added sync database methods to `AnimeSeriesService`
- Updated `SerieScanner` to save to database
- 28 unit tests + 14 integration tests (all passing)
- Documentation updated in `docs/infrastructure.md` and database README
**Files Created/Modified**:
- `src/server/services/data_migration_service.py` (new)
- `src/server/database/service.py` (sync methods added)
- `src/server/fastapi_app.py` (lifespan migration)
- `src/server/api/anime.py` (delete endpoint, DB integration)
- `src/core/SerieScanner.py` (DB save support)
- `tests/unit/test_data_migration_service.py` (new)
- `tests/integration/test_data_migration.py` (new)
**All 1048 tests pass.**

View File

@ -3,6 +3,8 @@ SerieScanner - Scans directories for anime series and missing episodes.
This module provides functionality to scan anime directories, identify
missing episodes, and report progress through callback interfaces.
Supports both database storage (preferred) and file-based storage (fallback).
"""
import logging
@ -34,13 +36,15 @@ class SerieScanner:
Scans directories for anime series and identifies missing episodes.
Supports progress callbacks for real-time scanning updates.
Prefers database storage when available, falls back to file storage.
"""
def __init__(
self,
basePath: str,
loader: Loader,
callback_manager: Optional[CallbackManager] = None,
use_database: bool = True,
) -> None:
"""
Initialize the SerieScanner.
@ -49,6 +53,7 @@ class SerieScanner:
basePath: Base directory containing anime series
loader: Loader instance for fetching series information
callback_manager: Optional callback manager for progress updates
use_database: Whether to save to the database (falls back to file storage if unavailable)
Raises:
ValueError: If basePath is invalid or doesn't exist
@ -71,6 +76,22 @@ class SerieScanner:
callback_manager or CallbackManager()
)
self._current_operation_id: Optional[str] = None
self._use_database: bool = use_database
self._db_available: bool = False
# Check if database is available
if use_database:
try:
from src.server.database.connection import get_sync_session
session = get_sync_session()
session.close()
self._db_available = True
logger.info("Database available for SerieScanner")
except (ImportError, RuntimeError) as e:
logger.warning(
"Database not available, using file storage: %s", e
)
self._db_available = False
logger.info("Initialized SerieScanner with base path: %s", abs_path)
@ -79,6 +100,63 @@ class SerieScanner:
"""Get the callback manager instance."""
return self._callback_manager
def _save_serie(self, serie: Serie, data_path: str) -> None:
"""Save a serie to database (if available) and file.
The data file is always written for backward compatibility; when the
database is available, the serie is additionally upserted there.
Args:
serie: The Serie object to save.
data_path: The path to save the data file for fallback.
"""
# Always save to file for backward compatibility
serie.save_to_file(data_path)
# Try to save to database if available
if self._use_database and self._db_available:
try:
from src.server.database.connection import get_sync_session
from src.server.database.service import AnimeSeriesService
session = get_sync_session()
try:
# Convert episodeDict to JSON-serializable format
episode_dict_json = None
if serie.episodeDict:
episode_dict_json = {}
for season, episodes in serie.episodeDict.items():
season_key = str(season)
episode_dict_json[season_key] = [
ep.to_dict() if hasattr(ep, 'to_dict')
else str(ep)
for ep in episodes
]
# Get site from serie if available
site = getattr(serie, 'site', '') or ''
# Upsert to database (static method call)
AnimeSeriesService.upsert_sync(
db=session,
key=serie.key,
name=serie.name,
site=site,
folder=serie.folder,
episode_dict=episode_dict_json
)
logger.debug(
"Saved serie to database: %s", serie.key
)
finally:
session.close()
except (ImportError, RuntimeError) as e:
logger.warning(
"Failed to save serie to database, "
"file backup exists: %s",
e
)
def reinit(self) -> None:
"""Reinitialize the series dictionary (keyed by serie.key)."""
self.keyDict: dict[str, Serie] = {}
@ -185,7 +263,8 @@ class SerieScanner:
data_path = os.path.join(
self.directory, folder, 'data'
)
# Save to database (if available) and file
self._save_serie(serie, data_path)
# Store by key (primary identifier), not folder
if serie.key in self.keyDict:

View File

@ -263,32 +263,65 @@ async def list_anime(
)
try:
# Try to get series from database first
summaries: List[AnimeSummary] = []
try:
from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService
async with get_db_session() as db:
db_series = await AnimeSeriesService.get_all(db)
for series in db_series:
episode_dict = series.episode_dict or {}
# Only include series with missing episodes
if episode_dict:
# Ensure episode dict keys are strings
missing_episodes = {
str(k): v for k, v in episode_dict.items()
}
summaries.append(
AnimeSummary(
key=series.key,
name=series.name,
site=series.site,
folder=series.folder,
missing_episodes=missing_episodes,
)
)
logger.debug(
"Loaded %d series from database",
len(summaries)
)
except Exception as db_error:
# Fall back to in-memory series_app if database fails
logger.warning(
"Database query failed, using in-memory fallback: %s",
db_error
)
if series_app and hasattr(series_app, "list"):
series = series_app.list.GetMissingEpisode()
for serie in series:
key = getattr(serie, "key", "")
name = getattr(serie, "name", "")
site = getattr(serie, "site", "")
folder = getattr(serie, "folder", "")
episode_dict = getattr(serie, "episodeDict", {}) or {}
# Convert episode dict keys to strings
missing_episodes = {
str(k): v for k, v in episode_dict.items()
}
summaries.append(
AnimeSummary(
key=key,
name=name,
site=site,
folder=folder,
missing_episodes=missing_episodes,
)
)
# Apply sorting if requested
if sort_by:
@ -585,6 +618,9 @@ async def add_series(
) -> dict:
"""Add a new series to the library.
Creates a database entry for the series and also updates
the in-memory cache. The filesystem folder is created for downloads.
Extracts the series `key` from the provided link URL.
The `key` is the URL-safe identifier used for all lookups.
The `name` is stored as display metadata along with a
@ -603,6 +639,12 @@ async def add_series(
Raises:
HTTPException: If adding the series fails or link is invalid
"""
import os
from src.config.settings import settings
from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService
try:
# Validate inputs
if not request.link or not request.link.strip():
@ -617,13 +659,6 @@ async def add_series(
detail="Series name cannot be empty",
)
# Extract key from link URL
# Expected format: https://aniworld.to/anime/stream/{key}
link = request.link.strip()
@ -647,35 +682,78 @@ async def add_series(
# Create folder from name (filesystem-friendly)
folder = request.name.strip()
# Try database storage first, fall back to in-memory/file storage
db_stored = False
try:
async with get_db_session() as db:
# Check if series already exists in database
existing = await AnimeSeriesService.get_by_key(db, key)
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Series with key '{key}' already exists",
)
# Create database entry
await AnimeSeriesService.create(
db,
key=key,
name=request.name.strip(),
site="aniworld.to",
folder=folder,
episode_dict={},
)
await db.commit()
db_stored = True
logger.info(
"Created database entry for series: %s (key=%s)",
request.name,
key
)
except HTTPException:
raise
except RuntimeError as db_error:
# Database not initialized - fall back to file storage
logger.warning(
"Database unavailable, using file storage: %s",
db_error
)
# Create filesystem folder (for downloads)
if settings.anime_directory:
anime_path = os.path.join(settings.anime_directory, folder)
os.makedirs(anime_path, exist_ok=True)
logger.debug("Created folder: %s", anime_path)
# Create Serie object for in-memory cache
serie = Serie(
key=key,
name=request.name.strip(),
site="aniworld.to",
folder=folder,
episodeDict={},
)
# Update in-memory cache and/or file storage
if series_app and hasattr(series_app, "list"):
# If database wasn't available, use file-based storage
if not db_stored:
series_app.list.add(serie)
else:
# Just update in-memory cache
series_app.list.keyDict[key] = serie
logger.debug("Updated in-memory cache for series: %s", key)
return {
"status": "success",
"message": f"Successfully added series: {request.name}",
"key": key,
"folder": folder
"folder": folder,
}
except HTTPException:
raise
except Exception as exc:
logger.error("Failed to add series: %s", exc, exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to add series: {str(exc)}",
@ -773,6 +851,112 @@ async def get_anime(
) from exc
@router.delete("/{anime_key}")
async def delete_series(
anime_key: str,
_auth: dict = Depends(require_auth),
series_app: Any = Depends(get_series_app),
) -> dict:
"""Delete a series from the library.
Removes the series from the database. The `anime_key` should be
the unique series key (provider identifier).
Note: This does NOT delete the filesystem folder or downloaded files.
To remove files, use the filesystem operations separately.
Args:
anime_key: Series key (primary identifier)
_auth: Ensures the caller is authenticated (value unused)
series_app: Core `SeriesApp` instance for cache updates
Returns:
Dict[str, Any]: Status payload with success message
Raises:
HTTPException: If series not found or deletion fails
"""
from src.server.database.connection import get_db_session
from src.server.database.service import AnimeSeriesService
try:
if not anime_key or not anime_key.strip():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Series key cannot be empty",
)
anime_key = anime_key.strip()
series_name = anime_key # Default if not found
db_deleted = False
# Try database deletion first
try:
async with get_db_session() as db:
# Find series by key
series = await AnimeSeriesService.get_by_key(db, anime_key)
if series:
series_name = series.name
series_id = series.id
# Delete from database
deleted = await AnimeSeriesService.delete(db, series_id)
await db.commit()
if deleted:
db_deleted = True
logger.info(
"Deleted series from database: %s (key=%s)",
series_name,
anime_key
)
except RuntimeError as db_error:
# Database not available
logger.warning(
"Database unavailable for deletion: %s",
db_error
)
# Remove from in-memory cache if available
in_memory_deleted = False
if series_app and hasattr(series_app, "list"):
if anime_key in series_app.list.keyDict:
serie = series_app.list.keyDict[anime_key]
series_name = getattr(serie, "name", anime_key)
del series_app.list.keyDict[anime_key]
in_memory_deleted = True
logger.debug(
"Removed series from in-memory cache: %s",
anime_key
)
# Check if anything was deleted
if not db_deleted and not in_memory_deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Series with key '{anime_key}' not found",
)
return {
"status": "success",
"message": f"Successfully deleted series: {series_name}",
"key": anime_key,
}
except HTTPException:
raise
except Exception as exc:
logger.error(
"Failed to delete series %s: %s",
anime_key,
exc,
exc_info=True
)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete series: {str(exc)}",
) from exc
# Maximum allowed input size for security
MAX_INPUT_LENGTH = 100000 # 100KB

View File

@ -403,6 +403,87 @@ series = result.scalar_one()
# episodes already loaded, no additional queries
```
## Data Migration from Legacy Files
### Background
The application previously stored series metadata in individual `data` files (no extension) in each anime folder. The database layer now provides centralized storage with the `DataMigrationService` handling the transition.
### Migration Service
The migration service (`src/server/services/data_migration_service.py`) provides:
```python
from src.server.services.data_migration_service import DataMigrationService
service = DataMigrationService()
# Scan for legacy data files
files = await service.check_for_legacy_data_files("/path/to/anime")
# Migrate a single file
success = await service.migrate_data_file_to_db(file_path, db_session)
# Migrate all files at once
result = await service.migrate_all_legacy_data("/path/to/anime", db_session)
# result.migrated, result.skipped, result.failed, result.errors
# Cleanup old files (with backup)
await service.cleanup_migrated_files(files, backup=True)
# Check migration status
status = await service.get_migration_status("/path/to/anime", db_session)
```
### Automatic Migration
Migration runs automatically during application startup in `fastapi_app.py`:
1. Database is initialized
2. Legacy files are detected
3. Files are migrated (duplicates skipped)
4. Results are logged
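To verify the outcome afterwards, `get_migration_status()` compares legacy files against database entries; a short sketch (dictionary keys as defined by the service):
```python
from src.server.database.connection import get_db_session
from src.server.services.data_migration_service import DataMigrationService


async def verify_migration(anime_directory: str) -> bool:
    """Return True once no series exists only in legacy files."""
    service = DataMigrationService()
    async with get_db_session() as db:
        status = await service.get_migration_status(anime_directory, db)
    if status["only_in_files"]:
        print("Still pending:", status["only_in_files"])
    return status["migration_complete"]
```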
### AnimeSeriesService Operations
```python
from src.server.database.service import AnimeSeriesService
# Create new series
series = await AnimeSeriesService.create(
db, key="my-anime", name="My Anime",
site="aniworld.to", folder="My Anime (2024)",
episode_dict={"1": [1, 2, 3]}
)
# Lookup by key (primary method)
series = await AnimeSeriesService.get_by_key(db, "my-anime")
# Get all series
all_series = await AnimeSeriesService.get_all(db)
# Update
updated = await AnimeSeriesService.update(
db, series.id, episode_dict={"1": [1, 2, 3, 4]}
)
# Delete
await AnimeSeriesService.delete(db, series.id)
# Sync upsert (for SerieScanner)
AnimeSeriesService.upsert_sync(
sync_db, key="my-anime", name="My Anime",
site="aniworld.to", folder="My Anime (2024)",
episode_dict={"1": [1, 2, 3]}
)
```
### Deprecation Notes
- **File-based storage is deprecated** - use database for new code
- **CLI compatibility** - CLI still uses files (migration on startup handles sync)
- **SerieScanner** - updated to save to the database when available (see the construction sketch below)
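A minimal construction sketch; the `loader` argument is left untyped here because the concrete `Loader` instance depends on the caller's setup:
```python
from src.core.SerieScanner import SerieScanner


def build_scanner(base_path: str, loader) -> SerieScanner:
    """Construct a scanner that prefers DB storage with file fallback.

    `loader` is whatever Loader instance the caller already uses
    (hypothetical placeholder; its import path is setup-specific).
    """
    return SerieScanner(
        basePath=base_path,
        loader=loader,
        use_database=True,  # default; files remain the fallback
    )
```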
## Troubleshooting
### Database not initialized

View File

@ -241,6 +241,155 @@ class AnimeSeriesService:
.limit(limit)
)
return list(result.scalars().all())
# ==========================================================================
# Sync Methods (for use in non-async contexts like CLI/scanner)
# ==========================================================================
@staticmethod
def create_sync(
db: Session,
key: str,
name: str,
site: str,
folder: str,
description: Optional[str] = None,
status: Optional[str] = None,
total_episodes: Optional[int] = None,
cover_url: Optional[str] = None,
episode_dict: Optional[Dict] = None,
) -> AnimeSeries:
"""Create a new anime series (synchronous version).
Args:
db: Sync database session
key: Unique provider key
name: Series name
site: Provider site URL
folder: Local filesystem path
description: Optional series description
status: Optional series status
total_episodes: Optional total episode count
cover_url: Optional cover image URL
episode_dict: Optional episode dictionary
Returns:
Created AnimeSeries instance
"""
series = AnimeSeries(
key=key,
name=name,
site=site,
folder=folder,
description=description,
status=status,
total_episodes=total_episodes,
cover_url=cover_url,
episode_dict=episode_dict,
)
db.add(series)
db.flush()
db.refresh(series)
logger.info("Created anime series (sync): %s (key=%s)", name, key)
return series
@staticmethod
def get_by_key_sync(db: Session, key: str) -> Optional[AnimeSeries]:
"""Get anime series by provider key (synchronous version).
Args:
db: Sync database session
key: Unique provider key
Returns:
AnimeSeries instance or None if not found
"""
result = db.execute(
select(AnimeSeries).where(AnimeSeries.key == key)
)
return result.scalar_one_or_none()
@staticmethod
def update_sync(
db: Session,
series_id: int,
**kwargs,
) -> Optional[AnimeSeries]:
"""Update anime series (synchronous version).
Args:
db: Sync database session
series_id: Series primary key
**kwargs: Fields to update
Returns:
Updated AnimeSeries instance or None if not found
"""
result = db.execute(
select(AnimeSeries).where(AnimeSeries.id == series_id)
)
series = result.scalar_one_or_none()
if not series:
return None
for key, value in kwargs.items():
if hasattr(series, key):
setattr(series, key, value)
db.flush()
db.refresh(series)
logger.info(
"Updated anime series (sync): %s (id=%s)",
series.name,
series_id
)
return series
@staticmethod
def upsert_sync(
db: Session,
key: str,
name: str,
site: str,
folder: str,
episode_dict: Optional[Dict] = None,
) -> AnimeSeries:
"""Create or update anime series (synchronous version).
Args:
db: Sync database session
key: Unique provider key
name: Series name
site: Provider site URL
folder: Local filesystem path
episode_dict: Optional episode dictionary
Returns:
Created or updated AnimeSeries instance
"""
existing = AnimeSeriesService.get_by_key_sync(db, key)
if existing:
for attr, value in [
("name", name),
("site", site),
("folder", folder),
("episode_dict", episode_dict),
]:
if value is not None:
setattr(existing, attr, value)
db.flush()
db.refresh(existing)
logger.info("Updated anime series (upsert): %s", key)
return existing
else:
return AnimeSeriesService.create_sync(
db,
key=key,
name=name,
site=site,
folder=folder,
episode_dict=episode_dict,
)
# ============================================================================

View File

@ -51,6 +51,15 @@ async def lifespan(app: FastAPI):
try:
logger.info("Starting FastAPI application...")
# Initialize database
try:
from src.server.database.connection import init_db
await init_db()
logger.info("Database initialized successfully")
except Exception as e:
logger.error("Failed to initialize database: %s", e, exc_info=True)
raise
# Load configuration from config.json and sync with settings
try:
from src.server.services.config_service import get_config_service
@ -67,6 +76,43 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning("Failed to load config from config.json: %s", e)
# Run legacy data file migration
if settings.anime_directory:
try:
from src.server.database.connection import get_db_session
from src.server.services.data_migration_service import (
DataMigrationService,
)
migration_service = DataMigrationService()
legacy_files = await migration_service \
.check_for_legacy_data_files(settings.anime_directory)
if legacy_files:
logger.info(
"Found %d legacy data file(s) to migrate",
len(legacy_files)
)
async with get_db_session() as db:
result = await migration_service \
.migrate_all_legacy_data(
settings.anime_directory, db
)
logger.info(
"Migration complete: %d migrated, %d skipped, "
"%d failed",
result.migrated,
result.skipped,
result.failed
)
else:
logger.debug("No legacy data files found")
except Exception as e:
# Migration failure should not prevent app startup
logger.warning(
"Legacy data migration check failed: %s", e, exc_info=True
)
# Initialize progress service with event subscription
progress_service = get_progress_service()
ws_service = get_websocket_service()

View File

@ -0,0 +1,359 @@
"""Data migration service for legacy file-based storage to database.
This module provides functionality to detect and migrate anime series metadata
from legacy 'data' files (no extension) to SQLite database storage.
Classes:
- MigrationResult: Dataclass containing migration statistics
- DataMigrationService: Service for detecting and migrating data files
"""
from __future__ import annotations
import logging
import os
import shutil
from dataclasses import dataclass, field
from datetime import datetime
from typing import List
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.entities.series import Serie
from src.server.database.service import AnimeSeriesService
logger = logging.getLogger(__name__)
@dataclass
class MigrationResult:
"""Result statistics from a migration operation.
Attributes:
total_found: Total number of legacy data files found
migrated: Number of files successfully migrated
failed: Number of files that failed migration
skipped: Number of files skipped (already in database)
errors: List of error messages for failed migrations
"""
total_found: int = 0
migrated: int = 0
failed: int = 0
skipped: int = 0
errors: List[str] = field(default_factory=list)
def __str__(self) -> str:
"""Human-readable summary of migration results."""
return (
f"Migration Result: {self.migrated} migrated, "
f"{self.skipped} skipped, {self.failed} failed "
f"(total: {self.total_found})"
)
class DataMigrationService:
"""Service to migrate legacy data files to database storage.
Legacy data files are JSON files named 'data' (no extension) located
in each anime folder. This service detects these files, reads them
using the Serie.load_from_file() method, and creates corresponding
database entries using AnimeSeriesService.
Example:
>>> migration_service = DataMigrationService()
>>> async with get_db_session() as db:
... result = await migration_service.migrate_all_legacy_data(
... "/path/to/anime", db
... )
... print(result)
Migration Result: 10 migrated, 2 skipped, 0 failed (total: 12)
"""
async def check_for_legacy_data_files(
self, anime_directory: str
) -> List[str]:
"""Scan anime directory for folders containing 'data' files.
Searches all immediate subdirectories for files named 'data'
(no extension), which are the legacy metadata files.
Args:
anime_directory: Base path to anime folders
Returns:
List of absolute paths to found 'data' files
Example:
>>> service = DataMigrationService()
>>> files = await service.check_for_legacy_data_files("/anime")
>>> print(files)
['/anime/Attack on Titan/data', '/anime/Naruto/data']
"""
if not anime_directory or not os.path.isdir(anime_directory):
logger.warning(
"Anime directory does not exist or is invalid: %s",
anime_directory
)
return []
data_files: List[str] = []
try:
entries = os.listdir(anime_directory)
except OSError as error:
logger.error(
"Unable to scan directory %s: %s",
anime_directory,
error
)
return []
for folder_name in entries:
folder_path = os.path.join(anime_directory, folder_name)
# Skip if not a directory
if not os.path.isdir(folder_path):
continue
# Check for 'data' file (no extension)
data_path = os.path.join(folder_path, "data")
if os.path.isfile(data_path):
data_files.append(data_path)
logger.debug("Found legacy data file: %s", data_path)
logger.info(
"Found %d legacy data file(s) in %s",
len(data_files),
anime_directory
)
return data_files
async def migrate_data_file_to_db(
self,
data_file_path: str,
db: AsyncSession,
) -> bool:
"""Migrate a single data file to database.
Reads the legacy data file using Serie.load_from_file() and creates
a corresponding database entry. If the series already exists in the
database (by key), the migration is skipped.
Args:
data_file_path: Path to the 'data' file (no extension)
db: Database session
Returns:
True if migration successful, False otherwise
Raises:
FileNotFoundError: If data file does not exist
ValueError: If data file is corrupted or invalid
"""
if not os.path.isfile(data_file_path):
raise FileNotFoundError(f"Data file not found: {data_file_path}")
try:
# Load serie from legacy file
serie = Serie.load_from_file(data_file_path)
logger.debug(
"Loaded serie from file: %s (key=%s)",
serie.name,
serie.key
)
except Exception as error:
logger.error(
"Failed to load data file %s: %s",
data_file_path,
error
)
raise ValueError(f"Invalid data file: {error}") from error
# Check if series already exists in database
existing = await AnimeSeriesService.get_by_key(db, serie.key)
if existing:
logger.debug(
"Series '%s' already exists in database, skipping",
serie.key
)
return False # Signal that it was skipped, not failed
# Create database entry
try:
await AnimeSeriesService.create(
db,
key=serie.key,
name=serie.name,
site=serie.site,
folder=serie.folder,
episode_dict=serie.episodeDict,
)
await db.commit()
logger.info(
"Successfully migrated series: %s (key=%s)",
serie.name,
serie.key
)
return True
except Exception as error:
await db.rollback()
logger.error(
"Failed to create database entry for %s: %s",
serie.key,
error
)
raise
async def migrate_all_legacy_data(
self,
anime_directory: str,
db: AsyncSession,
) -> MigrationResult:
"""Migrate all legacy data files to database.
Scans the anime directory for all legacy data files and migrates
each one to the database. Errors in individual files do not stop
the entire migration.
Args:
anime_directory: Base path to anime folders
db: Database session
Returns:
MigrationResult with success/failure counts
Example:
>>> service = DataMigrationService()
>>> async with get_db_session() as db:
... result = await service.migrate_all_legacy_data(
... "/anime", db
... )
... if result.failed > 0:
... for error in result.errors:
... print(f"Error: {error}")
"""
result = MigrationResult()
# Find all legacy data files
data_files = await self.check_for_legacy_data_files(anime_directory)
result.total_found = len(data_files)
if not data_files:
logger.info("No legacy data files found to migrate")
return result
logger.info("Starting migration of %d data file(s)", len(data_files))
for data_file_path in data_files:
try:
migrated = await self.migrate_data_file_to_db(
data_file_path, db
)
if migrated:
result.migrated += 1
else:
result.skipped += 1 # Already exists in DB
except FileNotFoundError:
result.failed += 1
error_msg = "File not found: %s" % data_file_path
result.errors.append(error_msg)
logger.error(error_msg)
except ValueError as error:
result.failed += 1
error_msg = "Invalid data in %s: %s" % (data_file_path, error)
result.errors.append(error_msg)
logger.error(error_msg)
except Exception as error:
result.failed += 1
error_msg = "Migration failed for %s: %s" % (
data_file_path, error
)
result.errors.append(error_msg)
logger.error(error_msg)
logger.info(str(result))
return result
async def cleanup_migrated_files(
self,
migrated_paths: List[str],
backup: bool = True,
) -> None:
"""Optionally backup and remove migrated data files.
Creates backups of data files before removal (if backup=True).
Backups are stored with a '.backup' extension and timestamp.
Args:
migrated_paths: List of successfully migrated file paths
backup: Whether to create backups before deletion (default: True)
Example:
>>> service = DataMigrationService()
>>> await service.cleanup_migrated_files(
... ["/anime/Show/data"],
... backup=True
... )
# Creates /anime/Show/data.backup.20231115_120000
# Removes /anime/Show/data
"""
if not migrated_paths:
logger.info("No files to clean up")
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for file_path in migrated_paths:
if not os.path.isfile(file_path):
logger.warning("File no longer exists: %s", file_path)
continue
try:
if backup:
backup_path = "%s.backup.%s" % (file_path, timestamp)
shutil.copy2(file_path, backup_path)
logger.debug("Created backup: %s", backup_path)
os.remove(file_path)
logger.info("Removed migrated file: %s", file_path)
except OSError as error:
logger.error("Failed to clean up %s: %s", file_path, error)
async def get_migration_status(
self,
anime_directory: str,
db: AsyncSession,
) -> dict:
"""Get current migration status.
Provides statistics about legacy files vs database entries,
useful for monitoring migration progress.
Args:
anime_directory: Base path to anime folders
db: Database session
Returns:
Dictionary with migration status information
"""
legacy_files = await self.check_for_legacy_data_files(anime_directory)
db_series = await AnimeSeriesService.get_all(db)
# Build sets of keys for comparison
legacy_keys: set = set()
for file_path in legacy_files:
try:
serie = Serie.load_from_file(file_path)
legacy_keys.add(serie.key)
except Exception:
pass # Skip invalid files
db_keys = {s.key for s in db_series}
return {
"legacy_files_count": len(legacy_files),
"database_entries_count": len(db_series),
"only_in_files": list(legacy_keys - db_keys),
"only_in_database": list(db_keys - legacy_keys),
"in_both": list(legacy_keys & db_keys),
"migration_complete": len(legacy_keys - db_keys) == 0,
}

View File

@ -0,0 +1,471 @@
"""Integration tests for data migration from file-based to database storage.
This module tests the complete migration flow including:
- Migration of legacy data files to database
- API endpoints working with database backend
- Data integrity during migration
"""
import json
import os
import pytest
from httpx import ASGITransport, AsyncClient
from src.server.database.service import AnimeSeriesService
from src.server.fastapi_app import app
from src.server.services.auth_service import auth_service
from src.server.services.data_migration_service import (
DataMigrationService,
MigrationResult,
)
@pytest.fixture
def temp_anime_dir(tmp_path):
"""Create a temporary anime directory with test data files."""
anime_dir = tmp_path / "anime"
anime_dir.mkdir()
# Create multiple anime series directories with data files
series_data = [
{
"key": "test-anime-1",
"name": "Test Anime 1",
"site": "aniworld.to",
"folder": "Test Anime 1 (2020)",
"episodeDict": {
"1": [1, 2, 3]
}
},
{
"key": "test-anime-2",
"name": "Test Anime 2",
"site": "aniworld.to",
"folder": "Test Anime 2 (2021)",
"episodeDict": {
"1": [1],
"2": [1, 2]
}
},
{
"key": "test-anime-3",
"name": "Test Anime 3",
"site": "aniworld.to",
"folder": "Test Anime 3 (2022)",
"episodeDict": {}
}
]
for data in series_data:
series_dir = anime_dir / data["folder"]
series_dir.mkdir()
data_file = series_dir / "data"
data_file.write_text(json.dumps(data))
return anime_dir
@pytest.fixture
def temp_db_path(tmp_path):
"""Create a temporary database path."""
return tmp_path / "test_aniworld.db"
@pytest.fixture
async def test_db_session(temp_db_path):
"""Create an async database session with a temporary database."""
from sqlalchemy.ext.asyncio import (
async_sessionmaker,
create_async_engine,
)
from src.server.database.base import Base
# Create test database
test_db_url = f"sqlite+aiosqlite:///{temp_db_path}"
engine = create_async_engine(test_db_url, echo=False)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async_session = async_sessionmaker(engine, expire_on_commit=False)
async with async_session() as session:
yield session
await engine.dispose()
class TestDataMigrationIntegration:
"""Integration tests for the complete data migration flow."""
@pytest.mark.asyncio
async def test_full_migration_flow(
self, temp_anime_dir, test_db_session
):
"""Test complete migration from data files to database."""
# Setup: Verify data files exist
data_files = list(temp_anime_dir.glob("*/data"))
assert len(data_files) == 3, "Should have 3 data files"
# Create migration service
migration_service = DataMigrationService()
# Check for legacy data files
files = await migration_service.check_for_legacy_data_files(
str(temp_anime_dir)
)
assert len(files) == 3, "Should find 3 legacy data files"
# Run full migration
result = await migration_service.migrate_all_legacy_data(
str(temp_anime_dir), test_db_session
)
# Verify results
assert result.total_found == 3
assert result.migrated == 3
assert result.failed == 0
assert result.skipped == 0
assert len(result.errors) == 0
# Verify all entries in database
all_series = await AnimeSeriesService.get_all(test_db_session)
assert len(all_series) == 3, "Should have 3 series in database"
# Verify series keys
keys_in_db = {s.key for s in all_series}
expected_keys = {"test-anime-1", "test-anime-2", "test-anime-3"}
assert keys_in_db == expected_keys, \
"All series keys should be migrated"
@pytest.mark.asyncio
async def test_migration_preserves_data(
self, temp_anime_dir, test_db_session
):
"""Test that migration preserves all series data."""
migration_service = DataMigrationService()
# Run migration
await migration_service.migrate_all_legacy_data(
str(temp_anime_dir), test_db_session
)
# Verify specific series data
series = await AnimeSeriesService.get_by_key(
test_db_session, "test-anime-1"
)
assert series is not None
assert series.name == "Test Anime 1"
assert series.site == "aniworld.to"
assert series.folder == "Test Anime 1 (2020)"
assert series.episode_dict == {"1": [1, 2, 3]}
# Verify series with multiple seasons
series2 = await AnimeSeriesService.get_by_key(
test_db_session, "test-anime-2"
)
assert series2 is not None
assert series2.episode_dict == {"1": [1], "2": [1, 2]}
@pytest.mark.asyncio
async def test_migration_idempotent(
self, temp_anime_dir, test_db_session
):
"""Test that re-running migration doesn't create duplicates."""
migration_service = DataMigrationService()
# Run migration twice
result1 = await migration_service.migrate_all_legacy_data(
str(temp_anime_dir), test_db_session
)
result2 = await migration_service.migrate_all_legacy_data(
str(temp_anime_dir), test_db_session
)
# First run should migrate all
assert result1.migrated == 3
assert result1.skipped == 0
# Second run should skip all (already in DB)
assert result2.migrated == 0
assert result2.skipped == 3
# Database should only have 3 entries (not 6)
all_series = await AnimeSeriesService.get_all(test_db_session)
assert len(all_series) == 3
@pytest.mark.asyncio
async def test_single_file_migration(
self, temp_anime_dir, test_db_session
):
"""Test migration of a single data file."""
migration_service = DataMigrationService()
# Get one data file path
data_file = str(temp_anime_dir / "Test Anime 1 (2020)" / "data")
# Migrate single file
result = await migration_service.migrate_data_file_to_db(
data_file, test_db_session
)
assert result is True
# Verify in database
series = await AnimeSeriesService.get_by_key(
test_db_session, "test-anime-1"
)
assert series is not None
assert series.name == "Test Anime 1"
@pytest.mark.asyncio
async def test_migration_with_corrupted_file(
self, temp_anime_dir, test_db_session
):
"""Test migration handles corrupted files gracefully."""
# Create a corrupted data file
corrupted_dir = temp_anime_dir / "Corrupted Anime"
corrupted_dir.mkdir()
corrupted_file = corrupted_dir / "data"
corrupted_file.write_text("not valid json {{{")
migration_service = DataMigrationService()
# Run migration
result = await migration_service.migrate_all_legacy_data(
str(temp_anime_dir), test_db_session
)
# Should have 3 migrated, 1 failed
assert result.total_found == 4
assert result.migrated == 3
assert result.failed == 1
assert len(result.errors) == 1
@pytest.mark.asyncio
async def test_migration_with_empty_directory(
self, tmp_path, test_db_session
):
"""Test migration with directory containing no data files."""
empty_dir = tmp_path / "empty_anime"
empty_dir.mkdir()
migration_service = DataMigrationService()
# Check for files
files = await migration_service.check_for_legacy_data_files(
str(empty_dir)
)
assert len(files) == 0
# Run migration on empty directory
result = await migration_service.migrate_all_legacy_data(
str(empty_dir), test_db_session
)
assert result.total_found == 0
assert result.migrated == 0
assert result.failed == 0
@pytest.mark.asyncio
async def test_migration_with_invalid_directory(
self, tmp_path, test_db_session
):
"""Test migration with non-existent directory."""
migration_service = DataMigrationService()
# Try non-existent directory
files = await migration_service.check_for_legacy_data_files(
"/non/existent/path"
)
assert len(files) == 0
result = await migration_service.migrate_all_legacy_data(
"/non/existent/path", test_db_session
)
assert result.total_found == 0
@pytest.mark.asyncio
async def test_cleanup_migrated_files(
self, temp_anime_dir, test_db_session
):
"""Test cleanup of migrated data files with backup."""
migration_service = DataMigrationService()
# Get data file paths before migration
files = await migration_service.check_for_legacy_data_files(
str(temp_anime_dir)
)
assert len(files) == 3
# Run cleanup (with backup=True)
await migration_service.cleanup_migrated_files(files, backup=True)
# Original data files should be removed
for original_path in files:
assert not os.path.exists(original_path), \
f"Original file should not exist: {original_path}"
# Backup files have timestamp suffix: data.backup.YYYYMMDD_HHMMSS
parent_dir = os.path.dirname(original_path)
backup_files = [
f for f in os.listdir(parent_dir)
if f.startswith("data.backup.")
]
assert len(backup_files) == 1, \
f"Backup file should exist in {parent_dir}"
@pytest.mark.asyncio
async def test_cleanup_without_backup(
self, temp_anime_dir, test_db_session
):
"""Test cleanup of migrated data files without backup."""
migration_service = DataMigrationService()
# Get data file paths
files = await migration_service.check_for_legacy_data_files(
str(temp_anime_dir)
)
# Run cleanup without backup
await migration_service.cleanup_migrated_files(files, backup=False)
# Files should be deleted, no backups
for original_path in files:
assert not os.path.exists(original_path)
assert not os.path.exists(original_path + ".migrated")
class TestAPIWithDatabaseIntegration:
"""Test API endpoints with database backend.
Note: These tests focus on the database integration layer.
Full API tests are in tests/api/test_anime_endpoints.py.
"""
@pytest.fixture
def mock_auth(self):
"""Mock authentication for API tests."""
return {"user_id": "test_user", "role": "admin"}
@pytest.fixture
async def authenticated_client(self, mock_auth):
"""Create an authenticated test client."""
# Create token
token = auth_service.create_access_token(mock_auth)
transport = ASGITransport(app=app)
async with AsyncClient(
transport=transport,
base_url="http://test"
) as client:
client.headers["Authorization"] = f"Bearer {token}"
yield client
@pytest.mark.asyncio
async def test_anime_service_uses_database(
self, test_db_session
):
"""Test that AnimeSeriesService correctly stores data."""
# Create a series through the service
_series = await AnimeSeriesService.create(
test_db_session,
key="api-test-anime",
name="API Test Anime",
site="aniworld.to",
folder="API Test Anime (2024)",
episode_dict={"1": [1, 2, 3]}
)
await test_db_session.commit()
# Verify it's stored
retrieved = await AnimeSeriesService.get_by_key(
test_db_session, "api-test-anime"
)
assert retrieved is not None
assert retrieved.name == "API Test Anime"
assert retrieved.folder == "API Test Anime (2024)"
@pytest.mark.asyncio
async def test_database_update_series(
self, test_db_session
):
"""Test that series can be updated in database."""
# Create a series
series = await AnimeSeriesService.create(
test_db_session,
key="update-test-anime",
name="Original Name",
site="aniworld.to",
folder="Original Folder",
episode_dict={}
)
await test_db_session.commit()
# Update it
updated = await AnimeSeriesService.update(
test_db_session,
series.id,
name="Updated Name",
episode_dict={"1": [1, 2]}
)
await test_db_session.commit()
# Verify update
assert updated.name == "Updated Name"
assert updated.episode_dict == {"1": [1, 2]}
@pytest.mark.asyncio
async def test_database_delete_series(
self, test_db_session
):
"""Test that series can be deleted from database."""
# Create a series
series = await AnimeSeriesService.create(
test_db_session,
key="delete-test-anime",
name="To Delete",
site="aniworld.to",
folder="Delete Folder",
episode_dict={}
)
await test_db_session.commit()
series_id = series.id
# Delete it
result = await AnimeSeriesService.delete(test_db_session, series_id)
await test_db_session.commit()
assert result is True
# Verify deletion
retrieved = await AnimeSeriesService.get_by_key(
test_db_session, "delete-test-anime"
)
assert retrieved is None
class TestMigrationResult:
"""Tests for MigrationResult dataclass."""
def test_migration_result_defaults(self):
"""Test default values for MigrationResult."""
result = MigrationResult()
assert result.total_found == 0
assert result.migrated == 0
assert result.failed == 0
assert result.skipped == 0
assert result.errors == []
def test_migration_result_str(self):
"""Test string representation of MigrationResult."""
result = MigrationResult(
total_found=10,
migrated=7,
failed=1,
skipped=2,
errors=["Error 1"]
)
expected = (
"Migration Result: 7 migrated, 2 skipped, "
"1 failed (total: 10)"
)
assert str(result) == expected

View File

@ -0,0 +1,719 @@
"""Unit tests for data migration service.
Tests cover:
- Detection of legacy data files
- Migration of data files to database
- Error handling for corrupted files
- Backup functionality
- Migration status reporting
"""
from __future__ import annotations
import json
import os
import tempfile
import pytest
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import StaticPool
from src.server.database.base import Base
from src.server.database.service import AnimeSeriesService
from src.server.services.data_migration_service import (
DataMigrationService,
MigrationResult,
)
@pytest.fixture
async def test_engine():
"""Create in-memory SQLite engine for testing."""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False,
poolclass=StaticPool,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
await engine.dispose()
@pytest.fixture
async def test_session(test_engine):
"""Create async session for testing."""
from sqlalchemy.ext.asyncio import async_sessionmaker
async_session = async_sessionmaker(
test_engine,
expire_on_commit=False,
)
async with async_session() as session:
yield session
@pytest.fixture
def temp_anime_dir():
"""Create temporary directory for testing anime folders."""
with tempfile.TemporaryDirectory() as tmp_dir:
yield tmp_dir
@pytest.fixture
def migration_service():
"""Create DataMigrationService instance."""
return DataMigrationService()
def create_test_data_file(
base_dir: str,
folder_name: str,
key: str,
name: str,
) -> str:
"""Create a test data file in the specified folder.
Args:
base_dir: Base anime directory
folder_name: Folder name to create
key: Series key
name: Series name
Returns:
Path to the created data file
"""
folder_path = os.path.join(base_dir, folder_name)
os.makedirs(folder_path, exist_ok=True)
data_path = os.path.join(folder_path, "data")
data = {
"key": key,
"name": name,
"site": "aniworld.to",
"folder": folder_name,
"episodeDict": {"1": [1, 2, 3], "2": [1, 2]},
}
with open(data_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=4)
return data_path
# =============================================================================
# MigrationResult Tests
# =============================================================================
class TestMigrationResult:
"""Test cases for MigrationResult dataclass."""
def test_migration_result_defaults(self):
"""Test MigrationResult default values."""
result = MigrationResult()
assert result.total_found == 0
assert result.migrated == 0
assert result.failed == 0
assert result.skipped == 0
assert result.errors == []
def test_migration_result_with_values(self):
"""Test MigrationResult with custom values."""
result = MigrationResult(
total_found=10,
migrated=7,
failed=1,
skipped=2,
errors=["Error 1"],
)
assert result.total_found == 10
assert result.migrated == 7
assert result.failed == 1
assert result.skipped == 2
assert result.errors == ["Error 1"]
def test_migration_result_str(self):
"""Test MigrationResult string representation."""
result = MigrationResult(
total_found=10,
migrated=7,
failed=1,
skipped=2,
)
result_str = str(result)
assert "7 migrated" in result_str
assert "2 skipped" in result_str
assert "1 failed" in result_str
assert "10" in result_str
# =============================================================================
# Check for Legacy Data Files Tests
# =============================================================================
class TestCheckForLegacyDataFiles:
"""Test cases for check_for_legacy_data_files method."""
@pytest.mark.asyncio
async def test_check_empty_directory(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test scanning empty directory returns empty list."""
files = await migration_service.check_for_legacy_data_files(
temp_anime_dir
)
assert files == []
@pytest.mark.asyncio
async def test_check_nonexistent_directory(
self,
migration_service: DataMigrationService,
):
"""Test scanning nonexistent directory returns empty list."""
files = await migration_service.check_for_legacy_data_files(
"/nonexistent/path"
)
assert files == []
@pytest.mark.asyncio
async def test_check_none_directory(
self,
migration_service: DataMigrationService,
):
"""Test scanning None directory returns empty list."""
files = await migration_service.check_for_legacy_data_files(None)
assert files == []
@pytest.mark.asyncio
async def test_check_empty_string_directory(
self,
migration_service: DataMigrationService,
):
"""Test scanning empty string directory returns empty list."""
files = await migration_service.check_for_legacy_data_files("")
assert files == []
@pytest.mark.asyncio
async def test_find_single_data_file(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test finding a single data file."""
create_test_data_file(
temp_anime_dir,
"Test Anime",
"test-anime",
"Test Anime",
)
files = await migration_service.check_for_legacy_data_files(
temp_anime_dir
)
assert len(files) == 1
assert files[0].endswith("data")
assert "Test Anime" in files[0]
@pytest.mark.asyncio
async def test_find_multiple_data_files(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test finding multiple data files."""
create_test_data_file(
temp_anime_dir, "Anime 1", "anime-1", "Anime 1"
)
create_test_data_file(
temp_anime_dir, "Anime 2", "anime-2", "Anime 2"
)
create_test_data_file(
temp_anime_dir, "Anime 3", "anime-3", "Anime 3"
)
files = await migration_service.check_for_legacy_data_files(
temp_anime_dir
)
assert len(files) == 3
@pytest.mark.asyncio
async def test_skip_folders_without_data_file(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test that folders without data files are skipped."""
# Create folder with data file
create_test_data_file(
temp_anime_dir, "With Data", "with-data", "With Data"
)
# Create folder without data file
empty_folder = os.path.join(temp_anime_dir, "Without Data")
os.makedirs(empty_folder, exist_ok=True)
files = await migration_service.check_for_legacy_data_files(
temp_anime_dir
)
assert len(files) == 1
assert "With Data" in files[0]
@pytest.mark.asyncio
async def test_skip_non_directories(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test that non-directory entries are skipped."""
create_test_data_file(
temp_anime_dir, "Anime", "anime", "Anime"
)
# Create a file (not directory) in anime dir
file_path = os.path.join(temp_anime_dir, "some_file.txt")
with open(file_path, "w") as f:
f.write("test")
files = await migration_service.check_for_legacy_data_files(
temp_anime_dir
)
assert len(files) == 1
# =============================================================================
# Migrate Data File to DB Tests
# =============================================================================
class TestMigrateDataFileToDb:
"""Test cases for migrate_data_file_to_db method."""
@pytest.mark.asyncio
async def test_migrate_valid_data_file(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migrating a valid data file creates database entry."""
data_path = create_test_data_file(
temp_anime_dir,
"Test Anime",
"test-anime",
"Test Anime",
)
result = await migration_service.migrate_data_file_to_db(
data_path, test_session
)
assert result is True
# Verify database entry
series = await AnimeSeriesService.get_by_key(
test_session, "test-anime"
)
assert series is not None
assert series.name == "Test Anime"
assert series.site == "aniworld.to"
assert series.folder == "Test Anime"
@pytest.mark.asyncio
async def test_migrate_nonexistent_file_raises_error(
self,
migration_service: DataMigrationService,
test_session,
):
"""Test migrating nonexistent file raises FileNotFoundError."""
with pytest.raises(FileNotFoundError):
await migration_service.migrate_data_file_to_db(
"/nonexistent/path/data",
test_session,
)
@pytest.mark.asyncio
async def test_migrate_invalid_json_raises_error(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migrating invalid JSON file raises ValueError."""
# Create folder with invalid data file
folder_path = os.path.join(temp_anime_dir, "Invalid")
os.makedirs(folder_path, exist_ok=True)
data_path = os.path.join(folder_path, "data")
with open(data_path, "w") as f:
f.write("not valid json")
with pytest.raises(ValueError):
await migration_service.migrate_data_file_to_db(
data_path, test_session
)
@pytest.mark.asyncio
async def test_migrate_skips_existing_series(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migration returns False for existing series in DB."""
# Create series in database first
await AnimeSeriesService.create(
test_session,
key="existing-anime",
name="Existing Anime",
site="aniworld.to",
folder="Existing Anime",
)
await test_session.commit()
# Create data file with same key
data_path = create_test_data_file(
temp_anime_dir,
"Existing Anime",
"existing-anime",
"Existing Anime",
)
result = await migration_service.migrate_data_file_to_db(
data_path, test_session
)
assert result is False
@pytest.mark.asyncio
async def test_migrate_preserves_episode_dict(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migration preserves episode dictionary correctly."""
data_path = create_test_data_file(
temp_anime_dir,
"With Episodes",
"with-episodes",
"With Episodes",
)
await migration_service.migrate_data_file_to_db(
data_path, test_session
)
series = await AnimeSeriesService.get_by_key(
test_session, "with-episodes"
)
assert series is not None
assert series.episode_dict is not None
# Note: JSON keys become strings, so check string keys
assert "1" in series.episode_dict or 1 in series.episode_dict
# =============================================================================
# Migrate All Legacy Data Tests
# =============================================================================
class TestMigrateAllLegacyData:
"""Test cases for migrate_all_legacy_data method."""
@pytest.mark.asyncio
async def test_migrate_empty_directory(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migrating empty directory returns zero counts."""
result = await migration_service.migrate_all_legacy_data(
temp_anime_dir, test_session
)
assert result.total_found == 0
assert result.migrated == 0
assert result.failed == 0
assert result.skipped == 0
@pytest.mark.asyncio
async def test_migrate_multiple_files(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migrating multiple files successfully."""
create_test_data_file(
temp_anime_dir, "Anime 1", "anime-1", "Anime 1"
)
create_test_data_file(
temp_anime_dir, "Anime 2", "anime-2", "Anime 2"
)
result = await migration_service.migrate_all_legacy_data(
temp_anime_dir, test_session
)
assert result.total_found == 2
assert result.migrated == 2
assert result.failed == 0
assert result.skipped == 0
@pytest.mark.asyncio
async def test_migrate_with_existing_entries(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migration skips existing database entries."""
# Create one in DB already
await AnimeSeriesService.create(
test_session,
key="anime-1",
name="Anime 1",
site="aniworld.to",
folder="Anime 1",
)
await test_session.commit()
# Create data files
create_test_data_file(
temp_anime_dir, "Anime 1", "anime-1", "Anime 1"
)
create_test_data_file(
temp_anime_dir, "Anime 2", "anime-2", "Anime 2"
)
result = await migration_service.migrate_all_legacy_data(
temp_anime_dir, test_session
)
assert result.total_found == 2
assert result.migrated == 1
assert result.skipped == 1
assert result.failed == 0
@pytest.mark.asyncio
async def test_migrate_with_invalid_file(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test migration continues after encountering invalid file."""
# Create valid data file
create_test_data_file(
temp_anime_dir, "Valid Anime", "valid-anime", "Valid Anime"
)
# Create invalid data file
invalid_folder = os.path.join(temp_anime_dir, "Invalid")
os.makedirs(invalid_folder, exist_ok=True)
invalid_path = os.path.join(invalid_folder, "data")
with open(invalid_path, "w") as f:
f.write("invalid json")
result = await migration_service.migrate_all_legacy_data(
temp_anime_dir, test_session
)
assert result.total_found == 2
assert result.migrated == 1
assert result.failed == 1
assert len(result.errors) == 1
# =============================================================================
# Cleanup Migrated Files Tests
# =============================================================================
class TestCleanupMigratedFiles:
"""Test cases for cleanup_migrated_files method."""
@pytest.mark.asyncio
async def test_cleanup_empty_list(
self,
migration_service: DataMigrationService,
):
"""Test cleanup with empty list does nothing."""
# Should not raise
await migration_service.cleanup_migrated_files([])
@pytest.mark.asyncio
async def test_cleanup_creates_backup(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test cleanup creates backup before removal."""
data_path = create_test_data_file(
temp_anime_dir, "Test", "test", "Test"
)
await migration_service.cleanup_migrated_files(
[data_path], backup=True
)
# Original file should be gone
assert not os.path.exists(data_path)
# Backup should exist
folder_path = os.path.dirname(data_path)
backup_files = [
f for f in os.listdir(folder_path) if f.startswith("data.backup")
]
assert len(backup_files) == 1
@pytest.mark.asyncio
async def test_cleanup_without_backup(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
):
"""Test cleanup without backup just removes file."""
data_path = create_test_data_file(
temp_anime_dir, "Test", "test", "Test"
)
await migration_service.cleanup_migrated_files(
[data_path], backup=False
)
# Original file should be gone
assert not os.path.exists(data_path)
# No backup should exist
folder_path = os.path.dirname(data_path)
backup_files = [
f for f in os.listdir(folder_path) if f.startswith("data.backup")
]
assert len(backup_files) == 0
@pytest.mark.asyncio
async def test_cleanup_handles_missing_file(
self,
migration_service: DataMigrationService,
):
"""Test cleanup handles already-deleted files gracefully."""
# Should not raise
await migration_service.cleanup_migrated_files(
["/nonexistent/path/data"]
)
# =============================================================================
# Migration Status Tests
# =============================================================================
class TestGetMigrationStatus:
"""Test cases for get_migration_status method."""
@pytest.mark.asyncio
async def test_status_empty_directory_empty_db(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test status with empty directory and database."""
status = await migration_service.get_migration_status(
temp_anime_dir, test_session
)
assert status["legacy_files_count"] == 0
assert status["database_entries_count"] == 0
assert status["only_in_files"] == []
assert status["only_in_database"] == []
assert status["in_both"] == []
assert status["migration_complete"] is True
@pytest.mark.asyncio
async def test_status_with_pending_migrations(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test status when files need migration."""
create_test_data_file(
temp_anime_dir, "Anime 1", "anime-1", "Anime 1"
)
create_test_data_file(
temp_anime_dir, "Anime 2", "anime-2", "Anime 2"
)
status = await migration_service.get_migration_status(
temp_anime_dir, test_session
)
assert status["legacy_files_count"] == 2
assert status["database_entries_count"] == 0
assert len(status["only_in_files"]) == 2
assert status["migration_complete"] is False
@pytest.mark.asyncio
async def test_status_after_complete_migration(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test status after all files migrated."""
create_test_data_file(
temp_anime_dir, "Anime 1", "anime-1", "Anime 1"
)
# Migrate
await migration_service.migrate_all_legacy_data(
temp_anime_dir, test_session
)
status = await migration_service.get_migration_status(
temp_anime_dir, test_session
)
assert status["legacy_files_count"] == 1
assert status["database_entries_count"] == 1
assert status["in_both"] == ["anime-1"]
assert status["migration_complete"] is True
@pytest.mark.asyncio
async def test_status_with_only_db_entries(
self,
migration_service: DataMigrationService,
temp_anime_dir: str,
test_session,
):
"""Test status when database has entries but no files."""
await AnimeSeriesService.create(
test_session,
key="db-only-anime",
name="DB Only Anime",
site="aniworld.to",
folder="DB Only",
)
await test_session.commit()
status = await migration_service.get_migration_status(
temp_anime_dir, test_session
)
assert status["legacy_files_count"] == 0
assert status["database_entries_count"] == 1
assert status["only_in_database"] == ["db-only-anime"]
assert status["migration_complete"] is True