Plugin Interface

Abstract base class defining the generic plugin interface

FileBackedDTO Protocol

The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.
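As a rough illustration of the detection step, the sketch below assumes the protocol is a `runtime_checkable` `typing.Protocol` with a single `to_temp_file() -> str` method, which is consistent with the `isinstance` check used later on this page. `FileBackedSketch`, `WavClip` and `marshal_args` are hypothetical names, not part of the library, and the HTTP transport the real Proxy uses is omitted.

```python
import os
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedSketch(Protocol):
    """Hypothetical stand-in for FileBackedDTO: one method that returns a path."""

    def to_temp_file(self) -> str: ...


class WavClip:
    """Toy DTO that spills its bytes to a temporary .wav file."""

    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
            return f.name


def marshal_args(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical proxy step: replace file-backed values with their paths."""
    marshalled = {}
    for key, value in kwargs.items():
        if isinstance(value, FileBackedSketch):
            # Send the path over the wire instead of the payload itself.
            marshalled[key] = value.to_temp_file()
        else:
            marshalled[key] = value
    return marshalled


payload = marshal_args({"audio": WavClip(b"RIFF...."), "language": "en"})
print(payload["language"])               # plain values pass through untouched
print(os.path.exists(payload["audio"]))  # the DTO became a path on disk
os.remove(payload["audio"])              # in this sketch the caller cleans up
```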


FileBackedDTO


def FileBackedDTO(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer.

PluginInterface

The PluginInterface is an abstract base class that defines the contract all plugins must implement. This interface works for both:

  • Concrete Plugins: Running in Worker processes (implement the actual logic)
  • Remote Proxies: Running in Host process (forward calls over HTTP)

The interface is domain-agnostic. Domain-specific plugin systems (e.g., transcription, vision) subclass this to add their specific methods and DTOs.


FieldOptions


def FieldOptions(
    options:List, constraints:Dict=<factory>
)->None:

CR-11: the live option domain for one dynamic config field.

Kept SEPARATE from the static config_schema (which CR-8 hashes for drift detection). The plugin-config UI merges these live options on top of the static schema; folding them into the schema would change its hash whenever the live options change, so every API plugin would perpetually register as ‘drifted’.


ConfigOption


def ConfigOption(
    value:Any, label:str, metadata:Dict=<factory>
)->None:

CR-11: one live option for a dynamic config field, with optional metadata.
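The separation can be sketched with two dataclasses mirroring the FieldOptions and ConfigOption signatures above (assuming `options` holds ConfigOption items) and a hypothetical `merge_live_options` helper standing in for the plugin-config UI's merge. The real UI's merge rules are not documented here; this sketch overlays an `enum` onto a deep copy, so the static schema, and therefore its drift hash, never changes. `schema_hash` is only a stand-in for CR-8's hashing.

```python
import copy
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ConfigOption:
    value: Any
    label: str
    metadata: Dict = field(default_factory=dict)


@dataclass
class FieldOptions:
    options: List[ConfigOption]
    constraints: Dict = field(default_factory=dict)


def schema_hash(schema: Dict[str, Any]) -> str:
    """Stable hash of the static schema (stand-in for CR-8 drift detection)."""
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()


def merge_live_options(schema: Dict[str, Any], live: Dict[str, FieldOptions]) -> Dict[str, Any]:
    """Hypothetical UI-side merge: overlay live options onto a COPY of the schema."""
    merged = copy.deepcopy(schema)
    for name, field_opts in live.items():
        prop = merged["properties"][name]
        prop["enum"] = [opt.value for opt in field_opts.options]
        prop.update(field_opts.constraints)
    return merged


static_schema = {"type": "object", "properties": {"model": {"type": "string"}}}
before = schema_hash(static_schema)

live = {"model": FieldOptions([ConfigOption("small", "Small"), ConfigOption("large", "Large")])}
ui_schema = merge_live_options(static_schema, live)

print(ui_schema["properties"]["model"]["enum"])
print(schema_hash(static_schema) == before)  # static schema untouched
```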


EnvVarSpec


def EnvVarSpec(
    name:str, secret:bool=False, required:bool=False, label:str='', description:str='', default:Optional=None,
    options:Optional=None
)->None:

CR-12: one entry of a plugin’s spawn-time worker-environment contract.

A plugin declares the environment variables its worker subprocess reads at startup via WORKER_ENV: ClassVar[List[EnvVarSpec]]. Worker env vars are FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate routes such changes through reload_plugin, never reconfigure (the env is baked into the subprocess at Popen and can’t be mutated in-process). This is the lifecycle distinction from a normal config field (reconfigurable in place via reconfigure/_release_<trigger>).

Two flavors share this one declaration:

  • secret=True : value resolved from the SecretStore (masked; never persisted in the config store, echoed in config_schema, or logged). A secret never carries a default — a baked-in secret is a leak.
  • secret=False : visible value resolved from the override chain (operator override > manifest install.env_vars > this default); safe to display.

Both share one injection seam: the substrate composes the resolved {name: value} overlay at load time and injects it into the worker env at spawn (extending the existing CJM_PLUGIN_DATA_DIR / CJM_MODELS_DIR injection). This is “derive from behaviour, not metadata” applied to the spawn env: the plugin declares WHICH vars it consumes + whether each is secret/required; the substrate owns resolution + injection.

options is a forward seam for visible vars with a finite domain (e.g. a device selector enumerating GPUs); unused today but reserved so the plugin-config UI / a future set-env surface isn’t blocked.
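A minimal sketch of the declaration and the resolution rules described above. EnvVarSpec here is a local dataclass mirroring the documented signature; `resolve_worker_env`, the `secrets`/`overrides`/`manifest_env` mappings, `ExampleWorker` and its variable names are hypothetical stand-ins for the substrate's SecretStore and override chain. It models the precedence for visible vars (operator override > manifest install.env_vars > default), that secrets never fall back to a default, and that required vars must resolve.

```python
from dataclasses import dataclass
from typing import ClassVar, Dict, List, Mapping, Optional


@dataclass
class EnvVarSpec:
    name: str
    secret: bool = False
    required: bool = False
    label: str = ""
    description: str = ""
    default: Optional[str] = None
    options: Optional[List[str]] = None

    def __post_init__(self):
        # A baked-in secret is a leak, so this sketch rejects secret defaults.
        if self.secret and self.default is not None:
            raise ValueError(f"secret env var {self.name} must not have a default")


class ExampleWorker:
    # Hypothetical plugin declaring its spawn-time env contract.
    WORKER_ENV: ClassVar[List[EnvVarSpec]] = [
        EnvVarSpec("EXAMPLE_API_KEY", secret=True, required=True),
        EnvVarSpec("EXAMPLE_DEVICE", default="cpu"),
    ]


def resolve_worker_env(
    specs: List[EnvVarSpec],
    secrets: Mapping[str, str],
    overrides: Mapping[str, str],
    manifest_env: Mapping[str, str],
) -> Dict[str, str]:
    """Hypothetical resolver producing the {name: value} overlay injected at spawn."""
    overlay: Dict[str, str] = {}
    for spec in specs:
        if spec.secret:
            value = secrets.get(spec.name)  # secret store only, never a default
        else:
            value = overrides.get(spec.name, manifest_env.get(spec.name, spec.default))
        if value is None:
            if spec.required:
                raise RuntimeError(f"required worker env var {spec.name} is unset")
            continue
        overlay[spec.name] = value
    return overlay


env = resolve_worker_env(
    ExampleWorker.WORKER_ENV,
    secrets={"EXAMPLE_API_KEY": "s3cr3t"},
    overrides={},
    manifest_env={"EXAMPLE_DEVICE": "cuda:0"},
)
print(sorted(env))  # print names only; never log secret values
```

In this sketch the overlay would be merged into the child's environment when the worker is started; because it is fixed at that point, changing any value means respawning the worker rather than reconfiguring it.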


template_check_placeholders


def template_check_placeholders(
    template:str, # The raw EnvVarSpec.default value
)->Set: # Placeholder names referenced (allowed-vocabulary-validated)

Return the set of placeholder names referenced by a worker-env template.

Validates the vocabulary (unknown names raise PluginConfigError) without requiring a placeholder-value mapping. This suits the dry-run check in cjm-ctl validate at install/release time: it surfaces the bug BEFORE the plugin tries to spawn a worker with a malformed default.

Templates without ${...} return an empty set.
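One way to implement such a check with the standard library is to scan with `string.Template.pattern`, the compiled regex `Template` itself uses, and collect the braced and bare identifiers. `ALLOWED_PLACEHOLDERS` is a hypothetical vocabulary (the real WORKER_ENV_TEMPLATE_PLACEHOLDERS may differ), `check_placeholders_sketch` is not the library function, and PluginConfigError is a local stand-in.

```python
from string import Template
from typing import Set


class PluginConfigError(Exception):
    """Local stand-in for the substrate's PluginConfigError."""


# Hypothetical vocabulary; the real WORKER_ENV_TEMPLATE_PLACEHOLDERS may differ.
ALLOWED_PLACEHOLDERS = frozenset({"CJM_PLUGIN_DATA_DIR", "CJM_MODELS_DIR"})


def check_placeholders_sketch(template: str) -> Set[str]:
    """Return placeholder names in `template`, rejecting unknown ones."""
    names: Set[str] = set()
    for match in Template.pattern.finditer(template):
        # "named" matches $NAME and "braced" matches ${NAME}; "$$" escapes and
        # malformed forms match neither and are ignored by this sketch.
        name = match.group("named") or match.group("braced")
        if name is None:
            continue
        if name not in ALLOWED_PLACEHOLDERS:
            raise PluginConfigError(f"unknown placeholder ${{{name}}} in {template!r}")
        names.add(name)
    return names


print(check_placeholders_sketch("${CJM_MODELS_DIR}/whisper"))
print(check_placeholders_sketch("cpu"))
```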


expand_worker_env_template


def expand_worker_env_template(
    template:str, # The raw EnvVarSpec.default value (may contain ${...} placeholders)
    placeholders:Mapping, # Resolved values keyed by placeholder name
    plugin_name:str='', # For error context ("template X on plugin Y references ...")
    var_name:str='', # For error context ("on EnvVarSpec(name=Z)")
)->str:

Substitute ${VAR} placeholders in template using placeholders.

Strict mode (no safe_substitute): unknown placeholders raise PluginConfigError with descriptive context. Single-pass, non-recursive — substituted values are taken verbatim, never re-scanned for further placeholders. Templates without any ${...} syntax pass through unchanged (so plain static defaults work as before).

The allowed placeholder vocabulary is fixed via WORKER_ENV_TEMPLATE_PLACEHOLDERS. A ${FOO} whose name is in the vocabulary but whose RESOLVED value is None (e.g. CJM_MODELS_DIR when the operator hasn’t configured one) raises PluginConfigError with the same shape — operators get a clear signal that the plugin needs a value they haven’t provided, rather than a silent substitution of empty string into a load-bearing path.
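A compact sketch of the strict, single-pass behaviour using `string.Template.substitute` (the strict counterpart of `safe_substitute`). The vocabulary, the PluginConfigError stand-in and `expand_sketch` are assumptions; only the documented rules are modelled: unknown names and None-valued placeholders raise, static defaults pass through, and substituted values are not re-scanned. Note that `Template` also treats a bare `$NAME` as a placeholder; whether the real helper does is not stated here.

```python
from string import Template
from typing import Mapping, Optional


class PluginConfigError(Exception):
    """Local stand-in for the substrate's PluginConfigError."""


# Hypothetical vocabulary; the real WORKER_ENV_TEMPLATE_PLACEHOLDERS may differ.
ALLOWED_PLACEHOLDERS = frozenset({"CJM_PLUGIN_DATA_DIR", "CJM_MODELS_DIR"})


def expand_sketch(
    template: str,
    placeholders: Mapping[str, Optional[str]],
    plugin_name: str = "",
    var_name: str = "",
) -> str:
    context = f"template {template!r} on plugin {plugin_name!r} (EnvVarSpec(name={var_name!r}))"
    values = {}
    for match in Template.pattern.finditer(template):
        name = match.group("named") or match.group("braced")
        if name is None:
            continue
        if name not in ALLOWED_PLACEHOLDERS:
            raise PluginConfigError(f"{context} references unknown placeholder {name!r}")
        if placeholders.get(name) is None:
            raise PluginConfigError(f"{context} needs {name!r}, which has no resolved value")
        values[name] = placeholders[name]
    # substitute() is single-pass: inserted values are never re-scanned.
    return Template(template).substitute(values)


resolved = {"CJM_MODELS_DIR": "/srv/models", "CJM_PLUGIN_DATA_DIR": "${CJM_MODELS_DIR}"}
print(expand_sketch("${CJM_MODELS_DIR}/whisper", resolved))   # /srv/models/whisper
print(expand_sketch("cpu", resolved))                           # cpu (static default)
print(expand_sketch("${CJM_PLUGIN_DATA_DIR}/cache", resolved))  # ${CJM_MODELS_DIR}/cache
```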


PluginInterface


def PluginInterface(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Abstract base class for all plugins (both local workers and remote proxies).

CR-4 extended this surface with a prefetch hook (SG-19), made cleanup optional (SG-43), split out reconfigure with a RELOAD_TRIGGER declarative helper (the lifecycle hook split), and added cooperative-cancellation primitives (SG-16: flag, callback, and context manager).

Abstract methods: name, version, initialize, execute, get_config_schema, get_current_config. Concrete defaults (overridable): execute_stream, cleanup, prefetch, reconfigure, cancel, check_cancel, register_cancel_callback, cancel_signal_to, report_progress, fields_that_changed, reconfigure_with_triggers, on_disable, on_enable.

Action Dispatcher Convention (SG-44 + SG-42)

Multi-verb plugin interfaces (e.g., MediaProcessing, Graph, Text, Monitor) are converging on a dispatcher-style execute(action, **kwargs) rather than declaring a typed abstract method per action. The substrate provides helpers that let plugin authors declare and route action handlers cleanly:

  • @plugin_action("name"): marker decorator tagging a method as the handler for a named action.
  • collect_plugin_actions(cls): walks a class (and its MRO) to derive the set of action names from decorated methods. Useful for declaring supported_actions: ClassVar[Set[str]] without hand-maintaining the list.
  • PluginInterface.dispatch_to_action(action, **kwargs): routes an action to its @plugin_action handler (walking the MRO) and forwards kwargs; unknown actions raise PluginInputError(fields_invalid=["action"]).

SG-42 parameter convention: dispatcher-style interfaces standardize on action= as the dispatch parameter name. The old command= form used by cjm-infra-plugin-system should migrate. The substrate’s @plugin_action decorator deliberately doesn’t tie itself to either name — it tags methods, and the plugin’s own execute() decides which keyword to dispatch on. The convention is documented; the migration is a consumer-side cascade per audit §6.

The actual migration of MediaProcessing/Graph/Text/Monitor interfaces to drop their typed abstract methods and adopt the dispatcher + supported_actions pattern lands in a later cascade step. Wave 3 ships the substrate primitives.
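The dispatcher pattern can be sketched without the library: a marker decorator (mirroring the documented `func._plugin_action` attribute) plus a lookup that walks `type(self).__mro__`. `ActionError`, `Dispatcher.dispatch` and `TextTools` are hypothetical stand-ins for PluginInputError, PluginInterface.dispatch_to_action and a real plugin; the stand-in error imitates the documented `fields_invalid=["action"]` and ValueError inheritance. Following SG-42, `execute()` dispatches on `action=`.

```python
from typing import Any, Callable, List


class ActionError(ValueError):
    """Stand-in for PluginInputError: a ValueError naming the invalid fields."""

    def __init__(self, message: str, fields_invalid: List[str]):
        super().__init__(message)
        self.fields_invalid = fields_invalid


def plugin_action(action_name: str) -> Callable:
    def mark(func: Callable) -> Callable:
        func._plugin_action = action_name  # tag only; the function is unchanged
        return func
    return mark


class Dispatcher:
    def dispatch(self, action: str, **kwargs: Any) -> Any:
        # Walk the MRO so inherited handlers are found; subclasses are scanned first.
        for klass in type(self).__mro__:
            for attr in vars(klass).values():
                if getattr(attr, "_plugin_action", None) == action:
                    return attr(self, **kwargs)
        raise ActionError(f"unknown action {action!r}", fields_invalid=["action"])


class TextTools(Dispatcher):
    def execute(self, action: str = "upper", **kwargs: Any) -> Any:
        return self.dispatch(action, **kwargs)  # SG-42: dispatch on `action=`

    @plugin_action("upper")
    def _upper(self, text: str = "", **kwargs: Any) -> str:
        return text.upper()

    @plugin_action("count")
    def _count(self, text: str = "", **kwargs: Any) -> int:
        return len(text.split())


tools = TextTools()
print(tools.execute(action="upper", text="hi there"))
print(tools.execute(action="count", text="hi there"))
```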


collect_plugin_actions


def collect_plugin_actions(
    cls:type, # Class (or subclass) to scan for @plugin_action-tagged methods
)->Set: # Set of action names handled by `cls` (including inherited)

Collect action names from @plugin_action-decorated methods on cls.

Walks the class’s MRO so subclasses inherit action handlers from base classes automatically. Because a class name is not bound until its class statement finishes executing, call it after the class body, e.g. MyPlugin.supported_actions = collect_plugin_actions(MyPlugin), to populate a supported_actions: ClassVar[Set[str]] declaration.
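Because `MyPlugin` is not yet bound while its own class body runs, `supported_actions` has to be filled in after the class statement. The sketch below re-implements the two helpers locally, following the documented behaviour (the decorator sets `func._plugin_action`, the collector walks the MRO), so it runs standalone; in real code the library's versions would be used instead. `MyPlugin` and its actions are hypothetical.

```python
from typing import Callable, ClassVar, Set


def plugin_action(action_name: str) -> Callable:
    def mark(func: Callable) -> Callable:
        func._plugin_action = action_name
        return func
    return mark


def collect_plugin_actions(cls: type) -> Set[str]:
    actions: Set[str] = set()
    for klass in cls.__mro__:  # base-class handlers are included
        for attr in vars(klass).values():
            name = getattr(attr, "_plugin_action", None)
            if name is not None:
                actions.add(name)
    return actions


class MyPlugin:
    supported_actions: ClassVar[Set[str]]  # annotation only; value assigned below

    @plugin_action("split")
    def _split(self, **kwargs): ...

    @plugin_action("merge")
    def _merge(self, **kwargs): ...


# The name MyPlugin exists only now, after the class statement has finished.
MyPlugin.supported_actions = collect_plugin_actions(MyPlugin)
print(sorted(MyPlugin.supported_actions))
```

One consequence of this naive MRO walk: a subclass that overrides a decorated method without re-decorating it still reports the action. How the library handles that case is not stated here.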


plugin_action


def plugin_action(
    action_name:str, # Public action name the decorated method handles
)->Callable: # Decorator

Marker decorator tagging a plugin method as the handler for action_name.

Sets func._plugin_action = action_name. Plugin authors with dispatcher-style execute(action, **kwargs) use collect_plugin_actions(cls) to derive their supported_actions set from these markers rather than maintaining a separate list. The decorator does not change call semantics — the wrapped function is returned unchanged.

# SG-44 regression: @plugin_action + collect_plugin_actions form a usable
# decorator/introspection pair, and inheritance walks the MRO.
class _BaseDispatcher:
    @plugin_action("hello")
    def _say_hello(self, **kwargs):
        return "hi"
    
    @plugin_action("goodbye")
    def _say_goodbye(self, **kwargs):
        return "bye"


class _ExtendedDispatcher(_BaseDispatcher):
    @plugin_action("wave")
    def _wave(self, **kwargs):
        return "👋"


# Decorator tags methods without changing them
assert _BaseDispatcher._say_hello._plugin_action == "hello"

# Collection from base class
base_actions = collect_plugin_actions(_BaseDispatcher)
assert base_actions == {"hello", "goodbye"}, f"unexpected: {base_actions}"

# Inheritance: subclass inherits parent's actions + adds its own
ext_actions = collect_plugin_actions(_ExtendedDispatcher)
assert ext_actions == {"hello", "goodbye", "wave"}, f"unexpected: {ext_actions}"

# Undecorated methods don't contribute
class _Plain:
    def regular(self): return None
assert collect_plugin_actions(_Plain) == set()

# Methods still callable after decoration
inst = _BaseDispatcher()
assert inst._say_hello() == "hi"

print("SG-44 @plugin_action + collect_plugin_actions: PASS")
SG-44 @plugin_action + collect_plugin_actions: PASS
# T28 regression: PluginInterface.dispatch_to_action routes to the
# @plugin_action handler via the MRO walk + forwards kwargs; unknown actions
# raise the typed PluginInputError(fields_invalid=["action"]). This is the
# substrate helper the 5 dispatcher plugins (Demucs/LavaSR/NLTK/FFmpeg/
# SQLite-graph) call instead of reimplementing the loop.
class _T28DispatchPlugin(PluginInterface):
    @property
    def name(self) -> str: return "t28-dispatch"
    @property
    def version(self) -> str: return "0.0.0"
    def initialize(self, config=None): pass
    def get_config_schema(self) -> Dict[str, Any]: return {}
    def get_current_config(self) -> Dict[str, Any]: return {}
    def execute(self, action: str = "ping", **kwargs):
        return self.dispatch_to_action(action, **kwargs)

    @plugin_action("ping")
    def _ping(self, **kwargs): return "pong"

    @plugin_action("echo")
    def _echo(self, value=None, **kwargs): return value


# Inheritance: a subclass inherits the base's @plugin_action handlers (MRO walk).
class _T28ExtendedPlugin(_T28DispatchPlugin):
    @plugin_action("shout")
    def _shout(self, text="", **kwargs): return text.upper()


_t28 = _T28DispatchPlugin()
# Routes to the tagged handler + forwards kwargs
assert _t28.execute("ping") == "pong"
assert _t28.dispatch_to_action("echo", value=42) == 42
# supported_actions (collect_plugin_actions) and dispatch share the same markers
assert collect_plugin_actions(_T28DispatchPlugin) == {"ping", "echo"}
# Unknown action raises typed PluginInputError with fields_invalid=["action"]
try:
    _t28.dispatch_to_action("nope")
    assert False, "expected PluginInputError"
except PluginInputError as e:
    assert e.fields_invalid == ["action"], f"unexpected fields_invalid: {e.fields_invalid}"
    assert isinstance(e, ValueError), "PluginInputError must multi-inherit ValueError (CR-5)"

# MRO walk: subclass dispatches BOTH its own and inherited handlers
_t28e = _T28ExtendedPlugin()
assert _t28e.dispatch_to_action("shout", text="hi") == "HI"
assert _t28e.dispatch_to_action("ping") == "pong"
assert collect_plugin_actions(_T28ExtendedPlugin) == {"ping", "echo", "shout"}

print("T28 dispatch_to_action: PASS")

The interface provides:

  • Identity: name and version properties for plugin identification
  • Lifecycle: initialize() for configuration, an optional cleanup() for resource release (SG-43), and overridable prefetch(), reconfigure(), on_disable() and on_enable() hooks
  • Execution: execute() for main logic, execute_stream() for streaming results
  • Configuration: get_config_schema() returns JSON Schema, get_current_config() returns current values
  • Cancellation: cancel() sets a cancellation flag and fires registered callbacks; plugins opt in by calling check_cancel() at safe points or by registering callbacks
  • Progress: report_progress() to report execution progress and status messages

The default execute_stream() implementation yields a single result from execute(). Plugins can override this for true streaming where partial results are yielded as they become available.
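To make the difference concrete, here is a standalone sketch using plain classes (not PluginInterface subclasses, so it runs on its own). `SingleShot` reproduces the documented default of yielding execute()'s single result, and the hypothetical `WordByWord` overrides execute_stream() to yield one chunk per word as it is produced.

```python
from typing import Any, Iterator


class SingleShot:
    def execute(self, text: str, **kwargs: Any) -> str:
        return text.upper()

    def execute_stream(self, text: str, **kwargs: Any) -> Iterator[Any]:
        # Mirrors the documented default: one chunk containing the full result.
        yield self.execute(text, **kwargs)


class WordByWord(SingleShot):
    def execute_stream(self, text: str, **kwargs: Any) -> Iterator[Any]:
        # True streaming: each partial result is yielded as soon as it is ready.
        for word in text.split():
            yield word.upper()


print(list(SingleShot().execute_stream("partial results arrive")))
print(list(WordByWord().execute_stream("partial results arrive")))
```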

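The cancel() and report_progress() methods have default implementations. The default cancel() sets _cancel_requested and fires any callbacks registered via register_cancel_callback(); plugins cooperate by calling self.check_cancel() at safe points, which raises PluginCancelledError once the flag is set. During execute(), plugins can call self.report_progress(0.5, "Processing...") to report progress.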
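A standalone sketch of the cooperative pattern: a flag plus callbacks, a check_cancel() that raises only when the flag is set, and a context manager that registers a callback and always deregisters it. `CancellableWork`, `CancelledSketchError`, `callback_scope` (playing the role of cancel_signal_to, whose real signature is not shown on this page) and the list-based progress log are all hypothetical; where the library's report_progress() sends its updates is not described here.

```python
import logging
from contextlib import contextmanager
from typing import Callable, Iterator, List, Tuple


class CancelledSketchError(Exception):
    """Stand-in for PluginCancelledError."""


class CancellableWork:
    def __init__(self) -> None:
        self._cancel_requested = False
        self._callbacks: List[Callable[[], None]] = []
        self.progress: List[Tuple[float, str]] = []

    def register_cancel_callback(self, cb: Callable[[], None]) -> None:
        self._callbacks.append(cb)

    def cancel(self) -> None:
        self._cancel_requested = True
        for cb in list(self._callbacks):
            try:
                cb()
            except Exception:  # a misbehaving callback is logged and skipped
                logging.exception("cancel callback failed")

    def check_cancel(self) -> None:
        if self._cancel_requested:
            raise CancelledSketchError("cancelled")

    def report_progress(self, fraction: float, message: str = "") -> None:
        self.progress.append((fraction, message))

    @contextmanager
    def callback_scope(self, cb: Callable[[], None]) -> Iterator[None]:
        self._callbacks.append(cb)
        try:
            yield
        finally:
            self._callbacks.remove(cb)  # deregister even if the block raises

    def execute(self, items: List[int]) -> int:
        total = 0
        for i, item in enumerate(items, start=1):
            self.check_cancel()  # cooperative checkpoint between units of work
            total += item
            self.report_progress(i / len(items), f"processed {i}/{len(items)}")
        return total


work = CancellableWork()
print(work.execute([1, 2, 3]), work.progress[-1])
fired: List[str] = []
with work.callback_scope(lambda: fired.append("stop")):
    work.cancel()  # sets the flag and fires the scoped callback
try:
    work.execute([1, 2, 3])
except CancelledSketchError:
    print("stopped at the first checkpoint; fired:", fired)
```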

Example: Implementing a Plugin

Here’s a complete example showing how to implement a concrete plugin:

from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional

@dataclass
class ExampleConfig:
    """Configuration for the example plugin."""
    mode: str = "balanced"
    threshold: float = 0.5
    max_workers: int = 4

class ExamplePlugin(PluginInterface):
    """A simple example plugin implementation."""

    def __init__(self):
        self._config: ExampleConfig = ExampleConfig()
        self._resource: Optional[str] = None

    @property
    def name(self) -> str:
        return "example-plugin"
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize or re-configure the plugin."""
        if config is None:
            config = {}
        
        # Merge with defaults
        current = asdict(self._config)
        current.update(config)
        self._config = ExampleConfig(**current)
        
        # Initialize resources based on config
        self._resource = f"Resource-{self._config.mode}"

    def execute(self, input_data: str, **kwargs) -> str:
        """Process input data."""
        return f"Processed '{input_data}' using {self._resource}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "balanced", "quality"],
                    "default": "balanced"
                },
                "threshold": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "max_workers": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 16,
                    "default": 4
                }
            }
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration."""
        return asdict(self._config)

    def cleanup(self) -> None:
        """Clean up resources."""
        self._resource = None
# Test the example plugin
plugin = ExamplePlugin()
plugin.initialize()

print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"\nConfig Schema:")
print(plugin.get_config_schema())
print(f"\nCurrent Config:")
print(plugin.get_current_config())
Plugin: example-plugin v1.0.0

Config Schema:
{'type': 'object', 'properties': {'mode': {'type': 'string', 'enum': ['fast', 'balanced', 'quality'], 'default': 'balanced'}, 'threshold': {'type': 'number', 'minimum': 0.0, 'maximum': 1.0, 'default': 0.5}, 'max_workers': {'type': 'integer', 'minimum': 1, 'maximum': 16, 'default': 4}}}

Current Config:
{'mode': 'balanced', 'threshold': 0.5, 'max_workers': 4}
# Test execution
result = plugin.execute("sample_data")
print(f"Result: {result}")

# Test re-initialization with new config
plugin.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {plugin.get_current_config()}")
result = plugin.execute("more_data")
print(f"Result: {result}")

# Test cleanup
plugin.cleanup()
print(f"\nAfter cleanup, resource is cleared")
Result: Processed 'sample_data' using Resource-balanced

After re-init config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4}
Result: Processed 'more_data' using Resource-quality

After cleanup, resource is cleared
# Test streaming (default implementation yields single result)
plugin.initialize({"mode": "balanced"})

print("Streaming execution:")
for chunk in plugin.execute_stream("stream_data"):
    print(f"  Chunk: {chunk}")
Streaming execution:
  Chunk: Processed 'stream_data' using Resource-balanced
# Test FileBackedDTO protocol detection
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""
    
    def __init__(self, data: bytes):
        self._data = data
    
    def to_temp_file(self) -> str:
        """Save to temp file and return path."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
            return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
print(f"Temp file path: {audio.to_temp_file()}")

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
MockAudioData implements FileBackedDTO: True
Temp file path: /tmp/tmpui1v2c52.wav
str implements FileBackedDTO: False

CR-4 regression tests

These tests exercise the lifecycle hooks, helpers, and cancellation primitives independently of any worker subprocess, using bare PluginInterface subclasses rather than going through RemotePluginProxy.

What’s covered:

  • cleanup() made optional — a subclass with no cleanup override is instantiable.
  • prefetch() default no-op.
  • fields_that_changed symmetric difference + asymmetric-presence handling.
  • reconfigure_with_triggers walks RELOAD_TRIGGER metadata, dispatches _release_<trigger>, de-dupes shared triggers, silently skips plugins without config_class, and survives a misbehaving _release method.
  • cancel() sets _cancel_requested + fires registered callbacks (logging and skipping misbehaving callbacks).
  • check_cancel() raises PluginCancelledError only when the flag is set.
  • cancel_signal_to context manager registers + deregisters cleanly even when the with-block raises.
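The reconfigure path listed above can be sketched with a dataclass config whose fields carry trigger names in their metadata. The metadata key `RELOAD_TRIGGER_KEY`, the `fields_changed`/`reconfigure_sketch` functions and the `ModelPlugin` class are hypothetical (the real RELOAD_TRIGGER metadata layout is not shown on this page). The sketch models only the listed behaviours: changed keys include ones present on one side only, each `_release_<trigger>` runs once even when several changed fields share it, a plugin without `config_class` is skipped, and a failing `_release` method is logged rather than propagated.

```python
import logging
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List, Set

RELOAD_TRIGGER_KEY = "reload_trigger"  # hypothetical metadata key


def fields_changed(old: Dict[str, Any], new: Dict[str, Any]) -> Set[str]:
    """Keys whose value differs, including keys present on only one side."""
    return {k for k in old.keys() | new.keys() if k not in old or k not in new or old[k] != new[k]}


def reconfigure_sketch(plugin: Any, old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
    config_class = getattr(plugin, "config_class", None)
    if config_class is None:
        return []  # silently skip plugins that declare no config class
    changed = fields_changed(old, new)
    triggers: List[str] = []
    for fld in fields(config_class):
        trigger = fld.metadata.get(RELOAD_TRIGGER_KEY)
        if fld.name in changed and trigger and trigger not in triggers:
            triggers.append(trigger)  # de-dupe triggers shared by several fields
    for trigger in triggers:
        try:
            getattr(plugin, f"_release_{trigger}")()
        except Exception:
            logging.exception("_release_%s failed; continuing", trigger)
    return triggers


@dataclass
class ModelConfig:
    model: str = field(default="small", metadata={RELOAD_TRIGGER_KEY: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER_KEY: "model"})
    beam_size: int = 5  # no trigger: applied in place


class ModelPlugin:
    config_class = ModelConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")


plugin = ModelPlugin()
old = {"model": "small", "device": "cpu", "beam_size": 5}
new = {"model": "large", "device": "cuda", "beam_size": 8}
print(reconfigure_sketch(plugin, old, new), plugin.released)
```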