Aniworld/src/server/web/controllers/auth_controller.py
2025-09-29 14:53:25 +02:00

273 lines
10 KiB
Python

import logging
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple
from functools import wraps
from flask import session, request, jsonify, redirect, url_for
from config import config
class SessionManager:
    """Manage user sessions and authentication.

    State is held in two in-memory dictionaries on the instance:

    * ``active_sessions`` maps a session token to a dict with the keys
      ``token``, ``ip_address``, ``login_time``, ``last_activity`` and
      ``user``.
    * ``failed_attempts`` maps a client IP address to a dict with the keys
      ``count`` and ``last_attempt``. An IP with no recorded failures has
      no entry.

    Thresholds and timeouts (``max_failed_attempts``,
    ``lockout_duration_minutes``, ``session_timeout_hours``) are read from
    the imported ``config`` object on every call.
    """

    def __init__(self):
        self.active_sessions: Dict[str, Dict] = {}
        self.failed_attempts: Dict[str, Dict] = {}

    def _get_client_ip(self) -> str:
        """Return the client IP of the current Flask request.

        Order of preference: first entry of ``X-Forwarded-For``, then
        ``X-Real-IP``, then ``request.remote_addr``, then ``'unknown'``.

        NOTE(review): both headers are taken at face value here. Unless a
        trusted reverse proxy overwrites them, a client can send a different
        value on each request and sidestep the per-IP lockout -- confirm
        how the application is deployed.
        """
        forwarded_ip = request.headers.get('X-Forwarded-For')
        if forwarded_ip:
            # The header may list several hops; the first is the client.
            first_hop = forwarded_ip.split(',')[0].strip()
            # A blank first element (e.g. ", 10.0.0.1") must not become the
            # lockout key, so fall through to the next source instead.
            if first_hop:
                return first_hop

        real_ip = request.headers.get('X-Real-IP')
        if real_ip:
            return real_ip

        return request.remote_addr or 'unknown'

    def _is_locked_out(self, ip_address: str) -> bool:
        """Return True while ``ip_address`` is inside its lockout window.

        An IP is locked out once its failure count has reached
        ``config.max_failed_attempts`` and fewer than
        ``config.lockout_duration_minutes`` minutes have passed since its
        last failed attempt. When the window has elapsed the failure record
        is dropped, so the IP starts again with a clean slate.
        """
        attempt_data = self.failed_attempts.get(ip_address)
        if attempt_data is None:
            return False

        if attempt_data.get('count', 0) < config.max_failed_attempts:
            return False

        last_attempt = attempt_data.get('last_attempt')
        if not last_attempt:
            return False

        lockout_until = last_attempt + timedelta(minutes=config.lockout_duration_minutes)
        if datetime.now() >= lockout_until:
            # Lockout served: forget the record rather than keeping a zeroed
            # entry around for every IP that ever failed.
            self.failed_attempts.pop(ip_address, None)
            return False

        return True

    def _record_failed_attempt(self, ip_address: str, username: str = 'admin') -> None:
        """Count a failed login for ``ip_address`` and log it for fail2ban.

        Args:
            ip_address: Key under which the failure is counted.
            username: Name written to the log line.
        """
        attempt_data = self.failed_attempts.setdefault(
            ip_address, {'count': 0, 'last_attempt': None}
        )
        attempt_data['count'] += 1
        attempt_data['last_attempt'] = datetime.now()

        if config.enable_fail2ban_logging:
            try:
                # Imported here to avoid circular imports.
                from server.infrastructure.logging.config import log_auth_failure
                log_auth_failure(ip_address, username)
            except ImportError:
                # Fall back to plain logging if the logging package is not
                # available; the rendered message is unchanged.
                logger = logging.getLogger('auth_failures')
                logger.warning(
                    "authentication failure for [%s] user [%s]", ip_address, username
                )

    def authenticate(self, password: str) -> Tuple[bool, str, Optional[str]]:
        """Authenticate the current request with ``password``.

        The client address is derived from the active Flask request. A
        locked-out address is rejected without checking the password. A
        wrong password is recorded as a failed attempt. A correct password
        clears the failure record, registers a new session token in
        ``active_sessions`` and stores it in the Flask ``session``.

        Returns:
            ``(success, message, session_token)``; ``session_token`` is
            ``None`` whenever ``success`` is False.
        """
        ip_address = self._get_client_ip()

        if self._is_locked_out(ip_address):
            remaining_time = self._get_remaining_lockout_time(ip_address)
            return False, f"Too many failed attempts. Try again in {remaining_time} minutes.", None

        if not config.verify_password(password):
            self._record_failed_attempt(ip_address)
            attempts_left = config.max_failed_attempts - self.failed_attempts[ip_address]['count']
            if attempts_left <= 0:
                return False, f"Invalid password. Account locked for {config.lockout_duration_minutes} minutes.", None
            return False, f"Invalid password. {attempts_left} attempts remaining.", None

        # Successful login: forget earlier failures for this address.
        self.failed_attempts.pop(ip_address, None)

        # One timestamp for the whole login so that login_time,
        # last_activity and the Flask session value agree exactly.
        now = datetime.now()
        session_token = secrets.token_urlsafe(32)
        self.active_sessions[session_token] = {
            'token': session_token,
            'ip_address': ip_address,
            'login_time': now,
            'last_activity': now,
            'user': 'admin',
        }

        session['token'] = session_token
        session['user'] = 'admin'
        session['login_time'] = now.isoformat()

        return True, "Login successful", session_token

    def login(self, password: str, ip_address: Optional[str] = None) -> Dict:
        """Dictionary-returning wrapper around :meth:`authenticate`.

        Args:
            password: Password to verify.
            ip_address: Accepted for API compatibility but not used; the
                client address is always derived from the current request.

        Returns:
            ``{'status': 'success', 'message': ..., 'token': ...}`` on
            success, otherwise ``{'status': 'error', 'message': ...}``.
        """
        success, message, token = self.authenticate(password)
        if success:
            return {
                'status': 'success',
                'message': message,
                'token': token,
            }
        return {
            'status': 'error',
            'message': message,
        }

    def _get_remaining_lockout_time(self, ip_address: str) -> int:
        """Return the minutes left in the lockout window, rounded up.

        Returns 0 when the address has no failure record, no recorded last
        attempt, or the window has already elapsed. Any started minute
        counts as a whole one, so a client that is still locked out is
        never told to retry in "0 minutes".
        """
        attempt_data = self.failed_attempts.get(ip_address)
        if attempt_data is None:
            return 0

        last_attempt = attempt_data.get('last_attempt')
        if not last_attempt:
            return 0

        lockout_until = last_attempt + timedelta(minutes=config.lockout_duration_minutes)
        remaining_seconds = (lockout_until - datetime.now()).total_seconds()
        if remaining_seconds <= 0:
            return 0

        whole_minutes, leftover_seconds = divmod(remaining_seconds, 60)
        return int(whole_minutes) + (1 if leftover_seconds > 0 else 0)

    def is_authenticated(self, session_token: Optional[str] = None) -> bool:
        """Return True if the token refers to a live, non-expired session.

        When ``session_token`` is omitted the token stored in the Flask
        ``session`` is used. A session idle for longer than
        ``config.session_timeout_hours`` is logged out and reported as not
        authenticated; otherwise its ``last_activity`` is refreshed.
        """
        if not session_token:
            session_token = session.get('token')

        session_data = self.active_sessions.get(session_token) if session_token else None
        if session_data is None:
            return False

        now = datetime.now()
        timeout_duration = timedelta(hours=config.session_timeout_hours)
        if now - session_data['last_activity'] > timeout_duration:
            self.logout(session_token)
            return False

        session_data['last_activity'] = now
        return True

    def logout(self, session_token: Optional[str] = None) -> bool:
        """Remove the session for the token and clear the Flask session.

        When ``session_token`` is omitted the token stored in the Flask
        ``session`` is used. The Flask session is cleared in every case,
        including when the token is unknown. Always returns True.
        """
        if not session_token:
            session_token = session.get('token')

        if session_token:
            self.active_sessions.pop(session_token, None)

        session.clear()
        return True

    def get_session_info(self, session_token: Optional[str] = None) -> Optional[Dict]:
        """Return a JSON-friendly copy of a session record, or None.

        When ``session_token`` is omitted the token stored in the Flask
        ``session`` is used. The returned dict is a copy in which
        ``login_time`` and ``last_activity`` are ISO-8601 strings; the
        stored record keeps its ``datetime`` objects.
        """
        if not session_token:
            session_token = session.get('token')

        stored = self.active_sessions.get(session_token) if session_token else None
        if stored is None:
            return None

        session_data = dict(stored)
        session_data['login_time'] = stored['login_time'].isoformat()
        session_data['last_activity'] = stored['last_activity'].isoformat()
        return session_data

    def cleanup_expired_sessions(self) -> int:
        """Delete sessions idle longer than the timeout; return how many."""
        timeout_duration = timedelta(hours=config.session_timeout_hours)
        current_time = datetime.now()

        # Collect first: the dict must not change size while iterated.
        expired_tokens = [
            token
            for token, session_data in self.active_sessions.items()
            if current_time - session_data['last_activity'] > timeout_duration
        ]
        for token in expired_tokens:
            del self.active_sessions[token]

        return len(expired_tokens)
# Global session manager instance, created once when this module is imported.
# The require_auth and optional_auth decorators in this module consult it to
# decide whether the current request is authenticated.
session_manager: SessionManager = SessionManager()
def require_auth(f):
    """Decorator that only runs a Flask view for authenticated requests.

    If ``session_manager.is_authenticated()`` is true, the wrapped view is
    called unchanged. Otherwise the response depends on the kind of
    request:

    * JSON body, ``X-Requested-With: XMLHttpRequest``, an ``Accept`` header
      starting with ``application/json``, or a path containing ``/api/``:
      a JSON error payload with HTTP status 401.
    * anything else: a redirect to the ``auth.login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Happy path first: authenticated callers go straight to the view.
        if session_manager.is_authenticated():
            return f(*args, **kwargs)

        headers = request.headers
        wants_json = (
            request.is_json
            or headers.get('X-Requested-With') == 'XMLHttpRequest'
            or headers.get('Accept', '').startswith('application/json')
            or '/api/' in request.path  # API endpoints should return JSON
        )
        if not wants_json:
            # Browser navigation: send the user to the login page.
            return redirect(url_for('auth.login'))

        error_payload = {
            'status': 'error',
            'message': 'Authentication required',
            'code': 'AUTH_REQUIRED',
        }
        return jsonify(error_payload), 401

    return guarded_view
def optional_auth(f):
    """Decorator that enforces authentication only when a password is set.

    If ``config.has_master_password()`` is false, the wrapped view is
    called without any authentication check. If it is true, the view is
    called only when ``session_manager.is_authenticated()`` is true.
    Otherwise the response depends on the kind of request:

    * JSON body, ``X-Requested-With: XMLHttpRequest``, an ``Accept`` header
      starting with ``application/json``, or a path containing ``/api/``:
      a JSON error payload with HTTP status 401.
    * anything else: a redirect to the ``auth.login`` endpoint.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Authentication is skipped entirely while no master password is
        # configured; the session check only runs when one is.
        login_needed = (
            config.has_master_password()
            and not session_manager.is_authenticated()
        )
        if not login_needed:
            return f(*args, **kwargs)

        headers = request.headers
        wants_json = (
            request.is_json
            or headers.get('X-Requested-With') == 'XMLHttpRequest'
            or headers.get('Accept', '').startswith('application/json')
            or '/api/' in request.path  # API endpoints should return JSON
        )
        if not wants_json:
            # Browser navigation: send the user to the login page.
            return redirect(url_for('auth.login'))

        error_payload = {
            'status': 'error',
            'message': 'Authentication required',
            'code': 'AUTH_REQUIRED',
        }
        return jsonify(error_payload), 401

    return guarded_view