from typing import List
class ExampleFFmpegPlugin(MediaProcessingPlugin):
    """Example FFmpeg-based media processing plugin implementation.

    Every operation in this class is a mock: no ffmpeg process is started and
    no file is read or written. The methods return canned metadata or derived
    output paths so that the dispatcher-style interface can be demonstrated.
    """

    def __init__(self):
        # Empty until initialize() is called.
        self._config: Dict[str, Any] = {}

    @property
    def name(self) -> str:
        """Identifier of this plugin."""
        return "example-ffmpeg"

    @property
    def version(self) -> str:
        """Version string of this plugin."""
        return "1.0.0"

    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize with configuration.

        Args:
            config: Configuration values. When omitted or empty, the plugin
                falls back to ``{"ffmpeg_path": "ffmpeg"}``.
        """
        # Store a copy so later mutation of the caller's dict cannot silently
        # change the plugin's configuration.
        self._config = dict(config) if config else {"ffmpeg_path": "ffmpeg"}

    def execute(
        self,
        action: str = "get_info",
        **kwargs
    ) -> Dict[str, Any]:
        """Dispatch to appropriate method based on action.

        Args:
            action: One of ``"get_info"``, ``"convert"`` or
                ``"extract_segment"``.
            **kwargs: Arguments of the selected action:

                * ``get_info``: ``file_path``.
                * ``convert``: ``input_path`` and ``output_format``; any other
                  keyword is forwarded to :meth:`convert` unchanged.
                * ``extract_segment``: ``input_path``, ``start``, ``end`` and
                  optionally ``output_path``.

        Returns:
            For ``get_info``, the result of calling ``to_dict()`` on the
            metadata object; for the other actions,
            ``{"output_path": <path>}``.

        Raises:
            KeyError: If a required argument of the action is missing.
            ValueError: If ``action`` is not one of the supported names.
        """
        if action == "get_info":
            return self.get_info(kwargs["file_path"]).to_dict()

        if action == "convert":
            # Pull out the two positional arguments; whatever remains is
            # passed through as conversion options.
            options = dict(kwargs)
            input_path = options.pop("input_path")
            output_format = options.pop("output_format")
            return {"output_path": self.convert(input_path, output_format, **options)}

        if action == "extract_segment":
            output = self.extract_segment(
                kwargs["input_path"],
                kwargs["start"],
                kwargs["end"],
                kwargs.get("output_path"),
            )
            return {"output_path": output}

        raise ValueError(f"Unknown action: {action}")

    def get_info(self, file_path: Union[str, Path]) -> MediaMetadata:
        """Get metadata for a media file (mock implementation).

        The file is never opened: apart from ``path``, which echoes
        ``file_path`` as a string, every field is a fixed sample value.

        Args:
            file_path: Path of the media file to describe.

        Returns:
            A ``MediaMetadata`` instance filled with sample values.
        """
        return MediaMetadata(
            path=str(file_path),
            duration=120.5,
            format="mp4",
            size_bytes=15_000_000,
            video_streams=[{"codec": "h264", "width": 1920, "height": 1080}],
            audio_streams=[{"codec": "aac", "sample_rate": 48000}],
        )

    def convert(
        self,
        input_path: Union[str, Path],
        output_format: str,
        **kwargs
    ) -> str:
        """Convert media format (mock implementation).

        Args:
            input_path: Path of the source media file.
            output_format: Target extension, with or without a leading dot
                (``"webm"`` and ``".webm"`` give the same result).
            **kwargs: Accepted for interface compatibility; unused by the mock.

        Returns:
            ``/tmp/<input stem>.<output_format>``. No file is created.
        """
        # In real implementation, would call ffmpeg here
        input_stem = Path(input_path).stem
        # Drop leading dots so ".webm" does not produce "name..webm".
        extension = output_format.lstrip(".")
        return f"/tmp/{input_stem}.{extension}"

    def extract_segment(
        self,
        input_path: Union[str, Path],
        start: float,
        end: float,
        output_path: Optional[str] = None
    ) -> str:
        """Extract segment from media (mock implementation).

        Args:
            input_path: Path of the source media file.
            start: Segment start; only used to build the generated file name.
            end: Segment end; only used to build the generated file name.
            output_path: Destination path. When falsy (``None`` or ``""``), a
                path under ``/tmp`` is generated from the input name.

        Returns:
            ``output_path`` if one was given, otherwise
            ``/tmp/<stem>_segment_<start>_<end><suffix>``. No file is created.
        """
        if output_path:
            return output_path

        # Auto-generate output path
        source = Path(input_path)
        return f"/tmp/{source.stem}_segment_{start}_{end}{source.suffix}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "ffmpeg_path": {
                    "type": "string",
                    "default": "ffmpeg",
                    "description": "Path to ffmpeg binary",
                },
                "default_video_codec": {
                    "type": "string",
                    "enum": ["h264", "h265", "vp9"],
                    "default": "h264",
                },
            },
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration.

        Returns:
            A shallow copy of the stored configuration, so callers cannot
            modify the plugin's state through the returned dict.
        """
        return dict(self._config)

    def cleanup(self) -> None:
        """Clean up resources (the mock holds none, so this is a no-op)."""
        pass


# Media Processing Plugin Interface
MediaProcessingPlugin
def MediaProcessingPlugin(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Abstract base class for plugins that modify, convert, or extract media.
Processing plugins perform write operations that produce new files (format conversion, segment extraction, source separation, speech enhancement, etc.). They are dispatcher-style (SG-44): callers invoke `execute(action=..., **kwargs)` and introspect `supported_actions` to discover which operations a given plugin implements. This interface mandates only `execute` and the universal read action `get_info`; each plugin declares its own write actions with the substrate’s `@plugin_action` decorator and sets `supported_actions = collect_plugin_actions(cls)`.
How It Works
Host Process                             Worker Process (Isolated Env)
┌─────────────────────┐                  ┌─────────────────────────────┐
│                     │                  │                             │
│  plugin.execute(    │    HTTP/JSON     │  MediaProcessingPlugin      │
│    action="convert",│ ─────────────────▶    .execute(                │
│    input_path="...",│                  │      action="convert",      │
│    output_format=   │                  │      ...                    │
│      "mp4"          │                  │    )                        │
│  )                  │                  │                             │
│                     │                  │  # Calls internal method    │
│  {                  │  ◀───────────────│  # .convert(...)            │
│    "output_path":   │      (JSON)      │  # Returns new file path    │
│      "/tmp/out.mp4" │                  │                             │
│  }                  │                  │                             │
└─────────────────────┘                  └─────────────────────────────┘
Processing plugins:

- Use the `action` parameter to dispatch to specific methods
- Return JSON-serializable results (typically containing output file paths)
- May create new files (converted media, extracted segments)
Example Implementation
A minimal processing plugin that demonstrates the interface:
# Create the example plugin and configure it with an explicit ffmpeg binary
plugin = ExampleFFmpegPlugin()
plugin.initialize({"ffmpeg_path": "/usr/bin/ffmpeg"})

# Report the plugin identity and its configuration schema, one per line
print(
    f"Plugin: {plugin.name} v{plugin.version}",
    f"Config schema: {plugin.get_config_schema()}",
    sep="\n",
)
print(f"Current config: {plugin.get_current_config()}")
Plugin: example-ffmpeg v1.0.0
Config schema: {'type': 'object', 'properties': {'ffmpeg_path': {'type': 'string', 'default': 'ffmpeg', 'description': 'Path to ffmpeg binary'}, 'default_video_codec': {'type': 'string', 'enum': ['h264', 'h265', 'vp9'], 'default': 'h264'}}}
Current config: {'ffmpeg_path': '/usr/bin/ffmpeg'}
# Test get_info action: a read-only call routed through the dispatcher
info_result = plugin.execute(action="get_info", file_path="/path/to/video.mp4")
# Plain string literal: there is nothing to interpolate in the header line
print("\nget_info result:")
print(f" Duration: {info_result['duration']}s")
print(f" Format: {info_result['format']}")
print(f" Video streams: {info_result['video_streams']}")
get_info result:
Duration: 120.5s
Format: mp4
Video streams: [{'codec': 'h264', 'width': 1920, 'height': 1080}]
# Exercise the convert action; the returned dict holds the new file's path
convert_request = {"input_path": "/path/to/video.mp4", "output_format": "webm"}
convert_result = plugin.execute(action="convert", **convert_request)
print(f"\nconvert result: {convert_result}")
convert result: {'output_path': '/tmp/video.webm'}
# Exercise extract_segment without an explicit output_path
segment_request = {
    "input_path": "/path/to/video.mp4",
    "start": 10.0,
    "end": 25.5,
}
segment_result = plugin.execute(action="extract_segment", **segment_request)
print(f"\nextract_segment result: {segment_result}")

# Release the plugin once the demo is finished
plugin.cleanup()
extract_segment result: {'output_path': '/tmp/video_segment_10.0_25.5.mp4'}