migration removed

This commit is contained in:
2025-12-10 21:12:34 +01:00
parent 99f79e4c29
commit 842f9c88eb
25 changed files with 2 additions and 3862 deletions

View File

@@ -30,7 +30,6 @@ from src.server.database.init import (
create_database_backup,
create_database_schema,
get_database_info,
get_migration_guide,
get_schema_version,
initialize_database,
seed_initial_data,
@@ -64,7 +63,6 @@ __all__ = [
"check_database_health",
"create_database_backup",
"get_database_info",
"get_migration_guide",
"CURRENT_SCHEMA_VERSION",
"EXPECTED_TABLES",
# Models

View File

@@ -2,12 +2,9 @@
This module provides comprehensive database initialization functionality:
- Schema creation and validation
- Initial data migration
- Database health checks
- Schema versioning support
- Migration utilities
For production deployments, consider using Alembic for managed migrations.
"""
from __future__ import annotations
@@ -354,8 +351,6 @@ async def create_schema_version_table(
) -> None:
"""Create schema version tracking table.
Future enhancement for tracking schema migrations with Alembic.
Args:
engine: Optional database engine (uses default if not provided)
"""
@@ -587,60 +582,6 @@ def get_database_info() -> Dict[str, Any]:
}
def get_migration_guide() -> str:
"""Get migration guide for production deployments.
Returns:
Migration guide text
"""
return """
Database Migration Guide
========================
Current Setup: SQLAlchemy create_all()
- Automatically creates tables on startup
- Suitable for development and single-instance deployments
- Schema changes require manual handling
For Production with Alembic:
============================
1. Initialize Alembic (already installed):
alembic init alembic
2. Configure alembic/env.py:
from src.server.database.base import Base
target_metadata = Base.metadata
3. Configure alembic.ini:
sqlalchemy.url = <your-database-url>
4. Generate initial migration:
alembic revision --autogenerate -m "Initial schema v1.0.0"
5. Review migration in alembic/versions/
6. Apply migration:
alembic upgrade head
7. For future schema changes:
- Modify models in src/server/database/models.py
- Generate migration: alembic revision --autogenerate -m "Description"
- Review generated migration
- Test in staging environment
- Apply: alembic upgrade head
- For rollback: alembic downgrade -1
Best Practices:
==============
- Always backup database before migrations
- Test migrations in staging first
- Review auto-generated migrations carefully
- Keep migrations in version control
- Document breaking changes
"""
# =============================================================================
# Public API
# =============================================================================
@@ -656,7 +597,6 @@ __all__ = [
"check_database_health",
"create_database_backup",
"get_database_info",
"get_migration_guide",
"CURRENT_SCHEMA_VERSION",
"EXPECTED_TABLES",
]

View File

@@ -1,167 +0,0 @@
"""Database migration utilities.
This module provides utilities for database migrations and schema versioning.
Alembic integration can be added when needed for production environments.
For now, we use SQLAlchemy's create_all for automatic schema creation.
"""
from __future__ import annotations
import logging
from typing import Optional
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine
from src.server.database.base import Base
from src.server.database.connection import get_engine, get_sync_engine
logger = logging.getLogger(__name__)
async def initialize_schema(engine: Optional[AsyncEngine] = None) -> None:
"""Initialize database schema.
Creates all tables defined in Base metadata if they don't exist.
This is a simple migration strategy suitable for single-instance deployments.
For production with multiple instances, consider using Alembic:
- alembic init alembic
- alembic revision --autogenerate -m "Initial schema"
- alembic upgrade head
Args:
engine: Optional database engine (uses default if not provided)
Raises:
RuntimeError: If database is not initialized
"""
if engine is None:
engine = get_engine()
logger.info("Initializing database schema...")
# Create all tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database schema initialized successfully")
async def check_schema_version(engine: Optional[AsyncEngine] = None) -> str:
"""Check current database schema version.
Returns a simple version identifier based on existing tables.
For production, consider using Alembic for proper versioning.
Args:
engine: Optional database engine (uses default if not provided)
Returns:
Schema version string
Raises:
RuntimeError: If database is not initialized
"""
if engine is None:
engine = get_engine()
async with engine.connect() as conn:
# Check which tables exist
result = await conn.execute(
text(
"SELECT name FROM sqlite_master "
"WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
)
tables = [row[0] for row in result]
if not tables:
return "empty"
elif len(tables) == 4 and all(
t in tables for t in [
"anime_series",
"episodes",
"download_queue",
"user_sessions",
]
):
return "v1.0"
else:
return "custom"
def get_migration_info() -> str:
"""Get information about database migration setup.
Returns:
Migration setup information
"""
return """
Database Migration Information
==============================
Current Strategy: SQLAlchemy create_all()
- Automatically creates tables on startup
- Suitable for development and single-instance deployments
- Schema changes require manual handling
For Production Migrations (Alembic):
====================================
1. Initialize Alembic:
alembic init alembic
2. Configure alembic/env.py:
- Import Base from src.server.database.base
- Set target_metadata = Base.metadata
3. Configure alembic.ini:
- Set sqlalchemy.url to your database URL
4. Generate initial migration:
alembic revision --autogenerate -m "Initial schema"
5. Apply migrations:
alembic upgrade head
6. For future changes:
- Modify models in src/server/database/models.py
- Generate migration: alembic revision --autogenerate -m "Description"
- Review generated migration in alembic/versions/
- Apply: alembic upgrade head
Benefits of Alembic:
- Version control for database schema
- Automatic migration generation from model changes
- Rollback support with downgrade scripts
- Multi-instance deployment support
- Safe schema changes in production
"""
# =============================================================================
# Future Alembic Integration
# =============================================================================
#
# When ready to use Alembic, follow these steps:
#
# 1. Install Alembic (already in requirements.txt):
# pip install alembic
#
# 2. Initialize Alembic from project root:
# alembic init alembic
#
# 3. Update alembic/env.py to use our Base:
# from src.server.database.base import Base
# target_metadata = Base.metadata
#
# 4. Configure alembic.ini with DATABASE_URL from settings
#
# 5. Generate initial migration:
# alembic revision --autogenerate -m "Initial schema"
#
# 6. Review generated migration and apply:
# alembic upgrade head
#
# =============================================================================

View File

@@ -1,236 +0,0 @@
"""
Initial database schema migration.
This migration creates the base tables for the Aniworld application,
including users, anime, downloads, and configuration tables.
Version: 20250124_001
Created: 2025-01-24
"""
import logging
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ..migrations.base import Migration, MigrationError
logger = logging.getLogger(__name__)
class InitialSchemaMigration(Migration):
"""
Creates initial database schema.
This migration sets up all core tables needed for the application:
- users: User accounts and authentication
- anime: Anime series metadata
- episodes: Episode information
- downloads: Download queue and history
- config: Application configuration
"""
def __init__(self):
"""Initialize the initial schema migration."""
super().__init__(
version="20250124_001",
description="Create initial database schema",
)
async def upgrade(self, session: AsyncSession) -> None:
"""
Create all initial tables.
Args:
session: Database session
Raises:
MigrationError: If table creation fails
"""
try:
# Create users table
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
email TEXT,
password_hash TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
is_admin BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
)
# Create anime table
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS anime (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
original_title TEXT,
description TEXT,
genres TEXT,
release_year INTEGER,
status TEXT,
total_episodes INTEGER,
cover_image_url TEXT,
aniworld_url TEXT,
mal_id INTEGER,
anilist_id INTEGER,
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
)
# Create episodes table
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
anime_id INTEGER NOT NULL,
episode_number INTEGER NOT NULL,
season_number INTEGER DEFAULT 1,
title TEXT,
description TEXT,
duration_minutes INTEGER,
air_date DATE,
stream_url TEXT,
download_url TEXT,
file_path TEXT,
file_size_bytes INTEGER,
is_downloaded BOOLEAN DEFAULT 0,
download_progress REAL DEFAULT 0.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (anime_id) REFERENCES anime(id)
ON DELETE CASCADE,
UNIQUE (anime_id, season_number, episode_number)
)
"""
)
)
# Create downloads table
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS downloads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
episode_id INTEGER NOT NULL,
user_id INTEGER,
status TEXT NOT NULL DEFAULT 'pending',
priority INTEGER DEFAULT 5,
progress REAL DEFAULT 0.0,
download_speed_mbps REAL,
eta_seconds INTEGER,
started_at TIMESTAMP,
completed_at TIMESTAMP,
failed_at TIMESTAMP,
error_message TEXT,
retry_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (episode_id) REFERENCES episodes(id)
ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users(id)
ON DELETE SET NULL
)
"""
)
)
# Create config table
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS config (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL UNIQUE,
value TEXT NOT NULL,
category TEXT DEFAULT 'general',
description TEXT,
is_secret BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
)
)
# Create indexes for better performance
await session.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_anime_title "
"ON anime(title)"
)
)
await session.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_episodes_anime_id "
"ON episodes(anime_id)"
)
)
await session.execute(
text(
"CREATE INDEX IF NOT EXISTS idx_downloads_status "
"ON downloads(status)"
)
)
await session.execute(
text(
"CREATE INDEX IF NOT EXISTS "
"idx_downloads_episode_id ON downloads(episode_id)"
)
)
logger.info("Initial schema created successfully")
except Exception as e:
logger.error(f"Failed to create initial schema: {e}")
raise MigrationError(
f"Initial schema creation failed: {e}"
) from e
async def downgrade(self, session: AsyncSession) -> None:
"""
Drop all initial tables.
Args:
session: Database session
Raises:
MigrationError: If table dropping fails
"""
try:
# Drop tables in reverse order to respect foreign keys
tables = [
"downloads",
"episodes",
"anime",
"users",
"config",
]
for table in tables:
await session.execute(text(f"DROP TABLE IF EXISTS {table}"))
logger.debug(f"Dropped table: {table}")
logger.info("Initial schema rolled back successfully")
except Exception as e:
logger.error(f"Failed to rollback initial schema: {e}")
raise MigrationError(
f"Initial schema rollback failed: {e}"
) from e

View File

@@ -1,17 +0,0 @@
"""
Database migration system for Aniworld application.
This package provides tools for managing database schema changes,
including migration creation, execution, and rollback capabilities.
"""
from .base import Migration, MigrationError
from .runner import MigrationRunner
from .validator import MigrationValidator
__all__ = [
"Migration",
"MigrationError",
"MigrationRunner",
"MigrationValidator",
]

View File

@@ -1,128 +0,0 @@
"""
Base migration classes and utilities.
This module provides the foundation for database migrations,
including the abstract Migration class and error handling.
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
class MigrationError(Exception):
"""Base exception for migration-related errors."""
pass
class Migration(ABC):
"""
Abstract base class for database migrations.
Each migration should inherit from this class and implement
the upgrade and downgrade methods.
Attributes:
version: Unique version identifier (e.g., "20250124_001")
description: Human-readable description of the migration
created_at: Timestamp when migration was created
"""
def __init__(
self,
version: str,
description: str,
created_at: Optional[datetime] = None,
):
"""
Initialize migration.
Args:
version: Unique version identifier
description: Human-readable description
created_at: Creation timestamp (defaults to now)
"""
self.version = version
self.description = description
self.created_at = created_at or datetime.now()
@abstractmethod
async def upgrade(self, session: AsyncSession) -> None:
"""
Apply the migration.
Args:
session: Database session for executing changes
Raises:
MigrationError: If migration fails
"""
pass
@abstractmethod
async def downgrade(self, session: AsyncSession) -> None:
"""
Revert the migration.
Args:
session: Database session for reverting changes
Raises:
MigrationError: If rollback fails
"""
pass
def __repr__(self) -> str:
"""Return string representation of migration."""
return f"Migration({self.version}: {self.description})"
def __eq__(self, other: object) -> bool:
"""Check equality based on version."""
if not isinstance(other, Migration):
return False
return self.version == other.version
def __hash__(self) -> int:
"""Return hash based on version."""
return hash(self.version)
class MigrationHistory:
"""
Tracks applied migrations in the database.
This model stores information about which migrations have been
applied, when they were applied, and their execution status.
"""
__tablename__ = "migration_history"
def __init__(
self,
version: str,
description: str,
applied_at: datetime,
execution_time_ms: int,
success: bool = True,
error_message: Optional[str] = None,
):
"""
Initialize migration history record.
Args:
version: Migration version identifier
description: Migration description
applied_at: Timestamp when migration was applied
execution_time_ms: Time taken to execute in milliseconds
success: Whether migration succeeded
error_message: Error message if migration failed
"""
self.version = version
self.description = description
self.applied_at = applied_at
self.execution_time_ms = execution_time_ms
self.success = success
self.error_message = error_message

View File

@@ -1,323 +0,0 @@
"""
Migration runner for executing database migrations.
This module handles the execution of migrations in the correct order,
tracks migration history, and provides rollback capabilities.
"""
import importlib.util
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from .base import Migration, MigrationError, MigrationHistory
logger = logging.getLogger(__name__)
class MigrationRunner:
"""
Manages database migration execution and tracking.
This class handles loading migrations, executing them in order,
tracking their status, and rolling back when needed.
"""
def __init__(self, migrations_dir: Path, session: AsyncSession):
"""
Initialize migration runner.
Args:
migrations_dir: Directory containing migration files
session: Database session for executing migrations
"""
self.migrations_dir = migrations_dir
self.session = session
self._migrations: List[Migration] = []
async def initialize(self) -> None:
"""
Initialize migration system by creating tracking table if needed.
Raises:
MigrationError: If initialization fails
"""
try:
# Create migration_history table if it doesn't exist
create_table_sql = """
CREATE TABLE IF NOT EXISTS migration_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
version TEXT NOT NULL UNIQUE,
description TEXT NOT NULL,
applied_at TIMESTAMP NOT NULL,
execution_time_ms INTEGER NOT NULL,
success BOOLEAN NOT NULL DEFAULT 1,
error_message TEXT
)
"""
await self.session.execute(text(create_table_sql))
await self.session.commit()
logger.info("Migration system initialized")
except Exception as e:
logger.error(f"Failed to initialize migration system: {e}")
raise MigrationError(f"Initialization failed: {e}") from e
def load_migrations(self) -> None:
"""
Load all migration files from the migrations directory.
Migration files should be named in format: {version}_{description}.py
and contain a Migration class that inherits from base.Migration.
Raises:
MigrationError: If loading migrations fails
"""
try:
self._migrations.clear()
if not self.migrations_dir.exists():
logger.warning(f"Migrations directory does not exist: {self.migrations_dir}")
return
# Find all Python files in migrations directory
migration_files = sorted(self.migrations_dir.glob("*.py"))
migration_files = [f for f in migration_files if f.name != "__init__.py"]
for file_path in migration_files:
try:
# Import the migration module dynamically
spec = importlib.util.spec_from_file_location(
f"migration.{file_path.stem}", file_path
)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Find Migration subclass in module
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, Migration)
and attr != Migration
):
migration_instance = attr()
self._migrations.append(migration_instance)
logger.debug(f"Loaded migration: {migration_instance.version}")
break
except Exception as e:
logger.error(f"Failed to load migration {file_path.name}: {e}")
raise MigrationError(f"Failed to load {file_path.name}: {e}") from e
# Sort migrations by version
self._migrations.sort(key=lambda m: m.version)
logger.info(f"Loaded {len(self._migrations)} migrations")
except Exception as e:
logger.error(f"Failed to load migrations: {e}")
raise MigrationError(f"Loading migrations failed: {e}") from e
async def get_applied_migrations(self) -> List[str]:
"""
Get list of already applied migration versions.
Returns:
List of migration versions that have been applied
Raises:
MigrationError: If query fails
"""
try:
result = await self.session.execute(
text("SELECT version FROM migration_history WHERE success = 1 ORDER BY version")
)
versions = [row[0] for row in result.fetchall()]
return versions
except Exception as e:
logger.error(f"Failed to get applied migrations: {e}")
raise MigrationError(f"Query failed: {e}") from e
async def get_pending_migrations(self) -> List[Migration]:
"""
Get list of migrations that haven't been applied yet.
Returns:
List of pending Migration objects
Raises:
MigrationError: If check fails
"""
applied = await self.get_applied_migrations()
pending = [m for m in self._migrations if m.version not in applied]
return pending
async def apply_migration(self, migration: Migration) -> None:
"""
Apply a single migration.
Args:
migration: Migration to apply
Raises:
MigrationError: If migration fails
"""
start_time = time.time()
success = False
error_message = None
try:
logger.info(f"Applying migration: {migration.version} - {migration.description}")
# Execute the migration
await migration.upgrade(self.session)
await self.session.commit()
success = True
execution_time_ms = int((time.time() - start_time) * 1000)
logger.info(
f"Migration {migration.version} applied successfully in {execution_time_ms}ms"
)
except Exception as e:
error_message = str(e)
execution_time_ms = int((time.time() - start_time) * 1000)
logger.error(f"Migration {migration.version} failed: {e}")
await self.session.rollback()
raise MigrationError(f"Migration {migration.version} failed: {e}") from e
finally:
# Record migration in history
try:
history_record = MigrationHistory(
version=migration.version,
description=migration.description,
applied_at=datetime.now(),
execution_time_ms=execution_time_ms,
success=success,
error_message=error_message,
)
insert_sql = """
INSERT INTO migration_history
(version, description, applied_at, execution_time_ms, success, error_message)
VALUES (:version, :description, :applied_at, :execution_time_ms, :success, :error_message)
"""
await self.session.execute(
text(insert_sql),
{
"version": history_record.version,
"description": history_record.description,
"applied_at": history_record.applied_at,
"execution_time_ms": history_record.execution_time_ms,
"success": history_record.success,
"error_message": history_record.error_message,
},
)
await self.session.commit()
except Exception as e:
logger.error(f"Failed to record migration history: {e}")
async def run_migrations(self, target_version: Optional[str] = None) -> int:
"""
Run all pending migrations up to target version.
Args:
target_version: Stop at this version (None = run all)
Returns:
Number of migrations applied
Raises:
MigrationError: If migrations fail
"""
pending = await self.get_pending_migrations()
if target_version:
pending = [m for m in pending if m.version <= target_version]
if not pending:
logger.info("No pending migrations to apply")
return 0
logger.info(f"Applying {len(pending)} pending migrations")
for migration in pending:
await self.apply_migration(migration)
return len(pending)
async def rollback_migration(self, migration: Migration) -> None:
"""
Rollback a single migration.
Args:
migration: Migration to rollback
Raises:
MigrationError: If rollback fails
"""
start_time = time.time()
try:
logger.info(f"Rolling back migration: {migration.version}")
# Execute the downgrade
await migration.downgrade(self.session)
await self.session.commit()
execution_time_ms = int((time.time() - start_time) * 1000)
# Remove from history
delete_sql = "DELETE FROM migration_history WHERE version = :version"
await self.session.execute(text(delete_sql), {"version": migration.version})
await self.session.commit()
logger.info(
f"Migration {migration.version} rolled back successfully in {execution_time_ms}ms"
)
except Exception as e:
logger.error(f"Rollback of {migration.version} failed: {e}")
await self.session.rollback()
raise MigrationError(f"Rollback of {migration.version} failed: {e}") from e
async def rollback(self, steps: int = 1) -> int:
"""
Rollback the last N migrations.
Args:
steps: Number of migrations to rollback
Returns:
Number of migrations rolled back
Raises:
MigrationError: If rollback fails
"""
applied = await self.get_applied_migrations()
if not applied:
logger.info("No migrations to rollback")
return 0
# Get migrations to rollback (in reverse order)
to_rollback = applied[-steps:]
to_rollback.reverse()
migrations_to_rollback = [m for m in self._migrations if m.version in to_rollback]
logger.info(f"Rolling back {len(migrations_to_rollback)} migrations")
for migration in migrations_to_rollback:
await self.rollback_migration(migration)
return len(migrations_to_rollback)

View File

@@ -1,222 +0,0 @@
"""
Migration validator for ensuring migration safety and integrity.
This module provides validation utilities to check migrations
before they are executed, ensuring they meet quality standards.
"""
import logging
from typing import List, Optional, Set
from .base import Migration, MigrationError
logger = logging.getLogger(__name__)
class MigrationValidator:
"""
Validates migrations before execution.
Performs various checks to ensure migrations are safe to run,
including version uniqueness, naming conventions, and
dependency resolution.
"""
def __init__(self):
"""Initialize migration validator."""
self.errors: List[str] = []
self.warnings: List[str] = []
def reset(self) -> None:
"""Clear validation results."""
self.errors.clear()
self.warnings.clear()
def validate_migration(self, migration: Migration) -> bool:
"""
Validate a single migration.
Args:
migration: Migration to validate
Returns:
True if migration is valid, False otherwise
"""
self.reset()
# Check version format
if not self._validate_version_format(migration.version):
self.errors.append(
f"Invalid version format: {migration.version}. "
"Expected format: YYYYMMDD_NNN"
)
# Check description
if not migration.description or len(migration.description) < 5:
self.errors.append(
f"Migration {migration.version} has invalid "
f"description: '{migration.description}'"
)
# Check for implementation
if not hasattr(migration, "upgrade") or not callable(
getattr(migration, "upgrade")
):
self.errors.append(
f"Migration {migration.version} missing upgrade method"
)
if not hasattr(migration, "downgrade") or not callable(
getattr(migration, "downgrade")
):
self.errors.append(
f"Migration {migration.version} missing downgrade method"
)
return len(self.errors) == 0
def validate_migrations(self, migrations: List[Migration]) -> bool:
"""
Validate a list of migrations.
Args:
migrations: List of migrations to validate
Returns:
True if all migrations are valid, False otherwise
"""
self.reset()
if not migrations:
self.warnings.append("No migrations to validate")
return True
# Check for duplicate versions
versions: Set[str] = set()
for migration in migrations:
if migration.version in versions:
self.errors.append(
f"Duplicate migration version: {migration.version}"
)
versions.add(migration.version)
# Return early if duplicates found
if self.errors:
return False
# Validate each migration
for migration in migrations:
if not self.validate_migration(migration):
logger.error(
f"Migration {migration.version} "
f"validation failed: {self.errors}"
)
return False
# Check version ordering
sorted_versions = sorted([m.version for m in migrations])
actual_versions = [m.version for m in migrations]
if sorted_versions != actual_versions:
self.warnings.append(
"Migrations are not in chronological order"
)
return len(self.errors) == 0
def _validate_version_format(self, version: str) -> bool:
"""
Validate version string format.
Args:
version: Version string to validate
Returns:
True if format is valid
"""
# Expected format: YYYYMMDD_NNN or YYYYMMDD_NNN_description
if not version:
return False
parts = version.split("_")
if len(parts) < 2:
return False
# Check date part (YYYYMMDD)
date_part = parts[0]
if len(date_part) != 8 or not date_part.isdigit():
return False
# Check sequence part (NNN)
seq_part = parts[1]
if not seq_part.isdigit():
return False
return True
def check_migration_conflicts(
self,
pending: List[Migration],
applied: List[str],
) -> Optional[str]:
"""
Check for conflicts between pending and applied migrations.
Args:
pending: List of pending migrations
applied: List of applied migration versions
Returns:
Error message if conflicts found, None otherwise
"""
# Check if any pending migration has version lower than applied
if not applied:
return None
latest_applied = max(applied)
for migration in pending:
if migration.version < latest_applied:
return (
f"Migration {migration.version} is older than "
f"latest applied migration {latest_applied}. "
"This may indicate a merge conflict."
)
return None
def get_validation_report(self) -> str:
"""
Get formatted validation report.
Returns:
Formatted report string
"""
report = []
if self.errors:
report.append("Validation Errors:")
for error in self.errors:
report.append(f" - {error}")
if self.warnings:
report.append("Validation Warnings:")
for warning in self.warnings:
report.append(f" - {warning}")
if not self.errors and not self.warnings:
report.append("All validations passed")
return "\n".join(report)
def raise_if_invalid(self) -> None:
"""
Raise exception if validation failed.
Raises:
MigrationError: If validation errors exist
"""
if self.errors:
error_msg = "\n".join(self.errors)
raise MigrationError(
f"Migration validation failed:\n{error_msg}"
)