Move conffile_parser from services to utils
This commit is contained in:
@@ -22,7 +22,27 @@ import errno
|
||||
import socket
|
||||
import time
|
||||
from pickle import HIGHEST_PROTOCOL, dumps, loads
|
||||
from typing import TYPE_CHECKING, Any
|
||||
from typing import TYPE_CHECKING, TypeAlias
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Fail2BanToken: TypeAlias = str | int | float | bool | None | dict[str, object] | list[object]
|
||||
"""A single token in a fail2ban command.
|
||||
|
||||
Fail2ban accepts simple types (str/int/float/bool) plus compound types
|
||||
(list/dict). Complex objects are stringified before being sent.
|
||||
"""
|
||||
|
||||
Fail2BanCommand: TypeAlias = list[Fail2BanToken]
|
||||
"""A command sent to fail2ban over the socket.
|
||||
|
||||
Commands are pickle serialised lists of tokens.
|
||||
"""
|
||||
|
||||
Fail2BanResponse: TypeAlias = tuple[int, object]
|
||||
"""A typical fail2ban response containing a status code and payload."""
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from types import TracebackType
|
||||
@@ -81,9 +101,9 @@ class Fail2BanProtocolError(Exception):
|
||||
|
||||
def _send_command_sync(
|
||||
socket_path: str,
|
||||
command: list[Any],
|
||||
command: Fail2BanCommand,
|
||||
timeout: float,
|
||||
) -> Any:
|
||||
) -> object:
|
||||
"""Send a command to fail2ban and return the parsed response.
|
||||
|
||||
This is a **synchronous** function intended to be called from within
|
||||
@@ -180,7 +200,7 @@ def _send_command_sync(
|
||||
) from last_oserror
|
||||
|
||||
|
||||
def _coerce_command_token(token: Any) -> Any:
|
||||
def _coerce_command_token(token: Fail2BanToken) -> Fail2BanToken:
|
||||
"""Coerce a command token to a type that fail2ban understands.
|
||||
|
||||
fail2ban's ``CSocket.convert`` accepts ``str``, ``bool``, ``int``,
|
||||
@@ -229,7 +249,7 @@ class Fail2BanClient:
|
||||
self.socket_path: str = socket_path
|
||||
self.timeout: float = timeout
|
||||
|
||||
async def send(self, command: list[Any]) -> Any:
|
||||
async def send(self, command: Fail2BanCommand) -> object:
|
||||
"""Send a command to fail2ban and return the response.
|
||||
|
||||
Acquires the module-level concurrency semaphore before dispatching
|
||||
@@ -267,13 +287,13 @@ class Fail2BanClient:
|
||||
log.debug("fail2ban_sending_command", command=command)
|
||||
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
|
||||
try:
|
||||
response: Any = await loop.run_in_executor(
|
||||
None,
|
||||
_send_command_sync,
|
||||
self.socket_path,
|
||||
command,
|
||||
self.timeout,
|
||||
)
|
||||
response: object = await loop.run_in_executor(
|
||||
None,
|
||||
_send_command_sync,
|
||||
self.socket_path,
|
||||
command,
|
||||
self.timeout,
|
||||
)
|
||||
except Fail2BanConnectionError:
|
||||
log.warning(
|
||||
"fail2ban_connection_error",
|
||||
@@ -300,7 +320,7 @@ class Fail2BanClient:
|
||||
``True`` when the daemon responds correctly, ``False`` otherwise.
|
||||
"""
|
||||
try:
|
||||
response: Any = await self.send(["ping"])
|
||||
response: object = await self.send(["ping"])
|
||||
return bool(response == 1) # fail2ban returns 1 on successful ping
|
||||
except (Fail2BanConnectionError, Fail2BanProtocolError):
|
||||
return False
|
||||
|
||||
Reference in New Issue
Block a user