Remove unused scripts and config files

This commit is contained in:
2026-02-22 19:56:49 +01:00
parent d951963d87
commit 6c7dc66c5d
12 changed files with 0 additions and 1611 deletions

View File

@@ -1,97 +0,0 @@
"""Database migration utility for adding loading status fields.
This script adds the loading status fields to existing anime_series tables
without Alembic. For new databases, these fields are created automatically
via create_all().
Run this after updating the models.py file.
"""
import asyncio
import logging
import sys
from pathlib import Path
# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text
from sqlalchemy.exc import OperationalError
from src.config.settings import settings
from src.server.database.connection import get_engine, init_db
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def migrate_add_loading_status_fields():
"""Add loading status fields to anime_series table if they don't exist."""
# Initialize database connection
await init_db()
engine = get_engine()
if not engine:
logger.error("Failed to get database engine")
return
# Define the migrations
migrations = [
("loading_status", "ALTER TABLE anime_series ADD COLUMN loading_status VARCHAR(50) NOT NULL DEFAULT 'completed'"),
("episodes_loaded", "ALTER TABLE anime_series ADD COLUMN episodes_loaded BOOLEAN NOT NULL DEFAULT 1"),
("logo_loaded", "ALTER TABLE anime_series ADD COLUMN logo_loaded BOOLEAN NOT NULL DEFAULT 0"),
("images_loaded", "ALTER TABLE anime_series ADD COLUMN images_loaded BOOLEAN NOT NULL DEFAULT 0"),
("loading_started_at", "ALTER TABLE anime_series ADD COLUMN loading_started_at TIMESTAMP"),
("loading_completed_at", "ALTER TABLE anime_series ADD COLUMN loading_completed_at TIMESTAMP"),
("loading_error", "ALTER TABLE anime_series ADD COLUMN loading_error VARCHAR(1000)"),
]
async with engine.begin() as conn:
for column_name, sql in migrations:
try:
logger.info(f"Adding column: {column_name}")
await conn.execute(text(sql))
logger.info(f"✅ Successfully added column: {column_name}")
except OperationalError as e:
if "duplicate column name" in str(e).lower() or "already exists" in str(e).lower():
logger.info(f"⏭️ Column {column_name} already exists, skipping")
else:
logger.error(f"❌ Error adding column {column_name}: {e}")
raise
logger.info("Migration completed successfully!")
logger.info("All loading status fields are now available in anime_series table")
async def rollback_loading_status_fields():
"""Remove loading status fields from anime_series table."""
await init_db()
engine = get_engine()
if not engine:
logger.error("Failed to get database engine")
return
# SQLite doesn't support DROP COLUMN easily, so we'd need to recreate the table
# For now, just log a warning
logger.warning("Rollback not implemented for SQLite")
logger.warning("To rollback, you would need to:")
logger.warning("1. Create a new table without the loading fields")
logger.warning("2. Copy data from old table")
logger.warning("3. Drop old table and rename new table")
def main():
"""Run the migration."""
import sys
if len(sys.argv) > 1 and sys.argv[1] == "rollback":
asyncio.run(rollback_loading_status_fields())
else:
asyncio.run(migrate_add_loading_status_fields())
if __name__ == "__main__":
main()

View File

@@ -1,421 +0,0 @@
"""
Aniworld Application Setup Script
This script handles initial setup, dependency installation, database
initialization, and configuration for the Aniworld application.
Usage:
python setup.py [--environment {development|production}] [--no-deps]
python setup.py --help
"""
import argparse
import asyncio
import os
import subprocess
import sys
from pathlib import Path
class SetupManager:
"""Manages application setup and initialization."""
def __init__(
self,
environment: str = "development",
skip_deps: bool = False
):
"""
Initialize setup manager.
Args:
environment: Environment mode (development or production)
skip_deps: Skip dependency installation
"""
self.environment = environment
self.skip_deps = skip_deps
self.project_root = Path(__file__).parent.parent
self.conda_env = "AniWorld"
# ============================================================================
# Logging
# ============================================================================
@staticmethod
def log_info(message: str) -> None:
"""Log info message."""
print(f"\033[34m[INFO]\033[0m {message}")
@staticmethod
def log_success(message: str) -> None:
"""Log success message."""
print(f"\033[32m[SUCCESS]\033[0m {message}")
@staticmethod
def log_warning(message: str) -> None:
"""Log warning message."""
print(f"\033[33m[WARNING]\033[0m {message}")
@staticmethod
def log_error(message: str) -> None:
"""Log error message."""
print(f"\033[31m[ERROR]\033[0m {message}")
# ============================================================================
# Validation
# ============================================================================
def validate_environment(self) -> bool:
"""
Validate environment parameter.
Returns:
True if valid, False otherwise
"""
valid_envs = {"development", "production", "testing"}
if self.environment not in valid_envs:
self.log_error(
f"Invalid environment: {self.environment}. "
f"Must be one of: {valid_envs}"
)
return False
self.log_success(f"Environment: {self.environment}")
return True
def check_conda_env(self) -> bool:
"""
Check if conda environment exists.
Returns:
True if exists, False otherwise
"""
result = subprocess.run(
["conda", "env", "list"],
capture_output=True,
text=True
)
if self.conda_env in result.stdout:
self.log_success(f"Conda environment '{self.conda_env}' found")
return True
self.log_error(
f"Conda environment '{self.conda_env}' not found. "
f"Create with: conda create -n {self.conda_env} python=3.11"
)
return False
def check_python_version(self) -> bool:
"""
Check Python version.
Returns:
True if version >= 3.9, False otherwise
"""
if sys.version_info < (3, 9):
self.log_error(
f"Python 3.9+ required. Current: {sys.version_info.major}."
f"{sys.version_info.minor}"
)
return False
self.log_success(
f"Python version: {sys.version_info.major}."
f"{sys.version_info.minor}"
)
return True
# ============================================================================
# Directory Setup
# ============================================================================
def create_directories(self) -> bool:
"""
Create necessary directories.
Returns:
True if successful, False otherwise
"""
try:
directories = [
"logs",
"data",
"data/config_backups",
"Temp",
"tests",
"scripts",
]
self.log_info("Creating directories...")
for directory in directories:
dir_path = self.project_root / directory
dir_path.mkdir(parents=True, exist_ok=True)
self.log_success("Directories created")
return True
except Exception as e:
self.log_error(f"Failed to create directories: {e}")
return False
# ============================================================================
# Dependency Installation
# ============================================================================
def install_dependencies(self) -> bool:
"""
Install Python dependencies.
Returns:
True if successful, False otherwise
"""
if self.skip_deps:
self.log_warning("Skipping dependency installation")
return True
try:
requirements_file = self.project_root / "requirements.txt"
if not requirements_file.exists():
self.log_error(
f"requirements.txt not found at {requirements_file}"
)
return False
self.log_info("Installing dependencies...")
subprocess.run(
["conda", "run", "-n", self.conda_env,
"pip", "install", "-q", "-r", str(requirements_file)],
check=True
)
self.log_success("Dependencies installed")
return True
except subprocess.CalledProcessError as e:
self.log_error(f"Failed to install dependencies: {e}")
return False
# ============================================================================
# Environment Configuration
# ============================================================================
def create_env_files(self) -> bool:
"""
Create environment configuration files.
Returns:
True if successful, False otherwise
"""
try:
self.log_info("Creating environment configuration files...")
env_file = self.project_root / f".env.{self.environment}"
if env_file.exists():
self.log_warning(f"{env_file.name} already exists")
return True
# Create environment file with defaults
env_content = self._get_env_template()
env_file.write_text(env_content)
self.log_success(f"Created {env_file.name}")
return True
except Exception as e:
self.log_error(f"Failed to create env files: {e}")
return False
def _get_env_template(self) -> str:
"""
Get environment file template.
Returns:
Environment file content
"""
if self.environment == "production":
return """# Aniworld Production Configuration
# IMPORTANT: Set these values before running in production
# Security (REQUIRED - generate new values)
JWT_SECRET_KEY=change-this-to-a-secure-random-key
PASSWORD_SALT=change-this-to-a-secure-random-salt
MASTER_PASSWORD_HASH=change-this-to-hashed-password
# Database (REQUIRED - use PostgreSQL or MySQL in production)
DATABASE_URL=postgresql://user:password@localhost/aniworld
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=10
# Application
ENVIRONMENT=production
ANIME_DIRECTORY=/var/lib/aniworld
TEMP_DIRECTORY=/tmp/aniworld
# Server
HOST=0.0.0.0
PORT=8000
WORKERS=4
# Security
CORS_ORIGINS=https://yourdomain.com
ALLOWED_HOSTS=yourdomain.com
# Logging
LOG_LEVEL=WARNING
LOG_FILE=logs/production.log
LOG_ROTATION_SIZE=10485760
LOG_RETENTION_DAYS=30
# Performance
API_RATE_LIMIT=60
SESSION_TIMEOUT_HOURS=24
MAX_CONCURRENT_DOWNLOADS=3
"""
else: # development
return """# Aniworld Development Configuration
# Security (Development defaults - NOT for production)
JWT_SECRET_KEY=dev-secret-key-change-in-production
PASSWORD_SALT=dev-salt-change-in-production
MASTER_PASSWORD_HASH=$2b$12$wP0KBVbJKVAb8CdSSXw0NeGTKCkbw4fSAFXIqR2/wDqPSEBn9w7lS
MASTER_PASSWORD=password
# Database
DATABASE_URL=sqlite:///./data/aniworld_dev.db
# Application
ENVIRONMENT=development
ANIME_DIRECTORY=/tmp/aniworld_dev
TEMP_DIRECTORY=/tmp/aniworld_dev/temp
# Server
HOST=127.0.0.1
PORT=8000
WORKERS=1
# Security
CORS_ORIGINS=*
# Logging
LOG_LEVEL=DEBUG
LOG_FILE=logs/development.log
# Performance
API_RATE_LIMIT=1000
SESSION_TIMEOUT_HOURS=168
MAX_CONCURRENT_DOWNLOADS=1
"""
# ============================================================================
# Database Initialization
# ============================================================================
async def init_database(self) -> bool:
"""
Initialize database.
Returns:
True if successful, False otherwise
"""
try:
self.log_info("Initializing database...")
# Import and run database initialization
os.chdir(self.project_root)
from src.server.database import init_db
await init_db()
self.log_success("Database initialized")
return True
except Exception as e:
self.log_error(f"Failed to initialize database: {e}")
return False
# ============================================================================
# Summary
# ============================================================================
def print_summary(self) -> None:
"""Print setup summary."""
self.log_info("=" * 50)
self.log_info("Setup Summary")
self.log_info("=" * 50)
self.log_info(f"Environment: {self.environment}")
self.log_info(f"Conda Environment: {self.conda_env}")
self.log_info(f"Project Root: {self.project_root}")
self.log_info("")
self.log_success("Setup complete!")
self.log_info("")
self.log_info("Next steps:")
self.log_info("1. Configure .env files with your settings")
if self.environment == "production":
self.log_info("2. Set up database (PostgreSQL/MySQL)")
self.log_info("3. Configure security settings")
self.log_info("4. Run: ./scripts/start.sh production")
else:
self.log_info("2. Run: ./scripts/start.sh development")
self.log_info("")
# ============================================================================
# Main Setup
# ============================================================================
async def run(self) -> int:
"""
Run setup process.
Returns:
0 if successful, 1 otherwise
"""
print("\033[34m" + "=" * 50 + "\033[0m")
print("\033[34mAniworld Application Setup\033[0m")
print("\033[34m" + "=" * 50 + "\033[0m")
print()
# Validation
if not self.validate_environment():
return 1
if not self.check_python_version():
return 1
if not self.check_conda_env():
return 1
# Setup
if not self.create_directories():
return 1
if not self.create_env_files():
return 1
if not self.install_dependencies():
return 1
# Initialize database
if not await self.init_database():
return 1
# Summary
self.print_summary()
return 0
async def main() -> int:
"""
Main entry point.
Returns:
Exit code
"""
parser = argparse.ArgumentParser(
description="Aniworld Application Setup"
)
parser.add_argument(
"--environment",
choices=["development", "production", "testing"],
default="development",
help="Environment to set up (default: development)"
)
parser.add_argument(
"--no-deps",
action="store_true",
help="Skip dependency installation"
)
args = parser.parse_args()
setup = SetupManager(
environment=args.environment,
skip_deps=args.no_deps
)
return await setup.run()
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

View File

@@ -1,225 +0,0 @@
#!/bin/bash
################################################################################
# Aniworld Application Startup Script
#
# This script initializes the development or production environment,
# installs dependencies, sets up the database, and starts the application.
#
# Usage:
# ./start.sh [development|production] [--no-install]
#
# Environment Variables:
# ENVIRONMENT: 'development' or 'production' (default: development)
# CONDA_ENV: Conda environment name (default: AniWorld)
# PORT: Server port (default: 8000)
# HOST: Server host (default: 127.0.0.1)
#
################################################################################
set -euo pipefail
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
CONDA_ENV="${CONDA_ENV:-AniWorld}"
ENVIRONMENT="${1:-development}"
INSTALL_DEPS="${INSTALL_DEPS:-true}"
PORT="${PORT:-8000}"
HOST="${HOST:-127.0.0.1}"
# ============================================================================
# Color Output
# ============================================================================
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# ============================================================================
# Functions
# ============================================================================
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# Check if conda environment exists
check_conda_env() {
if ! conda env list | grep -q "^$CONDA_ENV "; then
log_error "Conda environment '$CONDA_ENV' not found."
log_info "Create it with: conda create -n $CONDA_ENV python=3.11"
exit 1
fi
log_success "Conda environment '$CONDA_ENV' found."
}
# Validate environment parameter
validate_environment() {
if [[ ! "$ENVIRONMENT" =~ ^(development|production|testing)$ ]]; then
log_error "Invalid environment: $ENVIRONMENT"
log_info "Valid options: development, production, testing"
exit 1
fi
log_success "Environment set to: $ENVIRONMENT"
}
# Create necessary directories
create_directories() {
log_info "Creating necessary directories..."
mkdir -p "$PROJECT_ROOT/logs"
mkdir -p "$PROJECT_ROOT/data"
mkdir -p "$PROJECT_ROOT/data/config_backups"
mkdir -p "$PROJECT_ROOT/Temp"
log_success "Directories created."
}
# Install dependencies
install_dependencies() {
if [[ "$INSTALL_DEPS" != "true" ]]; then
log_warning "Skipping dependency installation."
return
fi
log_info "Installing dependencies..."
conda run -n "$CONDA_ENV" pip install -q -r "$PROJECT_ROOT/requirements.txt"
log_success "Dependencies installed."
}
# Initialize database
init_database() {
log_info "Initializing database..."
cd "$PROJECT_ROOT"
conda run -n "$CONDA_ENV" \
python -c "from src.server.database import init_db; import asyncio; asyncio.run(init_db())"
log_success "Database initialized."
}
# Create environment file if it doesn't exist
create_env_file() {
ENV_FILE="$PROJECT_ROOT/.env.$ENVIRONMENT"
if [[ ! -f "$ENV_FILE" ]]; then
log_warning "Creating $ENV_FILE with defaults..."
cat > "$ENV_FILE" << EOF
# Aniworld Configuration for $ENVIRONMENT
# Security Settings
JWT_SECRET_KEY=your-secret-key-here
PASSWORD_SALT=your-salt-here
MASTER_PASSWORD_HASH=\$2b\$12\$wP0KBVbJKVAb8CdSSXw0NeGTKCkbw4fSAFXIqR2/wDqPSEBn9w7lS
# Database
DATABASE_URL=sqlite:///./data/aniworld_${ENVIRONMENT}.db
# Application
ENVIRONMENT=${ENVIRONMENT}
ANIME_DIRECTORY=/path/to/anime
# Server
PORT=${PORT}
HOST=${HOST}
# Logging
LOG_LEVEL=$([ "$ENVIRONMENT" = "production" ] && echo "WARNING" || echo "DEBUG")
# Features (development only)
$([ "$ENVIRONMENT" = "development" ] && echo "DEBUG=true" || echo "DEBUG=false")
EOF
log_success "Created $ENV_FILE - please configure with your settings"
fi
}
# Start the application
start_application() {
log_info "Starting Aniworld application..."
log_info "Environment: $ENVIRONMENT"
log_info "Conda Environment: $CONDA_ENV"
log_info "Server: http://$HOST:$PORT"
cd "$PROJECT_ROOT"
case "$ENVIRONMENT" in
development)
log_info "Starting in development mode with auto-reload..."
conda run -n "$CONDA_ENV" \
python -m uvicorn \
src.server.fastapi_app:app \
--host "$HOST" \
--port "$PORT" \
--reload
;;
production)
WORKERS="${WORKERS:-4}"
log_info "Starting in production mode with $WORKERS workers..."
conda run -n "$CONDA_ENV" \
python -m uvicorn \
src.server.fastapi_app:app \
--host "$HOST" \
--port "$PORT" \
--workers "$WORKERS" \
--worker-class "uvicorn.workers.UvicornWorker"
;;
testing)
log_warning "Starting in testing mode..."
# Testing mode typically runs tests instead of starting server
conda run -n "$CONDA_ENV" \
python -m pytest tests/ -v --tb=short
;;
*)
log_error "Unknown environment: $ENVIRONMENT"
exit 1
;;
esac
}
# ============================================================================
# Main Script
# ============================================================================
main() {
log_info "=========================================="
log_info "Aniworld Application Startup"
log_info "=========================================="
# Parse command-line options
while [[ $# -gt 0 ]]; do
case "$1" in
--no-install)
INSTALL_DEPS="false"
shift
;;
*)
ENVIRONMENT="$1"
shift
;;
esac
done
validate_environment
check_conda_env
create_directories
create_env_file
install_dependencies
init_database
start_application
}
# Run main function
main "$@"

View File

@@ -1,294 +0,0 @@
"""Manual integration test for NFO functionality.
This script tests the complete NFO generation workflow with real TMDB API calls.
It's intended for manual verification, not automated testing.
Usage:
1. Set TMDB_API_KEY environment variable
2. Run: python scripts/test_nfo_integration.py
3. Check output in test_output/ directory
Requirements:
- Valid TMDB API key (get from https://www.themoviedb.org/settings/api)
- Internet connection
- Write permissions for test_output/ directory
"""
import asyncio
import os
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.core.entities.nfo_models import TVShowNFO
from src.core.services.nfo_service import NFOService
from src.core.services.tmdb_client import TMDBAPIError, TMDBClient
from src.core.utils.nfo_generator import generate_tvshow_nfo, validate_nfo_xml
async def test_tmdb_client():
"""Test TMDB client basic functionality."""
print("\n=== Testing TMDB Client ===")
api_key = os.getenv("TMDB_API_KEY")
if not api_key:
print("❌ TMDB_API_KEY environment variable not set")
print(" Get your API key from: https://www.themoviedb.org/settings/api")
return False
try:
async with TMDBClient(api_key=api_key) as client:
# Test 1: Search for a show
print("\n1. Searching for 'Attack on Titan'...")
results = await client.search_tv_show("Attack on Titan")
if results and results.get("results"):
show = results["results"][0]
print(f" ✅ Found: {show['name']} (ID: {show['id']})")
show_id = show["id"]
else:
print(" ❌ No results found")
return False
# Test 2: Get show details
print(f"\n2. Getting details for show ID {show_id}...")
details = await client.get_tv_show_details(
show_id,
append_to_response="credits,external_ids,images"
)
print(f" ✅ Title: {details['name']}")
print(f" ✅ First Air Date: {details.get('first_air_date', 'N/A')}")
print(f" ✅ Rating: {details.get('vote_average', 'N/A')}/10")
# Test 3: Get external IDs
if "external_ids" in details:
ext_ids = details["external_ids"]
print(f" ✅ IMDB ID: {ext_ids.get('imdb_id', 'N/A')}")
print(f" ✅ TVDB ID: {ext_ids.get('tvdb_id', 'N/A')}")
# Test 4: Get images
if "images" in details:
images = details["images"]
print(f" ✅ Posters: {len(images.get('posters', []))}")
print(f" ✅ Backdrops: {len(images.get('backdrops', []))}")
print(f" ✅ Logos: {len(images.get('logos', []))}")
# Test 5: Get image URL
if details.get("poster_path"):
url = client.get_image_url(details["poster_path"], "w500")
print(f" ✅ Poster URL: {url[:60]}...")
return True
except TMDBAPIError as e:
print(f" ❌ TMDB API Error: {e}")
return False
except Exception as e:
print(f" ❌ Unexpected Error: {e}")
import traceback
traceback.print_exc()
return False
async def test_nfo_generation():
"""Test NFO XML generation."""
print("\n=== Testing NFO Generation ===")
try:
# Create a sample NFO model
print("\n1. Creating sample TVShowNFO model...")
from src.core.entities.nfo_models import (
ActorInfo,
ImageInfo,
RatingInfo,
UniqueID,
)
nfo = TVShowNFO(
title="Test Show",
originaltitle="Test Show Original",
year=2020,
plot="This is a test show for NFO generation validation.",
runtime=45,
premiered="2020-01-15",
status="Continuing",
genre=["Action", "Drama", "Animation"],
studio=["Test Studio"],
country=["Japan"],
ratings=[RatingInfo(
name="themoviedb",
value=8.5,
votes=1000,
max_rating=10,
default=True
)],
actors=[
ActorInfo(name="Test Actor 1", role="Main Character"),
ActorInfo(name="Test Actor 2", role="Villain")
],
thumb=[ImageInfo(url="https://image.tmdb.org/t/p/w500/poster.jpg")],
fanart=[ImageInfo(url="https://image.tmdb.org/t/p/original/fanart.jpg")],
uniqueid=[
UniqueID(type="tmdb", value="12345", default=False),
UniqueID(type="tvdb", value="67890", default=True)
],
tmdbid=12345,
tvdbid=67890,
imdbid="tt1234567"
)
print(" ✅ TVShowNFO model created")
# Test 2: Generate XML
print("\n2. Generating XML...")
xml_string = generate_tvshow_nfo(nfo)
print(f" ✅ Generated {len(xml_string)} characters")
# Test 3: Validate XML
print("\n3. Validating XML...")
validate_nfo_xml(xml_string)
print(" ✅ XML is valid")
# Test 4: Save to file
output_dir = Path("test_output")
output_dir.mkdir(exist_ok=True)
nfo_path = output_dir / "test_tvshow.nfo"
nfo_path.write_text(xml_string, encoding="utf-8")
print(f" ✅ Saved to: {nfo_path}")
# Test 5: Show sample
print("\n4. Sample XML (first 500 chars):")
print(" " + xml_string[:500].replace("\n", "\n "))
return True
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
async def test_nfo_service():
"""Test complete NFO service workflow."""
print("\n=== Testing NFO Service ===")
api_key = os.getenv("TMDB_API_KEY")
if not api_key:
print("❌ TMDB_API_KEY environment variable not set")
return False
try:
# Create test output directory
output_dir = Path("test_output")
output_dir.mkdir(exist_ok=True)
# Create a test series folder
test_series = output_dir / "Attack_on_Titan"
test_series.mkdir(exist_ok=True)
print(f"\n1. Creating NFO for 'Attack on Titan'...")
print(f" Output directory: {test_series}")
# Initialize NFO service
nfo_service = NFOService(
tmdb_api_key=api_key,
anime_directory=str(output_dir),
image_size="w500"
)
# Create NFO
nfo_path = await nfo_service.create_tvshow_nfo(
serie_name="Attack on Titan",
serie_folder="Attack_on_Titan",
year=2013,
download_poster=True,
download_logo=True,
download_fanart=True
)
print(f" ✅ NFO created: {nfo_path}")
# Check if files were created
print("\n2. Checking created files...")
files_created = {
"tvshow.nfo": (test_series / "tvshow.nfo").exists(),
"poster.jpg": (test_series / "poster.jpg").exists(),
"logo.png": (test_series / "logo.png").exists(),
"fanart.jpg": (test_series / "fanart.jpg").exists(),
}
for filename, exists in files_created.items():
status = "" if exists else ""
size = ""
if exists:
file_path = test_series / filename
size = f" ({file_path.stat().st_size:,} bytes)"
print(f" {status} {filename}{size}")
# Read and validate NFO
if files_created["tvshow.nfo"]:
print("\n3. Validating generated NFO...")
nfo_content = nfo_path.read_text(encoding="utf-8")
validate_nfo_xml(nfo_content)
print(" ✅ NFO is valid XML")
# Show sample
print("\n4. NFO Content (first 800 chars):")
print(" " + nfo_content[:800].replace("\n", "\n "))
return all(files_created.values())
except Exception as e:
print(f" ❌ Error: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all integration tests."""
print("=" * 70)
print("NFO Functionality Integration Tests")
print("=" * 70)
print("\nNOTE: This requires a valid TMDB API key set as environment variable.")
print("Get your API key from: https://www.themoviedb.org/settings/api")
print("Set it with: export TMDB_API_KEY='your_api_key_here'")
results = []
# Test 1: TMDB Client
results.append(("TMDB Client", await test_tmdb_client()))
# Test 2: NFO Generation
results.append(("NFO Generation", await test_nfo_generation()))
# Test 3: NFO Service (full workflow)
results.append(("NFO Service", await test_nfo_service()))
# Summary
print("\n" + "=" * 70)
print("SUMMARY")
print("=" * 70)
for test_name, passed in results:
status = "✅ PASSED" if passed else "❌ FAILED"
print(f"{test_name:.<50} {status}")
all_passed = all(result for _, result in results)
if all_passed:
print("\n🎉 All tests passed!")
print("\nGenerated files are in the 'test_output/' directory.")
print("You can import tvshow.nfo into Kodi/Plex/Jellyfin to verify compatibility.")
else:
print("\n⚠️ Some tests failed. Check the output above for details.")
return 1
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

View File

@@ -1,183 +0,0 @@
"""Test script for NFOService.update_tvshow_nfo() functionality.
This script tests the update functionality by:
1. Creating a test NFO file with TMDB ID
2. Updating it with fresh data from TMDB
3. Verifying the update worked correctly
"""
import asyncio
import logging
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config.settings import settings
from src.core.services.nfo_service import NFOService
from src.core.services.tmdb_client import TMDBAPIError
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
async def test_update_nfo():
"""Test NFO update functionality."""
# Check if TMDB API key is configured
if not settings.tmdb_api_key:
logger.error("TMDB_API_KEY not configured in environment")
logger.error("Set TMDB_API_KEY in .env file or environment variables")
return False
# Test series: Attack on Titan (TMDB ID: 1429)
test_serie_name = "Attack on Titan"
test_serie_folder = "test_update_nfo"
test_tmdb_id = 1429
# Create test folder
test_folder = Path(settings.anime_directory) / test_serie_folder
test_folder.mkdir(parents=True, exist_ok=True)
logger.info(f"Created test folder: {test_folder}")
# Initialize NFO service
nfo_service = NFOService(
tmdb_api_key=settings.tmdb_api_key,
anime_directory=settings.anime_directory,
image_size=settings.nfo_image_size
)
try:
# Step 1: Create initial NFO
logger.info("=" * 60)
logger.info("STEP 1: Creating initial NFO")
logger.info("=" * 60)
nfo_path = await nfo_service.create_tvshow_nfo(
serie_name=test_serie_name,
serie_folder=test_serie_folder,
year=2013,
download_poster=False, # Skip downloads for faster testing
download_logo=False,
download_fanart=False
)
logger.info(f"✓ Initial NFO created: {nfo_path}")
# Read initial NFO content
initial_content = nfo_path.read_text(encoding="utf-8")
logger.info(f"Initial NFO size: {len(initial_content)} bytes")
# Verify TMDB ID is in the file
if str(test_tmdb_id) not in initial_content:
logger.error(f"TMDB ID {test_tmdb_id} not found in NFO!")
return False
logger.info(f"✓ TMDB ID {test_tmdb_id} found in NFO")
# Step 2: Update the NFO
logger.info("")
logger.info("=" * 60)
logger.info("STEP 2: Updating NFO with fresh data")
logger.info("=" * 60)
updated_path = await nfo_service.update_tvshow_nfo(
serie_folder=test_serie_folder,
download_media=False # Skip downloads
)
logger.info(f"✓ NFO updated: {updated_path}")
# Read updated content
updated_content = updated_path.read_text(encoding="utf-8")
logger.info(f"Updated NFO size: {len(updated_content)} bytes")
# Verify TMDB ID is still in the file
if str(test_tmdb_id) not in updated_content:
logger.error(f"TMDB ID {test_tmdb_id} not found after update!")
return False
logger.info(f"✓ TMDB ID {test_tmdb_id} still present after update")
# Step 3: Test update on non-existent NFO (should fail)
logger.info("")
logger.info("=" * 60)
logger.info("STEP 3: Testing error handling")
logger.info("=" * 60)
try:
await nfo_service.update_tvshow_nfo(
serie_folder="non_existent_folder",
download_media=False
)
logger.error("✗ Should have raised FileNotFoundError!")
return False
except FileNotFoundError as e:
logger.info(f"✓ Correctly raised FileNotFoundError: {e}")
# Step 4: Test update on NFO without TMDB ID
logger.info("")
logger.info("STEP 4: Testing NFO without TMDB ID")
# Create a minimal NFO without TMDB ID
no_id_folder = test_folder.parent / "test_no_tmdb_id"
no_id_folder.mkdir(parents=True, exist_ok=True)
no_id_nfo = no_id_folder / "tvshow.nfo"
no_id_nfo.write_text(
'<?xml version="1.0" encoding="UTF-8"?>\n'
'<tvshow>\n'
' <title>Test Show</title>\n'
'</tvshow>',
encoding="utf-8"
)
try:
await nfo_service.update_tvshow_nfo(
serie_folder="test_no_tmdb_id",
download_media=False
)
logger.error("✗ Should have raised TMDBAPIError!")
return False
except TMDBAPIError as e:
logger.info(f"✓ Correctly raised TMDBAPIError: {e}")
logger.info("")
logger.info("=" * 60)
logger.info("✓ ALL TESTS PASSED")
logger.info("=" * 60)
return True
except Exception as e:
logger.error(f"✗ Test failed with error: {e}", exc_info=True)
return False
finally:
await nfo_service.close()
# Cleanup test folders
logger.info("\nCleaning up test folders...")
import shutil
try:
if test_folder.exists():
shutil.rmtree(test_folder)
logger.info(f"Removed: {test_folder}")
no_id_folder = test_folder.parent / "test_no_tmdb_id"
if no_id_folder.exists():
shutil.rmtree(no_id_folder)
logger.info(f"Removed: {no_id_folder}")
except Exception as e:
logger.warning(f"Cleanup error: {e}")
async def main():
"""Main entry point."""
success = await test_update_nfo()
sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())