Media Storage

Standardized SQLite storage for media analysis and processing results with content hashing

MediaAnalysisRow

A dataclass representing a single row in the standardized analysis_jobs table.


MediaAnalysisRow


def MediaAnalysisRow(
    file_path:str, file_hash:str, config_hash:str, ranges:Optional=None, metadata:Optional=None,
    created_at:Optional=None
)->None:

A single row from the analysis_jobs table.

# Construct a MediaAnalysisRow directly and echo the fields it stores.
row = MediaAnalysisRow(
    file_path="/tmp/test.mp3",
    file_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'b' * 64}",
    ranges=[dict(start=0.0, end=2.5, label="speech")],
    metadata=dict(segment_count=1),
)

# Hashes are cut to their first 20 characters to keep the output short.
print("Row: file_path={}".format(row.file_path))
print("File hash: {}...".format(row.file_hash[:20]))
print("Config hash: {}...".format(row.config_hash[:20]))
Row: file_path=/tmp/test.mp3
File hash: sha256:aaaaaaaaaaaaa...
Config hash: sha256:bbbbbbbbbbbbb...

MediaAnalysisStorage

Standardized SQLite storage that all media analysis plugins should use. Defines the canonical schema for the analysis_jobs table with file hashing for traceability and config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS analysis_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_path TEXT NOT NULL,
    file_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    ranges JSON,
    metadata JSON,
    created_at REAL NOT NULL,
    UNIQUE(file_path, config_hash)
);

The UNIQUE(file_path, config_hash) constraint is the upsert key: re-running the same file with the same config replaces the previous result, so there is exactly one row per file+config and no stale-content rows accumulate. get_cached(file_path, file_hash, config_hash) additionally matches the content hash, so a changed file misses the cache (the stored row describes the old content) and the next save() replaces that row. This content-correct lookup is what lets chained plugin sequences auto-invalidate downstream caches when an upstream output changes.


MediaAnalysisStorage


def MediaAnalysisStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for media analysis results.

Testing

import tempfile
import os

# Create storage with temp database.
# NamedTemporaryFile is used only to reserve a unique path; delete=False keeps
# the file on disk after the handle is closed.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Close our handle immediately: leaving it open leaks a file descriptor and,
# on Windows, prevents the file from being reopened by name or unlinked later.
tmp_db.close()
storage = MediaAnalysisStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmp3nld11gc.db
# Persist one analysis result: two labelled speech segments for one audio file.
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'c' * 64}",
    ranges=[
        {"start": seg_start, "end": seg_end, "label": "speech", "confidence": score}
        for seg_start, seg_end, score in ((0.0, 2.5, 0.98), (4.0, 8.5, 0.95))
    ],
    metadata=dict(segment_count=2, total_speech=7.0),
)

print("Saved analysis result")
Saved analysis result
# Cache lookups are keyed on file_path + file_hash + config_hash together,
# so a hit requires all three to match the stored row.
fhash, chash = (f"sha256:{letter * 64}" for letter in "ac")
cached = storage.get_cached("/tmp/test_audio.mp3", fhash, chash)
assert cached is not None
assert cached.file_path == "/tmp/test_audio.mp3"
assert len(cached.ranges) == 2
assert cached.metadata["segment_count"] == 2
assert cached.created_at is not None

print("Cached:", cached.file_path)
print("Ranges:", len(cached.ranges), "segments")
print("File hash: {}...".format(cached.file_hash[:20]))

# Same path and content, but a config hash that was never saved -> miss.
assert storage.get_cached("/tmp/test_audio.mp3", fhash, f"sha256:{'d' * 64}") is None
print("Cache miss for different config: OK")

# Same path and config, but a different content hash -> miss.
assert storage.get_cached("/tmp/test_audio.mp3", f"sha256:{'z' * 64}", chash) is None
print("Cache miss for changed file content: OK")
# Saving again under the same file_path + config_hash must overwrite the row.
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'c' * 64}",
    ranges=[dict(start=0.0, end=3.0, label="speech")],
    metadata=dict(segment_count=1, total_speech=3.0),
)

updated = storage.get_cached(
    "/tmp/test_audio.mp3",
    f"sha256:{'a' * 64}",
    f"sha256:{'c' * 64}",
)
# The lookup now returns the single-range result from the second save.
assert len(updated.ranges) == 1
assert updated.metadata["segment_count"] == 1

# The table still holds exactly one row: the save replaced, it did not append.
all_jobs = storage.list_jobs()
assert len(all_jobs) == 1

print("Upsert replaced existing row: OK")
# The same file under a new config hash is a distinct key, so it adds a row.
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'e' * 64}",  # not used by any earlier save
    ranges=[dict(start=0.5, end=2.0, label="speech")],
    metadata=dict(segment_count=1),
)

all_jobs = storage.list_jobs()
assert len(all_jobs) == 2

print("Two configs for same file: {} rows".format(len(all_jobs)))
Two configs for same file: 2 rows
# Remove the temporary analysis database created for this walkthrough.
os.remove(tmp_db.name)
print("Cleanup complete")
Cleanup complete

MediaProcessingRow

A dataclass representing a single row in the standardized processing_jobs table. Tracks input/output file pairs with hashes for full traceability of media transformations.


MediaProcessingRow


def MediaProcessingRow(
    job_id:str, action:str, input_path:str, input_hash:str, config_hash:str, output_path:str, output_hash:str,
    parameters:Optional=None, metadata:Optional=None, created_at:Optional=None
)->None:

A single row from the processing_jobs table.

# Construct a MediaProcessingRow directly; metadata and created_at are omitted.
proc_row = MediaProcessingRow(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'f' * 64}",
    output_path="/tmp/output.mp4",
    output_hash=f"sha256:{'b' * 64}",
    parameters=dict(output_format="mp4", codec="h264"),
)

print("Row: job_id={}, action={}".format(proc_row.job_id, proc_row.action))
print("Input: {} -> Output: {}".format(proc_row.input_path, proc_row.output_path))

MediaProcessingStorage

Standardized SQLite storage that all media processing plugins should use. Defines the canonical schema for the processing_jobs table, tracking input/output file pairs with content hashes plus config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS processing_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL DEFAULT '',
    output_path TEXT NOT NULL,
    output_hash TEXT NOT NULL,
    parameters JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

CREATE UNIQUE INDEX idx_processing_cache
    ON processing_jobs(action, input_path, config_hash);

The (action, input_path, config_hash) unique index is the cache/upsert key: re-running the same action on the same input with the same config replaces the previous row via INSERT OR REPLACE, so no stale-content rows accumulate. get_cached(action, input_path, input_hash, config_hash) additionally matches the input content hash, so a changed input file misses the cache and the next save() replaces the stale row. For pre-existing tables, __init__ runs an idempotent migration: it adds config_hash via ALTER TABLE (existing rows take the '' default) and de-duplicates rows that share a key, keeping the newest, so the unique index can be built.

Both input_hash and output_hash use the self-describing "algo:hexdigest" format, enabling verification of source integrity (“is this the same file we processed?”) and output integrity (“has the output been modified since?”).


MediaProcessingStorage


def MediaProcessingStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for media processing results.

Testing MediaProcessingStorage

# Create processing storage with temp database.
# NamedTemporaryFile is used only to reserve a unique path; delete=False keeps
# the file on disk after the handle is closed.
tmp_db2 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Close our handle immediately: leaving it open leaks a file descriptor and,
# on Windows, prevents the file from being reopened by name or unlinked later.
tmp_db2.close()
proc_storage = MediaProcessingStorage(tmp_db2.name)

print(f"Processing storage initialized at: {tmp_db2.name}")
Processing storage initialized at: /tmp/tmp6cy8gfmq.db
# Record a single "convert" job: one input file, one output file, both hashed.
proc_storage.save(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'f' * 64}",
    output_path="/tmp/output.mp4",
    output_hash=f"sha256:{'b' * 64}",
    parameters=dict(output_format="mp4", codec="h264"),
    metadata=dict(duration=120.5),
)

print("Saved conversion job")
# Fetch the conversion job by its job_id and verify the stored fields.
row = proc_storage.get_by_job_id("job_conv_001")
assert row is not None
assert (row.job_id, row.action, row.input_path) == (
    "job_conv_001",
    "convert",
    "/tmp/source.mkv",
)
assert (row.config_hash, row.output_path) == (
    f"sha256:{'f' * 64}",
    "/tmp/output.mp4",
)
assert row.parameters["output_format"] == "mp4"
assert row.created_at is not None

print("Retrieved: {} ({})".format(row.job_id, row.action))
print("Input: {} ({}...)".format(row.input_path, row.input_hash[:20]))
print("Output: {} ({}...)".format(row.output_path, row.output_hash[:20]))

# An unknown job_id yields None.
assert proc_storage.get_by_job_id("nonexistent") is None
print("get_by_job_id returns None for missing job: OK")
# Add a second job (a different action on the same input), then list all jobs.
proc_storage.save(
    job_id="job_ext_001",
    action="extract_segment",
    input_path="/tmp/source.mkv",
    input_hash=f"sha256:{'a' * 64}",
    config_hash=f"sha256:{'g' * 64}",
    output_path="/tmp/segment_10-20.wav",
    output_hash=f"sha256:{'c' * 64}",
    parameters=dict(start=10.0, end=20.0),
)

jobs = proc_storage.list_jobs()
assert len(jobs) == 2
# The job saved last is expected at index 0 (newest first).
assert (jobs[0].job_id, jobs[0].action) == ("job_ext_001", "extract_segment")

print("list_jobs returned {} rows: {}".format(
    len(jobs), [(j.job_id, j.action) for j in jobs]
))
# get_cached matches on all four of (action, input_path, input_hash, config_hash).
hit = proc_storage.get_cached(
    "convert",
    "/tmp/source.mkv",
    f"sha256:{'a' * 64}",
    f"sha256:{'f' * 64}",
)
assert hit is not None and hit.job_id == "job_conv_001"

# Changing any one component of the key must turn the hit into a miss.
assert all(
    proc_storage.get_cached(*lookup) is None
    for lookup in (
        # Wrong config
        ("convert", "/tmp/source.mkv", f"sha256:{'a' * 64}", f"sha256:{'0' * 64}"),
        # Changed input content (same action + input_path + config)
        ("convert", "/tmp/source.mkv", f"sha256:{'9' * 64}", f"sha256:{'f' * 64}"),
        # Wrong action
        ("extract_segment", "/tmp/source.mkv", f"sha256:{'a' * 64}", f"sha256:{'f' * 64}"),
    )
)

print("MediaProcessingStorage.get_cached hit + 3 miss cases: OK")
# Saving again under the same (action, input_path, config_hash) key replaces
# the existing row, even though the job_id and input content are new.
proc_storage.save(
    job_id="job_conv_002",                 # new job id
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash=f"sha256:{'9' * 64}",       # input file changed
    config_hash=f"sha256:{'f' * 64}",      # same config as job_conv_001
    output_path="/tmp/output_v2.mp4",
    output_hash=f"sha256:{'e' * 64}",
    parameters=dict(output_format="mp4", codec="h264"),
)

# Exactly one row remains for that key, and it is the new job.
convert_rows = [
    job
    for job in proc_storage.list_jobs(limit=1000)
    if (job.action, job.input_path, job.config_hash)
    == ("convert", "/tmp/source.mkv", f"sha256:{'f' * 64}")
]
assert len(convert_rows) == 1
assert convert_rows[0].job_id == "job_conv_002"

# A lookup with the old content hash misses; the new content hash hits.
assert proc_storage.get_cached(
    "convert", "/tmp/source.mkv", f"sha256:{'a' * 64}", f"sha256:{'f' * 64}"
) is None
assert proc_storage.get_cached(
    "convert", "/tmp/source.mkv", f"sha256:{'9' * 64}", f"sha256:{'f' * 64}"
).job_id == "job_conv_002"

print("INSERT OR REPLACE on (action, input_path, config_hash) — stale row replaced, no orphan: OK")
# save_with_logging returns True on success (both storage classes)
# logging is imported here so this snippet does not rely on an import made
# elsewhere; re-importing an already-loaded module is a no-op.
import logging

_logger = logging.getLogger("storage-test")

ok = proc_storage.save_with_logging(
    job_id="job_swl_001", action="convert",
    input_path="/tmp/swl.mkv", input_hash="sha256:" + "1" * 64,
    config_hash="sha256:" + "2" * 64,
    output_path="/tmp/swl.mp4", output_hash="sha256:" + "3" * 64,
    logger=_logger,
)
assert ok is True
assert proc_storage.get_by_job_id("job_swl_001") is not None

# Separate throwaway database for the analysis-side check.
tmp_db3 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed; close the handle so it is not leaked and so the
# file can be reopened by name and unlinked on every platform.
tmp_db3.close()
ana_storage = MediaAnalysisStorage(tmp_db3.name)
ok2 = ana_storage.save_with_logging(
    file_path="/tmp/a.wav", file_hash="sha256:" + "4" * 64, config_hash="sha256:" + "5" * 64,
    ranges=[{"start": 0.0, "end": 1.0}], logger=_logger,
)
assert ok2 is True
assert ana_storage.get_cached("/tmp/a.wav", "sha256:" + "4" * 64, "sha256:" + "5" * 64) is not None
os.unlink(tmp_db3.name)

print("save_with_logging (processing + analysis) returns True on success: OK")
# Migration + de-dup: a pre-cache processing_jobs table (no config_hash, append-only
# with duplicate rows) gains config_hash on open AND de-dups so the UNIQUE cache index
# can build (keep newest per (action, input_path, config_hash)).
# sqlite3 is imported here so this snippet does not rely on an import made
# elsewhere; re-importing an already-loaded module is a no-op.
import sqlite3

tmp_db4 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed; close the handle so it is not leaked and so the
# file can be reopened by name and unlinked on every platform.
tmp_db4.close()

# Build the legacy table by hand. `with con:` commits (or rolls back) the
# transaction but does NOT close the connection, so close it explicitly.
con = sqlite3.connect(tmp_db4.name)
try:
    with con:
        con.execute("""
            CREATE TABLE processing_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id TEXT UNIQUE NOT NULL,
                action TEXT NOT NULL,
                input_path TEXT NOT NULL,
                input_hash TEXT NOT NULL,
                output_path TEXT NOT NULL,
                output_hash TEXT NOT NULL,
                parameters JSON,
                metadata JSON,
                created_at REAL NOT NULL
            )
        """)
        con.executemany(
            "INSERT INTO processing_jobs (job_id, action, input_path, input_hash, output_path, output_hash, created_at) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            [("dup_old", "convert", "/tmp/old.mkv", "sha256:o1", "/tmp/old1.mp4", "sha256:oo1", 0.0),
             ("dup_new", "convert", "/tmp/old.mkv", "sha256:o2", "/tmp/old2.mp4", "sha256:oo2", 1.0),
             ("uniq", "extract_segment", "/tmp/old.mkv", "sha256:o1", "/tmp/seg.wav", "sha256:os", 2.0)],
        )
finally:
    con.close()

migrated = MediaProcessingStorage(tmp_db4.name)  # __init__ migrates + de-dups + builds the unique index

# Inspect the migrated schema through a short-lived, explicitly closed connection.
con = sqlite3.connect(tmp_db4.name)
try:
    # Column 1 of each PRAGMA table_info row is the column name.
    cols = {r[1] for r in con.execute("PRAGMA table_info(processing_jobs)")}
finally:
    con.close()
assert "config_hash" in cols
# 'dup_old'/'dup_new' share (convert, /tmp/old.mkv, '') -> newest kept; 'uniq' distinct (action differs).
assert migrated.get_by_job_id("dup_old") is None
assert migrated.get_by_job_id("dup_new") is not None
assert migrated.get_by_job_id("uniq") is not None
assert len(migrated.list_jobs()) == 2

# A real save + get_cached works post-migration
migrated.save(job_id="new_job", action="convert", input_path="/tmp/new.mkv",
              input_hash="sha256:new", config_hash="sha256:cfg",
              output_path="/tmp/new.mp4", output_hash="sha256:newout")
assert migrated.get_cached("convert", "/tmp/new.mkv", "sha256:new", "sha256:cfg").job_id == "new_job"
os.unlink(tmp_db4.name)

print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Remove the temporary processing database created for this walkthrough.
os.remove(tmp_db2.name)
print("Processing storage cleanup complete")
Processing storage cleanup complete