Transcription Storage

Standardized SQLite storage for transcription results with content hashing

TranscriptionRow

A dataclass representing a single row in the standardized transcriptions table. This provides a type-safe way to work with stored transcription results.


TranscriptionRow


def TranscriptionRow(
    job_id:str, audio_path:str, audio_hash:str, config_hash:str, text:str, text_hash:str, segments:Optional=None,
    metadata:Optional=None, created_at:Optional=None
)->None:

A single row from the transcriptions table.

# Build a TranscriptionRow by hand (every field except created_at) and read
# a few attributes back to show plain attribute access on the row.
row = TranscriptionRow(
    job_id="job_abc123",
    audio_path="/tmp/test.mp3",
    audio_hash="sha256:" + 64 * "a",
    config_hash="sha256:" + 64 * "f",
    text="Hello world",
    text_hash="sha256:" + 64 * "b",
    segments=[{"start": 0.0, "end": 1.0, "text": "Hello world"}],
    metadata={"model": "whisper-large-v3"},
)

# One print call, one line per item; long values are truncated to 20 chars.
print(
    f"Row: job_id={row.job_id}, text={row.text[:20]}...",
    f"Audio hash: {row.audio_hash[:20]}...",
    f"Text hash: {row.text_hash[:20]}...",
    sep="\n",
)

TranscriptionStorage

Standardized SQLite storage that all transcription plugins should use. Defines the canonical schema for the transcriptions table with content hash columns for traceability and config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS transcriptions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    audio_path TEXT NOT NULL,
    audio_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL DEFAULT '',
    text TEXT NOT NULL,
    text_hash TEXT NOT NULL,
    segments JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

CREATE UNIQUE INDEX idx_transcriptions_cache
    ON transcriptions(audio_path, config_hash);

The unique index on (audio_path, config_hash) is the cache/upsert key: re-transcribing the same audio path with the same config replaces the previous row via INSERT OR REPLACE, so no stale-content rows accumulate. get_cached(audio_path, audio_hash, config_hash) additionally matches the audio content hash, so a changed audio file misses the cache and the next save() replaces the stale row. For pre-existing tables, config_hash is added via an idempotent ALTER TABLE migration in __init__; rows that would collide on (audio_path, config_hash) are de-duplicated, keeping the newest, so that the unique index can be built.

The audio_hash and text_hash columns use the self-describing "algo:hexdigest" format (e.g., "sha256:a3f2b8..."), enabling downstream consumers to verify content integrity.


TranscriptionStorage


def TranscriptionStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for transcription results.

Testing

import tempfile
import os

# Create storage backed by a throwaway database file.
# NamedTemporaryFile is used only to reserve a unique path: delete=False keeps
# the file on disk after close(), and closing the handle right away avoids
# leaking a descriptor (and lets SQLite reopen the file by name on Windows).
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = TranscriptionStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpsztizwz8.db
# Save a transcription result with hashes.
# text_hash is computed from the UTF-8 encoded text; the audio and config
# hashes are simulated values in the same "sha256:<64 hex chars>" shape.
test_text = "Laying Plans Sun Tzu said, The art of war is of vital importance to the state."
text_hash = hash_bytes(test_text.encode())
audio_hash = "sha256:" + "e3b0c44298" * 6 + "e3b0"  # Simulated audio hash (60 + 4 hex chars)
cfg1 = "sha256:" + "1" * 64                          # Simulated config hash

storage.save(
    job_id="job_test_001",
    audio_path="/tmp/test_audio.mp3",
    audio_hash=audio_hash,
    config_hash=cfg1,
    text=test_text,
    text_hash=text_hash,
    segments=[{"start": 0.0, "end": 5.0, "text": test_text}],
    metadata={"model": "whisper-large-v3", "language": "en"}
)

# Plain string: there is nothing to interpolate, so no f-prefix is needed.
print("Saved job_test_001")
print(f"Text hash: {text_hash}")
# Look the saved job up by its ID and check that every stored field
# round-trips unchanged.
row = storage.get_by_job_id("job_test_001")
assert row is not None

# Identity and content fields
assert row.job_id == "job_test_001" and row.text == test_text
assert row.text_hash == text_hash and row.audio_hash == audio_hash

# Optional JSON columns and the timestamp are populated
assert (
    row.segments is not None
    and row.metadata["model"] == "whisper-large-v3"
    and row.created_at is not None
)

print(
    f"Retrieved: {row.job_id}",
    f"Text: {row.text[:40]}...",
    f"Audio hash: {row.audio_hash[:30]}...",
    f"Text hash: {row.text_hash[:30]}...",
    f"Created at: {row.created_at}",
    sep="\n",
)
Retrieved: job_test_001
Text: Laying Plans Sun Tzu said, The art of wa...
Audio hash: sha256:e3b0c44298e3b0c44298e3b...
Text hash: sha256:83efd1674de9fcf20e5c2ed...
Created at: 1770425259.7641876
# Missing job returns None: looking up a job_id that was never saved yields
# None, so callers can test the result with `is None`.
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")
get_by_job_id returns None for missing job: OK
# Store a second job (segments and metadata omitted), then list everything.
storage.save(
    job_id="job_test_002",
    audio_path="/tmp/test_audio_2.mp3",
    audio_hash="sha256:" + 64 * "f",
    config_hash="sha256:" + 64 * "2",
    text="Second transcription.",
    text_hash=hash_bytes(b"Second transcription."),
)

jobs = storage.list_jobs()
# Exactly two rows, newest first
assert [j.job_id for j in jobs] == ["job_test_002", "job_test_001"]

print(f"list_jobs returned {len(jobs)} rows (newest first): {[j.job_id for j in jobs]}")
# get_cached looks up by (audio_path, audio_hash, config_hash); all three
# must match for a hit.
hit = storage.get_cached("/tmp/test_audio.mp3", audio_hash, cfg1)
assert hit is not None
assert hit.job_id == "job_test_001"

# Same path and config, different audio content -> miss
assert storage.get_cached("/tmp/test_audio.mp3", "sha256:" + 64 * "9", cfg1) is None

# Same path and content, different config -> miss
assert storage.get_cached("/tmp/test_audio.mp3", audio_hash, "sha256:" + 64 * "0") is None

print("TranscriptionStorage.get_cached hit + 2 miss cases: OK")
# INSERT OR REPLACE: re-transcribing the same (audio_path, config_hash) replaces the row.
# A separate throwaway database keeps this scenario isolated from `storage`.
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_repl.close()  # only the path is needed; don't leak the open handle
s2 = TranscriptionStorage(tmp_db_repl.name)
s2.save(job_id="r1", audio_path="/tmp/a.wav", audio_hash="sha256:" + "a" * 64,
        config_hash="sha256:" + "c" * 64, text="v1", text_hash=hash_bytes(b"v1"))
# Same audio_path + config, changed audio content -> replaces the stale row
s2.save(job_id="r2", audio_path="/tmp/a.wav", audio_hash="sha256:" + "b" * 64,
        config_hash="sha256:" + "c" * 64, text="v2", text_hash=hash_bytes(b"v2"))

rows = s2.list_jobs()
assert len(rows) == 1 and rows[0].job_id == "r2"   # one row, old r1 replaced
assert s2.get_cached("/tmp/a.wav", "sha256:" + "a" * 64, "sha256:" + "c" * 64) is None   # old content misses
# Check for None first so a cache miss fails as a clean assertion rather than
# an AttributeError on `.job_id`.
replaced = s2.get_cached("/tmp/a.wav", "sha256:" + "b" * 64, "sha256:" + "c" * 64)
assert replaced is not None and replaced.job_id == "r2"
os.unlink(tmp_db_repl.name)

print("INSERT OR REPLACE on (audio_path, config_hash) — stale row replaced, no orphan: OK")
# Migration + de-dup: a pre-cache transcriptions table (no config_hash, append-only
# with duplicate-audio rows) gains config_hash on open AND de-dups so the UNIQUE cache
# index can build (keep newest per (audio_path, config_hash)).
from contextlib import closing

tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_mig.close()  # only the path is needed; don't leak the open handle

# sqlite3's connection context manager only commits or rolls back; it does not
# close the connection. `closing(...)` closes it, and the inner `con` context
# commits the inserts before that happens.
with closing(sqlite3.connect(tmp_db_mig.name)) as con, con:
    # Legacy schema: identical to the current one except config_hash is absent.
    con.execute("""
        CREATE TABLE transcriptions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id TEXT UNIQUE NOT NULL,
            audio_path TEXT NOT NULL,
            audio_hash TEXT NOT NULL,
            text TEXT NOT NULL,
            text_hash TEXT NOT NULL,
            segments JSON,
            metadata JSON,
            created_at REAL NOT NULL
        )
    """)
    # Two rows share /tmp/a.wav (the duplicates); one row is unrelated.
    con.executemany(
        "INSERT INTO transcriptions (job_id, audio_path, audio_hash, text, text_hash, created_at) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        [("dup_old", "/tmp/a.wav", "sha256:a1", "old", "sha256:t1", 0.0),
         ("dup_new", "/tmp/a.wav", "sha256:a2", "new", "sha256:t2", 1.0),
         ("uniq", "/tmp/b.wav", "sha256:b1", "other", "sha256:t3", 2.0)],
    )

s3 = TranscriptionStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
# Read-only schema inspection: nothing to commit, just make sure the connection is closed.
with closing(sqlite3.connect(tmp_db_mig.name)) as con:
    cols = {r[1] for r in con.execute("PRAGMA table_info(transcriptions)")}
assert "config_hash" in cols
# 'dup_old'/'dup_new' share (/tmp/a.wav, '') -> newest kept; 'uniq' distinct audio_path.
assert s3.get_by_job_id("dup_old") is None
assert s3.get_by_job_id("dup_new") is not None
assert s3.get_by_job_id("uniq") is not None
assert len(s3.list_jobs()) == 2

# The migrated database still accepts new rows and serves them from the cache.
s3.save(job_id="new", audio_path="/tmp/new.wav", audio_hash="sha256:new",
        config_hash="sha256:cfg", text="new", text_hash=hash_bytes(b"new"))
# Check for None first so a cache miss fails as a clean assertion rather than
# an AttributeError on `.job_id`.
cached_new = s3.get_cached("/tmp/new.wav", "sha256:new", "sha256:cfg")
assert cached_new is not None and cached_new.job_id == "new"
os.unlink(tmp_db_mig.name)

print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Test save_with_logging (Track 11 helper): success returns True + persists
# A dedicated logger is passed in so the helper has somewhere to report to.
test_logger = logging.getLogger("transcription_storage_test")

ok = storage.save_with_logging(
    job_id="job_test_swl",
    audio_path="/tmp/test_audio_swl.mp3",
    audio_hash="sha256:" + "c" * 64,
    config_hash="sha256:" + "3" * 64,
    text="Logged save.",
    text_hash=hash_bytes(b"Logged save."),
    logger=test_logger,
)
assert ok is True
assert storage.get_by_job_id("job_test_swl") is not None
print("save_with_logging success path: returned True, row persisted")

# Failure path: if save() raises, the helper logs + returns False (swallowed).
# (INSERT OR REPLACE means a duplicate no longer raises, so force an error directly.)
# Assigning to storage.save sets an instance attribute that shadows the class
# method for this object only; the stub takes only keyword arguments, so this
# presumably relies on save_with_logging forwarding everything by keyword.
_orig_save = storage.save
def _boom(**kwargs):
    raise RuntimeError("simulated DB failure")
storage.save = _boom
try:
    ok_fail = storage.save_with_logging(
        job_id="job_fail",
        audio_path="/tmp/x.mp3",
        audio_hash="sha256:" + "d" * 64,
        config_hash="sha256:" + "4" * 64,
        text="x",
        text_hash=hash_bytes(b"x"),
        logger=test_logger,
    )
finally:
    # Always restore the real save() so later cells use the genuine method.
    storage.save = _orig_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Test text verification.
# verify_text has three outcomes (True / False / None), so each one is checked
# by identity rather than with `==`, which also matches the `ok is True` /
# `ok_fail is False` checks used for save_with_logging.
from contextlib import closing

assert storage.verify_text("job_test_001") is True
print("verify_text with unchanged text: True")

# Tamper with text directly in DB.
# sqlite3's connection context manager only commits; `closing(...)` also closes
# the connection so no handle is left open when the database file is removed.
with closing(sqlite3.connect(tmp_db.name)) as con, con:
    con.execute("UPDATE transcriptions SET text = 'TAMPERED' WHERE job_id = 'job_test_001'")

assert storage.verify_text("job_test_001") is False
print("verify_text after tampering: False")

# Missing job returns None
assert storage.verify_text("nonexistent") is None
print("verify_text for missing job: None")
verify_text with unchanged text: True
verify_text after tampering: False
verify_text for missing job: None
# Remove the throwaway database file created for the main `storage` instance.
os.remove(tmp_db.name)
print("Cleanup complete")
Cleanup complete