30 lines
781 B
Python
30 lines
781 B
Python
"""Shared database helpers for APScheduler background tasks."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from contextlib import asynccontextmanager
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.db import open_db
|
|
|
|
if TYPE_CHECKING:
|
|
from collections.abc import AsyncIterator
|
|
|
|
import aiosqlite
|
|
|
|
from app.config import Settings
|
|
|
|
|
|
@asynccontextmanager
async def task_db(settings: Settings) -> AsyncIterator[aiosqlite.Connection]:
    """Yield a dedicated application database connection for one background task.

    Background tasks execute outside the FastAPI request scope, so they
    cannot rely on FastAPI dependencies and must manage their own SQLite
    connection. This context manager opens a connection via ``open_db`` using
    ``settings.database_path``, hands it to the ``async with`` body, and
    closes it when the body exits.

    Args:
        settings: Application settings; only ``database_path`` is read here.

    Yields:
        The connection returned by ``open_db``.
    """
    connection = await open_db(settings.database_path)
    try:
        yield connection
    finally:
        # Close on both normal exit and when the caller's body raises, so the
        # connection is never left open by a failing task.
        await connection.close()
|