# Construct a TranscriptionRow by hand and echo a few of its fields.
# The hash values are placeholders in the "algo:hexdigest" format.
row = TranscriptionRow(
    job_id="job_abc123",
    audio_path="/tmp/test.mp3",
    audio_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'f' * 64}",
    text="Hello world",
    text_hash=f"sha256:{'b' * 64}",
    segments=[dict(start=0.0, end=1.0, text="Hello world")],
    metadata=dict(model="whisper-large-v3"),
)
print(f"Row: job_id={row.job_id}, text={row.text[:20]}...")
print(f"Audio hash: {row.audio_hash[:20]}...")
print(f"Text hash: {row.text_hash[:20]}...")
Transcription Storage
TranscriptionRow
A dataclass representing a single row in the standardized transcriptions table. This provides a type-safe way to work with stored transcription results.
TranscriptionRow
def TranscriptionRow(
job_id:str, audio_path:str, audio_hash:str, config_hash:str, text:str, text_hash:str, segments:Optional=None,
metadata:Optional=None, created_at:Optional=None
)->None:
A single row from the transcriptions table.
TranscriptionStorage
Standardized SQLite storage that all transcription plugins should use. Defines the canonical schema for the transcriptions table with content hash columns for traceability and config-based caching.
Schema:
CREATE TABLE IF NOT EXISTS transcriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT UNIQUE NOT NULL,
audio_path TEXT NOT NULL,
audio_hash TEXT NOT NULL,
config_hash TEXT NOT NULL DEFAULT '',
text TEXT NOT NULL,
text_hash TEXT NOT NULL,
segments JSON,
metadata JSON,
created_at REAL NOT NULL
);
CREATE UNIQUE INDEX idx_transcriptions_cache
ON transcriptions(audio_path, config_hash);
The (audio_path, config_hash) unique index is the cache/upsert key — re-transcribing the same audio with the same config replaces the previous row via INSERT OR REPLACE (no stale-content rows accumulate). get_cached(audio_path, audio_hash, config_hash) additionally matches the audio content hash, so a changed audio file misses the cache and the next save() replaces the stale row. config_hash is added to pre-existing tables via an idempotent ALTER TABLE migration in __init__.
The audio_hash and text_hash columns use the self-describing "algo:hexdigest" format (e.g., "sha256:a3f2b8..."), enabling downstream consumers to verify content integrity.
TranscriptionStorage
def TranscriptionStorage(
db_path:str, # Absolute path to the SQLite database file
):
Standardized SQLite storage for transcription results.
Testing
import tempfile
import os
# Create storage backed by a temporary database file.
# NamedTemporaryFile is used only to reserve a unique path, so the handle is
# closed right away: the file is then not held open while SQLite uses it, and
# the final os.unlink(tmp_db.name) also works on platforms that refuse to
# delete files that are still open. tmp_db.name stays valid after close().
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = TranscriptionStorage(tmp_db.name)
print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpsztizwz8.db
# Persist one transcription together with its content hashes.
test_text = "Laying Plans Sun Tzu said, The art of war is of vital importance to the state."
text_hash = hash_bytes(test_text.encode())
audio_hash = f"sha256:{'e3b0c44298' * 6}e3b0"  # Simulated audio hash
cfg1 = f"sha256:{'1' * 64}"  # Simulated config hash
storage.save(
    job_id="job_test_001",
    audio_path="/tmp/test_audio.mp3",
    audio_hash=audio_hash,
    config_hash=cfg1,
    text=test_text,
    text_hash=text_hash,
    segments=[dict(start=0.0, end=5.0, text=test_text)],
    metadata=dict(model="whisper-large-v3", language="en"),
)
print("Saved job_test_001")
print(f"Text hash: {text_hash}")
# Retrieve by job ID
# Look the row up by job ID and confirm the saved fields come back unchanged.
row = storage.get_by_job_id("job_test_001")
assert row is not None
for attr, expected in (
    ("job_id", "job_test_001"),
    ("text", test_text),
    ("text_hash", text_hash),
    ("audio_hash", audio_hash),
):
    assert getattr(row, attr) == expected
assert row.segments is not None
assert row.metadata["model"] == "whisper-large-v3"
assert row.created_at is not None
print(f"Retrieved: {row.job_id}")
print(f"Text: {row.text[:40]}...")
print(f"Audio hash: {row.audio_hash[:30]}...")
print(f"Text hash: {row.text_hash[:30]}...")
print(f"Created at: {row.created_at}")
Retrieved: job_test_001
Text: Laying Plans Sun Tzu said, The art of wa...
Audio hash: sha256:e3b0c44298e3b0c44298e3b...
Text hash: sha256:83efd1674de9fcf20e5c2ed...
Created at: 1770425259.7641876
# A job_id that was never saved: get_by_job_id is expected to return None
# rather than raise.
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")
get_by_job_id returns None for missing job: OK
# Store a second row (no segments/metadata this time), then check that
# list_jobs returns both rows with the most recently saved one first.
storage.save(
    job_id="job_test_002",
    audio_path="/tmp/test_audio_2.mp3",
    audio_hash=f"sha256:{'f' * 64}",
    config_hash=f"sha256:{'2' * 64}",
    text="Second transcription.",
    text_hash=hash_bytes(b"Second transcription."),
)
jobs = storage.list_jobs()
assert len(jobs) == 2
# Newest first
assert [entry.job_id for entry in jobs] == ["job_test_002", "job_test_001"]
print(f"list_jobs returned {len(jobs)} rows (newest first): {[j.job_id for j in jobs]}")
# get_cached: content-correct lookup by (audio_path, audio_hash, config_hash)
# get_cached lookups against the first saved row: one hit, two misses.
hit = storage.get_cached("/tmp/test_audio.mp3", audio_hash, cfg1)
assert hit is not None
assert hit.job_id == "job_test_001"
# Changed audio content -> miss
changed_audio_lookup = storage.get_cached("/tmp/test_audio.mp3", f"sha256:{'9' * 64}", cfg1)
assert changed_audio_lookup is None
# Different config -> miss
other_config_lookup = storage.get_cached("/tmp/test_audio.mp3", audio_hash, f"sha256:{'0' * 64}")
assert other_config_lookup is None
print("TranscriptionStorage.get_cached hit + 2 miss cases: OK")
# INSERT OR REPLACE: re-transcribing the same (audio_path, config_hash) replaces the row.
# Separate scratch database for the replace-on-conflict check.
# The NamedTemporaryFile handle is closed immediately: only its path is
# needed, and leaving it open leaks the handle and can make os.unlink() fail
# on platforms that refuse to delete open files.
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_repl.close()
s2 = TranscriptionStorage(tmp_db_repl.name)
s2.save(job_id="r1", audio_path="/tmp/a.wav", audio_hash="sha256:" + "a" * 64,
        config_hash="sha256:" + "c" * 64, text="v1", text_hash=hash_bytes(b"v1"))
# Same audio_path + config, changed audio content -> replaces the stale row
s2.save(job_id="r2", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
        config_hash="sha256:" + "c" * 64, text="v2", text_hash=hash_bytes(b"v2"))
rows = s2.list_jobs()
assert len(rows) == 1 and rows[0].job_id == "r2"  # one row, old r1 replaced
assert s2.get_cached("/tmp/a.wav", "sha256:" + "a" * 64, "sha256:" + "c" * 64) is None  # old content misses
# Guard against None so a cache miss fails the assert instead of raising
# AttributeError on `.job_id`.
replaced = s2.get_cached("/tmp/a.wav", "sha256:" + "b" * 64, "sha256:" + "c" * 64)
assert replaced is not None and replaced.job_id == "r2"
os.unlink(tmp_db_repl.name)
print("INSERT OR REPLACE on (audio_path, config_hash) — stale row replaced, no orphan: OK")
# Migration + de-dup: a pre-cache transcriptions table (no config_hash, append-only
# with duplicate-audio rows) gains config_hash on open AND de-dups so the UNIQUE cache
# index can build (keep newest per (audio_path, config_hash)).
# Build a legacy database by hand: the table has no config_hash column and
# holds two rows for the same audio_path.
# The NamedTemporaryFile handle is closed immediately (only the path is
# needed), and each sqlite3 connection is closed explicitly: using a
# connection as a context manager only commits/rolls back, it does not close,
# and an open connection can block the os.unlink() below on some platforms.
tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_mig.close()
con = sqlite3.connect(tmp_db_mig.name)
try:
    with con:  # commit the fixture rows before TranscriptionStorage opens the file
        con.execute("""
            CREATE TABLE transcriptions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT UNIQUE NOT NULL,
                audio_path TEXT NOT NULL,
                audio_hash TEXT NOT NULL,
                text TEXT NOT NULL,
                text_hash TEXT NOT NULL,
                segments JSON,
                metadata JSON,
                created_at REAL NOT NULL
            )
        """)
        con.executemany(
            "INSERT INTO transcriptions (job_id, audio_path, audio_hash, text, text_hash, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            [
                ("dup_old", "/tmp/a.wav", "sha256:a1", "old", "sha256:t1", 0.0),
                ("dup_new", "/tmp/a.wav", "sha256:a2", "new", "sha256:t2", 1.0),
                ("uniq", "/tmp/b.wav", "sha256:b1", "other", "sha256:t3", 2.0),
            ],
        )
finally:
    con.close()
s3 = TranscriptionStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
con = sqlite3.connect(tmp_db_mig.name)
try:
    # Column names are the second field of each PRAGMA table_info row.
    cols = {r[1] for r in con.execute("PRAGMA table_info(transcriptions)")}
finally:
    con.close()
assert "config_hash" in cols
# 'dup_old'/'dup_new' share (/tmp/a.wav, '') -> newest kept; 'uniq' distinct audio_path.
assert s3.get_by_job_id("dup_old") is None
assert s3.get_by_job_id("dup_new") is not None
assert s3.get_by_job_id("uniq") is not None
assert len(s3.list_jobs()) == 2
# The migrated database must still accept new rows and serve them from the cache.
s3.save(job_id="new", audio_path="/tmp/new.wav", audio_hash="sha256:new",
        config_hash="sha256:cfg", text="new", text_hash=hash_bytes(b"new"))
# Guard against None so a cache miss fails the assert instead of raising
# AttributeError on `.job_id`.
cached_new = s3.get_cached("/tmp/new.wav", "sha256:new", "sha256:cfg")
assert cached_new is not None and cached_new.job_id == "new"
os.unlink(tmp_db_mig.name)
print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Test save_with_logging (Track 11 helper): success returns True + persists
# Exercise save_with_logging on both paths: a normal save, then a forced failure.
test_logger = logging.getLogger("transcription_storage_test")
ok = storage.save_with_logging(
    job_id="job_test_swl",
    audio_path="/tmp/test_audio_swl.mp3",
    audio_hash=f"sha256:{'c' * 64}",
    config_hash=f"sha256:{'3' * 64}",
    text="Logged save.",
    text_hash=hash_bytes(b"Logged save."),
    logger=test_logger,
)
assert ok is True
assert storage.get_by_job_id("job_test_swl") is not None
print("save_with_logging success path: returned True, row persisted")


# Failure path: if save() raises, the helper logs + returns False (swallowed).
# (INSERT OR REPLACE means a duplicate no longer raises, so force an error directly.)
def _raise_simulated_failure(**kwargs):
    raise RuntimeError("simulated DB failure")


_real_save = storage.save
storage.save = _raise_simulated_failure  # temporarily swap in the failing stand-in
try:
    ok_fail = storage.save_with_logging(
        job_id="job_fail",
        audio_path="/tmp/x.mp3",
        audio_hash=f"sha256:{'d' * 64}",
        config_hash=f"sha256:{'4' * 64}",
        text="x",
        text_hash=hash_bytes(b"x"),
        logger=test_logger,
    )
finally:
    # Always put the real save() back, even if the call above raised.
    storage.save = _real_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Test text verification
# verify_text is checked for three outcomes (True / False / None), so each
# result is compared by identity rather than with `==`.
assert storage.verify_text("job_test_001") is True
print("verify_text with unchanged text: True")
# Tamper with text directly in DB.
# The connection is closed explicitly: `with connection:` only commits, it
# does not close, and an open connection can block the later os.unlink().
con = sqlite3.connect(tmp_db.name)
try:
    with con:  # commits the UPDATE on success
        con.execute("UPDATE transcriptions SET text = 'TAMPERED' WHERE job_id = 'job_test_001'")
finally:
    con.close()
assert storage.verify_text("job_test_001") is False
print("verify_text after tampering: False")
# Missing job returns None
assert storage.verify_text("nonexistent") is None
print("verify_text for missing job: None")
verify_text with unchanged text: True
verify_text after tampering: False
verify_text for missing job: None
# Cleanup: delete the temporary database file that backed `storage`
os.unlink(tmp_db.name)
print("Cleanup complete")
Cleanup complete