import logging
import math
import secrets
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, Optional, Tuple

from flask import session, request, jsonify, redirect, url_for

from config import config


class SessionManager:
    """Manage user sessions and authentication.

    State lives in two plain in-memory dictionaries on the instance:

    * ``active_sessions`` maps a session token to that session's data
      (token, IP address, login time, last activity, user).
    * ``failed_attempts`` maps a client IP to its failed-login counter
      and the time of the most recent failure.
    """

    def __init__(self):
        self.active_sessions: Dict[str, Dict] = {}
        self.failed_attempts: Dict[str, Dict] = {}

    def _get_client_ip(self) -> str:
        """Get client IP address with proxy support.

        Lookup order: first entry of ``X-Forwarded-For``, then
        ``X-Real-IP``, then ``request.remote_addr``.

        Returns:
            The client address, or ``'unknown'`` when none is available.
        """
        # NOTE(review): both headers are supplied by the client unless a
        # trusted reverse proxy overwrites them. Because lockout is keyed
        # on this value, a directly reachable app lets a caller dodge the
        # lockout by varying the header - confirm the deployment topology.
        forwarded_for = request.headers.get('X-Forwarded-For')
        if forwarded_for:
            first_hop = forwarded_for.split(',')[0].strip()
            # A blank first element (e.g. ", 1.2.3.4") must not become
            # the '' key shared by every such client; fall through.
            if first_hop:
                return first_hop

        real_ip = request.headers.get('X-Real-IP')
        if real_ip:
            return real_ip

        return request.remote_addr or 'unknown'

    def _is_locked_out(self, ip_address: str) -> bool:
        """Check if IP is currently locked out.

        An IP is locked out once its failure count has reached
        ``config.max_failed_attempts`` and fewer than
        ``config.lockout_duration_minutes`` minutes have passed since its
        last failure. When the window has elapsed the counter is reset as
        a side effect.
        """
        attempt_data = self.failed_attempts.get(ip_address)
        if attempt_data is None:
            return False

        failed_count = attempt_data.get('count', 0)
        last_attempt = attempt_data.get('last_attempt')

        if failed_count < config.max_failed_attempts:
            return False
        if not last_attempt:
            return False

        # Check if lockout period has expired
        lockout_until = last_attempt + timedelta(minutes=config.lockout_duration_minutes)
        if datetime.now() >= lockout_until:
            # Reset failed attempts after lockout period
            self.failed_attempts[ip_address] = {'count': 0, 'last_attempt': None}
            return False

        return True

    def _record_failed_attempt(self, ip_address: str, username: str = 'admin') -> None:
        """Record failed login attempt for fail2ban logging.

        Increments the per-IP counter, stamps the time of the failure and,
        when ``config.enable_fail2ban_logging`` is set, emits a log entry.

        Args:
            ip_address: Address the failed attempt is attributed to.
            username: Account name written to the log entry.
        """
        # Update failed attempts counter
        attempt_data = self.failed_attempts.setdefault(
            ip_address, {'count': 0, 'last_attempt': None}
        )
        attempt_data['count'] += 1
        attempt_data['last_attempt'] = datetime.now()

        # Log in fail2ban compatible format using the new logging system
        if config.enable_fail2ban_logging:
            try:
                # Import here to avoid circular imports
                from logging_config import log_auth_failure
                log_auth_failure(ip_address, username)
            except ImportError:
                # Fallback to simple logging if new system not available
                logger = logging.getLogger('auth_failures')
                logger.warning(f"authentication failure for [{ip_address}] user [{username}]")

    def authenticate(self, password: str) -> Tuple[bool, str, Optional[str]]:
        """Authenticate user with password.

        On success a new token is stored in ``active_sessions`` and in the
        Flask ``session``; on failure the attempt is counted against the
        client IP.

        Args:
            password: Password to check via ``config.verify_password``.

        Returns:
            ``(success, message, session_token)``; ``session_token`` is
            ``None`` whenever ``success`` is ``False``.
        """
        ip_address = self._get_client_ip()

        # Check if IP is locked out
        if self._is_locked_out(ip_address):
            remaining_time = self._get_remaining_lockout_time(ip_address)
            return False, f"Too many failed attempts. Try again in {remaining_time} minutes.", None

        # Verify password
        if not config.verify_password(password):
            self._record_failed_attempt(ip_address)
            attempts_left = config.max_failed_attempts - self.failed_attempts[ip_address]['count']

            if attempts_left <= 0:
                return False, f"Invalid password. Account locked for {config.lockout_duration_minutes} minutes.", None
            return False, f"Invalid password. {attempts_left} attempts remaining.", None

        # Reset failed attempts on successful login
        if ip_address in self.failed_attempts:
            self.failed_attempts[ip_address] = {'count': 0, 'last_attempt': None}

        # One timestamp for the whole login so the server-side record and
        # the Flask session agree exactly on when the login happened.
        now = datetime.now()

        # Create session
        session_token = secrets.token_urlsafe(32)
        self.active_sessions[session_token] = {
            'token': session_token,
            'ip_address': ip_address,
            'login_time': now,
            'last_activity': now,
            'user': 'admin',
        }

        # Set Flask session
        session['token'] = session_token
        session['user'] = 'admin'
        session['login_time'] = now.isoformat()

        return True, "Login successful", session_token

    def _get_remaining_lockout_time(self, ip_address: str) -> int:
        """Get remaining lockout time in minutes.

        The value is rounded *up* to whole minutes so that a caller who
        still has, say, 20 seconds to wait is told "1" rather than "0".

        Returns:
            Whole minutes remaining; ``0`` when the IP has no recorded
            failure or the lockout window has already passed.
        """
        attempt_data = self.failed_attempts.get(ip_address)
        if attempt_data is None:
            return 0

        last_attempt = attempt_data.get('last_attempt')
        if not last_attempt:
            return 0

        lockout_until = last_attempt + timedelta(minutes=config.lockout_duration_minutes)
        remaining = lockout_until - datetime.now()
        return max(0, math.ceil(remaining.total_seconds() / 60))

    def is_authenticated(self, session_token: Optional[str] = None) -> bool:
        """Check if user is authenticated with valid session.

        Args:
            session_token: Token to validate; defaults to the token stored
                in the Flask ``session``.

        Returns:
            ``True`` when the token is known and has been active within
            ``config.session_timeout_hours``. A valid check refreshes the
            session's ``last_activity``; a timed-out session is logged out.
        """
        if not session_token:
            session_token = session.get('token')

        if not session_token or session_token not in self.active_sessions:
            return False

        session_data = self.active_sessions[session_token]
        now = datetime.now()

        # Check session timeout
        timeout_duration = timedelta(hours=config.session_timeout_hours)
        if now - session_data['last_activity'] > timeout_duration:
            self.logout(session_token)
            return False

        # Update last activity
        session_data['last_activity'] = now
        return True

    def logout(self, session_token: Optional[str] = None) -> bool:
        """Logout user and cleanup session.

        Removes the token from ``active_sessions`` when present and always
        clears the Flask ``session``.

        Args:
            session_token: Token to drop; defaults to the token stored in
                the Flask ``session``.

        Returns:
            Always ``True``.
        """
        if not session_token:
            session_token = session.get('token')

        if session_token and session_token in self.active_sessions:
            del self.active_sessions[session_token]

        # Clear Flask session
        session.clear()
        return True

    def get_session_info(self, session_token: Optional[str] = None) -> Optional[Dict]:
        """Get session information.

        Args:
            session_token: Token to look up; defaults to the token stored
                in the Flask ``session``.

        Returns:
            A shallow copy of the session data with ``login_time`` and
            ``last_activity`` rendered as ISO-8601 strings, or ``None``
            when the token is missing or unknown.
        """
        if not session_token:
            session_token = session.get('token')

        if not session_token or session_token not in self.active_sessions:
            return None

        session_data = self.active_sessions[session_token].copy()

        # Convert datetime objects to strings for JSON serialization
        session_data['login_time'] = session_data['login_time'].isoformat()
        session_data['last_activity'] = session_data['last_activity'].isoformat()
        return session_data

    def cleanup_expired_sessions(self) -> int:
        """Clean up expired sessions. Returns number of sessions removed."""
        timeout_duration = timedelta(hours=config.session_timeout_hours)
        current_time = datetime.now()

        # Collect first, delete afterwards: a dict must not change size
        # while it is being iterated.
        expired_tokens = [
            token
            for token, session_data in self.active_sessions.items()
            if current_time - session_data['last_activity'] > timeout_duration
        ]
        for token in expired_tokens:
            del self.active_sessions[token]

        return len(expired_tokens)


# Global session manager instance
session_manager = SessionManager()


def _auth_required_response():
    """Build the response for an unauthenticated request.

    JSON requests get a 401 with an ``AUTH_REQUIRED`` payload; all other
    requests are redirected to the ``login`` endpoint.
    """
    if request.is_json:
        return jsonify({
            'status': 'error',
            'message': 'Authentication required',
            'code': 'AUTH_REQUIRED'
        }), 401
    return redirect(url_for('login'))


def require_auth(f):
    """Decorator to require authentication for Flask routes."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session_manager.is_authenticated():
            return _auth_required_response()
        return f(*args, **kwargs)

    return decorated_function


def optional_auth(f):
    """Decorator that checks auth but doesn't require it.

    Authentication is enforced only when ``config.has_master_password()``
    reports that a master password is configured; otherwise the wrapped
    view runs unconditionally.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # If a master password is configured, require authentication
        if config.has_master_password() and not session_manager.is_authenticated():
            return _auth_required_response()
        return f(*args, **kwargs)

    return decorated_function