"""Data migration service for migrating file-based storage to database.
This module provides functionality to migrate anime series data from
legacy file-based storage (data files without .json extension) to the
SQLite database using the AnimeSeries model.
The migration service:
- Scans anime directories for existing data files
- Reads Serie objects from data files
- Migrates them to the database using AnimeSeriesService
- Handles errors gracefully without stopping the migration
- Provides detailed migration results
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from src.core.entities.series import Serie
from src.server.database.service import AnimeSeriesService
logger = logging.getLogger(__name__)


@dataclass
class MigrationResult:
    """Result of a data file migration operation.

    Attributes:
        total_found: Total number of data files found
        migrated: Number of files successfully migrated
        skipped: Number of files skipped (already in database)
        failed: Number of files that failed to migrate
        errors: List of error messages encountered
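
    Example:
        A minimal sketch of how a caller might summarize a result
        (the counts are illustrative):

        ```python
        result = MigrationResult(total_found=3, migrated=2, skipped=1)
        print(f"{result.migrated}/{result.total_found} series migrated")
        ```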
"""
total_found: int = 0
migrated: int = 0
skipped: int = 0
failed: int = 0
errors: List[str] = field(default_factory=list)
def __post_init__(self):
"""Ensure errors is always a list."""
if self.errors is None:
self.errors = []


class DataMigrationError(Exception):
    """Base exception for data migration errors."""


class DataFileReadError(DataMigrationError):
    """Raised when a data file cannot be read."""


class DataMigrationService:
    """Service for migrating data files to the database.

    This service handles the migration of anime series data from
    file-based storage to the database. It scans directories for
    data files, reads Serie objects, and creates AnimeSeries records.

    Example:
        ```python
        service = DataMigrationService()

        # Check if migration is needed (synchronous scan)
        if service.is_migration_needed("/path/to/anime"):
            async with get_db_session() as db:
                result = await service.migrate_all("/path/to/anime", db)
                print(f"Migrated {result.migrated} series")
        ```
    """

    def __init__(self) -> None:
        """Initialize the data migration service."""

    def scan_for_data_files(self, anime_directory: str) -> List[Path]:
        """Scan for data files in the anime directory.

        Finds all 'data' files (JSON format without extension) in
        the anime directory structure. Each series folder may contain
        a 'data' file with series metadata.

        Args:
            anime_directory: Path to the anime directory containing
                series folders

        Returns:
            List of Path objects pointing to data files. An empty list
            is returned (with a warning logged) when the path is empty,
            does not exist, or is not a directory.
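
        Example:
            A minimal usage sketch (the directory path is illustrative):

            ```python
            service = DataMigrationService()
            for data_file in service.scan_for_data_files("/path/to/anime"):
                print(data_file)
            ```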
"""
if not anime_directory or not anime_directory.strip():
logger.warning("Empty anime directory provided")
return []
base_path = Path(anime_directory)
if not base_path.exists():
logger.warning(
"Anime directory does not exist: %s",
anime_directory
)
return []
if not base_path.is_dir():
logger.warning(
"Anime directory is not a directory: %s",
anime_directory
)
return []
data_files: List[Path] = []
try:
# Iterate through all subdirectories (series folders)
for folder in base_path.iterdir():
if not folder.is_dir():
continue
# Check for 'data' file in each series folder
data_file = folder / "data"
if data_file.exists() and data_file.is_file():
data_files.append(data_file)
logger.debug("Found data file: %s", data_file)
except PermissionError as e:
logger.error(
"Permission denied scanning directory %s: %s",
anime_directory,
e
)
except OSError as e:
logger.error(
"OS error scanning directory %s: %s",
anime_directory,
e
)
logger.info(
"Found %d data files in %s",
len(data_files),
anime_directory
)
return data_files

    def _read_data_file(self, data_path: Path) -> Optional[Serie]:
        """Read a Serie object from a data file.

        Args:
            data_path: Path to the data file

        Returns:
            The Serie object read from the file

        Raises:
            DataFileReadError: If the file cannot be read or parsed,
                or if the serie has an empty key
        """
        try:
            serie = Serie.load_from_file(str(data_path))

            # Validate the serie has required fields
            if not serie.key or not serie.key.strip():
                raise DataFileReadError(
                    f"Data file {data_path} has empty or missing key"
                )

            logger.debug(
                "Successfully read serie '%s' from %s",
                serie.key,
                data_path
            )
            return serie
        except DataFileReadError:
            # Re-raise validation errors unchanged (avoid double-wrapping
            # them in the generic handler below)
            raise
        except FileNotFoundError as e:
            raise DataFileReadError(
                f"Data file not found: {data_path}"
            ) from e
        except PermissionError as e:
            raise DataFileReadError(
                f"Permission denied reading data file: {data_path}"
            ) from e
        except (ValueError, KeyError, TypeError) as e:
            raise DataFileReadError(
                f"Invalid data in file {data_path}: {e}"
            ) from e
        except Exception as e:
            raise DataFileReadError(
                f"Error reading data file {data_path}: {e}"
            ) from e

    async def migrate_data_file(
        self,
        data_path: Path,
        db: AsyncSession
    ) -> bool:
        """Migrate a single data file to the database.

        Reads the data file, checks whether the series already exists in
        the database, and creates a new record if it does not. If the
        series exists, its episode_dict is updated when the file's data
        differs.

        Args:
            data_path: Path to the data file
            db: Async database session

        Returns:
            True if the series was migrated (created or updated),
            False if skipped (already exists with same data)

        Raises:
            DataFileReadError: If the file cannot be read
            DataMigrationError: If the database operation fails
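
        Example:
            A minimal sketch, assuming an open AsyncSession named ``db``
            and an illustrative data file path:

            ```python
            migrated = await service.migrate_data_file(
                Path("/path/to/anime/SeriesName/data"), db
            )
            ```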
"""
# Read the data file
serie = self._read_data_file(data_path)
if serie is None:
raise DataFileReadError(f"Could not read data file: {data_path}")
# Check if series already exists in database
existing = await AnimeSeriesService.get_by_key(db, serie.key)
if existing is not None:
# Check if episode_dict has changed
existing_dict = existing.episode_dict or {}
new_dict = serie.episodeDict or {}
# Convert keys to strings for comparison (JSON stores keys as strings)
new_dict_str_keys = {
str(k): v for k, v in new_dict.items()
}
if existing_dict == new_dict_str_keys:
logger.debug(
"Series '%s' already exists with same data, skipping",
serie.key
)
return False
# Update episode_dict if different
await AnimeSeriesService.update(
db,
existing.id,
episode_dict=new_dict_str_keys
)
logger.info(
"Updated episode_dict for existing series '%s'",
serie.key
)
return True
# Create new series in database
try:
# Convert episode_dict keys to strings for JSON storage
episode_dict_for_db = {
str(k): v for k, v in (serie.episodeDict or {}).items()
}
await AnimeSeriesService.create(
db,
key=serie.key,
name=serie.name,
site=serie.site,
folder=serie.folder,
episode_dict=episode_dict_for_db,
)
logger.info(
"Migrated series '%s' to database",
serie.key
)
return True
except IntegrityError as e:
# Race condition - series was created by another process
logger.warning(
"Series '%s' was already created (race condition): %s",
serie.key,
e
)
return False
except Exception as e:
raise DataMigrationError(
f"Failed to create series '{serie.key}' in database: {e}"
) from e

    async def migrate_all(
        self,
        anime_directory: str,
        db: AsyncSession
    ) -> MigrationResult:
        """Migrate all data files from anime directory to database.

        Scans the anime directory for data files and migrates each one
        to the database. Errors are logged but do not stop the migration.

        Args:
            anime_directory: Path to the anime directory
            db: Async database session

        Returns:
            MigrationResult with counts and error messages
        """
        result = MigrationResult()

        # Scan for data files
        data_files = self.scan_for_data_files(anime_directory)
        result.total_found = len(data_files)
        if result.total_found == 0:
            logger.info("No data files found to migrate")
            return result

        logger.info(
            "Starting migration of %d data files",
            result.total_found
        )

        # Migrate each file
        for data_path in data_files:
            try:
                migrated = await self.migrate_data_file(data_path, db)
                if migrated:
                    result.migrated += 1
                else:
                    result.skipped += 1
            except DataFileReadError as e:
                result.failed += 1
                error_msg = f"Failed to read {data_path}: {e}"
                result.errors.append(error_msg)
                logger.error(error_msg)
            except DataMigrationError as e:
                result.failed += 1
                error_msg = f"Failed to migrate {data_path}: {e}"
                result.errors.append(error_msg)
                logger.error(error_msg)
            except Exception as e:
                result.failed += 1
                error_msg = f"Unexpected error migrating {data_path}: {e}"
                result.errors.append(error_msg)
                logger.exception(error_msg)

        # Commit all changes
        try:
            await db.commit()
        except Exception as e:
            logger.error("Failed to commit migration: %s", e)
            result.errors.append(f"Failed to commit migration: {e}")

        logger.info(
            "Migration complete: %d migrated, %d skipped, %d failed",
            result.migrated,
            result.skipped,
            result.failed
        )
        return result

    def is_migration_needed(self, anime_directory: str) -> bool:
        """Check if there are data files to migrate.

        Args:
            anime_directory: Path to the anime directory

        Returns:
            True if data files exist, False otherwise
        """
        data_files = self.scan_for_data_files(anime_directory)
        needs_migration = len(data_files) > 0
        if needs_migration:
            logger.info(
                "Migration needed: found %d data files",
                len(data_files)
            )
        else:
            logger.debug("No migration needed: no data files found")
        return needs_migration


# Singleton instance for the service
_data_migration_service: Optional[DataMigrationService] = None


def get_data_migration_service() -> DataMigrationService:
    """Get the singleton data migration service instance.

    Returns:
        DataMigrationService instance
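
    Example:
        A minimal sketch; repeated calls return the same instance:

        ```python
        service = get_data_migration_service()
        assert service is get_data_migration_service()
        ```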
"""
global _data_migration_service
if _data_migration_service is None:
_data_migration_service = DataMigrationService()
return _data_migration_service
def reset_data_migration_service() -> None:
"""Reset the singleton service instance (for testing)."""
global _data_migration_service
_data_migration_service = None