Add runtime DB schema migration and version tracking

This commit is contained in:
2026-04-12 19:13:36 +02:00
parent ffe7ada469
commit 21b38365c4
3 changed files with 83 additions and 5 deletions

View File

@@ -89,6 +89,13 @@ CREATE TABLE IF NOT EXISTS history_archive (
);
"""
_CREATE_SCHEMA_MIGRATIONS: str = """
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
migrated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now'))
);
"""
# Ordered list of DDL statements to execute on initialisation.
_SCHEMA_STATEMENTS: list[str] = [
_CREATE_SETTINGS,
@@ -100,6 +107,12 @@ _SCHEMA_STATEMENTS: list[str] = [
_CREATE_HISTORY_ARCHIVE,
]
_CURRENT_SCHEMA_VERSION: int = 1
_MIGRATIONS: dict[int, str] = {
1: "\n".join(_SCHEMA_STATEMENTS),
}
# ---------------------------------------------------------------------------
# Public API
@@ -113,8 +126,44 @@ async def _configure_connection(db: aiosqlite.Connection) -> None:
await db.execute("PRAGMA busy_timeout=5000;")
async def _get_current_schema_version(db: aiosqlite.Connection) -> int:
"""Return the highest applied schema version for the given database."""
await db.executescript(_CREATE_SCHEMA_MIGRATIONS)
async with db.execute("SELECT MAX(version) FROM schema_migrations;") as cursor:
row = await cursor.fetchone()
if row is None or row[0] is None:
return 0
return int(row[0])
async def _apply_migration(db: aiosqlite.Connection, version: int) -> None:
"""Apply a single migration step and record its completion."""
migration_script = _MIGRATIONS[version]
await db.executescript(migration_script)
await db.execute("INSERT INTO schema_migrations (version) VALUES (?);", (version,))
await db.commit()
async def _migrate_schema(db: aiosqlite.Connection) -> None:
"""Migrate the database schema to the latest supported version."""
current_version = await _get_current_schema_version(db)
if current_version == _CURRENT_SCHEMA_VERSION:
return
if current_version > _CURRENT_SCHEMA_VERSION:
raise RuntimeError(
f"database schema version {current_version} is newer than supported "
f"version {_CURRENT_SCHEMA_VERSION}"
)
log.info("migrating_database_schema", from_version=current_version, to_version=_CURRENT_SCHEMA_VERSION)
for next_version in range(current_version + 1, _CURRENT_SCHEMA_VERSION + 1):
await _apply_migration(db, next_version)
log.info("database_schema_ready", schema_version=_CURRENT_SCHEMA_VERSION)
async def init_db(db: aiosqlite.Connection) -> None:
"""Create all BanGUI application tables if they do not already exist.
"""Create or migrate the BanGUI application database schema.
This function is idempotent — calling it on an already-initialised
database has no effect. It should be called once during application
@@ -125,10 +174,7 @@ async def init_db(db: aiosqlite.Connection) -> None:
"""
log.info("initialising_database_schema")
await _configure_connection(db)
for statement in _SCHEMA_STATEMENTS:
await db.executescript(statement)
await db.commit()
log.info("database_schema_ready")
await _migrate_schema(db)
async def open_db(database_path: str) -> aiosqlite.Connection: