# Capability Manager


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## CapabilityManager

The `CapabilityManager` orchestrates the complete lifecycle of
capabilities in the process-isolated architecture:

- **Discovery**: Finds capability manifests in local (`.cjm/manifests/`)
  or global (`~/.cjm/manifests/`) directories
- **Loading**: Creates `RemoteCapabilityProxy` instances that spawn
  isolated Worker subprocesses
- **Execution**: Forwards calls to Workers via HTTP, supports both sync
  and async
- **Lifecycle**: Handles initialization, configuration updates, and
  cleanup

<!-- -->

    CapabilityManager                           Worker Subprocesses
    ┌─────────────────┐                    ┌─────────────────────┐
    │ discover_       │                    │ Conda Env: Whisper  │
    │   manifests()   │     HTTP/JSON      │   └─ whisper        │
    │                 │◄──────────────────▶│                     │
    │ capabilities:        │                    └─────────────────────┘
    │   whisper ──────┼──► RemoteProxy     ┌─────────────────────┐
    │   gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini   │
    │                 │                    │   └─ gemini         │
    └─────────────────┘                    └─────────────────────┘

------------------------------------------------------------------------

### CapabilityManager

``` python

def CapabilityManager(
    capability_interface:Type=ToolCapability, # Base interface for type checking
    search_paths:Optional=None, # Custom manifest search paths
    scheduler:Optional=None, # Resource allocation policy
    config_store:Optional=None, # CR-2: persistence backend; lazy LocalCapabilityConfigStore default per OQ-4
    empirical_store:Optional=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
    secret_store:Optional=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
    max_retries:int=1, # CR-7: how many reactive retries to attempt on CapabilityResourceError (default 1 — one retry after eviction)
    sysmon_capability_name:Optional=None, # monitor capability (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
    journal_store:Optional=None, # CR-14: durable account-of-action; lazy LocalJournalStore at <data_dir>/journal.db
    diagnostics_store:Optional=None, # CR-14: disposable diagnostic narrative; lazy LocalDiagnosticsStore at <data_dir>/diagnostics.db
):

```

*Manages capability discovery, loading, and lifecycle via process
isolation.*

## CR-3 Telemetry (System Monitor)

Bind a `MonitorToolProtocol` proxy and read system telemetry through
CR-3’s typed surface. `_get_global_stats*` duck-types on the proxy’s
typed accessor, falling back to the legacy dispatcher path for monitors
that haven’t migrated.

------------------------------------------------------------------------

### register_system_monitor

``` python

def register_system_monitor(
    capability_name:str, # Name of the system monitor capability
)->None:

```

*Bind a loaded capability to act as the hardware system monitor.*

------------------------------------------------------------------------

### get_global_stats

``` python

async def get_global_stats(
    
)->Dict: # Current system telemetry

```

*Public async system-telemetry accessor (stage 3 / CR-16).*

The JobQueue’s multi-lane admission consumes this through the
`JobQueueDependencies` protocol (defensively via getattr). Thin wrapper
over `_get_global_stats_async` — same CR-3 duck-type semantics.

------------------------------------------------------------------------

### get_admission_profile

``` python

def get_admission_profile(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # {'gpu_memory_mb_peak_max','memory_mb_peak_max','sample_count'} or None

```

*Empirical resource profile for a loaded instance’s CURRENT config*
(stage 3 / CR-16 multi-lane admission).

Reads the CR-7 empirical store at the instance’s live
`(instance_id, config_hash)` key — the SAME keying that records samples,
so the profile always describes the configuration actually being run (a
config change = a new hash = no record = the queue runs the job
EXCLUSIVE until its first measurement run graduates it).

None = no evidence (instance unknown / store disabled / no record for
this config). The manifest’s `requires_gpu` is deliberately not part of
this surface — GPU use is an empirical fact, not a declaration (stage-3
ledger G2).

------------------------------------------------------------------------

### get_instance_concurrency_cap

``` python

def get_instance_concurrency_cap(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # The instance's SG-33 max_concurrent_requests (None = unset)

```

*Per-instance concurrency cap for queue admission (stage 3 / CR-16).*

Surfaces the SG-33 `max_concurrent_requests` setting. The queue treats
None as 1 (same-worker concurrency is OPT-IN per capability — e.g.
ffmpeg raises its cap because its sync endpoints run in a threadpool and
concurrent converts genuinely parallelize as subprocesses; model workers
stay serial-per-instance).

## Manifest Discovery

Read manifests from disk into typed `CapabilityMeta` (parsing Phase 5a
resources), routing adapter manifests to the adapter registry. Stable
cluster.

------------------------------------------------------------------------

### discover_manifests

``` python

def discover_manifests(
    
)->List: # List of discovered capability metadata

```

*Discover capabilities via JSON manifests in search paths.*

CR-8: reads each manifest via `load_manifest`, which parses the v2.0
nested layout into a typed `ManifestV2`. `meta.manifest` is set to a
flat-shaped dict view so existing consumers (proxy, scheduling, execute
path) continue working unchanged; the typed `ManifestV2` is also
attached as `meta.manifest_v2` so drift detection + future typed callers
can read `drift_tracking.config_schema_hash` without re-parsing.

------------------------------------------------------------------------

### get_capabilities_compatible_with

``` python

def get_capabilities_compatible_with(
    adapter:Union, # Adapter unit name or manifest
)->List: # Discovered capability names whose surface satisfies the protocol

```

*CR-17 pt 2: the pass-2 compatibility query, manifest-surface-based.*

------------------------------------------------------------------------

### check_adapter_compatibility

``` python

def check_adapter_compatibility(
    adapter:Union, # Adapter unit name or manifest
    capability_name:str, # Discovered capability (capability) name
)->Dict: # Match verdict (see match_protocol_against_surface)

```

*CR-17 pt 2: surface-based compatibility verdict (host-side; works
against* UNLOADED capabilities — manifest-vs-manifest, no protocol
imports host-side).

Matches the adapter’s recorded protocol members against the capability
manifest’s recorded `structural_surface` (pass-2 Thread 3: the
capability records only itself; the adapter declares the protocol; the
substrate matches). A capability without a recorded surface
(pre-fracture manifest) is NOT compatible until its manifest regenerates
— staleness stays visible instead of silently mis-answering.

------------------------------------------------------------------------

### get_adapters_for_task

``` python

def get_adapters_for_task(
    task_name:str, # Task name, e.g. "graph-storage"
)->List: # Discovered adapter units serving the task

```

*CR-17 pt 2: the adapter-registry view — discovered adapter manifests
for a task.*

------------------------------------------------------------------------

### get_capability_meta

``` python

def get_capability_meta(
    capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None

```

*Get metadata for a loaded capability by name.*

------------------------------------------------------------------------

### get_discovered_meta

``` python

def get_discovered_meta(
    capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None

```

*Get metadata for a discovered (not necessarily loaded) capability by
name.*

## Configuration Validation + Persistence Helpers

SG-5 strict-by-default config validation against the manifest’s
`config_schema`; SG-9 schema drift detection; CR-2 per-capability
persistence via `CapabilityConfigStore`; deferred-on-disable hook
bookkeeping. Stable cluster — these gates are settled.

## CR-10 Multi-Instance Helpers

Validate explicit `instance_id` inputs against the constrained pattern,
auto-generate unique IDs for `new_instance=True`, and query the
per-instance state in `self.instances`. Closed by CR-10 core.

------------------------------------------------------------------------

### get_instance

``` python

def get_instance(
    name_or_id:str, # Capability name (default-loaded) or explicit instance_id
)->Optional:

```

*Return the CapabilityInstance for `name_or_id`, or None if not loaded.*

Lookup is keyed by instance_id (which equals capability_name for
default- loaded capabilities). Multi-instance IDs only exist in
self.instances.

------------------------------------------------------------------------

### list_instances

``` python

def list_instances(
    capability_name:Optional=None, # If given, filter to this capability's instances
)->List:

```

*List all loaded instances, optionally filtered by underlying capability
name.*

------------------------------------------------------------------------

### get_worker_env_status

``` python

def get_worker_env_status(
    name_or_meta:Any, # Capability name (loaded/discovered) or a CapabilityMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Per-entry status dicts (secret values never returned)

```

*CR-12: per-entry satisfaction status of a capability’s worker-env
contract.*

Each entry: {name, secret, required, satisfied, label, description}.
`satisfied` means a value is resolvable (secret present in the store, or
a visible var has a default/override). Secret VALUES are never returned
— only whether one is set. The capability-config UI uses this to gate
config display on required secrets being satisfied.

------------------------------------------------------------------------

### missing_required_env

``` python

def missing_required_env(
    name_or_meta:Any, # Capability name or CapabilityMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Names of required worker-env entries with no resolvable value

```

*CR-12: names of required worker-env entries that are unsatisfied.*

------------------------------------------------------------------------

### set_capability_secret

``` python

def set_capability_secret(
    name_or_id:str, # Capability name or instance_id whose secret to set
    key:str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
    value:str, # Secret value (stored via the SecretStore, never config/logs)
    scope:Optional=None, # SG-55 forward seam: per-principal scope
    reload:bool=True, # Respawn loaded worker(s) so the new env is injected
)->bool: # True if the secret was stored

```

*CR-12: store a capability secret, then respawn its worker(s) to inject
it.*

Secrets are keyed by the underlying CAPABILITY name (not instance_id),
so all instances of a capability share one credential — set the Gemini
key once and every Gemini instance gets it at (re)spawn. Because worker
env is fixed at spawn, the new value only reaches a *running* worker via
a RESPAWN, so this reloads each loaded instance of the capability
(unless `reload=False`, e.g. when provisioning a secret before the
capability is loaded). This is the actuation seam both the CLI
(`cjm-ctl set-secret`) and a future config UI call. Reload failures are
logged, not raised.

## Load + Unload Lifecycle

Spawn / terminate capability workers. `load_capability` is the canonical
entry; `unload_capability` handles canonical-vs-multi-instance cleanup
(drops `CapabilityMeta` only when no instances remain for the
capability). `get_capability` / `list_capabilities` are the
simple-accessor surface. **Mid-churn** — CR-7 reactive resource
management will touch `load_capability` lightly to initialize empirical
resource tracking.

------------------------------------------------------------------------

### load_capability

``` python

def load_capability(
    capability_meta:CapabilityMeta, # Capability metadata (with manifest attached)
    config:Optional=None, # Initial configuration
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
    instance_id:Optional=None, # CR-10: explicit instance_id; None defaults to capability_name
    new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
    max_concurrent_requests:Optional=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
    adapters:Optional=None, # CR-17 pt 2: explicit adapter unit names (loud refusal on mismatch); None = auto-bind discovered compatibles
)->bool: # True if successfully loaded

```

*Load a capability by spawning a Worker subprocess.*

CR-2: reads the persisted CapabilityConfigRecord from
`self.config_store` before launching the worker. If a persisted record
exists and the caller didn’t pass an explicit config, the persisted
config is used as the effective input. The persisted `enabled` flag is
applied to `capability_meta.enabled` so disabled capabilities stay
disabled across process restarts.

CR-10: optional `instance_id` allows multi-instance loading. -
instance_id=None, new_instance=False (default): instance_id =
capability_meta.name. Populates self.capabilities\[capability_name\] +
self.instances \[capability_name\] together (single-instance backward
compat). - instance_id=“custom”: validated against
`[A-Za-z0-9_-]{1,64}`. Populates self.instances\[custom\]. Persistence
is keyed by capability_name and only applied to the default instance. -
instance_id=None, new_instance=True: auto-generates `{name}-{6-hex}`.
Idempotent: re-load against an existing instance_id returns True without
re-spawning.

CR-7: computes `config_hash` from the effective config (post-defaults +
post-validation) and stores it on the CapabilityInstance so
execute_capability\* can key empirical samples by (instance_id,
config_hash). SG-33 stores `max_concurrent_requests` on the instance —
the actual asyncio.Semaphore is lazy-created in execute_capability_async
via `_get_concurrent_limiter`.

------------------------------------------------------------------------

### load_all

``` python

def load_all(
    configs:Optional=None, # Capability name -> config mapping
)->Dict: # Capability name -> success mapping

```

*Discover and load all available capabilities.*

------------------------------------------------------------------------

### unload_capability

``` python

def unload_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->bool: # True if successfully unloaded

```

*Unload a capability instance and terminate its Worker process (CR-10).*

If name_or_id resolves to the default instance (instance_id ==
capability_name) and no other instances remain for the same capability,
also removes the CapabilityMeta from self.capabilities. Otherwise
removes only the instance and clears CapabilityMeta.instance if it
pointed at the unloaded canonical.

------------------------------------------------------------------------

### unload_all

``` python

def unload_all(
    
)->None:

```

*Unload all capability instances and terminate all Worker processes
(CR-10).*

Iterates self.instances (CR-10 keying) rather than self.capabilities so
all multi-instance entries get torn down, not just the canonical
instances.

------------------------------------------------------------------------

### get_capability

``` python

def get_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->Optional: # Capability proxy instance or None

```

*Get a loaded capability’s proxy by name or instance_id (CR-10).*

Lookup order: self.instances first (covers both default capability_name
and multi-instance IDs), falling back to CapabilityMeta.instance for any
legacy code path that populated self.capabilities without self.instances
(defensive — shouldn’t happen post-CR-10 since load_capability always
records the instance).

------------------------------------------------------------------------

### list_capabilities

``` python

def list_capabilities(
    
)->List: # List of loaded capability metadata

```

*List all loaded capabilities.*

## Execution + Resource Eviction

The high-churn execute path. **CR-7’s primary rewrite target** —
reactive retry + OOM recovery + bounded retries will land here without
touching the stable cells above. Per-instance `_running_executions`
tracking (keyed by `instance_id`) allows concurrent multi-instance
executes without colliding on the deferred-disable coordination.

------------------------------------------------------------------------

### execute_capability

``` python

def execute_capability(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL,
    _task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
    kwargs:VAR_KEYWORD
)->Any: # Capability result

```

*Execute a capability instance’s main functionality (sync).*

CR-10: resolves `name_or_id` via self.instances; per-instance enabled
flag gates execution. `_running_executions` tracks by instance_id so
concurrent multi-instance executes don’t collide.

CR-2: raises CapabilityDisabledError (typed) when the instance is
disabled.

CR-7: reactive retry on CapabilityResourceError — evicts other
capabilities to free resources, then ALWAYS reloads the failing
capability’s worker before the retry attempt. Track A (WorkerOOMError —
worker died from SIGKILL) needs the reload because there’s no live
worker to retry on. Track B (capability-raised CapabilityResourceError —
worker still alive) ALSO reloads because PyTorch’s CUDA caching
allocator can fragment post-OOM in ways the capability can’t clean up
from within its own process; a fresh worker is the only reliable reset.
Bounded by `self.max_retries` (default 1). Empirical sample recorded in
the finally block — best-effort, doesn’t break execute on failure.

------------------------------------------------------------------------

### execute_capability_async

``` python

async def execute_capability_async(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL,
    _task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
    _method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
    kwargs:VAR_KEYWORD
)->Any: # Capability result

```

*Execute a capability instance’s main functionality (async).*

CR-10 + CR-2: same semantics as execute_capability, async-flavored.
Scheduler allocation goes through allocate_async for non-blocking
polling.

CR-7 + SG-33: reactive retry on CapabilityResourceError — always reloads
before retry (Track A + Track B converge on the same reload path; see
sync variant docstring for the rationale). Per-instance
asyncio.Semaphore enforces the `max_concurrent_requests` cap (None =
unbounded). Empirical sample recorded in the finally block.

------------------------------------------------------------------------

### execute_capability_task_async

``` python

async def execute_capability_task_async(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    kwargs:VAR_KEYWORD
)->Any: # Typed task result

```

*CR-17 pt 2: execute a typed task-adapter method (explicit task channel;
async).*

Thin wrapper over `execute_capability_async` — CR-7 retry, SG-33
semaphore, admission and empirical sampling apply identically; this is
the method the JobQueue’s task-addressed jobs invoke.

------------------------------------------------------------------------

### execute_capability_task

``` python

def execute_capability_task(
    name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
    task_name:str, # Adapter task, e.g. "graph-storage"
    method:str, # Adapter method, e.g. "query_nodes"
    kwargs:VAR_KEYWORD
)->Any: # Typed task result

```

*CR-17 pt 2: execute a typed task-adapter method (explicit task channel;
sync).*

Thin wrapper over `execute_capability` — the whole CR-7 retry /
scheduler / empirical-sampling machinery applies identically to
task-channel calls.

## Enable / Disable + Logs

CR-2 enable/disable surface: per-instance `enabled` flag with
default-instance sync to `CapabilityMeta.enabled` + persistence via
`CapabilityConfigStore` + deferred-on-disable hook
fire-after-in-flight-job semantics. Plus the log-file tail reader. The
capability-config UI library (Path C step 4) is the most likely consumer
to extend the enable/disable behavior next.

------------------------------------------------------------------------

### enable_capability

``` python

def enable_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was enabled

```

*Enable a capability instance (CR-10 multi-instance aware).*

CR-2: persists the new state via `config_store` (default-instance only;
persistence is per-capability, not per-instance) and fires the
capability’s on_enable hook on state-change. Idempotent for
already-enabled instances.

------------------------------------------------------------------------

### disable_capability

``` python

def disable_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was disabled

```

*Disable a capability instance without unloading it (CR-10
multi-instance aware).*

CR-2: persists the new state (default-instance only) and fires the
capability’s on_disable hook — but defers the hook until any in-flight
job for THIS instance finishes (the per-instance `_running_executions`
key is the instance_id, so a concurrent execute on a different instance
of the same capability doesn’t gate this instance’s hook).

------------------------------------------------------------------------

### get_capability_diagnostics

``` python

def get_capability_diagnostics(
    name_or_id:str, # Capability name or instance_id
    limit:int=50, # Max records to return (most recent)
    include_stream:bool=True, # Include raw stream chunks for the capability's worker sessions
)->str: # Rendered diagnostic text (most recent last)

```

*Render a capability’s recent diagnostics as text (CR-14; replaces* the
retired flat-log accessor — the flat `.cjm/logs/*.log` files no longer
exist).

A convenience TEXT projection over the diagnostics store for operator /
UI display: structured records (level + logger name + exact job id when
stamped) merged with the raw stream chunks (prints / tqdm final frames /
death rattles) from this capability’s worker sessions, ordered by time.
Programmatic consumers query the stores directly
(`manager.diagnostics_store` / `JobQueue.get_job_diagnostics`).

Key features:

- **Local-first discovery**: Manifests in `.cjm/manifests/` shadow
  global ones in `~/.cjm/manifests/`
- **Process isolation**: Each capability runs in its own subprocess with
  a dedicated Python interpreter
- **Dual execution modes**: `execute_capability()` for sync,
  `execute_capability_async()` for async
- **Automatic cleanup**: `unload_all()` terminates all Worker processes

## Configuration Management

Methods for managing capability configuration. These forward to the
`RemoteCapabilityProxy` which communicates with the Worker over HTTP.

------------------------------------------------------------------------

### get_capability_config

``` python

def get_capability_config(
    capability_name:str, # Name of the capability
)->Optional: # Current configuration or None

```

*Get the current configuration of a capability.*

------------------------------------------------------------------------

### get_capability_config_schema

``` python

def get_capability_config_schema(
    capability_name:str, # Name of the capability
)->Optional: # JSON Schema or None

```

*Get the configuration JSON Schema for a capability.*

------------------------------------------------------------------------

### get_config_options

``` python

def get_config_options(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Dict: # CR-11: live config option domains, or {} if unavailable

```

*Get a capability instance’s runtime config option providers (CR-11).*

Forwards to the worker’s get_config_options() - live enum domains +
per-option metadata for dynamic config fields (e.g. an API model list).
Kept separate from get_capability_config_schema (static, hashed for CR-8
drift); these options are the live companion the capability-config UI
merges on top.

Degrades to {} if the instance is missing or the worker call fails - the
UI then falls back to the static schema. Typed-error surfacing for the
UI consumer is deferred to the capability-config UI library (Path C Step
4).

------------------------------------------------------------------------

### get_all_capability_configs

``` python

def get_all_capability_configs(
    
)->Dict: # Capability name -> config mapping

```

*Get current configuration for all loaded capabilities.*

------------------------------------------------------------------------

### update_capability_config

``` python

def update_capability_config(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    config:Dict, # New configuration values
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
)->bool: # True if successful

```

*Update a capability instance’s configuration (CR-10 multi-instance
aware).*

CR-2: on successful reconfigure, persists the new config (default
instance only; multi-instance loads don’t persist). Per-instance
`inst.config` is updated regardless. SG-5: validates against the
underlying capability’s config_schema (per-capability, not per-instance,
so all instances share the same schema).

------------------------------------------------------------------------

### reload_capability

``` python

def reload_capability(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    config:Optional=None, # Optional new configuration
)->bool: # True if successful

```

*Reload a capability instance by terminating and restarting its Worker
(CR-10).*

------------------------------------------------------------------------

### get_capability_stats

``` python

def get_capability_stats(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # Resource telemetry or None

```

*Get resource usage stats for a capability instance’s Worker process
(CR-10).*

## Streaming Execution

Async streaming support for real-time results (e.g., transcription
word-by-word).

------------------------------------------------------------------------

### execute_capability_stream

``` python

def execute_capability_stream(
    name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results

```

*Execute a capability instance with streaming response (CR-10
multi-instance aware).*

Same per-instance resolution as execute_capability_async; scheduler
allocation keys off the CapabilityMeta (capability-level), execution +
bookkeeping key off the CapabilityInstance (per-instance).

## Concurrent Load / Unload (CR-10b)

`load_capabilities_concurrent` / `unload_capabilities_concurrent` fan
out capability spawns + teardowns across an asyncio gather, dropping
host startup time from sum-of-spawns to max-of-spawn. The sync
`load_capability` / `unload_capability` paths are unchanged; the async
variants run them via `asyncio.to_thread` so the existing sync
`_wait_for_ready` doesn’t block the event loop while the worker boots.

`CapabilityLoadSpec` (defined in `core.metadata`) collects the per-load
parameters into one dataclass so a batch can be expressed as
`List[CapabilityLoadSpec]`.

Result shapes:

- `load_capabilities_concurrent(...)` returns
  `Dict[str, Union[str, Exception]]` mapping the requested key (explicit
  `instance_id` if set, else `spec.meta.name`) → resolved `instance_id`
  on success or the raised `Exception` on failure.
- `unload_capabilities_concurrent(...)` returns
  `Dict[str, Union[bool, Exception]]` mapping the input `name_or_id` →
  `True` on success or the raised `Exception` on failure.

`max_concurrency=None` (the default) means unbounded — every spec runs
concurrently. Set an integer to cap simultaneous loads (useful when many
capabilities share a constrained resource like GPU memory at boot).
`fail_fast=False` (the default) collects every result via
`asyncio.gather(..., return_exceptions=True)`; `fail_fast=True`
re-raises the first exception and lets remaining tasks complete in the
background.

------------------------------------------------------------------------

### load_capability_async

``` python

async def load_capability_async(
    capability_meta:CapabilityMeta, config:Optional=None, strict:bool=True, instance_id:Optional=None,
    new_instance:bool=False
)->bool:

```

*Async variant of `load_capability` (CR-10b).*

Runs the existing sync `load_capability` via `asyncio.to_thread` so the
blocking proxy spawn + `_wait_for_ready` doesn’t stall the event loop.
Backward compat: identical behavior to the sync method, just
non-blocking.

------------------------------------------------------------------------

### unload_capability_async

``` python

async def unload_capability_async(
    name_or_id:str
)->bool:

```

*Async variant of `unload_capability` (CR-10b).*

------------------------------------------------------------------------

### load_capabilities_concurrent

``` python

async def load_capabilities_concurrent(
    specs:List, # Per-capability load specifications
    max_concurrency:Optional=None, # Cap simultaneous loads; None = unbounded
    fail_fast:bool=False, # Re-raise first exception (default: collect all results)
)->Dict: # requested_key → instance_id or Exception

```

*CR-10b: fan out capability loads concurrently via asyncio.gather.*

Each spec is loaded via `load_capability_async` (`asyncio.to_thread`
under the hood). The total wall-clock drops from sum-of-spawns to
max-of-spawns when `max_concurrency=None`. Capped concurrency uses an
asyncio.Semaphore.

Result keys come from `_spec_requested_key`: explicit `instance_id` if
set, `{capability_name}#new[{index}]` for ambiguous new_instance specs,
else `capability_name`. Successful entries map to the resolved
instance_id (string); failures map to the raised exception (caught
regardless of fail_fast value for non-fail-fast mode; re-raised in
fail_fast=True).

------------------------------------------------------------------------

### unload_capabilities_concurrent

``` python

async def unload_capabilities_concurrent(
    name_or_ids:List, # Capability names or instance_ids to unload
    max_concurrency:Optional=None, fail_fast:bool=False
)->Dict: # name_or_id → True or Exception

```

*CR-10b: fan out capability unloads concurrently via asyncio.gather.*

Same concurrency + fail_fast semantics as load_capabilities_concurrent.
Result keys are the input `name_or_ids` (deduplication is the caller’s
responsibility; duplicate inputs produce one dict entry per unique key).

## CapabilityBinding (SG-17)

A `CapabilityBinding` is a pre-bound view of one capability through a
shared `CapabilityManager`. Consumer services (8 audited in the
substrate audit) have each been writing the same ~80-line wrapper class
to hide the `manager + capability_name` pair behind a clean instance API
— `whisper.execute(...)` rather than
`manager.execute_capability("whisper", ...)`.

`CapabilityManager.bind(name, default_config=...)` returns one. The
binding forwards every method call to the bound manager, supplying
`capability_name` automatically. `default_config` is applied when
`load()` is called without an explicit config.

------------------------------------------------------------------------

### CapabilityBinding

``` python

def CapabilityBinding(
    manager:CapabilityManager, capability_name:str, default_config:Dict=<factory>
)->None:

```

*Pre-bound view of a single capability through a shared
CapabilityManager.*

Eliminates the wrapper-class duplication audited across 8 consumer
services (SG-17). Methods forward to the manager with `capability_name`
pre-supplied; `default_config` is the fallback used when `load()` is
called without an explicit config (matches the manifest-default behavior
in `load_capability`).

------------------------------------------------------------------------

### bind

``` python

def bind(
    capability_name:str, # Name of the capability to pre-bind
    default_config:Optional=None, # Default config used by binding.load()
)->CapabilityBinding: # Bound view ready for instance-style use

```

*Create a CapabilityBinding pre-bound to this manager +
capability_name.*

## Platform Queries (Phase 5a)

Substrate query API for filtering discovered capabilities by
current-platform compatibility. The query is pure string-based — the
substrate never imports the interface library to answer it, because
`_generate_manifest`’s introspection script captured
`resources.platforms` at install time inside the capability’s own conda
env.

- `get_compatible_for_current_platform()` — filters `discovered` by
  `resources.platforms`. Empty platforms list ≡ universal compatibility.

------------------------------------------------------------------------

### get_compatible_for_current_platform

``` python

def get_compatible_for_current_platform(
    
)->List: # Capabilities compatible with current platform

```

*Phase 5a: return discovered capabilities compatible with the host
platform.*

Filters by `resources.platforms`. Capabilities with an empty (or absent)
platforms list are considered universally compatible — that’s the
introspection-time convention when a capability author didn’t declare a
platform constraint. Capabilities lacking the entire `resources` block
(legacy / pre-Phase-5a manifests) also pass through as universal.

Does NOT filter on `requires_gpu` — substrate doesn’t know whether a GPU
is present without invoking a system monitor capability. Callers gate on
GPU availability separately if needed.

## Usage Examples

### Basic Usage

``` python
import logging
from cjm_substrate.core.manager import CapabilityManager

logging.basicConfig(level=logging.INFO)

# Create manager
manager = CapabilityManager()

# Discover and load all capabilities from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")

# Execute a capability
result = manager.execute_capability("whisper-local", audio="/path/to/audio.wav")

# Update configuration (hot-reload)
manager.update_capability_config("whisper-local", {"model": "large-v3"})

# Clean up all workers
manager.unload_all()
```

### Async Usage (FastHTML)

``` python
async def transcribe(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    
    try:
        result = await manager.execute_capability_async("whisper-local", audio=audio_path)
        return result
    finally:
        manager.unload_all()

# Streaming
async def transcribe_stream(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    
    try:
        async for chunk in manager.execute_capability_stream("whisper-local", audio=audio_path):
            yield chunk
    finally:
        manager.unload_all()
```
