Forced Alignment Storage

Standardized SQLite storage for forced alignment results with content hashing

ForcedAlignmentRow

A dataclass representing a single row in the standardized forced alignments table. This provides a type-safe way to work with stored forced alignment results.


ForcedAlignmentRow


def ForcedAlignmentRow(
    job_id:str, audio_path:str, audio_hash:str, text:str, text_hash:str, config_hash:str, items:Optional=None,
    metadata:Optional=None, created_at:Optional=None
)->None:

A single row from the forced_alignments table.

# Build a ForcedAlignmentRow from keyword fields, then read a few attributes back
row_fields = {
    "job_id": "fa_job_abc123",
    "audio_path": "/tmp/test.mp3",
    "audio_hash": "sha256:" + "a" * 64,
    "text": "Hello world",
    "text_hash": "sha256:" + "b" * 64,
    "config_hash": "sha256:" + "f" * 64,
    "items": [{"text": "Hello", "start_time": 0.0, "end_time": 0.5}],
    "metadata": {"model": "qwen3-forced-aligner"},
}
row = ForcedAlignmentRow(**row_fields)

# Only the leading 20 characters of each hash are echoed to keep the output short
audio_prefix = row.audio_hash[:20]
text_prefix = row.text_hash[:20]

print(f"Row: job_id={row.job_id}, text={row.text}")
print(f"Audio hash: {audio_prefix}...")
print(f"Text hash: {text_prefix}...")

ForcedAlignmentStorage

Standardized SQLite storage that all forced alignment plugins should use. Defines the canonical schema for the forced_alignments table with content hash columns for traceability and config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS forced_alignments (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    audio_path TEXT NOT NULL,
    audio_hash TEXT NOT NULL,
    text TEXT NOT NULL,
    text_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL DEFAULT '',
    items JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

CREATE UNIQUE INDEX idx_forced_alignments_cache
    ON forced_alignments(audio_path, text_hash, config_hash);

A forced-alignment input is the (audio, transcript) pair, so the (audio_path, text_hash, config_hash) unique index is the cache/upsert key. text_hash is in the key because aligning the same audio against a different transcript is a distinct result (kept separately), while re-running the identical pair replaces the previous row via INSERT OR REPLACE. get_cached(audio_path, audio_hash, text_hash, config_hash) additionally matches the audio content hash, so a changed audio file misses the cache and the next save() replaces the stale row. config_hash is added to pre-existing tables via an idempotent ALTER TABLE migration in __init__; the same migration de-duplicates existing rows (keeping the newest per key) so that the unique index can be built.

The audio_hash and text_hash columns use the self-describing "algo:hexdigest" format, enabling downstream consumers to verify content integrity.


ForcedAlignmentStorage


def ForcedAlignmentStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for forced alignment results.

Testing

import tempfile
import os

# Create storage backed by a throwaway database file
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed from here on. Close the handle right away so it is
# not leaked and does not block the final os.unlink() on platforms that refuse
# to delete files that are still open (e.g. Windows). `tmp_db.name` stays valid.
tmp_db.close()
storage = ForcedAlignmentStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpgfc7dvn_.db
# Persist one alignment result together with its content hashes
test_text = "November the 10th, Wednesday, 9 p.m."
text_hash = hash_bytes(test_text.encode())
audio_hash = "sha256:" + "e3b0c44298" * 6 + "e3b0"  # stand-in for a real audio digest
cfg1 = "sha256:" + "1" * 64                          # stand-in for a real config digest

# (word, start, end) triples, expanded below into the item dicts that get saved
_word_timings = [
    ("November", 1.04, 1.6),
    ("the", 1.6, 1.68),
    ("10th", 1.76, 2.08),
    ("Wednesday", 2.48, 3.04),
    ("9", 3.84, 4.16),
    ("pm", 4.16, 4.64),
]
test_items = [
    {"text": word, "start_time": start, "end_time": end}
    for word, start, end in _word_timings
]

_save_metadata = {
    "model_id": "Qwen/Qwen3-ForcedAligner-0.6B",
    "language": "English",
    "word_count": 6,
}

storage.save(
    job_id="fa_test_001",
    audio_path="/tmp/test_audio.mp3",
    audio_hash=audio_hash,
    text=test_text,
    text_hash=text_hash,
    config_hash=cfg1,
    items=test_items,
    metadata=_save_metadata,
)

print("Saved fa_test_001")
print(f"Text hash: {text_hash}")
# Fetch the saved row by job id and check that every stored field round-trips
row = storage.get_by_job_id("fa_test_001")
assert row is not None

# Identity and hash fields come back exactly as saved
assert (row.job_id, row.text, row.text_hash, row.audio_hash) == (
    "fa_test_001",
    test_text,
    text_hash,
    audio_hash,
)

# The items list is restored with all six word entries
assert row.items is not None
assert len(row.items) == 6
first_item = row.items[0]
assert first_item["text"] == "November"
assert first_item["start_time"] == 1.04

# Metadata is restored as a mapping; created_at is populated on the stored row
assert row.metadata["model_id"] == "Qwen/Qwen3-ForcedAligner-0.6B"
assert row.created_at is not None

print(f"Retrieved: {row.job_id}")
print(f"Text: {row.text}")
print(f"Items: {len(row.items)} words")
print(f"First item: {first_item}")
print(f"Created at: {row.created_at}")
Retrieved: fa_test_001
Text: November the 10th, Wednesday, 9 p.m.
Items: 6 words
First item: {'text': 'November', 'start_time': 1.04, 'end_time': 1.6}
Created at: 1773970556.1129797
# Looking up a job id that was never saved yields None (checked by the assert below)
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")
get_by_job_id returns None for missing job: OK
# Add a second row (no metadata argument this time), then check list_jobs ordering
second_text = "Second alignment test."
storage.save(
    job_id="fa_test_002",
    audio_path="/tmp/test_audio_2.mp3",
    audio_hash="sha256:" + "f" * 64,
    text=second_text,
    text_hash=hash_bytes(second_text.encode()),
    config_hash="sha256:" + "2" * 64,
    items=[{"text": "Second", "start_time": 0.0, "end_time": 0.5}],
)

jobs = storage.list_jobs()
# Exactly the two saved rows are expected, most recently saved first
assert [j.job_id for j in jobs] == ["fa_test_002", "fa_test_001"]

print(f"list_jobs returned {len(jobs)} rows (newest first): {[j.job_id for j in jobs]}")
# get_cached: lookup keyed on the (audio, transcript) pair plus the config hash
cached_audio_path = "/tmp/test_audio.mp3"
other_audio_hash = "sha256:" + "9" * 64
other_text_hash = "sha256:" + "8" * 64
other_config_hash = "sha256:" + "0" * 64

# All four values match the row saved as fa_test_001 -> hit
hit = storage.get_cached(cached_audio_path, audio_hash, text_hash, cfg1)
assert hit is not None
assert hit.job_id == "fa_test_001"

# Changing any single hash turns the lookup into a miss
# Changed audio content -> miss
assert storage.get_cached(cached_audio_path, other_audio_hash, text_hash, cfg1) is None
# Different transcript -> miss
assert storage.get_cached(cached_audio_path, audio_hash, other_text_hash, cfg1) is None
# Different config -> miss
assert storage.get_cached(cached_audio_path, audio_hash, text_hash, other_config_hash) is None
print("ForcedAlignmentStorage.get_cached hit + 3 miss cases: OK")
# INSERT OR REPLACE + content-pair key: re-running the same (audio, text, config) replaces;
# a different transcript is a separate row (text_hash is part of the key).
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is used; close the handle so it is not leaked and cannot block
# the unlink below on platforms that refuse to delete open files.
tmp_db_repl.close()
try:
    s2 = ForcedAlignmentStorage(tmp_db_repl.name)
    th = hash_bytes(b"hello")
    s2.save(job_id="r1", audio_path="/tmp/a.wav", audio_hash="sha256:" + "a" * 64,
            text="hello", text_hash=th, config_hash="sha256:" + "c" * 64)
    # Same (audio_path, text, config), changed audio content -> replaces the stale row
    s2.save(job_id="r2", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
            text="hello", text_hash=th, config_hash="sha256:" + "c" * 64)
    assert [j.job_id for j in s2.list_jobs()] == ["r2"]
    # Different transcript -> separate row (kept; text_hash differs)
    s2.save(job_id="r3", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
            text="goodbye", text_hash=hash_bytes(b"goodbye"), config_hash="sha256:" + "c" * 64)
    assert len(s2.list_jobs()) == 2
    # get_cached over the (audio, text) pair: new audio content hits, old audio content misses
    assert s2.get_cached("/tmp/a.wav", "sha256:" + "b" * 64, th, "sha256:" + "c" * 64).job_id == "r2"
    assert s2.get_cached("/tmp/a.wav", "sha256:" + "a" * 64, th, "sha256:" + "c" * 64) is None
finally:
    # Remove the temp database even when an assertion above fails
    os.unlink(tmp_db_repl.name)

print("INSERT OR REPLACE + content-pair key (text in key, audio-hash freshness): OK")
# Migration + de-dup: a pre-cache forced_alignments table (no config_hash, append-only
# with duplicate (audio, text) rows) gains config_hash on open AND de-dups so the UNIQUE
# cache index can build (keep newest per (audio_path, text_hash, config_hash)).
from contextlib import closing

tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is used; close the handle so it is not leaked and cannot block
# the unlink below on platforms that refuse to delete open files.
tmp_db_mig.close()
try:
    # `with con:` only commits/rolls back; closing() is what actually releases the
    # connection, so no SQLite handle is still open when the file is deleted.
    with closing(sqlite3.connect(tmp_db_mig.name)) as con, con:
        con.execute("""
            CREATE TABLE forced_alignments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT UNIQUE NOT NULL,
                audio_path TEXT NOT NULL,
                audio_hash TEXT NOT NULL,
                text TEXT NOT NULL,
                text_hash TEXT NOT NULL,
                items JSON,
                metadata JSON,
                created_at REAL NOT NULL
            )
        """)
        con.executemany(
            "INSERT INTO forced_alignments (job_id, audio_path, audio_hash, text, text_hash, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            [("dup_old", "/tmp/a.wav", "sha256:a1", "hello", "sha256:th", 0.0),
             ("dup_new", "/tmp/a.wav", "sha256:a2", "hello", "sha256:th", 1.0),
             ("uniq", "/tmp/a.wav", "sha256:a1", "goodbye", "sha256:tg", 2.0)],
        )

    s3 = ForcedAlignmentStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
    with closing(sqlite3.connect(tmp_db_mig.name)) as con:
        cols = {r[1] for r in con.execute("PRAGMA table_info(forced_alignments)")}
    assert "config_hash" in cols
    # 'dup_old'/'dup_new' share (/tmp/a.wav, sha256:th, '') -> newest kept; 'uniq' distinct text_hash.
    assert s3.get_by_job_id("dup_old") is None
    assert s3.get_by_job_id("dup_new") is not None
    assert s3.get_by_job_id("uniq") is not None
    assert len(s3.list_jobs()) == 2

    # The migrated database still accepts new rows and serves them from the cache lookup
    s3.save(job_id="new", audio_path="/tmp/new.wav", audio_hash="sha256:new",
            text="new text", text_hash=hash_bytes(b"new text"), config_hash="sha256:cfg")
    assert s3.get_cached("/tmp/new.wav", "sha256:new", hash_bytes(b"new text"), "sha256:cfg").job_id == "new"
finally:
    # Remove the temp database even when an assertion above fails
    os.unlink(tmp_db_mig.name)

print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# save_with_logging (Track 11 helper), success path: returns True and the row is stored
test_logger = logging.getLogger("forced_alignment_storage_test")

logged_text = "Logged alignment."
ok = storage.save_with_logging(
    job_id="fa_test_swl",
    audio_path="/tmp/fa_audio_swl.mp3",
    audio_hash="sha256:" + "c" * 64,
    text=logged_text,
    text_hash=hash_bytes(logged_text.encode()),
    config_hash="sha256:" + "3" * 64,
    items=[{"text": "Logged", "start_time": 0.0, "end_time": 0.5}],
    logger=test_logger,
)
assert ok is True
assert storage.get_by_job_id("fa_test_swl") is not None
print("save_with_logging success path: returned True, row persisted")


# Failure path: when save() raises, the helper is expected to return False instead
# of propagating. INSERT OR REPLACE means a duplicate no longer raises, so save()
# is temporarily swapped for a stub that always fails.
def _boom(**kwargs):
    raise RuntimeError("simulated DB failure")


_orig_save = storage.save
storage.save = _boom
try:
    ok_fail = storage.save_with_logging(
        job_id="fa_fail",
        audio_path="/tmp/x.mp3",
        audio_hash="sha256:" + "d" * 64,
        text="x",
        text_hash=hash_bytes(b"x"),
        config_hash="sha256:" + "4" * 64,
        logger=test_logger,
    )
finally:
    # Always put the real save() back so later cells use the genuine method
    storage.save = _orig_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Test text verification: verify_text is exercised for its three outcomes
# (True for intact text, False after tampering, None for an unknown job).
from contextlib import closing

# Identity checks (`is True` / `is False`) match the `ok is True` / `ok_fail is False`
# style used for save_with_logging and reject merely truthy/falsy return values.
assert storage.verify_text("fa_test_001") is True
print("verify_text with unchanged text: True")

# Tamper with text directly in DB, bypassing the storage API so text_hash is left stale.
# `with con:` only commits; closing() releases the connection so no SQLite handle
# is still open when the database file is deleted during cleanup.
with closing(sqlite3.connect(tmp_db.name)) as con, con:
    con.execute("UPDATE forced_alignments SET text = 'TAMPERED' WHERE job_id = 'fa_test_001'")

assert storage.verify_text("fa_test_001") is False
print("verify_text after tampering: False")

# Missing job returns None
assert storage.verify_text("nonexistent") is None
print("verify_text for missing job: None")
verify_text with unchanged text: True
verify_text after tampering: False
verify_text for missing job: None
# Cleanup: delete the temporary database file used by the cells above
os.remove(tmp_db.name)
print("Cleanup complete")
Cleanup complete