feat: Enhance SeriesApp with async callback support, progress reporting, and cancellation

- Add async_download() and async_rescan() methods for non-blocking operations
- Implement ProgressInfo dataclass for structured progress reporting
- Add OperationResult dataclass for operation outcomes
- Introduce OperationStatus enum for state tracking
- Add cancellation support with cancel_operation() method
- Implement comprehensive error handling with callbacks
- Add progress_callback and error_callback support in constructor
- Create 22 comprehensive unit tests for all functionality
- Update infrastructure.md with core logic documentation
- Remove completed task from instructions.md

This enhancement enables web integration with real-time progress updates,
graceful cancellation, and better error handling for long-running operations.
This commit is contained in:
2025-10-17 19:45:36 +02:00
parent 0957a6e183
commit 59edf6bd50
4 changed files with 1111 additions and 47 deletions

View File

@@ -1,38 +1,449 @@
from src.core.entities.SerieList import SerieList
from src.core.providers.provider_factory import Loaders
from src.core.SerieScanner import SerieScanner
class SeriesApp:
_initialization_count = 0
def __init__(self, directory_to_search: str):
SeriesApp._initialization_count += 1 # Only show initialization message for the first instance
if SeriesApp._initialization_count <= 1:
print("Please wait while initializing...")
self.progress = None
self.directory_to_search = directory_to_search
self.Loaders = Loaders()
self.loader = self.Loaders.GetLoader(key="aniworld.to")
self.SerieScanner = SerieScanner(directory_to_search, self.loader)
self.List = SerieList(self.directory_to_search)
self.__InitList__()
def __InitList__(self):
self.series_list = self.List.GetMissingEpisode()
def search(self, words: str) -> list:
return self.loader.Search(words)
def download(self, serieFolder: str, season: int, episode: int, key: str, callback) -> bool:
self.loader.Download(self.directory_to_search, serieFolder, season, episode, key, "German Dub", callback)
def ReScan(self, callback):
self.SerieScanner.Reinit()
self.SerieScanner.Scan(callback)
self.List = SerieList(self.directory_to_search)
self.__InitList__()
"""
SeriesApp - Core application logic for anime series management.
This module provides the main application interface for searching,
downloading, and managing anime series with support for async callbacks,
progress reporting, error handling, and operation cancellation.
"""
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from src.core.entities.SerieList import SerieList
from src.core.providers.provider_factory import Loaders
from src.core.SerieScanner import SerieScanner
logger = logging.getLogger(__name__)
class OperationStatus(Enum):
"""Status of an operation."""
IDLE = "idle"
RUNNING = "running"
COMPLETED = "completed"
CANCELLED = "cancelled"
FAILED = "failed"
@dataclass
class ProgressInfo:
"""Progress information for long-running operations."""
current: int
total: int
message: str
percentage: float
status: OperationStatus
@dataclass
class OperationResult:
"""Result of an operation."""
success: bool
message: str
data: Optional[Any] = None
error: Optional[Exception] = None
class SeriesApp:
"""
Main application class for anime series management.
Provides functionality for:
- Searching anime series
- Downloading episodes
- Scanning directories for missing episodes
- Managing series lists
Supports async callbacks for progress reporting and cancellation.
"""
_initialization_count = 0
def __init__(
self,
directory_to_search: str,
progress_callback: Optional[Callable[[ProgressInfo], None]] = None,
error_callback: Optional[Callable[[Exception], None]] = None
):
"""
Initialize SeriesApp.
Args:
directory_to_search: Base directory for anime series
progress_callback: Optional callback for progress updates
error_callback: Optional callback for error notifications
"""
SeriesApp._initialization_count += 1
# Only show initialization message for the first instance
if SeriesApp._initialization_count <= 1:
logger.info("Initializing SeriesApp...")
self.directory_to_search = directory_to_search
self.progress_callback = progress_callback
self.error_callback = error_callback
# Cancellation support
self._cancel_flag = False
self._current_operation: Optional[str] = None
self._operation_status = OperationStatus.IDLE
# Initialize components
try:
self.Loaders = Loaders()
self.loader = self.Loaders.GetLoader(key="aniworld.to")
self.SerieScanner = SerieScanner(
directory_to_search, self.loader
)
self.List = SerieList(self.directory_to_search)
self.__InitList__()
logger.info(
"SeriesApp initialized for directory: %s",
directory_to_search
)
except (IOError, OSError, RuntimeError) as e:
logger.error("Failed to initialize SeriesApp: %s", e)
self._handle_error(e)
raise
def __InitList__(self):
"""Initialize the series list with missing episodes."""
try:
self.series_list = self.List.GetMissingEpisode()
logger.debug(
"Loaded %d series with missing episodes",
len(self.series_list)
)
except (IOError, OSError, RuntimeError) as e:
logger.error("Failed to initialize series list: %s", e)
self._handle_error(e)
raise
def search(self, words: str) -> List[Dict[str, Any]]:
"""
Search for anime series.
Args:
words: Search query
Returns:
List of search results
Raises:
RuntimeError: If search fails
"""
try:
logger.info("Searching for: %s", words)
results = self.loader.Search(words)
logger.info("Found %d results", len(results))
return results
except (IOError, OSError, RuntimeError) as e:
logger.error("Search failed for '%s': %s", words, e)
self._handle_error(e)
raise
def download(
self,
serieFolder: str,
season: int,
episode: int,
key: str,
callback: Optional[Callable[[float], None]] = None,
language: str = "German Dub"
) -> OperationResult:
"""
Download an episode.
Args:
serieFolder: Serie folder name
season: Season number
episode: Episode number
key: Serie key
callback: Optional progress callback
language: Language preference
Returns:
OperationResult with download status
"""
self._current_operation = f"download_S{season:02d}E{episode:02d}"
self._operation_status = OperationStatus.RUNNING
self._cancel_flag = False
try:
logger.info(
"Starting download: %s S%02dE%02d",
serieFolder, season, episode
)
# Check for cancellation before starting
if self._is_cancelled():
return OperationResult(
success=False,
message="Download cancelled before starting"
)
# Wrap callback to check for cancellation
def wrapped_callback(progress: float):
if self._is_cancelled():
raise InterruptedError("Download cancelled by user")
if callback:
callback(progress)
# Perform download
self.loader.Download(
self.directory_to_search,
serieFolder,
season,
episode,
key,
language,
wrapped_callback
)
self._operation_status = OperationStatus.COMPLETED
logger.info(
"Download completed: %s S%02dE%02d",
serieFolder, season, episode
)
msg = f"Successfully downloaded S{season:02d}E{episode:02d}"
return OperationResult(
success=True,
message=msg
)
except InterruptedError as e:
self._operation_status = OperationStatus.CANCELLED
logger.warning("Download cancelled: %s", e)
return OperationResult(
success=False,
message="Download cancelled",
error=e
)
except (IOError, OSError, RuntimeError) as e:
self._operation_status = OperationStatus.FAILED
logger.error("Download failed: %s", e)
self._handle_error(e)
return OperationResult(
success=False,
message=f"Download failed: {str(e)}",
error=e
)
finally:
self._current_operation = None
def ReScan(
self,
callback: Optional[Callable[[str, int], None]] = None
) -> OperationResult:
"""
Rescan directory for missing episodes.
Args:
callback: Optional progress callback (folder, current_count)
Returns:
OperationResult with scan status
"""
self._current_operation = "rescan"
self._operation_status = OperationStatus.RUNNING
self._cancel_flag = False
try:
logger.info("Starting directory rescan")
# Get total items to scan
total_to_scan = self.SerieScanner.GetTotalToScan()
logger.info("Total folders to scan: %d", total_to_scan)
# Reinitialize scanner
self.SerieScanner.Reinit()
# Wrap callback for progress reporting and cancellation
def wrapped_callback(folder: str, current: int):
if self._is_cancelled():
raise InterruptedError("Scan cancelled by user")
# Calculate progress
if total_to_scan > 0:
percentage = (current / total_to_scan * 100)
else:
percentage = 0
# Report progress
if self.progress_callback:
progress_info = ProgressInfo(
current=current,
total=total_to_scan,
message=f"Scanning: {folder}",
percentage=percentage,
status=OperationStatus.RUNNING
)
self.progress_callback(progress_info)
# Call original callback if provided
if callback:
callback(folder, current)
# Perform scan
self.SerieScanner.Scan(wrapped_callback)
# Reinitialize list
self.List = SerieList(self.directory_to_search)
self.__InitList__()
self._operation_status = OperationStatus.COMPLETED
logger.info("Directory rescan completed successfully")
msg = (
f"Scan completed. Found {len(self.series_list)} "
f"series."
)
return OperationResult(
success=True,
message=msg,
data={"series_count": len(self.series_list)}
)
except InterruptedError as e:
self._operation_status = OperationStatus.CANCELLED
logger.warning("Scan cancelled: %s", e)
return OperationResult(
success=False,
message="Scan cancelled",
error=e
)
except (IOError, OSError, RuntimeError) as e:
self._operation_status = OperationStatus.FAILED
logger.error("Scan failed: %s", e)
self._handle_error(e)
return OperationResult(
success=False,
message=f"Scan failed: {str(e)}",
error=e
)
finally:
self._current_operation = None
async def async_download(
self,
serieFolder: str,
season: int,
episode: int,
key: str,
callback: Optional[Callable[[float], None]] = None,
language: str = "German Dub"
) -> OperationResult:
"""
Async version of download method.
Args:
serieFolder: Serie folder name
season: Season number
episode: Episode number
key: Serie key
callback: Optional progress callback
language: Language preference
Returns:
OperationResult with download status
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.download,
serieFolder,
season,
episode,
key,
callback,
language
)
async def async_rescan(
self,
callback: Optional[Callable[[str, int], None]] = None
) -> OperationResult:
"""
Async version of ReScan method.
Args:
callback: Optional progress callback
Returns:
OperationResult with scan status
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self.ReScan,
callback
)
def cancel_operation(self) -> bool:
"""
Cancel the current operation.
Returns:
True if operation cancelled, False if no operation running
"""
if (self._current_operation and
self._operation_status == OperationStatus.RUNNING):
logger.info(
"Cancelling operation: %s",
self._current_operation
)
self._cancel_flag = True
return True
return False
def _is_cancelled(self) -> bool:
"""Check if the current operation has been cancelled."""
return self._cancel_flag
def _handle_error(self, error: Exception):
"""
Handle errors and notify via callback.
Args:
error: Exception that occurred
"""
if self.error_callback:
try:
self.error_callback(error)
except (RuntimeError, ValueError) as callback_error:
logger.error(
"Error in error callback: %s",
callback_error
)
def get_series_list(self) -> List[Any]:
"""
Get the current series list.
Returns:
List of series with missing episodes
"""
return self.series_list
def get_operation_status(self) -> OperationStatus:
"""
Get the current operation status.
Returns:
Current operation status
"""
return self._operation_status
def get_current_operation(self) -> Optional[str]:
"""
Get the current operation name.
Returns:
Name of current operation or None
"""
return self._current_operation