From 31eb0026cf95ce814fff28f8eaad4dd72062e17a Mon Sep 17 00:00:00 2001 From: Lukas Date: Sat, 23 May 2026 21:27:41 +0200 Subject: [PATCH] Add queue deduplication to prevent duplicate entries - In-memory dedup in add_to_queue() using _pending_by_episode dict - Batch-local dedup via seen_in_batch set (handles duplicates within single call) - Database unique index on episode_id via __table_args__ - 5-minute cooldown in _auto_download_missing() to prevent rapid re-triggers - Updated _add_to_pending_queue() and _remove_from_pending_queue() to track episode keys - Added TestQueueDeduplication with 4 test cases - Updated DEVELOPMENT.md and TESTING.md with queue dedup docs Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- docs/DEVELOPMENT.md | 52 ++++++++ docs/TESTING.md | 46 +++++++ docs/runner.csx | 154 ----------------------- src/server/database/models.py | 12 +- src/server/services/download_service.py | 41 +++++- src/server/services/scheduler_service.py | 20 +++ tests/unit/test_download_service.py | 108 ++++++++++++++++ 7 files changed, 275 insertions(+), 158 deletions(-) delete mode 100644 docs/runner.csx diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index dc18406..4a7fb78 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -61,4 +61,56 @@ This document provides guidance for developers working on the Aniworld project. - Commit message format - Pull request process 8. 
Common Development Tasks + +### Adding Queue Deduplication + +The download queue prevents duplicate entries at three levels: + +**In-Memory Deduplication** (`src/server/services/download_service.py`): +- `_pending_by_episode` dict tracks pending episodes: key = `(serie_id, season, episode)` +- `_add_to_pending_queue()` updates the dict when adding items +- `add_to_queue()` checks this dict before adding episodes (includes batch-local dedup) +- `_remove_from_pending_queue()` cleans up the dict when items are removed + +**Database Constraint** (`src/server/database/models.py`): +- `DownloadQueueItem` has a unique index on `episode_id` via `__table_args__` +- Prevents duplicate queue entries at the database level +- Unique constraint: `Index("ix_download_queue_episode_pending", "episode_id", unique=True)` + +**Scheduler Cooldown** (`src/server/services/scheduler_service.py`): +- `_last_auto_download_time` tracks when auto-download last ran +- 5-minute cooldown prevents rapid re-triggers +- Checked at start of `_auto_download_missing()` + +### Mocking the Download Queue + +When testing components that use the download queue: + +```python +# Mock repository for unit tests +class MockQueueRepository: + def __init__(self): + self._items: Dict[str, DownloadItem] = {} + + async def save_item(self, item: DownloadItem) -> DownloadItem: + self._items[item.id] = item + return item + + async def get_all_items(self) -> List[DownloadItem]: + return list(self._items.values()) + +# Use in fixture +@pytest.fixture +def mock_queue_repository(): + return MockQueueRepository() + +@pytest.fixture +def download_service(mock_anime_service, mock_queue_repository): + return DownloadService( + anime_service=mock_anime_service, + queue_repository=mock_queue_repository, + max_retries=3, + ) +``` + 9. 
Troubleshooting Development Issues diff --git a/docs/TESTING.md b/docs/TESTING.md index 4cf745b..bf125d4 100644 --- a/docs/TESTING.md +++ b/docs/TESTING.md @@ -62,6 +62,52 @@ This document describes the testing strategy, guidelines, and practices for the - What to mock - Mock patterns - External service mocks + +### Mocking the Download Queue + +Use `MockQueueRepository` for testing download queue functionality: + +```python +from src.server.models.download import DownloadItem, EpisodeIdentifier + +class MockQueueRepository: + def __init__(self): + self._items: Dict[str, DownloadItem] = {} + + async def save_item(self, item: DownloadItem) -> DownloadItem: + self._items[item.id] = item + return item + + async def get_item(self, item_id: str) -> Optional[DownloadItem]: + return self._items.get(item_id) + + async def get_all_items(self) -> List[DownloadItem]: + return list(self._items.values()) + + async def set_error(self, item_id: str, error: str) -> bool: + if item_id in self._items: + self._items[item_id].error = error + return True + return False + + async def delete_item(self, item_id: str) -> bool: + if item_id in self._items: + del self._items[item_id] + return True + return False + + async def clear_all(self) -> int: + count = len(self._items) + self._items.clear() + return count +``` + +**Key points:** +- The mock uses in-memory storage, no database required +- All async methods are implemented (even if just pass-through) +- `save_item` uses `item.id` as key (must be set before calling) +- Suitable for unit tests only (no persistence) + 7. Coverage Requirements 8. CI/CD Integration 9. 
Writing Good Tests diff --git a/docs/runner.csx b/docs/runner.csx deleted file mode 100644 index 45ae2a1..0000000 --- a/docs/runner.csx +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env dotnet-script -#nullable enable - -using System; -using System.IO; -using System.Diagnostics; -using System.Threading; -using System.Text; -using System.Text.RegularExpressions; -using System.Linq; -using System.Collections.Generic; - -// ── Ctrl+C: kill active process and exit cleanly ────────────────────────────── -var cts = new CancellationTokenSource(); -Process? activeProcess = null; - -Console.CancelKeyPress += (_, e) => -{ - e.Cancel = true; - Console.WriteLine("\n[runner] Interrupted — shutting down..."); - cts.Cancel(); - try { activeProcess?.Kill(entireProcessTree: true); } catch { } -}; - -// ── Paths ───────────────────────────────────────────────────────────────────── -var repoRoot = Directory.GetCurrentDirectory(); -var tasksFile = Path.Combine(repoRoot, "Docs", "Tasks.md"); - -if (!File.Exists(tasksFile)) -{ - Console.Error.WriteLine($"[runner] ERROR: Tasks.md not found at {tasksFile}"); - Console.Error.WriteLine("[runner] Run this script from the repository root."); - Environment.Exit(1); -} - -// ── Read & split by "---" separator lines ──────────────────────────────────── -var content = File.ReadAllText(tasksFile); -var items = Regex - .Split(content, @"\r?\n---\r?\n") - .Select(s => s.Trim()) - .Where(s => s.Length > 0) - .ToList(); - -Console.WriteLine($"[runner] Found {items.Count} section(s) in Tasks.md"); - -// ── Helper: run copilot and stream output, return full output ───────────────── -async Task RunCopilot(IEnumerable extraArgs, string prompt) -{ - var output = new StringBuilder(); - - var argList = new List { "launch", "copilot", "--model", "minimax-m2.7:cloud", "--yes", "--", "--allow-all-tools" }; - argList.AddRange(extraArgs); - argList.Add("-p"); - argList.Add(prompt); - - var psi = new ProcessStartInfo("ollama") - { - WorkingDirectory = repoRoot, - 
RedirectStandardOutput = true, - RedirectStandardError = true, - UseShellExecute = false, - }; - foreach (var a in argList) - psi.ArgumentList.Add(a); - - activeProcess = new Process { StartInfo = psi }; - - activeProcess.OutputDataReceived += (_, e) => - { - if (e.Data is null) return; - Console.WriteLine(e.Data); - output.AppendLine(e.Data); - }; - activeProcess.ErrorDataReceived += (_, e) => - { - if (e.Data is null) return; - Console.Error.WriteLine(e.Data); - output.AppendLine(e.Data); - }; - - activeProcess.Start(); - activeProcess.BeginOutputReadLine(); - activeProcess.BeginErrorReadLine(); - - await activeProcess.WaitForExitAsync(cts.Token); - activeProcess = null; - - return output.ToString(); -} - -// ── Main loop ───────────────────────────────────────────────────────────────── -for (int i = 0; i < items.Count; i++) -{ - var item = items[i]; - if (cts.IsCancellationRequested) break; - - Console.WriteLine(); - Console.WriteLine("[runner] ══════════════════════════════════════════════"); - Console.WriteLine($"[runner] Task:\n{item}"); - Console.WriteLine("[runner] ══════════════════════════════════════════════"); - Console.WriteLine(); - - // Step 1 — run the task prompt - await RunCopilot(Enumerable.Empty(), $"/caveman full"); - await RunCopilot(new[] { "--continue" }, $"read ./Docs/instructions.md. {item}"); - if (cts.IsCancellationRequested) break; - - // Step 2 — confirm completion in the same chat session - Console.WriteLine("\n[runner] Asking for task confirmation...\n"); - var confirmation = await RunCopilot( - new[] { "--continue" }, - "are you sure tasks is done. 
reply with yes" - ); - if (cts.IsCancellationRequested) break; - - // Step 3 — check for "yes" in the reply, with retry logic for issue resolution - int maxRetries = 3; - int retryCount = 0; - bool taskConfirmed = confirmation.Contains("yes", StringComparison.OrdinalIgnoreCase); - - while (!taskConfirmed && retryCount < maxRetries) - { - retryCount++; - Console.WriteLine($"\n[runner] Attempt {retryCount}/{maxRetries}: Resolving remaining issues and running tests...\n"); - - confirmation = await RunCopilot( - new[] { "--continue" }, - "resolve any remaining issues, make sure all tests are running and pass. then confirm with yes if done" - ); - if (cts.IsCancellationRequested) break; - - taskConfirmed = confirmation.Contains("yes", StringComparison.OrdinalIgnoreCase); - } - - if (!taskConfirmed) - { - Console.WriteLine($"\n[runner] Task not confirmed as done after {maxRetries} attempts. Stopping."); - break; - } - - // Step 4 — commit the work - Console.WriteLine("\n[runner] Task confirmed. 
Making git commit...\n"); - - await RunCopilot(Enumerable.Empty(), $"/caveman full"); - await RunCopilot(new[] { "--continue" }, "make git commit"); - if (cts.IsCancellationRequested) break; - - // Step 5 — remove completed task from Tasks.md - var remaining = items.Skip(i + 1).ToList(); - File.WriteAllText(tasksFile, string.Join("\n\n---\n\n", remaining)); - Console.WriteLine("[runner] Removed completed task from Tasks.md"); -} - -Console.WriteLine("\n[runner] Finished."); diff --git a/src/server/database/models.py b/src/server/database/models.py index 3f133b9..5a584b2 100644 --- a/src/server/database/models.py +++ b/src/server/database/models.py @@ -15,7 +15,7 @@ from datetime import datetime, timezone from enum import Enum from typing import List, Optional -from sqlalchemy import Boolean, DateTime, ForeignKey, Integer, String, Text, func +from sqlalchemy import Boolean, DateTime, ForeignKey, Index, Integer, String, Text, func from sqlalchemy.orm import Mapped, mapped_column, relationship, validates from src.server.database.base import Base, TimestampMixin @@ -347,6 +347,16 @@ class DownloadQueueItem(Base, TimestampMixin): index=True ) + # Unique constraint to prevent duplicate pending queue items + # An episode can only have one queue entry at a time + __table_args__ = ( + Index( + "ix_download_queue_episode_pending", + "episode_id", + unique=True, + ), + ) + # Error handling error_message: Mapped[Optional[str]] = mapped_column( Text, nullable=True, diff --git a/src/server/services/download_service.py b/src/server/services/download_service.py index ab055a3..da23a55 100644 --- a/src/server/services/download_service.py +++ b/src/server/services/download_service.py @@ -79,6 +79,9 @@ class DownloadService: self._pending_queue: deque[DownloadItem] = deque() # Helper dict for O(1) lookup of pending items by ID self._pending_items_by_id: Dict[str, DownloadItem] = {} + # Helper dict for O(1) lookup of pending items by episode identity + # Key: (serie_id, season, 
episode), Value: item ID + self._pending_by_episode: Dict[tuple, str] = {} self._active_download: Optional[DownloadItem] = None self._completed_items: deque[DownloadItem] = deque(maxlen=100) self._failed_items: deque[DownloadItem] = deque(maxlen=50) @@ -409,7 +412,7 @@ class DownloadService: def _add_to_pending_queue( self, item: DownloadItem, front: bool = False ) -> None: - """Add item to pending queue and update helper dict. + """Add item to pending queue and update helper dicts. Args: item: Download item to add @@ -420,9 +423,12 @@ class DownloadService: else: self._pending_queue.append(item) self._pending_items_by_id[item.id] = item + # Track by episode identity for deduplication + ep_key = (item.serie_id, item.episode.season, item.episode.episode) + self._pending_by_episode[ep_key] = item.id def _remove_from_pending_queue(self, item_or_id: str) -> Optional[DownloadItem]: # noqa: E501 - """Remove item from pending queue and update helper dict. + """Remove item from pending queue and update helper dicts. 
Args: item_or_id: Item ID to remove @@ -442,6 +448,10 @@ class DownloadService: try: self._pending_queue.remove(item) del self._pending_items_by_id[item_id] + # Clean up episode tracking + ep_key = (item.serie_id, item.episode.season, item.episode.episode) + if self._pending_by_episode.get(ep_key) == item_id: + del self._pending_by_episode[ep_key] return item except (ValueError, KeyError): return None @@ -481,10 +491,35 @@ class DownloadService: # Initialize queue progress tracking if not already done await self._init_queue_progress() + # Filter out episodes already in pending queue + episodes_to_add = [] + skipped_count = 0 + seen_in_batch: set = set() # Track duplicates within this batch + for ep in episodes: + ep_key = (serie_id, ep.season, ep.episode) + if ep_key in self._pending_by_episode or ep_key in seen_in_batch: + logger.debug( + "Skipping duplicate episode in queue", + serie_key=serie_id, + season=ep.season, + episode=ep.episode, + ) + skipped_count += 1 + continue + seen_in_batch.add(ep_key) + episodes_to_add.append(ep) + + if skipped_count > 0: + logger.info( + "Skipped %d duplicate episodes in queue", + skipped_count, + serie_key=serie_id, + ) + created_ids = [] try: - for episode in episodes: + for episode in episodes_to_add: item = DownloadItem( id=self._generate_item_id(), serie_id=serie_id, diff --git a/src/server/services/scheduler_service.py b/src/server/services/scheduler_service.py index ac7d555..d651696 100644 --- a/src/server/services/scheduler_service.py +++ b/src/server/services/scheduler_service.py @@ -44,6 +44,9 @@ class SchedulerService: self._config: Optional[SchedulerConfig] = None self._last_scan_time: Optional[datetime] = None self._scan_in_progress: bool = False + # Cooldown tracking for auto-download to prevent rapid re-triggers + self._last_auto_download_time: Optional[datetime] = None + self._auto_download_cooldown_seconds: int = 300 # 5 minutes default logger.info("SchedulerService initialised") # 
------------------------------------------------------------------ @@ -256,12 +259,26 @@ class SchedulerService: async def _auto_download_missing(self) -> None: """Queue and start downloads for all series with missing episodes.""" + from datetime import timedelta # noqa: PLC0415 + from src.server.models.download import EpisodeIdentifier # noqa: PLC0415 from src.server.utils.dependencies import ( # noqa: PLC0415 get_anime_service, get_download_service, ) + # Check cooldown to prevent rapid re-triggers + now = datetime.now(timezone.utc) + if self._last_auto_download_time is not None: + elapsed = now - self._last_auto_download_time + if elapsed < timedelta(seconds=self._auto_download_cooldown_seconds): + logger.debug( + "Auto-download skipped: cooldown active", + elapsed_seconds=elapsed.total_seconds(), + cooldown_seconds=self._auto_download_cooldown_seconds, + ) + return + anime_service = get_anime_service() download_service = get_download_service() @@ -303,6 +320,9 @@ class SchedulerService: await self._broadcast("auto_download_started", {"queued_count": queued_count}) logger.info("Auto-download completed", queued_count=queued_count) + # Update cooldown timestamp after successful auto-download + self._last_auto_download_time = datetime.now(timezone.utc) + async def _perform_rescan(self) -> None: """Execute a library rescan and optionally trigger auto-download.""" if self._scan_in_progress: diff --git a/tests/unit/test_download_service.py b/tests/unit/test_download_service.py index d2d6f6c..63044db 100644 --- a/tests/unit/test_download_service.py +++ b/tests/unit/test_download_service.py @@ -834,3 +834,111 @@ class TestRemoveEpisodeFromMissingList: # Episode 2 should be removed from in-memory missing list assert 2 not in serie.episodeDict[1] assert serie.episodeDict[1] == [1, 3] + + +class TestQueueDeduplication: + """Test queue deduplication to prevent duplicate entries.""" + + @pytest.mark.asyncio + async def test_add_same_episode_twice_creates_only_one_entry( + 
self, download_service + ): + """Test that adding the same episode twice only creates one queue entry.""" + episodes = [EpisodeIdentifier(season=1, episode=1)] + + # Add same episode twice + ids1 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series", + serie_name="Test Series", + episodes=episodes, + ) + ids2 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series", + serie_name="Test Series", + episodes=episodes, + ) + + # Should only have one entry + assert len(download_service._pending_queue) == 1 + # First call creates one ID + assert len(ids1) == 1 + # Second call creates zero IDs (deduplicated) + assert len(ids2) == 0 + + @pytest.mark.asyncio + async def test_add_different_episodes_creates_separate_entries( + self, download_service + ): + """Test that different episodes create separate queue entries.""" + episodes1 = [EpisodeIdentifier(season=1, episode=1)] + episodes2 = [EpisodeIdentifier(season=1, episode=2)] + + ids1 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series", + serie_name="Test Series", + episodes=episodes1, + ) + ids2 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series", + serie_name="Test Series", + episodes=episodes2, + ) + + # Should have two separate entries + assert len(download_service._pending_queue) == 2 + assert len(ids1) == 1 + assert len(ids2) == 1 + # IDs should be different + assert ids1[0] != ids2[0] + + @pytest.mark.asyncio + async def test_add_same_episode_different_series_creates_entries( + self, download_service + ): + """Test that same episode in different series creates separate entries.""" + episodes = [EpisodeIdentifier(season=1, episode=1)] + + ids1 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series1", + serie_name="Test Series 1", + episodes=episodes, + ) + ids2 = await download_service.add_to_queue( + serie_id="series-2", + serie_folder="series2", + 
serie_name="Test Series 2", + episodes=episodes, + ) + + # Should have two separate entries (different series) + assert len(download_service._pending_queue) == 2 + assert len(ids1) == 1 + assert len(ids2) == 1 + + @pytest.mark.asyncio + async def test_add_multiple_episodes_with_duplicates_filters_correctly( + self, download_service + ): + """Test that adding multiple episodes with some duplicates filters correctly.""" + episodes = [ + EpisodeIdentifier(season=1, episode=1), + EpisodeIdentifier(season=1, episode=2), + EpisodeIdentifier(season=1, episode=1), # duplicate + EpisodeIdentifier(season=1, episode=3), + ] + + ids1 = await download_service.add_to_queue( + serie_id="series-1", + serie_folder="series", + serie_name="Test Series", + episodes=episodes, + ) + + # Should only have 3 entries (1, 2, 3) - one filtered out + assert len(download_service._pending_queue) == 3 + assert len(ids1) == 3