"""Rate limiter cleanup background task. Registers an APScheduler job that periodically removes expired rate-limit entries from the in-memory rate limiter. Without this cleanup, the rate-limiter state dictionary grows unbounded over long runtimes, eventually consuming excessive memory. The cleanup is conservative: it only removes IPs with no recent attempts (all timestamps outside the rate-limit window), so active or recently-active IPs are preserved. """ from __future__ import annotations from typing import TYPE_CHECKING import structlog if TYPE_CHECKING: from fastapi import FastAPI log: structlog.stdlib.BoundLogger = structlog.get_logger() #: How often the cleanup job fires (seconds). Chosen to balance memory #: management against CPU overhead. A 30-minute interval handles typical #: brute-force attack patterns while staying lightweight. RATE_LIMITER_CLEANUP_INTERVAL: int = 30 * 60 # 30 minutes #: Stable APScheduler job ID — ensures re-registration replaces, not duplicates. JOB_ID: str = "rate_limiter_cleanup" def _run_cleanup(app: FastAPI) -> None: """Trigger cleanup of expired rate-limiter entries. Args: app: The FastAPI application instance (holds the rate limiter). """ rate_limiter = getattr(app.state, "login_rate_limiter", None) if rate_limiter is None: log.warning( "rate_limiter_cleanup_skipped", reason="rate_limiter not found on app.state", ) return rate_limiter.cleanup_expired() def register(app: FastAPI) -> None: """Add (or replace) the rate-limiter cleanup job in the application scheduler. Must be called after the scheduler has been started (i.e., inside the lifespan handler, after ``scheduler.start()``). Args: app: The :class:`fastapi.FastAPI` application instance whose ``app.state.scheduler`` will receive the job. """ app.state.scheduler.add_job( _run_cleanup, trigger="interval", seconds=RATE_LIMITER_CLEANUP_INTERVAL, kwargs={"app": app}, id=JOB_ID, replace_existing=True, ) log.info( "rate_limiter_cleanup_scheduled", interval_seconds=RATE_LIMITER_CLEANUP_INTERVAL, )