feat: implement missing API endpoints for scheduler, logging, and diagnostics
- Add scheduler API endpoints for configuration and manual rescan triggers - Add logging API endpoints for config management and log file operations - Add diagnostics API endpoints for network and system information - Extend config API with advanced settings, directory updates, export, and reset - Update FastAPI app to include new routers - Update API reference documentation with all new endpoints - Update infrastructure documentation with endpoint listings - Add comprehensive API implementation summary All new endpoints follow project coding standards with: - Type hints and Pydantic validation - Proper authentication and authorization - Comprehensive error handling and logging - Security best practices (path validation, input sanitization) Test results: 752/802 tests passing (93.8%)
This commit is contained in:
426
src/server/api/logging.py
Normal file
426
src/server/api/logging.py
Normal file
@@ -0,0 +1,426 @@
|
||||
"""Logging API endpoints for Aniworld.
|
||||
|
||||
This module provides endpoints for managing application logging
|
||||
configuration and accessing log files.
|
||||
"""
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.responses import FileResponse, PlainTextResponse
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from src.server.models.config import LoggingConfig
|
||||
from src.server.services.config_service import ConfigServiceError, get_config_service
|
||||
from src.server.utils.dependencies import require_auth
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/logging", tags=["logging"])
|
||||
|
||||
|
||||
class LogFileInfo(BaseModel):
|
||||
"""Information about a log file."""
|
||||
|
||||
name: str = Field(..., description="File name")
|
||||
size: int = Field(..., description="File size in bytes")
|
||||
modified: float = Field(..., description="Last modified timestamp")
|
||||
path: str = Field(..., description="Relative path from logs directory")
|
||||
|
||||
|
||||
class LogCleanupResult(BaseModel):
|
||||
"""Result of log cleanup operation."""
|
||||
|
||||
files_deleted: int = Field(..., description="Number of files deleted")
|
||||
space_freed: int = Field(..., description="Space freed in bytes")
|
||||
errors: List[str] = Field(
|
||||
default_factory=list, description="Any errors encountered"
|
||||
)
|
||||
|
||||
|
||||
def get_logs_directory() -> Path:
|
||||
"""Get the logs directory path.
|
||||
|
||||
Returns:
|
||||
Path: Logs directory path
|
||||
|
||||
Raises:
|
||||
HTTPException: If logs directory doesn't exist
|
||||
"""
|
||||
# Check both common locations
|
||||
possible_paths = [
|
||||
Path("logs"),
|
||||
Path("src/cli/logs"),
|
||||
Path("data/logs"),
|
||||
]
|
||||
|
||||
for log_path in possible_paths:
|
||||
if log_path.exists() and log_path.is_dir():
|
||||
return log_path
|
||||
|
||||
# Default to logs directory even if it doesn't exist
|
||||
logs_dir = Path("logs")
|
||||
logs_dir.mkdir(parents=True, exist_ok=True)
|
||||
return logs_dir
|
||||
|
||||
|
||||
@router.get("/config", response_model=LoggingConfig)
|
||||
def get_logging_config(
|
||||
auth: Optional[dict] = Depends(require_auth)
|
||||
) -> LoggingConfig:
|
||||
"""Get current logging configuration.
|
||||
|
||||
Args:
|
||||
auth: Authentication token (optional for read operations)
|
||||
|
||||
Returns:
|
||||
LoggingConfig: Current logging configuration
|
||||
|
||||
Raises:
|
||||
HTTPException: If configuration cannot be loaded
|
||||
"""
|
||||
try:
|
||||
config_service = get_config_service()
|
||||
app_config = config_service.load_config()
|
||||
return app_config.logging
|
||||
except ConfigServiceError as e:
|
||||
logger.error(f"Failed to load logging config: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to load logging configuration: {e}",
|
||||
) from e
|
||||
|
||||
|
||||
@router.post("/config", response_model=LoggingConfig)
|
||||
def update_logging_config(
|
||||
logging_config: LoggingConfig,
|
||||
auth: dict = Depends(require_auth),
|
||||
) -> LoggingConfig:
|
||||
"""Update logging configuration.
|
||||
|
||||
Args:
|
||||
logging_config: New logging configuration
|
||||
auth: Authentication token (required)
|
||||
|
||||
Returns:
|
||||
LoggingConfig: Updated logging configuration
|
||||
|
||||
Raises:
|
||||
HTTPException: If configuration update fails
|
||||
"""
|
||||
try:
|
||||
config_service = get_config_service()
|
||||
app_config = config_service.load_config()
|
||||
|
||||
# Update logging section
|
||||
app_config.logging = logging_config
|
||||
|
||||
# Save and return
|
||||
config_service.save_config(app_config)
|
||||
logger.info(
|
||||
f"Logging config updated by {auth.get('username', 'unknown')}"
|
||||
)
|
||||
|
||||
# Apply the new logging configuration
|
||||
_apply_logging_config(logging_config)
|
||||
|
||||
return logging_config
|
||||
except ConfigServiceError as e:
|
||||
logger.error(f"Failed to update logging config: {e}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to update logging configuration: {e}",
|
||||
) from e
|
||||
|
||||
|
||||
def _apply_logging_config(config: LoggingConfig) -> None:
|
||||
"""Apply logging configuration to the Python logging system.
|
||||
|
||||
Args:
|
||||
config: Logging configuration to apply
|
||||
"""
|
||||
# Set the root logger level
|
||||
logging.getLogger().setLevel(config.level)
|
||||
|
||||
# If a file is specified, configure file handler
|
||||
if config.file:
|
||||
file_path = Path(config.file)
|
||||
file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Remove existing file handlers
|
||||
root_logger = logging.getLogger()
|
||||
for handler in root_logger.handlers[:]:
|
||||
if isinstance(handler, logging.FileHandler):
|
||||
root_logger.removeHandler(handler)
|
||||
|
||||
# Add new file handler with rotation if configured
|
||||
if config.max_bytes and config.max_bytes > 0:
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
handler = RotatingFileHandler(
|
||||
config.file,
|
||||
maxBytes=config.max_bytes,
|
||||
backupCount=config.backup_count or 3,
|
||||
)
|
||||
else:
|
||||
handler = logging.FileHandler(config.file)
|
||||
|
||||
handler.setFormatter(
|
||||
logging.Formatter(
|
||||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
)
|
||||
root_logger.addHandler(handler)
|
||||
|
||||
|
||||
@router.get("/files", response_model=List[LogFileInfo])
|
||||
def list_log_files(
|
||||
auth: Optional[dict] = Depends(require_auth)
|
||||
) -> List[LogFileInfo]:
|
||||
"""List available log files.
|
||||
|
||||
Args:
|
||||
auth: Authentication token (optional for read operations)
|
||||
|
||||
Returns:
|
||||
List of log file information
|
||||
|
||||
Raises:
|
||||
HTTPException: If logs directory cannot be accessed
|
||||
"""
|
||||
try:
|
||||
logs_dir = get_logs_directory()
|
||||
files: List[LogFileInfo] = []
|
||||
|
||||
for file_path in logs_dir.rglob("*.log*"):
|
||||
if file_path.is_file():
|
||||
stat = file_path.stat()
|
||||
rel_path = file_path.relative_to(logs_dir)
|
||||
files.append(
|
||||
LogFileInfo(
|
||||
name=file_path.name,
|
||||
size=stat.st_size,
|
||||
modified=stat.st_mtime,
|
||||
path=str(rel_path),
|
||||
)
|
||||
)
|
||||
|
||||
# Sort by modified time, newest first
|
||||
files.sort(key=lambda x: x.modified, reverse=True)
|
||||
return files
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to list log files")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to list log files: {str(e)}",
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/files/{filename:path}/download")
|
||||
async def download_log_file(
|
||||
filename: str, auth: dict = Depends(require_auth)
|
||||
) -> FileResponse:
|
||||
"""Download a specific log file.
|
||||
|
||||
Args:
|
||||
filename: Name or relative path of the log file
|
||||
auth: Authentication token (required)
|
||||
|
||||
Returns:
|
||||
File download response
|
||||
|
||||
Raises:
|
||||
HTTPException: If file not found or access denied
|
||||
"""
|
||||
try:
|
||||
logs_dir = get_logs_directory()
|
||||
file_path = logs_dir / filename
|
||||
|
||||
# Security: Ensure the file is within logs directory
|
||||
if not file_path.resolve().is_relative_to(logs_dir.resolve()):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Access denied to file outside logs directory",
|
||||
)
|
||||
|
||||
if not file_path.exists() or not file_path.is_file():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Log file not found: {filename}",
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Log file download: {filename} "
|
||||
f"by {auth.get('username', 'unknown')}"
|
||||
)
|
||||
|
||||
return FileResponse(
|
||||
path=str(file_path),
|
||||
filename=file_path.name,
|
||||
media_type="text/plain",
|
||||
)
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to download log file: {filename}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to download log file: {str(e)}",
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/files/{filename:path}/tail")
|
||||
async def tail_log_file(
|
||||
filename: str,
|
||||
lines: int = 100,
|
||||
auth: Optional[dict] = Depends(require_auth),
|
||||
) -> PlainTextResponse:
|
||||
"""Get the last N lines of a log file.
|
||||
|
||||
Args:
|
||||
filename: Name or relative path of the log file
|
||||
lines: Number of lines to retrieve (default: 100)
|
||||
auth: Authentication token (optional)
|
||||
|
||||
Returns:
|
||||
Plain text response with log file tail
|
||||
|
||||
Raises:
|
||||
HTTPException: If file not found or access denied
|
||||
"""
|
||||
try:
|
||||
logs_dir = get_logs_directory()
|
||||
file_path = logs_dir / filename
|
||||
|
||||
# Security: Ensure the file is within logs directory
|
||||
if not file_path.resolve().is_relative_to(logs_dir.resolve()):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="Access denied to file outside logs directory",
|
||||
)
|
||||
|
||||
if not file_path.exists() or not file_path.is_file():
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Log file not found: {filename}",
|
||||
)
|
||||
|
||||
# Read the last N lines efficiently
|
||||
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
|
||||
# For small files, just read all
|
||||
content = f.readlines()
|
||||
tail_lines = content[-lines:] if len(content) > lines else content
|
||||
|
||||
return PlainTextResponse(content="".join(tail_lines))
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to tail log file: {filename}")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to tail log file: {str(e)}",
|
||||
) from e
|
||||
|
||||
|
||||
@router.post("/test", response_model=Dict[str, str])
|
||||
async def test_logging(
|
||||
auth: dict = Depends(require_auth)
|
||||
) -> Dict[str, str]:
|
||||
"""Test logging by writing messages at all levels.
|
||||
|
||||
Args:
|
||||
auth: Authentication token (required)
|
||||
|
||||
Returns:
|
||||
Success message
|
||||
"""
|
||||
try:
|
||||
test_logger = logging.getLogger("aniworld.test")
|
||||
|
||||
test_logger.debug("Test DEBUG message")
|
||||
test_logger.info("Test INFO message")
|
||||
test_logger.warning("Test WARNING message")
|
||||
test_logger.error("Test ERROR message")
|
||||
test_logger.critical("Test CRITICAL message")
|
||||
|
||||
logger.info(
|
||||
f"Logging test triggered by {auth.get('username', 'unknown')}"
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"message": "Test messages logged at all levels",
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to test logging")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to test logging: {str(e)}",
|
||||
) from e
|
||||
|
||||
|
||||
@router.post("/cleanup", response_model=LogCleanupResult)
|
||||
async def cleanup_logs(
|
||||
max_age_days: int = 30, auth: dict = Depends(require_auth)
|
||||
) -> LogCleanupResult:
|
||||
"""Clean up old log files.
|
||||
|
||||
Args:
|
||||
max_age_days: Maximum age in days for log files to keep
|
||||
auth: Authentication token (required)
|
||||
|
||||
Returns:
|
||||
Cleanup result with statistics
|
||||
|
||||
Raises:
|
||||
HTTPException: If cleanup fails
|
||||
"""
|
||||
try:
|
||||
logs_dir = get_logs_directory()
|
||||
current_time = os.path.getmtime(logs_dir)
|
||||
max_age_seconds = max_age_days * 24 * 60 * 60
|
||||
|
||||
files_deleted = 0
|
||||
space_freed = 0
|
||||
errors: List[str] = []
|
||||
|
||||
for file_path in logs_dir.rglob("*.log*"):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
|
||||
try:
|
||||
file_age = current_time - file_path.stat().st_mtime
|
||||
if file_age > max_age_seconds:
|
||||
file_size = file_path.stat().st_size
|
||||
file_path.unlink()
|
||||
files_deleted += 1
|
||||
space_freed += file_size
|
||||
logger.info(f"Deleted old log file: {file_path.name}")
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to delete {file_path.name}: {str(e)}"
|
||||
errors.append(error_msg)
|
||||
logger.warning(error_msg)
|
||||
|
||||
logger.info(
|
||||
f"Log cleanup by {auth.get('username', 'unknown')}: "
|
||||
f"{files_deleted} files, {space_freed} bytes"
|
||||
)
|
||||
|
||||
return LogCleanupResult(
|
||||
files_deleted=files_deleted,
|
||||
space_freed=space_freed,
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Failed to cleanup logs")
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"Failed to cleanup logs: {str(e)}",
|
||||
) from e
|
||||
Reference in New Issue
Block a user