Standardized SQLite storage that all media analysis plugins should use. Defines the canonical schema for the analysis_jobs table with file hashing for traceability and config-based caching.
Schema:
CREATE TABLE IF NOT EXISTS analysis_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_path TEXT NOT NULL,
    file_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    ranges JSON,
    metadata JSON,
    created_at REAL NOT NULL,
    UNIQUE(file_path, config_hash)
);
The UNIQUE(file_path, config_hash) constraint is the upsert key — re-running the same file with the same config replaces the previous result (one row per file+config; no stale-content rows accumulate). get_cached(file_path, file_hash, config_hash) additionally matches the content hash, so a changed file misses the cache (the stored row is for the old content) and the next save() replaces it. This content-correct lookup is what lets chained plugin sequences auto-invalidate downstream caches when an upstream output changes.
MediaAnalysisStorage
def MediaAnalysisStorage(
    db_path: str,  # Absolute path to the SQLite database file
):
Standardized SQLite storage for media analysis results.
# Retrieve cached result (content-correct: file_path + file_hash + config_hash)
fhash = "sha256:" + "a" * 64
chash = "sha256:" + "c" * 64
cached = storage.get_cached("/tmp/test_audio.mp3", fhash, chash)
assert cached is not None
assert cached.file_path == "/tmp/test_audio.mp3"
assert len(cached.ranges) == 2
assert cached.metadata["segment_count"] == 2
assert cached.created_at is not None
print(f"Cached: {cached.file_path}")
print(f"Ranges: {len(cached.ranges)} segments")
print(f"File hash: {cached.file_hash[:20]}...")

# Different config returns None
assert storage.get_cached("/tmp/test_audio.mp3", fhash, "sha256:" + "d" * 64) is None
print("Cache miss for different config: OK")

# Changed file content (different file_hash) misses even at the same path+config
assert storage.get_cached("/tmp/test_audio.mp3", "sha256:" + "z" * 64, chash) is None
print("Cache miss for changed file content: OK")
# Save with same file+config replaces (upsert by file_path + config_hash)
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "c" * 64,
    ranges=[{"start": 0.0, "end": 3.0, "label": "speech"}],
    metadata={"segment_count": 1, "total_speech": 3.0}
)
updated = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + "a" * 64, "sha256:" + "c" * 64)
assert len(updated.ranges) == 1  # Updated to 1 range
assert updated.metadata["segment_count"] == 1

# Only 1 row total (replaced, not appended)
all_jobs = storage.list_jobs()
assert len(all_jobs) == 1
print("Upsert replaced existing row: OK")
# Different config for same file creates separate row
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "e" * 64,  # Different config
    ranges=[{"start": 0.5, "end": 2.0, "label": "speech"}],
    metadata={"segment_count": 1}
)
all_jobs = storage.list_jobs()
assert len(all_jobs) == 2
print(f"Two configs for same file: {len(all_jobs)} rows")
A dataclass representing a single row in the standardized processing_jobs table. Tracks input/output file pairs with hashes for full traceability of media transformations.
Standardized SQLite storage that all media processing plugins should use. Defines the canonical schema for the processing_jobs table, tracking input/output file pairs with content hashes plus config-based caching.
Schema:
CREATE TABLE IF NOT EXISTS processing_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL DEFAULT '',
    output_path TEXT NOT NULL,
    output_hash TEXT NOT NULL,
    parameters JSON,
    metadata JSON,
    created_at REAL NOT NULL
);
CREATE UNIQUE INDEX idx_processing_cache
    ON processing_jobs(action, input_path, config_hash);
The (action, input_path, config_hash) unique index is the cache/upsert key — re-running the same action on the same input with the same config replaces the previous row via INSERT OR REPLACE (no stale-content rows accumulate). get_cached(action, input_path, input_hash, config_hash) additionally matches the input content hash, so a changed input file misses the cache and the next save() replaces the stale row. config_hash is added to pre-existing tables via an idempotent ALTER TABLE migration in __init__.
Both input_hash and output_hash use the self-describing "algo:hexdigest" format, enabling verification of source integrity (“is this the same file we processed?”) and output integrity (“has the output been modified since?”).
MediaProcessingStorage
def MediaProcessingStorage(
    db_path: str,  # Absolute path to the SQLite database file
):
Standardized SQLite storage for media processing results.
# Save an extract_segment job and test list_jobs
proc_storage.save(
    job_id="job_ext_001",
    action="extract_segment",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "g" * 64,
    output_path="/tmp/segment_10-20.wav",
    output_hash="sha256:" + "c" * 64,
    parameters={"start": 10.0, "end": 20.0}
)
jobs = proc_storage.list_jobs()
assert len(jobs) == 2
assert jobs[0].job_id == "job_ext_001"  # Newest first
assert jobs[0].action == "extract_segment"
print(f"list_jobs returned {len(jobs)} rows: {[(j.job_id, j.action) for j in jobs]}")
# get_cached: content-correct lookup by (action, input_path, input_hash, config_hash)
hit = proc_storage.get_cached("convert", "/tmp/source.mkv", "sha256:" + "a" * 64, "sha256:" + "f" * 64)
assert hit is not None and hit.job_id == "job_conv_001"

# Wrong config -> miss
assert proc_storage.get_cached("convert", "/tmp/source.mkv", "sha256:" + "a" * 64, "sha256:" + "0" * 64) is None

# Changed input content -> miss (even at same action+input_path+config)
assert proc_storage.get_cached("convert", "/tmp/source.mkv", "sha256:" + "9" * 64, "sha256:" + "f" * 64) is None

# Wrong action -> miss
assert proc_storage.get_cached("extract_segment", "/tmp/source.mkv", "sha256:" + "a" * 64, "sha256:" + "f" * 64) is None
print("MediaProcessingStorage.get_cached hit + 3 miss cases: OK")
# INSERT OR REPLACE: re-running the same (action, input_path, config_hash) replaces the row.
proc_storage.save(
    job_id="job_conv_002",  # new job id
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + "9" * 64,  # input file changed
    config_hash="sha256:" + "f" * 64,  # same config as job_conv_001
    output_path="/tmp/output_v2.mp4",
    output_hash="sha256:" + "e" * 64,
    parameters={"output_format": "mp4", "codec": "h264"},
)

# Still exactly one row for (convert, /tmp/source.mkv, f-config) — old job_conv_001 replaced.
convert_rows = [
    j for j in proc_storage.list_jobs(limit=1000)
    if j.action == "convert"
    and j.input_path == "/tmp/source.mkv"
    and j.config_hash == "sha256:" + "f" * 64
]
assert len(convert_rows) == 1
assert convert_rows[0].job_id == "job_conv_002"

# Old content now misses; new content hits.
assert proc_storage.get_cached("convert", "/tmp/source.mkv", "sha256:" + "a" * 64, "sha256:" + "f" * 64) is None
assert proc_storage.get_cached("convert", "/tmp/source.mkv", "sha256:" + "9" * 64, "sha256:" + "f" * 64).job_id == "job_conv_002"
print("INSERT OR REPLACE on (action, input_path, config_hash) — stale row replaced, no orphan: OK")