Remote Plugin Proxy

Bridge between Host application and isolated Worker processes

The RemotePluginProxy implements PluginInterface but forwards all calls over HTTP to a Worker process running in an isolated environment.

Host Application                        Isolated Environment
┌────────────────────┐                 ┌─────────────────────┐
│  PluginManager     │                 │  Conda Env (GPU)    │
│    │               │                 │    │                │
│    ▼               │     HTTP        │    ▼                │
│  RemotePluginProxy │◄───────────────▶│  Universal Worker   │
│  (implements       │   localhost     │    │                │
│   PluginInterface) │                 │    ▼                │
│                    │                 │  Actual Plugin      │
└────────────────────┘                 └─────────────────────┘

Key responsibilities:

  1. Process Management: Launch worker subprocess with correct Python interpreter
  2. Port Allocation: Find free port, pass to worker via CLI
  3. Zero-Copy Transfer: Detect FileBackedDTO objects and serialize to temp files
  4. Dual Interface: Sync methods for scripts, async methods for FastHTML
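
The first two responsibilities can be sketched as follows. This is a minimal illustration, not the proxy's actual implementation: the `hypothetical_worker` module name and the `--module`, `--class`, and `--port` flags are placeholders, and the order in which `env_vars` and `extra_env` are layered over the host environment is an assumption.

```python
import os
import socket
import subprocess
from typing import Dict, List, Optional


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def build_worker_command(manifest: Dict, port: int) -> List[str]:
    """Build the worker argv, starting with the manifest's interpreter.

    The module name and flag names are placeholders for illustration.
    """
    return [
        manifest["python_path"],
        "-m", "hypothetical_worker",
        "--module", manifest["module"],
        "--class", manifest["class"],
        "--port", str(port),
    ]


def launch_worker(manifest: Dict,
                  extra_env: Optional[Dict[str, str]] = None) -> subprocess.Popen:
    """Spawn the worker with manifest env_vars and extra_env layered on top
    of the host environment (layering order is an assumption)."""
    env = {**os.environ, **manifest.get("env_vars", {}), **(extra_env or {})}
    port = find_free_port()
    return subprocess.Popen(build_worker_command(manifest, port), env=env)
```

Binding to port 0 and then closing the socket leaves a short window in which another process could claim the port, so a real launcher would likely retry if the worker fails to bind.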

RemotePluginProxy


RemotePluginProxy


def RemotePluginProxy(
    manifest:Dict, # Plugin manifest with python_path, module, class, etc.
    extra_env:Optional=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
):

Proxy that forwards plugin calls to an isolated Worker subprocess.


RemotePluginProxy.config_options


def config_options(
    
)->Dict: # CR-11: live config option domains

Get the plugin’s runtime config option providers (CR-11).

Returns the worker’s get_config_options() output (FieldOptions per dynamic field, JSON-serialized to dicts). Empty dict when the plugin exposes no dynamic options.


RemotePluginProxy.cleanup


def cleanup(
    
)->None:

Clean up plugin resources and terminate worker process.

Input Serialization

The proxy detects FileBackedDTO objects and serializes them to temp files before transmission. This enables zero-copy transfer of large data (audio, images) between processes.
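
A rough sketch of the detection step, under stated assumptions: the `FileBackedDTO` protocol below is a local stand-in with a single `to_temp_file()` method (the real protocol's shape is not shown in this document), and `serialize_inputs` and `AudioClip` are hypothetical names used only for illustration.

```python
import os
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTO(Protocol):
    """Local stand-in: any object that can write itself to a temp file."""

    def to_temp_file(self) -> str: ...


def serialize_inputs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace file-backed objects with the path of the file they wrote."""
    payload: Dict[str, Any] = {}
    for key, value in kwargs.items():
        if isinstance(value, FileBackedDTO):
            payload[key] = value.to_temp_file()  # only the path is sent on
        else:
            payload[key] = value
    return payload


class AudioClip:
    """Hypothetical DTO holding raw bytes."""

    def __init__(self, samples: bytes):
        self.samples = samples

    def to_temp_file(self) -> str:
        fd, path = tempfile.mkstemp(suffix=".raw")
        with os.fdopen(fd, "wb") as f:
            f.write(self.samples)
        return path
```

With this shape, `serialize_inputs({"audio": AudioClip(data), "language": "en"})` would return a dict whose `audio` value is a file path and whose `language` value is unchanged.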

Asynchronous Interface

These methods are async for use with FastHTML and other async frameworks. Use execute_stream for real-time streaming results.


execute_async


async def execute_async(
    *args, **kwargs
)->Any: # Plugin result

Execute the plugin asynchronously.

CR-4: HTTP 409 from the worker is mapped to a typed PluginCancelledError. Same 409/200/other semantics as the sync execute() variant.


execute_stream_sync


def execute_stream_sync(
    *args, **kwargs
)->Generator:

Synchronous wrapper for streaming (blocking).


execute_stream


def execute_stream(
    *args, **kwargs
)->AsyncGenerator: # Yields parsed JSON chunks

Execute with streaming response (async generator).

SG-52: detects the terminal {"_job_error": <JobError dict>} chunk and raises the corresponding typed exception client-side instead of yielding it to downstream consumers. Mirrors /execute’s HTTP 409 → typed-exception behavior at the streaming wire boundary. Normal plugin output chunks pass through unchanged (they never carry the _job_error key).
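
The terminal-chunk handling described above might look roughly like this. It is a sketch only: the `kind` and `message` fields of the JobError dict, the `ERROR_TYPES` mapping, and the `unwrap_job_errors` name are assumptions, and `PluginCancelledError` is redefined locally so the example is self-contained.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


class PluginCancelledError(Exception):
    """Local stand-in for the typed exception named in the text."""


# Hypothetical mapping from a JobError "kind" field to an exception class.
ERROR_TYPES = {"cancelled": PluginCancelledError}


async def unwrap_job_errors(
    chunks: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[Dict[str, Any]]:
    """Yield normal chunks; raise instead of yielding a _job_error chunk."""
    async for chunk in chunks:
        job_error = chunk.get("_job_error") if isinstance(chunk, dict) else None
        if job_error is not None:
            exc_type = ERROR_TYPES.get(job_error.get("kind"), RuntimeError)
            raise exc_type(job_error.get("message", "job failed"))
        yield chunk
```

Consumers then only see plugin output chunks and can handle failures with an ordinary `try`/`except` around the `async for` loop.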

CR-7 Track A: Worker-death classification

When the worker subprocess dies mid-call, the httpx client fault propagates up. The substrate needs to classify the death to dispatch CR-7’s reactive retry path correctly:

| Worker state after httpx fault | Classification | Raised exception |
|---|---|---|
| Still alive | Network blip / timeout; not CR-7’s concern | Re-raise original httpx error |
| Dead with returncode == -SIGKILL (-9 on POSIX) | Kernel OOM-killer is the dominant cause | WorkerOOMError |
| Dead with any other non-zero returncode | Generic crash (segfault, plugin init panic, etc.) | PluginTransientError |

This is Track A — the substrate-side view. Track B (plugin-side, per SG-47’s sub-task) catches torch.cuda.OutOfMemoryError inside execute() and re-raises as PluginResourceError with a populated ResourceShortfall before the worker ever dies; that path doesn’t hit Track A at all. Both tracks converge at except PluginResourceError in CR-7’s reactive retry loop.

The classification only applies to the /execute and /execute_stream paths — the long-running ones. Hook calls (/on_disable, /prefetch, etc.) already short-circuit on httpx.ConnectError and return False without classification, because their callers (CR-2 / CR-4 lifecycle bookkeeping) don’t need typed retry.
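
The Track A classification can be sketched as a pure function of the worker's return code. The exception classes here are local stand-ins, `classify_worker_death` is a hypothetical name (the document only mentions a private `_check_worker_death` helper), and the handling of a clean exit (return code 0) is an assumption because the classification above does not cover it.

```python
import signal
from typing import Optional


class WorkerOOMError(Exception):
    """Local stand-in for the typed OOM exception."""


class PluginTransientError(Exception):
    """Local stand-in for the typed generic-crash exception."""


# SIGKILL is 9 on POSIX; the fallback keeps the sketch importable elsewhere.
SIGKILL = int(getattr(signal, "SIGKILL", 9))


def classify_worker_death(returncode: Optional[int],
                          original: Exception) -> Exception:
    """Pick the exception to raise after an httpx fault.

    `returncode` is what subprocess.Popen.poll() reports: None while the
    process is running, negative when it was killed by a signal.
    """
    if returncode is None:
        return original  # worker still alive: re-raise the httpx error
    if returncode == -SIGKILL:
        return WorkerOOMError("worker was SIGKILLed, likely by the OOM killer")
    if returncode != 0:
        return PluginTransientError(f"worker exited with code {returncode}")
    return original  # clean exit: not covered above, treated as transient
```

A caller would typically write `raise classify_worker_death(process.poll(), exc) from exc` inside the `except` block that caught the httpx fault.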


execute_async_with_oom_check


async def execute_async_with_oom_check(
    *args, **kwargs
)->Any:

CR-7 Track A wrapper around the async execute path. Same semantics.


execute_with_oom_check


def execute_with_oom_check(
    *args, **kwargs
)->Any:

CR-7 Track A wrapper around the sync execute path.

Catches httpx connection / protocol faults, calls _check_worker_death, and either raises a typed WorkerOOMError / PluginTransientError or re-raises the original httpx error (worker still alive — caller treats as a generic transient network issue).

Lifecycle Management


is_alive


def is_alive(
    
)->bool: # True if worker is responsive

Check if the worker process is still running and responsive.


get_stats


def get_stats(
    
)->Dict: # Process telemetry

Get worker process resource usage.

Cancellation and Progress

Methods for cancelling running executions and polling progress status.


get_progress_async


async def get_progress_async(
    
)->Dict: # {progress: float, message: str}

Get current execution progress asynchronously.


get_progress


def get_progress(
    
)->Dict: # {progress: float, message: str}

Get current execution progress from worker.


cancel_async


async def cancel_async(
    
)->bool: # True if cancel request was sent

Request cancellation asynchronously.


cancel


def cancel(
    
)->bool: # True if cancel request was sent

Request cancellation of running execution.
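
One way to combine progress polling with cancellation is a small watcher loop that runs while `execute()` is busy on another thread. The helper below is a hypothetical sketch: it takes the proxy's `get_progress` and `cancel` methods as plain callables so it can be exercised without a live worker, and the `should_cancel` predicate is an assumption about how a caller might decide to stop.

```python
import threading
from typing import Callable, Dict, List


def watch_progress(
    get_progress: Callable[[], Dict],
    cancel: Callable[[], bool],
    should_cancel: Callable[[Dict], bool],
    done: threading.Event,
    poll_interval: float = 0.5,
) -> List[Dict]:
    """Poll progress until `done` is set; request cancellation if asked.

    Returns every progress snapshot that was observed.
    """
    history: List[Dict] = []
    while not done.is_set():
        snapshot = get_progress()
        history.append(snapshot)
        if should_cancel(snapshot):
            cancel()
            break
        done.wait(poll_interval)  # sleeps, but wakes early when done is set
    return history
```

In use, the thread running `plugin.execute(...)` would set `done` in a `finally` block, and the watcher would be called with `plugin.get_progress` and `plugin.cancel`.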


on_enable


def on_enable(
    
)->bool: # True if hook signal accepted by worker

CR-2: forward the substrate’s on_enable signal to the worker process.

Same delivery semantics as on_disable: best-effort, with errors logged and swallowed. The plugin’s hook (default no-op) decides whether to eagerly re-acquire resources or rely on lazy re-load at next execute().


on_disable


def on_disable(
    
)->bool: # True if hook signal accepted by worker

CR-2: forward the substrate’s on_disable signal to the worker process.

Plugin can opt in via PluginInterface.on_disable(); default implementation is a no-op so silent-pass-through is the norm. Failures to reach the worker (already terminated, network blip) are logged-and-swallowed — the substrate-side enable/disable bookkeeping doesn’t depend on the hook actually firing.

CR-3: Typed MonitorPlugin accessors

get_system_status / list_processes POST to typed worker endpoints (/get_system_status, /list_processes) instead of the magic-string-dispatched /execute("get_system_status"). Both expose sync + async variants matching the existing CR-2 hook style.

Status code taxonomy (intentional, per CR-3 review):

| Code | Meaning | Proxy behavior |
|---|---|---|
| 200 | Typed call succeeded | Return JSON body |
| 404 | Plugin is not a MonitorPlugin | Log at ERROR and return None (configuration error; retrying does not fix it) |
| 501 | Legacy monitor predating CR-3 | REMOVE-AFTER-OVERHAUL fallback to /execute(command=...) |
| 500 | Real plugin failure | Propagate HTTPStatusError (don’t silently mask) |
| ConnectError | Worker may have died | Return None (substrate degrades to empty stats) |

The 404/501 split prevents masking the “wrong plugin wired as system_monitor” misconfiguration as if it were just an unmigrated legacy plugin. After the SG-47 cascade migrates all monitor plugins to typed methods, the 501 branch becomes unreachable and SG-48 sweep removes it.


get_system_status


def get_system_status(
    
)->Optional: # SystemStats dict, or None on transport / config failure

CR-3: typed MonitorPlugin accessor. POSTs to worker’s /get_system_status.

Status code semantics (the worker side raises HTTPException with these codes):

  - 200: SystemStats dict returned.
  - 404: plugin is not a MonitorPlugin. Logged at ERROR (configuration error; no amount of retry fixes it) and returns None. Loudly distinguished from the substrate’s WARN-level transient-failure degradation.
  - 501: legacy monitor predating CR-3. REMOVE-AFTER-OVERHAUL fallback to /execute("get_system_status") returns a dict in the pre-CR-3 wire format.
  - 500: real plugin failure; propagates as HTTPStatusError.
  - ConnectError: worker may have died; returns None silently (substrate degrades to empty stats).
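
The per-code behavior described for get_system_status could be expressed as a small dispatch function. This is an illustrative sketch: `interpret_monitor_response` and `legacy_fallback` are hypothetical names, and `HTTPStatusError` is a local stand-in for the httpx exception so the example has no third-party dependency. The ConnectError case is not a status code, so a caller would handle it separately by catching the transport error and returning None.

```python
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger("monitor_proxy_sketch")


class HTTPStatusError(Exception):
    """Local stand-in for httpx.HTTPStatusError."""

    def __init__(self, status_code: int):
        super().__init__(f"worker returned HTTP {status_code}")
        self.status_code = status_code


def interpret_monitor_response(
    status_code: int,
    body: Any,
    legacy_fallback: Callable[[], Any],
) -> Optional[Any]:
    """Map a typed-endpoint response onto the proxy's return value."""
    if status_code == 200:
        return body
    if status_code == 404:
        # Configuration error: retrying cannot help, so log loudly.
        logger.error("plugin is not a MonitorPlugin; check system_monitor wiring")
        return None
    if status_code == 501:
        # Legacy monitor: fall back to the pre-CR-3 /execute path.
        return legacy_fallback()
    # 500 and anything unexpected: surface the failure to the caller.
    raise HTTPStatusError(status_code)
```

Keeping 404 and 501 on separate branches is what stops a miswired plugin from being silently treated as an unmigrated legacy monitor.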


get_system_status_async


async def get_system_status_async(
    
)->Optional: # SystemStats dict, or None on transport / config failure

Async variant of get_system_status. Same 200/404/501/500/ConnectError semantics.


list_processes


def list_processes(
    
)->Optional: # ProcessStats dict list, or None on transport / config failure

CR-3: typed MonitorPlugin accessor. POSTs to worker’s /list_processes.

Same 200/404/501/500/ConnectError semantics as get_system_status. Note that MonitorPlugin.list_processes() defaults to returning [], so monitors without per-process visibility yield a 200 with an empty list rather than 501.


list_processes_async


async def list_processes_async(
    
)->Optional: # ProcessStats dict list, or None on transport / config failure

Async variant of list_processes. Same semantics.

CR-4: Lifecycle hook proxies (prefetch + reconfigure)

prefetch() / reconfigure() POST to the worker’s /prefetch and /reconfigure endpoints. Both expose sync + async variants matching the existing CR-2/CR-3 hook style.

| Endpoint | Proxy method | Behavior |
|---|---|---|
| /prefetch | prefetch() / prefetch_async() | Returns True on 200, False on transport failure. Errors raised by the plugin’s prefetch() (worker 500) propagate as RuntimeError so callers can distinguish “plugin can’t acquire resources” from “worker unreachable.” |
| /reconfigure | reconfigure(old, new) / reconfigure_async(old, new) | Returns True on 200, False on transport failure. Body shape: {"old_config": ..., "new_config": ...}. Plugin’s default reconfigure body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata to fire _release_<trigger> methods. |

The substrate’s PluginManager.update_plugin_config path (CR-2 onward) calls reconfigure(old, new) when the worker exposes the typed endpoint. Older workers fall back to initialize(new_config) — both paths converge on the same worker-side state after the call.


prefetch


def prefetch(
    stall_threshold_seconds:Optional=None, # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
    poll_interval_seconds:float=1.0, # How often to poll /progress for stall detection
)->bool: # True if worker accepted the prefetch hook

CR-4 / Session A 2026-05-27: forward the substrate’s prefetch signal with progress-based stall detection.

This replaces startup waiting based on a wall-clock timeout, which left operators guessing at a timeout long enough for model downloads on slow networks. Approach:

  1. POST /prefetch fires in a background thread with httpx timeout=None.
  2. Main thread polls /progress every poll_interval_seconds.
  3. Each (progress, message) change resets the stall counter.
  4. If neither value changes for stall_threshold_seconds while the POST is still pending, the proxy sends SIGTERM to the worker subprocess and raises PluginTimeoutError.

Plugins opt in to fine-grained stall defeat by calling self.report_progress(…) periodically during long lifecycle operations (model download, server startup, etc.). Plugins that don’t report progress are fine as long as the threshold accommodates their slowest plausible silent stretch.

Errors raised by the plugin (worker 500) propagate as RuntimeError; an unreachable worker results in a return value of False; a stall raises PluginTimeoutError.
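
The stall-detection loop can be sketched independently of HTTP. In this hypothetical helper, `request_done` is an event set by the background thread once the POST returns, `get_progress` stands in for polling /progress, and `on_stall` stands in for sending SIGTERM to the worker; `PluginTimeoutError` is redefined locally so the example runs on its own.

```python
import threading
import time
from typing import Callable, Dict, Optional, Tuple


class PluginTimeoutError(Exception):
    """Local stand-in for the typed timeout exception."""


def wait_with_stall_detection(
    request_done: threading.Event,
    get_progress: Callable[[], Dict],
    on_stall: Callable[[], None],
    stall_threshold_seconds: float,
    poll_interval_seconds: float = 1.0,
) -> None:
    """Block until the request finishes, or raise if progress stops changing."""
    last_seen: Optional[Tuple] = None
    last_change = time.monotonic()
    while not request_done.is_set():
        snapshot = get_progress()
        key = (snapshot.get("progress"), snapshot.get("message"))
        now = time.monotonic()
        if key != last_seen:
            # Any change in (progress, message) resets the stall clock.
            last_seen, last_change = key, now
        elif now - last_change >= stall_threshold_seconds:
            on_stall()  # e.g. terminate the worker subprocess
            raise PluginTimeoutError(
                f"no progress change for {stall_threshold_seconds}s"
            )
        request_done.wait(poll_interval_seconds)
```

Because the clock resets on any change to the message as well as the numeric progress, a plugin can defeat the stall timer during a long download simply by updating its status text.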


prefetch_async


async def prefetch_async(
    stall_threshold_seconds:Optional=None, poll_interval_seconds:float=1.0
)->bool: # True if worker accepted the prefetch hook

Async variant of prefetch. Same stall-detection semantics.


reconfigure


def reconfigure(
    old_config:Optional, # Previous config snapshot
    new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call

CR-4: forward a reconfigure(old, new) call to the worker process.

The plugin’s default reconfigure() body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the plugin’s config_class to fire _release_<trigger> methods for fields whose values changed. Plugins not opting into the declarative pattern land in a silent no-op; the substrate’s PluginManager.update_plugin_config then falls back to initialize(new_config) for the actual state change.
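
The declarative pattern can be illustrated with dataclass field metadata. Everything below is an assumption made for illustration: the `"reload_trigger"` metadata key, the use of dataclass instances for the old and new configs, and the names `DemoConfig`, `DemoPlugin`, and `fire_release_triggers`. It is not the library's reconfigure_with_triggers implementation.

```python
from dataclasses import dataclass, field, fields
from typing import List

RELOAD_TRIGGER = "reload_trigger"  # assumed metadata key


@dataclass
class DemoConfig:
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    device: str = field(default="cpu", metadata={RELOAD_TRIGGER: "model"})
    language: str = "en"  # no trigger: changing it releases nothing


class DemoPlugin:
    config_class = DemoConfig

    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        # A real plugin would free the loaded model here.
        self.released.append("model")


def fire_release_triggers(plugin, old, new) -> List[str]:
    """Call _release_<trigger> once per trigger whose fields changed."""
    fired: List[str] = []
    for f in fields(plugin.config_class):
        trigger = f.metadata.get(RELOAD_TRIGGER)
        if trigger is None or trigger in fired:
            continue
        if getattr(old, f.name) != getattr(new, f.name):
            getattr(plugin, f"_release_{trigger}")()
            fired.append(trigger)
    return fired
```

Here changing both `model` and `device` fires `_release_model` only once, while changing `language` alone fires nothing, which corresponds to the silent no-op case described above.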


reconfigure_async


async def reconfigure_async(
    old_config:Optional, # Previous config snapshot
    new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call

Async variant of reconfigure. Same semantics.

Context Manager Support

The proxy can be used as a context manager for automatic cleanup.


__aexit__


async def __aexit__(
    exc_type, exc_val, exc_tb
):

Exit async context manager and cleanup.


__aenter__


async def __aenter__(
    
):

Enter async context manager.


__exit__


def __exit__(
    exc_type, exc_val, exc_tb
):

Exit context manager and cleanup.


__enter__


def __enter__(
    
):

Enter context manager.

Usage Examples

Basic Usage (Sync)

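# Load manifest from JSON file
import json
from pathlib import Path

# open() does not expand "~", so resolve the home directory explicitly
manifest_path = Path("~/.cjm/manifests/whisper.json").expanduser()
with manifest_path.open() as f:
    manifest = json.load(f)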

# Create proxy (starts worker subprocess)
plugin = RemotePluginProxy(manifest)

# Use like a local plugin
plugin.initialize({"model": "large-v3"})
result = plugin.execute(audio="/path/to/audio.wav")

# Clean up
plugin.cleanup()

With Context Manager

with RemotePluginProxy(manifest) as plugin:
    plugin.initialize({"model": "large-v3"})
    result = plugin.execute(audio="/path/to/audio.wav")
# Worker automatically terminated

Async with Streaming (FastHTML)

async def transcribe_stream(audio_path: str):
    async with RemotePluginProxy(manifest) as plugin:
        await plugin.initialize({"model": "large-v3"})
        async for chunk in plugin.execute_stream(audio=audio_path):
            yield chunk  # Stream to client

Manifest Format

The proxy expects a manifest dictionary with at minimum:

{
    "name": "whisper-local",
    "version": "1.0.0",
    "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
    "module": "cjm_transcription_plugin_whisper.plugin",
    "class": "WhisperLocalPlugin",
    "env_vars": {
        "CUDA_VISIBLE_DEVICES": "0"
    }
}
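
A small pre-flight check can catch an incomplete manifest before a worker is spawned. The sketch below assumes that every key in the example except `env_vars` is required; the document does not state which keys are mandatory, and `missing_manifest_keys` is a hypothetical helper.

```python
from typing import Dict, List

# Assumption: all keys from the example manifest except env_vars are required.
REQUIRED_KEYS = ("name", "version", "python_path", "module", "class")


def missing_manifest_keys(manifest: Dict) -> List[str]:
    """Return the required keys that are absent from the manifest."""
    return [key for key in REQUIRED_KEYS if key not in manifest]
```

A caller might raise a ValueError listing the missing keys when the returned list is non-empty, rather than letting the failure surface later as a worker launch error.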