remove part 2

This commit is contained in:
2025-10-30 20:11:38 +01:00
parent 4649cf562d
commit fd76be02fd
8 changed files with 138 additions and 1260 deletions

View File

@@ -1,426 +0,0 @@
"""Logging API endpoints for Aniworld.
This module provides endpoints for managing application logging
configuration and accessing log files.
"""
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import FileResponse, PlainTextResponse
from pydantic import BaseModel, Field
from src.server.models.config import LoggingConfig
from src.server.services.config_service import ConfigServiceError, get_config_service
from src.server.utils.dependencies import require_auth
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/logging", tags=["logging"])
class LogFileInfo(BaseModel):
"""Information about a log file."""
name: str = Field(..., description="File name")
size: int = Field(..., description="File size in bytes")
modified: float = Field(..., description="Last modified timestamp")
path: str = Field(..., description="Relative path from logs directory")
class LogCleanupResult(BaseModel):
"""Result of log cleanup operation."""
files_deleted: int = Field(..., description="Number of files deleted")
space_freed: int = Field(..., description="Space freed in bytes")
errors: List[str] = Field(
default_factory=list, description="Any errors encountered"
)
def get_logs_directory() -> Path:
"""Get the logs directory path.
Returns:
Path: Logs directory path
Raises:
HTTPException: If logs directory doesn't exist
"""
# Check both common locations
possible_paths = [
Path("logs"),
Path("src/cli/logs"),
Path("data/logs"),
]
for log_path in possible_paths:
if log_path.exists() and log_path.is_dir():
return log_path
# Default to logs directory even if it doesn't exist
logs_dir = Path("logs")
logs_dir.mkdir(parents=True, exist_ok=True)
return logs_dir
@router.get("/config", response_model=LoggingConfig)
def get_logging_config(
auth: Optional[dict] = Depends(require_auth)
) -> LoggingConfig:
"""Get current logging configuration.
Args:
auth: Authentication token (optional for read operations)
Returns:
LoggingConfig: Current logging configuration
Raises:
HTTPException: If configuration cannot be loaded
"""
try:
config_service = get_config_service()
app_config = config_service.load_config()
return app_config.logging
except ConfigServiceError as e:
logger.error(f"Failed to load logging config: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to load logging configuration: {e}",
) from e
@router.post("/config", response_model=LoggingConfig)
def update_logging_config(
logging_config: LoggingConfig,
auth: dict = Depends(require_auth),
) -> LoggingConfig:
"""Update logging configuration.
Args:
logging_config: New logging configuration
auth: Authentication token (required)
Returns:
LoggingConfig: Updated logging configuration
Raises:
HTTPException: If configuration update fails
"""
try:
config_service = get_config_service()
app_config = config_service.load_config()
# Update logging section
app_config.logging = logging_config
# Save and return
config_service.save_config(app_config)
logger.info(
f"Logging config updated by {auth.get('username', 'unknown')}"
)
# Apply the new logging configuration
_apply_logging_config(logging_config)
return logging_config
except ConfigServiceError as e:
logger.error(f"Failed to update logging config: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update logging configuration: {e}",
) from e
def _apply_logging_config(config: LoggingConfig) -> None:
"""Apply logging configuration to the Python logging system.
Args:
config: Logging configuration to apply
"""
# Set the root logger level
logging.getLogger().setLevel(config.level)
# If a file is specified, configure file handler
if config.file:
file_path = Path(config.file)
file_path.parent.mkdir(parents=True, exist_ok=True)
# Remove existing file handlers
root_logger = logging.getLogger()
for handler in root_logger.handlers[:]:
if isinstance(handler, logging.FileHandler):
root_logger.removeHandler(handler)
# Add new file handler with rotation if configured
if config.max_bytes and config.max_bytes > 0:
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
config.file,
maxBytes=config.max_bytes,
backupCount=config.backup_count or 3,
)
else:
handler = logging.FileHandler(config.file)
handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
)
root_logger.addHandler(handler)
@router.get("/files", response_model=List[LogFileInfo])
def list_log_files(
auth: Optional[dict] = Depends(require_auth)
) -> List[LogFileInfo]:
"""List available log files.
Args:
auth: Authentication token (optional for read operations)
Returns:
List of log file information
Raises:
HTTPException: If logs directory cannot be accessed
"""
try:
logs_dir = get_logs_directory()
files: List[LogFileInfo] = []
for file_path in logs_dir.rglob("*.log*"):
if file_path.is_file():
stat = file_path.stat()
rel_path = file_path.relative_to(logs_dir)
files.append(
LogFileInfo(
name=file_path.name,
size=stat.st_size,
modified=stat.st_mtime,
path=str(rel_path),
)
)
# Sort by modified time, newest first
files.sort(key=lambda x: x.modified, reverse=True)
return files
except Exception as e:
logger.exception("Failed to list log files")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to list log files: {str(e)}",
) from e
@router.get("/files/{filename:path}/download")
async def download_log_file(
filename: str, auth: dict = Depends(require_auth)
) -> FileResponse:
"""Download a specific log file.
Args:
filename: Name or relative path of the log file
auth: Authentication token (required)
Returns:
File download response
Raises:
HTTPException: If file not found or access denied
"""
try:
logs_dir = get_logs_directory()
file_path = logs_dir / filename
# Security: Ensure the file is within logs directory
if not file_path.resolve().is_relative_to(logs_dir.resolve()):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied to file outside logs directory",
)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Log file not found: {filename}",
)
logger.info(
f"Log file download: {filename} "
f"by {auth.get('username', 'unknown')}"
)
return FileResponse(
path=str(file_path),
filename=file_path.name,
media_type="text/plain",
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to download log file: {filename}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to download log file: {str(e)}",
) from e
@router.get("/files/{filename:path}/tail")
async def tail_log_file(
filename: str,
lines: int = 100,
auth: Optional[dict] = Depends(require_auth),
) -> PlainTextResponse:
"""Get the last N lines of a log file.
Args:
filename: Name or relative path of the log file
lines: Number of lines to retrieve (default: 100)
auth: Authentication token (optional)
Returns:
Plain text response with log file tail
Raises:
HTTPException: If file not found or access denied
"""
try:
logs_dir = get_logs_directory()
file_path = logs_dir / filename
# Security: Ensure the file is within logs directory
if not file_path.resolve().is_relative_to(logs_dir.resolve()):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Access denied to file outside logs directory",
)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Log file not found: {filename}",
)
# Read the last N lines efficiently
with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
# For small files, just read all
content = f.readlines()
tail_lines = content[-lines:] if len(content) > lines else content
return PlainTextResponse(content="".join(tail_lines))
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to tail log file: {filename}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to tail log file: {str(e)}",
) from e
@router.post("/test", response_model=Dict[str, str])
async def test_logging(
auth: dict = Depends(require_auth)
) -> Dict[str, str]:
"""Test logging by writing messages at all levels.
Args:
auth: Authentication token (required)
Returns:
Success message
"""
try:
test_logger = logging.getLogger("aniworld.test")
test_logger.debug("Test DEBUG message")
test_logger.info("Test INFO message")
test_logger.warning("Test WARNING message")
test_logger.error("Test ERROR message")
test_logger.critical("Test CRITICAL message")
logger.info(
f"Logging test triggered by {auth.get('username', 'unknown')}"
)
return {
"status": "success",
"message": "Test messages logged at all levels",
}
except Exception as e:
logger.exception("Failed to test logging")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to test logging: {str(e)}",
) from e
@router.post("/cleanup", response_model=LogCleanupResult)
async def cleanup_logs(
max_age_days: int = 30, auth: dict = Depends(require_auth)
) -> LogCleanupResult:
"""Clean up old log files.
Args:
max_age_days: Maximum age in days for log files to keep
auth: Authentication token (required)
Returns:
Cleanup result with statistics
Raises:
HTTPException: If cleanup fails
"""
try:
logs_dir = get_logs_directory()
current_time = os.path.getmtime(logs_dir)
max_age_seconds = max_age_days * 24 * 60 * 60
files_deleted = 0
space_freed = 0
errors: List[str] = []
for file_path in logs_dir.rglob("*.log*"):
if not file_path.is_file():
continue
try:
file_age = current_time - file_path.stat().st_mtime
if file_age > max_age_seconds:
file_size = file_path.stat().st_size
file_path.unlink()
files_deleted += 1
space_freed += file_size
logger.info(f"Deleted old log file: {file_path.name}")
except Exception as e:
error_msg = f"Failed to delete {file_path.name}: {str(e)}"
errors.append(error_msg)
logger.warning(error_msg)
logger.info(
f"Log cleanup by {auth.get('username', 'unknown')}: "
f"{files_deleted} files, {space_freed} bytes"
)
return LogCleanupResult(
files_deleted=files_deleted,
space_freed=space_freed,
errors=errors,
)
except Exception as e:
logger.exception("Failed to cleanup logs")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to cleanup logs: {str(e)}",
) from e