Capability Manager

Capability discovery, loading, and lifecycle management system

CapabilityManager

The CapabilityManager orchestrates the complete lifecycle of capabilities in the process-isolated architecture:

  • Discovery: Finds capability manifests in local (.cjm/manifests/) or global (~/.cjm/manifests/) directories
  • Loading: Creates RemoteCapabilityProxy instances that spawn isolated Worker subprocesses
  • Execution: Forwards calls to Workers via HTTP, supports both sync and async
  • Lifecycle: Handles initialization, configuration updates, and cleanup

CapabilityManager                           Worker Subprocesses
┌─────────────────┐                    ┌─────────────────────┐
│ discover_       │                    │ Conda Env: Whisper  │
│   manifests()   │     HTTP/JSON      │   └─ whisper        │
│                 │◄──────────────────▶│                     │
│ capabilities:   │                    └─────────────────────┘
│   whisper ──────┼──► RemoteProxy     ┌─────────────────────┐
│   gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini   │
│                 │                    │   └─ gemini         │
└─────────────────┘                    └─────────────────────┘

CapabilityManager


def CapabilityManager(
    capability_interface:Type=ToolCapability, # Base interface for type checking
    search_paths:Optional=None, # Custom manifest search paths
    scheduler:Optional=None, # Resource allocation policy
    config_store:Optional=None, # CR-2: persistence backend; lazy LocalCapabilityConfigStore default per OQ-4
    empirical_store:Optional=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
    secret_store:Optional=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
    max_retries:int=1, # CR-7: how many reactive retries to attempt on CapabilityResourceError (default 1 — one retry after eviction)
    sysmon_capability_name:Optional=None, # monitor capability (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
    journal_store:Optional=None, # CR-14: durable account-of-action; lazy LocalJournalStore at <data_dir>/journal.db
    diagnostics_store:Optional=None, # CR-14: disposable diagnostic narrative; lazy LocalDiagnosticsStore at <data_dir>/diagnostics.db
):

Manages capability discovery, loading, and lifecycle via process isolation.

CR-3 Telemetry (System Monitor)

Bind a MonitorToolProtocol proxy and read system telemetry through CR-3’s typed surface. _get_global_stats* duck-types on the proxy’s typed accessor, falling back to the legacy dispatcher path for monitors that haven’t migrated.


register_system_monitor


def register_system_monitor(
    capability_name:str, # Name of the system monitor capability
)->None:

Bind a loaded capability to act as the hardware system monitor.


get_global_stats


async def get_global_stats(
    
)->Dict: # Current system telemetry

Public async system-telemetry accessor (stage 3 / CR-16).

The JobQueue’s multi-lane admission consumes this through the JobQueueDependencies protocol (defensively via getattr). Thin wrapper over _get_global_stats_async — same CR-3 duck-type semantics.


get_admission_profile


def get_admission_profile(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # {'gpu_memory_mb_peak_max','memory_mb_peak_max','sample_count'} or None

Empirical resource profile for a loaded instance’s CURRENT config (stage 3 / CR-16 multi-lane admission).

Reads the CR-7 empirical store at the instance’s live (instance_id, config_hash) key — the SAME keying that records samples, so the profile always describes the configuration actually being run (a config change = a new hash = no record = the queue runs the job EXCLUSIVE until its first measurement run graduates it).

None = no evidence (instance unknown / store disabled / no record for this config). The manifest’s requires_gpu is deliberately not part of this surface — GPU use is an empirical fact, not a declaration (stage-3 ledger G2).
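To make the keying concrete, here is a minimal sketch of a lookup by (instance_id, config_hash). The dict-backed store, the `config_hash` scheme (canonical JSON plus SHA-256), and the `lookup_profile` helper are assumptions for illustration, not the library's implementation; the sample numbers are made up.

```python
import hashlib
import json
from typing import Dict, Optional, Tuple

# Hypothetical stand-in for the CR-7 empirical store:
# (instance_id, config_hash) -> profile dict.
EmpiricalStore = Dict[Tuple[str, str], Dict[str, float]]


def config_hash(config: dict) -> str:
    """Stable hash of a config dict (assumed scheme: canonical JSON + SHA-256)."""
    canonical = json.dumps(config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def lookup_profile(
    store: EmpiricalStore, instance_id: str, config: dict
) -> Optional[Dict[str, float]]:
    """Return the profile recorded for this exact config, or None (no evidence)."""
    return store.get((instance_id, config_hash(config)))


store: EmpiricalStore = {}
measured_config = {"model": "large-v3"}
# Illustrative values only.
store[("whisper", config_hash(measured_config))] = {
    "gpu_memory_mb_peak_max": 9800.0,
    "memory_mb_peak_max": 4100.0,
    "sample_count": 3,
}

known = lookup_profile(store, "whisper", {"model": "large-v3"})
changed = lookup_profile(store, "whisper", {"model": "tiny"})  # new hash, no record
```

Because the hash covers the whole effective config, any change produces a new key, and the lookup returns None until samples are recorded for that configuration.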


get_instance_concurrency_cap


def get_instance_concurrency_cap(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # The instance's SG-33 max_concurrent_requests (None = unset)

Per-instance concurrency cap for queue admission (stage 3 / CR-16).

Surfaces the SG-33 max_concurrent_requests setting. The queue treats None as 1 (same-worker concurrency is OPT-IN per capability — e.g. ffmpeg raises its cap because its sync endpoints run in a threadpool and concurrent converts genuinely parallelize as subprocesses; model workers stay serial-per-instance).

Manifest Discovery

Read manifests from disk into typed CapabilityMeta (parsing Phase 5a resources), routing adapter manifests to the adapter registry. Stable cluster.


discover_manifests


def discover_manifests(
    
)->List: # List of discovered capability metadata

Discover capabilities via JSON manifests in search paths.

CR-8: reads each manifest via load_manifest, which parses the v2.0 nested layout into a typed ManifestV2. meta.manifest is set to a flat-shaped dict view so existing consumers (proxy, scheduling, execute path) continue working unchanged; the typed ManifestV2 is also attached as meta.manifest_v2 so drift detection + future typed callers can read drift_tracking.config_schema_hash without re-parsing.


get_capabilities_compatible_with


def get_capabilities_compatible_with(
    adapter:Union, # Adapter unit name or manifest
)->List: # Discovered capability names whose surface satisfies the protocol

CR-17 pt 2: the pass-2 compatibility query, manifest-surface-based.


check_adapter_compatibility


def check_adapter_compatibility(
    adapter:Union, # Adapter unit name or manifest
    capability_name:str, # Discovered capability name
)->Dict: # Match verdict (see match_protocol_against_surface)

CR-17 pt 2: surface-based compatibility verdict (host-side; works against UNLOADED capabilities — manifest-vs-manifest, no protocol imports host-side).

Matches the adapter’s recorded protocol members against the capability manifest’s recorded structural_surface (pass-2 Thread 3: the capability records only itself; the adapter declares the protocol; the substrate matches). A capability without a recorded surface (pre-fracture manifest) is NOT compatible until its manifest regenerates — staleness stays visible instead of silently mis-answering.
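The matching rule described above can be sketched as a set comparison. The `match_surface` function and the shape of its result dict are hypothetical; the real verdict format is whatever match_protocol_against_surface returns, which this page does not spell out.

```python
from typing import Dict, List, Optional


def match_surface(
    protocol_members: List[str], surface: Optional[List[str]]
) -> Dict[str, object]:
    """Compare an adapter's protocol members against a capability's recorded surface.

    The returned dict shape is an assumption for illustration only.
    """
    if surface is None:
        # No recorded surface (stale manifest): refuse rather than guess.
        return {
            "compatible": False,
            "reason": "no recorded surface",
            "missing": sorted(protocol_members),
        }
    missing = sorted(set(protocol_members) - set(surface))
    return {
        "compatible": not missing,
        "reason": "ok" if not missing else "missing members",
        "missing": missing,
    }


full = match_surface(["query_nodes", "add_node"], ["add_node", "query_nodes", "delete_node"])
partial = match_surface(["query_nodes", "add_node"], ["add_node"])
stale = match_surface(["query_nodes"], None)
```

Nothing here imports a protocol class; both sides are plain lists of names read from manifests, which is what lets the check run against unloaded capabilities.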


get_adapters_for_task


def get_adapters_for_task(
    task_name:str, # Task name, e.g. "graph-storage"
)->List: # Discovered adapter units serving the task

CR-17 pt 2: the adapter-registry view — discovered adapter manifests for a task.


get_capability_meta


def get_capability_meta(
    capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None

Get metadata for a loaded capability by name.


get_discovered_meta


def get_discovered_meta(
    capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None

Get metadata for a discovered (not necessarily loaded) capability by name.

Configuration Validation + Persistence Helpers

SG-5 strict-by-default config validation against the manifest’s config_schema; SG-9 schema drift detection; CR-2 per-capability persistence via CapabilityConfigStore; deferred-on-disable hook bookkeeping. Stable cluster — these gates are settled.

CR-10 Multi-Instance Helpers

Validate explicit instance_id inputs against the constrained pattern, auto-generate unique IDs for new_instance=True, and query the per-instance state in self.instances. Closed by CR-10 core.


get_instance


def get_instance(
    name_or_id:str, # Capability name (default-loaded) or explicit instance_id
)->Optional:

Return the CapabilityInstance for name_or_id, or None if not loaded.

Lookup is keyed by instance_id (which equals capability_name for default-loaded capabilities). Multi-instance IDs only exist in self.instances.


list_instances


def list_instances(
    capability_name:Optional=None, # If given, filter to this capability's instances
)->List:

List all loaded instances, optionally filtered by underlying capability name.


get_worker_env_status


def get_worker_env_status(
    name_or_meta:Any, # Capability name (loaded/discovered) or a CapabilityMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Per-entry status dicts (secret values never returned)

CR-12: per-entry satisfaction status of a capability’s worker-env contract.

Each entry: {name, secret, required, satisfied, label, description}. satisfied means a value is resolvable (secret present in the store, or a visible var has a default/override). Secret VALUES are never returned — only whether one is set. The capability-config UI uses this to gate config display on required secrets being satisfied.


missing_required_env


def missing_required_env(
    name_or_meta:Any, # Capability name or CapabilityMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Names of required worker-env entries with no resolvable value

CR-12: names of required worker-env entries that are unsatisfied.
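As a rough illustration of how the two CR-12 queries relate, the sketch below derives per-entry status and then filters for unsatisfied required entries. The contract entries, the `env_status` and `missing_required` helpers, and the `default` field are assumptions; only the status keys (name, secret, required, satisfied, label, description) come from the description above.

```python
from typing import Dict, List, Set


def env_status(
    entries: List[dict], stored_secrets: Set[str], overrides: Dict[str, str]
) -> List[dict]:
    """Build per-entry status dicts. Secret values are never included."""
    status = []
    for entry in entries:
        name = entry["name"]
        is_secret = bool(entry.get("secret", False))
        if is_secret:
            # A secret is satisfied when the store holds a value for it.
            satisfied = name in stored_secrets
        else:
            # A visible variable is satisfied by an override or a default.
            satisfied = name in overrides or entry.get("default") is not None
        status.append({
            "name": name,
            "secret": is_secret,
            "required": bool(entry.get("required", False)),
            "satisfied": satisfied,
            "label": entry.get("label", name),
            "description": entry.get("description", ""),
        })
    return status


def missing_required(status: List[dict]) -> List[str]:
    """Names of required entries with no resolvable value."""
    return [s["name"] for s in status if s["required"] and not s["satisfied"]]


# Hypothetical worker-env contract for illustration.
contract = [
    {"name": "GEMINI_API_KEY", "secret": True, "required": True},
    {"name": "CACHE_DIR", "secret": False, "required": True, "default": "/tmp/cache"},
    {"name": "LOG_LEVEL", "secret": False, "required": False},
]
before = missing_required(env_status(contract, stored_secrets=set(), overrides={}))
after = missing_required(env_status(contract, stored_secrets={"GEMINI_API_KEY"}, overrides={}))
```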


set_capability_secret


def set_capability_secret(
    name_or_id:str, # Capability name or instance_id whose secret to set
    key:str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
    value:str, # Secret value (stored via the SecretStore, never config/logs)
    scope:Optional=None, # SG-55 forward seam: per-principal scope
    reload:bool=True, # Respawn loaded worker(s) so the new env is injected
)->bool: # True if the secret was stored

CR-12: store a capability secret, then respawn its worker(s) to inject it.

Secrets are keyed by the underlying CAPABILITY name (not instance_id), so all instances of a capability share one credential — set the Gemini key once and every Gemini instance gets it at (re)spawn. Because worker env is fixed at spawn, the new value only reaches a running worker via a RESPAWN, so this reloads each loaded instance of the capability (unless reload=False, e.g. when provisioning a secret before the capability is loaded). This is the actuation seam both the CLI (cjm-ctl set-secret) and a future config UI call. Reload failures are logged, not raised.

Load + Unload Lifecycle

Spawn / terminate capability workers. load_capability is the canonical entry; unload_capability handles canonical-vs-multi-instance cleanup (drops CapabilityMeta only when no instances remain for the capability). get_capability / list_capabilities are the simple-accessor surface. Mid-churn — CR-7 reactive resource management will touch load_capability lightly to initialize empirical resource tracking.


load_capability


def load_capability(
    capability_meta:CapabilityMeta, # Capability metadata (with manifest attached)
    config:Optional=None, # Initial configuration
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
    instance_id:Optional=None, # CR-10: explicit instance_id; None defaults to capability_name
    new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
    max_concurrent_requests:Optional=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
    adapters:Optional=None, # CR-17 pt 2: explicit adapter unit names (loud refusal on mismatch); None = auto-bind discovered compatibles
)->bool: # True if successfully loaded

Load a capability by spawning a Worker subprocess.

CR-2: reads the persisted CapabilityConfigRecord from self.config_store before launching the worker. If a persisted record exists and the caller didn’t pass an explicit config, the persisted config is used as the effective input. The persisted enabled flag is applied to capability_meta.enabled so disabled capabilities stay disabled across process restarts.

CR-10: optional instance_id allows multi-instance loading.

  • instance_id=None, new_instance=False (default): instance_id = capability_meta.name. Populates self.capabilities[capability_name] + self.instances[capability_name] together (single-instance backward compat).
  • instance_id="custom": validated against [A-Za-z0-9_-]{1,64}. Populates self.instances[custom]. Persistence is keyed by capability_name and only applied to the default instance.
  • instance_id=None, new_instance=True: auto-generates {name}-{6-hex}.

Idempotent: re-load against an existing instance_id returns True without re-spawning.

CR-7: computes config_hash from the effective config (post-defaults + post-validation) and stores it on the CapabilityInstance so execute_capability* can key empirical samples by (instance_id, config_hash). SG-33 stores max_concurrent_requests on the instance — the actual asyncio.Semaphore is lazy-created in execute_capability_async via _get_concurrent_limiter.
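The three CR-10 instance_id cases can be sketched in a few lines. `resolve_instance_id` is a hypothetical helper written for this example; the pattern and the {name}-{6-hex} format come from the description above, while the collision loop and the choice of ValueError are assumptions.

```python
import re
import secrets
from typing import Optional, Set

# Pattern quoted in the CR-10 description.
INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def resolve_instance_id(
    name: str, instance_id: Optional[str], new_instance: bool, existing: Set[str]
) -> str:
    """Pick the instance_id for a load request (illustrative, not the library code)."""
    if instance_id is not None:
        if not INSTANCE_ID_RE.fullmatch(instance_id):
            raise ValueError(f"invalid instance_id: {instance_id!r}")
        return instance_id
    if new_instance:
        while True:
            candidate = f"{name}-{secrets.token_hex(3)}"  # 3 bytes -> 6 hex chars
            if candidate not in existing:
                return candidate
    return name  # default instance: instance_id equals the capability name


default_id = resolve_instance_id("whisper", None, False, set())
custom_id = resolve_instance_id("whisper", "whisper_fast-2", False, set())
generated_id = resolve_instance_id("whisper", None, True, {"whisper"})
```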


load_all


def load_all(
    configs:Optional=None, # Capability name -> config mapping
)->Dict: # Capability name -> success mapping

Discover and load all available capabilities.


unload_capability


def unload_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->bool: # True if successfully unloaded

Unload a capability instance and terminate its Worker process (CR-10).

If name_or_id resolves to the default instance (instance_id == capability_name) and no other instances remain for the same capability, also removes the CapabilityMeta from self.capabilities. Otherwise removes only the instance and clears CapabilityMeta.instance if it pointed at the unloaded canonical.


unload_all


def unload_all(
    
)->None:

Unload all capability instances and terminate all Worker processes (CR-10).

Iterates self.instances (CR-10 keying) rather than self.capabilities so all multi-instance entries get torn down, not just the canonical instances.


get_capability


def get_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->Optional: # Capability proxy instance or None

Get a loaded capability’s proxy by name or instance_id (CR-10).

Lookup order: self.instances first (covers both default capability_name and multi-instance IDs), falling back to CapabilityMeta.instance for any legacy code path that populated self.capabilities without self.instances (defensive — shouldn’t happen post-CR-10 since load_capability always records the instance).


list_capabilities


def list_capabilities(
    
)->List: # List of loaded capability metadata

List all loaded capabilities.

Execution + Resource Eviction

The high-churn execute path. CR-7’s primary rewrite target — reactive retry + OOM recovery + bounded retries will land here without touching the stable cells above. Per-instance _running_executions tracking (keyed by instance_id) allows concurrent multi-instance executes without colliding on the deferred-disable coordination.


execute_capability


def execute_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL,
    _task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
    kwargs:VAR_KEYWORD
)->Any: # Capability result

Execute a capability instance’s main functionality (sync).

CR-10: resolves name_or_id via self.instances; per-instance enabled flag gates execution. _running_executions tracks by instance_id so concurrent multi-instance executes don’t collide.

CR-2: raises CapabilityDisabledError (typed) when the instance is disabled.

CR-7: reactive retry on CapabilityResourceError — evicts other capabilities to free resources, then ALWAYS reloads the failing capability’s worker before the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL) needs the reload because there’s no live worker to retry on. Track B (capability-raised CapabilityResourceError — worker still alive) ALSO reloads because PyTorch’s CUDA caching allocator can fragment post-OOM in ways the capability can’t clean up from within its own process; a fresh worker is the only reliable reset. Bounded by self.max_retries (default 1). Empirical sample recorded in the finally block — best-effort, doesn’t break execute on failure.
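The control flow of the bounded reactive retry can be sketched as follows. `ResourceError` stands in for CapabilityResourceError, and `execute_with_retry` with its callback parameters is a hypothetical shape chosen for the example; the real method also records an empirical sample in a finally block, which is omitted here.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class ResourceError(RuntimeError):
    """Stand-in for CapabilityResourceError (hypothetical local class)."""


def execute_with_retry(
    run: Callable[[], T],
    evict_others: Callable[[], None],
    reload_worker: Callable[[], None],
    max_retries: int = 1,
) -> T:
    """Run once; on a resource error, evict, reload, and retry up to max_retries times."""
    retries = 0
    while True:
        try:
            return run()
        except ResourceError:
            if retries >= max_retries:
                raise  # retry budget spent: surface the error to the caller
            retries += 1
            evict_others()   # free resources held by other capabilities
            reload_worker()  # always start the retry on a fresh worker


events: List[str] = []
attempts = {"count": 0}


def flaky_run() -> str:
    """Fails with a resource error on the first call, succeeds afterwards."""
    attempts["count"] += 1
    events.append(f"run#{attempts['count']}")
    if attempts["count"] == 1:
        raise ResourceError("out of memory")
    return "ok"


result = execute_with_retry(
    flaky_run,
    evict_others=lambda: events.append("evict"),
    reload_worker=lambda: events.append("reload"),
)
```

Both tracks take the same path in this sketch: whether the worker died or merely raised, the retry only happens after eviction and a reload.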


execute_capability_async


async def execute_capability_async(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL,
    _task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
    kwargs:VAR_KEYWORD
)->Any: # Capability result

Execute a capability instance’s main functionality (async).

CR-10 + CR-2: same semantics as execute_capability, async-flavored. Scheduler allocation goes through allocate_async for non-blocking polling.

CR-7 + SG-33: reactive retry on CapabilityResourceError — always reloads before retry (Track A + Track B converge on the same reload path; see sync variant docstring for the rationale). Per-instance asyncio.Semaphore enforces the max_concurrent_requests cap (None = unbounded). Empirical sample recorded in the finally block.
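A lazily created per-instance semaphore might look like the sketch below. The `Limiters` class and `run_limited` helper are hypothetical names; the only behavior taken from the description is that the semaphore is created on first use and that a cap of None means unbounded.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional


class Limiters:
    """Per-instance concurrency caps with semaphores created on first use."""

    def __init__(self) -> None:
        self._caps: Dict[str, Optional[int]] = {}
        self._sems: Dict[str, asyncio.Semaphore] = {}

    def set_cap(self, instance_id: str, cap: Optional[int]) -> None:
        self._caps[instance_id] = cap

    def get(self, instance_id: str) -> Optional[asyncio.Semaphore]:
        cap = self._caps.get(instance_id)
        if cap is None:
            return None  # unbounded: no limiter at all
        if instance_id not in self._sems:
            # Created lazily, inside the running event loop.
            self._sems[instance_id] = asyncio.Semaphore(cap)
        return self._sems[instance_id]


async def run_limited(
    limiters: Limiters, instance_id: str, job: Callable[[], Awaitable[None]]
) -> None:
    sem = limiters.get(instance_id)
    if sem is None:
        await job()
        return
    async with sem:
        await job()


async def peak_concurrency(cap: Optional[int], jobs: int = 4) -> int:
    """Run several jobs against one instance and report the peak overlap."""
    limiters = Limiters()
    limiters.set_cap("whisper", cap)
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1

    await asyncio.gather(*(run_limited(limiters, "whisper", job) for _ in range(jobs)))
    return peak
```

Calling `asyncio.run(peak_concurrency(1))` serializes the jobs, while `asyncio.run(peak_concurrency(None))` lets all of them overlap.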


execute_capability_task_async


async def execute_capability_task_async(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    kwargs:VAR_KEYWORD
)->Any: # Typed task result

CR-17 pt 2: execute a typed task-adapter method (explicit task channel; async).

Thin wrapper over execute_capability_async — CR-7 retry, SG-33 semaphore, admission and empirical sampling apply identically; this is the method the JobQueue’s task-addressed jobs invoke.


execute_capability_task


def execute_capability_task(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    kwargs:VAR_KEYWORD
)->Any: # Typed task result

CR-17 pt 2: execute a typed task-adapter method (explicit task channel; sync).

Thin wrapper over execute_capability — the whole CR-7 retry / scheduler / empirical-sampling machinery applies identically to task-channel calls.

Enable / Disable + Logs

CR-2 enable/disable surface: per-instance enabled flag with default-instance sync to CapabilityMeta.enabled + persistence via CapabilityConfigStore + deferred-on-disable hook fire-after-in-flight-job semantics. Plus the log-file tail reader. The capability-config UI library (Path C step 4) is the most likely consumer to extend the enable/disable behavior next.


enable_capability


def enable_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was enabled

Enable a capability instance (CR-10 multi-instance aware).

CR-2: persists the new state via config_store (default-instance only; persistence is per-capability, not per-instance) and fires the capability’s on_enable hook on state-change. Idempotent for already-enabled instances.


disable_capability


def disable_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was disabled

Disable a capability instance without unloading it (CR-10 multi-instance aware).

CR-2: persists the new state (default-instance only) and fires the capability’s on_disable hook — but defers the hook until any in-flight job for THIS instance finishes (the per-instance _running_executions key is the instance_id, so a concurrent execute on a different instance of the same capability doesn’t gate this instance’s hook).
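The deferral rule can be illustrated with a small single-threaded sketch. `DisableCoordinator` and its method names are hypothetical, and the real manager's bookkeeping (locking, persistence, the enabled flag itself) is left out; the point is only that the hook waits for in-flight work on the same instance_id.

```python
from typing import Callable, Dict, List, Set


class DisableCoordinator:
    """Fire on_disable immediately when idle, else after the last in-flight job."""

    def __init__(self, on_disable: Callable[[str], None]) -> None:
        self._on_disable = on_disable
        self._running: Dict[str, int] = {}  # instance_id -> in-flight job count
        self._pending: Set[str] = set()     # instance_ids with a deferred hook

    def start(self, instance_id: str) -> None:
        self._running[instance_id] = self._running.get(instance_id, 0) + 1

    def finish(self, instance_id: str) -> None:
        self._running[instance_id] -= 1
        if self._running[instance_id] == 0 and instance_id in self._pending:
            self._pending.discard(instance_id)
            self._on_disable(instance_id)

    def disable(self, instance_id: str) -> None:
        if self._running.get(instance_id, 0) > 0:
            self._pending.add(instance_id)  # defer until the job finishes
        else:
            self._on_disable(instance_id)


fired: List[str] = []
coordinator = DisableCoordinator(fired.append)

coordinator.start("whisper")        # job in flight on the default instance
coordinator.start("whisper-a1b2c3")  # job in flight on a second instance
coordinator.disable("whisper")       # deferred: "whisper" is busy
fired_while_busy = list(fired)
coordinator.finish("whisper-a1b2c3")  # a different instance does not release the hook
fired_after_other = list(fired)
coordinator.finish("whisper")        # the hook fires now
```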


get_capability_diagnostics


def get_capability_diagnostics(
    name_or_id:str, # Capability name or instance_id
    limit:int=50, # Max records to return (most recent)
    include_stream:bool=True, # Include raw stream chunks for the capability's worker sessions
)->str: # Rendered diagnostic text (most recent last)

Render a capability’s recent diagnostics as text (CR-14; replaces the retired flat-log accessor — the flat .cjm/logs/*.log files no longer exist).

A convenience TEXT projection over the diagnostics store for operator / UI display: structured records (level + logger name + exact job id when stamped) merged with the raw stream chunks (prints / tqdm final frames / death rattles) from this capability’s worker sessions, ordered by time. Programmatic consumers query the stores directly (manager.diagnostics_store / JobQueue.get_job_diagnostics).

Key features:

  • Local-first discovery: Manifests in .cjm/manifests/ shadow global ones in ~/.cjm/manifests/
  • Process isolation: Each capability runs in its own subprocess with a dedicated Python interpreter
  • Dual execution modes: execute_capability() for sync, execute_capability_async() for async
  • Automatic cleanup: unload_all() terminates all Worker processes
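The local-first shadowing rule can be sketched with plain files. The `discover` function, the manifest fields (`name`, `origin`), and the temporary directory layout are assumptions for illustration; the real discovery reads the full manifest format via load_manifest.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def discover(search_paths: List[Path]) -> Dict[str, dict]:
    """Scan directories in order; the first manifest seen for a name wins."""
    found: Dict[str, dict] = {}
    for directory in search_paths:
        if not directory.is_dir():
            continue
        for path in sorted(directory.glob("*.json")):
            manifest = json.loads(path.read_text(encoding="utf-8"))
            # setdefault keeps the earlier (local) entry and ignores later ones.
            found.setdefault(manifest["name"], manifest)
    return found


def demo() -> Dict[str, dict]:
    """Build a throwaway local + global layout and run discovery over it."""
    with tempfile.TemporaryDirectory() as tmp:
        local_dir = Path(tmp) / "project" / ".cjm" / "manifests"
        global_dir = Path(tmp) / "home" / ".cjm" / "manifests"
        local_dir.mkdir(parents=True)
        global_dir.mkdir(parents=True)
        (global_dir / "whisper.json").write_text(
            json.dumps({"name": "whisper", "origin": "global"}), encoding="utf-8")
        (global_dir / "gemini.json").write_text(
            json.dumps({"name": "gemini", "origin": "global"}), encoding="utf-8")
        (local_dir / "whisper.json").write_text(
            json.dumps({"name": "whisper", "origin": "local"}), encoding="utf-8")
        # Local path listed first, so its whisper manifest shadows the global one.
        return discover([local_dir, global_dir])
```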

Configuration Management

Methods for managing capability configuration. These forward to the RemoteCapabilityProxy which communicates with the Worker over HTTP.


get_capability_config


def get_capability_config(
    capability_name:str, # Name of the capability
)->Optional: # Current configuration or None

Get the current configuration of a capability.


get_capability_config_schema


def get_capability_config_schema(
    capability_name:str, # Name of the capability
)->Optional: # JSON Schema or None

Get the configuration JSON Schema for a capability.


get_config_options


def get_config_options(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Dict: # CR-11: live config option domains, or {} if unavailable

Get a capability instance’s runtime config option providers (CR-11).

Forwards to the worker’s get_config_options() - live enum domains + per-option metadata for dynamic config fields (e.g. an API model list). Kept separate from get_capability_config_schema (static, hashed for CR-8 drift); these options are the live companion the capability-config UI merges on top.

Degrades to {} if the instance is missing or the worker call fails - the UI then falls back to the static schema. Typed-error surfacing for the UI consumer is deferred to the capability-config UI library (Path C Step 4).


get_all_capability_configs


def get_all_capability_configs(
    
)->Dict: # Capability name -> config mapping

Get current configuration for all loaded capabilities.


update_capability_config


def update_capability_config(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    config:Dict, # New configuration values
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
)->bool: # True if successful

Update a capability instance’s configuration (CR-10 multi-instance aware).

CR-2: on successful reconfigure, persists the new config (default instance only; multi-instance loads don’t persist). Per-instance inst.config is updated regardless. SG-5: validates against the underlying capability’s config_schema (per-capability, not per-instance, so all instances share the same schema).


reload_capability


def reload_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    config:Optional=None, # Optional new configuration
)->bool: # True if successful

Reload a capability instance by terminating and restarting its Worker (CR-10).


get_capability_stats


def get_capability_stats(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # Resource telemetry or None

Get resource usage stats for a capability instance’s Worker process (CR-10).

Streaming Execution

Async streaming support for real-time results (e.g., transcription word-by-word).


execute_capability_stream


def execute_capability_stream(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results

Execute a capability instance with streaming response (CR-10 multi-instance aware).

Same per-instance resolution as execute_capability_async; scheduler allocation keys off the CapabilityMeta (capability-level), execution + bookkeeping key off the CapabilityInstance (per-instance).

Concurrent Load / Unload (CR-10b)

load_capabilities_concurrent / unload_capabilities_concurrent fan out capability spawns + teardowns across an asyncio.gather call, dropping host startup time from sum-of-spawns to max-of-spawns. The sync load_capability / unload_capability paths are unchanged; the async variants run them via asyncio.to_thread so the existing sync _wait_for_ready doesn’t block the event loop while the worker boots.

CapabilityLoadSpec (defined in core.metadata) collects the per-load parameters into one dataclass so a batch can be expressed as List[CapabilityLoadSpec].

Result shapes:

  • load_capabilities_concurrent(...) returns Dict[str, Union[str, Exception]] mapping the requested key (explicit instance_id if set, else spec.meta.name) → resolved instance_id on success or the raised Exception on failure.
  • unload_capabilities_concurrent(...) returns Dict[str, Union[bool, Exception]] mapping the input name_or_id → True on success or the raised Exception on failure.

max_concurrency=None (the default) means unbounded — every spec runs concurrently. Set an integer to cap simultaneous loads (useful when many capabilities share a constrained resource like GPU memory at boot). fail_fast=False (the default) collects every result via asyncio.gather(..., return_exceptions=True); fail_fast=True re-raises the first exception and lets remaining tasks complete in the background.
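The fan-out pattern described above (blocking loads pushed to threads, gathered with return_exceptions=True, optionally capped by a semaphore) can be sketched as follows. `slow_load` and `load_concurrent` are hypothetical stand-ins, only the fail_fast=False behavior is modeled, and `asyncio.to_thread` requires Python 3.9 or later.

```python
import asyncio
import time
from typing import Dict, List, Optional, Union


def slow_load(name: str) -> str:
    """Blocking stand-in for a worker spawn; fails for names starting with 'bad'."""
    time.sleep(0.05)
    if name.startswith("bad"):
        raise RuntimeError(f"spawn failed: {name}")
    return name  # pretend this is the resolved instance_id


async def load_concurrent(
    names: List[str], max_concurrency: Optional[int] = None
) -> Dict[str, Union[str, Exception]]:
    """Load every name concurrently and collect results or exceptions per key."""
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def load_one(name: str) -> str:
        if sem is None:
            return await asyncio.to_thread(slow_load, name)
        async with sem:  # cap the number of simultaneous spawns
            return await asyncio.to_thread(slow_load, name)

    results = await asyncio.gather(
        *(load_one(n) for n in names), return_exceptions=True
    )
    return dict(zip(names, results))


outcome = asyncio.run(load_concurrent(["whisper", "bad-capability", "gemini"]))
capped = asyncio.run(load_concurrent(["whisper", "gemini"], max_concurrency=1))
```

One failed spawn does not disturb the others: its key simply maps to the exception instance, which the caller can inspect with isinstance.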


load_capability_async


async def load_capability_async(
    capability_meta:CapabilityMeta, config:Optional=None, strict:bool=True, instance_id:Optional=None,
    new_instance:bool=False
)->bool:

Async variant of load_capability (CR-10b).

Runs the existing sync load_capability via asyncio.to_thread so the blocking proxy spawn + _wait_for_ready doesn’t stall the event loop. Backward compat: identical behavior to the sync method, just non-blocking.


unload_capability_async


async def unload_capability_async(
    name_or_id:str
)->bool:

Async variant of unload_capability (CR-10b).


load_capabilities_concurrent


async def load_capabilities_concurrent(
    specs:List, # Per-capability load specifications
    max_concurrency:Optional=None, # Cap simultaneous loads; None = unbounded
    fail_fast:bool=False, # Re-raise first exception (default: collect all results)
)->Dict: # requested_key → instance_id or Exception

CR-10b: fan out capability loads concurrently via asyncio.gather.

Each spec is loaded via load_capability_async (asyncio.to_thread under the hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when max_concurrency=None. Capped concurrency uses an asyncio.Semaphore.

Result keys come from _spec_requested_key: explicit instance_id if set, {capability_name}#new[{index}] for ambiguous new_instance specs, else capability_name. Successful entries map to the resolved instance_id (string). With fail_fast=False, failures map to the raised exception; with fail_fast=True, the first exception is re-raised instead of being collected.


unload_capabilities_concurrent


async def unload_capabilities_concurrent(
    name_or_ids:List, # Capability names or instance_ids to unload
    max_concurrency:Optional=None, fail_fast:bool=False
)->Dict: # name_or_id → True or Exception

CR-10b: fan out capability unloads concurrently via asyncio.gather.

Same concurrency + fail_fast semantics as load_capabilities_concurrent. Result keys are the input name_or_ids (deduplication is the caller’s responsibility; duplicate inputs produce one dict entry per unique key).

CapabilityBinding (SG-17)

A CapabilityBinding is a pre-bound view of one capability through a shared CapabilityManager. Consumer services (8 audited in the substrate audit) have each been writing the same ~80-line wrapper class to hide the manager + capability_name pair behind a clean instance API — whisper.execute(...) rather than manager.execute_capability("whisper", ...).

CapabilityManager.bind(name, default_config=...) returns one. The binding forwards every method call to the bound manager, supplying capability_name automatically. default_config is applied when load() is called without an explicit config.


CapabilityBinding


def CapabilityBinding(
    manager:CapabilityManager, capability_name:str, default_config:Dict=<factory>
)->None:

Pre-bound view of a single capability through a shared CapabilityManager.

Eliminates the wrapper-class duplication audited across 8 consumer services (SG-17). Methods forward to the manager with capability_name pre-supplied; default_config is the fallback used when load() is called without an explicit config (matches the manifest-default behavior in load_capability).


bind


def bind(
    capability_name:str, # Name of the capability to pre-bind
    default_config:Optional=None, # Default config used by binding.load()
)->CapabilityBinding: # Bound view ready for instance-style use

Create a CapabilityBinding pre-bound to this manager + capability_name.
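To show the forwarding idea in isolation, here is a toy binding over a fake manager. `FakeManager`, its `load_by_name` method, and the `Binding` dataclass are hypothetical; the real CapabilityBinding forwards to CapabilityManager methods whose exact call shapes are not shown on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


class FakeManager:
    """Stand-in manager that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, Dict[str, Any]]] = []

    def load_by_name(self, name: str, config: Dict[str, Any]) -> bool:
        self.calls.append(("load", name, dict(config)))
        return True

    def execute_capability(self, name: str, **kwargs: Any) -> str:
        self.calls.append(("execute", name, kwargs))
        return f"{name}-result"


@dataclass
class Binding:
    """Pre-bound view: every call supplies capability_name automatically."""

    manager: FakeManager
    capability_name: str
    default_config: Dict[str, Any] = field(default_factory=dict)

    def load(self, config: Optional[Dict[str, Any]] = None) -> bool:
        # Fall back to default_config only when no explicit config is given.
        effective = config if config is not None else self.default_config
        return self.manager.load_by_name(self.capability_name, effective)

    def execute(self, **kwargs: Any) -> str:
        return self.manager.execute_capability(self.capability_name, **kwargs)


manager = FakeManager()
whisper = Binding(manager, "whisper", default_config={"model": "base"})
whisper.load()
whisper.load({"model": "large-v3"})
output = whisper.execute(audio="/path/to/audio.wav")
```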

Platform Queries (Phase 5a)

Substrate query API for filtering discovered capabilities by current-platform compatibility. The query is pure string-based — the substrate never imports the interface library to answer it, because _generate_manifest’s introspection script captured resources.platforms at install time inside the capability’s own conda env.

  • get_compatible_for_current_platform() — filters discovered by resources.platforms. Empty platforms list ≡ universal compatibility.

get_compatible_for_current_platform


def get_compatible_for_current_platform(
    
)->List: # Capabilities compatible with current platform

Phase 5a: return discovered capabilities compatible with the host platform.

Filters by resources.platforms. Capabilities with an empty (or absent) platforms list are considered universally compatible — that’s the introspection-time convention when a capability author didn’t declare a platform constraint. Capabilities lacking the entire resources block (legacy / pre-Phase-5a manifests) also pass through as universal.

Does NOT filter on requires_gpu — substrate doesn’t know whether a GPU is present without invoking a system monitor capability. Callers gate on GPU availability separately if needed.
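The filter rule reduces to a short string comparison. In this sketch the manifest dicts, the `compatible_for` helper, and the platform identifiers ("linux", "darwin") are assumptions; the current platform is passed in explicitly so the example stays deterministic.

```python
from typing import List


def compatible_for(manifests: List[dict], current: str) -> List[str]:
    """Names of capabilities whose resources.platforms admits the current platform."""
    names = []
    for manifest in manifests:
        # A missing resources block or an empty platforms list means universal.
        platforms = (manifest.get("resources") or {}).get("platforms") or []
        if not platforms or current in platforms:
            names.append(manifest["name"])
    return names


# Hypothetical discovered manifests.
discovered = [
    {"name": "whisper", "resources": {"platforms": ["linux"]}},
    {"name": "gemini", "resources": {"platforms": []}},
    {"name": "legacy"},  # no resources block at all
    {"name": "mac-only", "resources": {"platforms": ["darwin"]}},
]

on_linux = compatible_for(discovered, "linux")
on_darwin = compatible_for(discovered, "darwin")
```

Note that requires_gpu plays no part in the filter, matching the behavior described above.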

Usage Examples

Basic Usage

import logging
from cjm_substrate.core.manager import CapabilityManager

logging.basicConfig(level=logging.INFO)

# Create manager
manager = CapabilityManager()

# Discover and load all capabilities from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")

# Execute a capability
result = manager.execute_capability("whisper-local", audio="/path/to/audio.wav")

# Update configuration (hot-reload)
manager.update_capability_config("whisper-local", {"model": "large-v3"})

# Clean up all workers
manager.unload_all()

Async Usage (FastHTML)

from cjm_substrate.core.manager import CapabilityManager

async def transcribe(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    
    try:
        result = await manager.execute_capability_async("whisper-local", audio=audio_path)
        return result
    finally:
        manager.unload_all()

# Streaming
async def transcribe_stream(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    
    try:
        async for chunk in manager.execute_capability_stream("whisper-local", audio=audio_path):
            yield chunk
    finally:
        manager.unload_all()