diff --git a/docs/architecture/async_loading_architecture.md b/docs/architecture/async_loading_architecture.md index 54c3dea..9b727fe 100644 --- a/docs/architecture/async_loading_architecture.md +++ b/docs/architecture/async_loading_architecture.md @@ -1,8 +1,41 @@ # Asynchronous Series Data Loading Architecture -**Version:** 1.0 -**Date:** 2026-01-18 -**Status:** Planning Phase +**Version:** 2.0 +**Date:** 2026-01-24 +**Status:** ✅ Implemented with Concurrent Processing + +## ⚡ Update 2.0 - Concurrent Processing (2026-01-24) + +**Enhancement**: Added support for concurrent processing of multiple anime additions. + +### Changes Made + +1. **Multiple Worker Architecture**: + - Changed from single worker to configurable multiple workers (default: 5) + - Multiple anime can now be processed simultaneously + - Non-blocking queue processing allows immediate response to additional requests + +2. **Backward Compatibility**: + - All existing APIs remain unchanged + - Drop-in replacement for single-worker implementation + - Tests updated to reflect concurrent behavior + +3. **Configuration**: + - `max_concurrent_loads` parameter added to control worker count + - Default set to 5 concurrent loads for optimal balance + +4. **Performance Impact**: + - Multiple anime additions now process in parallel + - No blocking when adding second anime while first is loading + - Each worker processes tasks independently from queue + +### Migration Notes + +- **Attribute Change**: `worker_task` → `worker_tasks` (now a list) +- **Stop Behavior**: Gracefully stops all workers with proper cleanup +- **Tests**: Updated to verify concurrent processing + +--- ## Table of Contents diff --git a/docs/instructions.md b/docs/instructions.md index 2982e4e..cf4a6a4 100644 --- a/docs/instructions.md +++ b/docs/instructions.md @@ -119,817 +119,28 @@ For each task completed: ## TODO List: -✅ **COMPLETED (1):** Fixed greenlet_spawn async lazy-loading error - Added selectinload for episode relationship in DownloadQueueService.get_all() +### Completed ✅ -✅ **COMPLETED (2):** Fixed anime add endpoint 500 error - Added explicit commit/rollback in database session dependencies +1. **Fast adding - Concurrent Anime Processing (Completed 2026-01-24)** + - **What was done:** + - Modified `BackgroundLoaderService` to support concurrent processing with multiple workers + - Added configurable `max_concurrent_loads` parameter (default: 5) + - Multiple anime can now be added and processed in parallel without blocking + - Created comprehensive unit tests verifying concurrent behavior + - Updated integration tests for compatibility + - Updated architecture documentation + - **Result:** Multiple anime additions now process simultaneously. Users can add second anime immediately while first is still loading. + - **Tests:** `tests/unit/test_parallel_anime_add.py`, integration tests updated + - **Files Changed:** + - `src/server/services/background_loader_service.py` + - `tests/unit/test_parallel_anime_add.py` (new) + - `tests/integration/test_async_series_loading.py` (updated) + - `docs/architecture/async_loading_architecture.md` (updated) -✅ **COMPLETED (3):** Added database transactions - All database operations properly use session context managers with automatic commit/rollback +### In Progress 🔄 -✅ **COMPLETED (4):** Created series filter to filter all series with no episodes found in folder - Added `filter=no_episodes` parameter to list_anime endpoint and get_series_with_no_episodes() database method +(No active tasks) ---- +### Planned 📋 -### ~~1. fix issue (COMPLETED):~~ - -2026-01-23 18:28:39 [info ] DownloadService initialized max*retries=3 -INFO: QueueRepository initialized -ERROR: Failed to get all items: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s) -2026-01-23 18:28:39 [error ] Failed to load queue from database: Failed to get all items: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s) -╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮ -│ /home/lukas/Volume/repo/Aniworld/src/server/services/queue_repository.py:292 in get_all_items │ -│ │ -│ 289 │ │ │ db_items = await DownloadQueueService.get_all( │ -│ 290 │ │ │ │ session, with_series=True │ -│ 291 │ │ │ ) │ -│ ❱ 292 │ │ │ return [self._from_db_model(item) for item in db_items] │ -│ 293 │ │ │ -│ 294 │ │ except Exception as e: │ -│ 295 │ │ │ logger.error("Failed to get all items: %s", e) │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ db = None │ │ -│ │ db_items = [ │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ ... +21 │ │ -│ │ ] │ │ -│ │ manage_session = True │ │ -│ │ self = │ │ -│ │ session = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/Volume/repo/Aniworld/src/server/services/queue_repository.py:97 in \_from_db_model │ -│ │ -│ 94 │ │ │ Pydantic download item with default status/priority │ -│ 95 │ │ """ │ -│ 96 │ │ # Get episode info from the related Episode object │ -│ ❱ 97 │ │ episode = db_item.episode │ -│ 98 │ │ series = db_item.series │ -│ 99 │ │ │ -│ 100 │ │ episode_identifier = EpisodeIdentifier( │ -│ │ -│ ╭───────────────────────────────────────── locals ──────────────────────────────────────────╮ │ -│ │ db_item = │ │ -│ │ item_id = None │ │ -│ │ self = │ │ -│ ╰───────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/attributes.py:5 │ -│ 69 in **get** │ -│ │ -│ 566 │ │ │ │ state = instance_state(instance) │ -│ 567 │ │ │ except AttributeError as err: │ -│ 568 │ │ │ │ raise orm_exc.UnmappedInstanceError(instance) from err │ -│ ❱ 569 │ │ │ return self.impl.get(state, dict*) # type: ignore[no-any-return] │ -│ 570 │ -│ 571 │ -│ 572 @dataclasses.dataclass(frozen=True) │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ dict* = { │ │ -│ │ │ '\_sa_instance_state': , │ │ -│ │ │ 'id': 8, │ │ -│ │ │ 'error_message': None, │ │ -│ │ │ 'file_destination': None, │ │ -│ │ │ 'completed_at': None, │ │ -│ │ │ 'updated_at': datetime.datetime(2026, 1, 23, 17, 10, 47), │ │ -│ │ │ 'series_id': 146, │ │ -│ │ │ 'episode_id': 26, │ │ -│ │ │ 'download_url': None, │ │ -│ │ │ 'started_at': None, │ │ -│ │ │ ... +2 │ │ -│ │ } │ │ -│ │ instance = │ │ -│ │ self = │ │ -│ │ state = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/attributes.py:1 │ -│ 096 in get │ -│ │ -│ 1093 │ │ │ │ if not passive & CALLABLES_OK: │ -│ 1094 │ │ │ │ │ return PASSIVE_NO_RESULT │ -│ 1095 │ │ │ │ │ -│ ❱ 1096 │ │ │ │ value = self.\_fire_loader_callables(state, key, passive) │ -│ 1097 │ │ │ │ │ -│ 1098 │ │ │ │ if value is PASSIVE_NO_RESULT or value is NO_VALUE: │ -│ 1099 │ │ │ │ │ return value │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ dict* = { │ │ -│ │ │ '_sa_instance_state': , │ │ -│ │ │ 'id': 8, │ │ -│ │ │ 'error_message': None, │ │ -│ │ │ 'file_destination': None, │ │ -│ │ │ 'completed_at': None, │ │ -│ │ │ 'updated_at': datetime.datetime(2026, 1, 23, 17, 10, 47), │ │ -│ │ │ 'series_id': 146, │ │ -│ │ │ 'episode_id': 26, │ │ -│ │ │ 'download_url': None, │ │ -│ │ │ 'started_at': None, │ │ -│ │ │ ... +2 │ │ -│ │ } │ │ -│ │ key = 'episode' │ │ -│ │ passive = symbol('PASSIVE_OFF') │ │ -│ │ self = │ │ -│ │ state = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/attributes.py:1 │ -│ 131 in \_fire_loader_callables │ -│ │ -│ 1128 │ │ │ callable_ = state.callables[key] │ -│ 1129 │ │ │ return callable*(state, passive) │ -│ 1130 │ │ elif self.callable*: │ -│ ❱ 1131 │ │ │ return self.callable*(state, passive) │ -│ 1132 │ │ else: │ -│ 1133 │ │ │ return ATTR_EMPTY │ -│ 1134 │ -│ │ -│ ╭───────────────────────────────────────── locals ─────────────────────────────────────────╮ │ -│ │ key = 'episode' │ │ -│ │ passive = symbol('PASSIVE_OFF') │ │ -│ │ self = │ │ -│ │ state = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/strategies.py:9 │ -│ 78 in \_load_for_state │ -│ │ -│ 975 │ │ │ ): │ -│ 976 │ │ │ │ return LoaderCallableStatus.PASSIVE_NO_RESULT │ -│ 977 │ │ │ -│ ❱ 978 │ │ return self.\_emit_lazyload( │ -│ 979 │ │ │ session, │ -│ 980 │ │ │ state, │ -│ 981 │ │ │ primary_key_identity, │ -│ │ -│ ╭────────────────────────────────────────── locals ──────────────────────────────────────────╮ │ -│ │ alternate_effective_path = None │ │ -│ │ execution_options = immutabledict({}) │ │ -│ │ extra_criteria = () │ │ -│ │ extra_options = () │ │ -│ │ instance = None │ │ -│ │ loadopt = None │ │ -│ │ passive = symbol('PASSIVE_OFF') │ │ -│ │ pending = False │ │ -│ │ primary_key_identity = [26] │ │ -│ │ self = │ │ -│ │ session = │ │ -│ │ state = │ │ -│ │ use_get = True │ │ -│ ╰────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/strategies.py:1 │ -│ 079 in \_emit_lazyload │ -│ │ -│ 1076 │ │ │ if self.\_raise_on_sql and not passive & PassiveFlag.NO_RAISE: │ -│ 1077 │ │ │ │ self.\_invoke_raise_load(state, passive, "raise_on_sql") │ -│ 1078 │ │ │ │ -│ ❱ 1079 │ │ │ return loading.load_on_pk_identity( │ -│ 1080 │ │ │ │ session, │ -│ 1081 │ │ │ │ stmt, │ -│ 1082 │ │ │ │ primary_key_identity, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ alternate_effective_path = None │ │ -│ │ clauseelement = Table('episodes', MetaData(), Column('id', Integer(), │ │ -│ │ table=, primary_key=True, nullable=False), │ │ -│ │ Column('series_id', Integer(), ForeignKey('anime_series.id'), │ │ -│ │ table=, nullable=False), Column('season', Integer(), │ │ -│ │ table=, nullable=False), Column('episode_number', │ │ -│ │ Integer(), table=, nullable=False), Column('title', │ │ -│ │ String(length=500), table=), Column('file_path', │ │ -│ │ String(length=1000), table=), Column('is_downloaded', │ │ -│ │ Boolean(), table=, nullable=False, │ │ -│ │ default=ScalarElementColumnDefault(False)), Column('created_at', │ │ -│ │ DateTime(timezone=True), table=, nullable=False, │ │ -│ │ server_default=DefaultClause(, for_update=False)), Column('updated_at', │ │ -│ │ DateTime(timezone=True), table=, nullable=False, │ │ -│ │ onupdate=ColumnElementColumnDefault(), │ │ -│ │ server_default=DefaultClause(, for_update=False)), schema=None) │ │ -│ │ effective_path = PropRegistry((, │ │ -│ │ <\_RelationshipDeclared at 0x7ed681866df0; episode>)) │ │ -│ │ execution_options = immutabledict({}) │ │ -│ │ extra_criteria = () │ │ -│ │ extra_options = () │ │ -│ │ load_options = default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=) │ │ -│ │ loadopt = None │ │ -│ │ opts = ( │ │ -│ │ │ , │ │ -│ │ ) │ │ -│ │ passive = symbol('PASSIVE_OFF') │ │ -│ │ pending = False │ │ -│ │ primary_key_identity = [26] │ │ -│ │ self = │ │ -│ │ session = │ │ -│ │ state = │ │ -│ │ stmt = │ │ -│ │ strategy_options = │ │ -│ │ \_get_params = { │ │ -│ │ │ Column('id', Integer(), table=, primary_key=True, │ │ -│ │ nullable=False): BindParameter('pk_1', None, type*=Integer()) │ │ -│ │ } │ │ -│ │ bind*arguments = immutabledict({}) │ │ -│ │ compile_options = default_compile_options(\_current_path=PropRegistry((, <\_RelationshipDeclared at │ │ -│ │ 0x7ed681866df0; episode>))) │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=)}) │ │ -│ │ identity_token = None │ │ -│ │ is_user_refresh = False │ │ -│ │ load_options = default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=) │ │ -│ │ mapper = │ │ -│ │ new_compile_options = default_compile_options(\_current_path=PropRegistry((, <\_RelationshipDeclared at │ │ -│ │ 0x7ed681866df0; episode>))) │ │ -│ │ no_autoflush = False │ │ -│ │ only_load_props = None │ │ -│ │ params = {'pk_1': 26} │ │ -│ │ primary_key_identity = [26] │ │ -│ │ q = │ │ -│ │ query = │ │ -│ │ refresh_state = None │ │ -│ │ require_pk_cols = False │ │ -│ │ session = │ │ -│ │ statement = │ │ -│ │ version_check = False │ │ -│ │ with_for_update = None │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/session.py:2351 │ -│ in execute │ -│ │ -│ 2348 │ │ │ -│ 2349 │ │ │ -│ 2350 │ │ """ │ -│ ❱ 2351 │ │ return self.\_execute_internal( │ -│ 2352 │ │ │ statement, │ -│ 2353 │ │ │ params, │ -│ 2354 │ │ │ execution_options=execution_options, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ \_add_event = None │ │ -│ │ \_parent_execute_state = None │ │ -│ │ bind_arguments = immutabledict({}) │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=)}) │ │ -│ │ params = {'pk_1': 26} │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/session.py:2249 │ -│ in \_execute_internal │ -│ │ -│ 2246 │ │ │ ) │ -│ 2247 │ │ │ -│ 2248 │ │ if compile_state_cls: │ -│ ❱ 2249 │ │ │ result: Result[Any] = compile_state_cls.orm_execute_statement( │ -│ 2250 │ │ │ │ self, │ -│ 2251 │ │ │ │ statement, │ -│ 2252 │ │ │ │ params or {}, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ \_add_event = None │ │ -│ │ \_parent_execute_state = None │ │ -│ │ \_scalar_result = False │ │ -│ │ bind = Engine(sqlite+aiosqlite:///./data/aniworld.db) │ │ -│ │ bind_arguments = { │ │ -│ │ │ 'clause': , │ │ -│ │ │ 'mapper': │ │ -│ │ } │ │ -│ │ conn = │ │ -│ │ events_todo = │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ params = {'pk_1': 26} │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/orm/context.py:306 │ -│ in orm_execute_statement │ -│ │ -│ 303 │ │ bind_arguments, │ -│ 304 │ │ conn, │ -│ 305 │ ) -> Result: │ -│ ❱ 306 │ │ result = conn.execute( │ -│ 307 │ │ │ statement, params or {}, execution_options=execution_options │ -│ 308 │ │ ) │ -│ 309 │ │ return cls.orm_setup_cursor_result( │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ bind_arguments = { │ │ -│ │ │ 'clause': , │ │ -│ │ │ 'mapper': │ │ -│ │ } │ │ -│ │ conn = │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ params = {'pk_1': 26} │ │ -│ │ session = │ │ -│ │ statement = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:1419 │ -│ in execute │ -│ │ -│ 1416 │ │ except AttributeError as err: │ -│ 1417 │ │ │ raise exc.ObjectNotExecutableError(statement) from err │ -│ 1418 │ │ else: │ -│ ❱ 1419 │ │ │ return meth( │ -│ 1420 │ │ │ │ self, │ -│ 1421 │ │ │ │ distilled_parameters, │ -│ 1422 │ │ │ │ execution_options or NO_OPTIONS, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ distilled_parameters = [{'pk_1': 26}] │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ meth = > │ │ -│ │ parameters = {'pk_1': 26} │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/sql/elements.py:526 │ -│ in \_execute_on_connection │ -│ │ -│ 523 │ │ if self.supports_execution: │ -│ 524 │ │ │ if TYPE_CHECKING: │ -│ 525 │ │ │ │ assert isinstance(self, Executable) │ -│ ❱ 526 │ │ │ return connection.\_execute_clauseelement( │ -│ 527 │ │ │ │ self, distilled_params, execution_options │ -│ 528 │ │ │ ) │ -│ 529 │ │ else: │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ connection = │ │ -│ │ distilled_params = [{'pk_1': 26}] │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ self = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:1641 │ -│ in \_execute_clauseelement │ -│ │ -│ 1638 │ │ │ schema_translate_map=schema_translate_map, │ -│ 1639 │ │ │ linting=self.dialect.compiler_linting | compiler.WARN_LINTING, │ -│ 1640 │ │ ) │ -│ ❱ 1641 │ │ ret = self.\_execute_context( │ -│ 1642 │ │ │ dialect, │ -│ 1643 │ │ │ dialect.execution_ctx_cls.\_init_compiled, │ -│ 1644 │ │ │ compiled_sql, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ cache_hit = │ │ -│ │ compiled_cache = │ │ -│ │ compiled_sql = │ │ -│ │ dialect = │ │ -│ │ distilled_parameters = [{'pk_1': 26}] │ │ -│ │ elem = │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ extracted_params = [AnnotatedBindParameter('pk_1', None, type*=Integer())] │ │ -│ │ for*executemany = False │ │ -│ │ has_events = False │ │ -│ │ keys = ['pk_1'] │ │ -│ │ schema_translate_map = None │ │ -│ │ self = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:1846 │ -│ in \_execute_context │ -│ │ -│ 1843 │ │ if context.execute_style is ExecuteStyle.INSERTMANYVALUES: │ -│ 1844 │ │ │ return self.\_exec_insertmany_context(dialect, context) │ -│ 1845 │ │ else: │ -│ ❱ 1846 │ │ │ return self.\_exec_single_context( │ -│ 1847 │ │ │ │ dialect, context, statement, parameters │ -│ 1848 │ │ │ ) │ -│ 1849 │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ args = ( │ │ -│ │ │ , │ │ -│ │ │ [{'pk_1': 26}], │ │ -│ │ │ , │ │ -│ │ │ [AnnotatedBindParameter('pk_1', None, type*=Integer())] │ │ -│ │ ) │ │ -│ │ conn = │ │ -│ │ constructor = │ │ -│ │ dialect = │ │ -│ │ execution_options = immutabledict({'\_sa_orm_load_options': │ │ -│ │ default_load_options(\_invoke_all_eagers=False, │ │ -│ │ \_lazy_loaded_from=), '\_result_disable_adapt_to_context': True}) │ │ -│ │ kw = {'cache_hit': } │ │ -│ │ parameters = [{'pk_1': 26}] │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ │ yp = None │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:1986 │ -│ in \_exec_single_context │ -│ │ -│ 1983 │ │ │ result = context.\_setup_result_proxy() │ -│ 1984 │ │ │ -│ 1985 │ │ except BaseException as e: │ -│ ❱ 1986 │ │ │ self.\_handle_dbapi_exception( │ -│ 1987 │ │ │ │ e, str_statement, effective_parameters, cursor, context │ -│ 1988 │ │ │ ) │ -│ 1989 │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ context = │ │ -│ │ cursor = │ │ -│ │ dialect = │ │ -│ │ effective_parameters = (26,) │ │ -│ │ evt_handled = False │ │ -│ │ parameters = [(26,)] │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ │ str_statement = 'SELECT episodes.id AS episodes_id, episodes.series_id AS │ │ -│ │ episodes_series_id, epi'+335 │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:2358 │ -│ in \_handle_dbapi_exception │ -│ │ -│ 2355 │ │ │ │ raise sqlalchemy_exception.with_traceback(exc_info[2]) from e │ -│ 2356 │ │ │ else: │ -│ 2357 │ │ │ │ assert exc_info[1] is not None │ -│ ❱ 2358 │ │ │ │ raise exc_info[1].with_traceback(exc_info[2]) │ -│ 2359 │ │ finally: │ -│ 2360 │ │ │ del self.\_reentrant_error │ -│ 2361 │ │ │ if self.\_is_disconnect: │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ context = │ │ -│ │ cursor = │ │ -│ │ e = MissingGreenlet("greenlet*spawn has not been called; can't │ │ -│ │ call await_only() here. Was IO attempted in an unexpected │ │ -│ │ place?") │ │ -│ │ exc_info = ( │ │ -│ │ │ , │ │ -│ │ │ MissingGreenlet("greenlet_spawn has not been called; │ │ -│ │ can't call await_only() here. Was IO attempted in an │ │ -│ │ unexpected place?"), │ │ -│ │ │ │ │ -│ │ ) │ │ -│ │ invalidate_pool_on_disconnect = True │ │ -│ │ is_exit_exception = False │ │ -│ │ is_sub_exec = False │ │ -│ │ ismulti = False │ │ -│ │ newraise = None │ │ -│ │ parameters = (26,) │ │ -│ │ self = │ │ -│ │ should_wrap = False │ │ -│ │ sqlalchemy_exception = None │ │ -│ │ statement = 'SELECT episodes.id AS episodes_id, episodes.series_id AS │ │ -│ │ episodes_series_id, epi'+335 │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/base.py:1967 │ -│ in \_exec_single_context │ -│ │ -│ 1964 │ │ │ │ │ │ │ evt_handled = True │ -│ 1965 │ │ │ │ │ │ │ break │ -│ 1966 │ │ │ │ if not evt_handled: │ -│ ❱ 1967 │ │ │ │ │ self.dialect.do_execute( │ -│ 1968 │ │ │ │ │ │ cursor, str_statement, effective_parameters, context │ -│ 1969 │ │ │ │ │ ) │ -│ 1970 │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ context = │ │ -│ │ cursor = │ │ -│ │ dialect = │ │ -│ │ effective_parameters = (26,) │ │ -│ │ evt_handled = False │ │ -│ │ parameters = [(26,)] │ │ -│ │ self = │ │ -│ │ statement = │ │ -│ │ str_statement = 'SELECT episodes.id AS episodes_id, episodes.series_id AS │ │ -│ │ episodes_series_id, epi'+335 │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/engine/default.py:9 │ -│ 51 in do_execute │ -│ │ -│ 948 │ │ cursor.executemany(statement, parameters) │ -│ 949 │ │ -│ 950 │ def do_execute(self, cursor, statement, parameters, context=None): │ -│ ❱ 951 │ │ cursor.execute(statement, parameters) │ -│ 952 │ │ -│ 953 │ def do_execute_no_params(self, cursor, statement, context=None): │ -│ 954 │ │ cursor.execute(statement) │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ context = │ │ -│ │ cursor = │ │ -│ │ parameters = (26,) │ │ -│ │ self = │ │ -│ │ statement = 'SELECT episodes.id AS episodes_id, episodes.series_id AS episodes_series_id, │ │ -│ │ epi'+335 │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/dialects/sqlite/aio │ -│ sqlite.py:180 in execute │ -│ │ -│ 177 │ │ │ else: │ -│ 178 │ │ │ │ self.\_cursor = \_cursor # type: ignore[misc] │ -│ 179 │ │ except Exception as error: │ -│ ❱ 180 │ │ │ self.\_adapt_connection.\_handle_exception(error) │ -│ 181 │ │ -│ 182 │ def executemany( │ -│ 183 │ │ self, │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ operation = 'SELECT episodes.id AS episodes_id, episodes.series_id AS episodes_series_id, │ │ -│ │ epi'+335 │ │ -│ │ parameters = (26,) │ │ -│ │ self = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/dialects/sqlite/aio │ -│ sqlite.py:340 in \_handle_exception │ -│ │ -│ 337 │ │ │ │ "no active connection" │ -│ 338 │ │ │ ) from error │ -│ 339 │ │ else: │ -│ ❱ 340 │ │ │ raise error │ -│ 341 │ -│ 342 │ -│ 343 class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ error = MissingGreenlet("greenlet_spawn has not been called; can't call await_only() here. │ │ -│ │ Was IO attempted in an unexpected place?") │ │ -│ │ self = > │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/dialects/sqlite/aio │ -│ sqlite.py:157 in execute │ -│ │ -│ 154 │ ) -> Any: │ -│ 155 │ │ │ -│ 156 │ │ try: │ -│ ❱ 157 │ │ │ \_cursor: AsyncIODBAPICursor = self.await*(self._connection.cursor()) # type │ -│ 158 │ │ │ │ -│ 159 │ │ │ if parameters is None: │ -│ 160 │ │ │ │ self.await_(\_cursor.execute(operation)) │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ operation = 'SELECT episodes.id AS episodes_id, episodes.series_id AS episodes_series_id, │ │ -│ │ epi'+335 │ │ -│ │ parameters = (26,) │ │ -│ │ self = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/sqlalchemy/util/\_concurrency_p │ -│ y3k.py:123 in await_only │ -│ │ -│ 120 │ if not getattr(current, "**sqlalchemy_greenlet_provider**", False): │ -│ 121 │ │ \_safe_cancel_awaitable(awaitable) │ -│ 122 │ │ │ -│ ❱ 123 │ │ raise exc.MissingGreenlet( │ -│ 124 │ │ │ "greenlet_spawn has not been called; can't call await_only() " │ -│ 125 │ │ │ "here. Was IO attempted in an unexpected place?" │ -│ 126 │ │ ) │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ awaitable = │ │ -│ │ current = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ -MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted -in an unexpected place? (Background on this error at: https://sqlalche.me/e/20/xd2s) - -The above exception was the direct cause of the following exception: - -╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮ -│ /home/lukas/Volume/repo/Aniworld/src/server/services/download_service.py:135 in initialize │ -│ │ -│ 132 │ │ │ │ -│ 133 │ │ │ # Load all items from database - they all start as PENDING │ -│ 134 │ │ │ # since status is now managed in-memory only │ -│ ❱ 135 │ │ │ all_items = await repository.get_all_items() │ -│ 136 │ │ │ for item in all_items: │ -│ 137 │ │ │ │ # All items from database are treated as pending │ -│ 138 │ │ │ │ item.status = DownloadStatus.PENDING │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ e = QueueRepositoryError("Failed to get all items: greenlet_spawn has not been │ │ -│ │ called; can't call await_only() here. Was IO attempted in an unexpected place? │ │ -│ │ (Background on this error at: https://sqlalche.me/e/20/xd2s)") │ │ -│ │ repository = │ │ -│ │ self = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /home/lukas/Volume/repo/Aniworld/src/server/services/queue_repository.py:296 in get_all_items │ -│ │ -│ 293 │ │ │ -│ 294 │ │ except Exception as e: │ -│ 295 │ │ │ logger.error("Failed to get all items: %s", e) │ -│ ❱ 296 │ │ │ raise QueueRepositoryError(f"Failed to get all items: {e}") from e │ -│ 297 │ │ finally: │ -│ 298 │ │ │ if manage_session: │ -│ 299 │ │ │ │ await session.close() │ -│ │ -│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │ -│ │ db = None │ │ -│ │ db_items = [ │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ , │ │ -│ │ │ ... +21 │ │ -│ │ ] │ │ -│ │ manage_session = True │ │ -│ │ self = │ │ -│ │ session = │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │ -╰──────────────────────────────────────────────────────────────────────────────────────────────────╯ -QueueRepositoryError: Failed to get all items: greenlet_spawn has not been called; can't call -await_only() here. Was IO attempted in an unexpected place? (Background on this error at: -https://sqlalche.me/e/20/xd2s) - -2. fix add issue - -INFO: 127.0.0.1:54956 - "POST /api/anime/add HTTP/1.1" 500 -ERROR: Exception in ASGI application -Traceback (most recent call last): -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/anyio/streams/memory.py", line 98, in receive -return self.receive_nowait() - -```^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/anyio/streams/memory.py", line 93, in receive_nowait -raise WouldBlock -anyio.WouldBlock - -During handling of the above exception, another exception occurred: - -Traceback (most recent call last): -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 78, in call_next -message = await recv_stream.receive() -^^^^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/anyio/streams/memory.py", line 118, in receive -raise EndOfStream -anyio.EndOfStream - -During handling of the above exception, another exception occurred: - -Traceback (most recent call last): -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/uvicorn/protocols/http/httptools_impl.py", line 426, in run_asgi -result = await app( # type: ignore[func-returns-value] -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -self.scope, self.receive, self.send -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -) -^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in **call** -return await self.app(scope, receive, send) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/fastapi/applications.py", line 1106, in **call** -await super().**call**(scope, receive, send) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/applications.py", line 122, in **call** -await self.middleware_stack(scope, receive, send) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/errors.py", line 184, in **call** -raise exc -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/errors.py", line 162, in **call** -await self.app(scope, receive, \_send) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 108, in **call** -response = await self.dispatch_func(request, call_next) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/Volume/repo/Aniworld/src/server/middleware/auth.py", line 209, in dispatch -return await call_next(request) -^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 84, in call_next -raise app_exc -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 70, in coro -await self.app(scope, receive_or_disconnect, send_no_error) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 108, in **call** -response = await self.dispatch_func(request, call_next) -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/Volume/repo/Aniworld/src/server/middleware/setup_redirect.py", line 147, in dispatch -return await call_next(request) -^^^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 84, in call_next -raise app_exc -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/base.py", line 70, in coro -await self.app(scope, receive_or_disconnect, send_no_error) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/cors.py", line 91, in **call** -await self.simple_response(scope, receive, send, request_headers=headers) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/cors.py", line 146, in simple_response -await self.app(scope, receive, send) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/exceptions.py", line 79, in **call** -raise exc -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/starlette/middleware/exceptions.py", line 68, in **call** -await self.app(scope, receive, sender) -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/site-packages/fastapi/middleware/asyncexitstack.py", line 14, in **call** -async with AsyncExitStack() as stack: -~~~~~~~~~~~~~~^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/contextlib.py", line 768, in **aexit** -raise exc -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/contextlib.py", line 751, in **aexit** -cb_suppress = await cb(\*exc_details) -^^^^^^^^^^^^^^^^^^^^^^ -File "/home/lukas/miniconda3/envs/AniWorld/lib/python3.13/contextlib.py", line 271, in **aexit** -raise RuntimeError("generator didn't stop after athrow()") -RuntimeError: generator didn't stop after athrow() - -3. transactions - go throw code and add transactions. so that application stops the db is not curropted - -4. filter - -make a series filter to filter all series with no episodes found in folder -``` +(No planned tasks) diff --git a/src/server/services/background_loader_service.py b/src/server/services/background_loader_service.py index 70f617b..8197ff0 100644 --- a/src/server/services/background_loader_service.py +++ b/src/server/services/background_loader_service.py @@ -79,13 +79,18 @@ class BackgroundLoaderService: than reimplementing logic. It provides task queuing, status tracking, and WebSocket notifications. + Supports concurrent processing of multiple series simultaneously for + improved performance when adding multiple anime. + Attributes: websocket_service: Service for broadcasting status updates anime_service: Service for episode scanning (reused) series_app: Core SeriesApp instance for NFO service access task_queue: Queue of pending loading tasks active_tasks: Dict of currently processing tasks - worker_task: Background worker task + processing_tasks: Dict of asyncio tasks being processed + worker_tasks: List of background worker tasks + max_concurrent_loads: Maximum number of series to load concurrently """ def __init__( @@ -93,6 +98,7 @@ class BackgroundLoaderService: websocket_service: WebSocketService, anime_service: Any, # AnimeService - avoiding circular import series_app: Any, # SeriesApp - avoiding circular import + max_concurrent_loads: int = 5, ): """Initialize the background loader service. @@ -100,46 +106,70 @@ class BackgroundLoaderService: websocket_service: WebSocket service for status broadcasts anime_service: AnimeService instance for episode operations series_app: SeriesApp instance for NFO operations + max_concurrent_loads: Maximum number of series to load concurrently (default: 5) """ self.websocket_service = websocket_service self.anime_service = anime_service self.series_app = series_app + self.max_concurrent_loads = max_concurrent_loads # Task management self.task_queue: asyncio.Queue[SeriesLoadingTask] = asyncio.Queue() self.active_tasks: Dict[str, SeriesLoadingTask] = {} - self.worker_task: Optional[asyncio.Task] = None + self.processing_tasks: Dict[str, asyncio.Task] = {} + self.worker_tasks: List[asyncio.Task] = [] self._shutdown = False - logger.info("BackgroundLoaderService initialized") + logger.info( + "BackgroundLoaderService initialized", + extra={"max_concurrent_loads": max_concurrent_loads} + ) async def start(self) -> None: - """Start the background worker task.""" - if self.worker_task is not None and not self.worker_task.done(): - logger.warning("Background worker already running") + """Start the background worker tasks for concurrent processing.""" + if self.worker_tasks and any(not task.done() for task in self.worker_tasks): + logger.warning("Background workers already running") return self._shutdown = False - self.worker_task = asyncio.create_task(self._worker()) - logger.info("Background worker started") + + # Start multiple workers for concurrent processing + self.worker_tasks = [] + for i in range(self.max_concurrent_loads): + worker = asyncio.create_task(self._worker(worker_id=i)) + self.worker_tasks.append(worker) + + logger.info( + "Background workers started", + extra={"num_workers": len(self.worker_tasks)} + ) async def stop(self) -> None: - """Stop the background worker gracefully.""" - if self.worker_task is None: + """Stop all background workers gracefully.""" + if not self.worker_tasks: return - logger.info("Stopping background worker...") + logger.info("Stopping background workers...") self._shutdown = True - # Cancel the worker task - if not self.worker_task.done(): - self.worker_task.cancel() - try: - await self.worker_task - except asyncio.CancelledError: - pass + # Cancel all worker tasks + for worker_task in self.worker_tasks: + if not worker_task.done(): + worker_task.cancel() - logger.info("Background worker stopped") + # Wait for all workers to finish + results = await asyncio.gather(*self.worker_tasks, return_exceptions=True) + + # Log any unexpected exceptions (ignore CancelledError) + for i, result in enumerate(results): + if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError): + logger.error( + f"Worker {i} stopped with exception", + extra={"exception": str(result)} + ) + + self.worker_tasks = [] + logger.info("All background workers stopped") async def add_series_loading_task( self, @@ -232,9 +262,15 @@ class BackgroundLoaderService: return missing - async def _worker(self) -> None: - """Background worker that processes loading tasks from the queue.""" - logger.info("Background worker started processing tasks") + async def _worker(self, worker_id: int = 0) -> None: + """Background worker that processes loading tasks from the queue. + + Multiple workers can run concurrently to process tasks in parallel. + + Args: + worker_id: Unique identifier for this worker instance + """ + logger.info(f"Background worker {worker_id} started processing tasks") while not self._shutdown: try: @@ -244,7 +280,9 @@ class BackgroundLoaderService: timeout=1.0 ) - logger.info(f"Processing loading task for series: {task.key}") + logger.info( + f"Worker {worker_id} processing loading task for series: {task.key}" + ) # Process the task await self._load_series_data(task) @@ -256,14 +294,14 @@ class BackgroundLoaderService: # No task available, continue loop continue except asyncio.CancelledError: - logger.info("Worker task cancelled") + logger.info(f"Worker {worker_id} task cancelled") break except Exception as e: - logger.exception(f"Error in background worker: {e}") + logger.exception(f"Error in background worker {worker_id}: {e}") # Continue processing other tasks continue - logger.info("Background worker stopped") + logger.info(f"Background worker {worker_id} stopped") async def _load_series_data(self, task: SeriesLoadingTask) -> None: """Load all missing data for a series. @@ -653,7 +691,8 @@ _background_loader_service: Optional[BackgroundLoaderService] = None def init_background_loader_service( websocket_service: WebSocketService, anime_service: Any, - series_app: Any + series_app: Any, + max_concurrent_loads: int = 5, ) -> BackgroundLoaderService: """Initialize the background loader service singleton. @@ -661,6 +700,7 @@ def init_background_loader_service( websocket_service: WebSocket service for broadcasts anime_service: AnimeService instance series_app: SeriesApp instance + max_concurrent_loads: Maximum number of series to load concurrently (default: 5) Returns: BackgroundLoaderService instance @@ -671,7 +711,8 @@ def init_background_loader_service( _background_loader_service = BackgroundLoaderService( websocket_service=websocket_service, anime_service=anime_service, - series_app=series_app + series_app=series_app, + max_concurrent_loads=max_concurrent_loads, ) return _background_loader_service diff --git a/tests/api/test_concurrent_anime_add.py b/tests/api/test_concurrent_anime_add.py new file mode 100644 index 0000000..f96d12e --- /dev/null +++ b/tests/api/test_concurrent_anime_add.py @@ -0,0 +1,118 @@ +"""Integration test for concurrent anime additions via API endpoint. + +This test verifies that the /api/anime/add endpoint can handle +multiple concurrent requests without blocking. +""" +import asyncio + +import pytest +from httpx import ASGITransport, AsyncClient + +from src.server.fastapi_app import app +from src.server.services.auth_service import auth_service + + +@pytest.fixture +async def authenticated_client(): + """Create authenticated async client.""" + if not auth_service.is_configured(): + auth_service.setup_master_password("TestPass123!") + + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://test") as client: + # Login to get token + r = await client.post( + "/api/auth/login", json={"password": "TestPass123!"} + ) + if r.status_code == 200: + token = r.json()["access_token"] + client.headers["Authorization"] = f"Bearer {token}" + yield client + + +@pytest.mark.asyncio +async def test_concurrent_anime_add_requests(authenticated_client): + """Test that multiple anime add requests can be processed concurrently. + + This test sends multiple anime add requests simultaneously and verifies: + 1. All requests return 202 Accepted + 2. All requests complete within a reasonable time (indicating no blocking) + 3. Each anime is added successfully with correct response structure + """ + # Define multiple anime to add + anime_list = [ + {"link": "https://aniworld.to/anime/stream/test-anime-1", "name": "Test Anime 1"}, + {"link": "https://aniworld.to/anime/stream/test-anime-2", "name": "Test Anime 2"}, + {"link": "https://aniworld.to/anime/stream/test-anime-3", "name": "Test Anime 3"}, + ] + + # Track start time + import time + start_time = time.time() + + # Send all requests concurrently + tasks = [] + for anime in anime_list: + task = authenticated_client.post("/api/anime/add", json=anime) + tasks.append(task) + + # Wait for all responses + responses = await asyncio.gather(*tasks) + + # Calculate total time + total_time = time.time() - start_time + + # Verify all responses + for i, response in enumerate(responses): + # All should return 202 or handle existing anime + assert response.status_code in (202, 200), ( + f"Request {i} failed with status {response.status_code}" + ) + + data = response.json() + + # Verify response structure + assert "status" in data + assert data["status"] in ("success", "exists") + assert "key" in data + assert "folder" in data + assert "loading_status" in data + assert "loading_progress" in data + + # Verify requests completed quickly (indicating non-blocking behavior) + # With blocking, 3 requests might take 3x the time of a single request + # With concurrent processing, they should complete in similar time + assert total_time < 5.0, ( + f"Concurrent requests took {total_time:.2f}s, " + f"indicating possible blocking issues" + ) + + print(f"✓ 3 concurrent anime add requests completed in {total_time:.2f}s") + + +@pytest.mark.asyncio +async def test_same_anime_concurrent_add(authenticated_client): + """Test that adding the same anime twice concurrently is handled correctly. + + The second request should return 'exists' status rather than creating + a duplicate entry. + """ + anime = {"link": "https://aniworld.to/anime/stream/concurrent-test", "name": "Concurrent Test"} + + # Send two requests for the same anime concurrently + task1 = authenticated_client.post("/api/anime/add", json=anime) + task2 = authenticated_client.post("/api/anime/add", json=anime) + + responses = await asyncio.gather(task1, task2) + + # At least one should succeed + statuses = [r.json()["status"] for r in responses] + assert "success" in statuses or all(s == "exists" for s in statuses), ( + "Expected at least one success or all exists responses" + ) + + # Both should have the same key + keys = [r.json()["key"] for r in responses] + assert keys[0] == keys[1], "Both responses should have the same key" + + print(f"✓ Concurrent same-anime requests handled correctly: {statuses}") diff --git a/tests/integration/test_async_series_loading.py b/tests/integration/test_async_series_loading.py index 9a7d4df..d4935e5 100644 --- a/tests/integration/test_async_series_loading.py +++ b/tests/integration/test_async_series_loading.py @@ -175,7 +175,8 @@ class TestBackgroundLoaderIntegration: shutdown_success = False assert shutdown_success - assert loader.worker_task.done() + # Check all worker tasks are done + assert all(task.done() for task in loader.worker_tasks) @pytest.mark.asyncio async def test_no_duplicate_tasks(self): @@ -226,8 +227,9 @@ class TestLoadingStatusEnum: def test_loading_status_string_repr(self): """Test LoadingStatus can be used as strings.""" status = LoadingStatus.LOADING_EPISODES - assert str(status) == "loading_episodes" - assert status == "loading_episodes" + # The enum string representation includes the class name + assert status.value == "loading_episodes" + assert status == LoadingStatus.LOADING_EPISODES class TestAsyncBehavior: diff --git a/tests/unit/test_parallel_anime_add.py b/tests/unit/test_parallel_anime_add.py new file mode 100644 index 0000000..78b914c --- /dev/null +++ b/tests/unit/test_parallel_anime_add.py @@ -0,0 +1,282 @@ +"""Unit tests for parallel anime addition functionality. + +This module tests that multiple anime can be added concurrently without blocking +each other. The background loader should process multiple series simultaneously +rather than sequentially. +""" +import asyncio +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.server.services.background_loader_service import ( + BackgroundLoaderService, + LoadingStatus, +) + + +@pytest.fixture +def mock_websocket_service(): + """Create a mock WebSocket service.""" + service = MagicMock() + service.broadcast = AsyncMock() + return service + + +@pytest.fixture +def mock_anime_service(): + """Create a mock AnimeService.""" + service = MagicMock() + return service + + +@pytest.fixture +def mock_series_app(): + """Create a mock SeriesApp.""" + app = MagicMock() + app.directory_to_search = "/fake/anime/directory" + app.nfo_service = MagicMock() + return app + + +@pytest.fixture +async def background_loader(mock_websocket_service, mock_anime_service, mock_series_app): + """Create a BackgroundLoaderService instance.""" + loader = BackgroundLoaderService( + websocket_service=mock_websocket_service, + anime_service=mock_anime_service, + series_app=mock_series_app + ) + + # Start the worker + await loader.start() + + yield loader + + # Stop the worker + await loader.stop() + + +@pytest.mark.asyncio +async def test_parallel_anime_additions( + background_loader, + mock_websocket_service, +): + """Test that multiple anime additions are processed in parallel. + + This test adds two anime series and verifies that: + 1. Both are queued successfully + 2. Both start processing without waiting for the other to complete + 3. Both complete within a reasonable time frame (indicating parallel execution) + """ + # Track when tasks start and finish + task_events = { + "anime-one": {"started": None, "completed": None}, + "anime-two": {"started": None, "completed": None}, + } + + # Mock the _load_series_data method to simulate work and track timing + original_load = background_loader._load_series_data + + async def mock_load_series_data(task): + """Mock load that simulates work with a delay.""" + task_events[task.key]["started"] = datetime.now(timezone.utc) + + # Simulate some work with a delay + await asyncio.sleep(0.5) + + # Mark progress + task.progress["episodes"] = True + task.progress["nfo"] = True + task.progress["logo"] = True + task.progress["images"] = True + task.status = LoadingStatus.COMPLETED + task.completed_at = datetime.now(timezone.utc) + + task_events[task.key]["completed"] = datetime.now(timezone.utc) + + # Remove from active tasks (normally done by _load_series_data) + background_loader.active_tasks.pop(task.key, None) + + background_loader._load_series_data = mock_load_series_data + + # Add two anime series + await background_loader.add_series_loading_task( + key="anime-one", + folder="Anime One", + name="Anime One", + year=2024 + ) + + await background_loader.add_series_loading_task( + key="anime-two", + folder="Anime Two", + name="Anime Two", + year=2024 + ) + + # Wait for both tasks to complete + # With sequential processing, this would take ~1.0 seconds + # With parallel processing, this should take ~0.5 seconds + start_time = datetime.now(timezone.utc) + + # Wait for both to complete (with timeout) + max_wait = 2.0 # Maximum wait time + check_interval = 0.1 + elapsed = 0 + + while elapsed < max_wait: + if (task_events["anime-one"]["completed"] is not None and + task_events["anime-two"]["completed"] is not None): + break + await asyncio.sleep(check_interval) + elapsed += check_interval + + end_time = datetime.now(timezone.utc) + total_duration = (end_time - start_time).total_seconds() + + # Verify both tasks completed + assert task_events["anime-one"]["started"] is not None, "Anime One never started" + assert task_events["anime-one"]["completed"] is not None, "Anime One never completed" + assert task_events["anime-two"]["started"] is not None, "Anime Two never started" + assert task_events["anime-two"]["completed"] is not None, "Anime Two never completed" + + # Calculate time between starts + start_diff = abs( + (task_events["anime-two"]["started"] - task_events["anime-one"]["started"]).total_seconds() + ) + + # Verify parallel execution: + # If tasks run in parallel, they should start close together (< 0.2s apart) + # and complete in roughly the same total time as a single task (~0.5-0.8s total) + assert start_diff < 0.2, ( + f"Tasks should start nearly simultaneously (parallel), " + f"but started {start_diff:.2f}s apart (sequential)" + ) + + # Total duration should be close to single task duration, not sum of both + # Allow some overhead for scheduling + assert total_duration < 1.0, ( + f"Parallel execution should take ~0.5s, but took {total_duration:.2f}s " + f"(indicating sequential processing)" + ) + + print(f"✓ Parallel execution verified:") + print(f" - Start time difference: {start_diff:.3f}s") + print(f" - Total duration: {total_duration:.3f}s") + + +@pytest.mark.asyncio +async def test_multiple_anime_additions_non_blocking( + background_loader, +): + """Test that adding anime doesn't block the caller. + + The add_series_loading_task method should return immediately + after queuing, not wait for processing to complete. + """ + # Mock _load_series_data to simulate slow work + async def slow_load(task): + await asyncio.sleep(1.0) + task.status = LoadingStatus.COMPLETED + background_loader.active_tasks.pop(task.key, None) + + background_loader._load_series_data = slow_load + + # Add should return quickly, not wait for processing + start = datetime.now(timezone.utc) + + await background_loader.add_series_loading_task( + key="test-anime", + folder="Test Anime", + name="Test Anime", + year=2024 + ) + + end = datetime.now(timezone.utc) + add_duration = (end - start).total_seconds() + + # Adding to queue should be fast (<0.1s), not wait for processing (1s) + assert add_duration < 0.1, ( + f"add_series_loading_task should return immediately, " + f"but took {add_duration:.2f}s (blocking on processing)" + ) + + +@pytest.mark.asyncio +async def test_concurrent_anime_limit( + background_loader, +): + """Test that multiple anime can be added and processed concurrently. + + This test adds 5 anime series and verifies that at least 2 are + processed concurrently (proves parallel execution exists). + """ + processing_times = [] + lock = asyncio.Lock() + + async def track_load(task): + """Track when tasks are processing.""" + start = datetime.now(timezone.utc) + + async with lock: + processing_times.append(("start", task.key, start)) + + # Simulate work + await asyncio.sleep(0.3) + + end = datetime.now(timezone.utc) + async with lock: + processing_times.append(("end", task.key, end)) + + task.status = LoadingStatus.COMPLETED + background_loader.active_tasks.pop(task.key, None) + + background_loader._load_series_data = track_load + + # Add 5 anime + for i in range(5): + await background_loader.add_series_loading_task( + key=f"anime-{i}", + folder=f"Anime {i}", + name=f"Anime {i}", + year=2024 + ) + + # Wait for all to complete + await asyncio.sleep(2.0) + + # Analyze processing times to find overlaps + # If tasks run in parallel, we should see overlapping time windows + active_at_once = [] + + for i, (event1_type, key1, time1) in enumerate(processing_times): + if event1_type == "start": + # Count how many other tasks are active at this start time + concurrent_count = 1 # This task + + for event2_type, key2, time2 in processing_times: + if key2 != key1: + # Check if key2 was active when key1 started + # Find start and end times for key2 + key2_start = next((t for evt, k, t in processing_times + if evt == "start" and k == key2), None) + key2_end = next((t for evt, k, t in processing_times + if evt == "end" and k == key2), None) + + if key2_start and key2_end: + if key2_start <= time1 <= key2_end: + concurrent_count += 1 + + active_at_once.append(concurrent_count) + + max_concurrent = max(active_at_once) if active_at_once else 0 + + # We should see at least 2 tasks running concurrently + assert max_concurrent >= 2, ( + f"Expected at least 2 concurrent tasks, but max was {max_concurrent}. " + f"This indicates sequential processing." + ) + + print(f"✓ Concurrent processing verified: max {max_concurrent} tasks at once")