Aniworld/src/server/services/download_service.py
Lukas b1726968e5 Refactor: Replace CallbackManager with Events pattern
- Replace callback system with events library in SerieScanner
- Update SeriesApp to subscribe to loader and scanner events
- Refactor ScanService to use Events instead of CallbackManager
- Remove CallbackManager imports and callback classes
- Add safe event calling with error handling in SerieScanner
- Update AniworldLoader to use Events for download progress
- Remove progress_callback parameter from download methods
- Update all affected tests for Events pattern
- Fix test_series_app.py for new event subscription model
- Comment out obsolete callback tests in test_scan_service.py

All core tests passing. Events provide cleaner event-driven architecture.
2025-12-30 21:04:45 +01:00

1032 lines
37 KiB
Python

"""Download queue service for managing anime episode downloads.
This module provides a simplified queue management system for handling
anime episode downloads with manual start/stop controls, progress tracking,
database persistence, and retry functionality.
The service uses SQLite database for persistent storage via QueueRepository
while maintaining an in-memory cache for performance.
"""
from __future__ import annotations
import asyncio
import uuid
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Dict, List, Optional
import structlog
from src.server.models.download import (
DownloadItem,
DownloadPriority,
DownloadStatus,
EpisodeIdentifier,
QueueStats,
QueueStatus,
)
from src.server.services.anime_service import AnimeService, AnimeServiceError
from src.server.services.progress_service import ProgressService, get_progress_service
if TYPE_CHECKING:
from src.server.services.queue_repository import QueueRepository
logger = structlog.get_logger(__name__)
class DownloadServiceError(Exception):
    """Raised when a download queue operation fails at the service level."""

    pass
class DownloadService:
    """Manages the download queue with manual start/stop controls.

    The service keeps its working state in memory: a FIFO deque of pending
    items (plus an id -> item dict for O(1) lookup), at most one active
    download, and bounded histories of completed (100) and failed (50)
    items. Pending items are additionally persisted through a
    QueueRepository so they can be restored by ``initialize()``.

    Features:
    - Manual download start/stop
    - FIFO queue processing
    - Real-time progress tracking
    - Database persistence via QueueRepository
    - Automatic retry logic
    - WebSocket broadcast support
    """
def __init__(
    self,
    anime_service: AnimeService,
    queue_repository: Optional["QueueRepository"] = None,
    max_retries: int = 3,
    progress_service: Optional[ProgressService] = None,
):
    """Set up collaborators and the empty in-memory queue state.

    Args:
        anime_service: Service that performs the actual episode download.
        queue_repository: Repository used for persistence. When omitted,
            the default repository is resolved lazily on first use.
        max_retries: Upper bound on retry attempts per failed item.
        progress_service: Progress service used for notifications. When
            omitted, the shared instance from ``get_progress_service()``
            is used.
    """
    # --- Collaborators -------------------------------------------------
    self._anime_service = anime_service
    self._max_retries = max_retries
    self._progress_service = (
        progress_service if progress_service else get_progress_service()
    )

    # --- Persistence (resolved lazily by _get_repository) ---------------
    self._queue_repository = queue_repository
    self._db_initialized = False

    # --- In-memory queue state -----------------------------------------
    # Pending items in processing order, mirrored by an id -> item index.
    self._pending_queue: deque[DownloadItem] = deque()
    self._pending_items_by_id: Dict[str, DownloadItem] = {}
    # At most one item is downloaded at a time.
    self._active_download: Optional[DownloadItem] = None
    # Bounded histories: oldest entries fall off automatically.
    self._completed_items: deque[DownloadItem] = deque(maxlen=100)
    self._failed_items: deque[DownloadItem] = deque(maxlen=50)

    # --- Control flags ---------------------------------------------------
    # Processing only begins after an explicit start request.
    self._is_stopped = True
    self._is_shutting_down = False

    # --- Execution helpers ----------------------------------------------
    # Single worker executor for blocking operations.
    self._executor = ThreadPoolExecutor(max_workers=1)
    # Task wrapping the download in flight, kept so it can be cancelled.
    self._active_download_task: Optional[asyncio.Task] = None

    # --- Statistics ------------------------------------------------------
    self._total_downloaded_mb: float = 0.0
    # Sliding window of the most recent speed samples.
    self._download_speeds: deque[float] = deque(maxlen=10)

    # Set once the "download_queue" progress entry has been created.
    self._queue_progress_initialized: bool = False

    logger.info(
        "DownloadService initialized",
        max_retries=max_retries,
    )
def _get_repository(self) -> "QueueRepository":
"""Get the queue repository, initializing if needed.
Returns:
QueueRepository instance
"""
if self._queue_repository is None:
from src.server.services.queue_repository import get_queue_repository
self._queue_repository = get_queue_repository()
return self._queue_repository
async def initialize(self) -> None:
    """Restore the pending queue from the database (runs at most once).

    Intended to be called during app startup, after the database is
    ready. Status, priority and progress live in memory only, so every
    restored item is put back into the pending queue as PENDING.

    A load failure is logged and swallowed: the service is still marked
    initialized and keeps working with an empty in-memory queue.
    """
    if self._db_initialized:
        return

    try:
        restored_items = await self._get_repository().get_all_items()
        for restored in restored_items:
            # Persisted rows carry no live status; treat all as pending.
            restored.status = DownloadStatus.PENDING
            self._add_to_pending_queue(restored)

        self._db_initialized = True
        logger.info(
            "Queue restored from database: pending_count=%d",
            len(self._pending_queue),
        )
    except Exception as e:
        logger.error("Failed to load queue from database: %s", e, exc_info=True)
        # Continue without persistence - queue will work in memory only
        self._db_initialized = True
async def _save_to_database(self, item: DownloadItem) -> DownloadItem:
    """Persist an item through the repository (best effort).

    Args:
        item: Download item to save.

    Returns:
        The item returned by the repository, or the unchanged input item
        when saving fails (the failure is logged, not raised).
    """
    try:
        saved = await self._get_repository().save_item(item)
    except Exception as e:
        logger.error("Failed to save item to database: %s", e)
        return item
    else:
        return saved
async def _set_error_in_database(
    self,
    item_id: str,
    error: str,
) -> bool:
    """Record an error message for an item in the database (best effort).

    Args:
        item_id: Download item ID.
        error: Error message to store.

    Returns:
        The repository's result, or False when the update raised
        (the failure is logged, not propagated).
    """
    try:
        updated = await self._get_repository().set_error(item_id, error)
    except Exception as e:
        logger.error("Failed to set error in database: %s", e)
        return False
    else:
        return updated
async def _delete_from_database(self, item_id: str) -> bool:
    """Delete an item from the database (best effort).

    Args:
        item_id: Download item ID.

    Returns:
        The repository's result, or False when the delete raised
        (the failure is logged, not propagated).
    """
    try:
        deleted = await self._get_repository().delete_item(item_id)
    except Exception as e:
        logger.error("Failed to delete from database: %s", e)
        return False
    else:
        return deleted
async def _remove_episode_from_missing_list(
    self,
    series_key: str,
    season: int,
    episode: int,
) -> bool:
    """Remove a downloaded episode from the missing episodes list.

    Called when a download completes successfully to update the
    database so the episode no longer appears as missing.

    Args:
        series_key: Unique provider key for the series
        season: Season number
        episode: Episode number within season

    Returns:
        True if episode was removed, False otherwise (including when the
        database operation raised; the error is logged, not propagated)
    """
    try:
        # Imported lazily, inside the guarded block, so an import problem
        # is handled like any other failure of this best-effort step.
        from src.server.database.connection import get_db_session
        from src.server.database.service import EpisodeService

        async with get_db_session() as db:
            deleted = await EpisodeService.delete_by_series_and_episode(
                db=db,
                series_key=series_key,
                season=season,
                episode_number=episode,
            )
            if deleted:
                logger.info(
                    "Removed episode from missing list: "
                    "%s S%02dE%02d",
                    series_key,
                    season,
                    episode,
                )
                # Clear the anime service cache so list_missing
                # returns updated data. This is best effort, but a
                # failure is logged instead of silently ignored because
                # a stale cache keeps reporting the episode as missing.
                try:
                    self._anime_service._cached_list_missing.cache_clear()
                except Exception as cache_error:
                    logger.warning(
                        "Failed to clear missing-episodes cache: %s",
                        cache_error,
                    )
            return deleted
    except Exception as e:
        logger.error(
            "Failed to remove episode from missing list: %s", e
        )
        return False
async def _init_queue_progress(self) -> None:
    """Create the "download_queue" progress entry once.

    Invoked lazily from the async entry points so that it runs inside a
    live event loop and the progress coroutine can be awaited. A failure
    is logged and leaves the flag unset, so the next call tries again.
    """
    already_initialized = self._queue_progress_initialized
    if already_initialized:
        return

    try:
        from src.server.services.progress_service import ProgressType

        progress_kwargs = dict(
            progress_id="download_queue",
            progress_type=ProgressType.QUEUE,
            title="Download Queue",
            message="Queue ready",
        )
        await self._progress_service.start_progress(**progress_kwargs)
        self._queue_progress_initialized = True
    except Exception as e:
        logger.error("Failed to initialize queue progress: %s", e)
def _add_to_pending_queue(
self, item: DownloadItem, front: bool = False
) -> None:
"""Add item to pending queue and update helper dict.
Args:
item: Download item to add
front: If True, add to front of queue (higher priority)
"""
if front:
self._pending_queue.appendleft(item)
else:
self._pending_queue.append(item)
self._pending_items_by_id[item.id] = item
def _remove_from_pending_queue(self, item_or_id: str) -> Optional[DownloadItem]: # noqa: E501
"""Remove item from pending queue and update helper dict.
Args:
item_or_id: Item ID to remove
Returns:
Removed item or None if not found
"""
if isinstance(item_or_id, str):
item = self._pending_items_by_id.get(item_or_id)
if not item:
return None
item_id = item_or_id
else:
item = item_or_id
item_id = item.id
try:
self._pending_queue.remove(item)
del self._pending_items_by_id[item_id]
return item
except (ValueError, KeyError):
return None
def _generate_item_id(self) -> str:
"""Generate unique identifier for download items."""
return str(uuid.uuid4())
async def add_to_queue(
    self,
    serie_id: str,
    serie_folder: str,
    serie_name: str,
    episodes: List[EpisodeIdentifier],
    priority: DownloadPriority = DownloadPriority.NORMAL,
) -> List[str]:
    """Append episodes to the tail of the download queue (FIFO).

    Each episode becomes one PENDING item that is persisted first and
    then cached in memory. After all episodes are queued, a single
    "items_added" update is broadcast through the progress service.

    Args:
        serie_id: Provider key of the series (e.g. 'attack-on-titan');
            the identifier used for lookups.
        serie_folder: Folder name on disk (e.g. 'Attack on Titan (2013)');
            used for filesystem operations only.
        serie_name: Display name shown in the user interface.
        episodes: Episodes to download.
        priority: Stored on the item but does not affect ordering
            (kept for compatibility).

    Returns:
        IDs of the created download items, in queue order.

    Raises:
        DownloadServiceError: If adding items fails
    """
    # The progress entry must exist before it can be updated below.
    await self._init_queue_progress()

    created_ids = []
    try:
        for episode in episodes:
            new_item = DownloadItem(
                id=self._generate_item_id(),
                serie_id=serie_id,
                serie_folder=serie_folder,
                serie_name=serie_name,
                episode=episode,
                status=DownloadStatus.PENDING,
                priority=priority,
                added_at=datetime.now(timezone.utc),
            )

            # Persist before caching so the cached object is whatever
            # the repository handed back.
            persisted = await self._save_to_database(new_item)
            self._add_to_pending_queue(persisted, front=False)
            created_ids.append(persisted.id)

            logger.info(
                "Item added to queue",
                item_id=persisted.id,
                serie_key=serie_id,
                serie_name=serie_name,
                season=episode.season,
                episode=episode.episode,
            )

        # One broadcast for the whole batch.
        snapshot = await self.get_queue_status()
        notification = {
            "action": "items_added",
            "added_ids": created_ids,
            "queue_status": snapshot.model_dump(mode="json"),
        }
        await self._progress_service.update_progress(
            progress_id="download_queue",
            message=f"Added {len(created_ids)} items to queue",
            metadata=notification,
            force_broadcast=True,
        )
        return created_ids
    except Exception as e:
        logger.error("Failed to add items to queue: %s", e)
        raise DownloadServiceError(f"Failed to add items: {str(e)}") from e
async def remove_from_queue(self, item_ids: List[str]) -> List[str]:
    """Remove items from the queue by ID.

    An ID matching the active download marks that item CANCELLED, moves
    it to the failed history and clears the active slot. An ID matching
    a pending item drops it from the queue. In both cases the database
    row is deleted. Unknown IDs are ignored.

    NOTE(review): only the bookkeeping for the active item is changed
    here; the running download task itself is not cancelled by this
    method - confirm whether that is intended.

    Args:
        item_ids: List of download item IDs to remove

    Returns:
        List of successfully removed item IDs

    Raises:
        DownloadServiceError: If removal fails
    """
    removed_ids = []
    try:
        for item_id in item_ids:
            # Case 1: the item that is downloading right now.
            current = self._active_download
            if current and current.id == item_id:
                current.status = DownloadStatus.CANCELLED
                current.completed_at = datetime.now(timezone.utc)
                self._failed_items.append(current)
                self._active_download = None
                await self._delete_from_database(item_id)
                removed_ids.append(item_id)
                logger.info("Cancelled active download: item_id=%s", item_id)
                continue

            # Case 2: a pending item, found through the O(1) id index.
            if item_id not in self._pending_items_by_id:
                continue
            queued = self._pending_items_by_id[item_id]
            self._pending_queue.remove(queued)
            del self._pending_items_by_id[item_id]
            await self._delete_from_database(item_id)
            removed_ids.append(item_id)
            logger.info(
                "Removed from pending queue", item_id=item_id
            )

        if removed_ids:
            # Broadcast once for everything that was removed.
            snapshot = await self.get_queue_status()
            notification = {
                "action": "items_removed",
                "removed_ids": removed_ids,
                "queue_status": snapshot.model_dump(mode="json"),
            }
            await self._progress_service.update_progress(
                progress_id="download_queue",
                message=f"Removed {len(removed_ids)} items from queue",
                metadata=notification,
                force_broadcast=True,
            )
        return removed_ids
    except Exception as e:
        logger.error("Failed to remove items: %s", e)
        raise DownloadServiceError(
            f"Failed to remove items: {str(e)}"
        ) from e
async def reorder_queue(self, item_ids: List[str]) -> None:
    """Reorder pending queue items.

    Args:
        item_ids: List of item IDs in desired order.
            Items not in this list remain at end of queue, in their
            current relative order. Unknown IDs are ignored, and an ID
            listed more than once is honoured at its first position only.

    Raises:
        DownloadServiceError: If reordering fails

    Note:
        Reordering is done in-memory only. Database priority is not
        updated since the in-memory queue defines the actual order.
    """
    try:
        new_queue: deque = deque()
        placed_ids: set = set()

        # First the explicitly ordered items. Tracking what was already
        # placed keeps a repeated ID from queueing the item twice (which
        # previously made the whole reorder fail).
        for item_id in item_ids:
            if item_id in placed_ids:
                continue
            item = self._pending_items_by_id.get(item_id)
            if item is None:
                continue
            new_queue.append(item)
            placed_ids.add(item_id)

        # Then everything that was not mentioned, order preserved.
        # A single pass with set membership keeps this O(n).
        new_queue.extend(
            item for item in self._pending_queue
            if item.id not in placed_ids
        )

        # Replace queue
        self._pending_queue = new_queue

        # Notify via progress service
        queue_status = await self.get_queue_status()
        await self._progress_service.update_progress(
            progress_id="download_queue",
            message=f"Queue reordered with {len(item_ids)} items",
            metadata={
                "action": "queue_reordered",
                "reordered_count": len(item_ids),
                "queue_status": queue_status.model_dump(mode="json"),
            },
            force_broadcast=True,
        )
        logger.info("Queue reordered", reordered_count=len(item_ids))
    except Exception as e:
        logger.error("Failed to reorder queue: %s", e)
        raise DownloadServiceError(
            f"Failed to reorder queue: {str(e)}"
        ) from e
async def start_queue_processing(self) -> Optional[str]:
    """Start automatic queue processing of all pending downloads.

    This will process all pending downloads one by one until the queue
    is empty or stopped. Processing runs as a background asyncio task,
    so it does not depend on the caller staying connected.

    Returns:
        The literal string "queue_started" when processing was started,
        or None if the queue is empty

    Raises:
        DownloadServiceError: If queue processing is already active
            (a download is running or the background processor task has
            not finished yet), or if starting fails for another reason
    """
    try:
        # Initialize queue progress tracking if not already done
        await self._init_queue_progress()

        # Check if download already active
        if self._active_download:
            raise DownloadServiceError(
                "Queue processing is already active"
            )

        # Check if queue is empty
        if not self._pending_queue:
            logger.info("No pending downloads to start")
            return None

        # Between two downloads there is no active item, but the
        # processor loop is still alive; starting a second loop would
        # process the queue concurrently. getattr is used because the
        # attribute only exists once processing has been started.
        running_task = getattr(self, "_queue_task", None)
        if running_task is not None and not running_task.done():
            raise DownloadServiceError(
                "Queue processing is already active"
            )

        # Mark queue as running
        self._is_stopped = False

        # Start queue processing in background. The task is stored on
        # the instance: the event loop only keeps weak references, so an
        # unreferenced task may be garbage-collected mid-execution.
        self._queue_task = asyncio.create_task(self._process_queue())

        logger.info("Queue processing started")
        return "queue_started"
    except Exception as e:
        logger.error("Failed to start queue processing: %s", e)
        raise DownloadServiceError(
            f"Failed to start queue processing: {str(e)}"
        ) from e
async def _process_queue(self) -> None:
    """Process all items in the queue sequentially.

    This runs continuously until the queue is empty or stopped.
    Each download is processed one at a time, and the next one starts
    automatically after the previous one completes. An unexpected error
    for one item is logged and processing continues with the next item.

    The stopped flag is always set when the loop ends, including when
    this coroutine is cancelled, so the reported state never claims the
    queue is running without a processor.
    """
    logger.info("Queue processor started")
    try:
        while not self._is_stopped and self._pending_queue:
            try:
                # Get next item from queue. pop() with a default keeps a
                # missing index entry from raising after the item has
                # already left the deque (which would silently drop it).
                item = self._pending_queue.popleft()
                self._pending_items_by_id.pop(item.id, None)

                logger.info(
                    "Processing next item from queue",
                    item_id=item.id,
                    serie=item.serie_name,
                    remaining=len(self._pending_queue)
                )

                # Notify via progress service
                queue_status = await self.get_queue_status()
                msg = (
                    f"Started: {item.serie_name} "
                    f"S{item.episode.season:02d}E{item.episode.episode:02d}"
                )
                await self._progress_service.update_progress(
                    progress_id="download_queue",
                    message=msg,
                    metadata={
                        "action": "download_started",
                        "item_id": item.id,
                        "serie_name": item.serie_name,
                        "season": item.episode.season,
                        "episode": item.episode.episode,
                        "queue_status": queue_status.model_dump(mode="json"),
                    },
                    force_broadcast=True,
                )

                # Process the download (this will wait until complete).
                # The task handle is kept so the download can be
                # cancelled, and is cleared however the await ends.
                self._active_download_task = asyncio.create_task(
                    self._process_download(item)
                )
                try:
                    await self._active_download_task
                finally:
                    self._active_download_task = None

                # Small delay between downloads
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(
                    "Error in queue processing loop",
                    error=str(e),
                    exc_info=True
                )
                # Continue with next item even if one fails
                await asyncio.sleep(2)
    finally:
        # Queue processing ended (normally or through cancellation)
        self._is_stopped = True

    if not self._pending_queue:
        logger.info("Queue processing completed - all items processed")
        queue_status = await self.get_queue_status()
        await self._progress_service.complete_progress(
            progress_id="download_queue",
            message="All downloads completed",
            metadata={
                "queue_status": queue_status.model_dump(mode="json")
            },
        )
    else:
        logger.info("Queue processing stopped by user")
async def start_next_download(self) -> Optional[str]:
    """Legacy alias that delegates to ``start_queue_processing``.

    Returns:
        Whatever ``start_queue_processing`` returns (None when the queue
        is empty)

    Raises:
        DownloadServiceError: If a download is already active
    """
    outcome = await self.start_queue_processing()
    return outcome
async def stop_downloads(self) -> None:
    """Stop picking up further downloads from the queue.

    Only the stop flag is set here; a download that is already running
    is left alone and finishes on its own. A "queue_stopped" update is
    then broadcast through the progress service.
    """
    self._is_stopped = True
    logger.info("Download processing stopped")

    # Tell listeners about the new state.
    snapshot = await self.get_queue_status()
    notification = {
        "action": "queue_stopped",
        "is_stopped": True,
        "queue_status": snapshot.model_dump(mode="json"),
    }
    await self._progress_service.update_progress(
        progress_id="download_queue",
        message="Queue processing stopped",
        metadata=notification,
        force_broadcast=True,
    )
async def get_queue_status(self) -> QueueStatus:
    """Build a snapshot of the active, pending, completed and failed items.

    Returns:
        QueueStatus holding list copies of the internal collections;
        ``is_running`` is the inverse of the stop flag.
    """
    if self._active_download:
        active_downloads = [self._active_download]
    else:
        active_downloads = []

    running = not self._is_stopped
    return QueueStatus(
        is_running=running,
        is_paused=False,  # Kept for compatibility
        active_downloads=active_downloads,
        pending_queue=list(self._pending_queue),
        completed_downloads=list(self._completed_items),
        failed_downloads=list(self._failed_items),
    )
async def get_queue_stats(self) -> QueueStats:
    """Compute counters, average speed and a rough ETA for the queue.

    The ETA is only produced when there is a positive average speed and
    at least one pending item; it assumes a fixed size of 500 MB per
    pending episode.

    Returns:
        Statistics about the download queue
    """
    pending_count = len(self._pending_queue)
    completed_count = len(self._completed_items)
    failed_count = len(self._failed_items)
    active_count = 1 if self._active_download else 0

    # Mean of the recorded speed samples, if there are any.
    samples = self._download_speeds
    avg_speed = (sum(samples) / len(samples)) if samples else None

    # Rough estimation based on an assumed average file size.
    eta_seconds = None
    if avg_speed and avg_speed > 0 and pending_count > 0:
        estimated_size_per_episode = 500  # MB
        remaining_mb = pending_count * estimated_size_per_episode
        eta_seconds = int(remaining_mb / avg_speed)

    total_items = active_count + pending_count + completed_count + failed_count
    return QueueStats(
        total_items=total_items,
        pending_count=pending_count,
        active_count=active_count,
        completed_count=completed_count,
        failed_count=failed_count,
        total_downloaded_mb=self._total_downloaded_mb,
        average_speed_mbps=avg_speed,
        estimated_time_remaining=eta_seconds,
    )
async def clear_completed(self) -> int:
    """Empty the completed-downloads history.

    A "completed_cleared" update is broadcast only when at least one
    item was actually removed.

    Returns:
        Number of items cleared
    """
    cleared = len(self._completed_items)
    self._completed_items.clear()
    logger.info("Cleared completed items", count=cleared)

    if cleared == 0:
        # Nothing changed, so there is nothing to announce.
        return cleared

    snapshot = await self.get_queue_status()
    notification = {
        "action": "completed_cleared",
        "cleared_count": cleared,
        "queue_status": snapshot.model_dump(mode="json"),
    }
    await self._progress_service.update_progress(
        progress_id="download_queue",
        message=f"Cleared {cleared} completed items",
        metadata=notification,
        force_broadcast=True,
    )
    return cleared
async def clear_failed(self) -> int:
    """Empty the failed-downloads history.

    A "failed_cleared" update is broadcast only when at least one item
    was actually removed.

    Returns:
        Number of items cleared
    """
    cleared = len(self._failed_items)
    self._failed_items.clear()
    logger.info("Cleared failed items", count=cleared)

    if cleared == 0:
        # Nothing changed, so there is nothing to announce.
        return cleared

    snapshot = await self.get_queue_status()
    notification = {
        "action": "failed_cleared",
        "cleared_count": cleared,
        "queue_status": snapshot.model_dump(mode="json"),
    }
    await self._progress_service.update_progress(
        progress_id="download_queue",
        message=f"Cleared {cleared} failed items",
        metadata=notification,
        force_broadcast=True,
    )
    return cleared
async def clear_pending(self) -> int:
    """Clear all pending downloads from the queue.

    The in-memory queue is emptied first, in one step without awaiting,
    and only then are the database rows of exactly those items deleted.
    Anything queued while the deletes are in flight is therefore left
    untouched, and the queue processor cannot pick up an item that is
    about to be cleared.

    Returns:
        Number of items cleared
    """
    # Snapshot and clear before the first await so no other coroutine
    # can observe or modify a half-cleared queue.
    count = len(self._pending_queue)
    cleared_ids = list(self._pending_items_by_id)
    self._pending_queue.clear()
    self._pending_items_by_id.clear()

    # Delete the cleared items from database (best effort per item)
    for item_id in cleared_ids:
        await self._delete_from_database(item_id)

    logger.info("Cleared pending items", count=count)

    # Notify via progress service
    if count > 0:
        queue_status = await self.get_queue_status()
        await self._progress_service.update_progress(
            progress_id="download_queue",
            message=f"Cleared {count} pending items",
            metadata={
                "action": "pending_cleared",
                "cleared_count": count,
                "queue_status": queue_status.model_dump(mode="json"),
            },
            force_broadcast=True,
        )
    return count
async def retry_failed(
    self, item_ids: Optional[List[str]] = None
) -> List[str]:
    """Move failed downloads back to the pending queue.

    Items that already reached ``max_retries`` are skipped. Each retried
    item gets its retry counter incremented and its error and progress
    cleared, and is appended at the tail of the pending queue.

    Args:
        item_ids: Specific item IDs to retry; None (or an empty list)
            retries every failed item

    Returns:
        List of item IDs moved back to pending queue

    Raises:
        DownloadServiceError: If retrying fails
    """
    retried_ids = []
    try:
        # Iterate over a copy because the history is modified below.
        for failed in list(self._failed_items):
            selected = not item_ids or failed.id in item_ids
            if not selected:
                continue
            if failed.retry_count >= self._max_retries:
                # Retry budget exhausted; leave it in the failed list.
                continue

            self._failed_items.remove(failed)
            failed.status = DownloadStatus.PENDING
            failed.retry_count += 1
            failed.error = None
            failed.progress = None
            self._add_to_pending_queue(failed)
            retried_ids.append(failed.id)

            # Status is now managed in-memory only
            logger.info(
                "Retrying failed item: item_id=%s, retry_count=%d",
                failed.id,
                failed.retry_count,
            )

        if retried_ids:
            snapshot = await self.get_queue_status()
            notification = {
                "action": "items_retried",
                "retried_ids": retried_ids,
                "queue_status": snapshot.model_dump(mode="json"),
            }
            await self._progress_service.update_progress(
                progress_id="download_queue",
                message=f"Retried {len(retried_ids)} failed items",
                metadata=notification,
                force_broadcast=True,
            )
        return retried_ids
    except Exception as e:
        logger.error("Failed to retry items: %s", e)
        raise DownloadServiceError(
            f"Failed to retry: {str(e)}"
        ) from e
async def _process_download(self, item: DownloadItem) -> None:
    """Process a single download item.

    Marks the item as DOWNLOADING, runs the download through the anime
    service and files the item under completed or failed depending on
    the outcome. A cancelled or interrupted download is put back at the
    front of the pending queue unless the service is shutting down.

    Args:
        item: Download item to process

    Raises:
        asyncio.CancelledError: Re-raised after cleanup when the task
            running this coroutine is cancelled. All other errors are
            recorded on the item and not propagated.
    """
    try:
        # Check if shutting down
        if self._is_shutting_down:
            logger.info("Skipping download due to shutdown")
            return

        # Update status in memory (status is now in-memory only)
        item.status = DownloadStatus.DOWNLOADING
        item.started_at = datetime.now(timezone.utc)
        self._active_download = item

        logger.info(
            "Starting download: item_id=%s, serie_key=%s, S%02dE%02d",
            item.id,
            item.serie_id,
            item.episode.season,
            item.episode.episode,
        )

        # Execute download via anime service
        # AnimeService handles ALL progress via SeriesApp events:
        # - download started/progress/completed/failed events
        # - All updates forwarded to ProgressService
        # - ProgressService broadcasts to WebSocket clients
        # Use serie_folder for filesystem operations
        # and serie_id (key) for identification
        if not item.serie_folder:
            raise DownloadServiceError(
                f"Missing serie_folder for download item {item.id}. "
                "serie_folder is required for filesystem operations."
            )

        success = await self._anime_service.download(
            serie_folder=item.serie_folder,
            season=item.episode.season,
            episode=item.episode.episode,
            key=item.serie_id,
            item_id=item.id,
        )

        # A falsy result is routed through the generic failure handler.
        if not success:
            raise AnimeServiceError("Download returned False")

        item.status = DownloadStatus.COMPLETED
        item.completed_at = datetime.now(timezone.utc)

        # Track downloaded size
        if item.progress and item.progress.downloaded_mb:
            self._total_downloaded_mb += item.progress.downloaded_mb

        self._completed_items.append(item)

        # Delete completed item from database (status is in-memory)
        await self._delete_from_database(item.id)

        # Remove episode from missing episodes list in database
        await self._remove_episode_from_missing_list(
            series_key=item.serie_id,
            season=item.episode.season,
            episode=item.episode.episode,
        )

        logger.info(
            "Download completed successfully: item_id=%s", item.id
        )
    except asyncio.CancelledError:
        # Handle task cancellation during shutdown
        logger.info(
            "Download task cancelled: item_id=%s",
            item.id,
        )
        await self._handle_cancelled_download(item)
        raise  # Re-raise to properly cancel the task
    except InterruptedError:
        # Handle download cancellation from provider
        logger.info(
            "Download interrupted/cancelled: item_id=%s",
            item.id,
        )
        await self._handle_cancelled_download(item)
        # Don't re-raise - this is handled gracefully
    except Exception as e:
        # Handle failure
        item.status = DownloadStatus.FAILED
        item.completed_at = datetime.now(timezone.utc)
        item.error = str(e)
        self._failed_items.append(item)

        # Set error in database
        await self._set_error_in_database(item.id, str(e))

        logger.error(
            "Download failed: item_id=%s, error=%s, retry_count=%d",
            item.id,
            str(e),
            item.retry_count,
        )
        # Note: Failure is already broadcast by AnimeService
        # via ProgressService when SeriesApp fires failed event
    finally:
        # Remove from active downloads
        if self._active_download and self._active_download.id == item.id:
            self._active_download = None

async def _handle_cancelled_download(self, item: DownloadItem) -> None:
    """Shared cleanup for a cancelled or interrupted download.

    The item is marked CANCELLED and its database row is deleted. Unless
    the service is shutting down, the item is then reset to PENDING,
    placed at the front of the pending queue and saved again, so a
    re-queued item never sits in the pending queue with a CANCELLED
    status.

    Args:
        item: The download item whose download was cancelled
    """
    item.status = DownloadStatus.CANCELLED
    item.completed_at = datetime.now(timezone.utc)

    # Delete cancelled item from database
    await self._delete_from_database(item.id)

    # Return item to pending queue if not shutting down
    if not self._is_shutting_down:
        item.status = DownloadStatus.PENDING
        self._add_to_pending_queue(item, front=True)
        # Re-save to database as pending
        await self._save_to_database(item)
# Process-wide DownloadService, created on first request.
_download_service_instance: Optional[DownloadService] = None


def get_download_service(anime_service: AnimeService) -> DownloadService:
    """Return the shared DownloadService (FastAPI dependency factory).

    The instance is built on the first call; later calls return that same
    instance and their ``anime_service`` argument is not used.

    Args:
        anime_service: AnimeService instance used when the singleton is
            first created

    Returns:
        Singleton DownloadService instance
    """
    global _download_service_instance

    if _download_service_instance is not None:
        return _download_service_instance

    _download_service_instance = DownloadService(anime_service)
    return _download_service_instance