- Replace contextlib.suppress with try/except + warning log - Add test for fail2ban client - Remove stale Issue #21 from Tasks.md (indexes) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
418 lines
14 KiB
Python
418 lines
14 KiB
Python
"""External logging handlers for centralized log aggregation.
|
|
|
|
Supports shipping logs to Datadog, Papertrail, and Elasticsearch in a
non-blocking fashion using async buffering and batching.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from abc import ABC, abstractmethod
|
|
from collections import deque
|
|
from datetime import datetime
|
|
from typing import TYPE_CHECKING, Any, Literal
|
|
|
|
if TYPE_CHECKING:
|
|
from aiohttp import ClientSession
|
|
|
|
import structlog
|
|
|
|
# Module-level structlog logger the handlers in this module use for their own
# diagnostics (buffer overflow, batch send success/failure, socket close errors).
log: structlog.stdlib.BoundLogger = structlog.get_logger()
|
|
|
|
|
|
class ExternalLogHandler(ABC):
    """Base class for external log handlers.

    Records passed to :meth:`queue_log` are held in a bounded in-memory
    buffer and shipped by :meth:`flush`, either as soon as ``batch_size``
    records have accumulated or every ``flush_interval_seconds`` once
    :meth:`start_periodic_flush` has been called. Subclasses implement
    :meth:`_send_batch` for the actual transport.
    """

    def __init__(
        self,
        batch_size: int = 10,
        flush_interval_seconds: float = 5.0,
        buffer_size: int = 1000,
    ) -> None:
        """Initialize handler with batching configuration.

        Args:
            batch_size: Number of records to batch before sending.
            flush_interval_seconds: Maximum time to buffer before flushing.
            buffer_size: Maximum records to buffer in memory; when full, the
                oldest record is evicted for each new one.
        """
        self.batch_size = batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self.buffer_size = buffer_size
        self.buffer: deque[dict[str, Any]] = deque(maxlen=buffer_size)
        self._flush_task: asyncio.Task[None] | None = None
        # Flushes scheduled by queue_log(). The event loop only keeps weak
        # references to tasks, so they are held here to stop them from being
        # garbage-collected mid-flight; shutdown() also waits for them.
        self._background_tasks: set[asyncio.Task[None]] = set()
        self._shutdown_event = asyncio.Event()

    def queue_log(self, record: dict[str, Any]) -> None:
        """Queue a log record for batching.

        Once the buffer holds at least ``batch_size`` records, a background
        flush is scheduled on the running event loop. When no loop is running
        (a call from synchronous code) the record simply stays buffered until
        the periodic flush or :meth:`shutdown` sends it.

        Args:
            record: The log record dictionary to queue.
        """
        if self.buffer and len(self.buffer) >= self.buffer_size:
            # The deque's maxlen evicts the oldest record on append; capture it
            # first so the drop can be reported.
            dropped = self.buffer[0]
            self.buffer.append(record)
            log.warning(
                "external_log_buffer_full",
                provider=self.__class__.__name__,
                dropped_event=dropped.get("event", "unknown"),
            )
        else:
            self.buffer.append(record)

        if len(self.buffer) >= self.batch_size:
            self._schedule_flush()

    def _schedule_flush(self) -> None:
        """Start :meth:`flush` as a background task on the running loop, if any."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: asyncio.create_task() would raise here, so keep
            # the records buffered for a later flush instead.
            return
        task = loop.create_task(self.flush())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def flush(self) -> None:
        """Send all buffered logs to the external system.

        The buffer is emptied before sending; if the send fails the error is
        logged and the batch is discarded rather than re-queued.
        """
        if not self.buffer:
            return

        batch = list(self.buffer)
        self.buffer.clear()

        try:
            await self._send_batch(batch)
        except Exception as exc:  # noqa: BLE001
            log.warning(
                "external_log_send_failed",
                provider=self.__class__.__name__,
                batch_size=len(batch),
                error=str(exc),
            )
        else:
            log.debug(
                "external_log_batch_sent",
                provider=self.__class__.__name__,
                batch_size=len(batch),
            )

    @abstractmethod
    async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
        """Send a batch of logs to the external system.

        Args:
            batch: List of log records to send.

        Raises:
            Exception: If sending fails.
        """

    def start_periodic_flush(self) -> None:
        """Start the periodic flushing task unless it is already running.

        Must be called while an event loop is running.
        """
        if self._flush_task is None or self._flush_task.done():
            self._flush_task = asyncio.create_task(self._periodic_flush_loop())

    async def _periodic_flush_loop(self) -> None:
        """Flush buffered logs every ``flush_interval_seconds`` until shutdown."""
        try:
            while not self._shutdown_event.is_set():
                try:
                    # Waiting on the shutdown event (instead of sleeping) lets
                    # shutdown() end the loop immediately. A plain sleep could
                    # outlast shutdown()'s timeout, which then cancels the loop
                    # possibly in the middle of a send, losing that batch.
                    await asyncio.wait_for(
                        self._shutdown_event.wait(),
                        timeout=self.flush_interval_seconds,
                    )
                except TimeoutError:
                    await self.flush()
        except asyncio.CancelledError:
            # Cancellation is a normal way to stop this loop; shutdown()
            # performs the final flush.
            pass

    async def shutdown(self) -> None:
        """Stop periodic flushing, wait for pending flushes, then flush the rest."""
        self._shutdown_event.set()
        if self._flush_task is not None and not self._flush_task.done():
            try:
                await asyncio.wait_for(self._flush_task, timeout=5.0)
            except TimeoutError:
                self._flush_task.cancel()
        if self._background_tasks:
            # Give flushes started by queue_log() a bounded chance to finish so
            # their batches are not abandoned when the event loop closes.
            await asyncio.wait(set(self._background_tasks), timeout=5.0)
        await self.flush()
|
|
|
|
|
|
class DatadogLogHandler(ExternalLogHandler):
    """Handler for shipping logs to Datadog via HTTP API."""

    # Upper bound on records per request; Datadog's HTTP log intake documents
    # a limit of 1000 entries per JSON-array payload.
    _MAX_ENTRIES_PER_REQUEST = 1000

    def __init__(
        self,
        api_key: str,
        site: str = "datadoghq.com",
        batch_size: int = 10,
        flush_interval_seconds: float = 5.0,
        buffer_size: int = 1000,
        http_session: ClientSession | None = None,
    ) -> None:
        """Initialize Datadog handler.

        Args:
            api_key: Datadog API key.
            site: Datadog site (datadoghq.com or datadoghq.eu).
            batch_size: Number of records to batch before sending.
            flush_interval_seconds: Maximum time to buffer before flushing.
            buffer_size: Maximum records to buffer in memory.
            http_session: Optional aiohttp ClientSession for requests.
        """
        super().__init__(batch_size, flush_interval_seconds, buffer_size)
        self.api_key = api_key
        self.site = site
        self.http_session = http_session
        self._url = f"https://http-intake.logs.{site}/v1/input/{api_key}"

    async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
        """Send logs to Datadog.

        Records are posted as JSON arrays, one request per chunk of at most
        ``_MAX_ENTRIES_PER_REQUEST`` records, instead of one request per
        record. When no HTTP session was supplied the batch is dropped.

        Args:
            batch: List of log records.

        Raises:
            Exception: If a request times out or Datadog answers with a status
                other than 200/204.
        """
        if not self.http_session:
            # NOTE(review): without a session the batch is silently discarded.
            return

        import aiohttp

        headers = {"Content-Type": "application/json"}
        timeout = aiohttp.ClientTimeout(total=10)
        step = self._MAX_ENTRIES_PER_REQUEST

        for start in range(0, len(batch), step):
            # default=str: one non-JSON value (e.g. a datetime) must not make
            # the whole batch unsendable.
            payload = json.dumps(batch[start : start + step], default=str).encode("utf-8")
            try:
                async with self.http_session.post(
                    self._url, data=payload, headers=headers, timeout=timeout
                ) as response:
                    if response.status not in (200, 204):
                        raise Exception(
                            f"Datadog API returned {response.status}: "
                            f"{await response.text()}"
                        )
            except TimeoutError as exc:
                raise Exception("Request to Datadog API timed out") from exc
|
|
|
|
|
|
class PapertrailLogHandler(ExternalLogHandler):
    """Handler for shipping logs to Papertrail via Syslog protocol."""

    def __init__(
        self,
        host: str,
        port: int,
        program_name: str = "bangui",
        batch_size: int = 10,
        flush_interval_seconds: float = 5.0,
        buffer_size: int = 1000,
        hostname: str = "bangui",
        connect_timeout_seconds: float = 10.0,
    ) -> None:
        """Initialize Papertrail handler.

        Args:
            host: Papertrail host address.
            port: Papertrail port number.
            program_name: Program name for Syslog.
            batch_size: Number of records to batch before sending.
            flush_interval_seconds: Maximum time to buffer before flushing.
            buffer_size: Maximum records to buffer in memory.
            hostname: Hostname field written into each Syslog line.
            connect_timeout_seconds: Maximum time to wait for the TCP
                connection before the batch is reported as failed.
        """
        super().__init__(batch_size, flush_interval_seconds, buffer_size)
        self.host = host
        self.port = port
        self.program_name = program_name
        self.hostname = hostname
        self.connect_timeout_seconds = connect_timeout_seconds

    async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
        """Send logs to Papertrail via Syslog.

        Opens a new TCP connection for the batch, writes one newline-terminated
        Syslog line per record, and closes the connection.

        Args:
            batch: List of log records.

        Raises:
            Exception: If the connection cannot be established (including a
                connect that exceeds ``connect_timeout_seconds``). Errors while
                writing propagate unchanged.
        """
        try:
            # Bounded so an unreachable endpoint cannot stall the flush loop.
            _reader, writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port),
                timeout=self.connect_timeout_seconds,
            )
        except (TimeoutError, OSError) as exc:
            raise Exception(f"Failed to connect to Papertrail {self.host}:{self.port}") from exc

        try:
            payload = "".join(self._format_message(log_record) for log_record in batch)
            writer.write(payload.encode("utf-8"))
            await writer.drain()
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as e:
                log.warning("external_logging_writer_close_error", error=str(e))

    def _format_message(self, log_record: dict[str, Any]) -> str:
        """Render one record as a newline-terminated Syslog line (facility local0)."""
        severity = self._severity_from_level(log_record.get("level", "info"))
        facility = 16  # local0
        priority = (facility << 3) | severity

        # Local time including its UTC offset, so the receiver can place it.
        timestamp = datetime.now().astimezone().isoformat()

        # Lines are newline-delimited on the wire, so line breaks inside the
        # event name would split (or forge) Syslog lines. json.dumps already
        # escapes newlines inside the serialized record.
        event = " ".join(str(log_record.get("event", "application_log")).splitlines())

        return (
            f"<{priority}>{timestamp} {self.hostname} "
            f"{self.program_name}: {event} {json.dumps(log_record, default=str)}\n"
        )

    @staticmethod
    def _severity_from_level(level: str) -> int:
        """Convert log level to Syslog severity.

        Args:
            level: Log level (debug, info, warning/warn, error,
                critical/fatal). Non-string values are converted with ``str``.

        Returns:
            Syslog severity (0-7); unknown levels map to 6 (informational).
        """
        level_map = {
            "debug": 7,
            "info": 6,
            "warning": 4,
            "warn": 4,
            "error": 3,
            "critical": 2,
            "fatal": 2,
        }
        return level_map.get(str(level).lower(), 6)
|
|
|
|
|
|
class ElasticsearchLogHandler(ExternalLogHandler):
    """Handler for shipping logs to Elasticsearch via HTTP API."""

    def __init__(
        self,
        hosts: list[str],
        index_prefix: str = "bangui",
        batch_size: int = 10,
        flush_interval_seconds: float = 5.0,
        buffer_size: int = 1000,
        http_session: ClientSession | None = None,
    ) -> None:
        """Initialize Elasticsearch handler.

        Args:
            hosts: List of Elasticsearch host URLs, used round-robin per batch.
            index_prefix: Prefix for index names.
            batch_size: Number of records to batch before sending.
            flush_interval_seconds: Maximum time to buffer before flushing.
            buffer_size: Maximum records to buffer in memory.
            http_session: Optional aiohttp ClientSession for requests.
        """
        super().__init__(batch_size, flush_interval_seconds, buffer_size)
        self.hosts = hosts
        self.index_prefix = index_prefix
        self.http_session = http_session
        self._host_index = 0

    async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
        """Send logs to Elasticsearch with a single ``_bulk`` request.

        Documents go to a daily index named ``<index_prefix>-YYYY.MM.DD``.
        Nothing is sent when there is no HTTP session, no host, or no record.

        Args:
            batch: List of log records.

        Raises:
            Exception: If the request times out, returns a status other than
                200/204, or the bulk response reports rejected documents.
        """
        if not self.http_session or not self.hosts or not batch:
            return

        import aiohttp

        # Modulo guards against `hosts` having shrunk since the last call.
        current = self._host_index % len(self.hosts)
        host = self.hosts[current]
        self._host_index = (current + 1) % len(self.hosts)

        now = datetime.now()
        index_name = f"{self.index_prefix}-{now.strftime('%Y.%m.%d')}"

        # NDJSON: an action line followed by the document line, per record.
        action = json.dumps({"index": {"_index": index_name}})
        bulk_body = "".join(
            f"{action}\n{json.dumps(log_record, default=str)}\n" for log_record in batch
        )

        try:
            async with self.http_session.post(
                f"{host.rstrip('/')}/_bulk",
                data=bulk_body.encode("utf-8"),
                headers={"Content-Type": "application/x-ndjson"},
                timeout=aiohttp.ClientTimeout(total=10),
            ) as response:
                response_text = await response.text()
                if response.status not in (200, 204):
                    raise Exception(
                        f"Elasticsearch API returned {response.status}: "
                        f"{response_text}"
                    )
                self._raise_for_item_errors(response_text, len(batch))
        except TimeoutError as exc:
            raise Exception("Request to Elasticsearch timed out") from exc

    @staticmethod
    def _raise_for_item_errors(response_text: str, batch_size: int) -> None:
        """Raise if a ``_bulk`` response reports per-document failures.

        The bulk endpoint answers 200 even when individual documents are
        rejected; failures only show up in the top-level ``errors`` flag and
        the per-item results. An unparseable body is not treated as an error.
        """
        try:
            body = json.loads(response_text)
        except ValueError:
            return
        if not isinstance(body, dict) or not body.get("errors"):
            return

        failures = [
            result
            for item in body.get("items") or []
            if isinstance(item, dict)
            for result in item.values()
            if isinstance(result, dict) and "error" in result
        ]
        first_error = failures[0]["error"] if failures else None
        raise Exception(
            f"Elasticsearch bulk request rejected {len(failures)} of "
            f"{batch_size} documents; first error: {first_error}"
        )
|
|
|
|
|
|
def create_external_log_handler(
    provider: Literal["datadog", "papertrail", "elasticsearch"],
    api_key: str | None = None,
    datadog_site: str = "datadoghq.com",
    datadog_batch_size: int = 10,
    papertrail_host: str | None = None,
    papertrail_port: int | None = None,
    papertrail_program_name: str = "bangui",
    elasticsearch_hosts: list[str] | None = None,
    elasticsearch_index_prefix: str = "bangui",
    elasticsearch_batch_size: int = 10,
    batch_size: int = 10,
    flush_interval_seconds: float = 5.0,
    buffer_size: int = 1000,
    http_session: ClientSession | None = None,
) -> ExternalLogHandler | None:
    """Factory function to create an external log handler.

    Args:
        provider: Logging provider (datadog, papertrail, elasticsearch).
        api_key: API key for Datadog.
        datadog_site: Datadog site (datadoghq.com or datadoghq.eu).
        datadog_batch_size: Batch size for the datadog provider.
        papertrail_host: Papertrail host address.
        papertrail_port: Papertrail port number.
        papertrail_program_name: Program name for Papertrail Syslog.
        elasticsearch_hosts: List of Elasticsearch host URLs.
        elasticsearch_index_prefix: Elasticsearch index prefix.
        elasticsearch_batch_size: Batch size for the elasticsearch provider.
        batch_size: Batch size for the papertrail provider. The datadog and
            elasticsearch providers use ``datadog_batch_size`` and
            ``elasticsearch_batch_size`` instead.
        flush_interval_seconds: Flush interval for all providers.
        buffer_size: Buffer size for all providers.
        http_session: Optional aiohttp ClientSession (used by the datadog and
            elasticsearch providers).

    Returns:
        An ExternalLogHandler instance, or None if ``provider`` is not one of
        the supported names.

    Raises:
        ValueError: If required parameters are missing for the provider.
    """
    if provider == "datadog":
        if not api_key:
            raise ValueError("api_key is required for datadog provider")
        return DatadogLogHandler(
            api_key=api_key,
            site=datadog_site,
            batch_size=datadog_batch_size,
            flush_interval_seconds=flush_interval_seconds,
            buffer_size=buffer_size,
            http_session=http_session,
        )

    if provider == "papertrail":
        if not papertrail_host or not papertrail_port:
            raise ValueError(
                "papertrail_host and papertrail_port are required for papertrail provider"
            )
        return PapertrailLogHandler(
            host=papertrail_host,
            port=papertrail_port,
            program_name=papertrail_program_name,
            batch_size=batch_size,
            flush_interval_seconds=flush_interval_seconds,
            buffer_size=buffer_size,
        )

    if provider == "elasticsearch":
        if not elasticsearch_hosts:
            raise ValueError("elasticsearch_hosts is required for elasticsearch provider")
        return ElasticsearchLogHandler(
            hosts=elasticsearch_hosts,
            index_prefix=elasticsearch_index_prefix,
            batch_size=elasticsearch_batch_size,
            flush_interval_seconds=flush_interval_seconds,
            buffer_size=buffer_size,
            http_session=http_session,
        )

    return None
|