Abstract base class defining the generic plugin interface
FileBackedDTO Protocol
The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.
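A minimal sketch of the protocol and the Proxy-side swap. It assumes FileBackedDTO is a `typing.Protocol` decorated with `@runtime_checkable`, which `isinstance` checks against a Protocol require. `RawBytes` and `prepare_arg` are hypothetical names used only for illustration, and the real Proxy's argument handling may differ.

```python
import os
import tempfile
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTO(Protocol):
    """Sketch: any object with a to_temp_file() -> str method qualifies."""

    def to_temp_file(self) -> str: ...


class RawBytes:
    """Hypothetical DTO that spills its payload to a temp file."""

    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        # delete=False so the file outlives this handle; the receiver owns cleanup
        with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
            f.write(self._data)
        return f.name


def prepare_arg(value: Any) -> Any:
    """Hypothetical proxy step: send a file path instead of the data itself."""
    if isinstance(value, FileBackedDTO):
        return value.to_temp_file()
    return value  # ordinary values pass through unchanged


path = prepare_arg(RawBytes(b"\x00\x01"))
with open(path, "rb") as fh:
    assert fh.read() == b"\x00\x01"
os.remove(path)
```

The `isinstance` check is structural: it only verifies that a `to_temp_file` attribute exists, not its signature or return type.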
CR-11: the live option domain for one dynamic config field.
Kept SEPARATE from the static config_schema (which CR-8 hashes for drift detection). The plugin-config UI merges these live options on top of the static schema; folding them into the schema would make every API plugin perpetually ‘drift’.
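A sketch of why the live options stay out of the hashed schema. It assumes the drift check hashes canonical JSON and that the UI renders live options as a JSON Schema `enum`. `schema_hash` and `merge_live_options` are hypothetical helpers, not the substrate's API.

```python
import copy
import hashlib
import json
from typing import Any, Dict, List


def schema_hash(schema: Dict[str, Any]) -> str:
    """Hypothetical drift fingerprint: SHA-256 of canonical (sorted-key) JSON."""
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()


def merge_live_options(schema: Dict[str, Any],
                       live: Dict[str, List[Any]]) -> Dict[str, Any]:
    """Overlay live option domains onto a COPY of the static schema (UI only)."""
    merged = copy.deepcopy(schema)  # never mutate the schema that gets hashed
    for field_name, options in live.items():
        prop = merged.get("properties", {}).get(field_name)
        if prop is not None:
            prop["enum"] = list(options)
    return merged


static = {"type": "object",
          "properties": {"model": {"type": "string", "default": "base"}}}
before = schema_hash(static)

# e.g. an API plugin whose model list changes whenever the provider adds one
ui_schema = merge_live_options(static, {"model": ["base", "large", "turbo"]})

assert ui_schema["properties"]["model"]["enum"] == ["base", "large", "turbo"]
assert schema_hash(static) == before  # static schema untouched, so no drift
```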
CR-12: one entry of a plugin’s spawn-time worker-environment contract.
A plugin declares the environment variables its worker subprocess reads at startup via WORKER_ENV: ClassVar[List[EnvVarSpec]]. Worker env vars are FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate routes such changes through reload_plugin, never reconfigure (the env is baked into the subprocess at Popen and can’t be mutated in-process). This is the lifecycle distinction from a normal config field (reconfigurable in place via reconfigure/_release_<trigger>).
Two flavors share this one declaration:
secret=True : value resolved from the SecretStore (masked; never persisted in the config store, echoed in config_schema, or logged). A secret never carries a default — a baked-in secret is a leak.
secret=False : visible value resolved from the override chain (operator override > manifest install.env_vars > this default); safe to display.
Both share one injection seam: the substrate composes the resolved {name: value} overlay at load time and injects it into the worker env at spawn (extending the existing CJM_PLUGIN_DATA_DIR / CJM_MODELS_DIR injection). This is “derive from behaviour, not metadata” applied to the spawn env: the plugin declares WHICH vars it consumes + whether each is secret/required; the substrate owns resolution + injection.
options is a forward seam for visible vars with a finite domain (e.g. a device selector enumerating GPUs); unused today but reserved so the plugin-config UI / a future set-env surface isn’t blocked.
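A sketch of the declaration and the resolution order described above. The `EnvVarSpec` field names, the `resolve_worker_env` helper, and modeling the SecretStore as a plain dict are all assumptions. `HF_TOKEN` and `DEVICE` are example variable names.

```python
import os
import subprocess
import sys
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional, Tuple


@dataclass(frozen=True)
class EnvVarSpec:
    """Sketch of one WORKER_ENV entry (field names are assumptions)."""
    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None
    options: Tuple[str, ...] = ()  # forward seam for finite domains; unused here

    def __post_init__(self) -> None:
        # A baked-in secret is a leak, so refuse the combination outright
        if self.secret and self.default is not None:
            raise ValueError(f"{self.name}: secret env vars must not carry a default")


def resolve_worker_env(
    specs: List[EnvVarSpec],
    overrides: Mapping[str, str],     # operator overrides
    manifest_env: Mapping[str, str],  # manifest install.env_vars
    secrets: Mapping[str, str],       # stand-in for the SecretStore
) -> Dict[str, str]:
    """Compose the {name: value} overlay that is injected at spawn."""
    overlay: Dict[str, str] = {}
    for spec in specs:
        if spec.secret:
            value = secrets.get(spec.name)  # secrets come only from the store
        elif spec.name in overrides:
            value = overrides[spec.name]
        elif spec.name in manifest_env:
            value = manifest_env[spec.name]
        else:
            value = spec.default
        if value is None:
            if spec.required:
                raise ValueError(f"required worker env var {spec.name} is not set")
            continue  # optional and unset: leave it out of the overlay
        overlay[spec.name] = value
    return overlay


specs = [
    EnvVarSpec("HF_TOKEN", secret=True, required=True),
    EnvVarSpec("DEVICE", default="cpu"),
]
overlay = resolve_worker_env(
    specs,
    overrides={"DEVICE": "cuda:0"},
    manifest_env={"DEVICE": "cuda:1"},
    secrets={"HF_TOKEN": "hf_dummy"},
)
assert overlay == {"HF_TOKEN": "hf_dummy", "DEVICE": "cuda:0"}

# Injection at spawn: the child process reads the overlay from its environment
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['DEVICE'])"],
    env={**os.environ, **overlay}, capture_output=True, text=True, check=True,
)
assert child.stdout.strip() == "cuda:0"
```

Because the overlay only reaches the worker through the `env=` argument at process creation, changing a value means starting a new process, which matches the respawn-only lifecycle described above.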
template_check_placeholders
def template_check_placeholders(
    template:str, # The raw EnvVarSpec.default value
)->Set: # Placeholder names referenced (allowed-vocabulary-validated)
Return the set of placeholder names referenced by a worker-env template.
Validates the vocabulary (unknown names raise PluginConfigError) without requiring a placeholder-value mapping. Useful for cjm-ctl validate’s dry-run check at install/release time, so a malformed default surfaces BEFORE the plugin tries to spawn a worker with it.
Templates without ${...} return an empty set.
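A sketch of the vocabulary check. It assumes templates use `string.Template` syntax, which the strict `substitute` vs. `safe_substitute` note on expand_worker_env_template suggests. The two-name vocabulary and the local `PluginConfigError` are stand-ins. Because `string.Template.pattern` also matches bare `$NAME`, this sketch counts those as placeholders too.

```python
import string
from typing import Set

# Assumed vocabulary; the real WORKER_ENV_TEMPLATE_PLACEHOLDERS may differ.
ALLOWED_PLACEHOLDERS = frozenset({"CJM_PLUGIN_DATA_DIR", "CJM_MODELS_DIR"})


class PluginConfigError(ValueError):
    """Local stand-in for the library's PluginConfigError."""


def check_placeholders(template: str) -> Set[str]:
    """Return placeholder names in `template`, rejecting any outside the vocabulary."""
    names: Set[str] = set()
    # Template.pattern distinguishes $$ escapes, $NAME, ${NAME}, and stray $
    for match in string.Template.pattern.finditer(template):
        name = match.group("named") or match.group("braced")
        if name:
            names.add(name)
    unknown = names - ALLOWED_PLACEHOLDERS
    if unknown:
        raise PluginConfigError(f"unknown placeholder(s): {sorted(unknown)}")
    return names


assert check_placeholders("${CJM_MODELS_DIR}/whisper") == {"CJM_MODELS_DIR"}
assert check_placeholders("plain-default") == set()
```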
expand_worker_env_template
def expand_worker_env_template(
    template:str, # The raw EnvVarSpec.default value (may contain ${...} placeholders)
    placeholders:Mapping, # Resolved values keyed by placeholder name
    plugin_name:str='', # For error context ("template X on plugin Y references ...")
    var_name:str='', # For error context ("on EnvVarSpec(name=Z)")
)->str:
Substitute ${VAR} placeholders in template using placeholders.
Strict mode (no safe_substitute): unknown placeholders raise PluginConfigError with descriptive context. Single-pass, non-recursive — substituted values are taken verbatim, never re-scanned for further placeholders. Templates without any ${...} syntax pass through unchanged (so plain static defaults work as before).
The allowed placeholder vocabulary is fixed via WORKER_ENV_TEMPLATE_PLACEHOLDERS. A ${FOO} whose name is in the vocabulary but whose RESOLVED value is None (e.g. CJM_MODELS_DIR when the operator hasn’t configured one) raises PluginConfigError with the same shape — operators get a clear signal that the plugin needs a value they haven’t provided, rather than a silent substitution of empty string into a load-bearing path.
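A sketch of strict, single-pass expansion built on `string.Template.substitute`. The vocabulary, the `expand_template` name, and the error wording are assumptions. Validating every referenced name before substituting is how this sketch reports "allowed but unresolved (None)" distinctly from "unknown".

```python
import string
from typing import Mapping, Optional

# Assumed vocabulary; the real WORKER_ENV_TEMPLATE_PLACEHOLDERS may differ.
ALLOWED_PLACEHOLDERS = frozenset({"CJM_PLUGIN_DATA_DIR", "CJM_MODELS_DIR"})


class PluginConfigError(ValueError):
    """Local stand-in for the library's PluginConfigError."""


def expand_template(
    template: str,
    placeholders: Mapping[str, Optional[str]],
    plugin_name: str = "",
    var_name: str = "",
) -> str:
    ctx = f"template {template!r} on plugin {plugin_name!r} (EnvVarSpec {var_name!r})"
    tmpl = string.Template(template)
    # Validate every referenced name before substituting anything
    for match in tmpl.pattern.finditer(template):
        name = match.group("named") or match.group("braced")
        if not name:
            continue
        if name not in ALLOWED_PLACEHOLDERS:
            raise PluginConfigError(f"{ctx} references unknown placeholder ${{{name}}}")
        if placeholders.get(name) is None:
            # Allowed but unresolved: fail loudly instead of substituting ""
            raise PluginConfigError(f"{ctx} needs {name}, which has no configured value")
    try:
        # substitute() (not safe_substitute) is strict, and it is single-pass:
        # inserted values are never re-scanned for further placeholders.
        return tmpl.substitute({k: v for k, v in placeholders.items() if v is not None})
    except ValueError as exc:  # e.g. a stray "$" that is not a valid placeholder
        raise PluginConfigError(f"{ctx}: {exc}") from exc


resolved = {"CJM_MODELS_DIR": "/srv/models", "CJM_PLUGIN_DATA_DIR": None}
assert expand_template("${CJM_MODELS_DIR}/whisper", resolved) == "/srv/models/whisper"
assert expand_template("static-default", resolved) == "static-default"
```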
Multi-verb plugin interfaces (e.g., MediaProcessing, Graph, Text, Monitor) use a dispatcher-style execute(action, **kwargs) rather than declaring a typed abstract method per action. The substrate provides two helpers that let plugin authors declare action handlers cleanly:
@plugin_action("name") — marker decorator tagging a method as the handler for a named action.
collect_plugin_actions(cls) — walks a class (and its MRO) to derive the set of action names from decorated methods. Useful for declaring supported_actions: ClassVar[Set[str]] without hand-maintaining the list.
SG-42 parameter convention: dispatcher-style interfaces standardize on action= as the dispatch parameter name. The old command= form used by cjm-infra-plugin-system should migrate. The substrate’s @plugin_action decorator deliberately doesn’t tie itself to either name — it tags methods, and the plugin’s own execute() decides which keyword to dispatch on. The convention is documented; the migration is a consumer-side cascade per audit §6.
The actual migration of MediaProcessing/Graph/Text/Monitor interfaces to drop their typed abstract methods and adopt the dispatcher + supported_actions pattern lands in a later cascade step. Wave 3 ships the substrate primitives.
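A sketch of a dispatcher-style plugin that follows the SG-42 `action=` convention and builds its handler table from the markers. `TextPlugin` and its actions are hypothetical. The plain `ValueError` stands in for the typed PluginInputError raised by the substrate's dispatch_to_action. Walking the MRO in reverse so subclass handlers win on a name clash is one reasonable choice, not necessarily the library's.

```python
from typing import Any, Callable, Dict


def plugin_action(action_name: str) -> Callable:
    """Marker decorator (sketch): tag the function and return it unchanged."""
    def mark(func: Callable) -> Callable:
        func._plugin_action = action_name
        return func
    return mark


class TextPlugin:
    """Hypothetical dispatcher-style plugin dispatching on the action= keyword."""

    def execute(self, action: str, **kwargs: Any) -> Any:
        # Build name -> handler from the markers; reversed MRO so subclasses win
        handlers: Dict[str, Callable] = {}
        for klass in reversed(type(self).__mro__):
            for attr in vars(klass).values():
                name = getattr(attr, "_plugin_action", None)
                if name is not None:
                    handlers[name] = attr
        if action not in handlers:
            raise ValueError(f"unsupported action: {action!r}")
        return handlers[action](self, **kwargs)  # forward kwargs to the handler

    @plugin_action("upper")
    def _upper(self, text: str = "", **kwargs: Any) -> str:
        return text.upper()

    @plugin_action("count_words")
    def _count_words(self, text: str = "", **kwargs: Any) -> int:
        return len(text.split())


p = TextPlugin()
assert p.execute(action="upper", text="hi") == "HI"
assert p.execute("count_words", text="a b c") == 3
```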
collect_plugin_actions
def collect_plugin_actions(
    cls:type, # Class (or subclass) to scan for @plugin_action-tagged methods
)->Set: # Set of action names handled by `cls` (including inherited)
Collect action names from @plugin_action-decorated methods on cls.
Walks the class’s MRO so subclasses inherit action handlers from base classes automatically. The returned set is suitable for populating supported_actions: ClassVar[Set[str]]. Because the class object does not exist while its own body is executing, assign it after the class statement, e.g. MyPlugin.supported_actions = collect_plugin_actions(MyPlugin).
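A sketch of the MRO walk and of assigning supported_actions after the class statement. `collect_actions` is a hypothetical re-implementation for illustration, not the library function, and the decorator is a minimal stand-in.

```python
from typing import Callable, ClassVar, Set


def plugin_action(action_name: str) -> Callable:
    """Minimal stand-in marker decorator."""
    def mark(func: Callable) -> Callable:
        func._plugin_action = action_name
        return func
    return mark


def collect_actions(cls: type) -> Set[str]:
    """Sketch: union of marker names across the class and all of its bases."""
    found: Set[str] = set()
    for klass in cls.__mro__:
        for attr in vars(klass).values():
            name = getattr(attr, "_plugin_action", None)
            if isinstance(name, str):
                found.add(name)
    return found


class Base:
    supported_actions: ClassVar[Set[str]] = set()

    @plugin_action("ping")
    def _ping(self, **kwargs):
        return "pong"


class Child(Base):
    @plugin_action("echo")
    def _echo(self, value=None, **kwargs):
        return value


# The class object only exists once its body has run, so assign afterwards
Base.supported_actions = collect_actions(Base)
Child.supported_actions = collect_actions(Child)

assert Base.supported_actions == {"ping"}
assert Child.supported_actions == {"ping", "echo"}
```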
plugin_action
def plugin_action(
    action_name:str, # Public action name the decorated method handles
)->Callable: # Decorator
Marker decorator tagging a plugin method as the handler for action_name.
Sets func._plugin_action = action_name. Plugin authors with dispatcher-style execute(action, **kwargs) use collect_plugin_actions(cls) to derive their supported_actions set from these markers rather than maintaining a separate list. The decorator does not change call semantics — the wrapped function is returned unchanged.
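A sketch of a marker decorator with the properties described: it sets `_plugin_action` and hands back the very same function object. This is an illustrative re-implementation, not the library source.

```python
from typing import Callable, TypeVar

F = TypeVar("F", bound=Callable)


def plugin_action(action_name: str) -> Callable[[F], F]:
    """Sketch: record the action name on the function, change nothing else."""
    def mark(func: F) -> F:
        func._plugin_action = action_name  # type: ignore[attr-defined]
        return func  # the SAME object: no wrapper, so call semantics are untouched
    return mark


def handler(self, x: int = 1) -> int:
    return x * 2


tagged = plugin_action("double")(handler)
assert tagged is handler
assert tagged._plugin_action == "double"
assert tagged(None, x=21) == 42
```

Returning the original function, rather than a `functools.wraps` wrapper, leaves its identity, signature, and call overhead exactly as they were.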
# SG-44 regression: @plugin_action + collect_plugin_actions form a usable
# decorator/introspection pair, and inheritance walks the MRO.
class _BaseDispatcher:
    @plugin_action("hello")
    def _say_hello(self, **kwargs):
        return "hi"

    @plugin_action("goodbye")
    def _say_goodbye(self, **kwargs):
        return "bye"

class _ExtendedDispatcher(_BaseDispatcher):
    @plugin_action("wave")
    def _wave(self, **kwargs):
        return "👋"

# Decorator tags methods without changing them
assert _BaseDispatcher._say_hello._plugin_action == "hello"

# Collection from base class
base_actions = collect_plugin_actions(_BaseDispatcher)
assert base_actions == {"hello", "goodbye"}, f"unexpected: {base_actions}"

# Inheritance: subclass inherits parent's actions + adds its own
ext_actions = collect_plugin_actions(_ExtendedDispatcher)
assert ext_actions == {"hello", "goodbye", "wave"}, f"unexpected: {ext_actions}"

# Undecorated methods don't contribute
class _Plain:
    def regular(self): return None

assert collect_plugin_actions(_Plain) == set()

# Methods still callable after decoration
inst = _BaseDispatcher()
assert inst._say_hello() == "hi"
print("SG-44 @plugin_action + collect_plugin_actions: PASS")
# T28 regression: PluginInterface.dispatch_to_action routes to the
# @plugin_action handler via the MRO walk + forwards kwargs; unknown actions
# raise the typed PluginInputError(fields_invalid=["action"]). This is the
# substrate helper the 5 dispatcher plugins (Demucs/LavaSR/NLTK/FFmpeg/
# SQLite-graph) call instead of reimplementing the loop.
class _T28DispatchPlugin(PluginInterface):
    @property
    def name(self) -> str: return "t28-dispatch"
    @property
    def version(self) -> str: return "0.0.0"
    def initialize(self, config=None): pass
    def get_config_schema(self) -> Dict[str, Any]: return {}
    def get_current_config(self) -> Dict[str, Any]: return {}
    def execute(self, action: str = "ping", **kwargs):
        return self.dispatch_to_action(action, **kwargs)

    @plugin_action("ping")
    def _ping(self, **kwargs): return "pong"

    @plugin_action("echo")
    def _echo(self, value=None, **kwargs): return value

# Inheritance: a subclass inherits the base's @plugin_action handlers (MRO walk).
class _T28ExtendedPlugin(_T28DispatchPlugin):
    @plugin_action("shout")
    def _shout(self, text="", **kwargs): return text.upper()

_t28 = _T28DispatchPlugin()
# Routes to the tagged handler + forwards kwargs
assert _t28.execute("ping") == "pong"
assert _t28.dispatch_to_action("echo", value=42) == 42
# supported_actions (collect_plugin_actions) and dispatch share the same markers
assert collect_plugin_actions(_T28DispatchPlugin) == {"ping", "echo"}

# Unknown action raises typed PluginInputError with fields_invalid=["action"]
try:
    _t28.dispatch_to_action("nope")
    assert False, "expected PluginInputError"
except PluginInputError as e:
    assert e.fields_invalid == ["action"], f"unexpected fields_invalid: {e.fields_invalid}"
    assert isinstance(e, ValueError), "PluginInputError must multi-inherit ValueError (CR-5)"

# MRO walk: subclass dispatches BOTH its own and inherited handlers
_t28e = _T28ExtendedPlugin()
assert _t28e.dispatch_to_action("shout", text="hi") == "HI"
assert _t28e.dispatch_to_action("ping") == "pong"
assert collect_plugin_actions(_T28ExtendedPlugin) == {"ping", "echo", "shout"}
print("T28 dispatch_to_action: PASS")
The interface provides:
Identity: name and version properties for plugin identification
Lifecycle: initialize() for configuration and cleanup() for resource release
Execution: execute() for main logic, execute_stream() for streaming results
Configuration: get_config_schema() returns JSON Schema, get_current_config() returns current values
Cancellation: cancel() for cooperative cancellation (plugins opt in by overriding it)
Progress: report_progress() to report execution progress and status messages
The default execute_stream() implementation yields a single result from execute(). Plugins can override this for true streaming where partial results are yielded as they become available.
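A sketch of the default and of a streaming override, using a pared-down hypothetical `MiniPlugin` base in place of PluginInterface. In the override, `execute()` is built on `execute_stream()`. This is a design choice that keeps the two entry points from drifting apart, not something the interface requires.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterator


class MiniPlugin(ABC):
    """Hypothetical stand-in for PluginInterface, reduced to the execute pair."""

    @abstractmethod
    def execute(self, *args: Any, **kwargs: Any) -> Any: ...

    def execute_stream(self, *args: Any, **kwargs: Any) -> Iterator[Any]:
        # Default: a single chunk holding the full execute() result
        yield self.execute(*args, **kwargs)


class BatchUpper(MiniPlugin):
    def execute(self, text: str) -> str:
        return text.upper()


class StreamingUpper(MiniPlugin):
    def execute_stream(self, text: str) -> Iterator[str]:
        # True streaming: each word is yielded as soon as it is processed
        for word in text.split():
            yield word.upper()

    def execute(self, text: str) -> str:
        # Built on the stream so both entry points always agree
        return " ".join(self.execute_stream(text))


assert list(BatchUpper().execute_stream("hello world")) == ["HELLO WORLD"]
assert list(StreamingUpper().execute_stream("hello world")) == ["HELLO", "WORLD"]
assert StreamingUpper().execute("hello world") == "HELLO WORLD"
```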
The cancel() and report_progress() methods have default implementations. Plugins can override cancel() to implement cooperative cancellation. During execute(), plugins can call self.report_progress(0.5, "Processing...") to report progress.
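A sketch of opt-in cooperative cancellation and progress reporting. The `threading.Event` flag and the `on_progress` callback are assumptions about how such hooks could be wired, and `CancellableWorker` is hypothetical. The real PluginInterface defaults and transport (e.g. a cancel request arriving from the Host process) may differ.

```python
import threading
from typing import Callable, List, Optional, Tuple

ProgressFn = Callable[[float, str], None]


class CancellableWorker:
    """Hypothetical plugin body: opt-in cooperative cancellation + progress."""

    def __init__(self, on_progress: Optional[ProgressFn] = None) -> None:
        self._cancel = threading.Event()  # thread-safe; cancel() may come from another thread
        self._on_progress = on_progress

    def cancel(self) -> None:
        # Cooperative: only raise the flag; execute() stops at its next checkpoint
        self._cancel.set()

    def report_progress(self, fraction: float, message: str = "") -> None:
        if self._on_progress is not None:
            self._on_progress(fraction, message)

    def execute(self, items: List[int]) -> List[int]:
        self._cancel.clear()  # a fresh run starts uncancelled
        results: List[int] = []
        for i, item in enumerate(items):
            if self._cancel.is_set():  # checkpoint between units of work
                break
            results.append(item * item)
            self.report_progress((i + 1) / len(items), f"processed {i + 1}/{len(items)}")
        return results


events: List[Tuple[float, str]] = []
worker = CancellableWorker(on_progress=lambda f, m: events.append((f, m)))
assert worker.execute([1, 2, 3]) == [1, 4, 9]
assert events[-1] == (1.0, "processed 3/3")


def stop_at_half(fraction: float, message: str) -> None:
    if fraction >= 0.5:
        stopper.cancel()  # simulates a cancel request arriving mid-run


stopper = CancellableWorker(on_progress=stop_at_half)
assert stopper.execute([1, 2, 3, 4]) == [1, 4]
```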
Example: Implementing a Plugin
Here’s a complete example showing how to implement a concrete plugin:
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional

@dataclass
class ExampleConfig:
    """Configuration for the example plugin."""
    mode: str = "balanced"
    threshold: float = 0.5
    max_workers: int = 4

class ExamplePlugin(PluginInterface):
    """A simple example plugin implementation."""

    def __init__(self):
        self._config: ExampleConfig = ExampleConfig()
        self._resource: Optional[str] = None

    @property
    def name(self) -> str:
        return "example-plugin"

    @property
    def version(self) -> str:
        return "1.0.0"

    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize or re-configure the plugin."""
        if config is None:
            config = {}
        # Merge the overrides onto the current config (the defaults on first call)
        current = asdict(self._config)
        current.update(config)
        self._config = ExampleConfig(**current)
        # Initialize resources based on config
        self._resource = f"Resource-{self._config.mode}"

    def execute(self, input_data: str, **kwargs) -> str:
        """Process input data."""
        return f"Processed '{input_data}' using {self._resource}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "balanced", "quality"],
                    "default": "balanced"
                },
                "threshold": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "max_workers": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 16,
                    "default": 4
                }
            }
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration."""
        return asdict(self._config)

    def cleanup(self) -> None:
        """Clean up resources."""
        self._resource = None
# Test the example plugin
plugin = ExamplePlugin()
plugin.initialize()
print(f"Plugin: {plugin.name} v{plugin.version}")
print("\nConfig Schema:")
print(plugin.get_config_schema())
print("\nCurrent Config:")
print(plugin.get_current_config())
# Test execution
result = plugin.execute("sample_data")
print(f"Result: {result}")

# Test re-initialization with new config
plugin.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {plugin.get_current_config()}")
result = plugin.execute("more_data")
print(f"Result: {result}")

# Test cleanup
plugin.cleanup()
print("\nAfter cleanup, resource is cleared")
Result: Processed 'sample_data' using Resource-balanced
After re-init config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4}
Result: Processed 'more_data' using Resource-quality
After cleanup, resource is cleared
# Test streaming (default implementation yields single result)
plugin.initialize({"mode": "balanced"})
print("Streaming execution:")
for chunk in plugin.execute_stream("stream_data"):
    print(f"  Chunk: {chunk}")
Streaming execution:
Chunk: Processed 'stream_data' using Resource-balanced
# Test FileBackedDTO protocol detection
import os
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""
    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        """Save to temp file and return path."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
        return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
temp_path = audio.to_temp_file()
print(f"Temp file path: {temp_path}")
os.remove(temp_path)  # delete=False leaves cleanup to the caller

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
Exercise the lifecycle hooks + helpers + cancellation primitives independently of any worker subprocess. The tests use bare PluginInterface subclasses without going through RemotePluginProxy.
What’s covered:
cleanup() made optional — a subclass with no cleanup override is instantiable.
prefetch() default no-op.
fields_that_changed symmetric difference + asymmetric-presence handling.
reconfigure_with_triggers walks RELOAD_TRIGGER metadata, dispatches _release_<trigger>, de-dupes shared triggers, silently skips plugins without config_class, and survives a misbehaving _release method.
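A sketch of the two helpers these tests cover, written to match the behaviours listed above. The `RELOAD_TRIGGER` metadata key value, both signatures, and `DemoConfig`/`DemoPlugin` are hypothetical, so the library's versions may differ in detail.

```python
import logging
from dataclasses import dataclass, field, fields
from typing import Any, List, Mapping, Set

log = logging.getLogger(__name__)
RELOAD_TRIGGER = "reload_trigger"  # hypothetical metadata key


def fields_that_changed(old: Mapping[str, Any], new: Mapping[str, Any]) -> Set[str]:
    """Changed values plus keys present on only one side."""
    missing = object()  # sentinel, so a one-sided key always counts as changed
    return {k for k in old.keys() | new.keys()
            if old.get(k, missing) != new.get(k, missing)}


@dataclass
class DemoConfig:
    # Two fields share one trigger: changing either means the model must reload
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    beam_size: int = 5  # no trigger: applied without releasing anything


class DemoPlugin:
    config_class = DemoConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")


def reconfigure_with_triggers(plugin: Any, old: Mapping[str, Any],
                              new: Mapping[str, Any]) -> Set[str]:
    """Call _release_<trigger> once per touched trigger; return those that ran."""
    config_class = getattr(plugin, "config_class", None)
    if config_class is None:
        return set()  # silently skip plugins without a config_class
    changed = fields_that_changed(old, new)
    triggers = {f.metadata[RELOAD_TRIGGER] for f in fields(config_class)
                if f.name in changed and RELOAD_TRIGGER in f.metadata}
    released: Set[str] = set()
    for trigger in sorted(triggers):  # collected in a set, so shared triggers fire once
        hook = getattr(plugin, f"_release_{trigger}", None)
        if hook is None:
            continue
        try:
            hook()
            released.add(trigger)
        except Exception:
            # A misbehaving hook is logged, not propagated, so other triggers still run
            log.exception("_release_%s failed on %r", trigger, plugin)
    return released


plugin = DemoPlugin()
old = {"model": "base", "device": "cpu", "beam_size": 5}
new = {"model": "large", "device": "cuda", "beam_size": 5}
assert fields_that_changed(old, new) == {"model", "device"}
assert reconfigure_with_triggers(plugin, old, new) == {"model"}
assert plugin.released == ["model"]  # shared trigger fired once, not twice
```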