Capability Manager
CapabilityManager
The CapabilityManager orchestrates the complete lifecycle of capabilities in the process-isolated architecture:
- Discovery: Finds capability manifests in local (.cjm/manifests/) or global (~/.cjm/manifests/) directories
- Loading: Creates RemoteCapabilityProxy instances that spawn isolated Worker subprocesses
- Execution: Forwards calls to Workers via HTTP, supports both sync and async
- Lifecycle: Handles initialization, configuration updates, and cleanup
CapabilityManager                      Worker Subprocesses
┌─────────────────┐                    ┌─────────────────────┐
│ discover_       │                    │ Conda Env: Whisper  │
│ manifests()     │     HTTP/JSON      │ └─ whisper          │
│                 │◄──────────────────▶│                     │
│ capabilities:   │                    └─────────────────────┘
│   whisper ──────┼──► RemoteProxy     ┌─────────────────────┐
│   gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini   │
│                 │                    │ └─ gemini           │
└─────────────────┘                    └─────────────────────┘
CapabilityManager
def CapabilityManager(
capability_interface:Type=ToolCapability, # Base interface for type checking
search_paths:Optional=None, # Custom manifest search paths
scheduler:Optional=None, # Resource allocation policy
config_store:Optional=None, # CR-2: persistence backend; lazy LocalCapabilityConfigStore default per OQ-4
empirical_store:Optional=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
secret_store:Optional=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
max_retries:int=1, # CR-7: how many reactive retries to attempt on CapabilityResourceError (default 1 — one retry after eviction)
sysmon_capability_name:Optional=None, # monitor capability (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
journal_store:Optional=None, # CR-14: durable account-of-action; lazy LocalJournalStore at <data_dir>/journal.db
diagnostics_store:Optional=None, # CR-14: disposable diagnostic narrative; lazy LocalDiagnosticsStore at <data_dir>/diagnostics.db
):
Manages capability discovery, loading, and lifecycle via process isolation.
CR-3 Telemetry (System Monitor)
Bind a MonitorToolProtocol proxy and read system telemetry through CR-3’s typed surface. _get_global_stats* duck-types on the proxy’s typed accessor, falling back to the legacy dispatcher path for monitors that haven’t migrated.
register_system_monitor
def register_system_monitor(
capability_name:str, # Name of the system monitor capability
)->None:
Bind a loaded capability to act as the hardware system monitor.
get_global_stats
async def get_global_stats(
)->Dict: # Current system telemetry
Public async system-telemetry accessor (stage 3 / CR-16).
The JobQueue’s multi-lane admission consumes this through the JobQueueDependencies protocol (defensively via getattr). Thin wrapper over _get_global_stats_async — same CR-3 duck-type semantics.
get_admission_profile
def get_admission_profile(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # {'gpu_memory_mb_peak_max','memory_mb_peak_max','sample_count'} or None
Empirical resource profile for a loaded instance’s CURRENT config (stage 3 / CR-16 multi-lane admission).
Reads the CR-7 empirical store at the instance’s live (instance_id, config_hash) key — the SAME keying that records samples, so the profile always describes the configuration actually being run (a config change = a new hash = no record = the queue runs the job EXCLUSIVE until its first measurement run graduates it).
None = no evidence (instance unknown / store disabled / no record for this config). The manifest’s requires_gpu is deliberately not part of this surface — GPU use is an empirical fact, not a declaration (stage-3 ledger G2).
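A minimal sketch of how a queue-side consumer might read this profile, assuming only the dict keys listed above. The function name `choose_lane`, the lane labels, and the two free-memory arguments are hypothetical and are not part of the documented API; the real admission logic lives in the JobQueue and is not shown here.

```python
from typing import Dict, Optional


def choose_lane(
    profile: Optional[Dict[str, float]],  # shape returned by get_admission_profile
    free_gpu_mb: float,                   # hypothetical: free GPU memory from telemetry
    free_ram_mb: float,                   # hypothetical: free system memory from telemetry
) -> str:
    """Return "exclusive" when there is no evidence, else "shared" or "wait"."""
    if profile is None:
        # No record for the live (instance_id, config_hash) key: per the text
        # above, such a job runs exclusive until it has been measured.
        return "exclusive"
    fits_gpu = profile["gpu_memory_mb_peak_max"] <= free_gpu_mb
    fits_ram = profile["memory_mb_peak_max"] <= free_ram_mb
    return "shared" if fits_gpu and fits_ram else "wait"
```

Note that the decision uses measured peaks only; nothing in the sketch consults requires_gpu, in line with the statement that GPU use is treated as an empirical fact.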
get_instance_concurrency_cap
def get_instance_concurrency_cap(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # The instance's SG-33 max_concurrent_requests (None = unset)
Per-instance concurrency cap for queue admission (stage 3 / CR-16).
Surfaces the SG-33 max_concurrent_requests setting. The queue treats None as 1 (same-worker concurrency is OPT-IN per capability — e.g. ffmpeg raises its cap because its sync endpoints run in a threadpool and concurrent converts genuinely parallelize as subprocesses; model workers stay serial-per-instance).
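The "None means 1" reading can be illustrated with a plain asyncio.Semaphore. This is a sketch under assumptions: `effective_cap` and `run_with_cap` are hypothetical helpers, and the sleep stands in for a worker call. It only demonstrates how a cap bounds the number of in-flight jobs.

```python
import asyncio
from typing import Optional


def effective_cap(cap: Optional[int]) -> int:
    """Queue-side reading of the cap: None (unset) is treated as 1."""
    return 1 if cap is None else cap


async def run_with_cap(cap: Optional[int], n_jobs: int) -> int:
    """Run n_jobs dummy jobs under the cap; return the peak concurrency seen."""
    limiter = asyncio.Semaphore(effective_cap(cap))
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with limiter:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for a call into the worker
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak
```

With an unset cap the jobs run one at a time; raising the cap (as the text describes for ffmpeg) lets that many jobs overlap.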
Manifest Discovery
Read manifests from disk into typed CapabilityMeta (parsing Phase 5a resources), routing adapter manifests to the adapter registry. Stable cluster.
discover_manifests
def discover_manifests(
)->List: # List of discovered capability metadata
Discover capabilities via JSON manifests in search paths.
CR-8: reads each manifest via load_manifest, which parses the v2.0 nested layout into a typed ManifestV2. meta.manifest is set to a flat-shaped dict view so existing consumers (proxy, scheduling, execute path) continue working unchanged; the typed ManifestV2 is also attached as meta.manifest_v2 so drift detection + future typed callers can read drift_tracking.config_schema_hash without re-parsing.
get_capabilities_compatible_with
def get_capabilities_compatible_with(
adapter:Union, # Adapter unit name or manifest
)->List: # Discovered capability names whose surface satisfies the protocol
CR-17 pt 2: the pass-2 compatibility query, manifest-surface-based.
check_adapter_compatibility
def check_adapter_compatibility(
adapter:Union, # Adapter unit name or manifest
capability_name:str, # Discovered capability name
)->Dict: # Match verdict (see match_protocol_against_surface)
CR-17 pt 2: surface-based compatibility verdict (host-side; works against UNLOADED capabilities — manifest-vs-manifest, no protocol imports host-side).
Matches the adapter’s recorded protocol members against the capability manifest’s recorded structural_surface (pass-2 Thread 3: the capability records only itself; the adapter declares the protocol; the substrate matches). A capability without a recorded surface (pre-fracture manifest) is NOT compatible until its manifest regenerates — staleness stays visible instead of silently mis-answering.
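The matching rule described above can be sketched as a set comparison. This is an illustration only: the real verdict shape comes from match_protocol_against_surface, which is not shown in this page, so the `match_members` name and the dict keys below are hypothetical, and the real matcher may compare more than member names.

```python
from typing import Dict, Iterable, Optional


def match_members(
    protocol_members: Iterable[str],               # members the adapter's protocol declares
    structural_surface: Optional[Iterable[str]],   # members the capability manifest records
) -> Dict[str, object]:
    """Return a hypothetical verdict dict for adapter-vs-capability matching."""
    required = set(protocol_members)
    if structural_surface is None:
        # No recorded surface (pre-fracture manifest): report incompatible
        # rather than guess, so the stale manifest stays visible.
        return {"compatible": False, "missing": sorted(required),
                "reason": "no recorded surface"}
    missing = sorted(required - set(structural_surface))
    return {"compatible": not missing, "missing": missing,
            "reason": "missing members" if missing else ""}
```

Because both inputs are plain strings read from manifests, a check like this needs no protocol imports on the host side.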
get_adapters_for_task
def get_adapters_for_task(
task_name:str, # Task name, e.g. "graph-storage"
)->List: # Discovered adapter units serving the task
CR-17 pt 2: the adapter-registry view — discovered adapter manifests for a task.
get_capability_meta
def get_capability_meta(
capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None
Get metadata for a loaded capability by name.
get_discovered_meta
def get_discovered_meta(
capability_name:str, # Name of the capability
)->Optional: # Capability metadata or None
Get metadata for a discovered (not necessarily loaded) capability by name.
Configuration Validation + Persistence Helpers
SG-5 strict-by-default config validation against the manifest’s config_schema; SG-9 schema drift detection; CR-2 per-capability persistence via CapabilityConfigStore; deferred-on-disable hook bookkeeping. Stable cluster — these gates are settled.
CR-10 Multi-Instance Helpers
Validate explicit instance_id inputs against the constrained pattern, auto-generate unique IDs for new_instance=True, and query the per-instance state in self.instances. Closed by CR-10 core.
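A minimal sketch of both helpers, assuming the pattern [A-Za-z0-9_-]{1,64} and the {name}-{6-hex} form quoted in the load_capability documentation. The function names and the `existing` argument are hypothetical; the manager's private helpers may differ.

```python
import re
import secrets
from typing import Set

# Pattern quoted in the load_capability docs for explicit instance IDs.
INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_instance_id(instance_id: str) -> str:
    """Return the ID unchanged if it matches the pattern, else raise ValueError."""
    if not INSTANCE_ID_RE.fullmatch(instance_id):
        raise ValueError(f"invalid instance_id: {instance_id!r}")
    return instance_id


def generate_instance_id(name: str, existing: Set[str]) -> str:
    """Generate a '{name}-{6-hex}' ID that is not already in `existing`."""
    while True:
        candidate = f"{name}-{secrets.token_hex(3)}"  # 3 random bytes -> 6 hex chars
        if candidate not in existing:
            return candidate
```

Using fullmatch rather than match rejects IDs with trailing characters such as a newline, which a prefix match would accept.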
get_instance
def get_instance(
name_or_id:str, # Capability name (default-loaded) or explicit instance_id
)->Optional:
Return the CapabilityInstance for name_or_id, or None if not loaded.
Lookup is keyed by instance_id (which equals capability_name for default-loaded capabilities). Multi-instance IDs only exist in self.instances.
list_instances
def list_instances(
capability_name:Optional=None, # If given, filter to this capability's instances
)->List:
List all loaded instances, optionally filtered by underlying capability name.
get_worker_env_status
def get_worker_env_status(
name_or_meta:Any, # Capability name (loaded/discovered) or a CapabilityMeta
scope:Optional=None, # SG-55 forward seam
)->List: # Per-entry status dicts (secret values never returned)
CR-12: per-entry satisfaction status of a capability’s worker-env contract.
Each entry: {name, secret, required, satisfied, label, description}. satisfied means a value is resolvable (secret present in the store, or a visible var has a default/override). Secret VALUES are never returned — only whether one is set. The capability-config UI uses this to gate config display on required secrets being satisfied.
missing_required_env
def missing_required_env(
name_or_meta:Any, # Capability name or CapabilityMeta
scope:Optional=None, # SG-55 forward seam
)->List: # Names of required worker-env entries with no resolvable value
CR-12: names of required worker-env entries that are unsatisfied.
set_capability_secret
def set_capability_secret(
name_or_id:str, # Capability name or instance_id whose secret to set
key:str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
value:str, # Secret value (stored via the SecretStore, never config/logs)
scope:Optional=None, # SG-55 forward seam: per-principal scope
reload:bool=True, # Respawn loaded worker(s) so the new env is injected
)->bool: # True if the secret was stored
CR-12: store a capability secret, then respawn its worker(s) to inject it.
Secrets are keyed by the underlying CAPABILITY name (not instance_id), so all instances of a capability share one credential — set the Gemini key once and every Gemini instance gets it at (re)spawn. Because worker env is fixed at spawn, the new value only reaches a running worker via a RESPAWN, so this reloads each loaded instance of the capability (unless reload=False, e.g. when provisioning a secret before the capability is loaded). This is the actuation seam both the CLI (cjm-ctl set-secret) and a future config UI call. Reload failures are logged, not raised.
Load + Unload Lifecycle
Spawn / terminate capability workers. load_capability is the canonical entry; unload_capability handles canonical-vs-multi-instance cleanup (drops CapabilityMeta only when no instances remain for the capability). get_capability / list_capabilities are the simple-accessor surface. Mid-churn — CR-7 reactive resource management will touch load_capability lightly to initialize empirical resource tracking.
load_capability
def load_capability(
capability_meta:CapabilityMeta, # Capability metadata (with manifest attached)
config:Optional=None, # Initial configuration
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
instance_id:Optional=None, # CR-10: explicit instance_id; None defaults to capability_name
new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
max_concurrent_requests:Optional=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
adapters:Optional=None, # CR-17 pt 2: explicit adapter unit names (loud refusal on mismatch); None = auto-bind discovered compatibles
)->bool: # True if successfully loaded
Load a capability by spawning a Worker subprocess.
CR-2: reads the persisted CapabilityConfigRecord from self.config_store before launching the worker. If a persisted record exists and the caller didn’t pass an explicit config, the persisted config is used as the effective input. The persisted enabled flag is applied to capability_meta.enabled so disabled capabilities stay disabled across process restarts.
CR-10: optional instance_id allows multi-instance loading.
- instance_id=None, new_instance=False (default): instance_id = capability_meta.name. Populates self.capabilities[capability_name] + self.instances[capability_name] together (single-instance backward compat).
- instance_id="custom": validated against [A-Za-z0-9_-]{1,64}. Populates self.instances[custom]. Persistence is keyed by capability_name and only applied to the default instance.
- instance_id=None, new_instance=True: auto-generates {name}-{6-hex}.
Idempotent: re-load against an existing instance_id returns True without re-spawning.
CR-7: computes config_hash from the effective config (post-defaults + post-validation) and stores it on the CapabilityInstance so execute_capability* can key empirical samples by (instance_id, config_hash). SG-33 stores max_concurrent_requests on the instance — the actual asyncio.Semaphore is lazy-created in execute_capability_async via _get_concurrent_limiter.
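The page does not say how config_hash is computed. As an illustration of the keying idea only, the sketch below assumes a SHA-256 digest over canonical JSON; the function names and the hashing scheme are assumptions, not the library's implementation.

```python
import hashlib
import json
from typing import Any, Dict, Tuple


def config_hash(effective_config: Dict[str, Any]) -> str:
    """Hash a JSON-serializable config so that key order does not matter."""
    canonical = json.dumps(effective_config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def sample_key(instance_id: str, effective_config: Dict[str, Any]) -> Tuple[str, str]:
    """Build the (instance_id, config_hash) key used to file empirical samples."""
    return (instance_id, config_hash(effective_config))
```

Any stable hash of the effective config would serve the same purpose: a changed config yields a new key, so earlier samples are not attributed to the new configuration.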
load_all
def load_all(
configs:Optional=None, # Capability name -> config mapping
)->Dict: # Capability name -> success mapping
Discover and load all available capabilities.
unload_capability
def unload_capability(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->bool: # True if successfully unloaded
Unload a capability instance and terminate its Worker process (CR-10).
If name_or_id resolves to the default instance (instance_id == capability_name) and no other instances remain for the same capability, also removes the CapabilityMeta from self.capabilities. Otherwise removes only the instance and clears CapabilityMeta.instance if it pointed at the unloaded canonical.
unload_all
def unload_all(
)->None:
Unload all capability instances and terminate all Worker processes (CR-10).
Iterates self.instances (CR-10 keying) rather than self.capabilities so all multi-instance entries get torn down, not just the canonical instances.
get_capability
def get_capability(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
)->Optional: # Capability proxy instance or None
Get a loaded capability’s proxy by name or instance_id (CR-10).
Lookup order: self.instances first (covers both default capability_name and multi-instance IDs), falling back to CapabilityMeta.instance for any legacy code path that populated self.capabilities without self.instances (defensive — shouldn’t happen post-CR-10 since load_capability always records the instance).
list_capabilities
def list_capabilities(
)->List: # List of loaded capability metadata
List all loaded capabilities.
Execution + Resource Eviction
The high-churn execute path. CR-7’s primary rewrite target — reactive retry + OOM recovery + bounded retries will land here without touching the stable cells above. Per-instance _running_executions tracking (keyed by instance_id) allows concurrent multi-instance executes without colliding on the deferred-disable coordination.
execute_capability
def execute_capability(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
args:VAR_POSITIONAL,
_task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
_method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
kwargs:VAR_KEYWORD
)->Any: # Capability result
Execute a capability instance’s main functionality (sync).
CR-10: resolves name_or_id via self.instances; per-instance enabled flag gates execution. _running_executions tracks by instance_id so concurrent multi-instance executes don’t collide.
CR-2: raises CapabilityDisabledError (typed) when the instance is disabled.
CR-7: reactive retry on CapabilityResourceError — evicts other capabilities to free resources, then ALWAYS reloads the failing capability’s worker before the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL) needs the reload because there’s no live worker to retry on. Track B (capability-raised CapabilityResourceError — worker still alive) ALSO reloads because PyTorch’s CUDA caching allocator can fragment post-OOM in ways the capability can’t clean up from within its own process; a fresh worker is the only reliable reset. Bounded by self.max_retries (default 1). Empirical sample recorded in the finally block — best-effort, doesn’t break execute on failure.
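The control flow described above (evict, always reload, retry, bounded by max_retries) can be sketched as a small loop. This is a simplified stand-in, not the manager's code: the exception class below only mirrors the name used in the text, and the three callables are hypothetical hooks. Scheduler allocation and empirical sampling are omitted.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class CapabilityResourceError(RuntimeError):
    """Stand-in for the substrate's error type of the same name."""


def execute_with_retry(
    run: Callable[[], T],                # hypothetical: one execute attempt
    evict_others: Callable[[], None],    # hypothetical: free resources held by other capabilities
    reload_worker: Callable[[], None],   # hypothetical: respawn the failing capability's worker
    max_retries: int = 1,
) -> T:
    """Retry `run` after eviction + reload, at most `max_retries` times."""
    attempt = 0
    while True:
        try:
            return run()
        except CapabilityResourceError:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the error to the caller
            attempt += 1
            evict_others()
            reload_worker()  # reload on every retry, whether or not the worker survived
```

With the default max_retries=1, a call that fails once and then succeeds returns normally, while a call that keeps failing raises after the second attempt.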
execute_capability_async
async def execute_capability_async(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
args:VAR_POSITIONAL,
_task_name:Optional=None, # CR-17 pt 2: route via the task channel (adapter task) instead of execute
_method:Optional=None, # CR-17 pt 2: adapter method (set with _task_name)
kwargs:VAR_KEYWORD
)->Any: # Capability result
Execute a capability instance’s main functionality (async).
CR-10 + CR-2: same semantics as execute_capability, async-flavored. Scheduler allocation goes through allocate_async for non-blocking polling.
CR-7 + SG-33: reactive retry on CapabilityResourceError — always reloads before retry (Track A + Track B converge on the same reload path; see sync variant docstring for the rationale). Per-instance asyncio.Semaphore enforces the max_concurrent_requests cap (None = unbounded). Empirical sample recorded in the finally block.
execute_capability_task_async
async def execute_capability_task_async(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
task_name:str, # Adapter task, e.g. "graph-storage"
method:str, # Adapter method, e.g. "query_nodes"
kwargs:VAR_KEYWORD
)->Any: # Typed task result
CR-17 pt 2: execute a typed task-adapter method (explicit task channel; async).
Thin wrapper over execute_capability_async — CR-7 retry, SG-33 semaphore, admission and empirical sampling apply identically; this is the method the JobQueue’s task-addressed jobs invoke.
execute_capability_task
def execute_capability_task(
name_or_id:str, # Capability name (default-loaded) or instance_id (multi-instance)
task_name:str, # Adapter task, e.g. "graph-storage"
method:str, # Adapter method, e.g. "query_nodes"
kwargs:VAR_KEYWORD
)->Any: # Typed task result
CR-17 pt 2: execute a typed task-adapter method (explicit task channel; sync).
Thin wrapper over execute_capability — the whole CR-7 retry / scheduler / empirical-sampling machinery applies identically to task-channel calls.
Enable / Disable + Logs
CR-2 enable/disable surface: per-instance enabled flag with default-instance sync to CapabilityMeta.enabled + persistence via CapabilityConfigStore + deferred-on-disable hook fire-after-in-flight-job semantics. Plus the log-file tail reader. The capability-config UI library (Path C step 4) is the most likely consumer to extend the enable/disable behavior next.
enable_capability
def enable_capability(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was enabled
Enable a capability instance (CR-10 multi-instance aware).
CR-2: persists the new state via config_store (default-instance only; persistence is per-capability, not per-instance) and fires the capability’s on_enable hook on state-change. Idempotent for already-enabled instances.
disable_capability
def disable_capability(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was disabled
Disable a capability instance without unloading it (CR-10 multi-instance aware).
CR-2: persists the new state (default-instance only) and fires the capability’s on_disable hook — but defers the hook until any in-flight job for THIS instance finishes (the per-instance _running_executions key is the instance_id, so a concurrent execute on a different instance of the same capability doesn’t gate this instance’s hook).
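A sketch of the deferral rule, assuming a per-instance in-flight counter. `DisableCoordinator` and its methods are hypothetical names; the manager's own _running_executions bookkeeping may be structured differently.

```python
from collections import defaultdict
from typing import Callable, Dict


class DisableCoordinator:
    """Hypothetical helper: defers on_disable until in-flight work drains."""

    def __init__(self) -> None:
        self.running: Dict[str, int] = defaultdict(int)    # instance_id -> in-flight jobs
        self.pending: Dict[str, Callable[[], None]] = {}   # instance_id -> deferred hook

    def start(self, instance_id: str) -> None:
        self.running[instance_id] += 1

    def finish(self, instance_id: str) -> None:
        self.running[instance_id] -= 1
        if self.running[instance_id] == 0 and instance_id in self.pending:
            self.pending.pop(instance_id)()  # last job finished: fire the deferred hook

    def disable(self, instance_id: str, on_disable: Callable[[], None]) -> None:
        if self.running[instance_id] > 0:
            self.pending[instance_id] = on_disable  # defer until this instance is idle
        else:
            on_disable()  # nothing in flight: fire immediately
```

Because the counters are keyed by instance_id, a job running on one instance does not hold back the hook of another instance of the same capability.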
get_capability_diagnostics
def get_capability_diagnostics(
name_or_id:str, # Capability name or instance_id
limit:int=50, # Max records to return (most recent)
include_stream:bool=True, # Include raw stream chunks for the capability's worker sessions
)->str: # Rendered diagnostic text (most recent last)
Render a capability’s recent diagnostics as text (CR-14; replaces the retired flat-log accessor — the flat .cjm/logs/*.log files no longer exist).
A convenience TEXT projection over the diagnostics store for operator / UI display: structured records (level + logger name + exact job id when stamped) merged with the raw stream chunks (prints / tqdm final frames / death rattles) from this capability’s worker sessions, ordered by time. Programmatic consumers query the stores directly (manager.diagnostics_store / JobQueue.get_job_diagnostics).
Key features:
- Local-first discovery: Manifests in .cjm/manifests/ shadow global ones in ~/.cjm/manifests/
- Process isolation: Each capability runs in its own subprocess with a dedicated Python interpreter
- Dual execution modes: execute_capability() for sync, execute_capability_async() for async
- Automatic cleanup: unload_all() terminates all Worker processes
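The local-first shadowing rule can be sketched as a first-hit-wins merge over ordered search paths. This is an illustration under assumptions: the `discover` function is hypothetical, and keying manifests by a top-level "name" field (falling back to the file stem) is a guess at the manifest layout, which this page does not spell out.

```python
import json
from pathlib import Path
from typing import Dict, List


def discover(search_paths: List[Path]) -> Dict[str, dict]:
    """Collect manifests by name; earlier search paths shadow later ones."""
    found: Dict[str, dict] = {}
    for directory in search_paths:
        if not directory.is_dir():
            continue  # a missing search path is simply skipped
        for manifest_path in sorted(directory.glob("*.json")):
            manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
            name = manifest.get("name", manifest_path.stem)  # assumed key
            found.setdefault(name, manifest)  # keep the first hit, ignore later duplicates
    return found
```

Calling `discover([Path(".cjm/manifests"), Path.home() / ".cjm" / "manifests"])` would then let a project-local manifest override a global one of the same name.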
Configuration Management
Methods for managing capability configuration. These forward to the RemoteCapabilityProxy which communicates with the Worker over HTTP.
get_capability_config
def get_capability_config(
capability_name:str, # Name of the capability
)->Optional: # Current configuration or None
Get the current configuration of a capability.
get_capability_config_schema
def get_capability_config_schema(
capability_name:str, # Name of the capability
)->Optional: # JSON Schema or None
Get the configuration JSON Schema for a capability.
get_config_options
def get_config_options(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Dict: # CR-11: live config option domains, or {} if unavailable
Get a capability instance’s runtime config option providers (CR-11).
Forwards to the worker’s get_config_options() - live enum domains + per-option metadata for dynamic config fields (e.g. an API model list). Kept separate from get_capability_config_schema (static, hashed for CR-8 drift); these options are the live companion the capability-config UI merges on top.
Degrades to {} if the instance is missing or the worker call fails - the UI then falls back to the static schema. Typed-error surfacing for the UI consumer is deferred to the capability-config UI library (Path C Step 4).
get_all_capability_configs
def get_all_capability_configs(
)->Dict: # Capability name -> config mapping
Get current configuration for all loaded capabilities.
update_capability_config
def update_capability_config(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
config:Dict, # New configuration values
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
)->bool: # True if successful
Update a capability instance’s configuration (CR-10 multi-instance aware).
CR-2: on successful reconfigure, persists the new config (default instance only; multi-instance loads don’t persist). Per-instance inst.config is updated regardless. SG-5: validates against the underlying capability’s config_schema (per-capability, not per-instance, so all instances share the same schema).
reload_capability
def reload_capability(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
config:Optional=None, # Optional new configuration
)->bool: # True if successful
Reload a capability instance by terminating and restarting its Worker (CR-10).
get_capability_stats
def get_capability_stats(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
)->Optional: # Resource telemetry or None
Get resource usage stats for a capability instance’s Worker process (CR-10).
Streaming Execution
Async streaming support for real-time results (e.g., transcription word-by-word).
execute_capability_stream
def execute_capability_stream(
name_or_id:str, # Capability name (default instance) or instance_id (multi-instance)
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results
Execute a capability instance with streaming response (CR-10 multi-instance aware).
Same per-instance resolution as execute_capability_async; scheduler allocation keys off the CapabilityMeta (capability-level), execution + bookkeeping key off the CapabilityInstance (per-instance).
Concurrent Load / Unload (CR-10b)
load_capabilities_concurrent / unload_capabilities_concurrent fan out capability spawns + teardowns across an asyncio gather, dropping host startup time from sum-of-spawns to max-of-spawns. The sync load_capability / unload_capability paths are unchanged; the async variants run them via asyncio.to_thread so the existing sync _wait_for_ready doesn’t block the event loop while the worker boots.
CapabilityLoadSpec (defined in core.metadata) collects the per-load parameters into one dataclass so a batch can be expressed as List[CapabilityLoadSpec].
Result shapes:
- load_capabilities_concurrent(...) returns Dict[str, Union[str, Exception]] mapping the requested key (explicit instance_id if set, else spec.meta.name) → resolved instance_id on success or the raised Exception on failure.
- unload_capabilities_concurrent(...) returns Dict[str, Union[bool, Exception]] mapping the input name_or_id → True on success or the raised Exception on failure.
max_concurrency=None (the default) means unbounded — every spec runs concurrently. Set an integer to cap simultaneous loads (useful when many capabilities share a constrained resource like GPU memory at boot). fail_fast=False (the default) collects every result via asyncio.gather(..., return_exceptions=True); fail_fast=True re-raises the first exception and lets remaining tasks complete in the background.
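The fan-out pattern (blocking call in a thread, optional semaphore, results collected with return_exceptions=True) can be sketched with the standard library alone. `run_concurrent` and `blocking_fn` are hypothetical stand-ins for the batch loader and the sync load path, and only the fail_fast=False behavior is shown.

```python
import asyncio
from typing import Callable, Dict, List, Optional, Union


async def run_concurrent(
    keys: List[str],
    blocking_fn: Callable[[str], str],       # stand-in for a sync, blocking load call
    max_concurrency: Optional[int] = None,   # None = unbounded
) -> Dict[str, Union[str, BaseException]]:
    """Run blocking_fn for every key concurrently; map key -> result or exception."""
    limiter = asyncio.Semaphore(max_concurrency) if max_concurrency else None

    async def one(key: str) -> str:
        if limiter is None:
            return await asyncio.to_thread(blocking_fn, key)
        async with limiter:  # at most max_concurrency calls in flight
            return await asyncio.to_thread(blocking_fn, key)

    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(*(one(k) for k in keys), return_exceptions=True)
    return dict(zip(keys, results))
```

Callers then check each value with isinstance(value, Exception) to separate failures from resolved IDs.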
load_capability_async
async def load_capability_async(
capability_meta:CapabilityMeta, config:Optional=None, strict:bool=True, instance_id:Optional=None,
new_instance:bool=False
)->bool:
Async variant of load_capability (CR-10b).
Runs the existing sync load_capability via asyncio.to_thread so the blocking proxy spawn + _wait_for_ready doesn’t stall the event loop. Backward compat: identical behavior to the sync method, just non-blocking.
unload_capability_async
async def unload_capability_async(
name_or_id:str
)->bool:
Async variant of unload_capability (CR-10b).
load_capabilities_concurrent
async def load_capabilities_concurrent(
specs:List, # Per-capability load specifications
max_concurrency:Optional=None, # Cap simultaneous loads; None = unbounded
fail_fast:bool=False, # Re-raise first exception (default: collect all results)
)->Dict: # requested_key → instance_id or Exception
CR-10b: fan out capability loads concurrently via asyncio.gather.
Each spec is loaded via load_capability_async (asyncio.to_thread under the hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when max_concurrency=None. Capped concurrency uses an asyncio.Semaphore.
Result keys come from _spec_requested_key: explicit instance_id if set, {capability_name}#new[{index}] for ambiguous new_instance specs, else capability_name. Successful entries map to the resolved instance_id (string). With fail_fast=False, failures map to the raised exception; with fail_fast=True, the first exception is re-raised instead.
unload_capabilities_concurrent
async def unload_capabilities_concurrent(
name_or_ids:List, # Capability names or instance_ids to unload
max_concurrency:Optional=None, fail_fast:bool=False
)->Dict: # name_or_id → True or Exception
CR-10b: fan out capability unloads concurrently via asyncio.gather.
Same concurrency + fail_fast semantics as load_capabilities_concurrent. Result keys are the input name_or_ids (deduplication is the caller’s responsibility; duplicate inputs produce one dict entry per unique key).
CapabilityBinding (SG-17)
A CapabilityBinding is a pre-bound view of one capability through a shared CapabilityManager. Consumer services (8 audited in the substrate audit) have each been writing the same ~80-line wrapper class to hide the manager + capability_name pair behind a clean instance API — whisper.execute(...) rather than manager.execute_capability("whisper", ...).
CapabilityManager.bind(name, default_config=...) returns one. The binding forwards every method call to the bound manager, supplying capability_name automatically. default_config is applied when load() is called without an explicit config.
CapabilityBinding
def CapabilityBinding(
manager:CapabilityManager, capability_name:str, default_config:Dict=<factory>
)->None:
Pre-bound view of a single capability through a shared CapabilityManager.
Eliminates the wrapper-class duplication audited across 8 consumer services (SG-17). Methods forward to the manager with capability_name pre-supplied; default_config is the fallback used when load() is called without an explicit config (matches the manifest-default behavior in load_capability).
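The forwarding idea can be sketched with a small dataclass. This is not the library's CapabilityBinding: `Binding`, `resolve_config`, and the `EchoManager` stand-in are hypothetical, and only execute forwarding plus the default-config fallback are shown.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


class EchoManager:
    """Stand-in for CapabilityManager: echoes back what it was asked to run."""

    def execute_capability(self, name: str, *args: Any, **kwargs: Any) -> Any:
        return (name, args, kwargs)


@dataclass
class Binding:
    manager: Any
    capability_name: str
    default_config: Dict[str, Any] = field(default_factory=dict)

    def execute(self, *args: Any, **kwargs: Any) -> Any:
        # Forward to the shared manager, supplying the bound name.
        return self.manager.execute_capability(self.capability_name, *args, **kwargs)

    def resolve_config(self, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        # An explicit config wins; otherwise fall back to a copy of the default.
        return dict(self.default_config) if config is None else config
```

A consumer can then hold `whisper = Binding(manager, "whisper")` and call `whisper.execute(audio=path)` without repeating the capability name.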
bind
def bind(
capability_name:str, # Name of the capability to pre-bind
default_config:Optional=None, # Default config used by binding.load()
)->CapabilityBinding: # Bound view ready for instance-style use
Create a CapabilityBinding pre-bound to this manager + capability_name.
Platform Queries (Phase 5a)
Substrate query API for filtering discovered capabilities by current-platform compatibility. The query is pure string-based — the substrate never imports the interface library to answer it, because _generate_manifest’s introspection script captured resources.platforms at install time inside the capability’s own conda env.
- get_compatible_for_current_platform(): filters discovered by resources.platforms. Empty platforms list ≡ universal compatibility.
get_compatible_for_current_platform
def get_compatible_for_current_platform(
)->List: # Capabilities compatible with current platform
Phase 5a: return discovered capabilities compatible with the host platform.
Filters by resources.platforms. Capabilities with an empty (or absent) platforms list are considered universally compatible — that’s the introspection-time convention when a capability author didn’t declare a platform constraint. Capabilities lacking the entire resources block (legacy / pre-Phase-5a manifests) also pass through as universal.
Does NOT filter on requires_gpu — substrate doesn’t know whether a GPU is present without invoking a system monitor capability. Callers gate on GPU availability separately if needed.
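The filtering rule can be sketched over plain dicts. This is an illustration under assumptions: the `compatible_with` function is hypothetical, manifests are modelled as dicts, and the platform tag format (shown as "linux-x64" in the usage note) is a guess, since the page does not state how platform strings are written or compared.

```python
from typing import Any, Dict, List


def compatible_with(
    discovered: List[Dict[str, Any]],  # manifest-like dicts (assumed shape)
    current_platform: str,             # assumed tag format, e.g. "linux-x64"
) -> List[Dict[str, Any]]:
    """Keep entries whose resources.platforms is empty, absent, or lists the platform."""
    compatible = []
    for meta in discovered:
        resources = meta.get("resources") or {}       # legacy manifests: no resources block
        platforms = resources.get("platforms") or []  # empty or absent list = universal
        if not platforms or current_platform in platforms:
            compatible.append(meta)
    return compatible
```

For example, `compatible_with(metas, "linux-x64")` keeps entries that list "linux-x64" along with every entry that declares no platform constraint. As in the documented method, requires_gpu plays no part in the filter.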
Usage Examples
Basic Usage
import logging
from cjm_substrate.core.manager import CapabilityManager
logging.basicConfig(level=logging.INFO)
# Create manager
manager = CapabilityManager()
# Discover and load all capabilities from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")
# Execute a capability
result = manager.execute_capability("whisper-local", audio="/path/to/audio.wav")
# Update configuration (hot-reload)
manager.update_capability_config("whisper-local", {"model": "large-v3"})
# Clean up all workers
manager.unload_all()
Async Usage (FastHTML)
from cjm_substrate.core.manager import CapabilityManager

async def transcribe(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    try:
        # Await the async execute path so the event loop is not blocked
        result = await manager.execute_capability_async("whisper-local", audio=audio_path)
        return result
    finally:
        # Always terminate the Worker processes, even on failure
        manager.unload_all()

# Streaming
async def transcribe_stream(audio_path: str):
    manager = CapabilityManager()
    manager.load_all()
    try:
        # Re-yield each chunk as the worker produces it
        async for chunk in manager.execute_capability_stream("whisper-local", audio=audio_path):
            yield chunk
    finally:
        manager.unload_all()