Replace asyncio.to_thread with ThreadPoolExecutor.run_in_executor

- Add ThreadPoolExecutor with 3 max workers to SeriesApp
- Replace all asyncio.to_thread calls with loop.run_in_executor
- Add shutdown() method to properly cleanup executor
- Integrate SeriesApp.shutdown() into FastAPI shutdown sequence
- Ensures proper resource cleanup on Ctrl+C (SIGINT/SIGTERM)
This commit is contained in:
Lukas 2026-01-03 21:04:52 +01:00
parent b1726968e5
commit ab7d78261e
6 changed files with 102 additions and 329 deletions

View File

@ -17,7 +17,7 @@
"keep_days": 30
},
"other": {
"master_password_hash": "$pbkdf2-sha256$29000$LkUohZASQmgthdD6n9Nayw$6VmJzv/pYSdyW7..eU57P.YJpjK/6fXvXvef0L6PLDg",
"master_password_hash": "$pbkdf2-sha256$29000$kxLi/J9zTukdA6BUitHa.w$tLseUX7kHXkjl3N9pFAd2Y.dzveyx0buInX7Wu9MHLg",
"anime_directory": "/mnt/server/serien/Serien/"
},
"version": "1.0.0"

View File

@ -0,0 +1,24 @@
{
"name": "Aniworld",
"data_dir": "data",
"scheduler": {
"enabled": true,
"interval_minutes": 60
},
"logging": {
"level": "INFO",
"file": null,
"max_bytes": null,
"backup_count": 3
},
"backup": {
"enabled": false,
"path": "data/backups",
"keep_days": 30
},
"other": {
"master_password_hash": "$pbkdf2-sha256$29000$X4uRMibE.N.bM.acs1ZKSQ$88em69lhlaLiS6vcF9oqf4pCC8KBbIj/O3h4cQFwM.I",
"anime_directory": "/mnt/server/serien/Serien/"
},
"version": "1.0.0"
}

View File

@ -0,0 +1,24 @@
{
"name": "Aniworld",
"data_dir": "data",
"scheduler": {
"enabled": true,
"interval_minutes": 60
},
"logging": {
"level": "INFO",
"file": null,
"max_bytes": null,
"backup_count": 3
},
"backup": {
"enabled": false,
"path": "data/backups",
"keep_days": 30
},
"other": {
"master_password_hash": "$pbkdf2-sha256$29000$TQlBSOndm1MKAWAMoTRGKA$a/q5miowGpjWSc71WDvqBpL9JJmuAO1FrZlCi3qwp2E",
"anime_directory": "/mnt/server/serien/Serien/"
},
"version": "1.0.0"
}

View File

@ -1,319 +0,0 @@
"""Command-line interface for the Aniworld anime download manager."""
import asyncio
import logging
import os
from typing import Optional, Sequence
from rich.progress import Progress
from src.core.entities.series import Serie
from src.core.SeriesApp import SeriesApp as CoreSeriesApp
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
logger = logging.getLogger(__name__)
class SeriesCLI:
"""Thin wrapper around :class:`SeriesApp` providing an interactive CLI."""
def __init__(self, directory_to_search: str) -> None:
print("Please wait while initializing...")
self.directory_to_search = directory_to_search
self.series_app = CoreSeriesApp(directory_to_search)
self._progress: Optional[Progress] = None
self._overall_task_id: Optional[int] = None
self._series_task_id: Optional[int] = None
self._episode_task_id: Optional[int] = None
self._scan_task_id: Optional[int] = None
# ------------------------------------------------------------------
# Utility helpers
# ------------------------------------------------------------------
def _get_series_list(self) -> Sequence[Serie]:
"""Return the currently cached series with missing episodes."""
return self.series_app.get_series_list()
# ------------------------------------------------------------------
# Display & selection
# ------------------------------------------------------------------
def display_series(self) -> None:
"""Print all series with assigned numbers."""
series = self._get_series_list()
if not series:
print("\nNo series with missing episodes were found.")
return
print("\nCurrent result:")
for index, serie in enumerate(series, start=1):
name = (serie.name or "").strip()
label = name if name else serie.folder
print(f"{index}. {label}")
def get_user_selection(self) -> Optional[Sequence[Serie]]:
"""Prompt the user to select one or more series for download."""
series = list(self._get_series_list())
if not series:
print("No series available for download.")
return None
self.display_series()
prompt = (
"\nSelect series by number (e.g. '1', '1,2' or 'all') "
"or type 'exit' to return: "
)
selection = input(prompt).strip().lower()
if selection in {"exit", ""}:
return None
if selection == "all":
return series
try:
indexes = [
int(value.strip()) - 1
for value in selection.split(",")
]
except ValueError:
print("Invalid selection. Returning to main menu.")
return None
chosen = [
series[i]
for i in indexes
if 0 <= i < len(series)
]
if not chosen:
print("No valid series selected.")
return None
return chosen
# ------------------------------------------------------------------
# Download logic
# ------------------------------------------------------------------
def download_series(self, series: Sequence[Serie]) -> None:
"""Download all missing episodes for the provided series list."""
total_episodes = sum(
len(episodes)
for serie in series
for episodes in serie.episodeDict.values()
)
if total_episodes == 0:
print("Selected series do not contain missing episodes.")
return
self._progress = Progress()
with self._progress:
self._overall_task_id = self._progress.add_task(
"[red]Processing...", total=total_episodes
)
self._series_task_id = self._progress.add_task(
"[green]Current series", total=1
)
self._episode_task_id = self._progress.add_task(
"[gray]Download", total=100
)
for serie in series:
serie_total = sum(len(eps) for eps in serie.episodeDict.values())
self._progress.update(
self._series_task_id,
total=max(serie_total, 1),
completed=0,
description=f"[green]{serie.folder}",
)
for season, episodes in serie.episodeDict.items():
for episode in episodes:
if not self.series_app.loader.is_language(
season, episode, serie.key
):
logger.info(
"Skipping %s S%02dE%02d because the desired language is unavailable",
serie.folder,
season,
episode,
)
continue
result = self.series_app.download(
serieFolder=serie.folder,
season=season,
episode=episode,
key=serie.key,
callback=self._update_download_progress,
)
if not result.success:
logger.error("Download failed: %s", result.message)
self._progress.advance(self._overall_task_id)
self._progress.advance(self._series_task_id)
self._progress.update(
self._episode_task_id,
completed=0,
description="[gray]Waiting...",
)
self._progress = None
self.series_app.refresh_series_list()
def _update_download_progress(self, percent: float) -> None:
"""Update the episode progress bar based on download progress."""
if not self._progress or self._episode_task_id is None:
return
description = f"[gray]Download: {percent:.1f}%"
self._progress.update(
self._episode_task_id,
completed=percent,
description=description,
)
# ------------------------------------------------------------------
# Rescan logic
# ------------------------------------------------------------------
def rescan(self) -> None:
"""Trigger a rescan of the anime directory using the core app.
Uses the legacy file-based scan mode for CLI compatibility.
"""
total_to_scan = self.series_app.serie_scanner.get_total_to_scan()
total_to_scan = max(total_to_scan, 1)
self._progress = Progress()
with self._progress:
self._scan_task_id = self._progress.add_task(
"[red]Scanning folders...",
total=total_to_scan,
)
# Run async rescan in sync context with file-based mode
asyncio.run(
self.series_app.rescan(use_database=False)
)
self._progress = None
self._scan_task_id = None
series_count = len(self.series_app.series_list)
print(f"Scan completed. Found {series_count} series with missing episodes.")
def _wrap_scan_callback(self, total: int):
"""Create a callback that updates the scan progress bar."""
def _callback(folder: str, current: int) -> None:
if not self._progress or self._scan_task_id is None:
return
self._progress.update(
self._scan_task_id,
completed=min(current, total),
description=f"[green]{folder}",
)
return _callback
# ------------------------------------------------------------------
# Search & add logic
# ------------------------------------------------------------------
def search_mode(self) -> None:
"""Search for a series and add it to the local list if chosen."""
query = input("Enter search string: ").strip()
if not query:
return
results = self.series_app.search(query)
if not results:
print("No results found. Returning to main menu.")
return
print("\nSearch results:")
for index, result in enumerate(results, start=1):
print(f"{index}. {result.get('name', 'Unknown')}")
selection = input(
"\nSelect an option by number or press <enter> to cancel: "
).strip()
if selection == "":
return
try:
chosen_index = int(selection) - 1
except ValueError:
print("Invalid input. Returning to main menu.")
return
if not (0 <= chosen_index < len(results)):
print("Invalid selection. Returning to main menu.")
return
chosen = results[chosen_index]
serie = Serie(
chosen.get("link", ""),
chosen.get("name", "Unknown"),
"aniworld.to",
chosen.get("link", ""),
{},
)
self.series_app.List.add(serie)
self.series_app.refresh_series_list()
print(f"Added '{serie.name}' to the local catalogue.")
# ------------------------------------------------------------------
# Main loop
# ------------------------------------------------------------------
def run(self) -> None:
"""Run the interactive CLI loop."""
while True:
action = input(
"\nChoose action ('s' for search, 'i' for rescan, 'd' for download, 'q' to quit): "
).strip().lower()
if action == "s":
self.search_mode()
elif action == "i":
print("\nRescanning series...\n")
self.rescan()
elif action == "d":
selected_series = self.get_user_selection()
if selected_series:
self.download_series(selected_series)
elif action in {"q", "quit", "exit"}:
print("Goodbye!")
break
else:
print("Unknown command. Please choose 's', 'i', 'd', or 'q'.")
def configure_logging() -> None:
"""Set up a basic logging configuration for the CLI."""
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("charset_normalizer").setLevel(logging.ERROR)
def main() -> None:
"""Entry point for the CLI application."""
configure_logging()
default_dir = os.getenv("ANIME_DIRECTORY")
if not default_dir:
print(
"Environment variable ANIME_DIRECTORY is not set. Please configure it to the base anime directory."
)
return
app = SeriesCLI(default_dir)
app.run()
if __name__ == "__main__":
main()

View File

@ -12,6 +12,7 @@ Note:
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from events import Events
@ -148,6 +149,9 @@ class SeriesApp:
self.directory_to_search = directory_to_search
# Initialize thread pool executor
self.executor = ThreadPoolExecutor(max_workers=3)
# Initialize events
self._events = Events()
self._events.download_status = None
@ -229,7 +233,9 @@ class SeriesApp:
async def _init_list(self) -> None:
"""Initialize the series list with missing episodes (async)."""
self.series_list = await asyncio.to_thread(
loop = asyncio.get_running_loop()
self.series_list = await loop.run_in_executor(
self.executor,
self.list.GetMissingEpisode
)
logger.debug(
@ -251,7 +257,12 @@ class SeriesApp:
RuntimeError: If search fails
"""
logger.info("Searching for: %s", words)
results = await asyncio.to_thread(self.loader.search, words)
loop = asyncio.get_running_loop()
results = await loop.run_in_executor(
self.executor,
self.loader.search,
words
)
logger.info("Found %d results", len(results))
return results
@ -348,7 +359,9 @@ class SeriesApp:
try:
# Perform download in thread to avoid blocking event loop
download_success = await asyncio.to_thread(
loop = asyncio.get_running_loop()
download_success = await loop.run_in_executor(
self.executor,
self.loader.download,
self.directory_to_search,
serie_folder,
@ -481,7 +494,9 @@ class SeriesApp:
try:
# Get total items to scan
logger.info("Getting total items to scan...")
total_to_scan = await asyncio.to_thread(
loop = asyncio.get_running_loop()
total_to_scan = await loop.run_in_executor(
self.executor,
self.serie_scanner.get_total_to_scan
)
logger.info("Total folders to scan: %d", total_to_scan)
@ -503,7 +518,10 @@ class SeriesApp:
)
# Reinitialize scanner
await asyncio.to_thread(self.serie_scanner.reinit)
await loop.run_in_executor(
self.executor,
self.serie_scanner.reinit
)
def scan_progress_handler(progress_data):
"""Handle scan progress events from scanner."""
@ -528,7 +546,10 @@ class SeriesApp:
try:
# Perform scan (file-based, returns results in scanner.keyDict)
await asyncio.to_thread(self.serie_scanner.scan)
await loop.run_in_executor(
self.executor,
self.serie_scanner.scan
)
finally:
# Always unsubscribe after scan completes or fails
self.serie_scanner.unsubscribe_on_progress(
@ -685,3 +706,14 @@ class SeriesApp:
)
return all_series
def shutdown(self) -> None:
"""
Shutdown the thread pool executor.
Should be called when the SeriesApp instance is no longer needed
to properly clean up resources.
"""
if hasattr(self, 'executor'):
self.executor.shutdown(wait=True)
logger.info("ThreadPoolExecutor shut down successfully")

View File

@ -197,7 +197,17 @@ async def lifespan(_application: FastAPI):
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Error stopping download service: %s", e, exc_info=True)
# 3. Cleanup progress service
# 3. Shutdown SeriesApp and cleanup thread pool
try:
from src.server.utils.dependencies import _series_app
if _series_app is not None:
logger.info("Shutting down SeriesApp thread pool...")
_series_app.shutdown()
logger.info("SeriesApp shutdown complete")
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Error during SeriesApp shutdown: %s", e, exc_info=True)
# 4. Cleanup progress service
try:
progress_service = get_progress_service()
logger.info("Cleaning up progress service...")
@ -205,9 +215,11 @@ async def lifespan(_application: FastAPI):
progress_service._active_progress.clear()
logger.info("Progress service cleanup complete")
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("Error cleaning up progress service: %s", e, exc_info=True)
logger.error(
"Error cleaning up progress service: %s", e, exc_info=True
)
# 4. Close database connections with WAL checkpoint
# 5. Close database connections with WAL checkpoint
try:
from src.server.database.connection import close_db
logger.info("Closing database connections...")