Files
BanGUI/backend/tests/test_external_logging.py
2026-05-15 20:41:05 +02:00

286 lines
9.6 KiB
Python

"""Tests for external logging module."""
from __future__ import annotations
from typing import Any
from unittest.mock import patch
import pytest
from app.utils.external_logging import (
DatadogLogHandler,
ElasticsearchLogHandler,
ExternalLogHandler,
PapertrailLogHandler,
create_external_log_handler,
)
class TestExternalLogHandler:
"""Test the abstract base handler class."""
@pytest.mark.asyncio
async def test_queue_log_adds_to_buffer(self) -> None:
"""Queuing a log record adds it to the buffer."""
class TestHandler(ExternalLogHandler):
async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
pass
handler = TestHandler(batch_size=10, flush_interval_seconds=5)
record = {"event": "test_event", "level": "info"}
handler.queue_log(record)
assert len(handler.buffer) == 1
assert handler.buffer[0] == record
@pytest.mark.asyncio
async def test_queue_log_triggers_flush_on_batch_size(self) -> None:
"""Queuing logs triggers flush when batch size is reached."""
class TestHandler(ExternalLogHandler):
def __init__(self: Any) -> None:
super().__init__(batch_size=2)
self.flushed = False
async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
self.flushed = True
assert len(batch) == 2
handler = TestHandler()
with patch("asyncio.create_task"):
handler.queue_log({"event": "log1"})
handler.queue_log({"event": "log2"})
@pytest.mark.asyncio
async def test_flush_sends_batch(self) -> None:
"""Flush sends all buffered logs."""
class TestHandler(ExternalLogHandler):
def __init__(self: Any) -> None:
super().__init__()
self.batches_sent: list[list[dict[str, Any]]] = []
async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
self.batches_sent.append(batch)
handler = TestHandler()
handler.queue_log({"event": "log1"})
handler.queue_log({"event": "log2"})
await handler.flush()
assert len(handler.batches_sent) == 1
assert len(handler.batches_sent[0]) == 2
assert len(handler.buffer) == 0
@pytest.mark.asyncio
async def test_buffer_size_limit(self) -> None:
"""Buffer respects maximum size."""
class TestHandler(ExternalLogHandler):
async def _send_batch(self, batch: list[dict[str, Any]]) -> None:
pass
handler = TestHandler(buffer_size=5)
for i in range(10):
handler.queue_log({"event": f"log{i}"})
assert len(handler.buffer) == 5
class TestDatadogLogHandler:
"""Test Datadog logging handler."""
@pytest.mark.asyncio
async def test_initialization(self) -> None:
"""Datadog handler initializes correctly."""
handler = DatadogLogHandler(
api_key="test-key",
site="datadoghq.com",
http_session=None,
)
assert handler.api_key == "test-key"
assert handler.site == "datadoghq.com"
assert "test-key" in handler._url
@pytest.mark.asyncio
async def test_send_batch_requires_http_session(self) -> None:
"""Send batch skips if no HTTP session."""
handler = DatadogLogHandler(api_key="test-key", http_session=None)
# Should not raise
await handler._send_batch([{"event": "test"}])
class TestPapertrailLogHandler:
"""Test Papertrail logging handler."""
def test_initialization(self) -> None:
"""Papertrail handler initializes correctly."""
handler = PapertrailLogHandler(
host="logs.papertrailapp.com",
port=12345,
program_name="test-app",
)
assert handler.host == "logs.papertrailapp.com"
assert handler.port == 12345
assert handler.program_name == "test-app"
def test_severity_from_level(self) -> None:
"""Severity mapping works correctly."""
assert PapertrailLogHandler._severity_from_level("debug") == 7
assert PapertrailLogHandler._severity_from_level("info") == 6
assert PapertrailLogHandler._severity_from_level("warning") == 4
assert PapertrailLogHandler._severity_from_level("error") == 3
assert PapertrailLogHandler._severity_from_level("critical") == 2
assert PapertrailLogHandler._severity_from_level("unknown") == 6
class TestElasticsearchLogHandler:
"""Test Elasticsearch logging handler."""
def test_initialization(self) -> None:
"""Elasticsearch handler initializes correctly."""
hosts = ["http://elasticsearch:9200", "http://elasticsearch2:9200"]
handler = ElasticsearchLogHandler(
hosts=hosts,
index_prefix="bangui",
http_session=None,
)
assert handler.hosts == hosts
assert handler.index_prefix == "bangui"
def test_host_rotation(self) -> None:
"""Handler rotates between hosts."""
hosts = ["http://es1:9200", "http://es2:9200", "http://es3:9200"]
handler = ElasticsearchLogHandler(hosts=hosts, http_session=None)
assert handler._host_index == 0
class TestCreateExternalLogHandler:
"""Test handler factory function."""
def test_create_datadog_handler(self) -> None:
"""Factory creates Datadog handler."""
handler = create_external_log_handler(
provider="datadog",
api_key="test-key",
)
assert isinstance(handler, DatadogLogHandler)
def test_create_papertrail_handler(self) -> None:
"""Factory creates Papertrail handler."""
handler = create_external_log_handler(
provider="papertrail",
papertrail_host="logs.papertrailapp.com",
papertrail_port=12345,
)
assert isinstance(handler, PapertrailLogHandler)
def test_create_elasticsearch_handler(self) -> None:
"""Factory creates Elasticsearch handler."""
handler = create_external_log_handler(
provider="elasticsearch",
elasticsearch_hosts=["http://elasticsearch:9200"],
)
assert isinstance(handler, ElasticsearchLogHandler)
def test_datadog_handler_requires_api_key(self) -> None:
"""Factory raises ValueError if Datadog API key is missing."""
with pytest.raises(ValueError, match="api_key is required"):
create_external_log_handler(provider="datadog")
def test_papertrail_handler_requires_host_and_port(self) -> None:
"""Factory raises ValueError if Papertrail host/port are missing."""
with pytest.raises(ValueError, match="papertrail_host and papertrail_port"):
create_external_log_handler(provider="papertrail")
def test_elasticsearch_handler_requires_hosts(self) -> None:
"""Factory raises ValueError if Elasticsearch hosts are missing."""
with pytest.raises(ValueError, match="elasticsearch_hosts is required"):
create_external_log_handler(provider="elasticsearch")
def test_unknown_provider_returns_none(self) -> None:
"""Factory returns None for unknown provider."""
handler = create_external_log_handler(provider="unknown") # type: ignore
assert handler is None
class TestExternalLoggingConfiguration:
"""Test external logging configuration via Settings."""
def test_external_logging_disabled_by_default(self, tmp_path: Path) -> None:
"""External logging is disabled by default."""
from app.config import Settings
config_dir = tmp_path / "fail2ban"
config_dir.mkdir()
settings = Settings(
session_secret="a" * 64,
fail2ban_socket="/tmp/test.sock",
fail2ban_config_dir=str(config_dir),
)
assert settings.external_logging_enabled is False
assert settings.external_logging_provider is None
def test_datadog_settings(self, tmp_path: Path) -> None:
"""Datadog settings can be configured."""
from app.config import Settings
config_dir = tmp_path / "fail2ban"
config_dir.mkdir()
settings = Settings(
session_secret="a" * 64,
fail2ban_socket="/tmp/test.sock",
fail2ban_config_dir=str(config_dir),
external_logging_enabled=True,
external_logging_provider="datadog",
datadog_api_key="test-key",
datadog_site="datadoghq.eu",
)
assert settings.external_logging_enabled is True
assert settings.external_logging_provider == "datadog"
assert settings.datadog_api_key == "test-key"
assert settings.datadog_site == "datadoghq.eu"
def test_elasticsearch_hosts_normalization(self, tmp_path: Path) -> None:
"""Elasticsearch hosts can be provided as string or list."""
from app.config import Settings
config_dir = tmp_path / "fail2ban"
config_dir.mkdir()
# Test as comma-separated string
settings1 = Settings(
session_secret="a" * 64,
fail2ban_socket="/tmp/test.sock",
fail2ban_config_dir=str(config_dir),
elasticsearch_hosts="http://es1:9200,http://es2:9200",
)
assert settings1.elasticsearch_hosts == ["http://es1:9200", "http://es2:9200"]
# Test as list
settings2 = Settings(
session_secret="a" * 64,
fail2ban_socket="/tmp/test.sock",
fail2ban_config_dir=str(config_dir),
elasticsearch_hosts=["http://es1:9200", "http://es2:9200"],
)
assert settings2.elasticsearch_hosts == ["http://es1:9200", "http://es2:9200"]