Remote Plugin Proxy

Bridge between Host application and isolated Worker processes

The RemotePluginProxy implements PluginInterface but forwards all calls over HTTP to a Worker process running in an isolated environment.

Host Application                        Isolated Environment
┌────────────────────┐                 ┌─────────────────────┐
│  PluginManager     │                 │  Conda Env (GPU)    │
│    │               │                 │    │                │
│    ▼               │     HTTP        │    ▼                │
│  RemotePluginProxy │◄───────────────▶│  Universal Worker   │
│  (implements       │   localhost     │    │                │
│   PluginInterface) │                 │    ▼                │
│                    │                 │  Actual Plugin      │
└────────────────────┘                 └─────────────────────┘

Key responsibilities:

  1. Process Management: Launch worker subprocess with correct Python interpreter
  2. Port Allocation: Find free port, pass to worker via CLI
  3. Zero-Copy Transfer: Detect FileBackedDTO objects and serialize to temp files
  4. Dual Interface: Sync methods for scripts, async methods for FastHTML

RemotePluginProxy


source

RemotePluginProxy


def RemotePluginProxy(
    manifest:Dict, # Plugin manifest with python_path, module, class, etc.
):

Proxy that forwards plugin calls to an isolated Worker subprocess.

Input Serialization

The proxy detects FileBackedDTO objects and serializes them to temp files before transmission. This enables zero-copy transfer of large data (audio, images) between processes.

Asynchronous Interface

These methods are async for use with FastHTML and other async frameworks. Use execute_stream for real-time streaming results.


source

execute_stream


def execute_stream(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Yields parsed JSON chunks

Execute with streaming response (async generator).


source

execute_stream_sync


def execute_stream_sync(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Generator:

Synchronous wrapper for streaming (blocking).


source

execute_async


def execute_async(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result

Execute the plugin asynchronously.

Lifecycle Management


source

is_alive


def is_alive(
    
)->bool: # True if worker is responsive

Check if the worker process is still running and responsive.


source

get_stats


def get_stats(
    
)->Dict: # Process telemetry

Get worker process resource usage.

Cancellation and Progress

Methods for cancelling running executions and polling progress status.


source

get_progress_async


def get_progress_async(
    
)->Dict: # {progress: float, message: str}

Get current execution progress asynchronously.


source

get_progress


def get_progress(
    
)->Dict: # {progress: float, message: str}

Get current execution progress from worker.


source

cancel_async


def cancel_async(
    
)->bool: # True if cancel request was sent

Request cancellation asynchronously.


source

cancel


def cancel(
    
)->bool: # True if cancel request was sent

Request cancellation of running execution.

Context Manager Support

The proxy can be used as a context manager for automatic cleanup.


source

aexit


def __aexit__(
    exc_type, exc_val, exc_tb
):

Exit async context manager and cleanup.


source

aenter


def __aenter__(
    
):

Enter async context manager.


source

exit


def __exit__(
    exc_type, exc_val, exc_tb
):

Exit context manager and cleanup.


source

enter


def __enter__(
    
):

Enter context manager.

Usage Examples

Basic Usage (Sync)

# Load manifest from JSON file
with open("~/.cjm/manifests/whisper.json") as f:
    manifest = json.load(f)

# Create proxy (starts worker subprocess)
plugin = RemotePluginProxy(manifest)

# Use like a local plugin
plugin.initialize({"model": "large-v3"})
result = plugin.execute(audio="/path/to/audio.wav")

# Clean up
plugin.cleanup()

With Context Manager

with RemotePluginProxy(manifest) as plugin:
    plugin.initialize({"model": "large-v3"})
    result = plugin.execute(audio="/path/to/audio.wav")
# Worker automatically terminated

Async with Streaming (FastHTML)

async def transcribe_stream(audio_path: str):
    async with RemotePluginProxy(manifest) as plugin:
        await plugin.initialize({"model": "large-v3"})
        async for chunk in plugin.execute_stream(audio=audio_path):
            yield chunk  # Stream to client

Manifest Format

The proxy expects a manifest dictionary with at minimum:

{
    "name": "whisper-local",
    "version": "1.0.0",
    "python_path": "/home/user/anaconda3/envs/cjm-whisper/bin/python",
    "module": "cjm_transcription_plugin_whisper.plugin",
    "class": "WhisperLocalPlugin",
    "env_vars": {
        "CUDA_VISIBLE_DEVICES": "0"
    }
}