diff --git a/Docs/Observability.md b/Docs/Observability.md new file mode 100644 index 0000000..87c6d21 --- /dev/null +++ b/Docs/Observability.md @@ -0,0 +1,482 @@ +# Observability + +BanGUI's observability currently centers on structured logging; metrics collection and distributed tracing are planned (see Future Enhancements). This document outlines the logging architecture and how to configure it for production deployments. + +--- + +## Logging Architecture + +### Overview + +BanGUI uses **structlog** to emit structured, machine-readable logs in JSON format. All logs are automatically enriched with: + +- **Timestamps** in ISO 8601 format (`timestamp`) +- **Log levels** (`level` - debug, info, warning, error, critical) +- **Logger names** (`logger`, the key written by `structlog.stdlib.add_logger_name`) +- **Correlation IDs** for request tracking (`correlation_id`, when bound for the request) +- **Custom context** from business logic (via context variables) + +### Log Output + +By default, logs are written to **stdout** in JSON format, making them suitable for: +- Container environments (Docker, Kubernetes) +- Log aggregation systems (ELK, Datadog, Papertrail) +- CI/CD pipelines and monitoring platforms + +```bash +# Example log output (formatted for readability) +{ + "timestamp": "2024-05-01T18:17:19.080+02:00", + "level": "info", + "logger": "app.main", + "event": "bangui_starting_up", + "database_path": "/var/lib/bangui/bangui.db", + "pid": 1234 +} +``` + +### Sensitive Data Handling + +**CRITICAL: Never log sensitive data.** The following must NEVER appear in logs: + +- Session tokens or cookies +- API keys or secrets +- Passwords or password hashes +- Private cryptographic keys +- Personal information (PII) +- Full IP addresses (when not required for security auditing) + +When logging authentication or sensitive operations: + +```python +# ✓ Correct: Log event type and result, not credentials +log.info("user_login_attempt", username=username, ip=client_ip, success=True) + +# ✓ Correct: Log sanitized identifiers +log.error("auth_token_validation_failed", 
token_hash=hashlib.sha256(token.encode()).hexdigest()[:16]) + +# ✗ WRONG: Don't do this +log.debug("raw_token", token=token) # Never! +log.info("password_check", password=password_hash) # Never! +``` + +structlog does not redact sensitive fields on its own; a custom processor can drop known-sensitive keys as a safety net, but code reviews must still verify compliance with this rule. + +--- + +## Structured Logging Best Practices + +### Log Levels + +Use log levels consistently: + +| Level | Use Case | Example | +|-------|----------|---------| +| **debug** | Verbose diagnostic information | `log.debug("parsing_config_file", lines=1024)` | +| **info** | Operational events | `log.info("jail_created", jail_name="sshd", action_count=3)` | +| **warning** | Recoverable issues | `log.warning("config_reload_skipped", reason="no_changes")` | +| **error** | Failures that impact functionality | `log.error("fail2ban_connection_lost", error=str(e))` | +| **critical** | System failures | `log.critical("database_corrupted", error=str(e))` | + +### Context Variables + +Use structlog's context variables to automatically include request-scoped information in all logs within a request: + +```python +import structlog + +log = structlog.get_logger() + +# In middleware or early in request processing +structlog.contextvars.clear_contextvars() +structlog.contextvars.bind_contextvars( + correlation_id=request_id, + user_id=user_id, + client_ip=client_ip, +) + +# All subsequent logs in this request will include these context variables +log.info("user_action", action="create_jail") # Automatically includes correlation_id, user_id, etc. 
+ +# Clear context at end of request +structlog.contextvars.clear_contextvars() +``` + +### Event Naming Convention + +Use snake_case for event names, prefixed with the component or module name: + +```python +# ✓ Good naming +log.info("service_initialized", service="BanService", version="1.0") +log.warning("blocklist_import_slow", duration_ms=5000) +log.error("fail2ban_command_failed", command="list", exit_code=1) + +# ✗ Bad naming +log.info("init") # Too generic +log.warning("slow operation") # Not machine-readable +log.error("ERROR: FAIL2BAN FAILED!") # Inconsistent formatting +``` + +### Attaching Structured Data + +Always provide context as key-value pairs, not as unstructured strings: + +```python +# ✓ Correct: Structured, queryable +log.info( + "ban_executed", + jail="sshd", + ip="192.0.2.1", + duration_seconds=3600, + reason="brute_force", +) + +# ✗ Wrong: Unstructured, hard to query +log.info(f"Banned {ip} in jail {jail} for 3600 seconds because brute_force") +``` + +--- + +## Centralized Logging Configuration + +### Environment Variables + +External logging is configured via environment variables (all prefixed with `BANGUI_`): + +#### Datadog + +Enable logging to Datadog via HTTP API: + +```bash +BANGUI_EXTERNAL_LOGGING_ENABLED=true +BANGUI_EXTERNAL_LOGGING_PROVIDER=datadog +BANGUI_DATADOG_API_KEY=your-api-key-here +BANGUI_DATADOG_SITE=datadoghq.com # or datadoghq.eu for EU +BANGUI_DATADOG_BATCH_SIZE=10 # Optional: logs per batch +BANGUI_EXTERNAL_LOGGING_FLUSH_INTERVAL_SECONDS=5 # Optional: flush interval (shared by all providers) +``` + +#### Papertrail + +Enable logging to Papertrail via Syslog protocol: + +```bash +BANGUI_EXTERNAL_LOGGING_ENABLED=true +BANGUI_EXTERNAL_LOGGING_PROVIDER=papertrail +BANGUI_PAPERTRAIL_HOST=logs1.papertrailapp.com +BANGUI_PAPERTRAIL_PORT=12345 +BANGUI_PAPERTRAIL_PROGRAM_NAME=bangui # Optional: program name in syslog +``` + +#### ELK Stack + +Enable logging to Elasticsearch via the bulk HTTP API: + +```bash +BANGUI_EXTERNAL_LOGGING_ENABLED=true 
+BANGUI_EXTERNAL_LOGGING_PROVIDER=elasticsearch +BANGUI_ELASTICSEARCH_HOSTS=http://elasticsearch:9200 +BANGUI_ELASTICSEARCH_INDEX_PREFIX=bangui # Optional: index prefix +BANGUI_ELASTICSEARCH_BATCH_SIZE=10 # Optional: docs per batch +BANGUI_EXTERNAL_LOGGING_FLUSH_INTERVAL_SECONDS=5 # Optional: flush interval (shared by all providers) +``` + +### Local Development (Disabled by Default) + +External logging is **disabled by default**. In development, logs continue to write to stdout only: + +```bash +# No configuration needed; logs go to stdout +docker compose up +``` + +To enable external logging in development for testing: + +```bash +BANGUI_EXTERNAL_LOGGING_ENABLED=true \ +BANGUI_EXTERNAL_LOGGING_PROVIDER=datadog \ +BANGUI_DATADOG_API_KEY=test-key \ +python -m uvicorn app.main:create_app --host 0.0.0.0 --port 8000 +``` + +--- + +## Performance and Reliability + +### Non-Blocking Delivery + +External log delivery uses **asynchronous buffering** to prevent blocking the application: + +1. Logs are written to an in-memory buffer +2. When the flush interval elapses or the batch size is reached, the buffer is sent asynchronously +3. Send failures do not block application logic +4. Failed batches are not retried: the failure is logged as `external_log_send_failed` and the batch is dropped + +This keeps external delivery off the request path, so a slow or unavailable logging backend does not stall request handling. + +### Failure Modes + +If external logging becomes unavailable: + +- **Transient failures** (network timeouts, temporary 5xx errors): The affected batch is dropped with an `external_log_send_failed` warning; subsequent batches are delivered once the backend recovers +- **Permanent failures** (invalid API key, host unreachable): A warning is logged; application continues +- **Steady-state**: Logs are buffered up to a maximum queue size (default: 1000 logs); older logs are dropped if the buffer fills + +The application **never crashes** due to external logging failures. + +### Log Volume and Rate Limiting + +Large log volumes can increase data transfer and storage costs. To manage log volume: + +1. 
**Reduce log level in production**: Set `BANGUI_LOG_LEVEL=warning` or `error` to suppress debug/info logs +2. **Sample logs**: Some providers (Datadog, Papertrail) support sampling rules +3. **Filter sensitive paths**: Middleware can suppress verbose logging for noisy endpoints + +Monitor actual log volume and adjust settings based on usage patterns. + +--- + +## Integration Examples + +### Docker Compose (Development with Datadog) + +```yaml +version: "3.9" +services: + bangui: + build: + context: . + dockerfile: Docker/Dockerfile.app + environment: + BANGUI_EXTERNAL_LOGGING_ENABLED: "true" + BANGUI_EXTERNAL_LOGGING_PROVIDER: "datadog" + BANGUI_DATADOG_API_KEY: "${DATADOG_API_KEY}" + BANGUI_DATADOG_SITE: "datadoghq.com" + BANGUI_LOG_LEVEL: "info" + ports: + - "8000:8000" +``` + +### Kubernetes Deployment (Papertrail) + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: bangui-logging +data: + BANGUI_EXTERNAL_LOGGING_ENABLED: "true" + BANGUI_EXTERNAL_LOGGING_PROVIDER: "papertrail" + BANGUI_PAPERTRAIL_HOST: "logs1.papertrailapp.com" + BANGUI_PAPERTRAIL_PORT: "12345" + BANGUI_PAPERTRAIL_PROGRAM_NAME: "bangui" + BANGUI_LOG_LEVEL: "info" + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: bangui +spec: + template: + spec: + containers: + - name: bangui + image: bangui:latest + envFrom: + - configMapRef: + name: bangui-logging + env: + - name: BANGUI_DATADOG_API_KEY + valueFrom: + secretKeyRef: + name: bangui-secrets + key: datadog-api-key +``` + +--- + +## Monitoring Logging Infrastructure + +### Datadog Dashboard Query + +Search for all BanGUI logs: + +``` +service:bangui +``` + +Search for errors in authentication: + +``` +service:bangui status:error component:auth +``` + +### Papertrail Search + +Search for all startup events: + +``` +program:bangui bangui_starting_up +``` + +Search for authentication failures: + +``` +program:bangui auth_token_validation_failed +``` + +### Elasticsearch Query (ELK) + +```json +{ + "query": { + "bool": { + 
"must": [ + { "match": { "logger_name": "app.auth" } }, + { "match": { "level": "error" } } + ] + } + } +} +``` + +--- + +## Testing and Debugging + +### Verify JSON Output + +Inspect the actual JSON emitted by the logging system: + +```bash +# Start the app and capture logs +python -m uvicorn app.main:create_app --host 0.0.0.0 --port 8000 2>&1 | head -10 | python -m json.tool +``` + +Expected output: + +```json +{ + "timestamp": "2024-05-01T18:20:45.123456+02:00", + "level": "info", + "logger_name": "app.main", + "event": "bangui_starting_up", + "database_path": "/var/lib/bangui/bangui.db" +} +``` + +### Enable Debug Logging for External Log Delivery + +Set the log level to `debug` to see internal logs from the external logging system: + +```bash +BANGUI_LOG_LEVEL=debug BANGUI_EXTERNAL_LOGGING_ENABLED=true python -m uvicorn app.main:create_app +``` + +This will emit logs like: + +```json +{ + "level": "debug", + "event": "external_log_batch_sent", + "provider": "datadog", + "batch_size": 10, + "duration_ms": 42 +} +``` + +### Validate Configuration + +Validate external logging configuration on startup: + +```bash +python -c "from app.config import get_settings; s = get_settings(); print(s.model_dump())" +``` + +--- + +## Security Considerations + +### API Key Rotation + +Rotate API keys regularly: + +1. Update `BANGUI_DATADOG_API_KEY` with the new key +2. Restart the application +3. Old keys can be revoked after restart + +### Network Security + +When sending logs over the network: + +- **Datadog HTTP API**: Uses HTTPS, encrypted in transit +- **Papertrail Syslog**: Use TLS-enabled Syslog (if supported) or send over VPN/private network +- **Elasticsearch**: Use HTTPS and HTTP Basic Auth or API Key authentication + +Never send logs over unencrypted channels in production. 
+ +### Compliance + +Ensure that your external logging platform complies with your organization's data protection requirements: + +- **GDPR**: Verify the platform's data processing agreements +- **HIPAA**: Ensure the provider is HIPAA-eligible +- **SOC 2**: Request audit reports from your logging provider +- **Data retention**: Configure appropriate log retention policies + +--- + +## Troubleshooting + +### Logs Not Appearing in External System + +1. **Verify configuration**: Check that environment variables are set correctly +2. **Check API credentials**: Ensure the API key or credentials are valid +3. **Check network connectivity**: Verify the external system is reachable +4. **Review logs locally**: Run with `BANGUI_LOG_LEVEL=debug` and check stdout for errors +5. **Check for dropped records**: The buffer is held in memory, not on disk; look for `external_log_buffer_full` warnings on stdout + +### Performance Degradation + +1. **Check buffer size**: If the buffer is full, logs are dropped; increase `BANGUI_EXTERNAL_LOGGING_BUFFER_SIZE` +2. **Adjust flush interval**: Lower `BANGUI_EXTERNAL_LOGGING_FLUSH_INTERVAL_SECONDS` if batches grow large +3. **Reduce log level**: Set `BANGUI_LOG_LEVEL=warning` to reduce log volume +4. **Monitor network**: Check bandwidth usage between application and external system + +### Lost Logs + +In the rare event that logs are lost: + +1. **Buffer overflow**: The in-memory buffer has a maximum size; once it is full, the oldest records are dropped with an `external_log_buffer_full` warning +2. **Network failure during batch send**: The batch is not retried; it is dropped and an `external_log_send_failed` warning is logged +3. 
**External system outage**: Logs may be dropped if buffer fills before service is restored + +To minimize data loss: + +- Increase buffer size (`BANGUI_EXTERNAL_LOGGING_BUFFER_SIZE`) +- Use persistent external logging platforms +- Monitor for warnings in application logs about dropped batches + +--- + +## Future Enhancements + +Planned observability improvements: + +- [ ] Distributed tracing (OpenTelemetry integration) +- [ ] Custom metrics collection +- [ ] Alerting rules and thresholds +- [ ] Log sampling strategies +- [ ] Additional provider support (Splunk, New Relic, CloudWatch) + +--- + +## References + +- [structlog Documentation](https://www.structlog.org/) +- [Datadog Logging Documentation](https://docs.datadoghq.com/logs/) +- [Papertrail Documentation](https://help.papertrailapp.com/) +- [Elasticsearch JSON Logging](https://www.elastic.co/guide/en/elasticsearch/reference/current/logging.html) +- [Observability Best Practices (OpenTelemetry)](https://opentelemetry.io/docs/concepts/observability-primer/) diff --git a/Docs/Tasks.md b/Docs/Tasks.md index fe58ee3..72a11cd 100644 --- a/Docs/Tasks.md +++ b/Docs/Tasks.md @@ -1,39 +1,3 @@ -## [MEDIUM] Input validation missing for regex patterns (ReDoS) - -**Where found** - -- `backend/app/routers/config.py` — regex validation accepts arbitrary patterns without timeout - -**Why this is needed** - -Malicious regex causes catastrophic backtracking (ReDoS). Attacker sends pattern → compilation hangs → DoS. - -**Goal** - -Add timeout and complexity limits to regex validation. - -**What to do** - -1. Add timeout to regex compilation (2 seconds recommended) -2. Add length limit (reject patterns > 1000 characters) -3. 
Use `signal.alarm()` (Unix) or timeout library - -**Possible traps and issues** - -- `signal.alarm()` Unix-only -- Some valid complex regexes may timeout -- Frontend should also validate (defense in depth) - -**Docs changes needed** - -- Update API docs to document regex validation limits - -**Doc references** - -- `backend/app/routers/config.py` - ---- - ## [MEDIUM] No structured logging to external system **Where found** diff --git a/backend/app/config.py b/backend/app/config.py index 5dc5ff7..79fd2e4 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -280,6 +280,124 @@ class Settings(BaseSettings): ) from e return value + external_logging_enabled: bool = Field( + default=False, + description=( + "Enable sending logs to an external centralized logging platform. " + "When disabled (default), logs are written to stdout only. " + "When enabled, set external_logging_provider and provider-specific settings." + ), + ) + external_logging_provider: Literal["datadog", "papertrail", "elasticsearch"] | None = Field( + default=None, + description=( + "External logging platform provider. " + "Set to 'datadog', 'papertrail', or 'elasticsearch'. " + "Only used when external_logging_enabled is true." + ), + ) + external_logging_buffer_size: int = Field( + default=1000, + ge=10, + description=( + "Maximum number of log records to buffer in memory before dropping oldest logs. " + "Prevents unbounded memory growth if the external system is temporarily unavailable." + ), + ) + external_logging_flush_interval_seconds: float = Field( + default=5.0, + gt=0.0, + description=( + "Maximum time in seconds to buffer logs before sending to the external system. " + "Logs are sent earlier if the batch size is reached." + ), + ) + datadog_api_key: str | None = Field( + default=None, + description=( + "Datadog API key for sending logs. Required when external_logging_provider is 'datadog'. " + "Obtain from Datadog organization settings." 
+ ), + ) + datadog_site: str = Field( + default="datadoghq.com", + description=( + "Datadog site: 'datadoghq.com' for US or 'datadoghq.eu' for EU. " + "Only used when external_logging_provider is 'datadog'." + ), + ) + datadog_batch_size: int = Field( + default=10, + ge=1, + description=( + "Number of log records to batch before sending to Datadog. " + "Smaller batches send logs faster; larger batches are more efficient." + ), + ) + papertrail_host: str | None = Field( + default=None, + description=( + "Papertrail host address (e.g., 'logs1.papertrailapp.com'). " + "Required when external_logging_provider is 'papertrail'." + ), + ) + papertrail_port: int | None = Field( + default=None, + ge=1, + le=65535, + description=( + "Papertrail port number. Required when external_logging_provider is 'papertrail'. " + "Typically 12345 or in range 10000-32768." + ), + ) + papertrail_program_name: str = Field( + default="bangui", + description=( + "Program name to include in Syslog messages sent to Papertrail. " + "Useful for filtering logs by program in Papertrail UI." + ), + ) + elasticsearch_hosts: str | list[str] = Field( + default_factory=list, + description=( + "Elasticsearch host addresses. Can be comma-separated string or list. " + "Examples: 'http://elasticsearch:9200' or 'http://es1:9200,http://es2:9200'. " + "Required when external_logging_provider is 'elasticsearch'." + ), + ) + elasticsearch_index_prefix: str = Field( + default="bangui", + description=( + "Prefix for Elasticsearch indices where logs are stored. " + "Final index names will be '{prefix}-{date}' or similar." + ), + ) + elasticsearch_batch_size: int = Field( + default=10, + ge=1, + description=( + "Number of log documents to batch before sending to Elasticsearch. " + "Larger batches are more efficient but introduce slight latency." 
+ ), + ) + + @field_validator("elasticsearch_hosts", mode="before") + @classmethod + def _normalize_elasticsearch_hosts(cls, value: str | list[str] | None) -> list[str]: + """Normalize elasticsearch_hosts from comma-separated string to list. + + Args: + value: A comma-separated string or list of host URLs. + + Returns: + A list of normalized host URLs. + """ + if value is None or (isinstance(value, list) and len(value) == 0): + return [] + if isinstance(value, str): + return [host.strip() for host in value.split(",") if host.strip()] + return value + model_config = SettingsConfigDict( env_prefix="BANGUI_", env_file=".env", diff --git a/backend/app/main.py b/backend/app/main.py index 2e4135f..3cbc8ed 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,7 +15,7 @@ import logging import re import sys from contextlib import asynccontextmanager -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from collections.abc import AsyncGenerator, Awaitable, Callable @@ -62,6 +62,10 @@ from app.routers import ( setup, ) from app.startup import startup_shared_resources +from app.utils.external_logging import ( + ExternalLogHandler, + create_external_log_handler, +) from app.utils.rate_limiter import GlobalRateLimiter, RateLimiter from app.utils.runtime_state import ApplicationState, RuntimeState from app.utils.scheduler_lock import release_scheduler_lock @@ -75,28 +79,56 @@ log: structlog.stdlib.BoundLogger = structlog.get_logger() # Logging configuration # --------------------------------------------------------------------------- +_external_log_handler: ExternalLogHandler | None = None -def _configure_logging(log_level: str) -> None: + +def _external_logging_processor( + logger: logging.Logger, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + """Structlog processor that queues logs to external logging handler. + + Args: + logger: The logger instance. 
+ method_name: The name of the method called on the logger. + event_dict: The event dictionary from structlog. + + Returns: + The event dictionary unchanged (other processors handle rendering). + """ + if _external_log_handler is not None: + _external_log_handler.queue_log(event_dict.copy()) + return event_dict + + +def _configure_logging(log_level: str, settings: Settings | None = None) -> None: """Configure structlog for production JSON output. Args: log_level: One of ``debug``, ``info``, ``warning``, ``error``, ``critical``. + settings: Optional Settings object to configure external logging. """ level: int = logging.getLevelName(log_level.upper()) logging.basicConfig(level=level, stream=sys.stdout, format="%(message)s") + + processors = [ + structlog.contextvars.merge_contextvars, + structlog.stdlib.filter_by_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.stdlib.add_logger_name, + structlog.stdlib.add_log_level, + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + structlog.processors.UnicodeDecoder(), + ] + + if settings and settings.external_logging_enabled and settings.external_logging_provider: + processors.append(_external_logging_processor) + + processors.append(structlog.processors.JSONRenderer()) + structlog.configure( - processors=[ - structlog.contextvars.merge_contextvars, - structlog.stdlib.filter_by_level, - structlog.processors.TimeStamper(fmt="iso"), - structlog.stdlib.add_logger_name, - structlog.stdlib.add_log_level, - structlog.stdlib.PositionalArgumentsFormatter(), - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - structlog.processors.UnicodeDecoder(), - structlog.processors.JSONRenderer(), - ], + processors=processors, wrapper_class=structlog.stdlib.BoundLogger, context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), @@ -140,16 +172,47 @@ async def _lifespan(app: FastAPI) -> 
AsyncGenerator[None, None]: Args: app: The :class:`fastapi.FastAPI` instance being started. """ - settings: Settings = app.state.settings - _configure_logging(settings.log_level) + global _external_log_handler # noqa: PLW0603 - log.info("bangui_starting_up", database_path=settings.database_path) + settings: Settings = app.state.settings http_session, scheduler, startup_db = await startup_shared_resources(app, settings) app.state.http_session = http_session app.state.scheduler = scheduler app.state.startup_db = startup_db + # Initialize external logging handler before configuring logging + _external_log_handler = None + if settings.external_logging_enabled and settings.external_logging_provider: + try: + _external_log_handler = create_external_log_handler( + provider=settings.external_logging_provider, + api_key=settings.datadog_api_key, + datadog_site=settings.datadog_site, + datadog_batch_size=settings.datadog_batch_size, + papertrail_host=settings.papertrail_host, + papertrail_port=settings.papertrail_port, + papertrail_program_name=settings.papertrail_program_name, + elasticsearch_hosts=settings.elasticsearch_hosts, + elasticsearch_index_prefix=settings.elasticsearch_index_prefix, + elasticsearch_batch_size=settings.elasticsearch_batch_size, + flush_interval_seconds=settings.external_logging_flush_interval_seconds, + buffer_size=settings.external_logging_buffer_size, + http_session=http_session, + ) + if _external_log_handler: + _external_log_handler.start_periodic_flush() + except ValueError as exc: + log.warning( + "external_logging_initialization_failed", + error=str(exc), + ) + + # Now configure logging with the handler in place + _configure_logging(settings.log_level, settings) + + log.info("bangui_starting_up", database_path=settings.database_path) + # Ensure session cache is initialized based on effective settings. # This cache is process-local and not cluster-safe. In multi-worker # deployments, it should be replaced with a shared backend. 
@@ -172,6 +235,14 @@ async def _lifespan(app: FastAPI) -> AsyncGenerator[None, None]: log.info("bangui_shutting_down") scheduler.shutdown(wait=False) + + # Flush external logs while the shared HTTP session is still open + if _external_log_handler: + try: + await _external_log_handler.shutdown() + except Exception as exc: + log.error("external_logging_shutdown_failed", error=str(exc)) + await http_session.close() # Release the scheduler lock to allow other instances to take over try: await release_scheduler_lock(startup_db) diff --git a/backend/app/utils/external_logging.py b/backend/app/utils/external_logging.py new file mode 100644 index 0000000..45bd1c7 --- /dev/null +++ b/backend/app/utils/external_logging.py @@ -0,0 +1,416 @@ +"""External logging handlers for centralized log aggregation. + +Supports shipping logs to Datadog, Papertrail, and Elasticsearch in non-blocking +fashion using async buffering and batching. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +from abc import ABC, abstractmethod +from collections import deque +from datetime import datetime +from typing import TYPE_CHECKING, Any, Literal + +if TYPE_CHECKING: + from aiohttp import ClientSession + +import structlog + +log: structlog.stdlib.BoundLogger = structlog.get_logger() + + +class ExternalLogHandler(ABC): + """Base class for external log handlers.""" + + def __init__( + self, + batch_size: int = 10, + flush_interval_seconds: float = 5.0, + buffer_size: int = 1000, + ) -> None: + """Initialize handler with batching configuration. + + Args: + batch_size: Number of records to batch before sending. + flush_interval_seconds: Maximum time to buffer before flushing. + buffer_size: Maximum records to buffer in memory. 
+ """ + self.batch_size = batch_size + self.flush_interval_seconds = flush_interval_seconds + self.buffer_size = buffer_size + self.buffer: deque[dict[str, Any]] = deque(maxlen=buffer_size) + self._flush_task: asyncio.Task[None] | None = None + self._shutdown_event = asyncio.Event() + + def queue_log(self, record: dict[str, Any]) -> None: + """Queue a log record for batching. + + Args: + record: The log record dictionary to queue. + """ + if len(self.buffer) >= self.buffer_size: + dropped = self.buffer[0] + self.buffer.append(record) + log.warning( + "external_log_buffer_full", + provider=self.__class__.__name__, + dropped_event=dropped.get("event", "unknown"), + ) + else: + self.buffer.append(record) + + if len(self.buffer) >= self.batch_size: + asyncio.create_task(self.flush()) + + async def flush(self) -> None: + """Send all buffered logs to the external system.""" + if not self.buffer: + return + + batch = list(self.buffer) + self.buffer.clear() + + try: + await self._send_batch(batch) + log.debug( + "external_log_batch_sent", + provider=self.__class__.__name__, + batch_size=len(batch), + ) + except Exception as exc: # noqa: BLE001 + log.warning( + "external_log_send_failed", + provider=self.__class__.__name__, + batch_size=len(batch), + error=str(exc), + ) + + @abstractmethod + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + """Send a batch of logs to the external system. + + Args: + batch: List of log records to send. + + Raises: + Exception: If sending fails. 
+ """ + pass + + def start_periodic_flush(self) -> None: + """Start periodic flushing task.""" + if self._flush_task is None or self._flush_task.done(): + self._flush_task = asyncio.create_task(self._periodic_flush_loop()) + + async def _periodic_flush_loop(self) -> None: + """Periodically flush buffered logs.""" + try: + while not self._shutdown_event.is_set(): + await asyncio.sleep(self.flush_interval_seconds) + await self.flush() + except asyncio.CancelledError: + pass + + async def shutdown(self) -> None: + """Shutdown handler and flush remaining logs.""" + self._shutdown_event.set() + if self._flush_task is not None: + try: + await asyncio.wait_for(self._flush_task, timeout=5.0) + except TimeoutError: + self._flush_task.cancel() + await self.flush() + + +class DatadogLogHandler(ExternalLogHandler): + """Handler for shipping logs to Datadog via HTTP API.""" + + def __init__( + self, + api_key: str, + site: str = "datadoghq.com", + batch_size: int = 10, + flush_interval_seconds: float = 5.0, + buffer_size: int = 1000, + http_session: ClientSession | None = None, + ) -> None: + """Initialize Datadog handler. + + Args: + api_key: Datadog API key. + site: Datadog site (datadoghq.com or datadoghq.eu). + batch_size: Number of records to batch before sending. + flush_interval_seconds: Maximum time to buffer before flushing. + buffer_size: Maximum records to buffer in memory. + http_session: Optional aiohttp ClientSession for requests. + """ + super().__init__(batch_size, flush_interval_seconds, buffer_size) + self.api_key = api_key + self.site = site + self.http_session = http_session + self._url = f"https://http-intake.logs.{site}/v1/input/{api_key}" + + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + """Send logs to Datadog. + + Args: + batch: List of log records. + + Raises: + Exception: If the request fails. 
+ """ + if not self.http_session: + return + + import aiohttp + + for log_record in batch: + payload = json.dumps(log_record).encode("utf-8") + headers = {"Content-Type": "application/json"} + + try: + async with self.http_session.post( + self._url, data=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as response: + if response.status not in (200, 204): + raise Exception( + f"Datadog API returned {response.status}: " + f"{await response.text()}" + ) + except TimeoutError as exc: + raise Exception("Request to Datadog API timed out") from exc + + +class PapertrailLogHandler(ExternalLogHandler): + """Handler for shipping logs to Papertrail via Syslog protocol.""" + + def __init__( + self, + host: str, + port: int, + program_name: str = "bangui", + batch_size: int = 10, + flush_interval_seconds: float = 5.0, + buffer_size: int = 1000, + ) -> None: + """Initialize Papertrail handler. + + Args: + host: Papertrail host address. + port: Papertrail port number. + program_name: Program name for Syslog. + batch_size: Number of records to batch before sending. + flush_interval_seconds: Maximum time to buffer before flushing. + buffer_size: Maximum records to buffer in memory. + """ + super().__init__(batch_size, flush_interval_seconds, buffer_size) + self.host = host + self.port = port + self.program_name = program_name + + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + """Send logs to Papertrail via Syslog. + + Args: + batch: List of log records. + + Raises: + Exception: If sending fails. 
+ """ + try: + reader, writer = await asyncio.open_connection(self.host, self.port) + except (TimeoutError, OSError) as exc: + raise Exception(f"Failed to connect to Papertrail {self.host}:{self.port}") from exc + + try: + for log_record in batch: + severity = self._severity_from_level(log_record.get("level", "info")) + facility = 16 # local0 + priority = (facility << 3) | severity + + timestamp = datetime.now().isoformat() + hostname = "bangui" + event = log_record.get("event", "application_log") + + syslog_msg = ( + f"<{priority}>{timestamp} {hostname} " + f"{self.program_name}: {event} {json.dumps(log_record)}\n" + ) + + writer.write(syslog_msg.encode("utf-8")) + await writer.drain() + finally: + writer.close() + with contextlib.suppress(Exception): + await writer.wait_closed() + + @staticmethod + def _severity_from_level(level: str) -> int: + """Convert log level to Syslog severity. + + Args: + level: Log level (debug, info, warning, error, critical). + + Returns: + Syslog severity (0-7). + """ + level_map = { + "debug": 7, + "info": 6, + "warning": 4, + "error": 3, + "critical": 2, + } + return level_map.get(level.lower(), 6) + + +class ElasticsearchLogHandler(ExternalLogHandler): + """Handler for shipping logs to Elasticsearch via HTTP API.""" + + def __init__( + self, + hosts: list[str], + index_prefix: str = "bangui", + batch_size: int = 10, + flush_interval_seconds: float = 5.0, + buffer_size: int = 1000, + http_session: ClientSession | None = None, + ) -> None: + """Initialize Elasticsearch handler. + + Args: + hosts: List of Elasticsearch host URLs. + index_prefix: Prefix for index names. + batch_size: Number of records to batch before sending. + flush_interval_seconds: Maximum time to buffer before flushing. + buffer_size: Maximum records to buffer in memory. + http_session: Optional aiohttp ClientSession for requests. 
+ """ + super().__init__(batch_size, flush_interval_seconds, buffer_size) + self.hosts = hosts + self.index_prefix = index_prefix + self.http_session = http_session + self._host_index = 0 + + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + """Send logs to Elasticsearch. + + Args: + batch: List of log records. + + Raises: + Exception: If the request fails. + """ + if not self.http_session or not self.hosts: + return + + import aiohttp + + host = self.hosts[self._host_index] + self._host_index = (self._host_index + 1) % len(self.hosts) + + now = datetime.now() + index_name = f"{self.index_prefix}-{now.strftime('%Y.%m.%d')}" + + bulk_body = "" + for log_record in batch: + metadata = json.dumps({"index": {"_index": index_name}}) + document = json.dumps(log_record) + bulk_body += f"{metadata}\n{document}\n" + + try: + async with self.http_session.post( + f"{host.rstrip('/')}/_bulk", + data=bulk_body.encode("utf-8"), + headers={"Content-Type": "application/x-ndjson"}, + timeout=aiohttp.ClientTimeout(total=10), + ) as response: + if response.status not in (200, 204): + raise Exception( + f"Elasticsearch API returned {response.status}: " + f"{await response.text()}" + ) + except TimeoutError as exc: + raise Exception("Request to Elasticsearch timed out") from exc + + +def create_external_log_handler( + provider: Literal["datadog", "papertrail", "elasticsearch"], + api_key: str | None = None, + datadog_site: str = "datadoghq.com", + datadog_batch_size: int = 10, + papertrail_host: str | None = None, + papertrail_port: int | None = None, + papertrail_program_name: str = "bangui", + elasticsearch_hosts: list[str] | None = None, + elasticsearch_index_prefix: str = "bangui", + elasticsearch_batch_size: int = 10, + batch_size: int = 10, + flush_interval_seconds: float = 5.0, + buffer_size: int = 1000, + http_session: ClientSession | None = None, +) -> ExternalLogHandler | None: + """Factory function to create an external log handler. 
+ + Args: + provider: Logging provider (datadog, papertrail, elasticsearch). + api_key: API key for Datadog. + datadog_site: Datadog site (datadoghq.com or datadoghq.eu). + datadog_batch_size: Datadog batch size. + papertrail_host: Papertrail host address. + papertrail_port: Papertrail port number. + papertrail_program_name: Program name for Papertrail Syslog. + elasticsearch_hosts: List of Elasticsearch host URLs. + elasticsearch_index_prefix: Elasticsearch index prefix. + elasticsearch_batch_size: Elasticsearch batch size. + batch_size: Batch size for the Papertrail handler (Datadog and Elasticsearch use their provider-specific batch sizes). + flush_interval_seconds: Flush interval for all providers. + buffer_size: Buffer size for all providers. + http_session: Optional aiohttp ClientSession (used by the Datadog and Elasticsearch handlers). + + Returns: + An ExternalLogHandler instance or None if provider is unknown. + + Raises: + ValueError: If required parameters are missing for the provider. + """ + if provider == "datadog": + if not api_key: + raise ValueError("api_key is required for datadog provider") + return DatadogLogHandler( + api_key=api_key, + site=datadog_site, + batch_size=datadog_batch_size, + flush_interval_seconds=flush_interval_seconds, + buffer_size=buffer_size, + http_session=http_session, + ) + elif provider == "papertrail": + if not papertrail_host or not papertrail_port: + raise ValueError( + "papertrail_host and papertrail_port are required for papertrail provider" + ) + return PapertrailLogHandler( + host=papertrail_host, + port=papertrail_port, + program_name=papertrail_program_name, + batch_size=batch_size, + flush_interval_seconds=flush_interval_seconds, + buffer_size=buffer_size, + ) + elif provider == "elasticsearch": + if not elasticsearch_hosts: + raise ValueError("elasticsearch_hosts is required for elasticsearch provider") + return ElasticsearchLogHandler( + hosts=elasticsearch_hosts, + index_prefix=elasticsearch_index_prefix, + batch_size=elasticsearch_batch_size, +
flush_interval_seconds=flush_interval_seconds, + buffer_size=buffer_size, + http_session=http_session, + ) + else: + return None diff --git a/backend/tests/test_external_logging.py b/backend/tests/test_external_logging.py new file mode 100644 index 0000000..03339ec --- /dev/null +++ b/backend/tests/test_external_logging.py @@ -0,0 +1,279 @@ +"""Tests for external logging module.""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest + +from app.utils.external_logging import ( + DatadogLogHandler, + ElasticsearchLogHandler, + ExternalLogHandler, + PapertrailLogHandler, + create_external_log_handler, +) + + +class TestExternalLogHandler: + """Test the abstract base handler class.""" + + @pytest.mark.asyncio + async def test_queue_log_adds_to_buffer(self) -> None: + """Queuing a log record adds it to the buffer.""" + + class TestHandler(ExternalLogHandler): + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + pass + + handler = TestHandler(batch_size=10, flush_interval_seconds=5) + record = {"event": "test_event", "level": "info"} + + handler.queue_log(record) + + assert len(handler.buffer) == 1 + assert handler.buffer[0] == record + + @pytest.mark.asyncio + async def test_queue_log_triggers_flush_on_batch_size(self) -> None: + """Queuing logs triggers flush when batch size is reached.""" + + class TestHandler(ExternalLogHandler): + def __init__(self: Any) -> None: + super().__init__(batch_size=2) + self.flushed = False + + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + self.flushed = True + assert len(batch) == 2 + + handler = TestHandler() + + with patch("asyncio.create_task"): + handler.queue_log({"event": "log1"}) + handler.queue_log({"event": "log2"}) + + @pytest.mark.asyncio + async def test_flush_sends_batch(self) -> None: + """Flush sends all buffered logs.""" + + class TestHandler(ExternalLogHandler): + def 
__init__(self: Any) -> None: + super().__init__() + self.batches_sent: list[list[dict[str, Any]]] = [] + + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + self.batches_sent.append(batch) + + handler = TestHandler() + handler.queue_log({"event": "log1"}) + handler.queue_log({"event": "log2"}) + + await handler.flush() + + assert len(handler.batches_sent) == 1 + assert len(handler.batches_sent[0]) == 2 + assert len(handler.buffer) == 0 + + @pytest.mark.asyncio + async def test_buffer_size_limit(self) -> None: + """Buffer respects maximum size.""" + + class TestHandler(ExternalLogHandler): + async def _send_batch(self, batch: list[dict[str, Any]]) -> None: + pass + + handler = TestHandler(buffer_size=5) + + for i in range(10): + handler.queue_log({"event": f"log{i}"}) + + assert len(handler.buffer) == 5 + + +class TestDatadogLogHandler: + """Test Datadog logging handler.""" + + @pytest.mark.asyncio + async def test_initialization(self) -> None: + """Datadog handler initializes correctly.""" + handler = DatadogLogHandler( + api_key="test-key", + site="datadoghq.com", + http_session=None, + ) + + assert handler.api_key == "test-key" + assert handler.site == "datadoghq.com" + assert "test-key" in handler._url + + @pytest.mark.asyncio + async def test_send_batch_requires_http_session(self) -> None: + """Send batch skips if no HTTP session.""" + handler = DatadogLogHandler(api_key="test-key", http_session=None) + + # Should not raise + await handler._send_batch([{"event": "test"}]) + + +class TestPapertrailLogHandler: + """Test Papertrail logging handler.""" + + def test_initialization(self) -> None: + """Papertrail handler initializes correctly.""" + handler = PapertrailLogHandler( + host="logs.papertrailapp.com", + port=12345, + program_name="test-app", + ) + + assert handler.host == "logs.papertrailapp.com" + assert handler.port == 12345 + assert handler.program_name == "test-app" + + def test_severity_from_level(self) -> None: + """Severity mapping 
works correctly.""" + assert PapertrailLogHandler._severity_from_level("debug") == 7 + assert PapertrailLogHandler._severity_from_level("info") == 6 + assert PapertrailLogHandler._severity_from_level("warning") == 4 + assert PapertrailLogHandler._severity_from_level("error") == 3 + assert PapertrailLogHandler._severity_from_level("critical") == 2 + assert PapertrailLogHandler._severity_from_level("unknown") == 6 + + +class TestElasticsearchLogHandler: + """Test Elasticsearch logging handler.""" + + def test_initialization(self) -> None: + """Elasticsearch handler initializes correctly.""" + hosts = ["http://elasticsearch:9200", "http://elasticsearch2:9200"] + handler = ElasticsearchLogHandler( + hosts=hosts, + index_prefix="bangui", + http_session=None, + ) + + assert handler.hosts == hosts + assert handler.index_prefix == "bangui" + + def test_host_rotation(self) -> None: + """Host rotation starts at the first configured host.""" + hosts = ["http://es1:9200", "http://es2:9200", "http://es3:9200"] + handler = ElasticsearchLogHandler(hosts=hosts, http_session=None) + + assert handler._host_index == 0 + + +class TestCreateExternalLogHandler: + """Test handler factory function.""" + + def test_create_datadog_handler(self) -> None: + """Factory creates Datadog handler.""" + handler = create_external_log_handler( + provider="datadog", + api_key="test-key", + ) + + assert isinstance(handler, DatadogLogHandler) + + def test_create_papertrail_handler(self) -> None: + """Factory creates Papertrail handler.""" + handler = create_external_log_handler( + provider="papertrail", + papertrail_host="logs.papertrailapp.com", + papertrail_port=12345, + ) + + assert isinstance(handler, PapertrailLogHandler) + + def test_create_elasticsearch_handler(self) -> None: + """Factory creates Elasticsearch handler.""" + handler = create_external_log_handler( + provider="elasticsearch", + elasticsearch_hosts=["http://elasticsearch:9200"], + ) + + assert isinstance(handler, ElasticsearchLogHandler) + + def
test_datadog_handler_requires_api_key(self) -> None: + """Factory raises ValueError if Datadog API key is missing.""" + with pytest.raises(ValueError, match="api_key is required"): + create_external_log_handler(provider="datadog") + + def test_papertrail_handler_requires_host_and_port(self) -> None: + """Factory raises ValueError if Papertrail host/port are missing.""" + with pytest.raises(ValueError, match="papertrail_host and papertrail_port"): + create_external_log_handler(provider="papertrail") + + def test_elasticsearch_handler_requires_hosts(self) -> None: + """Factory raises ValueError if Elasticsearch hosts are missing.""" + with pytest.raises(ValueError, match="elasticsearch_hosts is required"): + create_external_log_handler(provider="elasticsearch") + + def test_unknown_provider_returns_none(self) -> None: + """Factory returns None for unknown provider.""" + handler = create_external_log_handler(provider="unknown") # type: ignore + + assert handler is None + + +class TestExternalLoggingConfiguration: + """Test external logging configuration via Settings.""" + + def test_external_logging_disabled_by_default(self) -> None: + """External logging is disabled by default.""" + from app.config import Settings + + settings = Settings( + session_secret="a" * 64, + fail2ban_socket="/tmp/test.sock", + fail2ban_config_dir="/tmp/fail2ban", + ) + + assert settings.external_logging_enabled is False + assert settings.external_logging_provider is None + + def test_datadog_settings(self) -> None: + """Datadog settings can be configured.""" + from app.config import Settings + + settings = Settings( + session_secret="a" * 64, + fail2ban_socket="/tmp/test.sock", + fail2ban_config_dir="/tmp/fail2ban", + external_logging_enabled=True, + external_logging_provider="datadog", + datadog_api_key="test-key", + datadog_site="datadoghq.eu", + ) + + assert settings.external_logging_enabled is True + assert settings.external_logging_provider == "datadog" + assert settings.datadog_api_key 
== "test-key" + assert settings.datadog_site == "datadoghq.eu" + + def test_elasticsearch_hosts_normalization(self) -> None: + """Elasticsearch hosts can be provided as string or list.""" + from app.config import Settings + + # Test as comma-separated string + settings1 = Settings( + session_secret="a" * 64, + fail2ban_socket="/tmp/test.sock", + fail2ban_config_dir="/tmp/fail2ban", + elasticsearch_hosts="http://es1:9200,http://es2:9200", + ) + + assert settings1.elasticsearch_hosts == ["http://es1:9200", "http://es2:9200"] + + # Test as list + settings2 = Settings( + session_secret="a" * 64, + fail2ban_socket="/tmp/test.sock", + fail2ban_config_dir="/tmp/fail2ban", + elasticsearch_hosts=["http://es1:9200", "http://es2:9200"], + ) + + assert settings2.elasticsearch_hosts == ["http://es1:9200", "http://es2:9200"]