Fix history origin filter path and add regression tests
This commit is contained in:
@@ -294,6 +294,7 @@ async def get_history_page(
|
||||
since: int | None = None,
|
||||
jail: str | None = None,
|
||||
ip_filter: str | None = None,
|
||||
origin: BanOrigin | None = None,
|
||||
page: int = 1,
|
||||
page_size: int = 100,
|
||||
) -> tuple[list[HistoryRecord], int]:
|
||||
@@ -314,6 +315,12 @@ async def get_history_page(
|
||||
wheres.append("ip LIKE ?")
|
||||
params.append(f"{ip_filter}%")
|
||||
|
||||
origin_clause, origin_params = _origin_sql_filter(origin)
|
||||
if origin_clause:
|
||||
origin_clause_clean = origin_clause.removeprefix(" AND ")
|
||||
wheres.append(origin_clause_clean)
|
||||
params.extend(origin_params)
|
||||
|
||||
where_sql: str = ("WHERE " + " AND ".join(wheres)) if wheres else ""
|
||||
|
||||
effective_page_size: int = page_size
|
||||
|
||||
@@ -18,7 +18,7 @@ import structlog
|
||||
if TYPE_CHECKING:
|
||||
from app.models.geo import GeoEnricher
|
||||
|
||||
from app.models.ban import TIME_RANGE_SECONDS, TimeRange
|
||||
from app.models.ban import TIME_RANGE_SECONDS, BanOrigin, TimeRange
|
||||
from app.models.history import (
|
||||
HistoryBanItem,
|
||||
HistoryListResponse,
|
||||
@@ -62,6 +62,7 @@ async def list_history(
|
||||
range_: TimeRange | None = None,
|
||||
jail: str | None = None,
|
||||
ip_filter: str | None = None,
|
||||
origin: BanOrigin | None = None,
|
||||
page: int = 1,
|
||||
page_size: int = _DEFAULT_PAGE_SIZE,
|
||||
geo_enricher: GeoEnricher | None = None,
|
||||
@@ -108,6 +109,7 @@ async def list_history(
|
||||
since=since,
|
||||
jail=jail,
|
||||
ip_filter=ip_filter,
|
||||
origin=origin,
|
||||
page=page,
|
||||
page_size=effective_page_size,
|
||||
)
|
||||
|
||||
@@ -136,3 +136,32 @@ async def test_get_history_page_and_for_ip(tmp_path: Path) -> None:
|
||||
history_for_ip = await fail2ban_db_repo.get_history_for_ip(db_path=db_path, ip="2.2.2.2")
|
||||
assert len(history_for_ip) == 1
|
||||
assert history_for_ip[0].ip == "2.2.2.2"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_history_page_origin_filter(tmp_path: Path) -> None:
|
||||
db_path = str(tmp_path / "fail2ban.db")
|
||||
async with aiosqlite.connect(db_path) as db:
|
||||
await _create_bans_table(db)
|
||||
await db.executemany(
|
||||
"INSERT INTO bans (jail, ip, timeofban, bancount, data) VALUES (?, ?, ?, ?, ?)",
|
||||
[
|
||||
("jail1", "1.1.1.1", 100, 1, "{}"),
|
||||
("blocklist-import", "2.2.2.2", 200, 1, "{}"),
|
||||
],
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
page, total = await fail2ban_db_repo.get_history_page(
|
||||
db_path=db_path,
|
||||
since=None,
|
||||
jail=None,
|
||||
ip_filter=None,
|
||||
origin="selfblock",
|
||||
page=1,
|
||||
page_size=10,
|
||||
)
|
||||
|
||||
assert total == 1
|
||||
assert len(page) == 1
|
||||
assert page[0].ip == "1.1.1.1"
|
||||
|
||||
@@ -179,6 +179,19 @@ class TestListHistory:
|
||||
# 2 sshd bans for 1.2.3.4
|
||||
assert result.total == 2
|
||||
|
||||
async def test_origin_filter_selfblock(self, f2b_db_path: str) -> None:
|
||||
"""Origin filter should include only selfblock entries."""
|
||||
with patch(
|
||||
"app.services.history_service.get_fail2ban_db_path",
|
||||
new=AsyncMock(return_value=f2b_db_path),
|
||||
):
|
||||
result = await history_service.list_history(
|
||||
"fake_socket", origin="selfblock"
|
||||
)
|
||||
|
||||
assert result.total == 4
|
||||
assert all(item.jail != "blocklist-import" for item in result.items)
|
||||
|
||||
async def test_unknown_ip_returns_empty(self, f2b_db_path: str) -> None:
|
||||
"""Filtering by a non-existent IP returns an empty result set."""
|
||||
with patch(
|
||||
|
||||
Reference in New Issue
Block a user