Remote Capability Proxy

Bridge between Host application and isolated Worker processes

The RemoteCapabilityProxy implements ToolCapability but forwards all calls over HTTP to a Worker process running in an isolated environment.

Host Application                            Isolated Environment
┌─────────────────────────┐                 ┌─────────────────────┐
│  CapabilityManager      │                 │  Conda Env (GPU)    │
│    │                    │                 │    │                │
│    ▼                    │      HTTP       │    ▼                │
│  RemoteCapabilityProxy  │◄───────────────▶│  Universal Worker   │
│  (implements            │    localhost    │    │                │
│   ToolCapability)       │                 │    ▼                │
│                         │                 │  Actual Capability  │
└─────────────────────────┘                 └─────────────────────┘

Key responsibilities:

  1. Process Management: Launch worker subprocess with correct Python interpreter
  2. Port Allocation: Find free port, pass to worker via CLI
  3. Zero-Copy Transfer: Detect FileBackedDTO objects and serialize to temp files
  4. Dual Interface: Sync methods for scripts, async methods for FastHTML
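
The first two responsibilities can be sketched with the standard library alone. This is a minimal illustration, not the proxy's actual code: `find_free_port`, `launch_worker`, and the `--port <n>` CLI shape are assumptions, and a one-line Python script stands in for the real worker.

```python
import socket
import subprocess
import sys
from typing import List


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def launch_worker(python_path: str, worker_args: List[str], port: int) -> subprocess.Popen:
    """Start the worker with the isolated environment's own interpreter.

    Passing the port as `--port <n>` is an assumed CLI shape.
    """
    cmd = [python_path, *worker_args, "--port", str(port)]
    return subprocess.Popen(cmd, stdout=subprocess.PIPE, text=True)


port = find_free_port()
# Stand-in worker: a one-liner that echoes the last CLI argument it received.
stand_in = ["-c", "import sys; print(sys.argv[-1])"]
proc = launch_worker(sys.executable, stand_in, port)
echoed, _ = proc.communicate(timeout=30)
echoed = echoed.strip()
```

Note that the port is released before the worker binds it, so another process could claim it in between; a real launcher would likely retry on bind failure.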

RemoteCapabilityProxy


RemoteCapabilityProxy


def RemoteCapabilityProxy(
    manifest:Dict, # Capability manifest with python_path, module, class, etc.
    extra_env:Optional=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
    adapter_specs:Optional=None, # CR-17 pt 2: host-matched adapter impl specs ("module:ClassName") bound in-worker at spawn
    journal:Optional=None, # CR-14: journal sink for worker-lifecycle events; lazy LocalJournalStore at cfg.journal_db_path when None
    diagnostics:Optional=None, # CR-14: diagnostics sink (raw-stream pump + worker env contract); lazy LocalDiagnosticsStore when None
):

Proxy that forwards capability calls to an isolated Worker subprocess.


RemoteCapabilityProxy.config_options


def config_options(
    
)->Dict: # CR-11: live config option domains

Get the capability’s runtime config option providers (CR-11).

Returns the worker’s get_config_options() output (FieldOptions per dynamic field, JSON-serialized to dicts). Empty dict when the capability exposes no dynamic options.


RemoteCapabilityProxy.cleanup


def cleanup(
    
)->None:

Clean up capability resources and terminate worker process.

Input Serialization

The proxy detects FileBackedDTO objects and serializes them to temp files before transmission. This enables zero-copy transfer of large data (audio, images) between processes.
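
A rough sketch of the detection step follows. The `to_temp_file` method name, the `AudioClip` class, and `serialize_kwargs` are assumptions made for illustration; the real FileBackedDTO contract may differ. The point is that only a file path crosses the HTTP boundary, not the payload bytes.

```python
import os
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTO(Protocol):
    """Stand-in protocol: anything that can write itself to a temp file."""

    def to_temp_file(self) -> str: ...  # hypothetical method name


class AudioClip:
    """Toy payload that satisfies the stand-in protocol."""

    def __init__(self, samples: bytes) -> None:
        self.samples = samples

    def to_temp_file(self) -> str:
        fd, path = tempfile.mkstemp(suffix=".raw")
        with os.fdopen(fd, "wb") as handle:
            handle.write(self.samples)
        return path


def serialize_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace file-backed values with paths; leave other values untouched."""
    return {
        key: value.to_temp_file() if isinstance(value, FileBackedDTO) else value
        for key, value in kwargs.items()
    }


payload = serialize_kwargs({"audio": AudioClip(b"\x00\x01"), "language": "en"})
with open(payload["audio"], "rb") as handle:
    round_tripped = handle.read()
os.remove(payload["audio"])  # the sketch cleans up its own temp file
```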

Asynchronous Interface

These methods are async for use with FastHTML and other async frameworks. Use execute_stream for real-time streaming results.


execute_async


async def execute_async(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Capability result

Execute the capability asynchronously.

CR-4: HTTP 409 from the worker is mapped to a typed CapabilityCancelledError. Same 409/200/other semantics as the sync execute() variant.
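
The status mapping might look roughly like the sketch below. `CapabilityCancelledError` is redefined locally as a stand-in, `interpret_execute_response` is a hypothetical helper, and raising `RuntimeError` for other status codes is an assumption, since the text does not say which exception the "other" branch uses.

```python
from typing import Any, Dict


class CapabilityCancelledError(Exception):
    """Local stand-in for the library's typed cancellation error."""


def interpret_execute_response(status_code: int, body: Dict[str, Any]) -> Dict[str, Any]:
    """Map a worker /execute response to a result or a typed exception."""
    if status_code == 200:
        return body
    if status_code == 409:
        # The worker reports that the execution was cancelled.
        raise CapabilityCancelledError(body.get("detail", "execution cancelled"))
    # Assumption: every other status is surfaced as a generic failure.
    raise RuntimeError(f"worker returned HTTP {status_code}")


result = interpret_execute_response(200, {"text": "ok"})
try:
    interpret_execute_response(409, {"detail": "cancelled by user"})
    cancelled = False
except CapabilityCancelledError:
    cancelled = True
```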


execute_stream_sync


def execute_stream_sync(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Generator:

Synchronous wrapper for streaming (blocking).


execute_stream


def execute_stream(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Yields parsed JSON chunks

Execute with streaming response (async generator).

SG-52: detects the terminal {"_job_error": <JobError dict>} chunk and raises the corresponding typed exception client-side instead of yielding it to downstream consumers. Mirrors /execute’s HTTP 409 → typed-exception behavior at the streaming wire boundary. Normal capability output chunks pass through unchanged (they never carry the _job_error key).
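
The terminal-chunk check can be illustrated as follows. This is a sketch under stated assumptions: the stream is treated as newline-delimited JSON, `CapabilityJobError` stands in for whichever typed exception the JobError dict maps to, and the `kind` / `message` fields are invented for the demo.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple


class CapabilityJobError(Exception):
    """Stand-in for the typed exception rebuilt from a JobError dict."""


async def parse_stream(lines: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """Yield parsed chunks; raise instead of yielding a `_job_error` chunk."""
    async for line in lines:
        if not line.strip():
            continue
        chunk = json.loads(line)
        if isinstance(chunk, dict) and "_job_error" in chunk:
            raise CapabilityJobError(chunk["_job_error"])
        yield chunk


async def fake_worker_lines() -> AsyncIterator[str]:
    """Simulated worker output: two normal chunks, then a terminal error."""
    yield json.dumps({"text": "hello"})
    yield json.dumps({"text": "world"})
    yield json.dumps({"_job_error": {"kind": "cancelled", "message": "stopped"}})


async def collect() -> Tuple[List[Dict[str, Any]], Optional[Exception]]:
    seen: List[Dict[str, Any]] = []
    error: Optional[Exception] = None
    try:
        async for chunk in parse_stream(fake_worker_lines()):
            seen.append(chunk)
    except CapabilityJobError as exc:
        error = exc
    return seen, error


seen, error = asyncio.run(collect())
```

Downstream consumers therefore see only the two normal chunks; the error chunk surfaces as an exception at the point of iteration.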

CR-7 Track A: Worker-death classification

When the worker subprocess dies mid-call, the httpx client fault propagates up. The substrate needs to classify the death to dispatch CR-7’s reactive retry path correctly:

| Worker state after httpx fault | Classification | Raised exception |
|---|---|---|
| Still alive | Network blip / timeout (not CR-7’s concern) | Re-raise original httpx error |
| Dead with returncode == -SIGKILL (-9 on POSIX) | Kernel OOM-killer is the dominant cause | WorkerOOMError |
| Dead with any other non-zero returncode | Generic crash (segfault, capability init panic, etc.) | CapabilityTransientError |

This is Track A — the substrate-side view. Track B (capability-side, per SG-47’s sub-task) catches torch.cuda.OutOfMemoryError inside execute() and re-raises as CapabilityResourceError with a populated ResourceShortfall before the worker ever dies; that path doesn’t hit Track A at all. Both tracks converge at except CapabilityResourceError in CR-7’s reactive retry loop.

The classification only applies to the /execute and /execute_stream paths — the long-running ones. Hook calls (/on_disable, /prefetch, etc.) already short-circuit on httpx.ConnectError and return False without classification, because their callers (CR-2 / CR-4 lifecycle bookkeeping) don’t need typed retry.
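
The classification table can be expressed as a small function over the subprocess return code. The exception classes below are local stand-ins, `classify_worker_death` is a hypothetical name (the real helper is `_check_worker_death`, whose signature is not shown here), and treating a clean exit code of 0 like "still alive" is an assumption, since the table does not cover that case.

```python
import signal
import subprocess
import sys
from typing import Optional


class WorkerOOMError(Exception):
    """Stand-in: worker was SIGKILLed, most likely by the kernel OOM-killer."""


class CapabilityTransientError(Exception):
    """Stand-in: worker crashed for some other reason."""


# SIGKILL does not exist on Windows; fall back to its POSIX number.
SIGKILL = int(getattr(signal, "SIGKILL", 9))


def classify_worker_death(returncode: Optional[int]) -> Optional[Exception]:
    """Return the typed exception to raise, or None to re-raise the httpx error."""
    if returncode is None:
        return None  # Popen.poll() returned None: the worker is still alive
    if returncode == -SIGKILL:
        return WorkerOOMError("worker killed by SIGKILL")
    if returncode != 0:
        return CapabilityTransientError(f"worker exited with code {returncode}")
    return None  # assumption: a clean exit is not classified


# A child process that exits with code 3 simulates a generic crash.
crashed = subprocess.run([sys.executable, "-c", "import sys; sys.exit(3)"])
crash_verdict = classify_worker_death(crashed.returncode)
oom_verdict = classify_worker_death(-SIGKILL)
alive_verdict = classify_worker_death(None)
```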


execute_async_with_oom_check


async def execute_async_with_oom_check(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:

CR-7 Track A wrapper around the async execute path. Same semantics.


execute_with_oom_check


def execute_with_oom_check(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:

CR-7 Track A wrapper around the sync execute path.

Catches httpx connection / protocol faults, calls _check_worker_death, and either raises a typed WorkerOOMError / CapabilityTransientError or re-raises the original httpx error (worker still alive — caller treats as a generic transient network issue).


execute_task_async


async def execute_task_async(
    task_name:str, method:str, kwargs:VAR_KEYWORD
)->Any:

Invoke a typed task-adapter method in-worker (CR-17 pt 2; async). Same semantics.


execute_task


def execute_task(
    task_name:str, method:str, kwargs:VAR_KEYWORD
)->Any:

Invoke a typed task-adapter method in-worker (CR-17 pt 2; sync).

The explicit task channel: action= stays the tool’s native in-worker dispatch; this addresses the TASK contract (adapter + method). Kwargs-only by design. Built ONCE with the CR-7 Track-A worker-death check inside — no later wrapper supersedes it (the G7a last-assignment-wins lesson).

Lifecycle Management


is_alive


def is_alive(
    
)->bool: # True if worker is responsive

Check if the worker process is still running and responsive.


get_stats


def get_stats(
    
)->Dict: # Process telemetry

Get worker process resource usage.


get_structural_surface


def get_structural_surface(
    
)->Optional: # Live-derived surface, or None

Pass-2 Thread 3 live companion: fetch the worker’s runtime-derived structural surface (GET /structural_surface).

Returns None when the worker predates the endpoint (a pre-fracture substrate in a snapshot env → 404) or on transport failure — callers treat None as “skip the drift check”, never as an empty surface.

Cancellation and Progress

Methods for cancelling running executions and polling progress status.


get_progress_async


async def get_progress_async(
    
)->Dict: # {progress: float, message: str}

Get current execution progress asynchronously.


get_progress


def get_progress(
    
)->Dict: # {progress: float, message: str}

Get current execution progress from worker.


cancel_async


async def cancel_async(
    
)->bool: # True if cancel request was sent

Request cancellation asynchronously.


cancel


def cancel(
    
)->bool: # True if cancel request was sent

Request cancellation of running execution.


on_enable


def on_enable(
    
)->bool: # True if hook signal accepted by worker

CR-2: forward the substrate’s on_enable signal to the worker process.

Same delivery semantics as on_disable: best-effort, errors logged-and-swallowed. The capability’s hook (default no-op) decides whether to eagerly re-acquire resources or rely on lazy re-load at next execute().


on_disable


def on_disable(
    
)->bool: # True if hook signal accepted by worker

CR-2: forward the substrate’s on_disable signal to the worker process.

Capability can opt in via ToolCapability.on_disable(); default implementation is a no-op so silent-pass-through is the norm. Failures to reach the worker (already terminated, network blip) are logged-and-swallowed — the substrate-side enable/disable bookkeeping doesn’t depend on the hook actually firing.
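
A log-and-swallow hook call could be sketched like this. The built-in `ConnectionError` stands in for `httpx.ConnectError` so the example has no third-party dependency, and `send_hook` plus the injected `post` callable are hypothetical.

```python
import logging
from typing import Callable

logger = logging.getLogger("proxy_sketch")


def send_hook(name: str, post: Callable[[str], int]) -> bool:
    """Best-effort hook delivery: True if the worker accepted the signal."""
    try:
        status = post(f"/{name}")
    except ConnectionError as exc:  # stand-in for httpx.ConnectError
        # Logged and swallowed: bookkeeping does not depend on the hook firing.
        logger.warning("hook %s not delivered: %s", name, exc)
        return False
    return status == 200


def reachable(path: str) -> int:
    return 200


def unreachable(path: str) -> int:
    raise ConnectionError("worker already terminated")


delivered = send_hook("on_disable", reachable)
dropped = send_hook("on_disable", unreachable)
```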

CR-3: Typed MonitorToolProtocol accessors

get_system_status / list_processes POST to typed worker endpoints (/get_system_status, /list_processes) instead of the magic-string-dispatched /execute("get_system_status"). Both expose sync + async variants matching the existing CR-2 hook style.

Status code taxonomy (intentional, per CR-3 review):

| Code | Meaning | Proxy behavior |
|---|---|---|
| 200 | Typed call succeeded | Return JSON body |
| 404 | Capability is not a monitor | Propagate HTTPStatusError (configuration error; substrate logs at WARN) |
| 500 | Real capability failure | Propagate HTTPStatusError (don’t silently mask) |
| ConnectError | Worker may have died | Return None (substrate degrades to empty stats) |

The 404 path prevents masking the “wrong capability wired as system_monitor” misconfiguration (e.g. a non-monitor capability registered as the system monitor).


get_system_status


def get_system_status(
    
)->Optional: # SystemStats dict, or None on transport / config failure

CR-3: typed MonitorToolProtocol accessor. POSTs to worker’s /get_system_status.

Status code semantics (the worker side raises HTTPException with these codes):

- 200: SystemStats dict returned.
- 404: capability is not a monitor. Logged at ERROR (configuration error; no amount of retry fixes it) and returns None. Loudly distinguished from the substrate’s WARN-level transient-failure degradation.
- 500: real capability failure; propagates as HTTPStatusError.
- ConnectError: worker may have died; returns None silently (substrate degrades to empty stats).


get_system_status_async


async def get_system_status_async(
    
)->Optional: # SystemStats dict, or None on transport / config failure

Async variant of get_system_status. Same 200/404/500/ConnectError semantics.


list_processes


def list_processes(
    
)->Optional: # ProcessStats dict list, or None on transport / config failure

CR-3: typed MonitorToolProtocol accessor. POSTs to worker’s /list_processes.

Same 200/404/500/ConnectError semantics as get_system_status. Note that MonitorToolProtocol.list_processes() defaults to returning [], so monitors without per-process visibility yield a 200 with an empty list.


list_processes_async


async def list_processes_async(
    
)->Optional: # ProcessStats dict list, or None on transport / config failure

Async variant of list_processes. Same semantics.

CR-4: Lifecycle hook proxies (prefetch + reconfigure)

prefetch() / reconfigure() POST to the worker’s /prefetch and /reconfigure endpoints. Both expose sync + async variants matching the existing CR-2/CR-3 hook style.

| Endpoint | Proxy method | Behavior |
|---|---|---|
| /prefetch | prefetch() / prefetch_async() | Returns True on 200, False on transport failure. Errors raised by the capability’s prefetch() (worker 500) propagate as RuntimeError so callers can distinguish “capability can’t acquire resources” from “worker unreachable.” |
| /reconfigure | reconfigure(old, new) / reconfigure_async(old, new) | Returns True on 200, False on transport failure. Body shape: {"old_config": ..., "new_config": ...}. Capability’s default reconfigure body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata to fire _release_<trigger> methods. |

The substrate’s CapabilityManager.update_capability_config path (CR-2 onward) calls reconfigure(old, new) when the worker exposes the typed endpoint. Older workers fall back to initialize(new_config) — both paths converge on the same worker-side state after the call.


prefetch


def prefetch(
    stall_threshold_seconds:Optional=None, # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
    poll_interval_seconds:float=1.0, # How often to poll /progress for stall detection
)->bool: # True if worker accepted the prefetch hook

CR-4 / Session A 2026-05-27: forward the substrate’s prefetch signal with progress-based stall detection.

Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary timeouts vs. network speeds for model downloads). Approach:

  1. POST /prefetch fires in a background thread with httpx timeout=None.
  2. Main thread polls /progress every poll_interval_seconds.
  3. Each (progress, message) change resets the stall counter.
  4. If no change in stall_threshold_seconds AND POST still pending → SIGTERM the worker subprocess + raise CapabilityTimeoutError.

Capabilities opt in to fine-grained stall defeat by calling self.report_progress(…) periodically during long lifecycle operations (model download, server startup, etc.). Capabilities that don’t report progress are fine as long as the threshold accommodates their slowest plausible silent stretch.

Errors raised by the capability (worker 500) propagate as RuntimeError; an unreachable worker makes the call return False; a stall raises CapabilityTimeoutError.
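
The four-step approach can be approximated with a thread and a polling loop. This is a sketch, not the proxy's code: `wait_with_stall_detection` is a hypothetical helper, the blocking call and progress reader are injected callables rather than real HTTP requests, and `on_stall` stands in for sending SIGTERM to the worker.

```python
import threading
import time
from typing import Any, Callable, Tuple


class CapabilityTimeoutError(Exception):
    """Local stand-in for the library's typed timeout error."""


def wait_with_stall_detection(
    blocking_call: Callable[[], Any],
    read_progress: Callable[[], Tuple[float, str]],
    stall_threshold_seconds: float,
    poll_interval_seconds: float = 1.0,
    on_stall: Callable[[], Any] = lambda: None,
) -> bool:
    """Run `blocking_call` in a thread; fail if progress stops changing."""
    worker = threading.Thread(target=blocking_call, daemon=True)
    worker.start()
    last_seen = read_progress()
    last_change = time.monotonic()
    while worker.is_alive():
        worker.join(timeout=poll_interval_seconds)  # doubles as the poll sleep
        current = read_progress()
        if current != last_seen:
            # Any (progress, message) change resets the stall clock.
            last_seen, last_change = current, time.monotonic()
        elif worker.is_alive() and time.monotonic() - last_change >= stall_threshold_seconds:
            on_stall()  # the real proxy would SIGTERM the worker here
            raise CapabilityTimeoutError("no progress change within threshold")
    return True


progress = {"value": 0.0, "message": "starting"}


def read_progress() -> Tuple[float, str]:
    return (progress["value"], progress["message"])


def healthy_prefetch() -> None:
    for step in range(1, 4):
        time.sleep(0.03)
        progress.update(value=step / 3, message=f"step {step}")


ok = wait_with_stall_detection(healthy_prefetch, read_progress, 0.5, 0.01)

release = threading.Event()
try:
    # This call never reports progress, so the stall branch fires.
    wait_with_stall_detection(lambda: release.wait(5), read_progress, 0.1, 0.01, on_stall=release.set)
    stalled = False
except CapabilityTimeoutError:
    stalled = True
```

Because the stall clock resets on any progress change, a slow but steadily reporting download is never killed, which is the behavior wall-clock timeouts could not offer.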


prefetch_async


async def prefetch_async(
    stall_threshold_seconds:Optional=None, poll_interval_seconds:float=1.0
)->bool: # True if worker accepted the prefetch hook

Async variant of prefetch. Same stall-detection semantics.


reconfigure


def reconfigure(
    old_config:Optional, # Previous config snapshot
    new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call

CR-4: forward a reconfigure(old, new) call to the worker process.

The capability’s default reconfigure() body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the capability’s config_class to fire _release_<trigger> methods for fields whose values changed. Capabilities not opting into the declarative pattern land in a silent no-op; the substrate’s CapabilityManager.update_capability_config then falls back to initialize(new_config) for the actual state change.
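
The declarative pattern might look roughly like the following. Everything here is illustrative: the configs are modeled as dataclasses, the `"reload_trigger"` metadata key and the `WhisperConfig` fields are invented, and firing each trigger at most once per call is a design choice of this sketch rather than documented behavior.

```python
from dataclasses import dataclass, field, fields
from typing import List

RELOAD_TRIGGER = "reload_trigger"  # hypothetical metadata key


@dataclass
class WhisperConfig:
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    language: str = "en"  # no trigger: changing it needs no release


class SketchCapability:
    config_class = WhisperConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        # A real capability would drop the loaded model here.
        self.released.append("model")


def reconfigure_with_triggers(capability, old_config, new_config) -> List[str]:
    """Fire `_release_<trigger>` for each trigger whose field value changed."""
    fired: List[str] = []
    for spec in fields(capability.config_class):
        trigger = spec.metadata.get(RELOAD_TRIGGER)
        if trigger is None or trigger in fired:
            continue
        if getattr(old_config, spec.name) != getattr(new_config, spec.name):
            getattr(capability, f"_release_{trigger}")()
            fired.append(trigger)
    return fired


capability = SketchCapability()
fired_on_model_change = reconfigure_with_triggers(
    capability, WhisperConfig(model="base"), WhisperConfig(model="large-v3", device="cuda")
)
fired_on_language_change = reconfigure_with_triggers(
    capability, WhisperConfig(language="en"), WhisperConfig(language="de")
)
```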


reconfigure_async


async def reconfigure_async(
    old_config:Optional, # Previous config snapshot
    new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call

Async variant of reconfigure. Same semantics.

Context Manager Support

The proxy can be used as a context manager for automatic cleanup.


__aexit__


async def __aexit__(
    exc_type, exc_val, exc_tb
):

Exit async context manager and cleanup.


__aenter__


async def __aenter__(
    
):

Enter async context manager.


__exit__


def __exit__(
    exc_type, exc_val, exc_tb
):

Exit context manager and cleanup.


__enter__


def __enter__(
    
):

Enter context manager.

Usage Examples

Basic Usage (Sync)

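import json
from pathlib import Path

# Load manifest from JSON file (open() does not expand "~", so expand it first)
manifest_path = Path("~/.cjm/manifests/whisper.json").expanduser()
with manifest_path.open() as f:
    manifest = json.load(f)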

# Create proxy (starts worker subprocess)
capability = RemoteCapabilityProxy(manifest)

# Use like a local capability
capability.initialize({"model": "large-v3"})
result = capability.execute(audio="/path/to/audio.wav")

# Clean up
capability.cleanup()

With Context Manager

with RemoteCapabilityProxy(manifest) as capability:
    capability.initialize({"model": "large-v3"})
    result = capability.execute(audio="/path/to/audio.wav")
# Worker automatically terminated

Async with Streaming (FastHTML)

async def transcribe_stream(audio_path: str):
    async with RemoteCapabilityProxy(manifest) as capability:
        await capability.initialize({"model": "large-v3"})
        async for chunk in capability.execute_stream(audio=audio_path):
            yield chunk  # Stream to client

Manifest Format

The proxy expects a manifest dictionary with at minimum:

{
    "name": "whisper-local",
    "version": "1.0.0",
    "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
    "module": "cjm_transcription_capability_whisper.capability",
    "class": "WhisperLocalCapability",
    "env_vars": {
        "CUDA_VISIBLE_DEVICES": "0"
    }
}
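
Before spawning a worker, a caller could check that a manifest carries the fields the constructor comment names. Treating exactly `python_path`, `module`, and `class` as required is an assumption of this sketch, and `missing_manifest_keys` is a hypothetical helper, not part of the library.

```python
from typing import Any, Dict, List

# Assumed minimum, taken from the constructor's parameter comment.
REQUIRED_KEYS = ("python_path", "module", "class")


def missing_manifest_keys(manifest: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from the manifest."""
    return [key for key in REQUIRED_KEYS if key not in manifest]


manifest = {
    "name": "whisper-local",
    "version": "1.0.0",
    "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
    "module": "cjm_transcription_capability_whisper.capability",
    "class": "WhisperLocalCapability",
    "env_vars": {"CUDA_VISIBLE_DEVICES": "0"},
}

missing = missing_manifest_keys(manifest)
incomplete = missing_manifest_keys({"name": "whisper-local"})
```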