# Build a ForcedAlignmentRow by hand and echo a few of its fields.
row_fields = {
    "job_id": "fa_job_abc123",
    "audio_path": "/tmp/test.mp3",
    "audio_hash": "sha256:" + "a" * 64,
    "text": "Hello world",
    "text_hash": "sha256:" + "b" * 64,
    "config_hash": "sha256:" + "f" * 64,
    "items": [{"text": "Hello", "start_time": 0.0, "end_time": 0.5}],
    "metadata": {"model": "qwen3-forced-aligner"},
}
row = ForcedAlignmentRow(**row_fields)
print(f"Row: job_id={row.job_id}, text={row.text}")
# Only the first 20 characters of the hash are printed to keep the output short.
print(f"Audio hash: {row.audio_hash[:20]}...")
print(f"Text hash: {row.text_hash[:20]}...")
Forced Alignment Storage
ForcedAlignmentRow
A dataclass representing a single row in the standardized forced alignments table. This provides a type-safe way to work with stored forced alignment results.
ForcedAlignmentRow
def ForcedAlignmentRow(
job_id:str, audio_path:str, audio_hash:str, text:str, text_hash:str, config_hash:str, items:Optional=None,
metadata:Optional=None, created_at:Optional=None
)->None:
A single row from the forced_alignments table.
ForcedAlignmentStorage
Standardized SQLite storage that all forced alignment plugins should use. Defines the canonical schema for the forced_alignments table with content hash columns for traceability and config-based caching.
Schema:
CREATE TABLE IF NOT EXISTS forced_alignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT UNIQUE NOT NULL,
audio_path TEXT NOT NULL,
audio_hash TEXT NOT NULL,
text TEXT NOT NULL,
text_hash TEXT NOT NULL,
config_hash TEXT NOT NULL DEFAULT '',
items JSON,
metadata JSON,
created_at REAL NOT NULL
);
CREATE UNIQUE INDEX idx_forced_alignments_cache
ON forced_alignments(audio_path, text_hash, config_hash);
A forced-alignment input is the (audio, transcript) pair, so the unique index on (audio_path, text_hash, config_hash) is the cache/upsert key. text_hash is part of the key because aligning the same audio against a different transcript is a distinct result and is kept as a separate row, while re-running the identical pair replaces the previous row via INSERT OR REPLACE. get_cached(audio_path, audio_hash, text_hash, config_hash) additionally matches the audio content hash, so a changed audio file misses the cache and the next save() replaces the stale row. config_hash is added to pre-existing tables via an idempotent ALTER TABLE migration in __init__.
The audio_hash and text_hash columns use the self-describing "algo:hexdigest" format, enabling downstream consumers to verify content integrity.
ForcedAlignmentStorage
def ForcedAlignmentStorage(
db_path:str, # Absolute path to the SQLite database file
):
Standardized SQLite storage for forced alignment results.
Testing
import tempfile
import os
# Create storage with temp database
# NamedTemporaryFile is used only to reserve a unique path. Its handle is closed
# right away so the file is not held open while SQLite works on it, and so the
# later os.unlink() also succeeds on platforms that refuse to delete open files.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = ForcedAlignmentStorage(tmp_db.name)
print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpgfc7dvn_.db
# Save a forced alignment result with hashes
test_text = "November the 10th, Wednesday, 9 p.m."
text_hash = hash_bytes(test_text.encode())
audio_hash = "sha256:" + "e3b0c44298" * 6 + "e3b0"  # Simulated audio hash (64 hex chars)
cfg1 = "sha256:" + "1" * 64  # Simulated config hash
test_items = [
    {"text": "November", "start_time": 1.04, "end_time": 1.6},
    {"text": "the", "start_time": 1.6, "end_time": 1.68},
    {"text": "10th", "start_time": 1.76, "end_time": 2.08},
    {"text": "Wednesday", "start_time": 2.48, "end_time": 3.04},
    {"text": "9", "start_time": 3.84, "end_time": 4.16},
    {"text": "pm", "start_time": 4.16, "end_time": 4.64},
]
storage.save(
    job_id="fa_test_001",
    audio_path="/tmp/test_audio.mp3",
    audio_hash=audio_hash,
    text=test_text,
    text_hash=text_hash,
    config_hash=cfg1,
    items=test_items,
    metadata={
        "model_id": "Qwen/Qwen3-ForcedAligner-0.6B",
        "language": "English",
        # Derived from the items so the count cannot drift from the data.
        "word_count": len(test_items),
    },
)
# Plain string: there is nothing to interpolate here.
print("Saved fa_test_001")
print(f"Text hash: {text_hash}")
# Retrieve by job ID
# Fetch the row stored under "fa_test_001" and compare its fields with the
# values that were passed to save().
row = storage.get_by_job_id("fa_test_001")
assert row is not None
assert (row.job_id, row.text) == ("fa_test_001", test_text)
assert (row.text_hash, row.audio_hash) == (text_hash, audio_hash)
assert row.items is not None
word_total = len(row.items)
assert word_total == 6
first_item = row.items[0]
assert first_item["text"] == "November"
assert first_item["start_time"] == 1.04
assert row.metadata["model_id"] == "Qwen/Qwen3-ForcedAligner-0.6B"
assert row.created_at is not None
print(f"Retrieved: {row.job_id}")
print(f"Text: {row.text}")
print(f"Items: {word_total} words")
print(f"First item: {first_item}")
print(f"Created at: {row.created_at}")
Retrieved: fa_test_001
Text: November the 10th, Wednesday, 9 p.m.
Items: 6 words
First item: {'text': 'November', 'start_time': 1.04, 'end_time': 1.6}
Created at: 1773970556.1129797
# Missing job returns None
# The lookup for an ID that this test never saved is expected to hand back
# None; the assertion below pins that contract.
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")
get_by_job_id returns None for missing job: OK
# Add a second row, then check what list_jobs() reports.
storage.save(
    job_id="fa_test_002",
    audio_path="/tmp/test_audio_2.mp3",
    audio_hash="sha256:" + "f" * 64,
    text="Second alignment test.",
    text_hash=hash_bytes(b"Second alignment test."),
    config_hash="sha256:" + "2" * 64,
    items=[{"text": "Second", "start_time": 0.0, "end_time": 0.5}],
)
jobs = storage.list_jobs()
listed_ids = [j.job_id for j in jobs]
# Exactly two rows are expected, with the most recently saved one first.
assert listed_ids == ["fa_test_002", "fa_test_001"]
print(f"list_jobs returned {len(jobs)} rows (newest first): {listed_ids}")
# get_cached: content-correct lookup over the (audio, transcript) pair
cached_path = "/tmp/test_audio.mp3"
# Exact (path, audio hash, text hash, config hash) match -> hit
hit = storage.get_cached(cached_path, audio_hash, text_hash, cfg1)
assert hit is not None and hit.job_id == "fa_test_001"
# Changing any single component of the lookup must produce a miss.
miss_cases = [
    ("sha256:" + "9" * 64, text_hash, cfg1),  # Changed audio content
    (audio_hash, "sha256:" + "8" * 64, cfg1),  # Different transcript
    (audio_hash, text_hash, "sha256:" + "0" * 64),  # Different config
]
for miss_audio, miss_text, miss_cfg in miss_cases:
    assert storage.get_cached(cached_path, miss_audio, miss_text, miss_cfg) is None
print("ForcedAlignmentStorage.get_cached hit + 3 miss cases: OK")
# INSERT OR REPLACE + content-pair key: re-running the same (audio, text, config) replaces;
# a different transcript is a separate row (text_hash is part of the key).
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed: release the handle so the file is not held open
# while SQLite uses it and so the unlink below works on every platform.
tmp_db_repl.close()
try:
    s2 = ForcedAlignmentStorage(tmp_db_repl.name)
    th = hash_bytes(b"hello")
    s2.save(job_id="r1", audio_path="/tmp/a.wav", audio_hash="sha256:" + "a" * 64,
            text="hello", text_hash=th, config_hash="sha256:" + "c" * 64)
    # Same (audio_path, text, config), changed audio content -> replaces the stale row
    s2.save(job_id="r2", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
            text="hello", text_hash=th, config_hash="sha256:" + "c" * 64)
    assert [j.job_id for j in s2.list_jobs()] == ["r2"]
    # Different transcript -> separate row (kept; text_hash differs)
    s2.save(job_id="r3", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
            text="goodbye", text_hash=hash_bytes(b"goodbye"), config_hash="sha256:" + "c" * 64)
    assert len(s2.list_jobs()) == 2
    # get_cached over the (audio, text) pair: new audio content hits, old audio content misses
    replaced = s2.get_cached("/tmp/a.wav", "sha256:" + "b" * 64, th, "sha256:" + "c" * 64)
    # Check for None first so a cache miss fails as an assertion, not an AttributeError.
    assert replaced is not None and replaced.job_id == "r2"
    assert s2.get_cached("/tmp/a.wav", "sha256:" + "a" * 64, th, "sha256:" + "c" * 64) is None
finally:
    # Remove the scratch database even when an assertion above fails.
    os.unlink(tmp_db_repl.name)
print("INSERT OR REPLACE + content-pair key (text in key, audio-hash freshness): OK")
# Migration + de-dup: a pre-cache forced_alignments table (no config_hash, append-only
# with duplicate (audio, text) rows) gains config_hash on open AND de-dups so the UNIQUE
# cache index can build (keep newest per (audio_path, text_hash, config_hash)).
tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed; do not keep the file handle open while SQLite uses it.
tmp_db_mig.close()
try:
    # Seed a legacy-schema table. The sqlite3 connection context manager only
    # commits/rolls back, so the connection is closed explicitly afterwards.
    con = sqlite3.connect(tmp_db_mig.name)
    try:
        with con:
            con.execute("""
                CREATE TABLE forced_alignments (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    job_id TEXT UNIQUE NOT NULL,
                    audio_path TEXT NOT NULL,
                    audio_hash TEXT NOT NULL,
                    text TEXT NOT NULL,
                    text_hash TEXT NOT NULL,
                    items JSON,
                    metadata JSON,
                    created_at REAL NOT NULL
                )
            """)
            con.executemany(
                "INSERT INTO forced_alignments (job_id, audio_path, audio_hash, text, text_hash, created_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                [("dup_old", "/tmp/a.wav", "sha256:a1", "hello", "sha256:th", 0.0),
                 ("dup_new", "/tmp/a.wav", "sha256:a2", "hello", "sha256:th", 1.0),
                 ("uniq", "/tmp/a.wav", "sha256:a1", "goodbye", "sha256:tg", 2.0)],
            )
    finally:
        con.close()
    s3 = ForcedAlignmentStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
    con = sqlite3.connect(tmp_db_mig.name)
    try:
        # PRAGMA table_info rows carry the column name at index 1.
        cols = {r[1] for r in con.execute("PRAGMA table_info(forced_alignments)")}
    finally:
        con.close()
    assert "config_hash" in cols
    # 'dup_old'/'dup_new' share (/tmp/a.wav, sha256:th, '') -> newest kept; 'uniq' distinct text_hash.
    assert s3.get_by_job_id("dup_old") is None
    assert s3.get_by_job_id("dup_new") is not None
    assert s3.get_by_job_id("uniq") is not None
    assert len(s3.list_jobs()) == 2
    s3.save(job_id="new", audio_path="/tmp/new.wav", audio_hash="sha256:new",
            text="new text", text_hash=hash_bytes(b"new text"), config_hash="sha256:cfg")
    cached_new = s3.get_cached("/tmp/new.wav", "sha256:new", hash_bytes(b"new text"), "sha256:cfg")
    # Check for None first so a cache miss fails as an assertion, not an AttributeError.
    assert cached_new is not None and cached_new.job_id == "new"
finally:
    # Remove the scratch database even when an assertion above fails.
    os.unlink(tmp_db_mig.name)
print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Test save_with_logging (Track 11 helper): success returns True + persists
test_logger = logging.getLogger("forced_alignment_storage_test")
ok = storage.save_with_logging(
    job_id="fa_test_swl",
    audio_path="/tmp/fa_audio_swl.mp3",
    audio_hash="sha256:" + "c" * 64,
    text="Logged alignment.",
    text_hash=hash_bytes(b"Logged alignment."),
    config_hash="sha256:" + "3" * 64,
    items=[{"text": "Logged", "start_time": 0.0, "end_time": 0.5}],
    logger=test_logger,
)
assert ok is True
assert storage.get_by_job_id("fa_test_swl") is not None
print("save_with_logging success path: returned True, row persisted")


# Failure path: if save() raises, the helper logs + returns False (swallowed).
# (INSERT OR REPLACE means a duplicate no longer raises, so force an error directly.)
def _boom(**kwargs):
    """Stand-in for save() that fails on every call."""
    raise RuntimeError("simulated DB failure")


failing_call = {
    "job_id": "fa_fail",
    "audio_path": "/tmp/x.mp3",
    "audio_hash": "sha256:" + "d" * 64,
    "text": "x",
    "text_hash": hash_bytes(b"x"),
    "config_hash": "sha256:" + "4" * 64,
    "logger": test_logger,
}
# Swap in the failing stub for the duration of one call, then restore the real
# method no matter how that call ends.
_orig_save = storage.save
storage.save = _boom
try:
    ok_fail = storage.save_with_logging(**failing_call)
finally:
    storage.save = _orig_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Test text verification
# Identity checks (is True / is False) pin the exact boolean result, matching
# the `is None` check used for the missing-job case below.
assert storage.verify_text("fa_test_001") is True
print("verify_text with unchanged text: True")
# Tamper with text directly in DB
# The sqlite3 connection context manager only commits/rolls back, so the
# connection is closed explicitly once the UPDATE has been committed.
con = sqlite3.connect(tmp_db.name)
try:
    with con:
        con.execute("UPDATE forced_alignments SET text = 'TAMPERED' WHERE job_id = 'fa_test_001'")
finally:
    con.close()
assert storage.verify_text("fa_test_001") is False
print("verify_text after tampering: False")
# Missing job returns None
assert storage.verify_text("nonexistent") is None
print("verify_text for missing job: None")
verify_text with unchanged text: True
verify_text after tampering: False
verify_text for missing job: None
# Cleanup
# Close the NamedTemporaryFile handle before deleting: removing a file that is
# still open fails on some platforms. close() is harmless if already closed.
tmp_db.close()
os.unlink(tmp_db.name)
print("Cleanup complete")
Cleanup complete