"""External logging handlers for centralized log aggregation. Supports shipping logs to Datadog, Papertrail, and Elasticsearch in non-blocking fashion using async buffering and batching. """ from __future__ import annotations import asyncio import contextlib import json from abc import ABC, abstractmethod from collections import deque from datetime import datetime from typing import TYPE_CHECKING, Any, Literal if TYPE_CHECKING: from aiohttp import ClientSession import structlog log: structlog.stdlib.BoundLogger = structlog.get_logger() class ExternalLogHandler(ABC): """Base class for external log handlers.""" def __init__( self, batch_size: int = 10, flush_interval_seconds: float = 5.0, buffer_size: int = 1000, ) -> None: """Initialize handler with batching configuration. Args: batch_size: Number of records to batch before sending. flush_interval_seconds: Maximum time to buffer before flushing. buffer_size: Maximum records to buffer in memory. """ self.batch_size = batch_size self.flush_interval_seconds = flush_interval_seconds self.buffer_size = buffer_size self.buffer: deque[dict[str, Any]] = deque(maxlen=buffer_size) self._flush_task: asyncio.Task[None] | None = None self._shutdown_event = asyncio.Event() def queue_log(self, record: dict[str, Any]) -> None: """Queue a log record for batching. Args: record: The log record dictionary to queue. """ if len(self.buffer) >= self.buffer_size: dropped = self.buffer[0] self.buffer.append(record) log.warning( "external_log_buffer_full", provider=self.__class__.__name__, dropped_event=dropped.get("event", "unknown"), ) else: self.buffer.append(record) if len(self.buffer) >= self.batch_size: asyncio.create_task(self.flush()) async def flush(self) -> None: """Send all buffered logs to the external system.""" if not self.buffer: return batch = list(self.buffer) self.buffer.clear() try: await self._send_batch(batch) log.debug( "external_log_batch_sent", provider=self.__class__.__name__, batch_size=len(batch), ) except Exception as exc: # noqa: BLE001 log.warning( "external_log_send_failed", provider=self.__class__.__name__, batch_size=len(batch), error=str(exc), ) @abstractmethod async def _send_batch(self, batch: list[dict[str, Any]]) -> None: """Send a batch of logs to the external system. Args: batch: List of log records to send. Raises: Exception: If sending fails. """ pass def start_periodic_flush(self) -> None: """Start periodic flushing task.""" if self._flush_task is None or self._flush_task.done(): self._flush_task = asyncio.create_task(self._periodic_flush_loop()) async def _periodic_flush_loop(self) -> None: """Periodically flush buffered logs.""" try: while not self._shutdown_event.is_set(): await asyncio.sleep(self.flush_interval_seconds) await self.flush() except asyncio.CancelledError: pass async def shutdown(self) -> None: """Shutdown handler and flush remaining logs.""" self._shutdown_event.set() if self._flush_task is not None: try: await asyncio.wait_for(self._flush_task, timeout=5.0) except TimeoutError: self._flush_task.cancel() await self.flush() class DatadogLogHandler(ExternalLogHandler): """Handler for shipping logs to Datadog via HTTP API.""" def __init__( self, api_key: str, site: str = "datadoghq.com", batch_size: int = 10, flush_interval_seconds: float = 5.0, buffer_size: int = 1000, http_session: ClientSession | None = None, ) -> None: """Initialize Datadog handler. Args: api_key: Datadog API key. site: Datadog site (datadoghq.com or datadoghq.eu). batch_size: Number of records to batch before sending. flush_interval_seconds: Maximum time to buffer before flushing. buffer_size: Maximum records to buffer in memory. http_session: Optional aiohttp ClientSession for requests. """ super().__init__(batch_size, flush_interval_seconds, buffer_size) self.api_key = api_key self.site = site self.http_session = http_session self._url = f"https://http-intake.logs.{site}/v1/input/{api_key}" async def _send_batch(self, batch: list[dict[str, Any]]) -> None: """Send logs to Datadog. Args: batch: List of log records. Raises: Exception: If the request fails. """ if not self.http_session: return import aiohttp for log_record in batch: payload = json.dumps(log_record).encode("utf-8") headers = {"Content-Type": "application/json"} try: async with self.http_session.post( self._url, data=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as response: if response.status not in (200, 204): raise Exception( f"Datadog API returned {response.status}: " f"{await response.text()}" ) except TimeoutError as exc: raise Exception("Request to Datadog API timed out") from exc class PapertrailLogHandler(ExternalLogHandler): """Handler for shipping logs to Papertrail via Syslog protocol.""" def __init__( self, host: str, port: int, program_name: str = "bangui", batch_size: int = 10, flush_interval_seconds: float = 5.0, buffer_size: int = 1000, ) -> None: """Initialize Papertrail handler. Args: host: Papertrail host address. port: Papertrail port number. program_name: Program name for Syslog. batch_size: Number of records to batch before sending. flush_interval_seconds: Maximum time to buffer before flushing. buffer_size: Maximum records to buffer in memory. """ super().__init__(batch_size, flush_interval_seconds, buffer_size) self.host = host self.port = port self.program_name = program_name async def _send_batch(self, batch: list[dict[str, Any]]) -> None: """Send logs to Papertrail via Syslog. Args: batch: List of log records. Raises: Exception: If sending fails. """ try: reader, writer = await asyncio.open_connection(self.host, self.port) except (TimeoutError, OSError) as exc: raise Exception(f"Failed to connect to Papertrail {self.host}:{self.port}") from exc try: for log_record in batch: severity = self._severity_from_level(log_record.get("level", "info")) facility = 16 # local0 priority = (facility << 3) | severity timestamp = datetime.now().isoformat() hostname = "bangui" event = log_record.get("event", "application_log") syslog_msg = ( f"<{priority}>{timestamp} {hostname} " f"{self.program_name}: {event} {json.dumps(log_record)}\n" ) writer.write(syslog_msg.encode("utf-8")) await writer.drain() finally: writer.close() with contextlib.suppress(Exception): await writer.wait_closed() @staticmethod def _severity_from_level(level: str) -> int: """Convert log level to Syslog severity. Args: level: Log level (debug, info, warning, error, critical). Returns: Syslog severity (0-7). """ level_map = { "debug": 7, "info": 6, "warning": 4, "error": 3, "critical": 2, } return level_map.get(level.lower(), 6) class ElasticsearchLogHandler(ExternalLogHandler): """Handler for shipping logs to Elasticsearch via HTTP API.""" def __init__( self, hosts: list[str], index_prefix: str = "bangui", batch_size: int = 10, flush_interval_seconds: float = 5.0, buffer_size: int = 1000, http_session: ClientSession | None = None, ) -> None: """Initialize Elasticsearch handler. Args: hosts: List of Elasticsearch host URLs. index_prefix: Prefix for index names. batch_size: Number of records to batch before sending. flush_interval_seconds: Maximum time to buffer before flushing. buffer_size: Maximum records to buffer in memory. http_session: Optional aiohttp ClientSession for requests. """ super().__init__(batch_size, flush_interval_seconds, buffer_size) self.hosts = hosts self.index_prefix = index_prefix self.http_session = http_session self._host_index = 0 async def _send_batch(self, batch: list[dict[str, Any]]) -> None: """Send logs to Elasticsearch. Args: batch: List of log records. Raises: Exception: If the request fails. """ if not self.http_session or not self.hosts: return import aiohttp host = self.hosts[self._host_index] self._host_index = (self._host_index + 1) % len(self.hosts) now = datetime.now() index_name = f"{self.index_prefix}-{now.strftime('%Y.%m.%d')}" bulk_body = "" for log_record in batch: metadata = json.dumps({"index": {"_index": index_name}}) document = json.dumps(log_record) bulk_body += f"{metadata}\n{document}\n" try: async with self.http_session.post( f"{host.rstrip('/')}/_bulk", data=bulk_body.encode("utf-8"), headers={"Content-Type": "application/x-ndjson"}, timeout=aiohttp.ClientTimeout(total=10), ) as response: if response.status not in (200, 204): raise Exception( f"Elasticsearch API returned {response.status}: " f"{await response.text()}" ) except TimeoutError as exc: raise Exception("Request to Elasticsearch timed out") from exc def create_external_log_handler( provider: Literal["datadog", "papertrail", "elasticsearch"], api_key: str | None = None, datadog_site: str = "datadoghq.com", datadog_batch_size: int = 10, papertrail_host: str | None = None, papertrail_port: int | None = None, papertrail_program_name: str = "bangui", elasticsearch_hosts: list[str] | None = None, elasticsearch_index_prefix: str = "bangui", elasticsearch_batch_size: int = 10, batch_size: int = 10, flush_interval_seconds: float = 5.0, buffer_size: int = 1000, http_session: ClientSession | None = None, ) -> ExternalLogHandler | None: """Factory function to create an external log handler. Args: provider: Logging provider (datadog, papertrail, elasticsearch). api_key: API key for Datadog. datadog_site: Datadog site (datadoghq.com or datadoghq.eu). datadog_batch_size: Datadog batch size. papertrail_host: Papertrail host address. papertrail_port: Papertrail port number. papertrail_program_name: Program name for Papertrail Syslog. elasticsearch_hosts: List of Elasticsearch host URLs. elasticsearch_index_prefix: Elasticsearch index prefix. elasticsearch_batch_size: Elasticsearch batch size. batch_size: Default batch size (unused, provider-specific batch sizes take precedence). flush_interval_seconds: Flush interval for all providers. buffer_size: Buffer size for all providers. http_session: Optional aiohttp ClientSession. Returns: An ExternalLogHandler instance or None if provider is unknown. Raises: ValueError: If required parameters are missing for the provider. """ if provider == "datadog": if not api_key: raise ValueError("api_key is required for datadog provider") return DatadogLogHandler( api_key=api_key, site=datadog_site, batch_size=datadog_batch_size, flush_interval_seconds=flush_interval_seconds, buffer_size=buffer_size, http_session=http_session, ) elif provider == "papertrail": if not papertrail_host or not papertrail_port: raise ValueError( "papertrail_host and papertrail_port are required for papertrail provider" ) return PapertrailLogHandler( host=papertrail_host, port=papertrail_port, program_name=papertrail_program_name, batch_size=batch_size, flush_interval_seconds=flush_interval_seconds, buffer_size=buffer_size, ) elif provider == "elasticsearch": if not elasticsearch_hosts: raise ValueError("elasticsearch_hosts is required for elasticsearch provider") return ElasticsearchLogHandler( hosts=elasticsearch_hosts, index_prefix=elasticsearch_index_prefix, batch_size=elasticsearch_batch_size, flush_interval_seconds=flush_interval_seconds, buffer_size=buffer_size, http_session=http_session, ) else: return None