From e414a1a358aea374d5715f05663b7c169e09fcc1 Mon Sep 17 00:00:00 2001 From: Lukas Date: Sun, 2 Nov 2025 10:34:49 +0100 Subject: [PATCH] refactored callback --- data/config.json | 2 +- instructions.md | 3 + src/core/SeriesApp.py | 2 +- src/server/services/anime_service.py | 258 ++++++++++++++---------- src/server/services/download_service.py | 230 ++++++++------------- 5 files changed, 241 insertions(+), 254 deletions(-) diff --git a/data/config.json b/data/config.json index 077de6f..49efcb6 100644 --- a/data/config.json +++ b/data/config.json @@ -17,7 +17,7 @@ "keep_days": 30 }, "other": { - "master_password_hash": "$pbkdf2-sha256$29000$MkbonbMWolTKOUfIOcc4Jw$8Aza9RknTXDSwQ1/mc.EwerqRrZ4Yo6tQlust.Nm/kQ", + "master_password_hash": "$pbkdf2-sha256$29000$GiPkvJeS8j4HwBjDmNOaMw$8k4ShYlk51ZsxoiQBZGjXCsvl0xbbiXIFYI/EWlqVrI", "anime_directory": "/home/lukas/Volume/serien/" }, "version": "1.0.0" diff --git a/instructions.md b/instructions.md index de67f07..15b928e 100644 --- a/instructions.md +++ b/instructions.md @@ -129,3 +129,6 @@ For each task completed: - WebSocket infrastructure remains unchanged # Tasks + +[] check method from SeriesApp are used in a correct way. SeriesApp method changed. make sure that classes that use SeriesApp take the latest interface. +[] SeriesApp no have events make sure services and api use them diff --git a/src/core/SeriesApp.py b/src/core/SeriesApp.py index baff698..42596b1 100644 --- a/src/core/SeriesApp.py +++ b/src/core/SeriesApp.py @@ -197,7 +197,7 @@ class SeriesApp: results = await asyncio.to_thread(self.loader.search, words) logger.info("Found %d results", len(results)) return results - + async def download( self, serie_folder: str, diff --git a/src/server/services/anime_service.py b/src/server/services/anime_service.py index 412a441..09643da 100644 --- a/src/server/services/anime_service.py +++ b/src/server/services/anime_service.py @@ -1,9 +1,8 @@ from __future__ import annotations import asyncio -from concurrent.futures import ThreadPoolExecutor from functools import lru_cache -from typing import Callable, List, Optional +from typing import List, Optional import structlog @@ -22,9 +21,10 @@ class AnimeServiceError(Exception): class AnimeService: - """Wraps the blocking SeriesApp for use in the FastAPI web layer. + """Wraps SeriesApp for use in the FastAPI web layer. - - Runs blocking operations in a threadpool + - SeriesApp methods are now async, no need for threadpool + - Subscribes to SeriesApp events for progress tracking - Exposes async methods - Adds simple in-memory caching for read operations """ @@ -32,152 +32,208 @@ class AnimeService: def __init__( self, directory: str, - max_workers: int = 4, progress_service: Optional[ProgressService] = None, ): self._directory = directory - self._executor = ThreadPoolExecutor(max_workers=max_workers) self._progress_service = progress_service or get_progress_service() - # SeriesApp is blocking; instantiate per-service + # Initialize SeriesApp with async methods try: self._app = SeriesApp(directory) + # Subscribe to SeriesApp events + self._app.download_status += self._on_download_status + self._app.scan_status += self._on_scan_status except Exception as e: logger.exception("Failed to initialize SeriesApp") raise AnimeServiceError("Initialization failed") from e - async def _run_in_executor(self, func, *args, **kwargs): - loop = asyncio.get_event_loop() + def _on_download_status(self, args) -> None: + """Handle download status events from SeriesApp. + + Args: + args: DownloadStatusEventArgs from SeriesApp + """ try: - return await loop.run_in_executor(self._executor, lambda: func(*args, **kwargs)) - except Exception as e: - logger.exception("Executor task failed") - raise AnimeServiceError(str(e)) from e + # Map SeriesApp download events to progress service + if args.status == "started": + asyncio.create_task( + self._progress_service.start_progress( + progress_id=f"download_{args.serie_folder}_{args.season}_{args.episode}", # noqa: E501 + progress_type=ProgressType.DOWNLOAD, + title=f"Downloading {args.serie_folder}", + message=f"S{args.season:02d}E{args.episode:02d}", + ) + ) + elif args.status == "progress": + asyncio.create_task( + self._progress_service.update_progress( + progress_id=f"download_{args.serie_folder}_{args.season}_{args.episode}", # noqa: E501 + current=int(args.progress), + total=100, + message=args.message or "Downloading...", + ) + ) + elif args.status == "completed": + asyncio.create_task( + self._progress_service.complete_progress( + progress_id=f"download_{args.serie_folder}_{args.season}_{args.episode}", # noqa: E501 + message="Download completed", + ) + ) + elif args.status == "failed": + asyncio.create_task( + self._progress_service.fail_progress( + progress_id=f"download_{args.serie_folder}_{args.season}_{args.episode}", # noqa: E501 + error_message=args.message or str(args.error), + ) + ) + except Exception as exc: + logger.error( + "Error handling download status event", + error=str(exc) + ) + + def _on_scan_status(self, args) -> None: + """Handle scan status events from SeriesApp. + + Args: + args: ScanStatusEventArgs from SeriesApp + """ + try: + scan_id = "library_scan" + + # Map SeriesApp scan events to progress service + if args.status == "started": + asyncio.create_task( + self._progress_service.start_progress( + progress_id=scan_id, + progress_type=ProgressType.SCAN, + title="Scanning anime library", + message=args.message or "Initializing scan...", + ) + ) + elif args.status == "progress": + asyncio.create_task( + self._progress_service.update_progress( + progress_id=scan_id, + current=args.current, + total=args.total, + message=args.message or f"Scanning: {args.folder}", + ) + ) + elif args.status == "completed": + asyncio.create_task( + self._progress_service.complete_progress( + progress_id=scan_id, + message=args.message or "Scan completed", + ) + ) + elif args.status == "failed": + asyncio.create_task( + self._progress_service.fail_progress( + progress_id=scan_id, + error_message=args.message or str(args.error), + ) + ) + elif args.status == "cancelled": + asyncio.create_task( + self._progress_service.fail_progress( + progress_id=scan_id, + error_message=args.message or "Scan cancelled", + ) + ) + except Exception as exc: + logger.error("Error handling scan status event", error=str(exc)) @lru_cache(maxsize=128) def _cached_list_missing(self) -> List[dict]: - # Synchronous cached call used by async wrapper + # Synchronous cached call - SeriesApp.series_list is populated + # during initialization try: series = self._app.series_list # normalize to simple dicts - return [s.to_dict() if hasattr(s, "to_dict") else s for s in series] - except Exception as e: + return [ + s.to_dict() if hasattr(s, "to_dict") else s + for s in series + ] + except Exception: logger.exception("Failed to get missing episodes list") raise async def list_missing(self) -> List[dict]: """Return list of series with missing episodes.""" try: - return await self._run_in_executor(self._cached_list_missing) + # series_list is already populated, just access it + return self._cached_list_missing() except AnimeServiceError: raise - except Exception as e: + except Exception as exc: logger.exception("list_missing failed") - raise AnimeServiceError("Failed to list missing series") from e + raise AnimeServiceError("Failed to list missing series") from exc async def search(self, query: str) -> List[dict]: - """Search for series using underlying loader.Search.""" + """Search for series using underlying loader. + + Args: + query: Search query string + + Returns: + List of search results as dictionaries + """ if not query: return [] try: - result = await self._run_in_executor(self._app.search, query) - # result may already be list of dicts or objects + # SeriesApp.search is now async + result = await self._app.search(query) return result - except Exception as e: + except Exception as exc: logger.exception("search failed") - raise AnimeServiceError("Search failed") from e + raise AnimeServiceError("Search failed") from exc - async def rescan(self, callback: Optional[Callable] = None) -> None: - """Trigger a re-scan. Accepts an optional callback function. + async def rescan(self) -> None: + """Trigger a re-scan. - The callback is executed in the threadpool by SeriesApp. - Progress updates are tracked and broadcasted via ProgressService. + The SeriesApp now handles progress tracking via events which are + forwarded to the ProgressService through event handlers. """ - scan_id = "library_scan" - try: - # Start progress tracking - await self._progress_service.start_progress( - progress_id=scan_id, - progress_type=ProgressType.SCAN, - title="Scanning anime library", - message="Initializing scan...", - ) - - # Create wrapped callback for progress updates - def progress_callback(progress_data: dict) -> None: - """Update progress during scan.""" - try: - if callback: - callback(progress_data) - - # Update progress service - current = progress_data.get("current", 0) - total = progress_data.get("total", 0) - message = progress_data.get("message", "Scanning...") - - # Schedule the coroutine without waiting for it - # This is safe because we don't need the result - loop = asyncio.get_event_loop() - if loop.is_running(): - asyncio.ensure_future( - self._progress_service.update_progress( - progress_id=scan_id, - current=current, - total=total, - message=message, - ) - ) - except Exception as e: - logger.error("Scan progress callback error", error=str(e)) - - # Run scan - await self._run_in_executor(self._app.ReScan, progress_callback) - + # SeriesApp.re_scan is now async and handles events internally + await self._app.re_scan() + # invalidate cache try: self._cached_list_missing.cache_clear() except Exception: pass - - # Complete progress tracking - await self._progress_service.complete_progress( - progress_id=scan_id, - message="Scan completed successfully", - ) - - except Exception as e: - logger.exception("rescan failed") - - # Fail progress tracking - await self._progress_service.fail_progress( - progress_id=scan_id, - error_message=str(e), - ) - - raise AnimeServiceError("Rescan failed") from e - async def download(self, serie_folder: str, season: int, episode: int, key: str, callback=None) -> bool: - """Start a download via the underlying loader. + except Exception as exc: + logger.exception("rescan failed") + raise AnimeServiceError("Rescan failed") from exc + + async def download( + self, + serie_folder: str, + season: int, + episode: int, + key: str, + ) -> bool: + """Start a download. + + The SeriesApp now handles progress tracking via events which are + forwarded to the ProgressService through event handlers. Returns True on success or raises AnimeServiceError on failure. """ try: - result = await self._run_in_executor( - self._app.download, serie_folder, season, episode, - key, callback + # SeriesApp.download is now async and handles events internally + return await self._app.download( + serie_folder=serie_folder, + season=season, + episode=episode, + key=key, ) - # OperationResult has a success attribute - if hasattr(result, 'success'): - logger.debug( - "Download result", - success=result.success, - message=result.message - ) - return result.success - return bool(result) - except Exception as e: + except Exception as exc: logger.exception("download failed") - raise AnimeServiceError("Download failed") from e + raise AnimeServiceError("Download failed") from exc def get_anime_service(directory: str = "./") -> AnimeService: diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index ca68c8a..32347f1 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -27,11 +27,7 @@ from src.server.models.download import ( QueueStatus, ) from src.server.services.anime_service import AnimeService, AnimeServiceError -from src.server.services.progress_service import ( - ProgressService, - ProgressType, - get_progress_service, -) +from src.server.services.progress_service import ProgressService, get_progress_service logger = structlog.get_logger(__name__) @@ -92,10 +88,18 @@ class DownloadService: # Statistics tracking self._total_downloaded_mb: float = 0.0 self._download_speeds: deque[float] = deque(maxlen=10) - + + # Subscribe to SeriesApp download events for progress tracking + if hasattr(anime_service, '_app') and hasattr( + anime_service._app, 'download_status' + ): + anime_service._app.download_status += ( + self._on_seriesapp_download_status + ) + # Load persisted queue self._load_queue() - + logger.info( "DownloadService initialized", max_retries=max_retries, @@ -146,6 +150,69 @@ class DownloadService: self._broadcast_callback = callback logger.debug("Broadcast callback registered") + def _on_seriesapp_download_status(self, args) -> None: + """Handle download status events from SeriesApp. + + Updates the active download item with progress information. + + Args: + args: DownloadStatusEventArgs from SeriesApp + """ + try: + # Only process if we have an active download + if not self._active_download: + return + + # Match the event to the active download item + # SeriesApp events include serie_folder, season, episode + if ( + self._active_download.serie_folder == args.serie_folder + and self._active_download.episode.season == args.season + and self._active_download.episode.episode == args.episode + ): + if args.status == "progress": + # Update item progress + self._active_download.progress = DownloadProgress( + percent=args.progress, + downloaded_mb=( + args.progress * args.mbper_sec / 100 + if args.mbper_sec + else 0.0 + ), + total_mb=None, # Not provided by SeriesApp + speed_mbps=args.mbper_sec, + eta_seconds=args.eta, + ) + + # Track speed + if args.mbper_sec: + self._download_speeds.append(args.mbper_sec) + + # Broadcast update + asyncio.create_task( + self._broadcast_update( + "download_progress", + { + "download_id": self._active_download.id, + "item_id": self._active_download.id, + "serie_name": self._active_download.serie_name, + "season": args.season, + "episode": args.episode, + "progress": ( + self._active_download.progress.model_dump( + mode="json" + ) + ), + }, + ) + ) + + except Exception as exc: + logger.error( + "Error handling SeriesApp download status", + error=str(exc) + ) + async def _broadcast_update(self, update_type: str, data: dict) -> None: """Broadcast update to connected WebSocket clients. @@ -689,107 +756,6 @@ class DownloadService: f"Failed to retry: {str(e)}" ) from e - def _create_progress_callback(self, item: DownloadItem) -> Callable: - """Create a progress callback for a download item. - - Args: - item: Download item to track progress for - - Returns: - Callback function for progress updates - """ - logger.info( - f"Creating progress callback for item {item.id}" - ) - - def progress_callback(progress_data: dict) -> None: - """Update progress and broadcast to clients.""" - try: - logger.debug( - f"Progress callback received: {progress_data}" - ) - - # Update item progress - item.progress = DownloadProgress( - percent=progress_data.get("percent", 0.0), - downloaded_mb=progress_data.get("downloaded_mb", 0.0), - total_mb=progress_data.get("total_mb"), - speed_mbps=progress_data.get("speed_mbps"), - eta_seconds=progress_data.get("eta_seconds"), - ) - - logger.debug( - f"Updated item progress: percent={item.progress.percent:.1f}%, " - f"downloaded={item.progress.downloaded_mb:.1f}MB, " - f"total={item.progress.total_mb}MB, " - f"speed={item.progress.speed_mbps}MB/s" - ) - - # Track speed for statistics - if item.progress.speed_mbps: - self._download_speeds.append(item.progress.speed_mbps) - - # Update progress service - # Schedule coroutines in a thread-safe manner - # (callback may be called from executor thread) - if item.progress.total_mb and item.progress.total_mb > 0: - current_mb = int(item.progress.downloaded_mb) - total_mb = int(item.progress.total_mb) - - logger.debug( - f"Updating progress service: current={current_mb}MB, " - f"total={total_mb}MB" - ) - - try: - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe( - self._progress_service.update_progress( - progress_id=f"download_{item.id}", - current=current_mb, - total=total_mb, - metadata={ - "speed_mbps": item.progress.speed_mbps, - "eta_seconds": item.progress.eta_seconds, - }, - ), - loop - ) - except RuntimeError as e: - logger.warning( - f"Could not schedule progress update: {e}" - ) - - # Broadcast update (fire and forget) - logger.debug( - f"Broadcasting download_progress event for item {item.id}" - ) - - try: - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe( - self._broadcast_update( - "download_progress", - { - "download_id": item.id, - "item_id": item.id, - "serie_name": item.serie_name, - "season": item.episode.season, - "episode": item.episode.episode, - "progress": item.progress.model_dump(mode="json"), - }, - ), - loop - ) - except RuntimeError as e: - logger.warning( - f"Could not schedule broadcast: {e}" - ) - except Exception as e: - logger.error("Progress callback error", error=str(e)) - - return progress_callback - async def _process_download(self, item: DownloadItem) -> None: """Process a single download item. @@ -809,31 +775,10 @@ class DownloadService: season=item.episode.season, episode=item.episode.episode, ) - - # Start progress tracking - await self._progress_service.start_progress( - progress_id=f"download_{item.id}", - progress_type=ProgressType.DOWNLOAD, - title=f"Downloading {item.serie_name}", - message=( - f"S{item.episode.season:02d}E{item.episode.episode:02d}" - ), - metadata={ - "item_id": item.id, - "serie_name": item.serie_name, - "season": item.episode.season, - "episode": item.episode.episode, - }, - ) - - # Create progress callback - progress_callback = self._create_progress_callback(item) - logger.info( - f"Passing callback {progress_callback} to anime_service for " - f"item {item.id}" - ) - + # Execute download via anime service + # Note: AnimeService handles progress via SeriesApp events + # Progress updates received via _on_seriesapp_download_status # Use serie_folder if available, otherwise fall back to serie_id # for backwards compatibility with old queue items folder = item.serie_folder if item.serie_folder else item.serie_id @@ -842,7 +787,6 @@ class DownloadService: season=item.episode.season, episode=item.episode.episode, key=item.serie_id, - callback=progress_callback, ) # Handle result @@ -860,17 +804,7 @@ class DownloadService: "Download completed successfully", item_id=item.id ) - # Complete progress tracking - await self._progress_service.complete_progress( - progress_id=f"download_{item.id}", - message="Download completed successfully", - metadata={ - "downloaded_mb": item.progress.downloaded_mb - if item.progress - else 0, - }, - ) - + # Broadcast completion (progress already handled by events) await self._broadcast_update( "download_complete", { @@ -901,13 +835,7 @@ class DownloadService: retry_count=item.retry_count, ) - # Fail progress tracking - await self._progress_service.fail_progress( - progress_id=f"download_{item.id}", - error_message=str(e), - metadata={"retry_count": item.retry_count}, - ) - + # Broadcast failure (progress already handled by events) await self._broadcast_update( "download_failed", {