feat(Task 3.4): Implement ScanService with key-based identification

- Create ScanService class (src/server/services/scan_service.py)
  - Use 'key' as primary series identifier throughout
  - Include 'folder' as metadata only for display purposes
  - Implement scan progress tracking via ProgressService
  - Add callback classes for progress, error, and completion
  - Support scan event subscription and broadcasting
  - Maintain scan history with configurable limit
  - Provide cancellation support for in-progress scans

- Create comprehensive unit tests (tests/unit/test_scan_service.py)
  - 38 tests covering all functionality
  - Test ScanProgress dataclass serialization
  - Test callback classes (progress, error, completion)
  - Test service lifecycle (start, cancel, status)
  - Test event subscription and broadcasting
  - Test key-based identification throughout
  - Test singleton pattern

- Update infrastructure.md with ScanService documentation
  - Document service overview and key features
  - Document components and event types
  - Document integration points
  - Include usage example

- Update instructions.md
  - Mark Task 3.4 as complete
  - Mark Phase 3 as fully complete
  - Remove finished task definition

Task: Phase 3, Task 3.4 - Update ScanService to Use Key
Completion Date: November 27, 2025
This commit is contained in:
2025-11-27 18:50:02 +01:00
parent 84ca53a1bc
commit 6726c176b2
4 changed files with 1506 additions and 32 deletions

View File

@@ -0,0 +1,660 @@
"""Scan service for managing anime library scan operations.
This module provides a service layer for scanning the anime library directory,
identifying missing episodes, and broadcasting scan progress updates.
All scan operations use 'key' as the primary series identifier.
"""
from __future__ import annotations
import asyncio
import uuid
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional
import structlog
from src.core.interfaces.callbacks import (
CallbackManager,
CompletionCallback,
CompletionContext,
ErrorCallback,
ErrorContext,
OperationType,
ProgressCallback,
ProgressContext,
ProgressPhase,
)
from src.server.services.progress_service import (
ProgressService,
ProgressStatus,
ProgressType,
get_progress_service,
)
logger = structlog.get_logger(__name__)
class ScanServiceError(Exception):
"""Service-level exception for scan operations."""
class ScanProgress:
"""Represents the current state of a scan operation.
Attributes:
scan_id: Unique identifier for this scan operation
status: Current status (started, in_progress, completed, failed)
current: Number of folders processed
total: Total number of folders to process
percentage: Completion percentage
message: Human-readable progress message
key: Current series key being scanned (if applicable)
folder: Current folder being scanned (metadata only)
started_at: When the scan started
updated_at: When the progress was last updated
series_found: Number of series found with missing episodes
errors: List of error messages encountered
"""
def __init__(self, scan_id: str):
"""Initialize scan progress.
Args:
scan_id: Unique identifier for this scan
"""
self.scan_id = scan_id
self.status = "started"
self.current = 0
self.total = 0
self.percentage = 0.0
self.message = "Initializing scan..."
self.key: Optional[str] = None
self.folder: Optional[str] = None
self.started_at = datetime.now(timezone.utc)
self.updated_at = datetime.now(timezone.utc)
self.series_found = 0
self.errors: List[str] = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization.
Returns:
Dictionary representation with 'key' as primary identifier
and 'folder' as metadata only.
"""
result = {
"scan_id": self.scan_id,
"status": self.status,
"current": self.current,
"total": self.total,
"percentage": round(self.percentage, 2),
"message": self.message,
"started_at": self.started_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"series_found": self.series_found,
"errors": self.errors,
}
# Include optional series identifiers
if self.key is not None:
result["key"] = self.key
if self.folder is not None:
result["folder"] = self.folder
return result
class ScanServiceProgressCallback(ProgressCallback):
"""Callback implementation for forwarding scan progress to ScanService.
This callback receives progress events from SerieScanner and forwards
them to the ScanService for processing and broadcasting.
"""
def __init__(
self,
service: "ScanService",
scan_progress: ScanProgress,
):
"""Initialize the callback.
Args:
service: Parent ScanService instance
scan_progress: ScanProgress to update
"""
self._service = service
self._scan_progress = scan_progress
def on_progress(self, context: ProgressContext) -> None:
"""Handle progress update from SerieScanner.
Args:
context: Progress context with key and folder information
"""
self._scan_progress.current = context.current
self._scan_progress.total = context.total
self._scan_progress.percentage = context.percentage
self._scan_progress.message = context.message
self._scan_progress.key = context.key
self._scan_progress.folder = context.folder
self._scan_progress.updated_at = datetime.now(timezone.utc)
if context.phase == ProgressPhase.STARTING:
self._scan_progress.status = "started"
elif context.phase == ProgressPhase.IN_PROGRESS:
self._scan_progress.status = "in_progress"
elif context.phase == ProgressPhase.COMPLETED:
self._scan_progress.status = "completed"
elif context.phase == ProgressPhase.FAILED:
self._scan_progress.status = "failed"
# Forward to service for broadcasting
# Use run_coroutine_threadsafe if event loop is available
try:
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(
self._service._handle_progress_update(self._scan_progress),
loop
)
except RuntimeError:
# No running event loop - likely in test or sync context
pass
class ScanServiceErrorCallback(ErrorCallback):
"""Callback implementation for handling scan errors.
This callback receives error events from SerieScanner and forwards
them to the ScanService for processing and broadcasting.
"""
def __init__(
self,
service: "ScanService",
scan_progress: ScanProgress,
):
"""Initialize the callback.
Args:
service: Parent ScanService instance
scan_progress: ScanProgress to update
"""
self._service = service
self._scan_progress = scan_progress
def on_error(self, context: ErrorContext) -> None:
"""Handle error from SerieScanner.
Args:
context: Error context with key and folder information
"""
error_msg = context.message
if context.folder:
error_msg = f"[{context.folder}] {error_msg}"
self._scan_progress.errors.append(error_msg)
self._scan_progress.updated_at = datetime.now(timezone.utc)
logger.warning(
"Scan error",
key=context.key,
folder=context.folder,
error=str(context.error),
recoverable=context.recoverable,
)
# Forward to service for broadcasting
# Use run_coroutine_threadsafe if event loop is available
try:
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(
self._service._handle_scan_error(
self._scan_progress,
context,
),
loop
)
except RuntimeError:
# No running event loop - likely in test or sync context
pass
class ScanServiceCompletionCallback(CompletionCallback):
"""Callback implementation for handling scan completion.
This callback receives completion events from SerieScanner and forwards
them to the ScanService for processing and broadcasting.
"""
def __init__(
self,
service: "ScanService",
scan_progress: ScanProgress,
):
"""Initialize the callback.
Args:
service: Parent ScanService instance
scan_progress: ScanProgress to update
"""
self._service = service
self._scan_progress = scan_progress
def on_completion(self, context: CompletionContext) -> None:
"""Handle completion from SerieScanner.
Args:
context: Completion context with statistics
"""
self._scan_progress.status = "completed" if context.success else "failed"
self._scan_progress.message = context.message
self._scan_progress.updated_at = datetime.now(timezone.utc)
if context.statistics:
self._scan_progress.series_found = context.statistics.get(
"series_found", 0
)
# Forward to service for broadcasting
# Use run_coroutine_threadsafe if event loop is available
try:
loop = asyncio.get_running_loop()
asyncio.run_coroutine_threadsafe(
self._service._handle_scan_completion(
self._scan_progress,
context,
),
loop
)
except RuntimeError:
# No running event loop - likely in test or sync context
pass
class ScanService:
"""Manages anime library scan operations.
Features:
- Trigger library scans
- Track scan progress in real-time
- Use 'key' as primary series identifier
- Broadcast scan progress via WebSocket
- Handle scan errors gracefully
- Provide scan history and statistics
All operations use 'key' (provider-assigned, URL-safe identifier)
as the primary series identifier. 'folder' is used only as metadata
for display and filesystem operations.
"""
def __init__(
self,
progress_service: Optional[ProgressService] = None,
):
"""Initialize the scan service.
Args:
progress_service: Optional progress service for tracking
"""
self._progress_service = progress_service or get_progress_service()
# Current scan state
self._current_scan: Optional[ScanProgress] = None
self._is_scanning = False
# Scan history (limited size)
self._scan_history: List[ScanProgress] = []
self._max_history_size = 10
# Event handlers for scan events
self._scan_event_handlers: List[
Callable[[Dict[str, Any]], None]
] = []
# Lock for thread-safe operations
self._lock = asyncio.Lock()
logger.info("ScanService initialized")
def subscribe_to_scan_events(
self,
handler: Callable[[Dict[str, Any]], None],
) -> None:
"""Subscribe to scan events.
Args:
handler: Function to call when scan events occur.
Receives a dictionary with event data including
'key' as the primary identifier.
"""
self._scan_event_handlers.append(handler)
logger.debug("Scan event handler subscribed")
def unsubscribe_from_scan_events(
self,
handler: Callable[[Dict[str, Any]], None],
) -> None:
"""Unsubscribe from scan events.
Args:
handler: Handler function to remove
"""
try:
self._scan_event_handlers.remove(handler)
logger.debug("Scan event handler unsubscribed")
except ValueError:
logger.warning("Handler not found for unsubscribe")
async def _emit_scan_event(self, event_data: Dict[str, Any]) -> None:
"""Emit scan event to all subscribers.
Args:
event_data: Event data to broadcast, includes 'key' as
primary identifier and 'folder' as metadata
"""
for handler in self._scan_event_handlers:
try:
if asyncio.iscoroutinefunction(handler):
await handler(event_data)
else:
handler(event_data)
except Exception as e:
logger.error(
"Scan event handler error",
error=str(e),
)
@property
def is_scanning(self) -> bool:
"""Check if a scan is currently in progress."""
return self._is_scanning
@property
def current_scan(self) -> Optional[ScanProgress]:
"""Get the current scan progress."""
return self._current_scan
async def start_scan(
self,
scanner_factory: Callable[..., Any],
) -> str:
"""Start a new library scan.
Args:
scanner_factory: Factory function that creates a SerieScanner.
The factory should accept a callback_manager parameter.
Returns:
Scan ID for tracking
Raises:
ScanServiceError: If a scan is already in progress
Note:
The scan uses 'key' as the primary identifier for all series.
The 'folder' field is included only as metadata for display.
"""
async with self._lock:
if self._is_scanning:
raise ScanServiceError("A scan is already in progress")
self._is_scanning = True
scan_id = str(uuid.uuid4())
scan_progress = ScanProgress(scan_id)
self._current_scan = scan_progress
logger.info("Starting library scan", scan_id=scan_id)
# Start progress tracking
try:
await self._progress_service.start_progress(
progress_id=f"scan_{scan_id}",
progress_type=ProgressType.SCAN,
title="Library Scan",
message="Initializing scan...",
)
except Exception as e:
logger.error("Failed to start progress tracking", error=str(e))
# Emit scan started event
await self._emit_scan_event({
"type": "scan_started",
"scan_id": scan_id,
"message": "Library scan started",
})
return scan_id
def create_callback_manager(
self,
scan_progress: Optional[ScanProgress] = None,
) -> CallbackManager:
"""Create a callback manager for scan operations.
Args:
scan_progress: Optional scan progress to use. If None,
uses current scan progress.
Returns:
CallbackManager configured with scan callbacks
"""
progress = scan_progress or self._current_scan
if not progress:
progress = ScanProgress(str(uuid.uuid4()))
self._current_scan = progress
callback_manager = CallbackManager()
# Register callbacks
callback_manager.register_progress_callback(
ScanServiceProgressCallback(self, progress)
)
callback_manager.register_error_callback(
ScanServiceErrorCallback(self, progress)
)
callback_manager.register_completion_callback(
ScanServiceCompletionCallback(self, progress)
)
return callback_manager
async def _handle_progress_update(
self,
scan_progress: ScanProgress,
) -> None:
"""Handle a scan progress update.
Args:
scan_progress: Updated scan progress with 'key' as identifier
"""
# Update progress service
try:
await self._progress_service.update_progress(
progress_id=f"scan_{scan_progress.scan_id}",
current=scan_progress.current,
total=scan_progress.total,
message=scan_progress.message,
key=scan_progress.key,
folder=scan_progress.folder,
)
except Exception as e:
logger.debug("Progress update skipped", error=str(e))
# Emit progress event with key as primary identifier
await self._emit_scan_event({
"type": "scan_progress",
"data": scan_progress.to_dict(),
})
async def _handle_scan_error(
self,
scan_progress: ScanProgress,
error_context: ErrorContext,
) -> None:
"""Handle a scan error.
Args:
scan_progress: Current scan progress
error_context: Error context with key and folder metadata
"""
# Emit error event with key as primary identifier
await self._emit_scan_event({
"type": "scan_error",
"scan_id": scan_progress.scan_id,
"key": error_context.key,
"folder": error_context.folder,
"error": str(error_context.error),
"message": error_context.message,
"recoverable": error_context.recoverable,
})
async def _handle_scan_completion(
self,
scan_progress: ScanProgress,
completion_context: CompletionContext,
) -> None:
"""Handle scan completion.
Args:
scan_progress: Final scan progress
completion_context: Completion context with statistics
"""
async with self._lock:
self._is_scanning = False
# Add to history
self._scan_history.append(scan_progress)
if len(self._scan_history) > self._max_history_size:
self._scan_history.pop(0)
# Complete progress tracking
try:
if completion_context.success:
await self._progress_service.complete_progress(
progress_id=f"scan_{scan_progress.scan_id}",
message=completion_context.message,
)
else:
await self._progress_service.fail_progress(
progress_id=f"scan_{scan_progress.scan_id}",
error_message=completion_context.message,
)
except Exception as e:
logger.debug("Progress completion skipped", error=str(e))
# Emit completion event
await self._emit_scan_event({
"type": "scan_completed" if completion_context.success else "scan_failed",
"scan_id": scan_progress.scan_id,
"success": completion_context.success,
"message": completion_context.message,
"statistics": completion_context.statistics,
"data": scan_progress.to_dict(),
})
logger.info(
"Scan completed",
scan_id=scan_progress.scan_id,
success=completion_context.success,
series_found=scan_progress.series_found,
errors_count=len(scan_progress.errors),
)
async def cancel_scan(self) -> bool:
"""Cancel the current scan if one is in progress.
Returns:
True if scan was cancelled, False if no scan in progress
"""
async with self._lock:
if not self._is_scanning:
return False
self._is_scanning = False
if self._current_scan:
self._current_scan.status = "cancelled"
self._current_scan.message = "Scan cancelled by user"
self._current_scan.updated_at = datetime.now(timezone.utc)
# Add to history
self._scan_history.append(self._current_scan)
if len(self._scan_history) > self._max_history_size:
self._scan_history.pop(0)
# Emit cancellation event
if self._current_scan:
await self._emit_scan_event({
"type": "scan_cancelled",
"scan_id": self._current_scan.scan_id,
"message": "Scan cancelled by user",
})
# Update progress service
try:
await self._progress_service.fail_progress(
progress_id=f"scan_{self._current_scan.scan_id}",
error_message="Scan cancelled by user",
)
except Exception as e:
logger.debug("Progress cancellation skipped", error=str(e))
logger.info("Scan cancelled")
return True
async def get_scan_status(self) -> Dict[str, Any]:
"""Get the current scan status.
Returns:
Dictionary with scan status information, including 'key'
as the primary series identifier for any current scan.
"""
return {
"is_scanning": self._is_scanning,
"current_scan": (
self._current_scan.to_dict() if self._current_scan else None
),
}
async def get_scan_history(
self,
limit: int = 10,
) -> List[Dict[str, Any]]:
"""Get scan history.
Args:
limit: Maximum number of history entries to return
Returns:
List of scan history entries, newest first.
Each entry includes 'key' as the primary identifier.
"""
history = self._scan_history[-limit:]
history.reverse() # Newest first
return [scan.to_dict() for scan in history]
# Module-level singleton instance
_scan_service: Optional[ScanService] = None
def get_scan_service() -> ScanService:
"""Get the singleton ScanService instance.
Returns:
The ScanService singleton
"""
global _scan_service
if _scan_service is None:
_scan_service = ScanService()
return _scan_service
def reset_scan_service() -> None:
"""Reset the singleton ScanService instance.
Primarily used for testing to ensure clean state.
"""
global _scan_service
_scan_service = None