# SG-44 regression: @capability_action + collect_capability_actions form a usable
# decorator/introspection pair, and inheritance walks the MRO.
class _BaseDispatcher:
    @capability_action("hello")
    def _say_hello(self, **kwargs):
        return "hi"

    @capability_action("goodbye")
    def _say_goodbye(self, **kwargs):
        return "bye"

class _ExtendedDispatcher(_BaseDispatcher):
    @capability_action("wave")
    def _wave(self, **kwargs):
        return "👋"

# Decorator tags methods without changing them
assert _BaseDispatcher._say_hello._capability_action == "hello"
# Collection from base class
base_actions = collect_capability_actions(_BaseDispatcher)
assert base_actions == {"hello", "goodbye"}, f"unexpected: {base_actions}"
# Inheritance: subclass inherits parent's actions + adds its own
ext_actions = collect_capability_actions(_ExtendedDispatcher)
assert ext_actions == {"hello", "goodbye", "wave"}, f"unexpected: {ext_actions}"
# Undecorated methods don't contribute
class _Plain:
    def regular(self): return None
assert collect_capability_actions(_Plain) == set()
# Methods still callable after decoration
inst = _BaseDispatcher()
assert inst._say_hello() == "hi"
print("SG-44 @capability_action + collect_capability_actions: PASS")

Tool Capability
ToolCapability
ToolCapability is the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3): identity, lifecycle, config, cancellation, and observability for a tool running in a worker process. It works for both:
- Concrete capabilities: running in Worker processes (implement the actual logic)
- Remote proxies: running in the Host process (forward calls over HTTP)
What it deliberately does NOT carry: the task channel. The pre-fracture interface declared an untyped execute(*args, **kwargs) as its one task-shaped member; under the fracture, typed task contracts live on task adapters (see core.adapter and the per-task cjm-<task>-adapter-interface libraries), and results cross the worker boundary through the typed wire layer (core.wire).
FieldOptions
def FieldOptions(
options:List, constraints:Dict=<factory>
)->None:
CR-11: the live option domain for one dynamic config field.
Kept SEPARATE from the static config_schema (which CR-8 hashes for drift detection). The capability-config UI merges these live options on top of the static schema; folding them into the schema would make every API capability perpetually ‘drift’.
ConfigOption
def ConfigOption(
value:Any, label:str, metadata:Dict=<factory>
)->None:
CR-11: one live option for a dynamic config field, with optional metadata.
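The separation between live options and the static schema can be illustrated with a small sketch. Everything here is an assumption made for illustration: the two dataclasses are stand-ins that only mirror the documented signatures, and `schema_hash`, `merge_live_options`, and the `x-labels` key are hypothetical names, not part of the library. The point is only that the overlay is built on a copy, so a hash of the static schema stays stable.

```python
import copy
import hashlib
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ConfigOption:
    """Stand-in mirroring the documented ConfigOption signature."""
    value: Any
    label: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FieldOptions:
    """Stand-in mirroring the documented FieldOptions signature."""
    options: List[ConfigOption]
    constraints: Dict[str, Any] = field(default_factory=dict)


def schema_hash(schema: Dict[str, Any]) -> str:
    """Hypothetical drift witness: SHA-256 over canonical JSON."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def merge_live_options(
    schema: Dict[str, Any], live: Dict[str, FieldOptions]
) -> Dict[str, Any]:
    """Return a UI view with live options overlaid (hypothetical helper)."""
    view = copy.deepcopy(schema)  # never mutate the hashed static schema
    for name, opts in live.items():
        prop = view.setdefault("properties", {}).setdefault(name, {})
        prop["enum"] = [o.value for o in opts.options]
        prop["x-labels"] = {str(o.value): o.label for o in opts.options}
        prop.update(opts.constraints)
    return view


static_schema = {"type": "object", "properties": {"model": {"type": "string"}}}
live = {
    "model": FieldOptions(
        options=[ConfigOption("small", "Small"), ConfigOption("large", "Large")]
    )
}
before = schema_hash(static_schema)
ui_view = merge_live_options(static_schema, live)
```

Because the merge works on a deep copy, the static schema hashes to the same value before and after, which is the property the drift check relies on.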
EnvVarSpec
def EnvVarSpec(
name:str, secret:bool=False, required:bool=False, label:str='', description:str='', default:Optional=None,
options:Optional=None
)->None:
CR-12: one entry of a capability’s spawn-time worker-environment contract.
A capability declares the environment variables its worker subprocess reads at startup via WORKER_ENV: ClassVar[List[EnvVarSpec]]. Worker env vars are FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate routes such changes through reload_capability, never reconfigure (the env is baked into the subprocess at Popen and can’t be mutated in-process). This is the lifecycle distinction from a normal config field (reconfigurable in place via reconfigure/_release_<trigger>).
Two flavors share this one declaration:
- secret=True: value resolved from the SecretStore (masked; never persisted in the config store, echoed in config_schema, or logged). A secret never carries a default, because a baked-in secret is a leak.
- secret=False: visible value resolved from the override chain (operator override > manifest install.env_vars > this default); safe to display.
Both share one injection seam: the substrate composes the resolved {name: value} overlay at load time and injects it into the worker env at spawn (extending the existing CJM_CAPABILITY_DATA_DIR / CJM_MODELS_DIR injection). This is “derive from behaviour, not metadata” applied to the spawn env: the capability declares WHICH vars it consumes + whether each is secret/required; the substrate owns resolution + injection.
options is a forward seam for visible vars with a finite domain (e.g. a device selector enumerating GPUs); unused today but reserved so the capability-config UI / a future set-env surface isn’t blocked.
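The resolution order described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the substrate's implementation: `EnvVarSpec` here is a trimmed stand-in, `resolve_worker_env` is a hypothetical function, the three mappings stand in for the SecretStore, operator overrides, and manifest `install.env_vars`, and `ValueError` is used where the library would presumably raise its own error type. Omitting unset optional variables from the overlay is also an assumption.

```python
from dataclasses import dataclass
from typing import Dict, List, Mapping, Optional


@dataclass
class EnvVarSpec:
    """Stand-in carrying a subset of the documented fields."""
    name: str
    secret: bool = False
    required: bool = False
    default: Optional[str] = None


def resolve_worker_env(
    specs: List[EnvVarSpec],
    secrets: Mapping[str, str],       # stand-in for the SecretStore
    overrides: Mapping[str, str],     # operator overrides
    manifest_env: Mapping[str, str],  # manifest install.env_vars
) -> Dict[str, str]:
    """Compose the {name: value} overlay injected at spawn (hypothetical)."""
    overlay: Dict[str, str] = {}
    for spec in specs:
        if spec.secret:
            if spec.default is not None:
                raise ValueError(f"{spec.name}: a secret must not carry a default")
            value = secrets.get(spec.name)
        elif spec.name in overrides:
            value = overrides[spec.name]
        elif spec.name in manifest_env:
            value = manifest_env[spec.name]
        else:
            value = spec.default
        if value is None:
            if spec.required:
                raise ValueError(f"{spec.name}: required but unresolved")
            continue  # assumption: unset optional vars are simply omitted
        overlay[spec.name] = value
    return overlay


specs = [
    EnvVarSpec("API_TOKEN", secret=True, required=True),
    EnvVarSpec("DEVICE", default="cpu"),
    EnvVarSpec("LOG_LEVEL", default="info"),
    EnvVarSpec("OPTIONAL_UNSET"),
]
overlay = resolve_worker_env(
    specs,
    secrets={"API_TOKEN": "s3cret"},
    overrides={"DEVICE": "cuda:0"},
    manifest_env={"DEVICE": "cuda:1", "LOG_LEVEL": "debug"},
)
```

The overlay is computed once, before the worker exists, which matches the point that a change to any of these values needs a respawn rather than an in-place reconfigure.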
template_check_placeholders
def template_check_placeholders(
template:str, # The raw EnvVarSpec.default value
)->Set: # Placeholder names referenced (allowed-vocabulary-validated)
Return the set of placeholder names referenced by a worker-env template.
Validates the vocabulary (unknown names raise CapabilityConfigError) without requiring a placeholder-value mapping. Useful for cjm-ctl validate’s dry-run check at install/release time — surface the bug BEFORE the capability tries to spawn a worker with a malformed default.
Templates without ${...} return an empty set.
expand_worker_env_template
def expand_worker_env_template(
template:str, # The raw EnvVarSpec.default value (may contain ${...} placeholders)
placeholders:Mapping, # Resolved values keyed by placeholder name
capability_name:str='', # For error context ("template X on capability Y references ...")
var_name:str='', # For error context ("on EnvVarSpec(name=Z)")
)->str:
Substitute ${VAR} placeholders in template using placeholders.
Strict mode (no safe_substitute): unknown placeholders raise CapabilityConfigError with descriptive context. Single-pass, non-recursive — substituted values are taken verbatim, never re-scanned for further placeholders. Templates without any ${...} syntax pass through unchanged (so plain static defaults work as before).
The allowed placeholder vocabulary is fixed via WORKER_ENV_TEMPLATE_PLACEHOLDERS. A ${FOO} whose name is in the vocabulary but whose RESOLVED value is None (e.g. CJM_MODELS_DIR when the operator hasn’t configured one) raises CapabilityConfigError with the same shape — operators get a clear signal that the capability needs a value they haven’t provided, rather than a silent substitution of empty string into a load-bearing path.
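The strict, single-pass behaviour described for these two functions can be sketched with a regular expression. This is an illustration under assumptions rather than the library source: `ALLOWED` is a guessed two-name vocabulary (the real set lives in WORKER_ENV_TEMPLATE_PLACEHOLDERS), `TemplateError` stands in for CapabilityConfigError, and `check_placeholders` / `expand` are hypothetical names.

```python
import re
from typing import Mapping, Optional, Set

# Assumed vocabulary; the real one is WORKER_ENV_TEMPLATE_PLACEHOLDERS.
ALLOWED = frozenset({"CJM_CAPABILITY_DATA_DIR", "CJM_MODELS_DIR"})
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


class TemplateError(ValueError):
    """Stand-in for CapabilityConfigError."""


def check_placeholders(template: str) -> Set[str]:
    """Return referenced names; reject names outside the vocabulary."""
    names = set(_PLACEHOLDER.findall(template))
    unknown = names - ALLOWED
    if unknown:
        raise TemplateError(f"unknown placeholder(s): {sorted(unknown)}")
    return names


def expand(template: str, placeholders: Mapping[str, Optional[str]]) -> str:
    """Strict single-pass substitution of ${VAR} placeholders."""
    check_placeholders(template)

    def _sub(match: "re.Match[str]") -> str:
        name = match.group(1)
        value = placeholders.get(name)
        if value is None:
            # Known name, but the operator has not provided a value.
            raise TemplateError(f"placeholder ${{{name}}} has no resolved value")
        return value  # inserted verbatim; re.sub does not re-scan it

    return _PLACEHOLDER.sub(_sub, template)


expanded = expand("${CJM_MODELS_DIR}/whisper", {"CJM_MODELS_DIR": "/models"})
```

Using a replacement function with `re.sub` gives the single-pass property for free: each match is replaced once and the inserted text is never scanned again.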
ToolCapability
def ToolCapability(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Tool-capability interface: manage the tool/worker — identity, lifecycle, config, cancellation, observability (pass-2 Thread 3 fracture).
The task channel is NOT part of this surface: execute left the abstract set when the capability-unit fracture split tool capabilities from task adapters. Typed task contracts live on adapters (core.adapter + the per-task cjm-<task>-adapter-interface libraries). Fused-era capabilities (the pre-Option-C 12) still define execute themselves and their domain ABCs still declare it abstract; they kept working unchanged through the class-identical ToolCapability alias until the Option C migration cascade split them (the alias was later REMOVED at SG-48).
CR-4 extended this surface with: prefetch hook (SG-19), made cleanup optional (SG-43), reconfigure split + RELOAD_TRIGGER declarative-helper (lifecycle hook split), and cooperative-cancellation primitives (SG-16 — flag + callback + context manager).
Abstract methods: name, version, initialize, get_config_schema, get_current_config. Concrete defaults (overridable): execute_stream (transitional — see method), cleanup, prefetch, reconfigure, cancel, check_cancel, register_cancel_callback, cancel_signal_to, report_progress, report_usage, fields_that_changed, reconfigure_with_triggers, on_disable, on_enable.
Action Dispatcher Convention (SG-44 + SG-42)
Multi-verb capability interfaces (e.g., MediaProcessing, Graph, Text, Monitor) use a dispatcher-style execute(action, **kwargs) rather than declaring a typed abstract method per action. The substrate provides two helpers that let capability authors declare action handlers cleanly:
- @capability_action("name"): marker decorator tagging a method as the handler for a named action.
- collect_capability_actions(cls): walks a class (and its MRO) to derive the set of action names from decorated methods. Useful for declaring supported_actions: ClassVar[Set[str]] without hand-maintaining the list.
SG-42 parameter convention: dispatcher-style interfaces standardize on action= as the dispatch parameter name. The old command= form (the pre-Option-C monitor dispatcher) has since been removed. The substrate’s @capability_action decorator deliberately doesn’t tie itself to either name — it tags methods, and the capability’s own execute() decides which keyword to dispatch on. The convention is documented; the migration is a consumer-side cascade per audit §6.
The actual migration of MediaProcessing/Graph/Text/Monitor interfaces to drop their typed abstract methods and adopt the dispatcher + supported_actions pattern lands in a later cascade step. Wave 3 ships the substrate primitives.
collect_capability_actions
def collect_capability_actions(
cls:type, # Class (or subclass) to scan for @capability_action-tagged methods
)->Set: # Set of action names handled by `cls` (including inherited)
Collect action names from @capability_action-decorated methods on cls.
Walks the class’s MRO so subclasses inherit action handlers from base classes automatically. The returned set is suitable for supported_actions: ClassVar[Set[str]] = collect_capability_actions(MyCapability) once the capability class body has been defined.
capability_action
def capability_action(
action_name:str, # Public action name the decorated method handles
)->Callable: # Decorator
Marker decorator tagging a capability method as the handler for action_name.
Sets func._capability_action = action_name. Capability authors with dispatcher-style execute(action, **kwargs) use collect_capability_actions(cls) to derive their supported_actions set from these markers rather than maintaining a separate list. The decorator does not change call semantics — the wrapped function is returned unchanged.
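A marker decorator and an MRO-walking collector of the kind described above can be written in a few lines. The sketch below is an assumption about one plausible shape, not the library's source; `tag_action` and `collect_actions` are hypothetical names chosen so they do not shadow the real helpers.

```python
from typing import Callable, Set


def tag_action(action_name: str) -> Callable:
    """Marker decorator: record the action name, return the function as-is."""
    def decorator(func: Callable) -> Callable:
        func._capability_action = action_name
        return func  # no wrapper, so call semantics are untouched
    return decorator


def collect_actions(cls: type) -> Set[str]:
    """Gather tagged action names from cls and every class in its MRO."""
    actions: Set[str] = set()
    for klass in cls.__mro__:
        for member in vars(klass).values():
            action = getattr(member, "_capability_action", None)
            if isinstance(action, str):
                actions.add(action)
    return actions


class Base:
    @tag_action("probe")
    def _probe(self, **kwargs):
        return "probed"


class Media(Base):
    @tag_action("convert")
    def _convert(self, **kwargs):
        return "converted"


# Assigned after the class body exists, as the text above suggests.
Media.supported_actions = collect_actions(Media)
```

Reading `vars(klass)` per class, rather than `dir(cls)`, keeps the walk explicit about which class contributed each handler.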
# T28 regression: ToolCapability.dispatch_to_action routes to the
# @capability_action handler via the MRO walk + forwards kwargs; unknown actions
# raise the typed CapabilityInputError(fields_invalid=["action"]). This is the
# substrate helper the 5 dispatcher capabilities (Demucs/LavaSR/NLTK/FFmpeg/
# SQLite-graph) call instead of reimplementing the loop.
class _T28DispatchCapability(ToolCapability):
    @property
    def name(self) -> str: return "t28-dispatch"
    @property
    def version(self) -> str: return "0.0.0"
    def initialize(self, config=None): pass
    def get_config_schema(self) -> Dict[str, Any]: return {}
    def get_current_config(self) -> Dict[str, Any]: return {}
    def execute(self, action: str = "ping", **kwargs):
        return self.dispatch_to_action(action, **kwargs)
    @capability_action("ping")
    def _ping(self, **kwargs): return "pong"
    @capability_action("echo")
    def _echo(self, value=None, **kwargs): return value

# Inheritance: a subclass inherits the base's @capability_action handlers (MRO walk).
class _T28ExtendedCapability(_T28DispatchCapability):
    @capability_action("shout")
    def _shout(self, text="", **kwargs): return text.upper()

_t28 = _T28DispatchCapability()
# Routes to the tagged handler + forwards kwargs
assert _t28.execute("ping") == "pong"
assert _t28.dispatch_to_action("echo", value=42) == 42
# supported_actions (collect_capability_actions) and dispatch share the same markers
assert collect_capability_actions(_T28DispatchCapability) == {"ping", "echo"}
# Unknown action raises typed CapabilityInputError with fields_invalid=["action"]
try:
    _t28.dispatch_to_action("nope")
    assert False, "expected CapabilityInputError"
except CapabilityInputError as e:
    assert e.fields_invalid == ["action"], f"unexpected fields_invalid: {e.fields_invalid}"
# MRO walk: subclass dispatches BOTH its own and inherited handlers
_t28e = _T28ExtendedCapability()
assert _t28e.dispatch_to_action("shout", text="hi") == "HI"
assert _t28e.dispatch_to_action("ping") == "pong"
assert collect_capability_actions(_T28ExtendedCapability) == {"ping", "echo", "shout"}
print("T28 dispatch_to_action: PASS")

The interface provides:
- Identity: name and version properties for capability identification
- Lifecycle: initialize() / prefetch() / cleanup() + on_disable() / on_enable() signal hooks
- Configuration: get_config_schema() / get_current_config() / get_config_options() / reconfigure() (+ RELOAD_TRIGGER declarative releases)
- Cancellation: cancel() / check_cancel() / register_cancel_callback() / cancel_signal_to() cooperative primitives
- Observability: report_progress() / report_usage() / heartbeat()
- Multi-op dispatch: dispatch_to_action() for genuinely multi-op tools (ffmpeg, graph storage)
The task channel (execute / typed task methods) belongs to task adapters, not this interface; execute_stream’s transitional default remains for fused-era capabilities only.
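For the multi-op dispatch entry in the list above, a routing helper along these lines would behave as the T28 regression expects. This is a hedged sketch only: `DispatcherSketch`, `tag_action`, and `ActionInputError` are hypothetical stand-ins for ToolCapability.dispatch_to_action, @capability_action, and CapabilityInputError, and the real helper may be implemented differently.

```python
from typing import Any, Callable, List


class ActionInputError(ValueError):
    """Stand-in for CapabilityInputError, carrying fields_invalid."""
    def __init__(self, message: str, fields_invalid: List[str]):
        super().__init__(message)
        self.fields_invalid = fields_invalid


def tag_action(action_name: str) -> Callable:
    """Stand-in marker decorator (same idea as @capability_action)."""
    def decorator(func: Callable) -> Callable:
        func._capability_action = action_name
        return func
    return decorator


class DispatcherSketch:
    def dispatch_to_action(self, action: str, **kwargs: Any) -> Any:
        """Find the handler tagged with `action` along the MRO and call it."""
        for klass in type(self).__mro__:
            for attr_name, member in vars(klass).items():
                if getattr(member, "_capability_action", None) == action:
                    # Look the name up on the instance to get a bound method.
                    return getattr(self, attr_name)(**kwargs)
        raise ActionInputError(
            f"unknown action {action!r}", fields_invalid=["action"]
        )


class Tool(DispatcherSketch):
    def execute(self, action: str = "ping", **kwargs: Any) -> Any:
        return self.dispatch_to_action(action, **kwargs)

    @tag_action("ping")
    def _ping(self, **kwargs: Any) -> str:
        return "pong"

    @tag_action("echo")
    def _echo(self, value: Any = None, **kwargs: Any) -> Any:
        return value


tool = Tool()
result = tool.execute("echo", value=42)
```

Since dispatch and action collection read the same `_capability_action` marker, the advertised action set and the routable action set cannot drift apart.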
Example: Implementing a Capability
Here’s a complete example showing how to implement a concrete capability:
from dataclasses import dataclass, asdict
from typing import Any, Dict, Optional

@dataclass
class ExampleConfig:
    """Configuration for the example capability."""
    mode: str = "balanced"
    threshold: float = 0.5
    max_workers: int = 4

class ExampleCapability(ToolCapability):
    """A simple example capability implementation."""
    def __init__(self):
        self._config: ExampleConfig = ExampleConfig()
        self._resource: Optional[str] = None

    @property
    def name(self) -> str:
        return "example-capability"

    @property
    def version(self) -> str:
        return "1.0.0"

    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize or re-configure the capability."""
        if config is None:
            config = {}
        # Merge with defaults
        current = asdict(self._config)
        current.update(config)
        self._config = ExampleConfig(**current)
        # Initialize resources based on config
        self._resource = f"Resource-{self._config.mode}"

    def execute(self, input_data: str, **kwargs) -> str:
        """Process input data."""
        return f"Processed '{input_data}' using {self._resource}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "balanced", "quality"],
                    "default": "balanced"
                },
                "threshold": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "max_workers": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 16,
                    "default": 4
                }
            }
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration."""
        return asdict(self._config)

    def cleanup(self) -> None:
        """Clean up resources."""
        self._resource = None

# Test the example capability
capability = ExampleCapability()
capability.initialize()
print(f"Capability: {capability.name} v{capability.version}")
print(f"\nConfig Schema:")
print(capability.get_config_schema())
print(f"\nCurrent Config:")
print(capability.get_current_config())

# Test execution
result = capability.execute("sample_data")
print(f"Result: {result}")
# Test re-initialization with new config
capability.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {capability.get_current_config()}")
result = capability.execute("more_data")
print(f"Result: {result}")
# Test cleanup
capability.cleanup()
print("\nAfter cleanup, resource is cleared")

# Test streaming (default implementation yields single result)
capability.initialize({"mode": "balanced"})
print("Streaming execution:")
for chunk in capability.execute_stream("stream_data"):
    print(f" Chunk: {chunk}")

CR-4 regression tests
Exercise the lifecycle hooks + helpers + cancellation primitives independently of any worker subprocess. The tests use bare ToolCapability subclasses without going through RemoteCapabilityProxy.
What’s covered:
- cleanup() made optional: a subclass with no cleanup override is instantiable.
- prefetch() default no-op.
- fields_that_changed symmetric difference + asymmetric-presence handling.
- reconfigure_with_triggers walks RELOAD_TRIGGER metadata, dispatches _release_<trigger>, de-dupes shared triggers, silently skips capabilities without config_class, and survives a misbehaving _release method.
- cancel() sets _cancel_requested + fires registered callbacks (logging and skipping misbehaving callbacks).
- check_cancel() raises CapabilityCancelledError only when flag is set.
- cancel_signal_to context manager registers + deregisters cleanly even when the with-block raises.
Structural surface derivation
Surface-based, adapter-driven compatibility (pass-2 Thread 3): the tool-capability manifest records the class’s structural surface (public methods + signatures, properties, class attributes) by pure self-introspection — the capability has ZERO knowledge of protocols or adapters. The adapter declares required_tool_protocol; the substrate matches the protocol against recorded surfaces host-side, against UNLOADED capabilities (the matching machinery is CR-17 pt 2, stage 4).
Two call sites use this one derivation: the _generate_manifest introspection script (runs in the capability’s env at install/regenerate time → code.structural_surface + a witness hash) and the worker’s /structural_surface endpoint (the live companion the manager compares against the hash → surface_drift flag; third instance of the CR-8 hashed-witness + live-companion idiom).
derive_structural_surface
def derive_structural_surface(
cls:type, # The capability class to introspect
)->Dict: # {"methods": [...], "properties": [...], "attributes": [...]}
Record a capability class’s structural surface by pure self-introspection.
The FULL public surface is recorded, inherited members included — the surface is what adapter protocols match against, and protocol members may name inherited methods. Deterministic (name-sorted) so the canonical-JSON witness hash is stable across runs.
Classification: property → properties (names only); functions (static/class methods unwrapped) → methods with str(inspect.signature) + the parameter NAME list params (self excluded — the CR-17 pt 2 surface matcher’s input; stage 4); everything else public → attributes with the value’s type name (config_class, supported_actions, WORKER_ENV, …).
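The classification rules above map naturally onto the standard `inspect` module. The following is a simplified sketch under assumptions, not the library's derivation: `derive_surface_sketch` and `surface_witness` are hypothetical names, SHA-256 over canonical JSON is an assumed witness format, and edge cases the real function may handle (descriptors, `cls` on classmethods) are ignored.

```python
import hashlib
import inspect
import json
from typing import Any, Dict


def derive_surface_sketch(cls: type) -> Dict[str, Any]:
    """Record public methods, properties, and attributes, name-sorted."""
    methods, properties, attributes = [], [], []
    for name in sorted(dir(cls)):  # dir() includes inherited members
        if name.startswith("_"):
            continue
        raw = inspect.getattr_static(cls, name)  # no descriptor invocation
        if isinstance(raw, property):
            properties.append(name)
            continue
        if isinstance(raw, (staticmethod, classmethod)):
            raw = raw.__func__  # unwrap to the underlying function
        if inspect.isfunction(raw):
            sig = inspect.signature(raw)
            params = [p for p in sig.parameters if p != "self"]
            methods.append({"name": name, "signature": str(sig), "params": params})
        else:
            attributes.append({"name": name, "type": type(raw).__name__})
    return {"methods": methods, "properties": properties, "attributes": attributes}


def surface_witness(surface: Dict[str, Any]) -> str:
    """Assumed witness: SHA-256 of the canonical-JSON surface."""
    canonical = json.dumps(surface, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class Base:
    def reconfigure(self, config=None):
        pass


class Demo(Base):
    supported_actions = {"ping"}

    @property
    def name(self):
        return "demo"

    def initialize(self, config=None):
        pass

    @staticmethod
    def helper(x):
        return x


surface = derive_surface_sketch(Demo)
witness = surface_witness(surface)
```

Iterating `sorted(dir(cls))` gives both properties the text calls for: inherited members are included, and the ordering is deterministic so the hash is stable across runs.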
# derive_structural_surface: full public surface (inherited base methods
# included — they ARE surface), deterministic ordering, classification.
surf = derive_structural_surface(_CR4MinimalCapability)
names = {m["name"] for m in surf["methods"]}
assert "execute" in names # the fused-era capability defines it
assert "reconfigure" in names # inherited ToolCapability method
assert "dispatch_to_action" in names # patched-on members are surface too
assert "name" in surf["properties"] and "version" in surf["properties"]
sig = next(m["signature"] for m in surf["methods"] if m["name"] == "initialize")
assert "config" in sig
assert surf == derive_structural_surface(_CR4MinimalCapability) # deterministic
assert [m["name"] for m in surf["methods"]] == sorted(m["name"] for m in surf["methods"])
print(f"derive_structural_surface OK: {len(surf['methods'])} methods, "
f"{len(surf['properties'])} properties, {len(surf['attributes'])} attributes")