Tool Capability

The tool-capability interface — the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3)

ToolCapability

ToolCapability is the manage-the-tool half of the capability-unit fracture (pass-2 Thread 3): identity, lifecycle, config, cancellation, and observability for a tool running in a worker process. It works for both:

  • Concrete capabilities: running in Worker processes (implement the actual logic)
  • Remote proxies: running in the Host process (forward calls over HTTP)

What it deliberately does NOT carry: the task channel. The pre-fracture interface declared an untyped execute(*args, **kwargs) as its one task-shaped member; under the fracture, typed task contracts live on task adapters (see core.adapter and the per-task cjm-<task>-adapter-interface libraries), and results cross the worker boundary through the typed wire layer (core.wire).


FieldOptions


def FieldOptions(
    options:List, constraints:Dict=<factory>
)->None:

CR-11: the live option domain for one dynamic config field.

Kept SEPARATE from the static config_schema (which CR-8 hashes for drift detection). The capability-config UI merges these live options on top of the static schema; folding them into the schema would make every API capability perpetually ‘drift’.


ConfigOption


def ConfigOption(
    value:Any, label:str, metadata:Dict=<factory>
)->None:

CR-11: one live option for a dynamic config field, with optional metadata.


EnvVarSpec


def EnvVarSpec(
    name:str, secret:bool=False, required:bool=False, label:str='', description:str='', default:Optional=None,
    options:Optional=None
)->None:

CR-12: one entry of a capability’s spawn-time worker-environment contract.

A capability declares the environment variables its worker subprocess reads at startup via WORKER_ENV: ClassVar[List[EnvVarSpec]]. Worker env vars are FIXED AT SPAWN — changing one requires a worker RESPAWN, so the substrate routes such changes through reload_capability, never reconfigure (the env is baked into the subprocess at Popen and can’t be mutated in-process). This is the lifecycle distinction from a normal config field (reconfigurable in place via reconfigure/_release_<trigger>).

Two flavors share this one declaration:

  • secret=True : value resolved from the SecretStore (masked; never persisted in the config store, echoed in config_schema, or logged). A secret never carries a default — a baked-in secret is a leak.
  • secret=False : visible value resolved from the override chain (operator override > manifest install.env_vars > this default); safe to display.

Both share one injection seam: the substrate composes the resolved {name: value} overlay at load time and injects it into the worker env at spawn (extending the existing CJM_CAPABILITY_DATA_DIR / CJM_MODELS_DIR injection). This is “derive from behaviour, not metadata” applied to the spawn env: the capability declares WHICH vars it consumes + whether each is secret/required; the substrate owns resolution + injection.

options is a forward seam for visible vars with a finite domain (e.g. a device selector enumerating GPUs); unused today but reserved so the capability-config UI / a future set-env surface isn’t blocked.


template_check_placeholders


def template_check_placeholders(
    template:str, # The raw EnvVarSpec.default value
)->Set: # Placeholder names referenced (allowed-vocabulary-validated)

Return the set of placeholder names referenced by a worker-env template.

Validates the vocabulary (unknown names raise CapabilityConfigError) without requiring a placeholder-value mapping. Useful for cjm-ctl validate’s dry-run check at install/release time — surface the bug BEFORE the capability tries to spawn a worker with a malformed default.

Templates without ${...} return an empty set.


expand_worker_env_template


def expand_worker_env_template(
    template:str, # The raw EnvVarSpec.default value (may contain ${...} placeholders)
    placeholders:Mapping, # Resolved values keyed by placeholder name
    capability_name:str='', # For error context ("template X on capability Y references ...")
    var_name:str='', # For error context ("on EnvVarSpec(name=Z)")
)->str:

Substitute ${VAR} placeholders in template using placeholders.

Strict mode (no safe_substitute): unknown placeholders raise CapabilityConfigError with descriptive context. Single-pass, non-recursive — substituted values are taken verbatim, never re-scanned for further placeholders. Templates without any ${...} syntax pass through unchanged (so plain static defaults work as before).

The allowed placeholder vocabulary is fixed via WORKER_ENV_TEMPLATE_PLACEHOLDERS. A ${FOO} whose name is in the vocabulary but whose RESOLVED value is None (e.g. CJM_MODELS_DIR when the operator hasn’t configured one) raises CapabilityConfigError with the same shape — operators get a clear signal that the capability needs a value they haven’t provided, rather than a silent substitution of empty string into a load-bearing path.


ToolCapability


def ToolCapability(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Tool-capability interface: manage the tool/worker — identity, lifecycle, config, cancellation, observability (pass-2 Thread 3 fracture).

The task channel is NOT part of this surface: execute left the abstract set when the capability-unit fracture split tool capabilities from task adapters. Typed task contracts live on adapters (core.adapter + the per-task cjm-<task>-adapter-interface libraries). Fused-era capabilities (the pre-Option-C 12) still define execute themselves and their domain ABCs still declare it abstract — they kept working unchanged through the class-identical ToolCapability alias (later REMOVED at SG-48) until the Option C migration cascade split them (the alias was REMOVED at SG-48).

CR-4 extended this surface with: prefetch hook (SG-19), made cleanup optional (SG-43), reconfigure split + RELOAD_TRIGGER declarative-helper (lifecycle hook split), and cooperative-cancellation primitives (SG-16 — flag + callback + context manager).

Abstract methods: name, version, initialize, get_config_schema, get_current_config. Concrete defaults (overridable): execute_stream (transitional — see method), cleanup, prefetch, reconfigure, cancel, check_cancel, register_cancel_callback, cancel_signal_to, report_progress, report_usage, fields_that_changed, reconfigure_with_triggers, on_disable, on_enable.

Action Dispatcher Convention (SG-44 + SG-42)

Multi-verb capability interfaces (e.g., MediaProcessing, Graph, Text, Monitor) use a dispatcher-style execute(action, **kwargs) rather than declaring a typed abstract method per action. The substrate provides two helpers that let capability authors declare action handlers cleanly:

  • @capability_action("name") — marker decorator tagging a method as the handler for a named action.
  • collect_capability_actions(cls) — walks a class (and its MRO) to derive the set of action names from decorated methods. Useful for declaring supported_actions: ClassVar[Set[str]] without hand-maintaining the list.

SG-42 parameter convention: dispatcher-style interfaces standardize on action= as the dispatch parameter name. The old command= form (the pre-Option-C monitor dispatcher) has since been removed. The substrate’s @capability_action decorator deliberately doesn’t tie itself to either name — it tags methods, and the capability’s own execute() decides which keyword to dispatch on. The convention is documented; the migration is a consumer-side cascade per audit §6.

The actual migration of MediaProcessing/Graph/Text/Monitor interfaces to drop their typed abstract methods and adopt the dispatcher + supported_actions pattern lands in a later cascade step. Wave 3 ships the substrate primitives.


collect_capability_actions


def collect_capability_actions(
    cls:type, # Class (or subclass) to scan for @capability_action-tagged methods
)->Set: # Set of action names handled by `cls` (including inherited)

Collect action names from @capability_action-decorated methods on cls.

Walks the class’s MRO so subclasses inherit action handlers from base classes automatically. The returned set is suitable for supported_actions: ClassVar[Set[str]] = collect_capability_actions(MyCapability) once the capability class body has been defined.


capability_action


def capability_action(
    action_name:str, # Public action name the decorated method handles
)->Callable: # Decorator

Marker decorator tagging a capability method as the handler for action_name.

Sets func._capability_action = action_name. Capability authors with dispatcher-style execute(action, **kwargs) use collect_capability_actions(cls) to derive their supported_actions set from these markers rather than maintaining a separate list. The decorator does not change call semantics — the wrapped function is returned unchanged.

# SG-44 regression: @capability_action + collect_capability_actions form a usable
# decorator/introspection pair, and inheritance walks the MRO.
class _BaseDispatcher:
    @capability_action("hello")
    def _say_hello(self, **kwargs):
        return "hi"
    
    @capability_action("goodbye")
    def _say_goodbye(self, **kwargs):
        return "bye"


class _ExtendedDispatcher(_BaseDispatcher):
    @capability_action("wave")
    def _wave(self, **kwargs):
        return "👋"


# Decorator tags methods without changing them
assert _BaseDispatcher._say_hello._capability_action == "hello"

# Collection from base class
base_actions = collect_capability_actions(_BaseDispatcher)
assert base_actions == {"hello", "goodbye"}, f"unexpected: {base_actions}"

# Inheritance: subclass inherits parent's actions + adds its own
ext_actions = collect_capability_actions(_ExtendedDispatcher)
assert ext_actions == {"hello", "goodbye", "wave"}, f"unexpected: {ext_actions}"

# Undecorated methods don't contribute
class _Plain:
    def regular(self): return None
assert collect_capability_actions(_Plain) == set()

# Methods still callable after decoration
inst = _BaseDispatcher()
assert inst._say_hello() == "hi"

print("SG-44 @capability_action + collect_capability_actions: PASS")
# T28 regression: ToolCapability.dispatch_to_action routes to the
# @capability_action handler via the MRO walk + forwards kwargs; unknown actions
# raise the typed CapabilityInputError(fields_invalid=["action"]). This is the
# substrate helper the 5 dispatcher capabilities (Demucs/LavaSR/NLTK/FFmpeg/
# SQLite-graph) call instead of reimplementing the loop.
class _T28DispatchCapability(ToolCapability):
    @property
    def name(self) -> str: return "t28-dispatch"
    @property
    def version(self) -> str: return "0.0.0"
    def initialize(self, config=None): pass
    def get_config_schema(self) -> Dict[str, Any]: return {}
    def get_current_config(self) -> Dict[str, Any]: return {}
    def execute(self, action: str = "ping", **kwargs):
        return self.dispatch_to_action(action, **kwargs)

    @capability_action("ping")
    def _ping(self, **kwargs): return "pong"

    @capability_action("echo")
    def _echo(self, value=None, **kwargs): return value


# Inheritance: a subclass inherits the base's @capability_action handlers (MRO walk).
class _T28ExtendedCapability(_T28DispatchCapability):
    @capability_action("shout")
    def _shout(self, text="", **kwargs): return text.upper()


_t28 = _T28DispatchCapability()
# Routes to the tagged handler + forwards kwargs
assert _t28.execute("ping") == "pong"
assert _t28.dispatch_to_action("echo", value=42) == 42
# supported_actions (collect_capability_actions) and dispatch share the same markers
assert collect_capability_actions(_T28DispatchCapability) == {"ping", "echo"}
# Unknown action raises typed CapabilityInputError with fields_invalid=["action"]
try:
    _t28.dispatch_to_action("nope")
    assert False, "expected CapabilityInputError"
except CapabilityInputError as e:
    assert e.fields_invalid == ["action"], f"unexpected fields_invalid: {e.fields_invalid}"

# MRO walk: subclass dispatches BOTH its own and inherited handlers
_t28e = _T28ExtendedCapability()
assert _t28e.dispatch_to_action("shout", text="hi") == "HI"
assert _t28e.dispatch_to_action("ping") == "pong"
assert collect_capability_actions(_T28ExtendedCapability) == {"ping", "echo", "shout"}

print("T28 dispatch_to_action: PASS")

The interface provides:

  • Identity: name and version properties for capability identification
  • Lifecycle: initialize() / prefetch() / cleanup() + on_disable() / on_enable() signal hooks
  • Configuration: get_config_schema() / get_current_config() / get_config_options() / reconfigure() (+ RELOAD_TRIGGER declarative releases)
  • Cancellation: cancel() / check_cancel() / register_cancel_callback() / cancel_signal_to() cooperative primitives
  • Observability: report_progress() / report_usage() / heartbeat()
  • Multi-op dispatch: dispatch_to_action() for genuinely multi-op tools (ffmpeg, graph storage)

The task channel (execute / typed task methods) belongs to task adapters, not this interface; execute_stream’s transitional default remains for fused-era capabilities only.

Example: Implementing a Capability

Here’s a complete example showing how to implement a concrete capability:

from dataclasses import dataclass, field, asdict

@dataclass
class ExampleConfig:
    """Configuration for the example capability."""
    mode: str = "balanced"
    threshold: float = 0.5
    max_workers: int = 4

class ExampleCapability(ToolCapability):
    """A simple example capability implementation."""

    def __init__(self):
        self._config: ExampleConfig = ExampleConfig()
        self._resource: Optional[str] = None

    @property
    def name(self) -> str:
        return "example-capability"
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize or re-configure the capability."""
        if config is None:
            config = {}
        
        # Merge with defaults
        current = asdict(self._config)
        current.update(config)
        self._config = ExampleConfig(**current)
        
        # Initialize resources based on config
        self._resource = f"Resource-{self._config.mode}"

    def execute(self, input_data: str, **kwargs) -> str:
        """Process input data."""
        return f"Processed '{input_data}' using {self._resource}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "balanced", "quality"],
                    "default": "balanced"
                },
                "threshold": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "max_workers": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 16,
                    "default": 4
                }
            }
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration."""
        return asdict(self._config)

    def cleanup(self) -> None:
        """Clean up resources."""
        self._resource = None
# Test the example capability
capability = ExampleCapability()
capability.initialize()

print(f"Capability: {capability.name} v{capability.version}")
print(f"\nConfig Schema:")
print(capability.get_config_schema())
print(f"\nCurrent Config:")
print(capability.get_current_config())
# Test execution
result = capability.execute("sample_data")
print(f"Result: {result}")

# Test re-initialization with new config
capability.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {capability.get_current_config()}")
result = capability.execute("more_data")
print(f"Result: {result}")

# Test cleanup
capability.cleanup()
print(f"\nAfter cleanup, resource is cleared")
# Test streaming (default implementation yields single result)
capability.initialize({"mode": "balanced"})

print("Streaming execution:")
for chunk in capability.execute_stream("stream_data"):
    print(f"  Chunk: {chunk}")

CR-4 regression tests

Exercise the lifecycle hooks + helpers + cancellation primitives independently of any worker subprocess. The tests use bare ToolCapability subclasses without going through RemoteCapabilityProxy.

What’s covered:

  • cleanup() made optional — a subclass with no cleanup override is instantiable.
  • prefetch() default no-op.
  • fields_that_changed symmetric difference + asymmetric-presence handling.
  • reconfigure_with_triggers walks RELOAD_TRIGGER metadata, dispatches _release_<trigger>, de-dupes shared triggers, silently skips capabilities without config_class, and survives a misbehaving _release method.
  • cancel() sets _cancel_requested + fires registered callbacks (logging and skipping misbehaving callbacks).
  • check_cancel() raises CapabilityCancelledError only when flag is set.
  • cancel_signal_to context manager registers + deregisters cleanly even when the with-block raises.

Structural surface derivation

Surface-based, adapter-driven compatibility (pass-2 Thread 3): the tool-capability manifest records the class’s structural surface (public methods + signatures, properties, class attributes) by pure self-introspection — the capability has ZERO knowledge of protocols or adapters. The adapter declares required_tool_protocol; the substrate matches the protocol against recorded surfaces host-side, against UNLOADED capabilities (the matching machinery is CR-17 pt 2, stage 4).

Two call sites use this one derivation: the _generate_manifest introspection script (runs in the capability’s env at install/regenerate time → code.structural_surface + a witness hash) and the worker’s /structural_surface endpoint (the live companion the manager compares against the hash → surface_drift flag; third instance of the CR-8 hashed-witness + live-companion idiom).


derive_structural_surface


def derive_structural_surface(
    cls:type, # The capability class to introspect
)->Dict: # {"methods": [...], "properties": [...], "attributes": [...]}

Record a capability class’s structural surface by pure self-introspection.

The FULL public surface is recorded, inherited members included — the surface is what adapter protocols match against, and protocol members may name inherited methods. Deterministic (name-sorted) so the canonical-JSON witness hash is stable across runs.

Classification: property → properties (names only); functions (static/class methods unwrapped) → methods with str(inspect.signature) + the parameter NAME list params (self excluded — the CR-17 pt 2 surface matcher’s input; stage 4); everything else public → attributes with the value’s type name (config_class, supported_actions, WORKER_ENV, …).

# derive_structural_surface: full public surface (inherited base methods
# included — they ARE surface), deterministic ordering, classification.
surf = derive_structural_surface(_CR4MinimalCapability)
names = {m["name"] for m in surf["methods"]}
assert "execute" in names              # the fused-era capability defines it
assert "reconfigure" in names          # inherited ToolCapability method
assert "dispatch_to_action" in names   # patched-on members are surface too
assert "name" in surf["properties"] and "version" in surf["properties"]
sig = next(m["signature"] for m in surf["methods"] if m["name"] == "initialize")
assert "config" in sig
assert surf == derive_structural_surface(_CR4MinimalCapability)  # deterministic
assert [m["name"] for m in surf["methods"]] == sorted(m["name"] for m in surf["methods"])
print(f"derive_structural_surface OK: {len(surf['methods'])} methods, "
      f"{len(surf['properties'])} properties, {len(surf['attributes'])} attributes")