cjm-media-plugin-system
Defines standardized interfaces and data structures for media analysis (VAD, Scene Detection) and processing (FFmpeg, Conversion) plugins within the cjm-plugin-system ecosystem.
Install
pip install cjm_media_plugin_system

Project Structure
nbs/
├── analysis_interface.ipynb # Domain-specific plugin interface for media analysis (read-only / signal extraction)
├── core.ipynb # DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
├── processing_interface.ipynb # Domain-specific plugin interface for media processing (write / file manipulation)
└── storage.ipynb # Standardized SQLite storage for media analysis and processing results with content hashing
Total: 4 notebooks
Module Dependencies
graph LR
analysis_interface[analysis_interface<br/>Media Analysis Plugin Interface]
core[core<br/>Core Data Structures]
processing_interface[processing_interface<br/>Media Processing Plugin Interface]
storage[storage<br/>Media Storage]
analysis_interface --> core
processing_interface --> core
2 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Media Analysis Plugin Interface (analysis_interface.ipynb)
Domain-specific plugin interface for media analysis (read-only / signal extraction)
Import
from cjm_media_plugin_system.analysis_interface import (
MediaAnalysisPlugin
)

Classes
class MediaAnalysisPlugin(PluginInterface):
"""
Abstract base class for plugins that analyze media files.
Analysis plugins perform read-only operations that extract temporal segments
from media files (VAD, scene detection, beat detection, etc.).
"""
def execute(
self,
media_path: Union[str, Path], # Path to media file to analyze
**kwargs
) -> MediaAnalysisResult: # Analysis result with detected TimeRanges
"Analyze the media file and return detected temporal segments."

Core Data Structures (core.ipynb)
DTOs for media analysis and processing with FileBackedDTO support for zero-copy transfer
Import
from cjm_media_plugin_system.core import (
TimeRange,
MediaMetadata,
MediaAnalysisResult
)

Classes
@dataclass
class TimeRange:
"Represents a temporal segment within a media file."
start: float # Start time in seconds
end: float # End time in seconds
label: str = 'segment' # Segment type (e.g., 'speech', 'silence', 'scene')
confidence: Optional[float] # Detection confidence (0.0 to 1.0)
payload: Dict[str, Any] = field(...) # Extra data (e.g., speaker embedding)
def to_dict(self) -> Dict[str, Any]: # Serialized representation
"Convert to dictionary for JSON serialization."

@dataclass
class MediaMetadata:
"Container for media file metadata."
path: str # File path
duration: float # Duration in seconds
format: str # Container format (e.g., 'mp4', 'mkv')
size_bytes: int # File size in bytes
video_streams: List[Dict[str, Any]] = field(...) # Video stream info
audio_streams: List[Dict[str, Any]] = field(...) # Audio stream info
def to_dict(self) -> Dict[str, Any]: # Serialized representation
"Convert to dictionary for JSON serialization."

@dataclass
class MediaAnalysisResult:
"Standard output for media analysis plugins."
ranges: List[TimeRange] # Detected temporal segments
metadata: Dict[str, Any] = field(...) # Global analysis stats
def to_temp_file(self) -> str: # Absolute path to temporary JSON file
"Save results to a temp JSON file for zero-copy transfer."
def from_file(
cls,
filepath: str # Path to JSON file
) -> "MediaAnalysisResult": # Loaded result instance
"Load results from a JSON file."

Media Processing Plugin Interface (processing_interface.ipynb)
Domain-specific plugin interface for media processing (write / file manipulation)
Import
from cjm_media_plugin_system.processing_interface import (
MediaProcessingPlugin
)

Classes
class MediaProcessingPlugin(PluginInterface):
"""
Abstract base class for plugins that modify, convert, or extract media.
Processing plugins perform write operations that produce new files
(format conversion, segment extraction, re-encoding, etc.).
"""
def execute(
self,
action: str = "get_info", # Operation: 'get_info', 'convert', 'extract_segment'
**kwargs
) -> Dict[str, Any]: # JSON-serializable result (usually containing 'output_path')
"Execute a media processing operation."
def get_info(
self,
file_path: Union[str, Path] # Path to media file
) -> MediaMetadata: # File metadata (duration, codec, streams)
"Get metadata for a media file."
def convert(
self,
input_path: Union[str, Path], # Source file path
output_format: str, # Target format (e.g., 'mp4', 'wav')
**kwargs
) -> str: # Path to converted file
"Convert media to a different format."
def extract_segment(
self,
input_path: Union[str, Path], # Source file path
start: float, # Start time in seconds
end: float, # End time in seconds
output_path: Optional[str] = None # Custom output path (auto-generated if None)
) -> str: # Path to extracted segment file
"Extract a temporal segment from a media file."

Media Storage (storage.ipynb)
Standardized SQLite storage for media analysis and processing results with content hashing
Import
from cjm_media_plugin_system.storage import (
MediaAnalysisRow,
MediaAnalysisStorage,
MediaProcessingRow,
MediaProcessingStorage
)

Classes
@dataclass
class MediaAnalysisRow:
"A single row from the analysis_jobs table."
file_path: str # Path to the analyzed media file
file_hash: str # Hash of source file in "algo:hexdigest" format
config_hash: str # Hash of the analysis config used
ranges: Optional[List[Dict[str, Any]]] # Detected temporal segments
metadata: Optional[Dict[str, Any]] # Analysis metadata
created_at: Optional[float] # Unix timestamp

class MediaAnalysisStorage:
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Standardized SQLite storage for media analysis results."
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Initialize storage and create table if needed."
def save(
self,
file_path: str, # Path to the analyzed media file
file_hash: str, # Hash of source file in "algo:hexdigest" format
config_hash: str, # Hash of the analysis config
ranges: Optional[List[Dict[str, Any]]] = None, # Detected temporal segments
metadata: Optional[Dict[str, Any]] = None # Analysis metadata
) -> None
"Save or replace an analysis result (upsert by file_path + config_hash)."
def get_cached(
self,
file_path: str, # Path to the media file
config_hash: str # Config hash to match
) -> Optional[MediaAnalysisRow]: # Cached row or None
"Retrieve a cached analysis result by file path and config hash."
def list_jobs(
self,
limit: int = 100 # Maximum number of rows to return
) -> List[MediaAnalysisRow]: # List of analysis rows
"List analysis jobs ordered by creation time (newest first)."
def verify_file(
self,
file_path: str, # Path to the media file
config_hash: str # Config hash to look up
) -> Optional[bool]: # True if file matches, False if changed, None if not found
"Verify the source media file still matches its stored hash."

@dataclass
class MediaProcessingRow:
"A single row from the processing_jobs table."
job_id: str # Unique job identifier
action: str # Operation performed: 'convert', 'extract_segment', etc.
input_path: str # Path to the source media file
input_hash: str # Hash of source file in "algo:hexdigest" format
output_path: str # Path to the produced output file
output_hash: str # Hash of output file in "algo:hexdigest" format
parameters: Optional[Dict[str, Any]] # Action-specific parameters
metadata: Optional[Dict[str, Any]] # Processing metadata
created_at: Optional[float] # Unix timestamp

class MediaProcessingStorage:
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Standardized SQLite storage for media processing results."
def __init__(
self,
db_path: str # Absolute path to the SQLite database file
)
"Initialize storage and create table if needed."
def save(
self,
job_id: str, # Unique job identifier
action: str, # Operation performed: 'convert', 'extract_segment', etc.
input_path: str, # Path to the source media file
input_hash: str, # Hash of source file in "algo:hexdigest" format
output_path: str, # Path to the produced output file
output_hash: str, # Hash of output file in "algo:hexdigest" format
parameters: Optional[Dict[str, Any]] = None, # Action-specific parameters
metadata: Optional[Dict[str, Any]] = None # Processing metadata
) -> None
"Save a media processing result to the database."
def get_by_job_id(
self,
job_id: str # Job identifier to look up
) -> Optional[MediaProcessingRow]: # Row or None if not found
"Retrieve a processing result by job ID."
def list_jobs(
self,
limit: int = 100 # Maximum number of rows to return
) -> List[MediaProcessingRow]: # List of processing rows
"List processing jobs ordered by creation time (newest first)."
def verify_input(
self,
job_id: str # Job identifier to verify
) -> Optional[bool]: # True if input matches, False if changed, None if not found
"Verify the source media file still matches its stored hash."
def verify_output(
self,
job_id: str # Job identifier to verify
) -> Optional[bool]: # True if output matches, False if changed, None if not found
"Verify the output media file still matches its stored hash."