removed old stff
This commit is contained in:
@@ -1,252 +0,0 @@
|
||||
import threading
|
||||
import time
|
||||
import schedule
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Callable, Dict, Any
|
||||
import logging
|
||||
from shared.utils.process_utils import (with_process_lock, RESCAN_LOCK,
|
||||
ProcessLockError, is_process_running)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ScheduledOperations:
|
||||
"""Handle scheduled operations like automatic rescans and downloads."""
|
||||
|
||||
def __init__(self, config_manager, socketio=None):
|
||||
self.config = config_manager
|
||||
self.socketio = socketio
|
||||
self.scheduler_thread = None
|
||||
self.running = False
|
||||
self.rescan_callback: Optional[Callable] = None
|
||||
self.download_callback: Optional[Callable] = None
|
||||
self.last_scheduled_rescan: Optional[datetime] = None
|
||||
|
||||
# Load scheduled rescan settings
|
||||
self.scheduled_rescan_enabled = getattr(self.config, 'scheduled_rescan_enabled', False)
|
||||
self.scheduled_rescan_time = getattr(self.config, 'scheduled_rescan_time', '03:00')
|
||||
self.auto_download_after_rescan = getattr(self.config, 'auto_download_after_rescan', False)
|
||||
|
||||
def set_rescan_callback(self, callback: Callable):
|
||||
"""Set callback function for performing rescan operations."""
|
||||
self.rescan_callback = callback
|
||||
|
||||
def set_download_callback(self, callback: Callable):
|
||||
"""Set callback function for performing download operations."""
|
||||
self.download_callback = callback
|
||||
|
||||
def start_scheduler(self):
|
||||
"""Start the background scheduler thread."""
|
||||
if self.running:
|
||||
logger.warning("Scheduler is already running")
|
||||
return
|
||||
|
||||
self.running = True
|
||||
self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
|
||||
self.scheduler_thread.start()
|
||||
logger.info("Scheduled operations started")
|
||||
|
||||
def stop_scheduler(self):
|
||||
"""Stop the background scheduler."""
|
||||
self.running = False
|
||||
schedule.clear()
|
||||
if self.scheduler_thread and self.scheduler_thread.is_alive():
|
||||
self.scheduler_thread.join(timeout=5)
|
||||
logger.info("Scheduled operations stopped")
|
||||
|
||||
def _scheduler_loop(self):
|
||||
"""Main scheduler loop that runs in background thread."""
|
||||
self._setup_scheduled_jobs()
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
schedule.run_pending()
|
||||
time.sleep(60) # Check every minute
|
||||
except Exception as e:
|
||||
logger.error(f"Scheduler error: {e}")
|
||||
time.sleep(60)
|
||||
|
||||
def _setup_scheduled_jobs(self):
|
||||
"""Setup all scheduled jobs based on configuration."""
|
||||
schedule.clear()
|
||||
|
||||
if self.scheduled_rescan_enabled and self.scheduled_rescan_time:
|
||||
try:
|
||||
schedule.every().day.at(self.scheduled_rescan_time).do(self._perform_scheduled_rescan)
|
||||
logger.info(f"Scheduled daily rescan at {self.scheduled_rescan_time}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error setting up scheduled rescan: {e}")
|
||||
|
||||
def _perform_scheduled_rescan(self):
|
||||
"""Perform the scheduled rescan operation."""
|
||||
try:
|
||||
logger.info("Starting scheduled rescan...")
|
||||
|
||||
# Emit scheduled rescan started event
|
||||
if self.socketio:
|
||||
self.socketio.emit('scheduled_rescan_started')
|
||||
|
||||
# Check if rescan is already running
|
||||
if is_process_running(RESCAN_LOCK):
|
||||
logger.warning("Rescan is already running, skipping scheduled rescan")
|
||||
if self.socketio:
|
||||
self.socketio.emit('scheduled_rescan_skipped', {
|
||||
'reason': 'Rescan already in progress'
|
||||
})
|
||||
return
|
||||
|
||||
# Perform the rescan using process lock
|
||||
@with_process_lock(RESCAN_LOCK, timeout_minutes=180)
|
||||
def perform_rescan():
|
||||
self.last_scheduled_rescan = datetime.now()
|
||||
|
||||
if self.rescan_callback:
|
||||
result = self.rescan_callback()
|
||||
logger.info("Scheduled rescan completed successfully")
|
||||
|
||||
if self.socketio:
|
||||
self.socketio.emit('scheduled_rescan_completed', {
|
||||
'timestamp': self.last_scheduled_rescan.isoformat(),
|
||||
'result': result
|
||||
})
|
||||
|
||||
# Auto-start download if configured
|
||||
if self.auto_download_after_rescan and self.download_callback:
|
||||
logger.info("Starting auto-download after scheduled rescan")
|
||||
threading.Thread(
|
||||
target=self._perform_auto_download,
|
||||
daemon=True
|
||||
).start()
|
||||
else:
|
||||
logger.warning("No rescan callback configured")
|
||||
|
||||
perform_rescan(_locked_by='scheduled_operation')
|
||||
|
||||
except ProcessLockError:
|
||||
logger.warning("Could not acquire rescan lock for scheduled operation")
|
||||
if self.socketio:
|
||||
self.socketio.emit('scheduled_rescan_error', {
|
||||
'error': 'Could not acquire rescan lock'
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Scheduled rescan failed: {e}")
|
||||
if self.socketio:
|
||||
self.socketio.emit('scheduled_rescan_error', {
|
||||
'error': str(e)
|
||||
})
|
||||
|
||||
def _perform_auto_download(self):
|
||||
"""Perform automatic download after scheduled rescan."""
|
||||
try:
|
||||
# Wait a bit after rescan to let UI update
|
||||
time.sleep(10)
|
||||
|
||||
if self.download_callback:
|
||||
# Find series with missing episodes and start download
|
||||
logger.info("Starting auto-download of missing episodes")
|
||||
result = self.download_callback()
|
||||
|
||||
if self.socketio:
|
||||
self.socketio.emit('auto_download_started', {
|
||||
'timestamp': datetime.now().isoformat(),
|
||||
'result': result
|
||||
})
|
||||
else:
|
||||
logger.warning("No download callback configured for auto-download")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Auto-download after scheduled rescan failed: {e}")
|
||||
if self.socketio:
|
||||
self.socketio.emit('auto_download_error', {
|
||||
'error': str(e)
|
||||
})
|
||||
|
||||
def update_scheduled_rescan_config(self, enabled: bool, time_str: str, auto_download: bool = False):
|
||||
"""Update scheduled rescan configuration."""
|
||||
try:
|
||||
# Validate time format
|
||||
if enabled and time_str:
|
||||
datetime.strptime(time_str, '%H:%M')
|
||||
|
||||
# Update configuration
|
||||
self.scheduled_rescan_enabled = enabled
|
||||
self.scheduled_rescan_time = time_str
|
||||
self.auto_download_after_rescan = auto_download
|
||||
|
||||
# Save to config
|
||||
self.config.scheduled_rescan_enabled = enabled
|
||||
self.config.scheduled_rescan_time = time_str
|
||||
self.config.auto_download_after_rescan = auto_download
|
||||
self.config.save_config()
|
||||
|
||||
# Restart scheduler with new settings
|
||||
if self.running:
|
||||
self._setup_scheduled_jobs()
|
||||
|
||||
logger.info(f"Updated scheduled rescan config: enabled={enabled}, time={time_str}, auto_download={auto_download}")
|
||||
return True
|
||||
|
||||
except ValueError as e:
|
||||
logger.error(f"Invalid time format: {time_str}")
|
||||
raise ValueError(f"Invalid time format. Use HH:MM format.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating scheduled rescan config: {e}")
|
||||
raise
|
||||
|
||||
def get_scheduled_rescan_config(self) -> Dict[str, Any]:
|
||||
"""Get current scheduled rescan configuration."""
|
||||
next_run = None
|
||||
if self.scheduled_rescan_enabled and self.scheduled_rescan_time:
|
||||
try:
|
||||
# Calculate next run time
|
||||
now = datetime.now()
|
||||
today_run = datetime.strptime(f"{now.strftime('%Y-%m-%d')} {self.scheduled_rescan_time}", '%Y-%m-%d %H:%M')
|
||||
|
||||
if now > today_run:
|
||||
# Next run is tomorrow
|
||||
next_run = today_run + timedelta(days=1)
|
||||
else:
|
||||
# Next run is today
|
||||
next_run = today_run
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error calculating next run time: {e}")
|
||||
|
||||
return {
|
||||
'enabled': self.scheduled_rescan_enabled,
|
||||
'time': self.scheduled_rescan_time,
|
||||
'auto_download_after_rescan': self.auto_download_after_rescan,
|
||||
'next_run': next_run.isoformat() if next_run else None,
|
||||
'last_run': self.last_scheduled_rescan.isoformat() if self.last_scheduled_rescan else None,
|
||||
'is_running': self.running
|
||||
}
|
||||
|
||||
def trigger_manual_scheduled_rescan(self):
|
||||
"""Manually trigger a scheduled rescan (for testing purposes)."""
|
||||
logger.info("Manually triggering scheduled rescan")
|
||||
threading.Thread(target=self._perform_scheduled_rescan, daemon=True).start()
|
||||
|
||||
def get_next_scheduled_jobs(self) -> list:
|
||||
"""Get list of all scheduled jobs with their next run times."""
|
||||
jobs = []
|
||||
for job in schedule.jobs:
|
||||
jobs.append({
|
||||
'job_func': job.job_func.__name__ if hasattr(job.job_func, '__name__') else str(job.job_func),
|
||||
'next_run': job.next_run.isoformat() if job.next_run else None,
|
||||
'interval': str(job.interval),
|
||||
'unit': job.unit
|
||||
})
|
||||
return jobs
|
||||
|
||||
|
||||
# Global scheduler instance
|
||||
scheduled_operations = None
|
||||
|
||||
def init_scheduler(config_manager, socketio=None):
|
||||
"""Initialize the global scheduler."""
|
||||
global scheduled_operations
|
||||
scheduled_operations = ScheduledOperations(config_manager, socketio)
|
||||
return scheduled_operations
|
||||
|
||||
def get_scheduler():
|
||||
"""Get the global scheduler instance."""
|
||||
return scheduled_operations
|
||||
Reference in New Issue
Block a user