Files
Aniworld/src/server/utils/log_manager.py
2025-10-22 09:20:35 +02:00

381 lines
12 KiB
Python

"""Log management utilities for rotation, archival, and search."""
import gzip
import logging
import shutil
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class LogFile:
    """Information about a log file."""
    filename: str  # base name of the file, e.g. "server.log"
    path: Path  # full path to the file inside the log directory
    size_bytes: int  # file size from stat() at scan time
    # From st_ctime: metadata-change time on Linux, creation time on
    # Windows/macOS — NOTE(review): not true creation time everywhere.
    created_time: datetime
    modified_time: datetime  # from st_mtime; used for sorting/archival cutoffs
class LogManager:
    """Manage application logs."""

    def __init__(self, log_dir: str = "logs"):
        """Initialize log manager and ensure the directories exist.

        Args:
            log_dir: Directory containing log files.
        """
        base = Path(log_dir)
        # Create both the main log directory and the archive subdirectory
        # up front so later operations can assume they exist.
        base.mkdir(parents=True, exist_ok=True)
        self.log_dir = base
        self.archived_dir = base / "archived"
        self.archived_dir.mkdir(exist_ok=True)
def get_log_files(self, pattern: str = "*.log") -> List[LogFile]:
"""Get list of log files.
Args:
pattern: Glob pattern for log files.
Returns:
list: List of LogFile objects.
"""
log_files = []
for log_path in self.log_dir.glob(pattern):
if log_path.is_file():
stat = log_path.stat()
log_files.append(
LogFile(
filename=log_path.name,
path=log_path,
size_bytes=stat.st_size,
created_time=datetime.fromtimestamp(
stat.st_ctime
),
modified_time=datetime.fromtimestamp(
stat.st_mtime
),
)
)
return sorted(log_files, key=lambda x: x.modified_time, reverse=True)
def rotate_log(
self, log_file: str, max_size_bytes: int = 10485760
) -> bool:
"""Rotate a log file if it exceeds max size.
Args:
log_file: Name of the log file.
max_size_bytes: Maximum size before rotation (default 10MB).
Returns:
bool: True if rotation was needed and successful.
"""
try:
log_path = self.log_dir / log_file
if not log_path.exists():
logger.warning(f"Log file not found: {log_file}")
return False
stat = log_path.stat()
if stat.st_size < max_size_bytes:
return False
# Create rotated filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
rotated_name = f"{log_path.stem}_{timestamp}.log"
rotated_path = self.log_dir / rotated_name
shutil.move(str(log_path), str(rotated_path))
# Compress the rotated file
self._compress_log(rotated_path)
logger.info(f"Rotated log file: {log_file} -> {rotated_name}")
return True
except Exception as e:
logger.error(f"Failed to rotate log file {log_file}: {e}")
return False
def _compress_log(self, log_path: Path) -> bool:
"""Compress a log file.
Args:
log_path: Path to the log file.
Returns:
bool: True if compression was successful.
"""
try:
gz_path = log_path.parent / f"{log_path.name}.gz"
with open(log_path, "rb") as f_in:
with gzip.open(gz_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
log_path.unlink()
logger.debug(f"Compressed log file: {log_path.name}")
return True
except Exception as e:
logger.error(f"Failed to compress log {log_path}: {e}")
return False
def archive_old_logs(
self, days_old: int = 30
) -> int:
"""Archive log files older than specified days.
Args:
days_old: Archive logs older than this many days.
Returns:
int: Number of logs archived.
"""
try:
cutoff_time = datetime.now() - timedelta(days=days_old)
archived_count = 0
for log_file in self.get_log_files():
if log_file.modified_time < cutoff_time:
try:
archived_path = (
self.archived_dir / log_file.filename
)
shutil.move(str(log_file.path), str(archived_path))
self._compress_log(archived_path)
archived_count += 1
logger.debug(
f"Archived log: {log_file.filename}"
)
except Exception as e:
logger.warning(
f"Failed to archive {log_file.filename}: {e}"
)
logger.info(f"Archived {archived_count} old log files")
return archived_count
except Exception as e:
logger.error(f"Failed to archive logs: {e}")
return 0
def search_logs(
self, search_term: str, case_sensitive: bool = False
) -> Dict[str, List[str]]:
"""Search for lines matching a term in log files.
Args:
search_term: Text to search for.
case_sensitive: Whether search is case-sensitive.
Returns:
dict: Dictionary mapping log files to matching lines.
"""
try:
results = {}
for log_file in self.get_log_files():
try:
with open(log_file.path, "r", encoding="utf-8") as f:
matching_lines = []
for line_num, line in enumerate(f, 1):
if case_sensitive:
if search_term in line:
matching_lines.append(
f"{line_num}: {line.strip()}"
)
else:
if search_term.lower() in line.lower():
matching_lines.append(
f"{line_num}: {line.strip()}"
)
if matching_lines:
results[log_file.filename] = matching_lines
except Exception as e:
logger.warning(
f"Failed to search {log_file.filename}: {e}"
)
logger.debug(
f"Search for '{search_term}' found {len(results)} log files"
)
return results
except Exception as e:
logger.error(f"Failed to search logs: {e}")
return {}
def export_logs(
self,
output_file: str,
log_pattern: str = "*.log",
compress: bool = True,
) -> bool:
"""Export logs to a file or archive.
Args:
output_file: Path to output file.
log_pattern: Pattern for logs to include.
compress: Whether to compress the output.
Returns:
bool: True if export was successful.
"""
try:
output_path = Path(output_file)
if compress:
import tarfile
tar_path = output_path.with_suffix(".tar.gz")
with tarfile.open(tar_path, "w:gz") as tar:
for log_file in self.get_log_files(log_pattern):
tar.add(
log_file.path,
arcname=log_file.filename,
)
logger.info(f"Exported logs to: {tar_path}")
return True
else:
# Concatenate all logs
with open(output_path, "w") as out_f:
for log_file in self.get_log_files(log_pattern):
out_f.write(f"\n\n=== {log_file.filename} ===\n\n")
with open(log_file.path, "r") as in_f:
out_f.write(in_f.read())
logger.info(f"Exported logs to: {output_path}")
return True
except Exception as e:
logger.error(f"Failed to export logs: {e}")
return False
def get_log_stats(self) -> Dict[str, Any]:
"""Get statistics about log files.
Returns:
dict: Log statistics.
"""
try:
log_files = self.get_log_files()
total_size = sum(log.size_bytes for log in log_files)
total_files = len(log_files)
if not log_files:
return {
"total_files": 0,
"total_size_bytes": 0,
"total_size_mb": 0,
"average_size_bytes": 0,
"largest_file": None,
"oldest_file": None,
"newest_file": None,
}
return {
"total_files": total_files,
"total_size_bytes": total_size,
"total_size_mb": total_size / (1024 * 1024),
"average_size_bytes": total_size // total_files,
"largest_file": max(
log_files, key=lambda x: x.size_bytes
).filename,
"oldest_file": log_files[-1].filename,
"newest_file": log_files[0].filename,
}
except Exception as e:
logger.error(f"Failed to get log stats: {e}")
return {}
def cleanup_logs(
self, max_total_size_mb: int = 100, keep_files: int = 5
) -> int:
"""Clean up old logs to maintain size limit.
Args:
max_total_size_mb: Maximum total log size in MB.
keep_files: Minimum files to keep.
Returns:
int: Number of files deleted.
"""
try:
max_bytes = max_total_size_mb * 1024 * 1024
log_files = self.get_log_files()
if len(log_files) <= keep_files:
return 0
total_size = sum(log.size_bytes for log in log_files)
deleted_count = 0
for log_file in reversed(log_files):
if (
total_size <= max_bytes
or len(log_files) <= keep_files
):
break
try:
log_file.path.unlink()
total_size -= log_file.size_bytes
deleted_count += 1
logger.debug(f"Deleted log file: {log_file.filename}")
except Exception as e:
logger.warning(
f"Failed to delete {log_file.filename}: {e}"
)
logger.info(f"Cleaned up {deleted_count} log files")
return deleted_count
except Exception as e:
logger.error(f"Failed to cleanup logs: {e}")
return 0
def set_log_level(self, logger_name: str, level: str) -> bool:
"""Set log level for a specific logger.
Args:
logger_name: Name of the logger.
level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
Returns:
bool: True if successful.
"""
try:
log_level = getattr(logging, level.upper(), logging.INFO)
target_logger = logging.getLogger(logger_name)
target_logger.setLevel(log_level)
logger.info(f"Set {logger_name} log level to {level}")
return True
except Exception as e:
logger.error(f"Failed to set log level: {e}")
return False
# Global log manager instance (lazily created by get_log_manager).
_log_manager: Optional[LogManager] = None


def get_log_manager() -> LogManager:
    """Return the process-wide LogManager, creating it on first use.

    Returns:
        LogManager: The shared log manager instance.
    """
    global _log_manager
    if _log_manager is None:
        # First caller pays the construction cost; later calls reuse it.
        _log_manager = LogManager()
    return _log_manager