""" Notification Service for AniWorld. This module provides notification functionality including email, webhooks, and in-app notifications for download events and system alerts. """ import asyncio import json import logging from datetime import datetime from enum import Enum from typing import Any, Dict, List, Optional, Set import aiohttp from pydantic import BaseModel, EmailStr, Field, HttpUrl logger = logging.getLogger(__name__) class NotificationType(str, Enum): """Types of notifications.""" DOWNLOAD_COMPLETE = "download_complete" DOWNLOAD_FAILED = "download_failed" QUEUE_COMPLETE = "queue_complete" SYSTEM_ERROR = "system_error" SYSTEM_WARNING = "system_warning" SYSTEM_INFO = "system_info" class NotificationPriority(str, Enum): """Notification priority levels.""" LOW = "low" NORMAL = "normal" HIGH = "high" CRITICAL = "critical" class NotificationChannel(str, Enum): """Available notification channels.""" EMAIL = "email" WEBHOOK = "webhook" IN_APP = "in_app" class NotificationPreferences(BaseModel): """User notification preferences.""" enabled_channels: Set[NotificationChannel] = Field( default_factory=lambda: {NotificationChannel.IN_APP} ) enabled_types: Set[NotificationType] = Field( default_factory=lambda: set(NotificationType) ) email_address: Optional[EmailStr] = None webhook_urls: List[HttpUrl] = Field(default_factory=list) quiet_hours_start: Optional[int] = Field(None, ge=0, le=23) quiet_hours_end: Optional[int] = Field(None, ge=0, le=23) min_priority: NotificationPriority = NotificationPriority.NORMAL class Notification(BaseModel): """Notification model.""" id: str type: NotificationType priority: NotificationPriority title: str message: str data: Optional[Dict[str, Any]] = None created_at: datetime = Field(default_factory=datetime.utcnow) read: bool = False channels: Set[NotificationChannel] = Field( default_factory=lambda: {NotificationChannel.IN_APP} ) class EmailNotificationService: """Service for sending email notifications.""" def __init__( self, smtp_host: Optional[str] = None, smtp_port: int = 587, smtp_username: Optional[str] = None, smtp_password: Optional[str] = None, from_address: Optional[str] = None, ): """ Initialize email notification service. Args: smtp_host: SMTP server hostname smtp_port: SMTP server port smtp_username: SMTP authentication username smtp_password: SMTP authentication password from_address: Email sender address """ self.smtp_host = smtp_host self.smtp_port = smtp_port self.smtp_username = smtp_username self.smtp_password = smtp_password self.from_address = from_address self._enabled = all( [smtp_host, smtp_username, smtp_password, from_address] ) async def send_email( self, to_address: str, subject: str, body: str, html: bool = False ) -> bool: """ Send an email notification. Args: to_address: Recipient email address subject: Email subject body: Email body content html: Whether body is HTML format Returns: True if email sent successfully """ if not self._enabled: logger.warning("Email notifications not configured") return False try: # Import here to make aiosmtplib optional from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import aiosmtplib message = MIMEMultipart("alternative") message["Subject"] = subject message["From"] = self.from_address message["To"] = to_address mime_type = "html" if html else "plain" message.attach(MIMEText(body, mime_type)) await aiosmtplib.send( message, hostname=self.smtp_host, port=self.smtp_port, username=self.smtp_username, password=self.smtp_password, start_tls=True, ) logger.info(f"Email notification sent to {to_address}") return True except ImportError: logger.error( "aiosmtplib not installed. Install with: pip install aiosmtplib" ) return False except Exception as e: logger.error(f"Failed to send email notification: {e}") return False class WebhookNotificationService: """Service for sending webhook notifications.""" def __init__(self, timeout: int = 10, max_retries: int = 3): """ Initialize webhook notification service. Args: timeout: Request timeout in seconds max_retries: Maximum number of retry attempts """ self.timeout = timeout self.max_retries = max_retries async def send_webhook( self, url: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None ) -> bool: """ Send a webhook notification. Args: url: Webhook URL payload: JSON payload to send headers: Optional custom headers Returns: True if webhook sent successfully """ if headers is None: headers = {"Content-Type": "application/json"} for attempt in range(self.max_retries): try: async with aiohttp.ClientSession() as session: async with session.post( url, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=self.timeout), ) as response: if response.status < 400: logger.info(f"Webhook notification sent to {url}") return True else: logger.warning( f"Webhook returned status {response.status}: {url}" ) except asyncio.TimeoutError: logger.warning(f"Webhook timeout (attempt {attempt + 1}/{self.max_retries}): {url}") except Exception as e: logger.error(f"Failed to send webhook (attempt {attempt + 1}/{self.max_retries}): {e}") if attempt < self.max_retries - 1: await asyncio.sleep(2 ** attempt) # Exponential backoff return False class InAppNotificationService: """Service for managing in-app notifications.""" def __init__(self, max_notifications: int = 100): """ Initialize in-app notification service. Args: max_notifications: Maximum number of notifications to keep """ self.notifications: List[Notification] = [] self.max_notifications = max_notifications self._lock = asyncio.Lock() async def add_notification(self, notification: Notification) -> None: """ Add a notification to the in-app list. Args: notification: Notification to add """ async with self._lock: self.notifications.insert(0, notification) if len(self.notifications) > self.max_notifications: self.notifications = self.notifications[: self.max_notifications] async def get_notifications( self, unread_only: bool = False, limit: Optional[int] = None ) -> List[Notification]: """ Get in-app notifications. Args: unread_only: Only return unread notifications limit: Maximum number of notifications to return Returns: List of notifications """ async with self._lock: notifications = self.notifications if unread_only: notifications = [n for n in notifications if not n.read] if limit: notifications = notifications[:limit] return notifications.copy() async def mark_as_read(self, notification_id: str) -> bool: """ Mark a notification as read. Args: notification_id: ID of notification to mark Returns: True if notification was found and marked """ async with self._lock: for notification in self.notifications: if notification.id == notification_id: notification.read = True return True return False async def mark_all_as_read(self) -> int: """ Mark all notifications as read. Returns: Number of notifications marked as read """ async with self._lock: count = 0 for notification in self.notifications: if not notification.read: notification.read = True count += 1 return count async def clear_notifications(self, read_only: bool = True) -> int: """ Clear notifications. Args: read_only: Only clear read notifications Returns: Number of notifications cleared """ async with self._lock: if read_only: initial_count = len(self.notifications) self.notifications = [n for n in self.notifications if not n.read] return initial_count - len(self.notifications) else: count = len(self.notifications) self.notifications.clear() return count class NotificationService: """Main notification service coordinating all notification channels.""" def __init__( self, email_service: Optional[EmailNotificationService] = None, webhook_service: Optional[WebhookNotificationService] = None, in_app_service: Optional[InAppNotificationService] = None, ): """ Initialize notification service. Args: email_service: Email notification service instance webhook_service: Webhook notification service instance in_app_service: In-app notification service instance """ self.email_service = email_service or EmailNotificationService() self.webhook_service = webhook_service or WebhookNotificationService() self.in_app_service = in_app_service or InAppNotificationService() self.preferences = NotificationPreferences() def set_preferences(self, preferences: NotificationPreferences) -> None: """ Update notification preferences. Args: preferences: New notification preferences """ self.preferences = preferences def _is_in_quiet_hours(self) -> bool: """ Check if current time is within quiet hours. Returns: True if in quiet hours """ if ( self.preferences.quiet_hours_start is None or self.preferences.quiet_hours_end is None ): return False current_hour = datetime.now().hour start = self.preferences.quiet_hours_start end = self.preferences.quiet_hours_end if start <= end: return start <= current_hour < end else: # Quiet hours span midnight return current_hour >= start or current_hour < end def _should_send_notification( self, notification_type: NotificationType, priority: NotificationPriority ) -> bool: """ Determine if a notification should be sent based on preferences. Args: notification_type: Type of notification priority: Priority level Returns: True if notification should be sent """ # Check if type is enabled if notification_type not in self.preferences.enabled_types: return False # Check priority level priority_order = [ NotificationPriority.LOW, NotificationPriority.NORMAL, NotificationPriority.HIGH, NotificationPriority.CRITICAL, ] if ( priority_order.index(priority) < priority_order.index(self.preferences.min_priority) ): return False # Check quiet hours (critical notifications bypass quiet hours) if priority != NotificationPriority.CRITICAL and self._is_in_quiet_hours(): return False return True async def send_notification(self, notification: Notification) -> Dict[str, bool]: """ Send a notification through enabled channels. Args: notification: Notification to send Returns: Dictionary mapping channel names to success status """ if not self._should_send_notification(notification.type, notification.priority): logger.debug( f"Notification not sent due to preferences: {notification.type}" ) return {} results = {} # Send in-app notification if NotificationChannel.IN_APP in self.preferences.enabled_channels: try: await self.in_app_service.add_notification(notification) results["in_app"] = True except Exception as e: logger.error(f"Failed to send in-app notification: {e}") results["in_app"] = False # Send email notification if ( NotificationChannel.EMAIL in self.preferences.enabled_channels and self.preferences.email_address ): try: success = await self.email_service.send_email( to_address=self.preferences.email_address, subject=f"[{notification.priority.upper()}] {notification.title}", body=notification.message, ) results["email"] = success except Exception as e: logger.error(f"Failed to send email notification: {e}") results["email"] = False # Send webhook notifications if ( NotificationChannel.WEBHOOK in self.preferences.enabled_channels and self.preferences.webhook_urls ): payload = { "id": notification.id, "type": notification.type, "priority": notification.priority, "title": notification.title, "message": notification.message, "data": notification.data, "created_at": notification.created_at.isoformat(), } webhook_results = [] for url in self.preferences.webhook_urls: try: success = await self.webhook_service.send_webhook(str(url), payload) webhook_results.append(success) except Exception as e: logger.error(f"Failed to send webhook notification to {url}: {e}") webhook_results.append(False) results["webhook"] = all(webhook_results) if webhook_results else False return results async def notify_download_complete( self, series_name: str, episode: str, file_path: str ) -> Dict[str, bool]: """ Send notification for completed download. Args: series_name: Name of the series episode: Episode identifier file_path: Path to downloaded file Returns: Dictionary of send results by channel """ notification = Notification( id=f"download_complete_{datetime.utcnow().timestamp()}", type=NotificationType.DOWNLOAD_COMPLETE, priority=NotificationPriority.NORMAL, title=f"Download Complete: {series_name}", message=f"Episode {episode} has been downloaded successfully.", data={ "series_name": series_name, "episode": episode, "file_path": file_path, }, ) return await self.send_notification(notification) async def notify_download_failed( self, series_name: str, episode: str, error: str ) -> Dict[str, bool]: """ Send notification for failed download. Args: series_name: Name of the series episode: Episode identifier error: Error message Returns: Dictionary of send results by channel """ notification = Notification( id=f"download_failed_{datetime.utcnow().timestamp()}", type=NotificationType.DOWNLOAD_FAILED, priority=NotificationPriority.HIGH, title=f"Download Failed: {series_name}", message=f"Episode {episode} failed to download: {error}", data={"series_name": series_name, "episode": episode, "error": error}, ) return await self.send_notification(notification) async def notify_queue_complete(self, total_downloads: int) -> Dict[str, bool]: """ Send notification for completed download queue. Args: total_downloads: Number of downloads completed Returns: Dictionary of send results by channel """ notification = Notification( id=f"queue_complete_{datetime.utcnow().timestamp()}", type=NotificationType.QUEUE_COMPLETE, priority=NotificationPriority.NORMAL, title="Download Queue Complete", message=f"All {total_downloads} downloads have been completed.", data={"total_downloads": total_downloads}, ) return await self.send_notification(notification) async def notify_system_error(self, error: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, bool]: """ Send notification for system error. Args: error: Error message details: Optional error details Returns: Dictionary of send results by channel """ notification = Notification( id=f"system_error_{datetime.utcnow().timestamp()}", type=NotificationType.SYSTEM_ERROR, priority=NotificationPriority.CRITICAL, title="System Error", message=error, data=details, ) return await self.send_notification(notification) # Global notification service instance _notification_service: Optional[NotificationService] = None def get_notification_service() -> NotificationService: """ Get the global notification service instance. Returns: NotificationService instance """ global _notification_service if _notification_service is None: _notification_service = NotificationService() return _notification_service def configure_notification_service( smtp_host: Optional[str] = None, smtp_port: int = 587, smtp_username: Optional[str] = None, smtp_password: Optional[str] = None, from_address: Optional[str] = None, ) -> NotificationService: """ Configure the global notification service. Args: smtp_host: SMTP server hostname smtp_port: SMTP server port smtp_username: SMTP authentication username smtp_password: SMTP authentication password from_address: Email sender address Returns: Configured NotificationService instance """ global _notification_service email_service = EmailNotificationService( smtp_host=smtp_host, smtp_port=smtp_port, smtp_username=smtp_username, smtp_password=smtp_password, from_address=from_address, ) _notification_service = NotificationService(email_service=email_service) return _notification_service