Universal Worker

FastAPI server that runs inside isolated plugin environments

The Universal Worker is a FastAPI server that:

  1. Dynamically loads a plugin class specified via CLI arguments
  2. Exposes HTTP endpoints for plugin lifecycle and execution
  3. Monitors the parent process (the “Suicide Pact”) to prevent zombie workers
  4. Reports telemetry for resource scheduling decisions

Host Process                    Worker Process (Isolated Env)
┌──────────────┐               ┌─────────────────────────────┐
│ RemoteProxy  │──HTTP/JSON───▶│ Universal Worker (FastAPI)  │
│              │               │   ├─ /health                │
│  --ppid ─────│───────────────│──▶│ ├─ /initialize          │
│              │               │   ├─ /execute               │
└──────────────┘               │   ├─ /execute_stream        │
                               │   └─ PID Watchdog (Thread)  │
                               └─────────────────────────────┘
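
The HTTP/JSON link in the diagram can be exercised from the host side with the standard library alone. The following is a minimal sketch, not the RemoteProxy implementation: `build_request` and `call_worker` are hypothetical helper names, and the request payload shape is an assumption.

```python
import json
import urllib.request
from typing import Any, Optional


def build_request(base_url: str, path: str,
                  payload: Optional[dict] = None) -> urllib.request.Request:
    """Build a GET request when payload is None, otherwise a JSON POST."""
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=data,
        headers={"Content-Type": "application/json"},
        method="GET" if payload is None else "POST",
    )


def call_worker(base_url: str, path: str,
                payload: Optional[dict] = None, timeout: float = 5.0) -> Any:
    """Send the request and decode the JSON body of the response."""
    req = build_request(base_url, path, payload)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Hypothetical usage against a worker listening on port 12345:
# call_worker("http://127.0.0.1:12345", "/health")
```

Keeping request construction separate from sending makes the helper easy to check without a running worker.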

EnhancedJSONEncoder

Custom JSON encoder that handles dataclasses and other common Python types that need serialization for HTTP responses.



EnhancedJSONEncoder


def EnhancedJSONEncoder(
    skipkeys:bool=False, ensure_ascii:bool=True, check_circular:bool=True, allow_nan:bool=True, sort_keys:bool=False,
    indent:NoneType=None, separators:NoneType=None, default:NoneType=None
):

JSON encoder that handles dataclasses and other common types.
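
For orientation, an encoder of this kind is usually a `json.JSONEncoder` subclass that overrides `default()`. The sketch below is an assumption about the general technique, not the library's actual code: the class name `SketchJSONEncoder`, the demo dataclass, and the set of extra types handled (enums, dates, paths, sets) are illustrative choices.

```python
import dataclasses
import json
from datetime import date, datetime
from enum import Enum
from pathlib import Path


class SketchJSONEncoder(json.JSONEncoder):
    """Illustrative encoder; the extra types handled here are assumptions."""

    def default(self, o):
        # Dataclass instances (not dataclass types) become plain dicts
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if isinstance(o, Enum):
            return o.value
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        if isinstance(o, Path):
            return str(o)
        if isinstance(o, (set, frozenset)):
            return list(o)
        # Anything else falls through to the base class, which raises TypeError
        return super().default(o)


@dataclasses.dataclass
class DemoResult:
    text: str
    source: Path
    tags: set


encoded = json.dumps(DemoResult("hi", Path("audio.wav"), {"en"}),
                     cls=SketchJSONEncoder)
```

Values nested inside the dataclass (the `Path` and the `set` here) reach `default()` on their own, so each type only needs to be handled once.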

# Test EnhancedJSONEncoder
import json
from dataclasses import dataclass

# Assumes the encoder is importable from the worker module documented on this page
from cjm_plugin_system.core.worker import EnhancedJSONEncoder

@dataclass
class SampleConfig:
    name: str
    value: int

cfg = SampleConfig(name="test", value=42)
result = json.dumps(cfg, cls=EnhancedJSONEncoder)
print(f"Encoded: {result}")
assert json.loads(result) == {"name": "test", "value": 42}

Encoded: {"name": "test", "value": 42}

PID Watchdog

The “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes consuming resources.



parent_monitor


def parent_monitor(
    ppid:int, # Parent process ID to monitor
)->None:

Monitor parent process and terminate self if parent dies.

This implements the “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes.

The watchdog runs in a daemon thread, checking once per second whether the parent process is still alive. When the parent dies (or becomes a zombie), the worker terminates itself via terminate_self(), which handles cross-platform differences (SIGTERM on Unix, os._exit() on Windows).
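
The polling loop described above can be sketched with the standard library. This is a Unix-only illustration under stated assumptions, not the actual `parent_monitor`: `parent_alive` and `start_watchdog` are hypothetical names, the `on_parent_death` callback stands in for `terminate_self()`, and the signal-0 probe does not detect a zombie parent, which would need an extra check.

```python
import os
import threading
import time
from typing import Callable


def parent_alive(ppid: int) -> bool:
    """Unix-only probe: signal 0 checks that the PID exists without signalling it.

    Do not use this on Windows, where os.kill() terminates the target process.
    """
    try:
        os.kill(ppid, 0)
    except ProcessLookupError:
        return False  # no such process
    except PermissionError:
        return True   # process exists but belongs to another user
    return True


def start_watchdog(ppid: int, on_parent_death: Callable[[], None],
                   interval: float = 1.0) -> threading.Thread:
    """Poll the parent in a daemon thread and fire the callback once it is gone."""

    def _loop() -> None:
        while parent_alive(ppid):
            time.sleep(interval)
        on_parent_death()

    thread = threading.Thread(target=_loop, daemon=True, name="pid-watchdog")
    thread.start()
    return thread
```

Running the loop in a daemon thread means the watchdog never keeps the worker alive on its own; it only shortens the worker's life when the parent disappears.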

Application Factory

Creates the FastAPI application with all endpoints for plugin communication.



create_app


def create_app(
    module_name:str, # Python module path (e.g., "my_plugin.plugin")
    class_name:str, # Plugin class name (e.g., "WhisperPlugin")
)->FastAPI: # Configured FastAPI application

Create FastAPI app that hosts the specified plugin.
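
The two arguments map naturally onto a dynamic import. The sketch below shows one way the plugin class could be resolved from `module_name` and `class_name`; `load_plugin_class` is a hypothetical helper, and how `create_app` actually performs this step is not shown in the source.

```python
import importlib


def load_plugin_class(module_name: str, class_name: str) -> type:
    """Import a module by dotted path and return the named class from it."""
    module = importlib.import_module(module_name)
    try:
        cls = getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_name!r} has no attribute {class_name!r}"
        ) from exc
    if not isinstance(cls, type):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return cls


# Hypothetical usage with the example names from the signature above:
# plugin_cls = load_plugin_class("my_plugin.plugin", "WhisperPlugin")
# plugin = plugin_cls()
```

Failing early with a clear error is useful here because the worker runs in a separate environment, where a missing module is a common misconfiguration.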

Endpoint Summary

Endpoint          Method   Purpose
/health           GET      Health check with PID and plugin identity
/stats            GET      Process telemetry (CPU, memory) for scheduler
/initialize       POST     Configure/reconfigure plugin
/config_schema    GET      JSON Schema for UI generation
/config           GET      Current configuration values
/execute          POST     Execute plugin, return JSON
/execute_stream   POST     Execute with streaming NDJSON response
/cancel           POST     Request cancellation of running execution
/progress         GET      Get current execution progress and status
/cleanup          POST     Release plugin resources
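
Since /execute_stream responds with NDJSON (one JSON document per line), a client can decode it incrementally. The following is a minimal sketch of that parsing step; `iter_ndjson` is a hypothetical helper, and the shape of each streamed message is not specified in the source.

```python
import json
from typing import Any, Iterable, Iterator


def iter_ndjson(lines: Iterable[bytes]) -> Iterator[Any]:
    """Yield one decoded JSON value per non-empty line."""
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # tolerate blank keep-alive lines
        yield json.loads(line)


# Example with an in-memory stream standing in for the HTTP response body
chunks = [b'{"a": 1}\n', b"\n", b'{"b": 2}\n']
messages = list(iter_ndjson(chunks))
```

A response object returned by `urllib.request.urlopen` iterates line by line, so it can be passed to `iter_ndjson` directly.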

CLI Entry Point

The worker is launched as a subprocess with the plugin module/class and port specified via command line arguments.



run_worker


def run_worker()->None:

CLI entry point for running the worker.

Usage

The worker is typically launched by the RemotePluginProxy:

# Example: Launch a Whisper plugin worker
python -m cjm_plugin_system.core.worker \
    --module cjm_transcription_plugin_whisper.plugin \
    --class WhisperLocalPlugin \
    --port 12345 \
    --ppid 1234

The --ppid argument enables the suicide pact: if process 1234 dies, this worker terminates.
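
The flags in the command above could be parsed along these lines. This is a sketch with `argparse`, not the actual body of `run_worker`: the flag names come from the usage example, while `build_parser`, the help texts, and treating `--ppid` as optional are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Parser for the four flags shown in the usage example."""
    parser = argparse.ArgumentParser(description="Universal Worker (sketch)")
    parser.add_argument("--module", required=True,
                        help="Python module path of the plugin")
    # "class" is a Python keyword, so the value is stored under another name
    parser.add_argument("--class", dest="class_name", required=True,
                        help="Plugin class name")
    parser.add_argument("--port", type=int, required=True,
                        help="Port for the HTTP server")
    # Assumption: omitting --ppid disables the parent watchdog
    parser.add_argument("--ppid", type=int, default=None,
                        help="Parent process ID to monitor")
    return parser


args = build_parser().parse_args([
    "--module", "cjm_transcription_plugin_whisper.plugin",
    "--class", "WhisperLocalPlugin",
    "--port", "12345",
    "--ppid", "1234",
])
```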