"""SQLAlchemy ORM models for the Aniworld web application. This module defines database models for anime series, episodes, download queue, and user sessions. Models use SQLAlchemy 2.0 style with type annotations. Models: - AnimeSeries: Represents an anime series with metadata - Episode: Individual episodes linked to series - DownloadQueueItem: Download queue with status and progress tracking - UserSession: User authentication sessions with JWT tokens """ from __future__ import annotations from datetime import datetime, timezone from enum import Enum from typing import List, Optional from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, func from sqlalchemy.orm import Mapped, mapped_column, relationship, validates from src.server.database.base import Base, TimestampMixin class AnimeSeries(Base, TimestampMixin): """SQLAlchemy model for anime series. Represents an anime series with metadata, provider information, and links to episodes. Corresponds to the core Serie class. Series Identifier Convention: - `key`: PRIMARY IDENTIFIER - Unique, provider-assigned, URL-safe (e.g., "attack-on-titan"). Used for all lookups and operations. - `folder`: METADATA ONLY - Filesystem folder name for display (e.g., "Attack on Titan (2013)"). Never used for identification. - `id`: Internal database primary key for relationships. Attributes: id: Database primary key (internal use for relationships) key: Unique provider key - PRIMARY IDENTIFIER for all lookups name: Display name of the series site: Provider site URL folder: Filesystem folder name (metadata only, not for lookups) episodes: Relationship to Episode models (via id foreign key) download_items: Relationship to DownloadQueueItem models (via id foreign key) created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) Note: All database relationships use `id` (primary key), not `key` or `folder`. Use `get_by_key()` in AnimeSeriesService for lookups. """ __tablename__ = "anime_series" # Primary key id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Core identification key: Mapped[str] = mapped_column( String(255), unique=True, nullable=False, index=True, doc="Unique provider key - PRIMARY IDENTIFIER for all lookups" ) name: Mapped[str] = mapped_column( String(500), nullable=False, index=True, doc="Series name" ) site: Mapped[str] = mapped_column( String(500), nullable=False, doc="Provider site URL" ) folder: Mapped[str] = mapped_column( String(1000), nullable=False, doc="Filesystem folder name - METADATA ONLY, not for lookups" ) year: Mapped[Optional[int]] = mapped_column( Integer, nullable=True, doc="Release year of the series" ) # NFO metadata tracking has_nfo: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether tvshow.nfo file exists for this series" ) nfo_path: Mapped[Optional[str]] = mapped_column( String(1000), nullable=True, doc="Path to the tvshow.nfo metadata file" ) nfo_created_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp when NFO was first created" ) nfo_updated_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp when NFO was last updated" ) # TMDB (The Movie Database) ID for series metadata tmdb_id: Mapped[Optional[int]] = mapped_column( Integer, nullable=True, index=True, doc="TMDB (The Movie Database) ID for series metadata" ) tvdb_id: Mapped[Optional[int]] = mapped_column( Integer, nullable=True, index=True, doc="TVDB (TheTVDB) ID for series metadata" ) # Loading status fields for asynchronous data loading loading_status: Mapped[str] = mapped_column( String(50), nullable=False, default="completed", server_default="completed", doc="Loading status: pending, loading_episodes, loading_nfo, loading_logo, " "loading_images, completed, failed" ) episodes_loaded: Mapped[bool] = mapped_column( Boolean, nullable=False, default=True, server_default="1", doc="Whether episodes have been scanned and loaded" ) logo_loaded: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether logo.png has been downloaded" ) images_loaded: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether poster/fanart images have been downloaded" ) loading_started_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp when background loading started" ) loading_completed_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp when background loading completed" ) loading_error: Mapped[Optional[str]] = mapped_column( String(1000), nullable=True, doc="Error message if loading failed" ) # Relationships episodes: Mapped[List["Episode"]] = relationship( "Episode", back_populates="series", cascade="all, delete-orphan" ) download_items: Mapped[List["DownloadQueueItem"]] = relationship( "DownloadQueueItem", back_populates="series", cascade="all, delete-orphan" ) @validates('key') def validate_key(self, key: str, value: str) -> str: """Validate key field length and format.""" if not value or not value.strip(): raise ValueError("Series key cannot be empty") if len(value) > 255: raise ValueError("Series key must be 255 characters or less") return value.strip() @validates('name') def validate_name(self, key: str, value: str) -> str: """Validate name field length.""" if not value or not value.strip(): raise ValueError("Series name cannot be empty") if len(value) > 500: raise ValueError("Series name must be 500 characters or less") return value.strip() @validates('site') def validate_site(self, key: str, value: str) -> str: """Validate site URL length.""" if not value or not value.strip(): raise ValueError("Series site URL cannot be empty") if len(value) > 500: raise ValueError("Site URL must be 500 characters or less") return value.strip() @validates('folder') def validate_folder(self, key: str, value: str) -> str: """Validate folder path length.""" if not value or not value.strip(): raise ValueError("Series folder path cannot be empty") if len(value) > 1000: raise ValueError("Folder path must be 1000 characters or less") return value.strip() def __repr__(self) -> str: return ( f"" ) @property def episodeDict(self) -> dict[int, list[int]]: """Build episode dictionary from episodes relationship or private cache. Returns: Dictionary mapping season numbers to lists of episode numbers """ # Check for private cache first (set when loading from JSON without DB) if hasattr(self, '_episode_dict_cache') and self._episode_dict_cache is not None: return self._episode_dict_cache episode_dict: dict[int, list[int]] = {} if self.episodes: for ep in self.episodes: season = ep.season or 1 if season not in episode_dict: episode_dict[season] = [] episode_dict[season].append(ep.episode_number or 0) return episode_dict @property def name_with_year(self) -> str: """Get series name with year appended if available. Returns: Name in format "Name (Year)" if year is available, else just name """ if self.year: import re year_suffix = f" ({self.year})" clean_name = re.sub(r'(\s*\(\d{4}\))+\s*$', '', self.name or '').strip() return f"{clean_name}{year_suffix}" return self.name or '' @property def sanitized_folder(self) -> str: """Get filesystem-safe folder name from display name with year. Returns: Sanitized folder name based on display name with year """ from src.server.utils.filesystem import sanitize_folder_name name_to_sanitize = self.name_with_year or self.folder or self.key try: return sanitize_folder_name(name_to_sanitize) except ValueError: return sanitize_folder_name(self.key) class Episode(Base, TimestampMixin): """SQLAlchemy model for anime episodes. Represents individual episodes linked to an anime series. Tracks download status and file location. Attributes: id: Primary key series_id: Foreign key to AnimeSeries season: Season number episode_number: Episode number within season title: Episode title file_path: Local file path if downloaded is_downloaded: Whether episode is downloaded series: Relationship to AnimeSeries created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) """ __tablename__ = "episodes" # Primary key id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Foreign key to series series_id: Mapped[int] = mapped_column( ForeignKey("anime_series.id", ondelete="CASCADE"), nullable=False, index=True ) # Episode identification season: Mapped[int] = mapped_column( Integer, nullable=False, doc="Season number" ) episode_number: Mapped[int] = mapped_column( Integer, nullable=False, doc="Episode number within season" ) title: Mapped[Optional[str]] = mapped_column( String(500), nullable=True, doc="Episode title" ) # Download information file_path: Mapped[Optional[str]] = mapped_column( String(1000), nullable=True, doc="Local file path" ) is_downloaded: Mapped[bool] = mapped_column( Boolean, default=False, nullable=False, doc="Whether episode is downloaded" ) # Relationship series: Mapped["AnimeSeries"] = relationship( "AnimeSeries", back_populates="episodes" ) @validates('season') def validate_season(self, key: str, value: int) -> int: """Validate season number is positive.""" if value < 0: raise ValueError("Season number must be non-negative") if value > 1000: raise ValueError("Season number must be 1000 or less") return value @validates('episode_number') def validate_episode_number(self, key: str, value: int) -> int: """Validate episode number is positive.""" if value < 0: raise ValueError("Episode number must be non-negative") if value > 10000: raise ValueError("Episode number must be 10000 or less") return value @validates('title') def validate_title(self, key: str, value: Optional[str]) -> Optional[str]: """Validate title length.""" if value is not None and len(value) > 500: raise ValueError("Episode title must be 500 characters or less") return value @validates('file_path') def validate_file_path( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate file path length.""" if value is not None and len(value) > 1000: raise ValueError("File path must be 1000 characters or less") return value def __repr__(self) -> str: return ( f"" ) class DownloadStatus(str, Enum): """Status enum for download queue items.""" PENDING = "pending" DOWNLOADING = "downloading" PAUSED = "paused" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class DownloadPriority(str, Enum): """Priority enum for download queue items.""" LOW = "low" NORMAL = "normal" HIGH = "high" class DownloadQueueItem(Base, TimestampMixin): """SQLAlchemy model for download queue items. Tracks download queue with error information. Provides persistence for the DownloadService queue state. Attributes: id: Primary key series_id: Foreign key to AnimeSeries episode_id: Foreign key to Episode status: Queue status (pending/downloading/completed/failed/permanently_failed) error_message: Error description if failed download_url: Provider download URL file_destination: Target file path started_at: When download started completed_at: When download completed series: Relationship to AnimeSeries episode: Relationship to Episode created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) """ __tablename__ = "download_queue" # Primary key id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Foreign key to series series_id: Mapped[int] = mapped_column( ForeignKey("anime_series.id", ondelete="CASCADE"), nullable=False, index=True ) # Foreign key to episode episode_id: Mapped[int] = mapped_column( ForeignKey("episodes.id", ondelete="CASCADE"), nullable=False, index=True ) # Status column to track queue item state # Allows distinguishing pending items from permanently failed ones status: Mapped[str] = mapped_column( String(50), nullable=False, default="pending", doc="Queue item status: pending, downloading, completed, failed, permanently_failed" ) # Retry count to track failed download attempts # Used to determine when to move item to permanently_failed retry_count: Mapped[int] = mapped_column( Integer, nullable=False, default=0, doc="Number of retry attempts for this download" ) # Unique constraint to prevent duplicate pending queue items per episode # An episode can only have one PENDING entry at a time # The status column allows failed items to remain in DB while new # pending items can be added (application-level dedup still required) __table_args__ = ( Index( "ix_download_queue_episode_status", "episode_id", "status", unique=True, ), ) # Error handling error_message: Mapped[Optional[str]] = mapped_column( Text, nullable=True, doc="Error description" ) # Download details download_url: Mapped[Optional[str]] = mapped_column( String(1000), nullable=True, doc="Provider download URL" ) file_destination: Mapped[Optional[str]] = mapped_column( String(1000), nullable=True, doc="Target file path" ) # Timestamps started_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="When download started" ) completed_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="When download completed" ) # Relationship series: Mapped["AnimeSeries"] = relationship( "AnimeSeries", back_populates="download_items" ) episode: Mapped["Episode"] = relationship( "Episode" ) @validates('download_url') def validate_download_url( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate download URL length.""" if value is not None and len(value) > 1000: raise ValueError("Download URL must be 1000 characters or less") return value @validates('file_destination') def validate_file_destination( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate file destination path length.""" if value is not None and len(value) > 1000: raise ValueError( "File destination path must be 1000 characters or less" ) return value def __repr__(self) -> str: return ( f"" ) class UserSession(Base, TimestampMixin): """SQLAlchemy model for user sessions. Tracks authenticated user sessions with JWT tokens. Supports session management, revocation, and expiry. Attributes: id: Primary key session_id: Unique session identifier token_hash: Hashed JWT token for validation user_id: User identifier (for multi-user support) ip_address: Client IP address user_agent: Client user agent string expires_at: Session expiration timestamp is_active: Whether session is active last_activity: Last activity timestamp created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) """ __tablename__ = "user_sessions" # Primary key id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Session identification session_id: Mapped[str] = mapped_column( String(255), unique=True, nullable=False, index=True, doc="Unique session identifier" ) token_hash: Mapped[str] = mapped_column( String(255), nullable=False, doc="Hashed JWT token" ) # User information user_id: Mapped[Optional[str]] = mapped_column( String(255), nullable=True, index=True, doc="User identifier (for multi-user)" ) # Client information ip_address: Mapped[Optional[str]] = mapped_column( String(45), nullable=True, doc="Client IP address" ) user_agent: Mapped[Optional[str]] = mapped_column( String(500), nullable=True, doc="Client user agent" ) # Session management expires_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, doc="Session expiration" ) is_active: Mapped[bool] = mapped_column( Boolean, default=True, nullable=False, index=True, doc="Whether session is active" ) last_activity: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False, doc="Last activity timestamp" ) @validates('session_id') def validate_session_id(self, key: str, value: str) -> str: """Validate session ID length and format.""" if not value or not value.strip(): raise ValueError("Session ID cannot be empty") if len(value) > 255: raise ValueError("Session ID must be 255 characters or less") return value.strip() @validates('token_hash') def validate_token_hash(self, key: str, value: str) -> str: """Validate token hash length.""" if not value or not value.strip(): raise ValueError("Token hash cannot be empty") if len(value) > 255: raise ValueError("Token hash must be 255 characters or less") return value.strip() @validates('user_id') def validate_user_id( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate user ID length.""" if value is not None and len(value) > 255: raise ValueError("User ID must be 255 characters or less") return value @validates('ip_address') def validate_ip_address( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate IP address length (IPv4 or IPv6).""" if value is not None and len(value) > 45: raise ValueError("IP address must be 45 characters or less") return value @validates('user_agent') def validate_user_agent( self, key: str, value: Optional[str] ) -> Optional[str]: """Validate user agent length.""" if value is not None and len(value) > 500: raise ValueError("User agent must be 500 characters or less") return value def __repr__(self) -> str: return ( f"" ) @property def is_expired(self) -> bool: """Check if session has expired.""" # Ensure expires_at is timezone-aware for comparison expires_at = self.expires_at if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=timezone.utc) return datetime.now(timezone.utc) > expires_at def revoke(self) -> None: """Revoke this session.""" self.is_active = False class UnresolvedFolder(Base, TimestampMixin): """SQLAlchemy model for folders that couldn't be resolved during setup. Tracks anime folders whose provider key couldn't be auto-resolved during the initial setup scan. Users can provide the correct key via the API to complete the series registration. Attributes: id: Primary key folder_name: Original filesystem folder name title: Extracted title from folder name year: Extracted release year (optional) provider_key: User-provided provider key to resolve this folder search_attempts: Number of auto-search attempts made last_search_result: Cached search results (JSON string) for UI suggestions resolved_at: Timestamp when provider_key was provided created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) """ __tablename__ = "unresolved_folders" # Primary key id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Folder metadata folder_name: Mapped[str] = mapped_column( String(1000), unique=True, nullable=False, index=True, doc="Original filesystem folder name" ) title: Mapped[str] = mapped_column( String(500), nullable=False, doc="Extracted title from folder name" ) year: Mapped[Optional[int]] = mapped_column( Integer, nullable=True, doc="Extracted release year" ) # Resolution data provider_key: Mapped[Optional[str]] = mapped_column( String(255), nullable=True, doc="User-provided provider key to resolve this folder" ) search_attempts: Mapped[int] = mapped_column( Integer, nullable=False, default=0, server_default="0", doc="Number of auto-search attempts made" ) last_search_result: Mapped[Optional[str]] = mapped_column( Text, nullable=True, doc="Cached search results (JSON) for UI display" ) resolved_at: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp when this folder was resolved" ) @validates('folder_name') def validate_folder_name(self, key: str, value: str) -> str: """Validate folder name is not empty.""" if not value or not value.strip(): raise ValueError("Folder name cannot be empty") if len(value) > 1000: raise ValueError("Folder name must be 1000 characters or less") return value.strip() @validates('title') def validate_title(self, key: str, value: str) -> str: """Validate title is not empty.""" if not value or not value.strip(): raise ValueError("Title cannot be empty") if len(value) > 500: raise ValueError("Title must be 500 characters or less") return value.strip() @property def is_resolved(self) -> bool: """Check if this folder has been resolved with a provider key.""" return self.provider_key is not None and self.resolved_at is not None def __repr__(self) -> str: return ( f"" ) class SystemSettings(Base, TimestampMixin): """SQLAlchemy model for system-wide settings and state. Stores application-level configuration and state flags that persist across restarts. Used to track initialization status and setup completion. Attributes: id: Primary key (single row expected) initial_scan_completed: Whether the initial anime folder scan has been completed initial_nfo_scan_completed: Whether the initial NFO scan has been completed initial_media_scan_completed: Whether the initial media scan has been completed last_scan_timestamp: Timestamp of the last completed scan created_at: Creation timestamp (from TimestampMixin) updated_at: Last update timestamp (from TimestampMixin) """ __tablename__ = "system_settings" # Primary key (only one row should exist) id: Mapped[int] = mapped_column( Integer, primary_key=True, autoincrement=True ) # Setup/initialization tracking initial_scan_completed: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether the initial anime folder scan has been completed" ) initial_nfo_scan_completed: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether the initial NFO scan has been completed" ) initial_media_scan_completed: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether the initial media scan has been completed" ) migration_legacy_files_completed: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether legacy key/data file migration has been completed" ) legacy_key_cleanup_completed: Mapped[bool] = mapped_column( Boolean, nullable=False, default=False, server_default="0", doc="Whether legacy key file cleanup has been completed" ) last_scan_timestamp: Mapped[Optional[datetime]] = mapped_column( DateTime(timezone=True), nullable=True, doc="Timestamp of the last completed scan" ) def __repr__(self) -> str: return ( f"" )