instruction2

This commit is contained in:
2025-10-12 22:39:51 +02:00
parent e48cb29131
commit 7481a33c15
40 changed files with 639 additions and 14993 deletions

View File

@@ -1,6 +0,0 @@
"""
Infrastructure package for the Aniworld server.
This package contains repository implementations, database connections,
caching, and other infrastructure concerns.
"""

View File

@@ -1,916 +0,0 @@
"""
Database & Storage Management for AniWorld App
This module provides database schema management, data migration,
backup/restore functionality, and storage optimization.
"""
import os
import sqlite3
import json
import shutil
import time
import hashlib
import logging
import threading
import zipfile
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from contextlib import contextmanager
import glob
from pathlib import Path
@dataclass
class AnimeMetadata:
"""Represents anime metadata stored in database."""
anime_id: str
name: str
folder: str
key: Optional[str] = None
description: Optional[str] = None
genres: List[str] = field(default_factory=list)
release_year: Optional[int] = None
status: str = 'ongoing' # ongoing, completed, cancelled
total_episodes: Optional[int] = None
poster_url: Optional[str] = None
last_updated: datetime = field(default_factory=datetime.now)
created_at: datetime = field(default_factory=datetime.now)
custom_metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class EpisodeMetadata:
"""Represents episode metadata stored in database."""
episode_id: str
anime_id: str
season: int
episode: int
title: Optional[str] = None
description: Optional[str] = None
duration_seconds: Optional[int] = None
file_path: Optional[str] = None
file_size_bytes: Optional[int] = None
download_date: Optional[datetime] = None
last_watched: Optional[datetime] = None
watch_count: int = 0
is_downloaded: bool = False
quality: Optional[str] = None
language: str = 'German Dub'
@dataclass
class BackupInfo:
"""Represents backup metadata."""
backup_id: str
backup_path: str
backup_type: str # full, incremental, metadata_only
created_at: datetime
size_bytes: int
description: Optional[str] = None
tables_included: List[str] = field(default_factory=list)
checksum: Optional[str] = None
class DatabaseManager:
"""Manage SQLite database with migrations and maintenance."""
def __init__(self, db_path: str = "./data/aniworld.db"):
self.db_path = db_path
self.db_dir = os.path.dirname(db_path)
self.logger = logging.getLogger(__name__)
self.lock = threading.Lock()
# Create database directory
os.makedirs(self.db_dir, exist_ok=True)
# Initialize database
self.initialize_database()
# Run migrations
self.run_migrations()
@contextmanager
def get_connection(self):
"""Get database connection with proper error handling."""
conn = None
try:
conn = sqlite3.connect(self.db_path, timeout=30)
conn.row_factory = sqlite3.Row # Enable dict-like access
yield conn
except Exception as e:
if conn:
conn.rollback()
self.logger.error(f"Database connection error: {e}")
raise
finally:
if conn:
conn.close()
def initialize_database(self):
"""Initialize database with base schema."""
with self.get_connection() as conn:
# Create schema version table
conn.execute("""
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT
)
""")
# Insert initial version if not exists
conn.execute("""
INSERT OR IGNORE INTO schema_version (version, description)
VALUES (0, 'Initial schema')
""")
conn.commit()
def get_current_version(self) -> int:
"""Get current database schema version."""
with self.get_connection() as conn:
cursor = conn.execute("SELECT MAX(version) FROM schema_version")
result = cursor.fetchone()
return result[0] if result and result[0] is not None else 0
def run_migrations(self):
"""Run database migrations."""
current_version = self.get_current_version()
migrations = self.get_migrations()
for version, migration in migrations.items():
if version > current_version:
self.logger.info(f"Running migration to version {version}")
try:
with self.get_connection() as conn:
migration['up'](conn)
# Record migration
conn.execute("""
INSERT INTO schema_version (version, description)
VALUES (?, ?)
""", (version, migration['description']))
conn.commit()
self.logger.info(f"Migration to version {version} completed")
except Exception as e:
self.logger.error(f"Migration to version {version} failed: {e}")
raise
def get_migrations(self) -> Dict[int, Dict[str, Any]]:
"""Define database migrations."""
return {
1: {
'description': 'Create anime metadata table',
'up': self._migration_001_anime_table
},
2: {
'description': 'Create episode metadata table',
'up': self._migration_002_episode_table
},
3: {
'description': 'Create download history table',
'up': self._migration_003_download_history
},
4: {
'description': 'Create user preferences table',
'up': self._migration_004_user_preferences
},
5: {
'description': 'Create storage locations table',
'up': self._migration_005_storage_locations
},
6: {
'description': 'Add indexes for performance',
'up': self._migration_006_indexes
}
}
def _migration_001_anime_table(self, conn: sqlite3.Connection):
"""Create anime metadata table."""
conn.execute("""
CREATE TABLE anime_metadata (
anime_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
folder TEXT NOT NULL UNIQUE,
key TEXT,
description TEXT,
genres TEXT, -- JSON array
release_year INTEGER,
status TEXT DEFAULT 'ongoing',
total_episodes INTEGER,
poster_url TEXT,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
custom_metadata TEXT -- JSON object
)
""")
def _migration_002_episode_table(self, conn: sqlite3.Connection):
"""Create episode metadata table."""
conn.execute("""
CREATE TABLE episode_metadata (
episode_id TEXT PRIMARY KEY,
anime_id TEXT NOT NULL,
season INTEGER NOT NULL,
episode INTEGER NOT NULL,
title TEXT,
description TEXT,
duration_seconds INTEGER,
file_path TEXT,
file_size_bytes INTEGER,
download_date TIMESTAMP,
last_watched TIMESTAMP,
watch_count INTEGER DEFAULT 0,
is_downloaded BOOLEAN DEFAULT FALSE,
quality TEXT,
language TEXT DEFAULT 'German Dub',
FOREIGN KEY (anime_id) REFERENCES anime_metadata(anime_id),
UNIQUE(anime_id, season, episode, language)
)
""")
def _migration_003_download_history(self, conn: sqlite3.Connection):
"""Create download history table."""
conn.execute("""
CREATE TABLE download_history (
download_id TEXT PRIMARY KEY,
anime_id TEXT NOT NULL,
season INTEGER NOT NULL,
episode INTEGER NOT NULL,
language TEXT NOT NULL,
download_started TIMESTAMP NOT NULL,
download_completed TIMESTAMP,
download_status TEXT NOT NULL, -- started, completed, failed, cancelled
file_size_bytes INTEGER,
download_speed_mbps REAL,
error_message TEXT,
retry_count INTEGER DEFAULT 0,
FOREIGN KEY (anime_id) REFERENCES anime_metadata(anime_id)
)
""")
def _migration_004_user_preferences(self, conn: sqlite3.Connection):
"""Create user preferences table."""
conn.execute("""
CREATE TABLE user_preferences (
key TEXT PRIMARY KEY,
value TEXT NOT NULL, -- JSON value
category TEXT NOT NULL,
description TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
def _migration_005_storage_locations(self, conn: sqlite3.Connection):
"""Create storage locations table."""
conn.execute("""
CREATE TABLE storage_locations (
location_id TEXT PRIMARY KEY,
anime_id TEXT,
path TEXT NOT NULL,
location_type TEXT NOT NULL, -- primary, backup, cache
is_active BOOLEAN DEFAULT TRUE,
free_space_bytes INTEGER,
total_space_bytes INTEGER,
last_checked TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (anime_id) REFERENCES anime_metadata(anime_id)
)
""")
def _migration_006_indexes(self, conn: sqlite3.Connection):
"""Add indexes for performance."""
indexes = [
"CREATE INDEX idx_anime_name ON anime_metadata(name)",
"CREATE INDEX idx_anime_folder ON anime_metadata(folder)",
"CREATE INDEX idx_anime_status ON anime_metadata(status)",
"CREATE INDEX idx_episode_anime_id ON episode_metadata(anime_id)",
"CREATE INDEX idx_episode_season_episode ON episode_metadata(season, episode)",
"CREATE INDEX idx_episode_downloaded ON episode_metadata(is_downloaded)",
"CREATE INDEX idx_download_status ON download_history(download_status)",
"CREATE INDEX idx_download_date ON download_history(download_started)",
"CREATE INDEX idx_storage_active ON storage_locations(is_active)",
"CREATE INDEX idx_storage_type ON storage_locations(location_type)"
]
for index_sql in indexes:
try:
conn.execute(index_sql)
except sqlite3.OperationalError as e:
if "already exists" not in str(e):
raise
def execute_query(self, query: str, params: tuple = ()) -> List[sqlite3.Row]:
"""Execute a SELECT query and return results."""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
return cursor.fetchall()
def execute_update(self, query: str, params: tuple = ()) -> int:
"""Execute an UPDATE/INSERT/DELETE query and return affected rows."""
with self.get_connection() as conn:
cursor = conn.execute(query, params)
conn.commit()
return cursor.rowcount
class AnimeRepository:
"""Repository for anime data operations."""
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
self.logger = logging.getLogger(__name__)
def create_anime(self, metadata: AnimeMetadata) -> bool:
"""Create new anime record."""
try:
query = """
INSERT INTO anime_metadata (
anime_id, name, folder, key, description, genres,
release_year, status, total_episodes, poster_url,
custom_metadata
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (
metadata.anime_id,
metadata.name,
metadata.folder,
metadata.key,
metadata.description,
json.dumps(metadata.genres),
metadata.release_year,
metadata.status,
metadata.total_episodes,
metadata.poster_url,
json.dumps(metadata.custom_metadata)
)
rows_affected = self.db.execute_update(query, params)
return rows_affected > 0
except Exception as e:
self.logger.error(f"Failed to create anime {metadata.name}: {e}")
return False
def get_anime_by_folder(self, folder: str) -> Optional[AnimeMetadata]:
"""Get anime by folder name."""
try:
query = """
SELECT * FROM anime_metadata WHERE folder = ?
"""
results = self.db.execute_query(query, (folder,))
if results:
row = results[0]
return self._row_to_anime_metadata(row)
return None
except Exception as e:
self.logger.error(f"Failed to get anime by folder {folder}: {e}")
return None
def get_all_anime(self, status_filter: Optional[str] = None) -> List[AnimeMetadata]:
"""Get all anime, optionally filtered by status."""
try:
if status_filter:
query = "SELECT * FROM anime_metadata WHERE status = ? ORDER BY name"
params = (status_filter,)
else:
query = "SELECT * FROM anime_metadata ORDER BY name"
params = ()
results = self.db.execute_query(query, params)
return [self._row_to_anime_metadata(row) for row in results]
except Exception as e:
self.logger.error(f"Failed to get all anime: {e}")
return []
def update_anime(self, metadata: AnimeMetadata) -> bool:
"""Update anime metadata."""
try:
query = """
UPDATE anime_metadata SET
name = ?, key = ?, description = ?, genres = ?,
release_year = ?, status = ?, total_episodes = ?,
poster_url = ?, last_updated = CURRENT_TIMESTAMP,
custom_metadata = ?
WHERE anime_id = ?
"""
params = (
metadata.name,
metadata.key,
metadata.description,
json.dumps(metadata.genres),
metadata.release_year,
metadata.status,
metadata.total_episodes,
metadata.poster_url,
json.dumps(metadata.custom_metadata),
metadata.anime_id
)
rows_affected = self.db.execute_update(query, params)
return rows_affected > 0
except Exception as e:
self.logger.error(f"Failed to update anime {metadata.anime_id}: {e}")
return False
def delete_anime(self, anime_id: str) -> bool:
"""Delete anime and related data."""
try:
# Delete episodes first (foreign key constraint)
self.db.execute_update("DELETE FROM episode_metadata WHERE anime_id = ?", (anime_id,))
self.db.execute_update("DELETE FROM download_history WHERE anime_id = ?", (anime_id,))
self.db.execute_update("DELETE FROM storage_locations WHERE anime_id = ?", (anime_id,))
# Delete anime
rows_affected = self.db.execute_update("DELETE FROM anime_metadata WHERE anime_id = ?", (anime_id,))
return rows_affected > 0
except Exception as e:
self.logger.error(f"Failed to delete anime {anime_id}: {e}")
return False
def search_anime(self, search_term: str) -> List[AnimeMetadata]:
"""Search anime by name or description."""
try:
query = """
SELECT * FROM anime_metadata
WHERE name LIKE ? OR description LIKE ?
ORDER BY name
"""
search_pattern = f"%{search_term}%"
results = self.db.execute_query(query, (search_pattern, search_pattern))
return [self._row_to_anime_metadata(row) for row in results]
except Exception as e:
self.logger.error(f"Failed to search anime: {e}")
return []
def _row_to_anime_metadata(self, row: sqlite3.Row) -> AnimeMetadata:
"""Convert database row to AnimeMetadata object."""
return AnimeMetadata(
anime_id=row['anime_id'],
name=row['name'],
folder=row['folder'],
key=row['key'],
description=row['description'],
genres=json.loads(row['genres'] or '[]'),
release_year=row['release_year'],
status=row['status'],
total_episodes=row['total_episodes'],
poster_url=row['poster_url'],
last_updated=datetime.fromisoformat(row['last_updated']) if row['last_updated'] else datetime.now(),
created_at=datetime.fromisoformat(row['created_at']) if row['created_at'] else datetime.now(),
custom_metadata=json.loads(row['custom_metadata'] or '{}')
)
class BackupManager:
"""Manage database backups and restore operations."""
def __init__(self, db_manager: DatabaseManager, backup_dir: str = "./backups"):
self.db = db_manager
self.backup_dir = backup_dir
self.logger = logging.getLogger(__name__)
# Create backup directory
os.makedirs(backup_dir, exist_ok=True)
def create_full_backup(self, description: str = None) -> Optional[BackupInfo]:
"""Create a full database backup."""
try:
backup_id = f"full_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_filename = f"{backup_id}.db"
backup_path = os.path.join(self.backup_dir, backup_filename)
# Copy database file
shutil.copy2(self.db.db_path, backup_path)
# Calculate checksum
checksum = self._calculate_file_checksum(backup_path)
# Get file size
size_bytes = os.path.getsize(backup_path)
# Get table list
with self.db.get_connection() as conn:
cursor = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
backup_info = BackupInfo(
backup_id=backup_id,
backup_path=backup_path,
backup_type='full',
created_at=datetime.now(),
size_bytes=size_bytes,
description=description or f"Full backup created on {datetime.now().strftime('%Y-%m-%d %H:%M')}",
tables_included=tables,
checksum=checksum
)
# Save backup metadata
self._save_backup_metadata(backup_info)
self.logger.info(f"Full backup created: {backup_id}")
return backup_info
except Exception as e:
self.logger.error(f"Failed to create full backup: {e}")
return None
def create_metadata_backup(self, description: str = None) -> Optional[BackupInfo]:
"""Create a metadata-only backup (excluding large binary data)."""
try:
backup_id = f"metadata_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_filename = f"{backup_id}.json"
backup_path = os.path.join(self.backup_dir, backup_filename)
# Export metadata as JSON
metadata = self._export_metadata()
with open(backup_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, default=str)
# Calculate checksum
checksum = self._calculate_file_checksum(backup_path)
# Get file size
size_bytes = os.path.getsize(backup_path)
backup_info = BackupInfo(
backup_id=backup_id,
backup_path=backup_path,
backup_type='metadata_only',
created_at=datetime.now(),
size_bytes=size_bytes,
description=description or f"Metadata backup created on {datetime.now().strftime('%Y-%m-%d %H:%M')}",
tables_included=['anime_metadata', 'episode_metadata', 'user_preferences'],
checksum=checksum
)
# Save backup metadata
self._save_backup_metadata(backup_info)
self.logger.info(f"Metadata backup created: {backup_id}")
return backup_info
except Exception as e:
self.logger.error(f"Failed to create metadata backup: {e}")
return None
def restore_backup(self, backup_id: str) -> bool:
"""Restore from a backup."""
try:
backup_info = self._load_backup_metadata(backup_id)
if not backup_info:
self.logger.error(f"Backup not found: {backup_id}")
return False
if not os.path.exists(backup_info.backup_path):
self.logger.error(f"Backup file not found: {backup_info.backup_path}")
return False
# Verify backup integrity
if not self._verify_backup_integrity(backup_info):
self.logger.error(f"Backup integrity check failed: {backup_id}")
return False
# Create a backup of current database before restore
current_backup = self.create_full_backup(f"Pre-restore backup before restoring {backup_id}")
if backup_info.backup_type == 'full':
# Replace database file
shutil.copy2(backup_info.backup_path, self.db.db_path)
elif backup_info.backup_type == 'metadata_only':
# Restore metadata from JSON
with open(backup_info.backup_path, 'r', encoding='utf-8') as f:
metadata = json.load(f)
self._import_metadata(metadata)
self.logger.info(f"Backup restored successfully: {backup_id}")
return True
except Exception as e:
self.logger.error(f"Failed to restore backup {backup_id}: {e}")
return False
def list_backups(self) -> List[BackupInfo]:
"""List all available backups."""
backups = []
try:
# Look for backup metadata files
metadata_pattern = os.path.join(self.backup_dir, "*.backup_info.json")
for metadata_file in glob.glob(metadata_pattern):
try:
with open(metadata_file, 'r') as f:
backup_data = json.load(f)
backup_info = BackupInfo(
backup_id=backup_data['backup_id'],
backup_path=backup_data['backup_path'],
backup_type=backup_data['backup_type'],
created_at=datetime.fromisoformat(backup_data['created_at']),
size_bytes=backup_data['size_bytes'],
description=backup_data.get('description'),
tables_included=backup_data.get('tables_included', []),
checksum=backup_data.get('checksum')
)
backups.append(backup_info)
except Exception as e:
self.logger.warning(f"Failed to load backup metadata from {metadata_file}: {e}")
# Sort by creation date (newest first)
backups.sort(key=lambda b: b.created_at, reverse=True)
except Exception as e:
self.logger.error(f"Failed to list backups: {e}")
return backups
def cleanup_old_backups(self, keep_days: int = 30, keep_count: int = 10):
"""Clean up old backup files."""
try:
backups = self.list_backups()
cutoff_date = datetime.now() - timedelta(days=keep_days)
# Keep at least keep_count backups regardless of age
backups_to_delete = []
for i, backup in enumerate(backups):
if i >= keep_count and backup.created_at < cutoff_date:
backups_to_delete.append(backup)
for backup in backups_to_delete:
try:
# Remove backup file
if os.path.exists(backup.backup_path):
os.remove(backup.backup_path)
# Remove metadata file
metadata_file = f"{backup.backup_path}.backup_info.json"
if os.path.exists(metadata_file):
os.remove(metadata_file)
self.logger.info(f"Removed old backup: {backup.backup_id}")
except Exception as e:
self.logger.warning(f"Failed to remove backup {backup.backup_id}: {e}")
if backups_to_delete:
self.logger.info(f"Cleaned up {len(backups_to_delete)} old backups")
except Exception as e:
self.logger.error(f"Failed to cleanup old backups: {e}")
def _export_metadata(self) -> Dict[str, Any]:
"""Export database metadata to dictionary."""
metadata = {
'export_date': datetime.now().isoformat(),
'schema_version': self.db.get_current_version(),
'tables': {}
}
# Export specific tables
tables_to_export = ['anime_metadata', 'episode_metadata', 'user_preferences', 'storage_locations']
with self.db.get_connection() as conn:
for table in tables_to_export:
try:
cursor = conn.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
# Convert rows to dictionaries
metadata['tables'][table] = [dict(row) for row in rows]
except Exception as e:
self.logger.warning(f"Failed to export table {table}: {e}")
return metadata
def _import_metadata(self, metadata: Dict[str, Any]):
"""Import metadata from dictionary to database."""
with self.db.get_connection() as conn:
for table_name, rows in metadata.get('tables', {}).items():
if not rows:
continue
try:
# Clear existing data (be careful!)
conn.execute(f"DELETE FROM {table_name}")
# Insert new data
if rows:
columns = list(rows[0].keys())
placeholders = ','.join(['?' for _ in columns])
insert_sql = f"INSERT INTO {table_name} ({','.join(columns)}) VALUES ({placeholders})"
for row in rows:
values = [row[col] for col in columns]
conn.execute(insert_sql, values)
conn.commit()
self.logger.info(f"Imported {len(rows)} rows to {table_name}")
except Exception as e:
self.logger.error(f"Failed to import table {table_name}: {e}")
conn.rollback()
raise
def _calculate_file_checksum(self, file_path: str) -> str:
"""Calculate SHA256 checksum of file."""
hash_sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
def _verify_backup_integrity(self, backup_info: BackupInfo) -> bool:
"""Verify backup file integrity using checksum."""
if not backup_info.checksum:
return True # No checksum to verify
current_checksum = self._calculate_file_checksum(backup_info.backup_path)
return current_checksum == backup_info.checksum
def _save_backup_metadata(self, backup_info: BackupInfo):
"""Save backup metadata to file."""
metadata_file = f"{backup_info.backup_path}.backup_info.json"
metadata = {
'backup_id': backup_info.backup_id,
'backup_path': backup_info.backup_path,
'backup_type': backup_info.backup_type,
'created_at': backup_info.created_at.isoformat(),
'size_bytes': backup_info.size_bytes,
'description': backup_info.description,
'tables_included': backup_info.tables_included,
'checksum': backup_info.checksum
}
with open(metadata_file, 'w') as f:
json.dump(metadata, f, indent=2)
def _load_backup_metadata(self, backup_id: str) -> Optional[BackupInfo]:
"""Load backup metadata from file."""
# Look for metadata file
metadata_pattern = os.path.join(self.backup_dir, f"{backup_id}.*.backup_info.json")
metadata_files = glob.glob(metadata_pattern)
if not metadata_files:
return None
try:
with open(metadata_files[0], 'r') as f:
backup_data = json.load(f)
return BackupInfo(
backup_id=backup_data['backup_id'],
backup_path=backup_data['backup_path'],
backup_type=backup_data['backup_type'],
created_at=datetime.fromisoformat(backup_data['created_at']),
size_bytes=backup_data['size_bytes'],
description=backup_data.get('description'),
tables_included=backup_data.get('tables_included', []),
checksum=backup_data.get('checksum')
)
except Exception as e:
self.logger.error(f"Failed to load backup metadata for {backup_id}: {e}")
return None
class StorageManager:
"""Manage storage locations and usage monitoring."""
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
self.logger = logging.getLogger(__name__)
def add_storage_location(self, path: str, location_type: str = 'primary', anime_id: str = None) -> str:
"""Add a new storage location."""
location_id = str(uuid.uuid4())
query = """
INSERT INTO storage_locations
(location_id, anime_id, path, location_type, is_active)
VALUES (?, ?, ?, ?, ?)
"""
self.db.execute_update(query, (location_id, anime_id, path, location_type, True))
# Update storage stats
self.update_storage_stats(location_id)
return location_id
def update_storage_stats(self, location_id: str):
"""Update storage statistics for a location."""
try:
# Get location path
query = "SELECT path FROM storage_locations WHERE location_id = ?"
results = self.db.execute_query(query, (location_id,))
if not results:
return
path = results[0]['path']
if os.path.exists(path):
# Get disk usage
stat = shutil.disk_usage(path)
# Update database
update_query = """
UPDATE storage_locations
SET free_space_bytes = ?, total_space_bytes = ?, last_checked = CURRENT_TIMESTAMP
WHERE location_id = ?
"""
self.db.execute_update(update_query, (stat.free, stat.total, location_id))
except Exception as e:
self.logger.error(f"Failed to update storage stats for {location_id}: {e}")
def get_storage_summary(self) -> Dict[str, Any]:
"""Get storage usage summary."""
query = """
SELECT
location_type,
COUNT(*) as location_count,
SUM(free_space_bytes) as total_free,
SUM(total_space_bytes) as total_space
FROM storage_locations
WHERE is_active = 1
GROUP BY location_type
"""
results = self.db.execute_query(query)
summary = {}
for row in results:
summary[row['location_type']] = {
'location_count': row['location_count'],
'total_free_gb': (row['total_free'] or 0) / (1024**3),
'total_space_gb': (row['total_space'] or 0) / (1024**3),
'usage_percent': ((row['total_space'] - row['total_free']) / row['total_space'] * 100) if row['total_space'] else 0
}
return summary
# Global instances
database_manager = DatabaseManager()
anime_repository = AnimeRepository(database_manager)
backup_manager = BackupManager(database_manager)
storage_manager = StorageManager(database_manager)
def init_database_system():
"""Initialize database system."""
# Database is initialized on creation
pass
def cleanup_database_system():
"""Clean up database resources."""
# No specific cleanup needed for SQLite
pass
# Export main components
__all__ = [
'DatabaseManager',
'AnimeRepository',
'BackupManager',
'StorageManager',
'AnimeMetadata',
'EpisodeMetadata',
'BackupInfo',
'database_manager',
'anime_repository',
'backup_manager',
'storage_manager',
'init_database_system',
'cleanup_database_system'
]

View File

@@ -1,6 +0,0 @@
"""
Repository package for data access layer.
This package contains repository implementations following the Repository pattern
for clean separation of data access logic from business logic.
"""