Add queue deduplication to prevent duplicate entries
- In-memory dedup in add_to_queue() using _pending_by_episode dict - Batch-local dedup via seen_in_batch set (handles duplicates within single call) - Database unique index on episode_id via __table_args__ - 5-minute cooldown in _auto_download_missing() to prevent rapid re-triggers - Updated _add_to_pending_queue() and _remove_from_pending_queue() to track episode keys - Added TestQueueDeduplication with 4 test cases - Updated DEVELOPMENT.md and TESTING.md with queue dedup docs Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -15,7 +15,7 @@ from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text, func
|
||||
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, func
|
||||
from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
|
||||
|
||||
from src.server.database.base import Base, TimestampMixin
|
||||
@@ -347,6 +347,16 @@ class DownloadQueueItem(Base, TimestampMixin):
|
||||
index=True
|
||||
)
|
||||
|
||||
# Unique constraint to prevent duplicate pending queue items
|
||||
# An episode can only have one queue entry at a time
|
||||
__table_args__ = (
|
||||
Index(
|
||||
"ix_download_queue_episode_pending",
|
||||
"episode_id",
|
||||
unique=True,
|
||||
),
|
||||
)
|
||||
|
||||
# Error handling
|
||||
error_message: Mapped[Optional[str]] = mapped_column(
|
||||
Text, nullable=True,
|
||||
|
||||
@@ -79,6 +79,9 @@ class DownloadService:
|
||||
self._pending_queue: deque[DownloadItem] = deque()
|
||||
# Helper dict for O(1) lookup of pending items by ID
|
||||
self._pending_items_by_id: Dict[str, DownloadItem] = {}
|
||||
# Helper dict for O(1) lookup of pending items by episode identity
|
||||
# Key: (serie_id, season, episode), Value: item ID
|
||||
self._pending_by_episode: Dict[tuple, str] = {}
|
||||
self._active_download: Optional[DownloadItem] = None
|
||||
self._completed_items: deque[DownloadItem] = deque(maxlen=100)
|
||||
self._failed_items: deque[DownloadItem] = deque(maxlen=50)
|
||||
@@ -409,7 +412,7 @@ class DownloadService:
|
||||
def _add_to_pending_queue(
|
||||
self, item: DownloadItem, front: bool = False
|
||||
) -> None:
|
||||
"""Add item to pending queue and update helper dict.
|
||||
"""Add item to pending queue and update helper dicts.
|
||||
|
||||
Args:
|
||||
item: Download item to add
|
||||
@@ -420,9 +423,12 @@ class DownloadService:
|
||||
else:
|
||||
self._pending_queue.append(item)
|
||||
self._pending_items_by_id[item.id] = item
|
||||
# Track by episode identity for deduplication
|
||||
ep_key = (item.serie_id, item.episode.season, item.episode.episode)
|
||||
self._pending_by_episode[ep_key] = item.id
|
||||
|
||||
def _remove_from_pending_queue(self, item_or_id: str) -> Optional[DownloadItem]: # noqa: E501
|
||||
"""Remove item from pending queue and update helper dict.
|
||||
"""Remove item from pending queue and update helper dicts.
|
||||
|
||||
Args:
|
||||
item_or_id: Item ID to remove
|
||||
@@ -442,6 +448,10 @@ class DownloadService:
|
||||
try:
|
||||
self._pending_queue.remove(item)
|
||||
del self._pending_items_by_id[item_id]
|
||||
# Clean up episode tracking
|
||||
ep_key = (item.serie_id, item.episode.season, item.episode.episode)
|
||||
if self._pending_by_episode.get(ep_key) == item_id:
|
||||
del self._pending_by_episode[ep_key]
|
||||
return item
|
||||
except (ValueError, KeyError):
|
||||
return None
|
||||
@@ -481,10 +491,35 @@ class DownloadService:
|
||||
# Initialize queue progress tracking if not already done
|
||||
await self._init_queue_progress()
|
||||
|
||||
# Filter out episodes already in pending queue
|
||||
episodes_to_add = []
|
||||
skipped_count = 0
|
||||
seen_in_batch: set = set() # Track duplicates within this batch
|
||||
for ep in episodes:
|
||||
ep_key = (serie_id, ep.season, ep.episode)
|
||||
if ep_key in self._pending_by_episode or ep_key in seen_in_batch:
|
||||
logger.debug(
|
||||
"Skipping duplicate episode in queue",
|
||||
serie_key=serie_id,
|
||||
season=ep.season,
|
||||
episode=ep.episode,
|
||||
)
|
||||
skipped_count += 1
|
||||
continue
|
||||
seen_in_batch.add(ep_key)
|
||||
episodes_to_add.append(ep)
|
||||
|
||||
if skipped_count > 0:
|
||||
logger.info(
|
||||
"Skipped %d duplicate episodes in queue",
|
||||
skipped_count,
|
||||
serie_key=serie_id,
|
||||
)
|
||||
|
||||
created_ids = []
|
||||
|
||||
try:
|
||||
for episode in episodes:
|
||||
for episode in episodes_to_add:
|
||||
item = DownloadItem(
|
||||
id=self._generate_item_id(),
|
||||
serie_id=serie_id,
|
||||
|
||||
@@ -44,6 +44,9 @@ class SchedulerService:
|
||||
self._config: Optional[SchedulerConfig] = None
|
||||
self._last_scan_time: Optional[datetime] = None
|
||||
self._scan_in_progress: bool = False
|
||||
# Cooldown tracking for auto-download to prevent rapid re-triggers
|
||||
self._last_auto_download_time: Optional[datetime] = None
|
||||
self._auto_download_cooldown_seconds: int = 300 # 5 minutes default
|
||||
logger.info("SchedulerService initialised")
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
@@ -256,12 +259,26 @@ class SchedulerService:
|
||||
|
||||
async def _auto_download_missing(self) -> None:
|
||||
"""Queue and start downloads for all series with missing episodes."""
|
||||
from datetime import timedelta # noqa: PLC0415
|
||||
|
||||
from src.server.models.download import EpisodeIdentifier # noqa: PLC0415
|
||||
from src.server.utils.dependencies import ( # noqa: PLC0415
|
||||
get_anime_service,
|
||||
get_download_service,
|
||||
)
|
||||
|
||||
# Check cooldown to prevent rapid re-triggers
|
||||
now = datetime.now(timezone.utc)
|
||||
if self._last_auto_download_time is not None:
|
||||
elapsed = now - self._last_auto_download_time
|
||||
if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds):
|
||||
logger.debug(
|
||||
"Auto-download skipped: cooldown active",
|
||||
elapsed_seconds=elapsed.total_seconds(),
|
||||
cooldown_seconds=self._auto_download_cooldown_seconds,
|
||||
)
|
||||
return
|
||||
|
||||
anime_service = get_anime_service()
|
||||
download_service = get_download_service()
|
||||
|
||||
@@ -303,6 +320,9 @@ class SchedulerService:
|
||||
await self._broadcast("auto_download_started", {"queued_count": queued_count})
|
||||
logger.info("Auto-download completed", queued_count=queued_count)
|
||||
|
||||
# Update cooldown timestamp after successful auto-download
|
||||
self._last_auto_download_time = datetime.now(timezone.utc)
|
||||
|
||||
async def _perform_rescan(self) -> None:
|
||||
"""Execute a library rescan and optionally trigger auto-download."""
|
||||
if self._scan_in_progress:
|
||||
|
||||
Reference in New Issue
Block a user