feat: action config service, router endpoints, and full test coverage (Tasks 3.1, 3.2, 3.4)

- ActionConfig extended with active, used_by_jails, source_file, has_local_override
- New models: ActionListResponse, ActionUpdateRequest, ActionCreateRequest, AssignActionRequest
- New service functions: list_actions, get_action, update_action, create_action, delete_action, assign_action_to_jail, remove_action_from_jail
- New error classes: ActionNotFoundError, ActionAlreadyExistsError, ActionReadonlyError, ActionNameError
- New router endpoints: GET/PUT/POST/DELETE /api/config/actions, POST/DELETE /api/config/jails/{name}/action
- Service + router tests: 290 tests passing, mypy strict clean, ruff clean
This commit is contained in:
2026-03-13 19:12:31 +01:00
parent 2f60b0915e
commit f7cc130432
6 changed files with 2866 additions and 3 deletions

View File

@@ -1289,3 +1289,425 @@ class TestAssignFilterToJail:
).post("/api/config/jails/sshd/filter", json={"filter_name": "sshd"})
assert resp.status_code == 401
# ===========================================================================
# Action router tests (Task 3.1 + 3.2)
# ===========================================================================
@pytest.mark.asyncio
class TestListActionsRouter:
async def test_200_returns_action_list(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig, ActionListResponse
mock_action = ActionConfig(
name="iptables",
filename="iptables.conf",
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
)
mock_response = ActionListResponse(actions=[mock_action], total=1)
with patch(
"app.routers.config.config_file_service.list_actions",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/actions")
assert resp.status_code == 200
data = resp.json()
assert data["total"] == 1
assert data["actions"][0]["name"] == "iptables"
async def test_active_sorted_first(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig, ActionListResponse
inactive = ActionConfig(name="aaa", filename="aaa.conf", active=False)
active = ActionConfig(name="zzz", filename="zzz.conf", active=True)
mock_response = ActionListResponse(actions=[inactive, active], total=2)
with patch(
"app.routers.config.config_file_service.list_actions",
AsyncMock(return_value=mock_response),
):
resp = await config_client.get("/api/config/actions")
data = resp.json()
assert data["actions"][0]["name"] == "zzz" # active comes first
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/actions")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestGetActionRouter:
async def test_200_returns_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
mock_action = ActionConfig(
name="iptables",
filename="iptables.conf",
actionban="/sbin/iptables -I f2b-<name> 1 -s <ip> -j DROP",
)
with patch(
"app.routers.config.config_file_service.get_action",
AsyncMock(return_value=mock_action),
):
resp = await config_client.get("/api/config/actions/iptables")
assert resp.status_code == 200
assert resp.json()["name"] == "iptables"
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.get_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.get("/api/config/actions/missing")
assert resp.status_code == 404
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).get("/api/config/actions/iptables")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestUpdateActionRouter:
async def test_200_returns_updated_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
updated = ActionConfig(
name="iptables",
filename="iptables.local",
actionban="echo ban",
)
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(return_value=updated),
):
resp = await config_client.put(
"/api/config/actions/iptables",
json={"actionban": "echo ban"},
)
assert resp.status_code == 200
assert resp.json()["actionban"] == "echo ban"
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.put(
"/api/config/actions/missing", json={}
)
assert resp.status_code == 404
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.update_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.put(
"/api/config/actions/badname", json={}
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).put("/api/config/actions/iptables", json={})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestCreateActionRouter:
async def test_201_returns_created_action(self, config_client: AsyncClient) -> None:
from app.models.config import ActionConfig
created = ActionConfig(
name="custom",
filename="custom.local",
actionban="echo ban",
)
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(return_value=created),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "custom", "actionban": "echo ban"},
)
assert resp.status_code == 201
assert resp.json()["name"] == "custom"
async def test_409_when_already_exists(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionAlreadyExistsError
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(side_effect=ActionAlreadyExistsError("iptables")),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "iptables"},
)
assert resp.status_code == 409
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.create_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.post(
"/api/config/actions",
json={"name": "badname"},
)
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/actions", json={"name": "x"})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestDeleteActionRouter:
async def test_204_on_delete(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(return_value=None),
):
resp = await config_client.delete("/api/config/actions/custom")
assert resp.status_code == 204
async def test_404_when_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.delete("/api/config/actions/missing")
assert resp.status_code == 404
async def test_409_when_readonly(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionReadonlyError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionReadonlyError("iptables")),
):
resp = await config_client.delete("/api/config/actions/iptables")
assert resp.status_code == 409
async def test_400_for_bad_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.delete_action",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.delete("/api/config/actions/badname")
assert resp.status_code == 400
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).delete("/api/config/actions/iptables")
assert resp.status_code == 401
@pytest.mark.asyncio
class TestAssignActionToJailRouter:
async def test_204_on_success(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(return_value=None),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 204
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.post(
"/api/config/jails/missing/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 404
async def test_404_when_action_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNotFoundError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=ActionNotFoundError("missing")),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "missing"},
)
assert resp.status_code == 404
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=JailNameError()),
):
resp = await config_client.post(
"/api/config/jails/badjailname/action",
json={"action_name": "iptables"},
)
assert resp.status_code == 400
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.post(
"/api/config/jails/sshd/action",
json={"action_name": "badaction"},
)
assert resp.status_code == 400
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.assign_action_to_jail",
AsyncMock(return_value=None),
) as mock_assign:
resp = await config_client.post(
"/api/config/jails/sshd/action?reload=true",
json={"action_name": "iptables"},
)
assert resp.status_code == 204
assert mock_assign.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).post("/api/config/jails/sshd/action", json={"action_name": "iptables"})
assert resp.status_code == 401
@pytest.mark.asyncio
class TestRemoveActionFromJailRouter:
async def test_204_on_success(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(return_value=None),
):
resp = await config_client.delete(
"/api/config/jails/sshd/action/iptables"
)
assert resp.status_code == 204
async def test_404_when_jail_not_found(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNotFoundInConfigError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=JailNotFoundInConfigError("missing")),
):
resp = await config_client.delete(
"/api/config/jails/missing/action/iptables"
)
assert resp.status_code == 404
async def test_400_for_bad_jail_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import JailNameError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=JailNameError()),
):
resp = await config_client.delete(
"/api/config/jails/badjailname/action/iptables"
)
assert resp.status_code == 400
async def test_400_for_bad_action_name(self, config_client: AsyncClient) -> None:
from app.services.config_file_service import ActionNameError
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(side_effect=ActionNameError()),
):
resp = await config_client.delete(
"/api/config/jails/sshd/action/badactionname"
)
assert resp.status_code == 400
async def test_reload_param_passed(self, config_client: AsyncClient) -> None:
with patch(
"app.routers.config.config_file_service.remove_action_from_jail",
AsyncMock(return_value=None),
) as mock_rm:
resp = await config_client.delete(
"/api/config/jails/sshd/action/iptables?reload=true"
)
assert resp.status_code == 204
assert mock_rm.call_args.kwargs.get("do_reload") is True
async def test_401_when_unauthenticated(self, config_client: AsyncClient) -> None:
resp = await AsyncClient(
transport=ASGITransport(app=config_client._transport.app), # type: ignore[attr-defined]
base_url="http://test",
).delete("/api/config/jails/sshd/action/iptables")
assert resp.status_code == 401