# Media Storage


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## MediaAnalysisRow

A dataclass representing a single row in the standardized
`analysis_jobs` table.

------------------------------------------------------------------------

### MediaAnalysisRow

``` python

def MediaAnalysisRow(
    file_path:str, file_hash:str, config_hash:str, ranges:Optional[list]=None,
    metadata:Optional[dict]=None, created_at:Optional[float]=None
)->None:

```

*A single row from the analysis_jobs table.*

``` python
# Test MediaAnalysisRow creation
row = MediaAnalysisRow(
    file_path="/tmp/test.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "b" * 64,
    ranges=[{"start": 0.0, "end": 2.5, "label": "speech"}],
    metadata={"segment_count": 1}
)

print(f"Row: file_path={row.file_path}")
print(f"File hash: {row.file_hash[:20]}...")
print(f"Config hash: {row.config_hash[:20]}...")
```

    Row: file_path=/tmp/test.mp3
    File hash: sha256:aaaaaaaaaaaaa...
    Config hash: sha256:bbbbbbbbbbbbb...

## MediaAnalysisStorage

Standardized SQLite storage that all media analysis plugins should use.
Defines the canonical schema for the `analysis_jobs` table with file
hashing for traceability and config-based caching.

**Schema:**

``` sql
CREATE TABLE IF NOT EXISTS analysis_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_path TEXT NOT NULL,
    file_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL,
    ranges JSON,
    metadata JSON,
    created_at REAL NOT NULL,
    UNIQUE(file_path, config_hash)
);
```

The `UNIQUE(file_path, config_hash)` constraint enables result caching —
re-running the same file with the same config replaces the previous
result. Different configs for the same file are stored separately.

------------------------------------------------------------------------

### MediaAnalysisStorage

``` python

def MediaAnalysisStorage(
    db_path:str, # Absolute path to the SQLite database file
):

```

*Standardized SQLite storage for media analysis results.*

## Testing

``` python
import tempfile
import os

# Create storage with temp database
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
storage = MediaAnalysisStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
```

    Storage initialized at: /tmp/tmpbdme_74n.db

``` python
# Save an analysis result
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "c" * 64,
    ranges=[
        {"start": 0.0, "end": 2.5, "label": "speech", "confidence": 0.98},
        {"start": 4.0, "end": 8.5, "label": "speech", "confidence": 0.95}
    ],
    metadata={"segment_count": 2, "total_speech": 7.0}
)

print("Saved analysis result")
```

    Saved analysis result

``` python
# Retrieve cached result
cached = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + "c" * 64)
assert cached is not None
assert cached.file_path == "/tmp/test_audio.mp3"
assert len(cached.ranges) == 2
assert cached.metadata["segment_count"] == 2
assert cached.created_at is not None

print(f"Cached: {cached.file_path}")
print(f"Ranges: {len(cached.ranges)} segments")
print(f"File hash: {cached.file_hash[:20]}...")

# Missing config returns None
missing = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + "d" * 64)
assert missing is None
print("Cache miss for different config: OK")
```

    Cached: /tmp/test_audio.mp3
    Ranges: 2 segments
    File hash: sha256:aaaaaaaaaaaaa...
    Cache miss for different config: OK

``` python
# Save with same file+config replaces (upsert)
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "c" * 64,
    ranges=[{"start": 0.0, "end": 3.0, "label": "speech"}],
    metadata={"segment_count": 1, "total_speech": 3.0}
)

updated = storage.get_cached("/tmp/test_audio.mp3", "sha256:" + "c" * 64)
assert len(updated.ranges) == 1  # Updated to 1 range
assert updated.metadata["segment_count"] == 1

# Only 1 row total (replaced, not appended)
all_jobs = storage.list_jobs()
assert len(all_jobs) == 1

print("Upsert replaced existing row: OK")
```

    Upsert replaced existing row: OK

``` python
# Different config for same file creates separate row
storage.save(
    file_path="/tmp/test_audio.mp3",
    file_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "e" * 64,  # Different config
    ranges=[{"start": 0.5, "end": 2.0, "label": "speech"}],
    metadata={"segment_count": 1}
)

all_jobs = storage.list_jobs()
assert len(all_jobs) == 2

print(f"Two configs for same file: {len(all_jobs)} rows")
```

    Two configs for same file: 2 rows

``` python
# Cleanup
os.unlink(tmp_db.name)
print("Cleanup complete")
```

    Cleanup complete

## MediaProcessingRow

A dataclass representing a single row in the standardized
`processing_jobs` table. Tracks input/output file pairs with hashes for
full traceability of media transformations.

------------------------------------------------------------------------

### MediaProcessingRow

``` python

def MediaProcessingRow(
    job_id:str, action:str, input_path:str, input_hash:str, output_path:str, output_hash:str,
    parameters:Optional[dict]=None, metadata:Optional[dict]=None, created_at:Optional[float]=None
)->None:

```

*A single row from the processing_jobs table.*

``` python
# Test MediaProcessingRow creation
proc_row = MediaProcessingRow(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + "a" * 64,
    output_path="/tmp/output.mp4",
    output_hash="sha256:" + "b" * 64,
    parameters={"output_format": "mp4", "codec": "h264"}
)

print(f"Row: job_id={proc_row.job_id}, action={proc_row.action}")
print(f"Input: {proc_row.input_path} -> Output: {proc_row.output_path}")
```

    Row: job_id=job_conv_001, action=convert
    Input: /tmp/source.mkv -> Output: /tmp/output.mp4

## MediaProcessingStorage

Standardized SQLite storage that all media processing plugins should
use. Defines the canonical schema for the `processing_jobs` table,
tracking input/output file pairs with content hashes.

**Schema:**

``` sql
CREATE TABLE IF NOT EXISTS processing_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    action TEXT NOT NULL,
    input_path TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    output_path TEXT NOT NULL,
    output_hash TEXT NOT NULL,
    parameters JSON,
    metadata JSON,
    created_at REAL NOT NULL
);
```

Both `input_hash` and `output_hash` use the self-describing
`"algo:hexdigest"` format, enabling verification of both source
integrity (“is this the same file we converted?”) and output integrity
(“has the output been modified since conversion?”).

------------------------------------------------------------------------

### MediaProcessingStorage

``` python

def MediaProcessingStorage(
    db_path:str, # Absolute path to the SQLite database file
):

```

*Standardized SQLite storage for media processing results.*

### Testing MediaProcessingStorage

``` python
# Create processing storage with temp database
tmp_db2 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
proc_storage = MediaProcessingStorage(tmp_db2.name)

print(f"Processing storage initialized at: {tmp_db2.name}")
```

    Processing storage initialized at: /tmp/tmp6cy8gfmq.db

``` python
# Save a conversion job
proc_storage.save(
    job_id="job_conv_001",
    action="convert",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + "a" * 64,
    output_path="/tmp/output.mp4",
    output_hash="sha256:" + "b" * 64,
    parameters={"output_format": "mp4", "codec": "h264"},
    metadata={"duration": 120.5}
)

print("Saved conversion job")
```

    Saved conversion job

``` python
# Retrieve by job ID
row = proc_storage.get_by_job_id("job_conv_001")
assert row is not None
assert row.job_id == "job_conv_001"
assert row.action == "convert"
assert row.input_path == "/tmp/source.mkv"
assert row.output_path == "/tmp/output.mp4"
assert row.parameters["output_format"] == "mp4"
assert row.created_at is not None

print(f"Retrieved: {row.job_id} ({row.action})")
print(f"Input: {row.input_path} ({row.input_hash[:20]}...)")
print(f"Output: {row.output_path} ({row.output_hash[:20]}...)")

# Missing job returns None
assert proc_storage.get_by_job_id("nonexistent") is None
print("get_by_job_id returns None for missing job: OK")
```

    Retrieved: job_conv_001 (convert)
    Input: /tmp/source.mkv (sha256:aaaaaaaaaaaaa...)
    Output: /tmp/output.mp4 (sha256:bbbbbbbbbbbbb...)
    get_by_job_id returns None for missing job: OK

``` python
# Save an extract_segment job and test list_jobs
proc_storage.save(
    job_id="job_ext_001",
    action="extract_segment",
    input_path="/tmp/source.mkv",
    input_hash="sha256:" + "a" * 64,
    output_path="/tmp/segment_10-20.wav",
    output_hash="sha256:" + "c" * 64,
    parameters={"start": 10.0, "end": 20.0}
)

jobs = proc_storage.list_jobs()
assert len(jobs) == 2
assert jobs[0].job_id == "job_ext_001"  # Newest first
assert jobs[0].action == "extract_segment"

print(f"list_jobs returned {len(jobs)} rows: {[(j.job_id, j.action) for j in jobs]}")
```

    list_jobs returned 2 rows: [('job_ext_001', 'extract_segment'), ('job_conv_001', 'convert')]

``` python
# Cleanup
os.unlink(tmp_db2.name)
print("Processing storage cleanup complete")
```

    Processing storage cleanup complete
