The Universal Worker is a FastAPI server that:
Dynamically loads a plugin class specified via CLI arguments
Exposes HTTP endpoints for plugin lifecycle and execution
Monitors parent process (“Suicide Pact”) to prevent zombie workers
Reports telemetry for resource scheduling decisions
Host Process Worker Process (Isolated Env)
┌──────────────┐ ┌─────────────────────────────┐
│ RemoteProxy │──HTTP/JSON───▶│ Universal Worker (FastAPI) │
│ │ │ ├─ /health │
│ --ppid ─────│───────────────│──▶│ ├─ /initialize │
│ │ │ ├─ /execute │
└──────────────┘ │ ├─ /execute_stream │
│ └─ PID Watchdog (Thread) │
└─────────────────────────────┘
EnhancedJSONEncoder
Custom JSON encoder that handles dataclasses and other common Python types that need serialization for HTTP responses.
source
EnhancedJSONEncoder
def EnhancedJSONEncoder(
skipkeys:bool = False , ensure_ascii:bool = True , check_circular:bool = True , allow_nan:bool = True , sort_keys:bool = False ,
indent:NoneType= None , separators:NoneType= None , default:NoneType= None
):
JSON encoder that handles dataclasses and other common types.
# Test EnhancedJSONEncoder
from dataclasses import dataclass
@dataclass
class SampleConfig:
name: str
value: int
cfg = SampleConfig(name= "test" , value= 42 )
result = json.dumps(cfg, cls= EnhancedJSONEncoder)
print (f"Encoded: { result} " )
assert json.loads(result) == {"name" : "test" , "value" : 42 }
Encoded: {"name": "test", "value": 42}
PID Watchdog
The “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes consuming resources.
source
parent_monitor
def parent_monitor(
ppid:int , # Parent process ID to monitor
)-> None :
Monitor parent process and terminate self if parent dies.
This implements the “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes.
The watchdog runs in a daemon thread, checking every second if the parent process is still alive. When the parent dies (or becomes a zombie), the worker terminates itself using terminate_self() which handles cross-platform differences (SIGTERM on Unix, os._exit() on Windows).
Application Factory
Creates the FastAPI application with all endpoints for plugin communication.
source
create_app
def create_app(
module_name:str , # Python module path (e.g., "my_plugin.plugin")
class_name:str , # Plugin class name (e.g., "WhisperPlugin")
)-> FastAPI: # Configured FastAPI application
Create FastAPI app that hosts the specified plugin.
Endpoint Summary
/health
GET
Health check with PID and plugin identity
/stats
GET
Process telemetry (CPU, memory) for scheduler
/initialize
POST
Configure/reconfigure plugin
/config_schema
GET
JSON Schema for UI generation
/config
GET
Current configuration values
/execute
POST
Execute plugin, return JSON
/execute_stream
POST
Execute with streaming NDJSON response
/cancel
POST
Request cancellation of running execution
/progress
GET
Get current execution progress and status
/cleanup
POST
Release plugin resources
CLI Entry Point
The worker is launched as a subprocess with the plugin module/class and port specified via command line arguments.
source
run_worker
CLI entry point for running the worker.
Usage
The worker is typically launched by the RemotePluginProxy :
# Example: Launch a Whisper plugin worker
python -m cjm_plugin_system.core.worker \
--module cjm_transcription_plugin_whisper.plugin \
--class WhisperLocalPlugin \
--port 12345 \
--ppid 1234
The --ppid argument enables the suicide pact: if process 1234 dies, this worker terminates.