Remote Capability Proxy
The RemoteCapabilityProxy implements ToolCapability but forwards all calls over HTTP to a Worker process running in an isolated environment.
Host Application                        Isolated Environment
┌───────────────────────────┐           ┌─────────────────────────┐
│ CapabilityManager         │           │ Conda Env (GPU)         │
│         │                 │           │         │               │
│         ▼                 │   HTTP    │         ▼               │
│ RemoteCapabilityProxy     │◄─────────▶│ Universal Worker        │
│ (implements               │ localhost │         │               │
│  ToolCapability)          │           │         ▼               │
│                           │           │ Actual Capability       │
└───────────────────────────┘           └─────────────────────────┘
Key responsibilities:
- Process Management: Launch worker subprocess with correct Python interpreter
- Port Allocation: Find free port, pass to worker via CLI
- Zero-Copy Transfer: Detect FileBackedDTO objects and serialize them to temp files
- Dual Interface: Sync methods for scripts, async methods for FastHTML
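Port allocation and the launch command can be sketched as below. This is a minimal sketch under assumptions: binding to port 0 so the OS picks an unused port is a standard socket technique, but `build_worker_command`, the `universal_worker` entry point, and the `--module`/`--class`/`--port` flags are hypothetical stand-ins for the real worker CLI.

```python
import socket
from typing import Dict, List


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def build_worker_command(manifest: Dict, port: int) -> List[str]:
    """Hypothetical: assemble the worker launch command from a manifest.

    The "-m universal_worker" entry point and the --module/--class/--port
    flags are illustrative assumptions, not the real worker CLI.
    """
    return [
        manifest["python_path"],  # interpreter inside the isolated env
        "-m", "universal_worker",  # hypothetical worker entry point
        "--module", manifest["module"],
        "--class", manifest["class"],
        "--port", str(port),  # port handed to the worker via CLI
    ]
```

The port is released before the worker binds it, so another process can take it in that gap; a launcher built this way would need to retry or confirm the worker is responsive (for example via is_alive()) before routing calls.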
RemoteCapabilityProxy
def RemoteCapabilityProxy(
manifest:Dict, # Capability manifest with python_path, module, class, etc.
extra_env:Optional=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
adapter_specs:Optional=None, # CR-17 pt 2: host-matched adapter impl specs ("module:ClassName") bound in-worker at spawn
journal:Optional=None, # CR-14: journal sink for worker-lifecycle events; lazy LocalJournalStore at cfg.journal_db_path when None
diagnostics:Optional=None, # CR-14: diagnostics sink (raw-stream pump + worker env contract); lazy LocalDiagnosticsStore when None
):
Proxy that forwards capability calls to an isolated Worker subprocess.
RemoteCapabilityProxy.config_options
def config_options(
)->Dict: # CR-11: live config option domains
Get the capability’s runtime config option providers (CR-11).
Returns the worker’s get_config_options() output (FieldOptions per dynamic field, JSON-serialized to dicts), or an empty dict when the capability exposes no dynamic options.
RemoteCapabilityProxy.cleanup
def cleanup(
)->None:
Clean up capability resources and terminate worker process.
Input Serialization
The proxy detects FileBackedDTO objects and serializes them to temp files before transmission. This enables zero-copy transfer of large data (audio, images) between processes.
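Input serialization can be sketched as a recursive walk over the call arguments. This sketch assumes a hypothetical `FileBackedDTO` with a `to_temp_file()` method and a hypothetical `{"__file_backed__": path}` wire marker; the real class and wire format are not documented here. What it illustrates is the transfer idea: the large payload is written to disk once, and only its path travels in the JSON request body.

```python
import os
import tempfile
from typing import Any, Dict, List, Tuple


class FileBackedDTO:
    """Hypothetical stand-in: a payload that can spill itself to disk."""

    def __init__(self, data: bytes, suffix: str = ".bin"):
        self.data = data
        self.suffix = suffix

    def to_temp_file(self) -> str:
        # Write the payload once; only the path crosses the HTTP boundary.
        fd, path = tempfile.mkstemp(suffix=self.suffix)
        with os.fdopen(fd, "wb") as f:
            f.write(self.data)
        return path


def serialize_value(value: Any) -> Any:
    """Replace FileBackedDTO instances with a JSON-safe file reference."""
    if isinstance(value, FileBackedDTO):
        return {"__file_backed__": value.to_temp_file()}  # hypothetical marker key
    if isinstance(value, (list, tuple)):
        return [serialize_value(v) for v in value]
    if isinstance(value, dict):
        return {k: serialize_value(v) for k, v in value.items()}
    return value  # plain JSON values pass through unchanged


def serialize_call(args: Tuple, kwargs: Dict) -> Tuple[List, Dict]:
    """Serialize positional and keyword arguments for one /execute request."""
    return serialize_value(list(args)), serialize_value(kwargs)
```

In this sketch each DTO occurrence writes its own temp file, and deleting those files after the call is left to the caller.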
Asynchronous Interface
These methods are async for use with FastHTML and other async frameworks. Use execute_stream for real-time streaming results.
execute_async
async def execute_async(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Capability result
Execute the capability asynchronously.
CR-4: HTTP 409 from the worker is mapped to a typed CapabilityCancelledError. Same 409/200/other semantics as the sync execute() variant.
execute_stream_sync
def execute_stream_sync(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Generator:
Synchronous wrapper for streaming (blocking).
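A blocking wrapper over an async stream can be sketched as follows. This is a minimal sketch, not the proxy's implementation: `iterate_sync` is a hypothetical helper that drives each `__anext__()` step on a private event loop. Because `run_until_complete` cannot run inside an already-running loop, this style of wrapper suits scripts, while FastHTML handlers should use the async execute_stream directly.

```python
import asyncio
from typing import AsyncGenerator, Callable, Iterator, TypeVar

T = TypeVar("T")


def iterate_sync(make_agen: Callable[[], AsyncGenerator[T, None]]) -> Iterator[T]:
    """Drive an async generator from blocking code on a private event loop.

    Must not be called from a thread that already runs an event loop.
    """
    loop = asyncio.new_event_loop()
    agen = make_agen()
    try:
        while True:
            try:
                item = loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break  # the async stream is exhausted
            yield item
    finally:
        # Finalize the async generator (runs its cleanup) before closing the loop.
        loop.run_until_complete(agen.aclose())
        loop.close()
```

Closing the sync generator early (for example with `break` in a `for` loop) still runs the `finally` block, so the async stream's own cleanup is not skipped.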
execute_stream
def execute_stream(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Yields parsed JSON chunks
Execute with streaming response (async generator).
SG-52: detects the terminal {"_job_error": <JobError dict>} chunk and raises the corresponding typed exception client-side instead of yielding it to downstream consumers. Mirrors /execute’s HTTP 409 → typed-exception behavior at the streaming wire boundary. Normal capability output chunks pass through unchanged (they never carry the _job_error key).
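The SG-52 terminal-chunk check can be sketched as an async generator wrapped around the raw chunk stream. `JobFailedError` and the `"message"` key inside the JobError dict are hypothetical; per the description above, the real proxy raises a typed exception chosen from the JobError contents rather than one generic class.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


class JobFailedError(Exception):
    """Hypothetical typed exception carrying the worker's JobError dict."""

    def __init__(self, job_error: Dict[str, Any]):
        super().__init__(job_error.get("message", "job failed"))
        self.job_error = job_error


async def guard_job_errors(
    chunks: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[Dict[str, Any]]:
    """Pass ordinary chunks through; raise on the terminal _job_error chunk."""
    async for chunk in chunks:
        if isinstance(chunk, dict) and "_job_error" in chunk:
            # Never yield the error chunk itself to downstream consumers.
            raise JobFailedError(chunk["_job_error"])
        yield chunk
```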
CR-7 Track A: Worker-death classification
When the worker subprocess dies mid-call, the httpx client fault propagates up. The substrate needs to classify the death to dispatch CR-7’s reactive retry path correctly:
| Worker state after httpx fault | Classification | Raised exception |
|---|---|---|
| Still alive | Network blip / timeout — not CR-7’s concern | Re-raise original httpx error |
| Dead with returncode == -SIGKILL (-9 on POSIX) | Kernel OOM-killer is the dominant cause | WorkerOOMError |
| Dead with any other non-zero returncode | Generic crash (segfault, capability init panic, etc.) | CapabilityTransientError |
This is Track A — the substrate-side view. Track B (capability-side, per SG-47’s sub-task) catches torch.cuda.OutOfMemoryError inside execute() and re-raises as CapabilityResourceError with a populated ResourceShortfall before the worker ever dies; that path doesn’t hit Track A at all. Both tracks converge at except CapabilityResourceError in CR-7’s reactive retry loop.
The classification only applies to the /execute and /execute_stream paths — the long-running ones. Hook calls (/on_disable, /prefetch, etc.) already short-circuit on httpx.ConnectError and return False without classification, because their callers (CR-2 / CR-4 lifecycle bookkeeping) don’t need typed retry.
execute_async_with_oom_check
async def execute_async_with_oom_check(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:
CR-7 Track A wrapper around the async execute path. Same semantics.
execute_with_oom_check
def execute_with_oom_check(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:
CR-7 Track A wrapper around the sync execute path.
Catches httpx connection / protocol faults, calls _check_worker_death, and either raises a typed WorkerOOMError / CapabilityTransientError or re-raises the original httpx error (worker still alive — caller treats as a generic transient network issue).
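The Track A classification and wrapper can be sketched as follows. `WorkerOOMError` and `CapabilityTransientError` are local stand-ins, and the builtin `ConnectionError` stands in for the httpx connection/protocol faults so the sketch runs without httpx. Waiting briefly with `wait(timeout=...)` instead of a bare `poll()` is a design assumption: a worker that just dropped the connection may still be exiting.

```python
import signal
import subprocess
from typing import Any, Callable, Optional

# -SIGKILL is -9 on POSIX; fall back to 9 where signal.SIGKILL is absent (Windows).
_SIGKILL = getattr(signal, "SIGKILL", 9)


class WorkerOOMError(Exception):
    """Stand-in for the substrate's typed OOM exception."""


class CapabilityTransientError(Exception):
    """Stand-in for the substrate's generic-crash exception."""


def classify_worker_death(returncode: Optional[int]) -> Optional[Exception]:
    """Map an exit status to a typed exception; None means the worker is alive."""
    if returncode is None:
        return None
    if returncode == -_SIGKILL:
        return WorkerOOMError(f"worker SIGKILLed (likely OOM), returncode={returncode}")
    # Assumption: a clean exit (0) mid-call is also treated as a generic crash.
    return CapabilityTransientError(f"worker exited with returncode={returncode}")


def call_with_death_check(
    worker: subprocess.Popen,
    call: Callable[[], Any],
    grace_seconds: float = 0.5,
) -> Any:
    try:
        return call()
    except ConnectionError as exc:  # stand-in for httpx transport faults
        try:
            returncode: Optional[int] = worker.wait(timeout=grace_seconds)
        except subprocess.TimeoutExpired:
            returncode = None  # still running after the grace period
        typed = classify_worker_death(returncode)
        if typed is None:
            raise  # worker alive: caller treats it as a generic network issue
        raise typed from exc
```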
execute_task_async
async def execute_task_async(
task_name:str, method:str, kwargs:VAR_KEYWORD
)->Any:
Invoke a typed task-adapter method in-worker (CR-17 pt 2; async). Same semantics.
execute_task
def execute_task(
task_name:str, method:str, kwargs:VAR_KEYWORD
)->Any:
Invoke a typed task-adapter method in-worker (CR-17 pt 2; sync).
This is the explicit task channel: action= remains the tool’s native in-worker dispatch, while this method addresses the TASK contract (adapter + method). It is keyword-only by design. The method is built once with the CR-7 Track A worker-death check inside, so no later wrapper supersedes it (the G7a last-assignment-wins lesson).
Lifecycle Management
is_alive
def is_alive(
)->bool: # True if worker is responsive
Check if the worker process is still running and responsive.
get_stats
def get_stats(
)->Dict: # Process telemetry
Get worker process resource usage.
get_structural_surface
def get_structural_surface(
)->Optional: # Live-derived surface, or None
Pass-2 Thread 3 live companion: fetch the worker’s runtime-derived structural surface (GET /structural_surface).
Returns None when the worker predates the endpoint (a pre-fracture substrate in a snapshot env → 404) or on transport failure — callers treat None as “skip the drift check”, never as an empty surface.
Cancellation and Progress
Methods for cancelling running executions and polling progress status.
get_progress_async
async def get_progress_async(
)->Dict: # {progress: float, message: str}
Get current execution progress asynchronously.
get_progress
def get_progress(
)->Dict: # {progress: float, message: str}
Get current execution progress from worker.
cancel_async
async def cancel_async(
)->bool: # True if cancel request was sent
Request cancellation asynchronously.
cancel
def cancel(
)->bool: # True if cancel request was sent
Request cancellation of running execution.
on_enable
def on_enable(
)->bool: # True if hook signal accepted by worker
CR-2: forward the substrate’s on_enable signal to the worker process.
Same delivery semantics as on_disable: best-effort, errors logged-and-swallowed. The capability’s hook (default no-op) decides whether to eagerly re-acquire resources or rely on lazy re-load at the next execute().
on_disable
def on_disable(
)->bool: # True if hook signal accepted by worker
CR-2: forward the substrate’s on_disable signal to the worker process.
Capability can opt in via ToolCapability.on_disable(); default implementation is a no-op so silent-pass-through is the norm. Failures to reach the worker (already terminated, network blip) are logged-and-swallowed — the substrate-side enable/disable bookkeeping doesn’t depend on the hook actually firing.
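The best-effort delivery contract for on_enable / on_disable can be sketched as a wrapper that turns transport failures into a logged `False`. `forward_hook_best_effort` and its `send` callable are hypothetical, and `ConnectionError` stands in for httpx.ConnectError.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def forward_hook_best_effort(hook_name: str, send: Callable[[], int]) -> bool:
    """Send a lifecycle hook signal; never raise on transport failure.

    `send` is a hypothetical callable that POSTs to the worker's hook
    endpoint and returns the HTTP status code.
    """
    try:
        status = send()
    except ConnectionError as exc:  # worker already terminated, network blip, ...
        logger.warning("%s hook not delivered: %s", hook_name, exc)
        return False  # swallowed: substrate bookkeeping does not depend on it
    return status == 200
```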
CR-3: Typed MonitorToolProtocol accessors
get_system_status / list_processes POST to typed worker endpoints (/get_system_status, /list_processes) instead of the magic-string-dispatched /execute("get_system_status"). Both expose sync + async variants matching the existing CR-2 hook style.
Status code taxonomy (intentional, per CR-3 review):
| Code | Meaning | Proxy behavior |
|---|---|---|
| 200 | Typed call succeeded | Return JSON body |
| 404 | Capability is not a monitor | Propagate HTTPStatusError (configuration error — substrate logs at WARN) |
| 500 | Real capability failure | Propagate HTTPStatusError (don’t silently mask) |
| ConnectError | Worker may have died | Return None (substrate degrades to empty stats) |
The 404 path prevents masking the “wrong capability wired as system_monitor” misconfiguration (e.g. a non-monitor capability registered as the system monitor).
get_system_status
def get_system_status(
)->Optional: # SystemStats dict, or None on transport / config failure
CR-3: typed MonitorToolProtocol accessor. POSTs to worker’s /get_system_status.
Status code semantics (worker side raises HTTPException with these codes):
- 200: SystemStats dict returned.
- 404: capability is not a monitor. Logged at ERROR (configuration error; no amount of retry fixes it) and returns None, loudly distinguished from the substrate’s WARN-level transient-failure degradation.
- 500: real capability failure; propagates as HTTPStatusError.
- ConnectError: worker may have died; returns None silently (substrate degrades to empty stats).
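The status policy for this accessor can be sketched as one function over a single worker round trip. It follows the method docstring (404 logged at ERROR and mapped to None); `fetch_system_status`, the `post` callable, and the local `HTTPStatusError` are hypothetical stand-ins for the proxy's httpx plumbing.

```python
import logging
from typing import Any, Callable, Dict, Optional, Tuple

logger = logging.getLogger(__name__)


class HTTPStatusError(Exception):
    """Stand-in for httpx.HTTPStatusError so the sketch runs without httpx."""


def fetch_system_status(post: Callable[[], Tuple[int, Any]]) -> Optional[Dict]:
    """Apply the 200/404/500/ConnectError policy to one worker round trip."""
    try:
        status, body = post()  # hypothetical transport: (status_code, json_body)
    except ConnectionError:
        return None  # worker may have died: degrade silently to empty stats
    if status == 200:
        return body
    if status == 404:
        # Configuration error: retrying cannot fix it, so log loudly.
        logger.error("capability wired as system monitor is not a monitor")
        return None
    raise HTTPStatusError(f"worker returned HTTP {status}")  # e.g. 500: real failure
```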
get_system_status_async
async def get_system_status_async(
)->Optional: # SystemStats dict, or None on transport / config failure
Async variant of get_system_status. Same 200/404/500/ConnectError semantics.
list_processes
def list_processes(
)->Optional: # ProcessStats dict list, or None on transport / config failure
CR-3: typed MonitorToolProtocol accessor. POSTs to worker’s /list_processes.
Same 200/404/500/ConnectError semantics as get_system_status. Note that MonitorToolProtocol.list_processes() defaults to returning [], so monitors without per-process visibility yield a 200 with an empty list.
list_processes_async
async def list_processes_async(
)->Optional: # ProcessStats dict list, or None on transport / config failure
Async variant of list_processes. Same semantics.
CR-4: Lifecycle hook proxies (prefetch + reconfigure)
prefetch() / reconfigure() POST to the worker’s /prefetch and /reconfigure endpoints. Both expose sync + async variants matching the existing CR-2/CR-3 hook style.
| Endpoint | Proxy method | Behavior |
|---|---|---|
| /prefetch | prefetch() / prefetch_async() | Returns True on 200, False on transport failure. Errors raised by the capability’s prefetch() (worker 500) propagate as RuntimeError so callers can distinguish “capability can’t acquire resources” from “worker unreachable.” |
| /reconfigure | reconfigure(old, new) / reconfigure_async(old, new) | Returns True on 200, False on transport failure. Body shape: {"old_config": ..., "new_config": ...}. Capability’s default reconfigure body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata to fire _release_<trigger> methods. |
The substrate’s CapabilityManager.update_capability_config path (CR-2 onward) calls reconfigure(old, new) when the worker exposes the typed endpoint. Older workers fall back to initialize(new_config) — both paths converge on the same worker-side state after the call.
prefetch
def prefetch(
stall_threshold_seconds:Optional=None, # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
poll_interval_seconds:float=1.0, # How often to poll /progress for stall detection
)->bool: # True if worker accepted the prefetch hook
CR-4 / Session A 2026-05-27: forward the substrate’s prefetch signal with progress-based stall detection.
Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary timeouts vs. network speeds for model downloads). Approach:
- POST /prefetch fires in a background thread with httpx timeout=None.
- Main thread polls /progress every poll_interval_seconds.
- Each (progress, message) change resets the stall counter.
- If no change in stall_threshold_seconds AND POST still pending → SIGTERM the worker subprocess + raise CapabilityTimeoutError.
Capabilities opt in to fine-grained stall defeat by calling self.report_progress(…) periodically during long lifecycle operations (model download, server startup, etc.). Capabilities that don’t report progress are fine as long as the threshold accommodates their slowest plausible silent stretch.
Errors raised by the capability (worker 500) propagate as RuntimeError; an unreachable worker yields False; a stall raises CapabilityTimeoutError.
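The stall-detection loop can be sketched with a background thread and a polling loop. The callables `blocking_call`, `get_progress`, and `on_stall` are hypothetical stand-ins for the POST /prefetch, GET /progress, and SIGTERM steps, and `CapabilityTimeoutError` is a local stand-in. A stall is measured from the last observed change in the (progress, message) pair, using a monotonic clock so wall-clock adjustments cannot trigger it.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class CapabilityTimeoutError(Exception):
    """Stand-in for the substrate's stall exception."""


def run_with_stall_detection(
    blocking_call: Callable[[], bool],  # hypothetical: POST /prefetch, no timeout
    get_progress: Callable[[], Tuple[float, str]],  # hypothetical: GET /progress
    on_stall: Callable[[], None],  # hypothetical: SIGTERM the worker
    stall_threshold_seconds: float,
    poll_interval_seconds: float = 1.0,
) -> bool:
    result: dict = {}

    def runner() -> None:
        try:
            result["value"] = blocking_call()
        except BaseException as exc:  # re-raised on the calling thread below
            result["error"] = exc

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()

    last_seen: Optional[Tuple[float, str]] = None
    last_change = time.monotonic()
    while True:
        thread.join(timeout=poll_interval_seconds)
        if not thread.is_alive():
            break  # the POST finished, successfully or not
        current = get_progress()
        if current != last_seen:
            # Any change in (progress, message) resets the stall clock.
            last_seen, last_change = current, time.monotonic()
        elif time.monotonic() - last_change >= stall_threshold_seconds:
            on_stall()
            raise CapabilityTimeoutError(
                f"no progress for {stall_threshold_seconds}s; last: {last_seen}"
            )

    if "error" in result:
        raise result["error"]
    return result["value"]
```

A capability that never calls report_progress() produces an unchanging pair, so in this sketch the threshold alone bounds its silent stretch.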
prefetch_async
async def prefetch_async(
stall_threshold_seconds:Optional=None, poll_interval_seconds:float=1.0
)->bool: # True if worker accepted the prefetch hook
Async variant of prefetch. Same stall-detection semantics.
reconfigure
def reconfigure(
old_config:Optional, # Previous config snapshot
new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call
CR-4: forward a reconfigure(old, new) call to the worker process.
The capability’s default reconfigure() body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the capability’s config_class to fire _release_<trigger> methods for fields whose values changed. Capabilities not opting into the declarative pattern land in a silent no-op; the substrate’s CapabilityManager.update_capability_config then falls back to initialize(new_config) for the actual state change.
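The RELOAD_TRIGGER walk can be sketched with dataclass field metadata. The metadata key, `WhisperConfig`, and `_release_model` are hypothetical examples of the pattern, and firing each trigger at most once when several changed fields share it is an assumption of this sketch.

```python
from dataclasses import dataclass, field, fields
from typing import Any, Dict, List

RELOAD_TRIGGER = "reload_trigger"  # hypothetical metadata key


@dataclass
class WhisperConfig:  # hypothetical config_class
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    beam_size: int = 5  # no trigger: takes effect without releasing anything


class Capability:
    config_class = WhisperConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        # e.g. drop loaded weights so the next execute() reloads lazily
        self.released.append("model")

    def reconfigure_with_triggers(self, old: Dict[str, Any], new: Dict[str, Any]) -> List[str]:
        fired: List[str] = []
        for f in fields(self.config_class):
            trigger = f.metadata.get(RELOAD_TRIGGER)
            if trigger is None or old.get(f.name) == new.get(f.name):
                continue  # untriggered or unchanged field
            if trigger not in fired:  # fire each trigger once
                getattr(self, f"_release_{trigger}")()
                fired.append(trigger)
        return fired
```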
reconfigure_async
async def reconfigure_async(
old_config:Optional, # Previous config snapshot
new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call
Async variant of reconfigure. Same semantics.
Context Manager Support
The proxy can be used as a context manager for automatic cleanup.
aexit
async def __aexit__(
exc_type, exc_val, exc_tb
):
Exit async context manager and cleanup.
aenter
async def __aenter__(
):
Enter async context manager.
exit
def __exit__(
exc_type, exc_val, exc_tb
):
Exit context manager and cleanup.
enter
def __enter__(
):
Enter context manager.
Usage Examples
Basic Usage (Sync)
import json
import os
# Load manifest from JSON file (open() does not expand "~", so expand it first)
with open(os.path.expanduser("~/.cjm/manifests/whisper.json")) as f:
    manifest = json.load(f)
# Create proxy (starts worker subprocess)
capability = RemoteCapabilityProxy(manifest)
try:
    # Use like a local capability
    capability.initialize({"model": "large-v3"})
    result = capability.execute(audio="/path/to/audio.wav")
finally:
    # Clean up: terminates the worker even if execute() raises
    capability.cleanup()
With Context Manager
with RemoteCapabilityProxy(manifest) as capability:
    capability.initialize({"model": "large-v3"})
    result = capability.execute(audio="/path/to/audio.wav")
# Worker automatically terminated
Async with Streaming (FastHTML)
import asyncio
async def transcribe_stream(audio_path: str):
    async with RemoteCapabilityProxy(manifest) as capability:
        # initialize() is synchronous; run it off the event loop
        await asyncio.to_thread(capability.initialize, {"model": "large-v3"})
        async for chunk in capability.execute_stream(audio=audio_path):
            yield chunk  # Stream to client
Manifest Format
The proxy expects a manifest dictionary with at minimum:
{
"name": "whisper-local",
"version": "1.0.0",
"python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
"module": "cjm_transcription_capability_whisper.capability",
"class": "WhisperLocalCapability",
"env_vars": {
"CUDA_VISIBLE_DEVICES": "0"
}
}
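A manifest sanity check before spawning the worker can be sketched as below. `validate_manifest` is hypothetical, and treating `env_vars` as optional is an assumption of this sketch. Requiring its values to be strings reflects that `subprocess` environment mappings expect string values, which is why the example writes `"0"` rather than `0`.

```python
from typing import Any, Dict

# Assumption: these five keys are required; env_vars is optional.
REQUIRED_KEYS = ("name", "version", "python_path", "module", "class")


def validate_manifest(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Fail fast on an incomplete manifest instead of at worker spawn time."""
    missing = [k for k in REQUIRED_KEYS if k not in manifest]
    if missing:
        raise ValueError(f"manifest missing required keys: {missing}")
    env_vars = manifest.get("env_vars", {})
    if not all(isinstance(v, str) for v in env_vars.values()):
        raise ValueError("env_vars values must be strings")
    return manifest
```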