Media Storage

Standardized SQLite storage for media analysis and processing results with content hashing

MediaAnalysisRow

A dataclass representing a single row in the standardized analysis_jobs table.


MediaAnalysisRow


def MediaAnalysisRow(
    file_path:str, file_hash:str, config_hash:str, ranges:Optional[list]=None, metadata:Optional[dict]=None,
    created_at:Optional[float]=None
)->None:

A single row from the analysis_jobs table.

# Construct a MediaAnalysisRow by hand and read its fields back
analysis_row = MediaAnalysisRow(
    file_path="/tmp/test.mp3",
    file_hash="sha256:" + 64 * "a",
    config_hash="sha256:" + 64 * "b",
    ranges=[{"start": 0.0, "end": 2.5, "label": "speech"}],
    metadata={"segment_count": 1},
)

print(f"Row: file_path={analysis_row.file_path}")
print(f"File hash: {analysis_row.file_hash[:20]}...")
print(f"Config hash: {analysis_row.config_hash[:20]}...")
Row: file_path=/tmp/test.mp3
File hash: sha256:aaaaaaaaaaaaa...
Config hash: sha256:bbbbbbbbbbbbb...

MediaAnalysisStorage

Standardized SQLite storage that all media analysis plugins should use. Defines the canonical schema for the analysis_jobs table with file hashing for traceability and config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS analysis_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_path TEXT NOT NULL,
    file_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    ranges JSON,
    metadata JSON,
    created_at REAL NOT NULL,
    UNIQUE(file_path, config_hash)
);

The UNIQUE(file_path, config_hash) constraint enables result caching — re-running the same file with the same config replaces the previous result. Different configs for the same file are stored separately.


MediaAnalysisStorage


def MediaAnalysisStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for media analysis results.

Testing

import tempfile
import os

# Create storage backed by a throwaway database file.
# delete=False keeps the path on disk; we remove it ourselves in the cleanup cell.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()  # release the open handle — SQLite opens the path itself (and Windows forbids a second open otherwise)
storage = MediaAnalysisStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpbdme_74n.db
# Persist one analysis result; (file_path, config_hash) is the cache key.
speech_ranges = [
    {"start": 0.0, "end": 2.5, "label": "speech", "confidence": 0.98},
    {"start": 4.0, "end": 8.5, "label": "speech", "confidence": 0.95},
]
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + 64 * "a",
    config_hash="sha256:" + 64 * "c",
    ranges=speech_ranges,
    metadata={"segment_count": 2, "total_speech": 7.0},
)

print("Saved analysis result")
Saved analysis result
# Look up the stored result — both the path and the config hash must match.
hit = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + 64 * "c")
assert hit is not None
assert hit.file_path == "/tmp/test_audio.mp3"
assert len(hit.ranges) == 2
assert hit.metadata["segment_count"] == 2
assert hit.created_at is not None

print(f"Cached: {hit.file_path}")
print(f"Ranges: {len(hit.ranges)} segments")
print(f"File hash: {hit.file_hash[:20]}...")

# An unknown config hash is a cache miss, returned as None.
assert storage.get_cached("/tmp/test_audio.mp3", "sha256:" + 64 * "d") is None
print("Cache miss for different config: OK")
Cached: /tmp/test_audio.mp3
Ranges: 2 segments
File hash: sha256:aaaaaaaaaaaaa...
Cache miss for different config: OK
# Saving again with the same (file_path, config_hash) overwrites the old row (upsert).
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + 64 * "a",
    config_hash="sha256:" + 64 * "c",
    ranges=[{"start": 0.0, "end": 3.0, "label": "speech"}],
    metadata={"segment_count": 1, "total_speech": 3.0},
)

refreshed = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + 64 * "c")
assert len(refreshed.ranges) == 1  # the earlier 2-segment result was replaced
assert refreshed.metadata["segment_count"] == 1

# Still exactly one row in the table — replaced, not appended.
assert len(storage.list_jobs()) == 1

print("Upsert replaced existing row: OK")
Upsert replaced existing row: OK
# A different config hash for the same file is stored as its own row.
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + 64 * "a",
    config_hash="sha256:" + 64 * "e",  # different config hash
    ranges=[{"start": 0.5, "end": 2.0, "label": "speech"}],
    metadata={"segment_count": 1},
)

rows = storage.list_jobs()
assert len(rows) == 2

print(f"Two configs for same file: {len(rows)} rows")
Two configs for same file: 2 rows
# Remove the temporary analysis database created above.
os.remove(tmp_db.name)
print("Cleanup complete")
Cleanup complete

MediaProcessingRow

A dataclass representing a single row in the standardized processing_jobs table. Tracks input/output file pairs with hashes for full traceability of media transformations.


MediaProcessingRow


def MediaProcessingRow(
    job_id:str, action:str, input_path:str, input_hash:str, output_path:str, output_hash:str,
    parameters:Optional[dict]=None, metadata:Optional[dict]=None, created_at:Optional[float]=None
)->None:

A single row from the processing_jobs table.

# Construct a MediaProcessingRow by hand and read its fields back
conversion = MediaProcessingRow(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + 64 * "a",
    output_path="/tmp/output.mp4",
    output_hash="sha256:" + 64 * "b",
    parameters={"output_format": "mp4", "codec": "h264"},
)

print(f"Row: job_id={conversion.job_id}, action={conversion.action}")
print(f"Input: {conversion.input_path} -> Output: {conversion.output_path}")
Row: job_id=job_conv_001, action=convert
Input: /tmp/source.mkv -> Output: /tmp/output.mp4

MediaProcessingStorage

Standardized SQLite storage that all media processing plugins should use. Defines the canonical schema for the processing_jobs table, tracking input/output file pairs with content hashes.

Schema:

CREATE TABLE IF NOT EXISTS processing_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    output_path TEXT NOT NULL,
    output_hash TEXT NOT NULL,
    parameters JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

Both input_hash and output_hash use the self-describing "algo:hexdigest" format, enabling verification of both source integrity (“is this the same file we converted?”) and output integrity (“has the output been modified since conversion?”).


MediaProcessingStorage


def MediaProcessingStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for media processing results.

Testing MediaProcessingStorage

# Create a throwaway database for the processing-storage demo.
tmp_db2 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db2.close()  # release the open handle — SQLite opens the path itself (and Windows forbids a second open otherwise)
proc_storage = MediaProcessingStorage(tmp_db2.name)

print(f"Processing storage initialized at: {tmp_db2.name}")
Processing storage initialized at: /tmp/tmp6cy8gfmq.db
# Record one conversion job: input/output paths plus their content hashes.
proc_storage.save(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + 64 * "a",
    output_path="/tmp/output.mp4",
    output_hash="sha256:" + 64 * "b",
    parameters={"output_format": "mp4", "codec": "h264"},
    metadata={"duration": 120.5},
)

print("Saved conversion job")
Saved conversion job
# Fetch the job back via its unique job_id.
fetched = proc_storage.get_by_job_id("job_conv_001")
assert fetched is not None
assert fetched.job_id == "job_conv_001"
assert fetched.action == "convert"
assert fetched.input_path == "/tmp/source.mkv"
assert fetched.output_path == "/tmp/output.mp4"
assert fetched.parameters["output_format"] == "mp4"
assert fetched.created_at is not None

print(f"Retrieved: {fetched.job_id} ({fetched.action})")
print(f"Input: {fetched.input_path} ({fetched.input_hash[:20]}...)")
print(f"Output: {fetched.output_path} ({fetched.output_hash[:20]}...)")

# Unknown job ids come back as None rather than raising.
assert proc_storage.get_by_job_id("nonexistent") is None
print("get_by_job_id returns None for missing job: OK")
Retrieved: job_conv_001 (convert)
Input: /tmp/source.mkv (sha256:aaaaaaaaaaaaa...)
Output: /tmp/output.mp4 (sha256:bbbbbbbbbbbbb...)
get_by_job_id returns None for missing job: OK
# Add a second job type, then list everything back (newest first).
proc_storage.save(
    job_id="job_ext_001",
    action="extract_segment",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + 64 * "a",
    output_path="/tmp/segment_10-20.wav",
    output_hash="sha256:" + 64 * "c",
    parameters={"start": 10.0, "end": 20.0},
)

jobs = proc_storage.list_jobs()
assert len(jobs) == 2
newest = jobs[0]
assert newest.job_id == "job_ext_001"  # most recent row comes first
assert newest.action == "extract_segment"

print(f"list_jobs returned {len(jobs)} rows: {[(j.job_id, j.action) for j in jobs]}")
list_jobs returned 2 rows: [('job_ext_001', 'extract_segment'), ('job_conv_001', 'convert')]
# Remove the temporary processing database created above.
os.remove(tmp_db2.name)
print("Processing storage cleanup complete")
Processing storage cleanup complete