cjm-media-plugin-system

Defines standardized interfaces and data structures for media analysis plugins (e.g., voice activity detection [VAD], scene detection) and media processing plugins (e.g., FFmpeg, format conversion) within the cjm-plugin-system ecosystem.

Install

pip install cjm_media_plugin_system

Project Structure

nbs/
├── analysis_interface.ipynb   # Domain-specific plugin interface for media analysis (read-only / signal extraction)
├── core.ipynb                 # DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
├── processing_interface.ipynb # Domain-specific plugin interface for media processing (write / file manipulation)
└── storage.ipynb              # Standardized SQLite storage for media analysis and processing results with content hashing

Total: 4 notebooks

Module Dependencies

graph LR
    analysis_interface[analysis_interface<br/>Media Analysis Plugin Interface]
    core[core<br/>Core Data Structures]
    processing_interface[processing_interface<br/>Media Processing Plugin Interface]
    storage[storage<br/>Media Storage]

    analysis_interface --> core
    processing_interface --> core

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Media Analysis Plugin Interface (analysis_interface.ipynb)

Domain-specific plugin interface for media analysis (read-only / signal extraction)

Import

from cjm_media_plugin_system.analysis_interface import (
    MediaAnalysisPlugin
)

Classes

class MediaAnalysisPlugin(PluginInterface):
    """
    Abstract base class for plugins that analyze media files.
    
    Analysis plugins perform read-only operations that extract temporal segments
    from media files (VAD, scene detection, beat detection, etc.).
    """
    
    def execute(
            self,
            media_path: Union[str, Path],  # Path to media file to analyze
            **kwargs
        ) -> MediaAnalysisResult:  # Analysis result with detected TimeRanges
        "Analyze the media file and return detected temporal segments."

Core Data Structures (core.ipynb)

DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer

Import

from cjm_media_plugin_system.core import (
    TimeRange,
    MediaMetadata,
    MediaAnalysisResult
)

Classes

@dataclass
class TimeRange:
    "Represents a temporal segment within a media file."
    
    start: float  # Start time in seconds
    end: float  # End time in seconds
    label: str = 'segment'  # Segment type (e.g., 'speech', 'silence', 'scene')
    confidence: Optional[float] = None  # Detection confidence (0.0 to 1.0)
    payload: Dict[str, Any] = field(...)  # Extra data (e.g., speaker embedding)
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
@dataclass
class MediaMetadata:
    "Container for media file metadata."
    
    path: str  # File path
    duration: float  # Duration in seconds
    format: str  # Container format (e.g., 'mp4', 'mkv')
    size_bytes: int  # File size in bytes
    video_streams: List[Dict[str, Any]] = field(...)  # Video stream info
    audio_streams: List[Dict[str, Any]] = field(...)  # Audio stream info
    
    def to_dict(self) -> Dict[str, Any]:  # Serialized representation
        "Convert to dictionary for JSON serialization."
@dataclass
class MediaAnalysisResult:
    "Standard output for media analysis plugins."
    
    ranges: List[TimeRange]  # Detected temporal segments
    metadata: Dict[str, Any] = field(...)  # Global analysis stats
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
        "Save results to a temp JSON file for zero-copy transfer."
    
    @classmethod
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "MediaAnalysisResult":  # Loaded result instance
        "Load results from a JSON file."

Media Processing Plugin Interface (processing_interface.ipynb)

Domain-specific plugin interface for media processing (write / file manipulation)

Import

from cjm_media_plugin_system.processing_interface import (
    MediaProcessingPlugin
)

Classes

class MediaProcessingPlugin(PluginInterface):
    """
    Abstract base class for plugins that modify, convert, or extract media.
    
    Processing plugins perform write operations that produce new files
    (format conversion, segment extraction, re-encoding, etc.).
    """
    
    def execute(
            self,
            action: str = "get_info",  # Operation: 'get_info', 'convert', 'extract_segment'
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result (usually containing 'output_path')
        "Execute a media processing operation."
    
    def get_info(
            self,
            file_path: Union[str, Path]  # Path to media file
        ) -> MediaMetadata:  # File metadata (duration, codec, streams)
        "Get metadata for a media file."
    
    def convert(
            self,
            input_path: Union[str, Path],  # Source file path
            output_format: str,            # Target format (e.g., 'mp4', 'wav')
            **kwargs
        ) -> str:  # Path to converted file
        "Convert media to a different format."
    
    def extract_segment(
            self,
            input_path: Union[str, Path],      # Source file path
            start: float,                       # Start time in seconds
            end: float,                         # End time in seconds
            output_path: Optional[str] = None   # Custom output path (auto-generated if None)
        ) -> str:  # Path to extracted segment file
        "Extract a temporal segment from a media file."

Media Storage (storage.ipynb)

Standardized SQLite storage for media analysis and processing results with content hashing

Import

from cjm_media_plugin_system.storage import (
    MediaAnalysisRow,
    MediaAnalysisStorage,
    MediaProcessingRow,
    MediaProcessingStorage
)

Classes

@dataclass
class MediaAnalysisRow:
    "A single row from the analysis_jobs table."
    
    file_path: str  # Path to the analyzed media file
    file_hash: str  # Hash of source file in "algo:hexdigest" format
    config_hash: str  # Hash of the analysis config used
    ranges: Optional[List[Dict[str, Any]]]  # Detected temporal segments
    metadata: Optional[Dict[str, Any]]  # Analysis metadata
    created_at: Optional[float]  # Unix timestamp
class MediaAnalysisStorage:
    "Standardized SQLite storage for media analysis results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            file_path: str,     # Path to the analyzed media file
            file_hash: str,     # Hash of source file in "algo:hexdigest" format
            config_hash: str,   # Hash of the analysis config
            ranges: Optional[List[Dict[str, Any]]] = None,  # Detected temporal segments
            metadata: Optional[Dict[str, Any]] = None        # Analysis metadata
        ) -> None
        "Save or replace an analysis result (upsert by file_path + config_hash)."
    
    def get_cached(
            self,
            file_path: str,   # Path to the media file
            config_hash: str  # Config hash to match
        ) -> Optional[MediaAnalysisRow]:  # Cached row or None
        "Retrieve a cached analysis result by file path and config hash."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaAnalysisRow]:  # List of analysis rows
        "List analysis jobs ordered by creation time (newest first)."
    
    def verify_file(
            self,
            file_path: str,   # Path to the media file
            config_hash: str  # Config hash to look up
        ) -> Optional[bool]:  # True if file matches, False if changed, None if not found
        "Verify the source media file still matches its stored hash."
@dataclass
class MediaProcessingRow:
    "A single row from the processing_jobs table."
    
    job_id: str  # Unique job identifier
    action: str  # Operation performed: 'convert', 'extract_segment', etc.
    input_path: str  # Path to the source media file
    input_hash: str  # Hash of source file in "algo:hexdigest" format
    output_path: str  # Path to the produced output file
    output_hash: str  # Hash of output file in "algo:hexdigest" format
    parameters: Optional[Dict[str, Any]]  # Action-specific parameters
    metadata: Optional[Dict[str, Any]]  # Processing metadata
    created_at: Optional[float]  # Unix timestamp
class MediaProcessingStorage:
    "Standardized SQLite storage for media processing results."
    
    def __init__(
            self,
            db_path: str  # Absolute path to the SQLite database file
        )
        "Initialize storage and create table if needed."
    
    def save(
            self,
            job_id: str,        # Unique job identifier
            action: str,        # Operation performed: 'convert', 'extract_segment', etc.
            input_path: str,    # Path to the source media file
            input_hash: str,    # Hash of source file in "algo:hexdigest" format
            output_path: str,   # Path to the produced output file
            output_hash: str,   # Hash of output file in "algo:hexdigest" format
            parameters: Optional[Dict[str, Any]] = None,  # Action-specific parameters
            metadata: Optional[Dict[str, Any]] = None       # Processing metadata
        ) -> None
        "Save a media processing result to the database."
    
    def get_by_job_id(
            self,
            job_id: str  # Job identifier to look up
        ) -> Optional[MediaProcessingRow]:  # Row or None if not found
        "Retrieve a processing result by job ID."
    
    def list_jobs(
            self,
            limit: int = 100  # Maximum number of rows to return
        ) -> List[MediaProcessingRow]:  # List of processing rows
        "List processing jobs ordered by creation time (newest first)."
    
    def verify_input(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if input matches, False if changed, None if not found
        "Verify the source media file still matches its stored hash."
    
    def verify_output(
            self,
            job_id: str  # Job identifier to verify
        ) -> Optional[bool]:  # True if output matches, False if changed, None if not found
        "Verify the output media file still matches its stored hash."