Remote Plugin Proxy
The RemotePluginProxy implements PluginInterface but forwards all calls over HTTP to a Worker process running in an isolated environment.
   Host Application                       Isolated Environment
┌────────────────────┐                   ┌─────────────────────┐
│  PluginManager     │                   │  Conda Env (GPU)    │
│        │           │                   │        │            │
│        ▼           │       HTTP        │        ▼            │
│ RemotePluginProxy  │◄─────────────────▶│  Universal Worker   │
│ (implements        │     localhost     │        │            │
│  PluginInterface)  │                   │        ▼            │
│                    │                   │  Actual Plugin      │
└────────────────────┘                   └─────────────────────┘
Key responsibilities:
- Process Management: Launch worker subprocess with correct Python interpreter
- Port Allocation: Find free port, pass to worker via CLI
- Zero-Copy Transfer: Detect FileBackedDTO objects and serialize to temp files
- Dual Interface: Sync methods for scripts, async methods for FastHTML
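The port allocation step can be sketched with the standard library alone. This is a minimal illustration, not the proxy's actual code: `find_free_port` is a hypothetical helper name, and binding to port 0 is one common way to ask the OS for an unused port.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0 (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))            # port 0 lets the OS pick any free port
        return sock.getsockname()[1]    # the port number the OS assigned


port = find_free_port()
print(f"worker could be told to listen on port {port}")
```

The socket is closed before the worker starts, so another process could in principle claim the port in between. A launcher using this approach would need to tolerate a failed worker startup.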
RemotePluginProxy
def RemotePluginProxy(
manifest:Dict, # Plugin manifest with python_path, module, class, etc.
extra_env:Optional=None, # CR-12: resolved worker-env overlay (secrets + visible overrides) injected at spawn
):
Proxy that forwards plugin calls to an isolated Worker subprocess.
RemotePluginProxy.config_options
def config_options(
)->Dict: # CR-11: live config option domains
Get the plugin’s runtime config option providers (CR-11).
Returns the worker’s get_config_options() output (FieldOptions per dynamic field, JSON-serialized to dicts). Empty dict when the plugin exposes no dynamic options.
RemotePluginProxy.cleanup
def cleanup(
)->None:
Clean up plugin resources and terminate worker process.
Input Serialization
The proxy detects FileBackedDTO objects and serializes them to temp files before transmission. This enables zero-copy transfer of large data (audio, images) between processes.
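A rough sketch of the detection step, under stated assumptions: the `FileBackedDTO` protocol below is a local stand-in that exposes a single `to_temp_file()` method, and the `_file_path` key is a hypothetical wire marker. The real protocol and payload shape may differ.

```python
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTO(Protocol):
    """Local stand-in: anything that can write itself to a temp file."""

    def to_temp_file(self) -> str: ...


class AudioClip:
    """Demo object holding raw bytes (hypothetical, for illustration only)."""

    def __init__(self, data: bytes) -> None:
        self.data = data

    def to_temp_file(self) -> str:
        with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as f:
            f.write(self.data)
            return f.name


def serialize_inputs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace file-backed values with a path reference; pass the rest through."""
    payload: Dict[str, Any] = {}
    for key, value in kwargs.items():
        if isinstance(value, FileBackedDTO):
            # Only the path crosses the process boundary, not the bytes.
            payload[key] = {"_file_path": value.to_temp_file()}
        else:
            payload[key] = value
    return payload


payload = serialize_inputs({"audio": AudioClip(b"\x00\x01"), "language": "en"})
```

In this sketch the caller is responsible for deleting the temp file once the worker has read it.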
Asynchronous Interface
These methods are async for use with FastHTML and other async frameworks. Use execute_stream for real-time streaming results.
execute_async
async def execute_async(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result
Execute the plugin asynchronously.
CR-4: HTTP 409 from the worker is mapped to a typed PluginCancelledError. Same 409/200/other semantics as the sync execute() variant.
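The 409/200/other mapping might look roughly like the following. `PluginCancelledError` is redefined locally so the sketch is self-contained, `interpret_execute_response` is a hypothetical helper, and raising `RuntimeError` for other status codes is an assumption; the text above does not say which exception the proxy uses there.

```python
from typing import Any


class PluginCancelledError(Exception):
    """Local stand-in for the typed cancellation error named above."""


def interpret_execute_response(status_code: int, body: Any) -> Any:
    """Map a worker /execute response to a result or an exception."""
    if status_code == 200:
        return body                      # normal plugin result
    if status_code == 409:
        raise PluginCancelledError("execution was cancelled")
    # Assumption: any other status is surfaced as a generic failure.
    raise RuntimeError(f"worker returned HTTP {status_code}: {body!r}")
```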
execute_stream_sync
def execute_stream_sync(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Generator:
Synchronous wrapper for streaming (blocking).
execute_stream
def execute_stream(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Yields parsed JSON chunks
Execute with streaming response (async generator).
SG-52: detects the terminal {"_job_error": <JobError dict>} chunk and raises the corresponding typed exception client-side instead of yielding it to downstream consumers. Mirrors /execute’s HTTP 409 → typed-exception behavior at the streaming wire boundary. Normal plugin output chunks pass through unchanged (they never carry the _job_error key).
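The terminal-chunk check can be illustrated with a small generator. This sketch assumes a newline-delimited JSON wire format and uses a single hypothetical `JobFailedError`; the real proxy raises the typed exception that corresponds to the JobError dict.

```python
import json
from typing import Any, Dict, Iterable, Iterator


class JobFailedError(Exception):
    """Hypothetical stand-in for the typed exceptions built from a JobError dict."""

    def __init__(self, job_error: Dict[str, Any]) -> None:
        super().__init__(str(job_error))
        self.job_error = job_error


def iter_stream_chunks(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield parsed chunks, raising instead of yielding the _job_error chunk."""
    for line in lines:
        if not line.strip():
            continue                     # skip keep-alive blank lines
        chunk = json.loads(line)
        if isinstance(chunk, dict) and "_job_error" in chunk:
            raise JobFailedError(chunk["_job_error"])
        yield chunk                      # normal output passes through unchanged
```

Consumers then handle failures with an ordinary `try`/`except` around the loop rather than inspecting each chunk.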
CR-7 Track A: Worker-death classification
When the worker subprocess dies mid-call, the httpx client fault propagates up. The substrate needs to classify the death to dispatch CR-7’s reactive retry path correctly:
| Worker state after httpx fault | Classification | Raised exception |
|---|---|---|
| Still alive | Network blip / timeout — not CR-7’s concern | Re-raise original httpx error |
| Dead with returncode == -SIGKILL (-9 on POSIX) | Kernel OOM-killer is the dominant cause | WorkerOOMError |
| Dead with any other non-zero returncode | Generic crash (segfault, plugin init panic, etc.) | PluginTransientError |
This is Track A — the substrate-side view. Track B (plugin-side, per SG-47’s sub-task) catches torch.cuda.OutOfMemoryError inside execute() and re-raises as PluginResourceError with a populated ResourceShortfall before the worker ever dies; that path doesn’t hit Track A at all. Both tracks converge at except PluginResourceError in CR-7’s reactive retry loop.
The classification only applies to the /execute and /execute_stream paths — the long-running ones. Hook calls (/on_disable, /prefetch, etc.) already short-circuit on httpx.ConnectError and return False without classification, because their callers (CR-2 / CR-4 lifecycle bookkeeping) don’t need typed retry.
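The Track A table can be read as a small decision function. The sketch below is illustrative only: the exception classes are local stand-ins, `classify_worker_death` is a hypothetical name (the document's own helper is `_check_worker_death`, whose signature is not shown), and treating a zero exit code as a generic crash is an assumption because the table only covers non-zero codes.

```python
import signal
from typing import Optional


class WorkerOOMError(Exception):
    """Local stand-in for the typed OOM error."""


class PluginTransientError(Exception):
    """Local stand-in for the typed transient-crash error."""


# SIGKILL does not exist on every platform; fall back to its POSIX number.
_SIGKILL = int(getattr(signal, "SIGKILL", 9))


def classify_worker_death(returncode: Optional[int], original: Exception) -> Exception:
    """Return the exception the caller should raise after an HTTP client fault."""
    if returncode is None:
        return original                  # worker still alive: network blip or timeout
    if returncode == -_SIGKILL:
        return WorkerOOMError("worker was SIGKILLed, likely by the OOM killer")
    # Assumption: every other exit, including 0, counts as a generic crash.
    return PluginTransientError(f"worker exited with returncode {returncode}")
```

Here `returncode` follows the `subprocess.Popen.returncode` convention: `None` while the process runs, and a negative signal number when it was killed by a signal.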
execute_async_with_oom_check
async def execute_async_with_oom_check(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:
CR-7 Track A wrapper around the async execute path. Same semantics.
execute_with_oom_check
def execute_with_oom_check(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any:
CR-7 Track A wrapper around the sync execute path.
Catches httpx connection / protocol faults, calls _check_worker_death, and either raises a typed WorkerOOMError / PluginTransientError or re-raises the original httpx error (worker still alive — caller treats as a generic transient network issue).
Lifecycle Management
is_alive
def is_alive(
)->bool: # True if worker is responsive
Check if the worker process is still running and responsive.
get_stats
def get_stats(
)->Dict: # Process telemetry
Get worker process resource usage.
Cancellation and Progress
Methods for cancelling running executions and polling progress status.
get_progress_async
async def get_progress_async(
)->Dict: # {progress: float, message: str}
Get current execution progress asynchronously.
get_progress
def get_progress(
)->Dict: # {progress: float, message: str}
Get current execution progress from worker.
cancel_async
async def cancel_async(
)->bool: # True if cancel request was sent
Request cancellation asynchronously.
cancel
def cancel(
)->bool: # True if cancel request was sent
Request cancellation of running execution.
on_enable
def on_enable(
)->bool: # True if hook signal accepted by worker
CR-2: forward the substrate’s on_enable signal to the worker process.
Same delivery semantics as on_disable: best-effort, errors logged-and-swallowed. The plugin’s hook (default no-op) decides whether to eagerly re-acquire resources or rely on lazy re-load at next execute().
on_disable
def on_disable(
)->bool: # True if hook signal accepted by worker
CR-2: forward the substrate’s on_disable signal to the worker process.
Plugin can opt in via PluginInterface.on_disable(); default implementation is a no-op so silent-pass-through is the norm. Failures to reach the worker (already terminated, network blip) are logged-and-swallowed — the substrate-side enable/disable bookkeeping doesn’t depend on the hook actually firing.
CR-3: Typed MonitorPlugin accessors
get_system_status / list_processes POST to typed worker endpoints (/get_system_status, /list_processes) instead of the magic-string-dispatched /execute("get_system_status"). Both expose sync + async variants matching the existing CR-2 hook style.
Status code taxonomy (intentional, per CR-3 review):
| Code | Meaning | Proxy behavior |
|---|---|---|
| 200 | Typed call succeeded | Return JSON body |
| 404 | Plugin is not a MonitorPlugin | Propagate HTTPStatusError (configuration error — substrate logs at WARN) |
| 501 | Legacy monitor predating CR-3 | REMOVE-AFTER-OVERHAUL fallback to /execute(command=...) |
| 500 | Real plugin failure | Propagate HTTPStatusError (don’t silently mask) |
| ConnectError | Worker may have died | Return None (substrate degrades to empty stats) |
The 404/501 split prevents masking the “wrong plugin wired as system_monitor” misconfiguration as if it were just an unmigrated legacy plugin. After the SG-47 cascade migrates all monitor plugins to typed methods, the 501 branch becomes unreachable and SG-48 sweep removes it.
get_system_status
def get_system_status(
)->Optional: # SystemStats dict, or None on transport / config failure
CR-3: typed MonitorPlugin accessor. POSTs to worker’s /get_system_status.
Status code semantics (worker side raises HTTPException with these codes):
- 200: SystemStats dict returned
- 404: plugin is not a MonitorPlugin. Logged at ERROR (a configuration error that no amount of retry fixes) and returns None. Loudly distinguished from the substrate’s WARN-level transient-failure degradation.
- 501: legacy monitor predating CR-3; REMOVE-AFTER-OVERHAUL fallback to /execute("get_system_status") returns a dict in the pre-CR-3 wire format
- 500: real plugin failure; propagates as HTTPStatusError
- ConnectError: worker may have died; returns None silently (substrate degrades to empty stats)
get_system_status_async
async def get_system_status_async(
)->Optional: # SystemStats dict, or None on transport / config failure
Async variant of get_system_status. Same 200/404/501/500/ConnectError semantics.
list_processes
def list_processes(
)->Optional: # ProcessStats dict list, or None on transport / config failure
CR-3: typed MonitorPlugin accessor. POSTs to worker’s /list_processes.
Same 200/404/501/500/ConnectError semantics as get_system_status. Note that MonitorPlugin.list_processes() defaults to returning [], so monitors without per-process visibility yield a 200 with an empty list rather than 501.
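Because `None` and an empty list mean different things here, callers may want to keep them apart. A minimal sketch, with `summarize_processes` as a hypothetical helper name:

```python
from typing import List, Optional


def summarize_processes(result: Optional[List[dict]]) -> str:
    """Distinguish 'no answer' (None) from 'answered with nothing' ([])."""
    if result is None:
        return "unavailable"             # transport or config failure
    if not result:
        return "no per-process data"     # 200 with an empty list
    return f"{len(result)} process(es)"
```

A plain truthiness check such as `if not result:` would merge the two cases, which hides the difference between a dead worker and a monitor without per-process visibility.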
list_processes_async
async def list_processes_async(
)->Optional: # ProcessStats dict list, or None on transport / config failure
Async variant of list_processes. Same semantics.
CR-4: Lifecycle hook proxies (prefetch + reconfigure)
prefetch() / reconfigure() POST to the worker’s /prefetch and /reconfigure endpoints. Both expose sync + async variants matching the existing CR-2/CR-3 hook style.
| Endpoint | Proxy method | Behavior |
|---|---|---|
| /prefetch | prefetch() / prefetch_async() | Returns True on 200, False on transport failure. Errors raised by the plugin’s prefetch() (worker 500) propagate as RuntimeError so callers can distinguish “plugin can’t acquire resources” from “worker unreachable.” |
| /reconfigure | reconfigure(old, new) / reconfigure_async(old, new) | Returns True on 200, False on transport failure. Body shape: {"old_config": ..., "new_config": ...}. Plugin’s default reconfigure body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata to fire _release_<trigger> methods. |
The substrate’s PluginManager.update_plugin_config path (CR-2 onward) calls reconfigure(old, new) when the worker exposes the typed endpoint. Older workers fall back to initialize(new_config) — both paths converge on the same worker-side state after the call.
prefetch
def prefetch(
stall_threshold_seconds:Optional=None, # Override SubstrateConfig.prefetch_stall_threshold_seconds; None = use config
poll_interval_seconds:float=1.0, # How often to poll /progress for stall detection
)->bool: # True if worker accepted the prefetch hook
CR-4 / Session A 2026-05-27: forward the substrate’s prefetch signal with progress-based stall detection.
Replaces wall-clock-timeout-based startup waiting (operators racing arbitrary timeouts vs. network speeds for model downloads). Approach:
- POST /prefetch fires in a background thread with httpx timeout=None.
- Main thread polls /progress every poll_interval_seconds.
- Each (progress, message) change resets the stall counter.
- If no change in stall_threshold_seconds AND POST still pending → SIGTERM the worker subprocess + raise PluginTimeoutError.
Plugins opt in to fine-grained stall defeat by calling self.report_progress(…) periodically during long lifecycle operations (model download, server startup, etc.). Plugins that don’t report progress are fine as long as the threshold accommodates their slowest plausible silent stretch.
Errors raised by the plugin (worker 500) propagate as RuntimeError; worker unreachable propagates as False; stall fires PluginTimeoutError.
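The polling loop described above can be sketched independently of HTTP. Everything here is an assumption made for illustration: `wait_with_stall_detection` is a hypothetical name, the POST and the /progress poll are abstracted into callables, and the SIGTERM step is left out, so the sketch only raises.

```python
import time
from typing import Callable, Tuple


class PluginTimeoutError(Exception):
    """Local stand-in for the typed stall error named above."""


def wait_with_stall_detection(
    is_pending: Callable[[], bool],                  # True while the POST is in flight
    poll_progress: Callable[[], Tuple[float, str]],  # returns (progress, message)
    stall_threshold_seconds: float,
    poll_interval_seconds: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until the call finishes, raising if progress stops changing."""
    last_seen = poll_progress()
    last_change = clock()
    while is_pending():
        sleep(poll_interval_seconds)
        current = poll_progress()
        now = clock()
        if current != last_seen:
            # Any change in (progress, message) resets the stall counter.
            last_seen, last_change = current, now
        elif now - last_change >= stall_threshold_seconds:
            raise PluginTimeoutError(
                f"no progress change for {stall_threshold_seconds}s"
            )
```

Injecting `clock` and `sleep` keeps the loop testable without real waiting. In the proxy, `is_pending` would correspond to the background thread still running.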
prefetch_async
async def prefetch_async(
stall_threshold_seconds:Optional=None, poll_interval_seconds:float=1.0
)->bool: # True if worker accepted the prefetch hook
Async variant of prefetch. Same stall-detection semantics.
reconfigure
def reconfigure(
old_config:Optional, # Previous config snapshot
new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call
CR-4: forward a reconfigure(old, new) call to the worker process.
The plugin’s default reconfigure() body delegates to reconfigure_with_triggers, which walks RELOAD_TRIGGER metadata on the plugin’s config_class to fire _release_<trigger> methods for fields whose values changed. Plugins not opting into the declarative pattern land in a silent no-op; the substrate’s PluginManager.update_plugin_config then falls back to initialize(new_config) for the actual state change.
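One way to picture the declarative pattern is with dataclass field metadata. This is a sketch under assumptions, not the library's implementation: the `reload_trigger` metadata key, `DemoConfig`, `DemoPlugin`, and `fire_reload_triggers` are all hypothetical, and the real `reconfigure_with_triggers` may store and read its metadata differently.

```python
from dataclasses import dataclass, field, fields
from typing import Any, List

RELOAD_TRIGGER = "reload_trigger"  # hypothetical metadata key


@dataclass
class DemoConfig:
    model: str = field(default="base", metadata={RELOAD_TRIGGER: "model"})
    language: str = "en"               # no trigger: changes need no release


class DemoPlugin:
    def __init__(self) -> None:
        self.released: List[str] = []

    def _release_model(self) -> None:
        self.released.append("model")  # a real plugin would free the model here


def fire_reload_triggers(plugin: Any, old: Any, new: Any) -> List[str]:
    """Call _release_<trigger> once per trigger whose field value changed."""
    fired: List[str] = []
    for f in fields(new):
        trigger = f.metadata.get(RELOAD_TRIGGER)
        if trigger is None or trigger in fired:
            continue
        if getattr(old, f.name) == getattr(new, f.name):
            continue                   # unchanged field: nothing to release
        release = getattr(plugin, f"_release_{trigger}", None)
        if callable(release):
            release()
            fired.append(trigger)
    return fired


plugin = DemoPlugin()
fired = fire_reload_triggers(plugin, DemoConfig(), DemoConfig(model="large-v3"))
```

A plugin with no trigger metadata ends up with an empty `fired` list, which matches the silent no-op described above.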
reconfigure_async
async def reconfigure_async(
old_config:Optional, # Previous config snapshot
new_config:Optional, # Config being applied
)->bool: # True if worker accepted the reconfigure call
Async variant of reconfigure. Same semantics.
Context Manager Support
The proxy can be used as a context manager for automatic cleanup.
__aexit__
async def __aexit__(
exc_type, exc_val, exc_tb
):
Exit async context manager and cleanup.
__aenter__
async def __aenter__(
):
Enter async context manager.
__exit__
def __exit__(
exc_type, exc_val, exc_tb
):
Exit context manager and cleanup.
__enter__
def __enter__(
):
Enter context manager.
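The four methods above follow the standard context manager protocols. A minimal stand-in class shows the shape. `ManagedWorker` is hypothetical and does not spawn anything, and returning `False` from the exit methods (so exceptions propagate) is an assumption about the proxy's behavior.

```python
class ManagedWorker:
    """Hypothetical stand-in that supports both `with` and `async with`."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def cleanup(self) -> None:
        self.cleaned_up = True         # the proxy would terminate the worker here

    def __enter__(self) -> "ManagedWorker":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.cleanup()
        return False                   # do not suppress exceptions from the block

    async def __aenter__(self) -> "ManagedWorker":
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.cleanup()
        return False
```

Cleanup runs even when the body raises, which is the main reason to prefer the context manager over a manual `cleanup()` call.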
Usage Examples
Basic Usage (Sync)
import json
import os

# Load manifest from JSON file (open() does not expand "~", so expand it first)
manifest_path = os.path.expanduser("~/.cjm/manifests/whisper.json")
with open(manifest_path) as f:
    manifest = json.load(f)
# Create proxy (starts worker subprocess)
plugin = RemotePluginProxy(manifest)
# Use like a local plugin
plugin.initialize({"model": "large-v3"})
result = plugin.execute(audio="/path/to/audio.wav")
# Clean up
plugin.cleanup()
With Context Manager
with RemotePluginProxy(manifest) as plugin:
    plugin.initialize({"model": "large-v3"})
    result = plugin.execute(audio="/path/to/audio.wav")
# Worker automatically terminated
Async with Streaming (FastHTML)
async def transcribe_stream(audio_path: str):
    async with RemotePluginProxy(manifest) as plugin:
        await plugin.initialize({"model": "large-v3"})
        async for chunk in plugin.execute_stream(audio=audio_path):
            yield chunk  # Stream to client
Manifest Format
The proxy expects a manifest dictionary with at minimum:
{
  "name": "whisper-local",
  "version": "1.0.0",
  "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
  "module": "cjm_transcription_plugin_whisper.plugin",
  "class": "WhisperLocalPlugin",
  "env_vars": {
    "CUDA_VISIBLE_DEVICES": "0"
  }
}
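A quick pre-flight check on a manifest could look like the sketch below. Which keys the proxy actually requires is an assumption here: `REQUIRED_KEYS` simply takes the three fields named in the constructor's parameter comment, and `missing_manifest_keys` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Assumption: the fields the constructor comment names are the required ones.
REQUIRED_KEYS = ("python_path", "module", "class")


def missing_manifest_keys(manifest: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent or empty in the manifest."""
    return [key for key in REQUIRED_KEYS if not manifest.get(key)]


manifest = {
    "name": "whisper-local",
    "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
    "module": "cjm_transcription_plugin_whisper.plugin",
    "class": "WhisperLocalPlugin",
}
problems = missing_manifest_keys(manifest)
```

Checking before spawning gives a clearer error than a worker subprocess that fails to start.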