diff --git a/Docs/Tasks.md b/Docs/Tasks.md index 3017889..fd45e46 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -79,6 +79,7 @@ Reference: `Docs/Refactoring.md` for full analysis of each issue. - Issue: Multiple services (`raw_config_io_service`, `log_service`, `jail_config_service`, `setup_service`, and others) manually call `asyncio.get_event_loop()` or `run_in_executor`, creating inconsistent async control flow and making event-loop compatibility harder to maintain. - Propose: Introduce a shared async offload abstraction or utility that uses `asyncio.to_thread` / dedicated thread pool executors, and refactor blocking I/O and CPU-bound helpers to consume that shared provider. - Test: Verify existing async service tests still pass after refactoring and add coverage for the new offload utility to ensure blocking work is properly delegated without event-loop blocking. + - Status: completed 11. Eliminate duplicate setup persistence across bootstrap and runtime databases - Goal: Simplify setup persistence by establishing a single source of truth for runtime configuration data and reduce failure surface in first-run setup. diff --git a/backend/app/services/action_config_service.py b/backend/app/services/action_config_service.py index 480101d..f9c9bb0 100644 --- a/backend/app/services/action_config_service.py +++ b/backend/app/services/action_config_service.py @@ -7,6 +7,7 @@ for fail2ban action configurations. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import configparser import contextlib import io @@ -602,10 +603,10 @@ async def list_actions( action_d = Path(config_dir) / "action.d" loop = asyncio.get_event_loop() - raw_actions: list[tuple[str, str, str, bool, str]] = await loop.run_in_executor(None, _parse_actions_sync, action_d) + raw_actions: list[tuple[str, str, str, bool, str]] = await run_blocking( _parse_actions_sync, action_d) all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -699,12 +700,12 @@ async def get_action( else: raise ActionNotFoundError(base_name) - content, has_local, source_path = await loop.run_in_executor(None, _read) + content, has_local, source_path = await run_blocking( _read) cfg = conffile_parser.parse_action_file(content, name=base_name, filename=f"{base_name}.conf") all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -787,7 +788,7 @@ async def update_action( action_d = Path(config_dir) / "action.d" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _write_action_local_sync, action_d, base_name, content) + await run_blocking( _write_action_local_sync, action_d, base_name, content) if do_reload: try: @@ -840,7 +841,7 @@ async def create_action( raise ActionAlreadyExistsError(req.name) loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _check_not_exists) + await run_blocking( _check_not_exists) cfg = ActionConfig( name=req.name, @@ -856,7 +857,7 @@ async def create_action( ) content = conffile_parser.serialize_action_config(cfg) - await loop.run_in_executor(None, _write_action_local_sync, action_d, req.name, content) + await run_blocking( _write_action_local_sync, action_d, req.name, content) if do_reload: try: @@ -921,7 +922,7 @@ async def delete_action( log.info("action_local_deleted", action=base_name, path=str(local_path)) - await loop.run_in_executor(None, _delete) + await run_blocking( _delete) async def assign_action_to_jail( @@ -958,7 +959,7 @@ async def assign_action_to_jail( loop = asyncio.get_event_loop() - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) @@ -971,7 +972,7 @@ async def assign_action_to_jail( ): raise ActionNotFoundError(req.action_name) - await loop.run_in_executor(None, _check_action) + await run_blocking( _check_action) # Build the action string with optional parameters. if req.params: @@ -980,7 +981,7 @@ async def assign_action_to_jail( else: action_entry = req.action_name - await loop.run_in_executor( + await run_blocking( None, _append_jail_action_sync, Path(config_dir), @@ -1038,11 +1039,11 @@ async def remove_action_from_jail( loop = asyncio.get_event_loop() - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) - await loop.run_in_executor( + await run_blocking( None, _remove_jail_action_sync, Path(config_dir), diff --git a/backend/app/services/auth_service.py b/backend/app/services/auth_service.py index ee2ace3..293792f 100644 --- a/backend/app/services/auth_service.py +++ b/backend/app/services/auth_service.py @@ -8,6 +8,7 @@ survive server restarts. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import hashlib import hmac import secrets @@ -72,9 +73,7 @@ async def _check_password(plain: str, hashed: str) -> bool: plain_bytes = plain.encode() hashed_bytes = hashed.encode() loop = asyncio.get_running_loop() - return await loop.run_in_executor( - None, lambda: bool(bcrypt.checkpw(plain_bytes, hashed_bytes)) - ) + return await run_blocking(lambda: bool(bcrypt.checkpw(plain_bytes, hashed_bytes))) async def login( diff --git a/backend/app/services/config_file_service.py b/backend/app/services/config_file_service.py index c319aff..3e42e2b 100644 --- a/backend/app/services/config_file_service.py +++ b/backend/app/services/config_file_service.py @@ -21,6 +21,7 @@ directory traversal. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import configparser import contextlib import io @@ -1098,7 +1099,7 @@ async def list_inactive_jails( inactive jails. """ loop = asyncio.get_event_loop() - parsed_result: tuple[dict[str, dict[str, str]], dict[str, str]] = await loop.run_in_executor( + parsed_result: tuple[dict[str, dict[str, str]], dict[str, str]] = await run_blocking( None, _parse_jails_sync, Path(config_dir) ) all_jails, source_files = parsed_result @@ -1156,7 +1157,7 @@ async def activate_jail( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -1168,7 +1169,7 @@ async def activate_jail( # ---------------------------------------------------------------------- # # Pre-activation validation — collect warnings but do not block # # ---------------------------------------------------------------------- # - validation_result: JailValidationResult = await loop.run_in_executor( + validation_result: JailValidationResult = await run_blocking( None, _validate_jail_config_sync, Path(config_dir), name ) warnings: list[str] = [f"{i.field}: {i.message}" for i in validation_result.issues] @@ -1208,12 +1209,12 @@ async def activate_jail( # we can restore it if activation fails. # # ---------------------------------------------------------------------- # local_path = Path(config_dir) / "jail.d" / f"{name}.local" - original_content: bytes | None = await loop.run_in_executor( + original_content: bytes | None = await run_blocking( None, lambda: local_path.read_bytes() if local_path.exists() else None, ) - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), @@ -1361,7 +1362,7 @@ async def _rollback_activation_async( # Step 1 — restore original file (or delete it). try: - await loop.run_in_executor(None, _restore_local_file_sync, local_path, original_content) + await run_blocking( _restore_local_file_sync, local_path, original_content) log.info("jail_activation_rollback_file_restored", jail=name) except ConfigWriteError as exc: log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc)) @@ -1417,7 +1418,7 @@ async def deactivate_jail( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -1426,7 +1427,7 @@ async def deactivate_jail( if name not in active_names: raise JailAlreadyInactiveError(name) - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), @@ -1475,7 +1476,7 @@ async def delete_jail_local_override( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -1486,7 +1487,7 @@ async def delete_jail_local_override( local_path = Path(config_dir) / "jail.d" / f"{name}.local" try: - await loop.run_in_executor(None, lambda: local_path.unlink(missing_ok=True)) + await run_blocking( lambda: local_path.unlink(missing_ok=True)) except OSError as exc: raise ConfigWriteError(f"Failed to delete {local_path}: {exc}") from exc @@ -1515,7 +1516,7 @@ async def validate_jail_config( """ _safe_jail_name(name) loop = asyncio.get_event_loop() - return await loop.run_in_executor( + return await run_blocking( None, _validate_jail_config_sync, Path(config_dir), @@ -1554,7 +1555,7 @@ async def rollback_jail( loop = asyncio.get_event_loop() # Write enabled=false — this must succeed even when fail2ban is down. - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), @@ -1792,11 +1793,11 @@ async def list_filters( loop = asyncio.get_event_loop() # Run the synchronous scan in a thread-pool executor. - raw_filters: list[tuple[str, str, str, bool, str]] = await loop.run_in_executor(None, _parse_filters_sync, filter_d) + raw_filters: list[tuple[str, str, str, bool, str]] = await run_blocking( _parse_filters_sync, filter_d) # Fetch active jail names and their configs concurrently. all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -1891,12 +1892,12 @@ async def get_filter( else: raise FilterNotFoundError(base_name) - content, has_local, source_path = await loop.run_in_executor(None, _read) + content, has_local, source_path = await run_blocking( _read) cfg = conffile_parser.parse_filter_file(content, name=base_name, filename=f"{base_name}.conf") all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -1988,7 +1989,7 @@ async def update_filter( filter_d = Path(config_dir) / "filter.d" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _write_filter_local_sync, filter_d, base_name, content) + await run_blocking( _write_filter_local_sync, filter_d, base_name, content) if do_reload: try: @@ -2045,7 +2046,7 @@ async def create_filter( raise FilterAlreadyExistsError(req.name) loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _check_not_exists) + await run_blocking( _check_not_exists) # Validate regex patterns. patterns: list[str] = list(req.failregex) + list(req.ignoreregex) @@ -2063,7 +2064,7 @@ async def create_filter( ) content = conffile_parser.serialize_filter_config(cfg) - await loop.run_in_executor(None, _write_filter_local_sync, filter_d, req.name, content) + await run_blocking( _write_filter_local_sync, filter_d, req.name, content) if do_reload: try: @@ -2130,7 +2131,7 @@ async def delete_filter( log.info("filter_local_deleted", filter=base_name, path=str(local_path)) - await loop.run_in_executor(None, _delete) + await run_blocking( _delete) async def assign_filter_to_jail( @@ -2168,7 +2169,7 @@ async def assign_filter_to_jail( loop = asyncio.get_event_loop() # Verify the jail exists in config. - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) @@ -2181,9 +2182,9 @@ async def assign_filter_to_jail( if not conf_exists and not local_exists: raise FilterNotFoundError(req.filter_name) - await loop.run_in_executor(None, _check_filter) + await run_blocking( _check_filter) - await loop.run_in_executor( + await run_blocking( None, _set_jail_local_key_sync, Path(config_dir), @@ -2657,10 +2658,10 @@ async def list_actions( action_d = Path(config_dir) / "action.d" loop = asyncio.get_event_loop() - raw_actions: list[tuple[str, str, str, bool, str]] = await loop.run_in_executor(None, _parse_actions_sync, action_d) + raw_actions: list[tuple[str, str, str, bool, str]] = await run_blocking( _parse_actions_sync, action_d) all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -2754,12 +2755,12 @@ async def get_action( else: raise ActionNotFoundError(base_name) - content, has_local, source_path = await loop.run_in_executor(None, _read) + content, has_local, source_path = await run_blocking( _read) cfg = conffile_parser.parse_action_file(content, name=base_name, filename=f"{base_name}.conf") all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -2842,7 +2843,7 @@ async def update_action( action_d = Path(config_dir) / "action.d" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _write_action_local_sync, action_d, base_name, content) + await run_blocking( _write_action_local_sync, action_d, base_name, content) if do_reload: try: @@ -2895,7 +2896,7 @@ async def create_action( raise ActionAlreadyExistsError(req.name) loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _check_not_exists) + await run_blocking( _check_not_exists) cfg = ActionConfig( name=req.name, @@ -2911,7 +2912,7 @@ async def create_action( ) content = conffile_parser.serialize_action_config(cfg) - await loop.run_in_executor(None, _write_action_local_sync, action_d, req.name, content) + await run_blocking( _write_action_local_sync, action_d, req.name, content) if do_reload: try: @@ -2976,7 +2977,7 @@ async def delete_action( log.info("action_local_deleted", action=base_name, path=str(local_path)) - await loop.run_in_executor(None, _delete) + await run_blocking( _delete) async def assign_action_to_jail( @@ -3014,7 +3015,7 @@ async def assign_action_to_jail( loop = asyncio.get_event_loop() - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) @@ -3027,7 +3028,7 @@ async def assign_action_to_jail( ): raise ActionNotFoundError(req.action_name) - await loop.run_in_executor(None, _check_action) + await run_blocking( _check_action) # Build the action string with optional parameters. if req.params: @@ -3036,7 +3037,7 @@ async def assign_action_to_jail( else: action_entry = req.action_name - await loop.run_in_executor( + await run_blocking( None, _append_jail_action_sync, Path(config_dir), @@ -3094,11 +3095,11 @@ async def remove_action_from_jail( loop = asyncio.get_event_loop() - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) - await loop.run_in_executor( + await run_blocking( None, _remove_jail_action_sync, Path(config_dir), diff --git a/backend/app/services/config_service.py b/backend/app/services/config_service.py index 5609538..cb8c27e 100644 --- a/backend/app/services/config_service.py +++ b/backend/app/services/config_service.py @@ -13,6 +13,7 @@ routers can serialise them directly. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import contextlib import re from pathlib import Path @@ -750,8 +751,8 @@ async def read_fail2ban_log( loop = asyncio.get_event_loop() total_lines, raw_lines = await asyncio.gather( - loop.run_in_executor(None, _count_file_lines, resolved_str), - loop.run_in_executor(None, _read_tail_lines, resolved_str, lines), + run_blocking( _count_file_lines, resolved_str), + run_blocking( _read_tail_lines, resolved_str, lines), ) filtered = ( diff --git a/backend/app/services/filter_config_service.py b/backend/app/services/filter_config_service.py index cff8a6b..7bd9f2f 100644 --- a/backend/app/services/filter_config_service.py +++ b/backend/app/services/filter_config_service.py @@ -7,6 +7,7 @@ for fail2ban filter configurations. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import configparser import contextlib import io @@ -508,11 +509,11 @@ async def list_filters( loop = asyncio.get_event_loop() # Run the synchronous scan in a thread-pool executor. - raw_filters: list[tuple[str, str, str, bool, str]] = await loop.run_in_executor(None, _parse_filters_sync, filter_d) + raw_filters: list[tuple[str, str, str, bool, str]] = await run_blocking( _parse_filters_sync, filter_d) # Fetch active jail names and their configs concurrently. all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -607,12 +608,12 @@ async def get_filter( else: raise FilterNotFoundError(base_name) - content, has_local, source_path = await loop.run_in_executor(None, _read) + content, has_local, source_path = await run_blocking( _read) cfg = conffile_parser.parse_filter_file(content, name=base_name, filename=f"{base_name}.conf") all_jails_result, active_names = await asyncio.gather( - loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)), + run_blocking( _parse_jails_sync, Path(config_dir)), _get_active_jail_names(socket_path), ) all_jails, _source_files = all_jails_result @@ -704,7 +705,7 @@ async def update_filter( filter_d = Path(config_dir) / "filter.d" loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _write_filter_local_sync, filter_d, base_name, content) + await run_blocking( _write_filter_local_sync, filter_d, base_name, content) if do_reload: try: @@ -761,7 +762,7 @@ async def create_filter( raise FilterAlreadyExistsError(req.name) loop = asyncio.get_event_loop() - await loop.run_in_executor(None, _check_not_exists) + await run_blocking( _check_not_exists) # Validate regex patterns. patterns: list[str] = list(req.failregex) + list(req.ignoreregex) @@ -779,7 +780,7 @@ async def create_filter( ) content = conffile_parser.serialize_filter_config(cfg) - await loop.run_in_executor(None, _write_filter_local_sync, filter_d, req.name, content) + await run_blocking( _write_filter_local_sync, filter_d, req.name, content) if do_reload: try: @@ -846,7 +847,7 @@ async def delete_filter( log.info("filter_local_deleted", filter=base_name, path=str(local_path)) - await loop.run_in_executor(None, _delete) + await run_blocking( _delete) async def assign_filter_to_jail( @@ -883,7 +884,7 @@ async def assign_filter_to_jail( loop = asyncio.get_event_loop() # Verify the jail exists in config. - all_jails, _src = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _src = await run_blocking( _parse_jails_sync, Path(config_dir)) if jail_name not in all_jails: raise JailNotFoundInConfigError(jail_name) @@ -896,9 +897,9 @@ async def assign_filter_to_jail( if not conf_exists and not local_exists: raise FilterNotFoundError(req.filter_name) - await loop.run_in_executor(None, _check_filter) + await run_blocking( _check_filter) - await loop.run_in_executor( + await run_blocking( None, _set_jail_local_key_sync, Path(config_dir), diff --git a/backend/app/services/jail_config_service.py b/backend/app/services/jail_config_service.py index 1ee178c..b6daef4 100644 --- a/backend/app/services/jail_config_service.py +++ b/backend/app/services/jail_config_service.py @@ -9,6 +9,7 @@ overrides in jail.d/*.local files. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import configparser import contextlib import io @@ -502,7 +503,7 @@ async def list_inactive_jails( inactive jails. """ loop = asyncio.get_event_loop() - parsed_result: tuple[dict[str, dict[str, str]], dict[str, str]] = await loop.run_in_executor( + parsed_result: tuple[dict[str, dict[str, str]], dict[str, str]] = await run_blocking( None, _parse_jails_sync, Path(config_dir) ) all_jails, source_files = parsed_result @@ -588,7 +589,7 @@ async def _activate_jail( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -600,7 +601,7 @@ async def _activate_jail( # ---------------------------------------------------------------------- # # Pre-activation validation — collect warnings but do not block # # ---------------------------------------------------------------------- # - validation_result: JailValidationResult = await loop.run_in_executor( + validation_result: JailValidationResult = await run_blocking( None, _validate_jail_config_sync, Path(config_dir), name ) warnings: list[str] = [f"{i.field}: {i.message}" for i in validation_result.issues] @@ -640,12 +641,12 @@ async def _activate_jail( # we can restore it if activation fails. # # ---------------------------------------------------------------------- # local_path = Path(config_dir) / "jail.d" / f"{name}.local" - original_content: bytes | None = await loop.run_in_executor( + original_content: bytes | None = await run_blocking( None, lambda: local_path.read_bytes() if local_path.exists() else None, ) - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), @@ -793,7 +794,7 @@ async def _rollback_activation_async( # Step 1 — restore original file (or delete it). try: - await loop.run_in_executor(None, _restore_local_file_sync, local_path, original_content) + await run_blocking( _restore_local_file_sync, local_path, original_content) log.info("jail_activation_rollback_file_restored", jail=name) except ConfigWriteError as exc: log.error("jail_activation_rollback_restore_failed", jail=name, error=str(exc)) @@ -866,7 +867,7 @@ async def _deactivate_jail( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -875,7 +876,7 @@ async def _deactivate_jail( if name not in active_names: raise JailAlreadyInactiveError(name) - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), @@ -924,7 +925,7 @@ async def delete_jail_local_override( _safe_jail_name(name) loop = asyncio.get_event_loop() - all_jails, _source_files = await loop.run_in_executor(None, _parse_jails_sync, Path(config_dir)) + all_jails, _source_files = await run_blocking( _parse_jails_sync, Path(config_dir)) if name not in all_jails: raise JailNotFoundInConfigError(name) @@ -935,7 +936,7 @@ async def delete_jail_local_override( local_path = Path(config_dir) / "jail.d" / f"{name}.local" try: - await loop.run_in_executor(None, lambda: local_path.unlink(missing_ok=True)) + await run_blocking( lambda: local_path.unlink(missing_ok=True)) except OSError as exc: raise ConfigWriteError(f"Failed to delete {local_path}: {exc}") from exc @@ -964,7 +965,7 @@ async def validate_jail_config( """ _safe_jail_name(name) loop = asyncio.get_event_loop() - return await loop.run_in_executor( + return await run_blocking( None, _validate_jail_config_sync, Path(config_dir), @@ -1020,7 +1021,7 @@ async def _rollback_jail( loop = asyncio.get_event_loop() # Write enabled=false — this must succeed even when fail2ban is down. - await loop.run_in_executor( + await run_blocking( None, _write_local_override_sync, Path(config_dir), diff --git a/backend/app/services/log_service.py b/backend/app/services/log_service.py index e21c50a..2829b6e 100644 --- a/backend/app/services/log_service.py +++ b/backend/app/services/log_service.py @@ -7,6 +7,7 @@ fail2ban socket operations. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import re from pathlib import Path @@ -70,7 +71,7 @@ async def preview_log(req: LogPreviewRequest) -> LogPreviewResponse: ) try: - raw_lines = await asyncio.get_event_loop().run_in_executor( + raw_lines = await run_blocking( None, _read_tail_lines, str(path), diff --git a/backend/app/services/raw_config_io_service.py b/backend/app/services/raw_config_io_service.py index e6d6c7d..95d1c3e 100644 --- a/backend/app/services/raw_config_io_service.py +++ b/backend/app/services/raw_config_io_service.py @@ -4,7 +4,7 @@ Provides functions to list, read, and write files in the fail2ban configuration directory (``jail.d/``, ``filter.d/``, ``action.d/``). All file operations are synchronous (wrapped in -:func:`asyncio.get_event_loop().run_in_executor` by callers that need async +:func:`app.utils.async_utils.run_blocking` by callers that need async behaviour) because the config files are small and infrequently touched — the overhead of async I/O is not warranted here. @@ -16,6 +16,7 @@ traversal attacks. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking import configparser import re from pathlib import Path @@ -291,7 +292,7 @@ async def list_jail_config_files(config_dir: str) -> JailConfigFilesResponse: log.info("jail_config_files_listed", count=len(files)) return JailConfigFilesResponse(files=files, total=len(files)) - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def get_jail_config_file(config_dir: str, filename: str) -> JailConfigFileContent: @@ -333,7 +334,7 @@ async def get_jail_config_file(config_dir: str, filename: str) -> JailConfigFile content=content, ) - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def set_jail_config_enabled( @@ -386,7 +387,7 @@ async def set_jail_config_enabled( enabled=enabled, ) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) async def create_jail_config_file( @@ -415,7 +416,7 @@ async def create_jail_config_file( log.info("jail_config_file_created", filename=filename) return filename - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def write_jail_config_file( @@ -458,7 +459,7 @@ async def write_jail_config_file( ) from exc log.info("jail_config_file_written", filename=filename) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) # --------------------------------------------------------------------------- @@ -624,7 +625,7 @@ async def list_filter_files(config_dir: str) -> ConfFilesResponse: log.info("filter_files_listed", count=result.total) return result - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def get_filter_file(config_dir: str, name: str) -> ConfFileContent: @@ -646,7 +647,7 @@ async def get_filter_file(config_dir: str, name: str) -> ConfFileContent: filter_d = _resolve_subdir(config_dir, "filter.d") return _read_conf_file(filter_d, name) - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def write_filter_file( @@ -672,7 +673,7 @@ async def write_filter_file( _write_conf_file(filter_d, name, req.content) log.info("filter_file_written", name=name) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) async def create_filter_file( @@ -701,7 +702,7 @@ async def create_filter_file( log.info("filter_file_created", filename=filename) return filename - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) # --------------------------------------------------------------------------- @@ -728,7 +729,7 @@ async def list_action_files(config_dir: str) -> ConfFilesResponse: log.info("action_files_listed", count=result.total) return result - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def get_action_file(config_dir: str, name: str) -> ConfFileContent: @@ -750,7 +751,7 @@ async def get_action_file(config_dir: str, name: str) -> ConfFileContent: action_d = _resolve_subdir(config_dir, "action.d") return _read_conf_file(action_d, name) - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def write_action_file( @@ -776,7 +777,7 @@ async def write_action_file( _write_conf_file(action_d, name, req.content) log.info("action_file_written", name=name) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) async def create_action_file( @@ -805,7 +806,7 @@ async def create_action_file( log.info("action_file_created", filename=filename) return filename - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) # --------------------------------------------------------------------------- @@ -840,7 +841,7 @@ async def get_parsed_filter_file(config_dir: str, name: str) -> FilterConfig: log.debug("filter_file_parsed", name=raw.name) return result - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def update_parsed_filter_file( @@ -879,7 +880,7 @@ async def update_parsed_filter_file( _write_conf_file(filter_d, name, new_content) log.info("filter_file_updated_parsed", name=name) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) # --------------------------------------------------------------------------- @@ -910,7 +911,7 @@ async def get_parsed_action_file(config_dir: str, name: str) -> ActionConfig: log.debug("action_file_parsed", name=raw.name) return result - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def update_parsed_action_file( @@ -946,7 +947,7 @@ async def update_parsed_action_file( _write_conf_file(action_d, name, new_content) log.info("action_file_updated_parsed", name=name) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) async def get_parsed_jail_file(config_dir: str, filename: str) -> JailFileConfig: @@ -972,7 +973,7 @@ async def get_parsed_jail_file(config_dir: str, filename: str) -> JailFileConfig log.debug("jail_file_parsed", filename=raw.filename) return result - return await asyncio.get_event_loop().run_in_executor(None, _do) + return await run_blocking( _do) async def update_parsed_jail_file( @@ -1008,4 +1009,4 @@ async def update_parsed_jail_file( _write_conf_file(jail_d, filename, new_content) log.info("jail_file_updated_parsed", filename=filename) - await asyncio.get_event_loop().run_in_executor(None, _do) + await run_blocking( _do) diff --git a/backend/app/services/setup_service.py b/backend/app/services/setup_service.py index ab2a213..a5794b1 100644 --- a/backend/app/services/setup_service.py +++ b/backend/app/services/setup_service.py @@ -8,6 +8,7 @@ enforcing the rule that setup can only run once. from __future__ import annotations import asyncio +from app.utils.async_utils import run_blocking from pathlib import Path from typing import TYPE_CHECKING @@ -82,8 +83,8 @@ async def run_setup( # the asyncio event loop. password_bytes = master_password.encode() loop = asyncio.get_running_loop() - hashed: str = await loop.run_in_executor( - None, lambda: bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode() + hashed: str = await run_blocking( + lambda: bcrypt.hashpw(password_bytes, bcrypt.gensalt()).decode() ) await settings_repo.set_setting(db, _KEY_PASSWORD_HASH, hashed) diff --git a/backend/app/utils/async_utils.py b/backend/app/utils/async_utils.py new file mode 100644 index 0000000..6ff902b --- /dev/null +++ b/backend/app/utils/async_utils.py @@ -0,0 +1,42 @@ +"""Async execution utilities. + +Provides a shared thread-backed executor abstraction and helpers for +running blocking callables without stalling the FastAPI event loop. +""" + +from __future__ import annotations + +import asyncio +from concurrent.futures import ThreadPoolExecutor +from typing import Callable, ParamSpec, TypeVar + +P = ParamSpec("P") +T = TypeVar("T") + +DEFAULT_BLOCKING_EXECUTOR: ThreadPoolExecutor = ThreadPoolExecutor( + max_workers=16, + thread_name_prefix="bangui-blocking", +) + + +async def run_blocking( + func: Callable[P, T], + *args: P.args, + executor: ThreadPoolExecutor | None = None, + **kwargs: P.kwargs, +) -> T: + """Run a blocking callable in the shared thread pool. + + Args: + func: Blocking callable to execute. + *args: Positional arguments for the callable. + executor: Optional custom executor. Defaults to the shared pool. + **kwargs: Keyword arguments for the callable. + + Returns: + The callable return value. + """ + loop = asyncio.get_running_loop() + if executor is None: + return await asyncio.to_thread(func, *args, **kwargs) + return await loop.run_in_executor(executor, func, *args, **kwargs) diff --git a/backend/app/utils/config_parser.py b/backend/app/utils/config_parser.py index 4202917..5033cbb 100644 --- a/backend/app/utils/config_parser.py +++ b/backend/app/utils/config_parser.py @@ -19,7 +19,7 @@ Provides a :class:`Fail2BanConfigParser` class that wraps Python's comments inside multi-line values are stripped by :meth:`split_multiline`. All methods are synchronous. Call from async contexts via -:func:`asyncio.get_event_loop().run_in_executor`. +:func:`app.utils.async_utils.run_blocking`. """ from __future__ import annotations diff --git a/backend/tests/test_utils/test_async_utils.py b/backend/tests/test_utils/test_async_utils.py new file mode 100644 index 0000000..fdded9c --- /dev/null +++ b/backend/tests/test_utils/test_async_utils.py @@ -0,0 +1,48 @@ +"""Tests for async_utils.""" + +from __future__ import annotations + +import asyncio +import time +from concurrent.futures import ThreadPoolExecutor + +from app.utils.async_utils import run_blocking + + +async def test_run_blocking_executes_callable_in_thread() -> None: + """run_blocking should execute the provided callable and return its result.""" + + def blocking_add(x: int, y: int) -> int: + return x + y + + result = await run_blocking(blocking_add, 3, 4) + assert result == 7 + + +async def test_run_blocking_accepts_custom_executor() -> None: + """run_blocking should use a provided executor when one is passed.""" + + def blocking_value() -> str: + return "ok" + + executor = ThreadPoolExecutor(max_workers=1) + try: + result = await run_blocking(blocking_value, executor=executor) + assert result == "ok" + finally: + executor.shutdown(wait=True) + + +async def test_run_blocking_does_not_block_event_loop() -> None: + """A blocking callable should not stall other async tasks.""" + + def sleep_block() -> str: + time.sleep(0.05) + return "done" + + task = asyncio.create_task(run_blocking(sleep_block)) + # Yield control to ensure the event loop can run other tasks while the + # blocking work executes in the thread pool. + await asyncio.sleep(0) + result = await task + assert result == "done"