"""Shared utilities for parsing fail2ban responses. This module provides canonical implementations of response parsing helpers used across all service modules. All services should import from here instead of maintaining local copies. """ from __future__ import annotations def ok(response: object) -> object: """Extract the payload from a fail2ban ``(return_code, data)`` response. fail2ban commands return a tuple of ``(return_code, data)`` where ``return_code`` is 0 for success or non-zero for errors. This function extracts and returns the ``data`` portion. Args: response: Raw value returned by :meth:`~Fail2BanClient.send`. Returns: The payload ``data`` portion of the response. Raises: ValueError: If the response indicates an error (return code ≠ 0) or has an unexpected shape. Examples: >>> response = (0, {'Jail list': 'sshd,recidive'}) >>> ok(response) {'Jail list': 'sshd,recidive'} >>> error_response = (1, 'Unknown jail') >>> ok(error_response) Traceback (most recent call last): ... ValueError: fail2ban returned error code 1: 'Unknown jail' """ try: code_val: int data_val: object code_val, data_val = response # type: ignore[misc] except (TypeError, ValueError) as exc: raise ValueError(f"Unexpected fail2ban response shape: {response!r}") from exc if code_val != 0: raise ValueError(f"fail2ban returned error code {code_val}: {data_val!r}") return data_val def to_dict(pairs: object) -> dict[str, object]: """Convert a list of ``(key, value)`` pairs to a plain dict. fail2ban returns many results as a list of key-value tuples. This function converts them to a regular Python dict, skipping malformed entries and converting keys to strings. Args: pairs: A list of ``(key, value)`` pairs (or any iterable thereof). Non-list/tuple inputs return an empty dict. Returns: A :class:`dict` with the keys and values from *pairs*. Keys are converted to strings; values are preserved as-is. Malformed entries are silently skipped. Examples: >>> to_dict([('name', 'sshd'), ('port', 22)]) {'name': 'sshd', 'port': 22} >>> to_dict([('a', 1), 'broken', ('b', 2)]) {'a': 1, 'b': 2} >>> to_dict('not a list') {} """ if not isinstance(pairs, (list, tuple)): return {} result: dict[str, object] = {} for item in pairs: try: k, v = item result[str(k)] = v except (TypeError, ValueError): pass return result def ensure_list(value: object | None) -> list[str]: """Coerce a fail2ban response value to a list of strings. Some fail2ban ``get`` responses return ``None`` when a field is empty, a single string when there is only one entry, or a list of strings. This helper normalises all three cases to a consistent list. Args: value: The raw value from a fail2ban command response. Can be ``None``, a string, a list/tuple of strings, or any other object. Returns: A :class:`list` of strings. Empty input returns an empty list. Single strings are wrapped in a list. Lists/tuples are converted to strings element-wise. Examples: >>> ensure_list(None) [] >>> ensure_list('sshd') ['sshd'] >>> ensure_list(['sshd', 'apache2']) ['sshd', 'apache2'] >>> ensure_list(42) ['42'] """ if value is None: return [] if isinstance(value, str): return [value] if value.strip() else [] if isinstance(value, (list, tuple)): return [str(v) for v in value if v is not None] return [str(value)] def is_not_found_error(exc: Exception) -> bool: """Return ``True`` if *exc* indicates a jail does not exist. fail2ban raises errors when a jail is not found, but serializes them in different formats depending on the context. This function checks for multiple common error message patterns. Args: exc: The exception to inspect. Returns: ``True`` if the exception message contains any of the known "not found" phrases (case-insensitive), ``False`` otherwise. Examples: >>> exc = ValueError('Unknown jail: sshd') >>> is_not_found_error(exc) True >>> exc = ValueError('unknownjail') >>> is_not_found_error(exc) True >>> exc = ValueError('Internal error') >>> is_not_found_error(exc) False """ msg = str(exc).lower() return any( phrase in msg for phrase in ( "unknown jail", "unknownjail", "no jail", "does not exist", "not found", ) )