Update observability docs and task utilities
- Add Observability.md documentation
- Standardize task logging with correlation_id support
- Add log_sanitizer utility for PII masking
- Update Tasks.md tracking
- Update geo_cache tasks and other task modules with correlation_id

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -9,10 +9,16 @@ be updated at runtime through the blocklist router.
|
||||
The scheduler job ID is ``"blocklist_import"`` — using a stable id means
|
||||
re-registering the job (e.g. after a schedule update) safely replaces the
|
||||
existing entry without creating duplicates.
|
||||
|
||||
Correlation IDs are propagated through the task using
|
||||
:mod:`app.utils.correlation` so that task logs can be correlated with the
|
||||
triggering request (for manually triggered imports) or assigned a unique
|
||||
ID per run (for scheduled imports).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import structlog
|
||||
@@ -20,6 +26,7 @@ import structlog
|
||||
from app.services import ban_service, blocklist_service
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -37,18 +44,36 @@ JOB_ID: str = "blocklist_import"
|
||||
TASK_TIMEOUT_SECONDS: int = 300
|
||||
|
||||
|
||||
async def _run_import_with_resources(settings: Settings, http_session: ClientSession) -> None:
|
||||
async def _run_import_with_resources(
    settings: Settings,
    http_session: ClientSession,
    correlation_id: str | None = None,
) -> None:
    """Run one blocklist import with a correlation ID bound for its duration.

    The correlation ID is set before delegating to
    :func:`_do_import_with_settings` and reset afterwards using the token
    returned by :func:`set_correlation_id`, whether or not the import raised.

    Args:
        settings: The resolved application settings used for database access.
        http_session: The shared aiohttp session used for blocklist downloads.
        correlation_id: Optional correlation ID from the triggering request.
            When ``None``, a fresh UUID4 string identifies this run.
    """
    # Only ``None`` triggers generation; any caller-supplied string is kept.
    run_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    context_token = set_correlation_id(run_id)
    try:
        await _do_import_with_settings(settings, http_session)
    finally:
        # Reset even when the import raises.
        reset_correlation_id(context_token)
|
||||
|
||||
|
||||
async def _do_import_with_settings(settings: Settings, http_session: ClientSession) -> None:
|
||||
"""Inner import logic that runs with correlation context set."""
|
||||
|
||||
async def _do_import() -> None:
|
||||
socket_path: str = settings.fail2ban_socket
|
||||
|
||||
log.info("blocklist_import_starting")
|
||||
log.info("blocklist_import_starting", correlation_id=get_correlation_id())
|
||||
try:
|
||||
async with task_db(settings) as db:
|
||||
result = await blocklist_service.import_all(
|
||||
@@ -59,12 +84,13 @@ async def _run_import_with_resources(settings: Settings, http_session: ClientSes
|
||||
)
|
||||
log.info(
|
||||
"blocklist_import_finished",
|
||||
correlation_id=get_correlation_id(),
|
||||
total_imported=result.total_imported,
|
||||
total_skipped=result.total_skipped,
|
||||
errors=result.errors_count,
|
||||
)
|
||||
except Exception:
|
||||
log.exception("blocklist_import_unexpected_error")
|
||||
log.exception("blocklist_import_unexpected_error", correlation_id=get_correlation_id())
|
||||
|
||||
await run_with_timeout("blocklist_import", _do_import(), TASK_TIMEOUT_SECONDS)
|
||||
|
||||
|
||||
@@ -7,10 +7,14 @@ database file and maintains query performance on geo lookups.
|
||||
|
||||
When a stale IP is encountered again after purge, it will be re-resolved from
|
||||
the MaxMind database or ip-api.com (if configured), which is acceptable.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@@ -19,6 +23,7 @@ import structlog
|
||||
from app.repositories import geo_cache_repo
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -41,7 +46,10 @@ JOB_ID: str = "geo_cache_cleanup"
|
||||
TASK_TIMEOUT_SECONDS: int = 60
|
||||
|
||||
|
||||
async def _run_cleanup_with_resources(settings: Settings) -> None:
|
||||
async def _run_cleanup_with_resources(
|
||||
settings: Settings,
|
||||
correlation_id: str | None = None,
|
||||
) -> None:
|
||||
"""Delete stale entries from the geo cache.
|
||||
|
||||
Calculates a cutoff timestamp (now - retention period) and removes all
|
||||
@@ -49,7 +57,20 @@ async def _run_cleanup_with_resources(settings: Settings) -> None:
|
||||
|
||||
Args:
|
||||
settings: The resolved application settings used for database access.
|
||||
correlation_id: Optional correlation ID for log correlation.
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = str(uuid.uuid4())
|
||||
|
||||
token = set_correlation_id(correlation_id)
|
||||
try:
|
||||
await _do_cleanup_with_settings(settings)
|
||||
finally:
|
||||
reset_correlation_id(token)
|
||||
|
||||
|
||||
async def _do_cleanup_with_settings(settings: Settings) -> None:
|
||||
"""Inner cleanup logic that runs with correlation context set."""
|
||||
|
||||
async def _do_cleanup() -> None:
|
||||
cutoff_dt = datetime.now(UTC) - timedelta(days=GEO_CACHE_RETENTION_DAYS)
|
||||
@@ -60,9 +81,19 @@ async def _run_cleanup_with_resources(settings: Settings) -> None:
|
||||
await db.commit()
|
||||
|
||||
if deleted > 0:
|
||||
log.info("geo_cache_cleanup_ran", deleted=deleted, retention_days=GEO_CACHE_RETENTION_DAYS)
|
||||
log.info(
|
||||
"geo_cache_cleanup_ran",
|
||||
correlation_id=get_correlation_id(),
|
||||
deleted=deleted,
|
||||
retention_days=GEO_CACHE_RETENTION_DAYS,
|
||||
)
|
||||
else:
|
||||
log.debug("geo_cache_cleanup_ran", deleted=deleted, retention_days=GEO_CACHE_RETENTION_DAYS)
|
||||
log.debug(
|
||||
"geo_cache_cleanup_ran",
|
||||
correlation_id=get_correlation_id(),
|
||||
deleted=deleted,
|
||||
retention_days=GEO_CACHE_RETENTION_DAYS,
|
||||
)
|
||||
|
||||
await run_with_timeout("geo_cache_cleanup", _do_cleanup(), TASK_TIMEOUT_SECONDS)
|
||||
|
||||
|
||||
@@ -7,16 +7,21 @@ After Task 2 removed geo cache writes from GET requests, newly resolved IPs
|
||||
are only held in the in-memory cache until this task flushes them. With the
|
||||
default 60-second interval, at most one minute of new resolution results is
|
||||
at risk on an unexpected process restart.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -37,20 +42,37 @@ JOB_ID: str = "geo_cache_flush"
|
||||
TASK_TIMEOUT_SECONDS: int = 60
|
||||
|
||||
|
||||
async def _run_flush_with_resources(geo_cache: GeoCache, settings: Settings) -> None:
|
||||
async def _run_flush_with_resources(
    geo_cache: GeoCache,
    settings: Settings,
    correlation_id: str | None = None,
) -> None:
    """Flush the geo cache dirty set with a correlation ID bound for the run.

    The correlation ID is set before delegating to
    :func:`_do_flush_with_resources` and reset afterwards using the token
    returned by :func:`set_correlation_id`, whether or not the flush raised.

    Args:
        geo_cache: The application's GeoCache instance.
        settings: The resolved application settings used for database access.
        correlation_id: Optional correlation ID for log correlation. When
            ``None``, a fresh UUID4 string identifies this run.
    """
    # Only ``None`` triggers generation; any caller-supplied string is kept.
    run_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    context_token = set_correlation_id(run_id)
    try:
        await _do_flush_with_resources(geo_cache, settings)
    finally:
        # Reset even when the flush raises.
        reset_correlation_id(context_token)
|
||||
|
||||
|
||||
async def _do_flush_with_resources(geo_cache: GeoCache, settings: Settings) -> None:
    """Inner flush logic that runs with correlation context set.

    Opens a task database connection, asks the geo cache to flush its dirty
    entries, and runs the whole operation through :func:`run_with_timeout`
    with ``TASK_TIMEOUT_SECONDS``.

    Args:
        geo_cache: The application's GeoCache instance.
        settings: The resolved application settings used for database access.
    """

    async def _do_flush() -> None:
        async with task_db(settings) as db:
            count = await geo_cache.flush_dirty(db)

        # Log once per run, and only when something was flushed, so idle
        # runs stay silent.
        if count > 0:
            log.debug("geo_cache_flush_ran", correlation_id=get_correlation_id(), flushed=count)

    await run_with_timeout("geo_cache_flush", _do_flush(), TASK_TIMEOUT_SECONDS)
|
||||
|
||||
|
||||
@@ -13,16 +13,21 @@ The task runs every 10 minutes. On each invocation it:
|
||||
3. Delegates to :meth:`~app.services.geo_cache.GeoCache.lookup_batch` which
|
||||
already handles rate-limit throttling and retries.
|
||||
4. Logs how many IPs were retried and how many resolved successfully.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -45,7 +50,10 @@ TASK_TIMEOUT_SECONDS: int = 120
|
||||
|
||||
|
||||
async def _run_re_resolve_with_resources(
|
||||
geo_cache: GeoCache, settings: Settings, http_session: ClientSession
|
||||
geo_cache: GeoCache,
|
||||
settings: Settings,
|
||||
http_session: ClientSession,
|
||||
correlation_id: str | None = None,
|
||||
) -> None:
|
||||
"""Query NULL-country IPs from the database and re-resolve them.
|
||||
|
||||
@@ -53,7 +61,24 @@ async def _run_re_resolve_with_resources(
|
||||
geo_cache: The application's GeoCache instance.
|
||||
settings: The resolved application settings used for database access.
|
||||
http_session: The shared aiohttp session used for external lookups.
|
||||
correlation_id: Optional correlation ID for log correlation.
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = str(uuid.uuid4())
|
||||
|
||||
token = set_correlation_id(correlation_id)
|
||||
try:
|
||||
await _do_re_resolve_with_resources(geo_cache, settings, http_session)
|
||||
finally:
|
||||
reset_correlation_id(token)
|
||||
|
||||
|
||||
async def _do_re_resolve_with_resources(
|
||||
geo_cache: GeoCache,
|
||||
settings: Settings,
|
||||
http_session: ClientSession,
|
||||
) -> None:
|
||||
"""Inner re-resolve logic that runs with correlation context set."""
|
||||
|
||||
async def _do_re_resolve() -> None:
|
||||
async with task_db(settings) as db:
|
||||
@@ -61,10 +86,10 @@ async def _run_re_resolve_with_resources(
|
||||
unresolved_ips = await geo_cache.get_unresolved_ips(db)
|
||||
|
||||
if not unresolved_ips:
|
||||
log.debug("geo_re_resolve_skip", reason="no_unresolved_ips")
|
||||
log.debug("geo_re_resolve_skip", correlation_id=get_correlation_id(), reason="no_unresolved_ips")
|
||||
return
|
||||
|
||||
log.info("geo_re_resolve_start", unresolved=len(unresolved_ips))
|
||||
log.info("geo_re_resolve_start", correlation_id=get_correlation_id(), unresolved=len(unresolved_ips))
|
||||
|
||||
# Clear the negative cache so these IPs are eligible for fresh API calls.
|
||||
await geo_cache.clear_neg_cache()
|
||||
@@ -78,6 +103,7 @@ async def _run_re_resolve_with_resources(
|
||||
)
|
||||
log.info(
|
||||
"geo_re_resolve_complete",
|
||||
correlation_id=get_correlation_id(),
|
||||
retried=len(unresolved_ips),
|
||||
resolved=resolved_count,
|
||||
)
|
||||
|
||||
@@ -13,11 +13,15 @@ keys). If the health probe subsequently detects an online→offline transition
|
||||
within 60 seconds of that activation, a
|
||||
:class:`~app.models.config.PendingRecovery` record is written to
|
||||
``app.state.pending_recovery`` so the UI can offer a one-click rollback.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
@@ -25,6 +29,7 @@ import structlog
|
||||
from app.models.server import ServerStatus
|
||||
from app.services import health_service
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import (
|
||||
RuntimeState,
|
||||
get_effective_settings,
|
||||
@@ -47,13 +52,30 @@ HEALTH_CHECK_INTERVAL: int = 30
|
||||
HEALTH_PROBE_TIMEOUT_SECONDS: int = 10
|
||||
|
||||
|
||||
async def _run_probe_with_resources(settings: Settings, runtime_state: RuntimeState) -> None:
|
||||
async def _run_probe_with_resources(
    settings: Settings,
    runtime_state: RuntimeState,
    correlation_id: str | None = None,
) -> None:
    """Probe fail2ban with a correlation ID bound for the duration of the run.

    The correlation ID is set before delegating to
    :func:`_do_probe_with_resources` and reset afterwards using the token
    returned by :func:`set_correlation_id`, whether or not the probe raised.

    Args:
        settings: The resolved application settings used for the probe.
        runtime_state: The mutable runtime state manager.
        correlation_id: Optional correlation ID for log correlation. When
            ``None``, a fresh UUID4 string identifies this run.
    """
    # Only ``None`` triggers generation; any caller-supplied string is kept.
    run_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    context_token = set_correlation_id(run_id)
    try:
        await _do_probe_with_resources(settings, runtime_state)
    finally:
        # Reset even when the probe raises.
        reset_correlation_id(context_token)
|
||||
|
||||
|
||||
async def _do_probe_with_resources(settings: Settings, runtime_state: RuntimeState) -> None:
|
||||
"""Inner probe logic that runs with correlation context set."""
|
||||
|
||||
async def _do_probe() -> None:
|
||||
socket_path: str = settings.fail2ban_socket
|
||||
|
||||
@@ -2,11 +2,15 @@
|
||||
|
||||
Periodically copies new records from the fail2ban sqlite database into the
|
||||
BanGUI application archive table to prevent gaps when fail2ban purges old rows.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
@@ -14,6 +18,7 @@ import structlog
|
||||
from app.services import history_service
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -36,16 +41,32 @@ BACKFILL_WINDOW: int = 648000
|
||||
TASK_TIMEOUT_SECONDS: int = 60
|
||||
|
||||
|
||||
async def _run_sync_with_settings(settings: Settings) -> None:
|
||||
async def _run_sync_with_settings(
    settings: Settings,
    correlation_id: str | None = None,
) -> None:
    """Run the history sync with a correlation ID bound for the run.

    The correlation ID is set before delegating to
    :func:`_do_sync_with_settings` and reset afterwards using the token
    returned by :func:`set_correlation_id`, whether or not the sync raised.

    Args:
        settings: The resolved application settings passed on to the sync.
        correlation_id: Optional correlation ID for log correlation. When
            ``None``, a fresh UUID4 string identifies this run.
    """
    # Only ``None`` triggers generation; any caller-supplied string is kept.
    run_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    context_token = set_correlation_id(run_id)
    try:
        await _do_sync_with_settings(settings)
    finally:
        # Reset even when the sync raises.
        reset_correlation_id(context_token)
|
||||
|
||||
|
||||
async def _do_sync_with_settings(settings: Settings) -> None:
    """Inner sync logic that runs with correlation context set.

    Calls :func:`history_service.sync_from_fail2ban_db` on a task database
    connection and runs the whole operation through :func:`run_with_timeout`
    with ``TASK_TIMEOUT_SECONDS``.

    Args:
        settings: The resolved application settings; supplies the fail2ban
            socket path and the database access configuration.
    """
    socket_path: str = settings.fail2ban_socket

    async def _do_sync() -> None:
        try:
            async with task_db(settings) as db:
                synced = await history_service.sync_from_fail2ban_db(db, socket_path)
                # Emit the completion event once, tagged with the run's ID.
                log.info("history_sync_complete", correlation_id=get_correlation_id(), synced=synced)
        except Exception:
            # Failures are logged with the traceback and not re-raised, so
            # they do not propagate out of this coroutine.
            log.exception("history_sync_failed", correlation_id=get_correlation_id())

    await run_with_timeout("history_sync", _do_sync(), TASK_TIMEOUT_SECONDS)
|
||||
|
||||
|
||||
@@ -8,15 +8,20 @@ consuming excessive memory.
|
||||
The cleanup is conservative: it only removes IPs with no recent attempts
|
||||
(all timestamps outside the rate-limit window), so active or recently-active
|
||||
IPs are preserved.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import FastAPI
|
||||
@@ -35,7 +40,10 @@ JOB_ID: str = "rate_limiter_cleanup"
|
||||
TASK_TIMEOUT_SECONDS: int = 5
|
||||
|
||||
|
||||
async def _run_cleanup(app: FastAPI) -> None:
|
||||
async def _run_cleanup(
|
||||
app: FastAPI,
|
||||
correlation_id: str | None = None,
|
||||
) -> None:
|
||||
"""Trigger cleanup of expired rate-limiter entries.
|
||||
|
||||
Cleans up both the login-specific rate limiter (exponential backoff)
|
||||
@@ -43,13 +51,27 @@ async def _run_cleanup(app: FastAPI) -> None:
|
||||
|
||||
Args:
|
||||
app: The FastAPI application instance (holds the rate limiters).
|
||||
correlation_id: Optional correlation ID for log correlation.
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = str(uuid.uuid4())
|
||||
|
||||
token = set_correlation_id(correlation_id)
|
||||
try:
|
||||
await _do_cleanup_with_app(app)
|
||||
finally:
|
||||
reset_correlation_id(token)
|
||||
|
||||
|
||||
async def _do_cleanup_with_app(app: FastAPI) -> None:
|
||||
"""Inner cleanup logic that runs with correlation context set."""
|
||||
|
||||
async def _do_cleanup() -> None:
|
||||
login_limiter = getattr(app.state, "login_rate_limiter", None)
|
||||
if login_limiter is None:
|
||||
log.warning(
|
||||
"rate_limiter_cleanup_skipped",
|
||||
correlation_id=get_correlation_id(),
|
||||
reason="login_rate_limiter not found on app.state",
|
||||
)
|
||||
else:
|
||||
@@ -59,6 +81,7 @@ async def _run_cleanup(app: FastAPI) -> None:
|
||||
if global_limiter is None:
|
||||
log.warning(
|
||||
"rate_limiter_cleanup_skipped",
|
||||
correlation_id=get_correlation_id(),
|
||||
reason="global_rate_limiter not found on app.state",
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -7,16 +7,21 @@ if the running instance experiences temporary delays or high load.
|
||||
Without this heartbeat, stale lock detection (based on TTL) could incorrectly
|
||||
determine that the scheduler instance has crashed when it's merely busy, and
|
||||
a new instance could take over.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
from app.utils.scheduler_lock import update_scheduler_lock_heartbeat
|
||||
|
||||
@@ -38,7 +43,10 @@ JOB_ID: str = "scheduler_lock_heartbeat"
|
||||
TASK_TIMEOUT_SECONDS: int = 5
|
||||
|
||||
|
||||
async def _update_heartbeat_with_resources(settings: Settings) -> None:
|
||||
async def _update_heartbeat_with_resources(
|
||||
settings: Settings,
|
||||
correlation_id: str | None = None,
|
||||
) -> None:
|
||||
"""Update the scheduler lock heartbeat timestamp.
|
||||
|
||||
If the heartbeat update fails (e.g., we no longer hold the lock), log
|
||||
@@ -51,17 +59,31 @@ async def _update_heartbeat_with_resources(settings: Settings) -> None:
|
||||
|
||||
Args:
|
||||
settings: The resolved application settings used for database access.
|
||||
correlation_id: Optional correlation ID from the triggering request.
|
||||
"""
|
||||
if correlation_id is None:
|
||||
correlation_id = str(uuid.uuid4())
|
||||
|
||||
token = set_correlation_id(correlation_id)
|
||||
try:
|
||||
await _do_update_heartbeat_with_settings(settings)
|
||||
finally:
|
||||
reset_correlation_id(token)
|
||||
|
||||
|
||||
async def _do_update_heartbeat_with_settings(settings: Settings) -> None:
|
||||
"""Inner heartbeat logic that runs with correlation context set."""
|
||||
|
||||
async def _do_update() -> None:
|
||||
async with task_db(settings) as db:
|
||||
success = await update_scheduler_lock_heartbeat(db)
|
||||
|
||||
if success:
|
||||
log.debug("scheduler_lock_heartbeat_updated")
|
||||
log.debug("scheduler_lock_heartbeat_updated", correlation_id=get_correlation_id())
|
||||
else:
|
||||
log.warning(
|
||||
"scheduler_lock_heartbeat_failed",
|
||||
correlation_id=get_correlation_id(),
|
||||
message="Failed to update heartbeat; we no longer hold the lock. "
|
||||
"Another instance may have taken over or the database connection failed.",
|
||||
)
|
||||
@@ -71,12 +93,14 @@ async def _update_heartbeat_with_resources(settings: Settings) -> None:
|
||||
except TimeoutError:
|
||||
log.error(
|
||||
"scheduler_lock_heartbeat_timeout",
|
||||
correlation_id=get_correlation_id(),
|
||||
timeout_seconds=TASK_TIMEOUT_SECONDS,
|
||||
message="Heartbeat update exceeded timeout. The database may be slow or unresponsive.",
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"scheduler_lock_heartbeat_error",
|
||||
correlation_id=get_correlation_id(),
|
||||
error=str(e),
|
||||
message="Unexpected error during heartbeat update.",
|
||||
)
|
||||
|
||||
@@ -6,10 +6,14 @@ months of operation and degrades query performance.
|
||||
|
||||
Individual expired sessions are removed on-demand when validated, but the bulk
|
||||
cleanup ensures comprehensive pruning at a predictable interval.
|
||||
|
||||
Correlation IDs are propagated through the task using :mod:`app.utils.correlation`
|
||||
so that every log entry emitted during a single run carries the same ID.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
@@ -17,6 +21,7 @@ import structlog
|
||||
from app.repositories import session_repo
|
||||
from app.tasks.db import task_db
|
||||
from app.tasks.timeout_utils import run_with_timeout
|
||||
from app.utils.correlation import get_correlation_id, reset_correlation_id, set_correlation_id
|
||||
from app.utils.runtime_state import get_effective_settings
|
||||
from app.utils.time_utils import utc_now
|
||||
|
||||
@@ -37,19 +42,40 @@ JOB_ID: str = "session_cleanup"
|
||||
TASK_TIMEOUT_SECONDS: int = 30
|
||||
|
||||
|
||||
async def _run_cleanup_with_resources(settings: Settings) -> None:
|
||||
async def _run_cleanup_with_resources(
    settings: Settings,
    correlation_id: str | None = None,
) -> None:
    """Delete expired sessions with a correlation ID bound for the run.

    The correlation ID is set before delegating to
    :func:`_do_cleanup_with_settings` and reset afterwards using the token
    returned by :func:`set_correlation_id`, whether or not the cleanup raised.

    Args:
        settings: The resolved application settings used for database access.
        correlation_id: Optional correlation ID from the triggering request.
            When ``None``, a fresh UUID4 string identifies this run.
    """
    # Only ``None`` triggers generation; any caller-supplied string is kept.
    run_id = str(uuid.uuid4()) if correlation_id is None else correlation_id
    context_token = set_correlation_id(run_id)
    try:
        await _do_cleanup_with_settings(settings)
    finally:
        # Reset even when the cleanup raises.
        reset_correlation_id(context_token)
|
||||
|
||||
|
||||
async def _do_cleanup_with_settings(settings: Settings) -> None:
    """Inner cleanup logic that runs with correlation context set.

    Takes the current UTC time as the cutoff, asks the session repository to
    delete expired sessions as of that cutoff, and runs the whole operation
    through :func:`run_with_timeout` with ``TASK_TIMEOUT_SECONDS``.

    Args:
        settings: The resolved application settings used for database access.
    """

    async def _do_cleanup() -> None:
        now_iso = utc_now().isoformat()
        async with task_db(settings) as db:
            deleted_count = await session_repo.delete_expired_sessions(db, now_iso)

        # Emit the summary event once, tagged with the run's correlation ID.
        log.info(
            "session_cleanup_ran",
            correlation_id=get_correlation_id(),
            deleted_count=deleted_count,
            cutoff_time=now_iso,
        )

    await run_with_timeout("session_cleanup", _do_cleanup(), TASK_TIMEOUT_SECONDS)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user