"""Health-check background task.

Registers an APScheduler job that probes the fail2ban socket every 30 seconds
and stores the result on ``app.state.server_status``.  The dashboard endpoint
reads from this cache, keeping HTTP responses fast and the daemon connection
decoupled from user-facing requests.
"""

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any

import structlog

from app.models.server import ServerStatus
from app.services import health_service

if TYPE_CHECKING:  # pragma: no cover
    from fastapi import FastAPI

log: structlog.stdlib.BoundLogger = structlog.get_logger()

#: How often the probe fires (seconds).
HEALTH_CHECK_INTERVAL: int = 30


async def _run_probe(app: Any) -> None:
    """Probe fail2ban and cache the result on *app.state*.

    This is the APScheduler job callback.  It reads ``fail2ban_socket`` from
    ``app.state.settings``, runs the health probe, and writes the result to
    ``app.state.server_status``.  A transition between the online and offline
    states is logged at a higher level than the routine per-probe debug line.

    Args:
        app: The :class:`fastapi.FastAPI` application instance passed by the
            scheduler via the ``kwargs`` mechanism.
    """
    socket_path: str = app.state.settings.fail2ban_socket

    # Fall back to an offline placeholder when no status has been cached yet,
    # so the very first successful probe is reported as "came online".
    prev_status: ServerStatus = getattr(
        app.state, "server_status", ServerStatus(online=False)
    )

    status: ServerStatus = await health_service.probe(socket_path)
    app.state.server_status = status

    # Log transitions between online and offline states.
    if status.online and not prev_status.online:
        log.info("fail2ban_came_online", version=status.version)
    elif not status.online and prev_status.online:
        log.warning("fail2ban_went_offline")

    log.debug(
        "health_check_complete",
        online=status.online,
        version=status.version,
        active_jails=status.active_jails,
    )


def register(app: FastAPI) -> None:
    """Add the health-check job to the application scheduler.

    Must be called after the scheduler has been started (i.e., inside the
    lifespan handler, after ``scheduler.start()``).

    The job is registered under the fixed id ``"health_check"`` with
    ``replace_existing=True``, so calling this function again replaces the
    job instead of adding a duplicate.

    Args:
        app: The :class:`fastapi.FastAPI` application instance whose
            ``app.state.scheduler`` will receive the job.
    """
    # Initialise the cache with an offline placeholder so the dashboard
    # endpoint is always able to return a valid response even before the
    # first probe fires.
    app.state.server_status = ServerStatus(online=False)

    app.state.scheduler.add_job(
        _run_probe,
        trigger="interval",
        seconds=HEALTH_CHECK_INTERVAL,
        kwargs={"app": app},
        id="health_check",
        replace_existing=True,
        # Fire immediately on startup too, so the UI isn't dark for 30 s.
        # A timezone-aware UTC timestamp avoids naive/aware datetime mixing.
        next_run_time=datetime.datetime.now(tz=datetime.timezone.utc),
    )

    log.info(
        "health_check_scheduled",
        interval_seconds=HEALTH_CHECK_INTERVAL,
    )