Universal Worker

FastAPI server that runs inside isolated plugin environments

The Universal Worker is a FastAPI server that:

  1. Dynamically loads a plugin class specified via CLI arguments
  2. Exposes HTTP endpoints for plugin lifecycle and execution
  3. Monitors the parent process (the “Suicide Pact”) so that orphaned (“zombie”) workers do not linger
  4. Reports telemetry for resource scheduling decisions

Host Process                    Worker Process (Isolated Env)
┌──────────────┐               ┌─────────────────────────────┐
│ RemoteProxy  │──HTTP/JSON───▶│ Universal Worker (FastAPI)  │
│              │               │   ├─ /health                │
│  --ppid ─────│───────────────│──▶│ ├─ /initialize          │
│              │               │   ├─ /execute               │
└──────────────┘               │   ├─ /execute_stream        │
                               │   └─ PID Watchdog (Thread)  │
                               └─────────────────────────────┘

EnhancedJSONEncoder

Custom JSON encoder that handles dataclasses and other common Python types that need serialization for HTTP responses.


EnhancedJSONEncoder


def EnhancedJSONEncoder(
    skipkeys:bool=False, ensure_ascii:bool=True, check_circular:bool=True, allow_nan:bool=True, sort_keys:bool=False,
    indent:NoneType=None, separators:NoneType=None, default:NoneType=None
):

JSON encoder that handles dataclasses and other common types.

SG-52: datetime support added so JobError.occurred_at serializes cleanly when emitted via the typed _job_error terminal chunk in /execute_stream.

# Test EnhancedJSONEncoder
import json
from dataclasses import dataclass

@dataclass
class SampleConfig:
    name: str
    value: int

# Dataclass instances are not JSON-serializable by default; the custom encoder handles them
cfg = SampleConfig(name="test", value=42)
result = json.dumps(cfg, cls=EnhancedJSONEncoder)
print(f"Encoded: {result}")
assert json.loads(result) == {"name": "test", "value": 42}

Encoded: {"name": "test", "value": 42}
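
To illustrate how an encoder of this kind can work, here is a minimal sketch. It is not the library's implementation: SketchJSONEncoder and SampleEvent are hypothetical names, and the only assumption taken from the text above is that dataclasses and datetime values need to serialize.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from datetime import datetime, timezone


class SketchJSONEncoder(json.JSONEncoder):
    """Hypothetical stand-in for EnhancedJSONEncoder (illustration only)."""

    def default(self, o):
        # Dataclass instances (not dataclass types) become plain dicts.
        if is_dataclass(o) and not isinstance(o, type):
            return asdict(o)
        # datetime values become ISO 8601 strings.
        if isinstance(o, datetime):
            return o.isoformat()
        # Everything else falls through to the stock TypeError.
        return super().default(o)


@dataclass
class SampleEvent:  # hypothetical example type
    name: str
    occurred_at: datetime


event = SampleEvent("done", datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
encoded = json.dumps(event, cls=SketchJSONEncoder)
print(encoded)
```

The json module calls default() again for any non-serializable value found inside the returned dict, which is why the nested datetime is handled without extra code.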

PID Watchdog

The “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself so that it does not linger as an orphaned (“zombie”) process consuming resources.


parent_monitor


def parent_monitor(
    ppid:int, # Parent process ID to monitor
)->None:

Monitor parent process and terminate self if parent dies.

This implements the “Suicide Pact” pattern: if the Host process dies, the Worker terminates itself rather than lingering as an orphaned (“zombie”) process.

The watchdog runs in a daemon thread and checks once per second whether the parent process is still alive. When the parent dies (or becomes a zombie), the worker terminates itself via terminate_self(), which handles cross-platform differences (SIGTERM on Unix, os._exit() on Windows).
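
The polling loop described above might look roughly like the following sketch. It uses only the standard library and a Unix-only liveness probe (signal 0), so unlike the behaviour described above it does not detect a zombie parent. The names pid_exists and watch_parent are hypothetical, and the death handler is injected so the demo never terminates anything; a real worker would presumably pass something like terminate_self.

```python
import os
import threading
import time
from typing import Callable


def pid_exists(pid: int) -> bool:
    """Unix-only probe (assumption): signal 0 checks existence, sends nothing."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def watch_parent(
    ppid: int,
    on_parent_death: Callable[[], None],
    is_alive: Callable[[int], bool] = pid_exists,
    interval: float = 1.0,
) -> threading.Thread:
    """Start a daemon thread that calls on_parent_death once ppid is gone."""

    def loop() -> None:
        while is_alive(ppid):
            time.sleep(interval)
        on_parent_death()

    thread = threading.Thread(target=loop, daemon=True, name="pid-watchdog")
    thread.start()
    return thread


# Demo with a fake liveness probe so that no process is actually signalled.
parent_state = {"alive": True}
parent_died = threading.Event()
watch_parent(
    ppid=0,
    on_parent_death=parent_died.set,
    is_alive=lambda _pid: parent_state["alive"],
    interval=0.01,
)
parent_state["alive"] = False  # simulate the host process dying
print(parent_died.wait(timeout=2.0))
```

Running the loop in a daemon thread means the watchdog never keeps the worker alive on its own once the main thread exits.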

Application Factory

Creates the FastAPI application with all endpoints for plugin communication.


create_app


def create_app(
    module_name:str, # Python module path (e.g., "my_plugin.plugin")
    class_name:str, # Plugin class name (e.g., "WhisperPlugin")
)->FastAPI: # Configured FastAPI application

Create FastAPI app that hosts the specified plugin.
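
As an illustration of the dynamic-loading step, a module path and class name can be resolved with importlib. This is a sketch under assumptions, not the code create_app actually runs; load_plugin_class is a hypothetical helper, and the demo resolves a standard-library class so that it runs without any plugin installed.

```python
import importlib


def load_plugin_class(module_name: str, class_name: str) -> type:
    """Resolve module_name and class_name to a class object (hypothetical helper)."""
    module = importlib.import_module(module_name)
    try:
        obj = getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_name!r} has no attribute {class_name!r}"
        ) from exc
    if not isinstance(obj, type):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return obj


# Demo: resolve a standard-library class instead of a real plugin.
resolved = load_plugin_class("collections", "OrderedDict")
print(resolved.__name__)
```

Failing early with a clear ImportError or TypeError keeps a mistyped --module or --class value from surfacing later as an obscure error inside an endpoint handler.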

Endpoint Summary

| Endpoint | Method | Purpose |
|---|---|---|
| /health | GET | Health check with PID and plugin identity |
| /stats | GET | Process telemetry (CPU, memory) for scheduler |
| /initialize | POST | Configure/reconfigure plugin |
| /prefetch | POST | CR-4: eager resource acquisition (SG-19 hook) |
| /reconfigure | POST | CR-4: hot-reload via reconfigure(old, new); declarative RELOAD_TRIGGER dispatch |
| /config_schema | GET | JSON Schema for UI generation |
| /config | GET | Current configuration values |
| /execute | POST | Execute plugin, return JSON. CR-4: PluginCancelledError → 409 (operator cancellation) vs 500 (real failure) |
| /execute_stream | POST | Execute with streaming NDJSON response. SG-51: resets _cancel_requested at start of iteration. SG-52: errors emit as terminal {"_job_error": <JobError dict>} chunk with CR-5 typed classification (replaces undocumented {"error": str(e)} shape) |
| /cancel | POST | Request cancellation of running execution. CR-4: default PluginInterface.cancel() sets _cancel_requested flag + fires registered callbacks |
| /progress | GET | Get current execution progress and status |
| /on_disable | POST | CR-2: signal plugin to release heavy resources (worker stays alive) |
| /on_enable | POST | CR-2: signal plugin to re-acquire resources |
| /get_system_status | POST | CR-3: typed MonitorPlugin.get_system_status(); returns SystemStats.to_dict(); 404 if not a monitor, 501 if the monitor raises NotImplementedError |
| /list_processes | POST | CR-3: typed MonitorPlugin.list_processes(); returns list of ProcessStats.to_dict(); same 404/501 taxonomy |
| /cleanup | POST | Release plugin resources |
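
The status-code rules in the endpoint summary above can be restated as two small functions. This is only a sketch: the PluginCancelledError class here is a local stand-in for the library's exception, the function names are hypothetical, and returning 200 on success and 500 for other monitor failures are assumptions the summary does not state.

```python
from typing import Optional


class PluginCancelledError(Exception):
    """Local stand-in for the library's exception of the same name."""


def execute_status(exc: Optional[BaseException]) -> int:
    """Status for /execute: 409 for operator cancellation, 500 for real failures."""
    if exc is None:
        return 200  # assumption: success maps to 200
    if isinstance(exc, PluginCancelledError):
        return 409
    return 500


def monitor_status(is_monitor: bool, exc: Optional[BaseException] = None) -> int:
    """Status for /get_system_status and /list_processes (404/501 taxonomy)."""
    if not is_monitor:
        return 404
    if isinstance(exc, NotImplementedError):
        return 501
    if exc is not None:
        return 500  # assumption: other failures map to 500
    return 200  # assumption: success maps to 200


print(execute_status(PluginCancelledError("cancelled by operator")))
print(monitor_status(is_monitor=False))
```

Separating 409 from 500 lets the caller tell a deliberate cancellation apart from a genuine plugin failure without parsing the response body.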

/execute_stream wire-format contract (SG-52)

Each NDJSON line is one of:

  • Plugin output chunk: a JSON object yielded by the plugin’s execute_stream (or wrapped single result from execute). Shape is plugin-defined.
  • Terminal error chunk: {"_job_error": {<JobError dict>}}. Emitted at most once and only as the last chunk. The _job_error sentinel key never appears in plugin output chunks. The nested dict carries CR-5’s full JobError structure: category, message, retriable, original_exc_repr, traceback, retry_after_seconds, fields_invalid, resource_shortfall, plugin_name, plugin_instance_id, occurred_at (ISO 8601 string).

PluginCancelledError is identifiable in the error chunk via category == "transient" combined with original_exc_repr prefix "PluginCancelledError" — the proxy uses this to raise the typed exception client-side rather than yielding the error chunk to downstream consumers.
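
A consumer of this wire format could be sketched as follows. This is not the proxy's actual code: iter_chunks and JobFailedError are hypothetical names, PluginCancelledError is a local stand-in for the library's exception, and the input is a list of strings rather than a live HTTP response.

```python
import json
from typing import Any, Dict, Iterable, Iterator


class PluginCancelledError(Exception):
    """Local stand-in for the library's exception of the same name."""


class JobFailedError(Exception):
    """Hypothetical wrapper carrying the JobError dict of a terminal chunk."""

    def __init__(self, job_error: Dict[str, Any]) -> None:
        super().__init__(job_error.get("message", ""))
        self.job_error = job_error


def iter_chunks(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield plugin output chunks; raise when the terminal error chunk arrives."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between NDJSON records
        chunk = json.loads(line)
        if isinstance(chunk, dict) and "_job_error" in chunk:
            err = chunk["_job_error"]
            exc_repr = err.get("original_exc_repr") or ""
            if err.get("category") == "transient" and exc_repr.startswith(
                "PluginCancelledError"
            ):
                raise PluginCancelledError(err.get("message", ""))
            raise JobFailedError(err)
        yield chunk


# Demo: one output chunk followed by a cancellation error chunk.
stream = [
    json.dumps({"text": "hello"}),
    json.dumps({"_job_error": {
        "category": "transient",
        "message": "cancelled",
        "original_exc_repr": "PluginCancelledError('cancelled')",
    }}),
]
received = []
outcome = "completed"
try:
    for chunk in iter_chunks(stream):
        received.append(chunk)
except PluginCancelledError as exc:
    outcome = f"cancelled: {exc}"
print(received, outcome)
```

Because the _job_error key never appears in plugin output chunks, checking for that single key is enough to tell the two chunk kinds apart.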

CLI Entry Point

The worker is launched as a subprocess, with the plugin module, plugin class, and port specified via command-line arguments.


run_worker


def run_worker()->None:

CLI entry point for running the worker.

Usage

The worker is typically launched by the RemotePluginProxy:

# Example: Launch a Whisper plugin worker
python -m cjm_plugin_system.core.worker \
    --module cjm_transcription_plugin_whisper.plugin \
    --class WhisperLocalPlugin \
    --port 12345 \
    --ppid 1234

The --ppid argument enables the suicide pact: if process 1234 dies, this worker terminates.
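
For illustration, the four flags shown above could be parsed with argparse as in the sketch below. The flag names come from the usage example; everything else is an assumption, including which flags are required, the class_name destination, and the build_parser helper, and run_worker may be implemented differently.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser mirroring the flags in the usage example."""
    parser = argparse.ArgumentParser(description="Universal Worker (sketch)")
    parser.add_argument("--module", required=True, help="Python module path of the plugin")
    # "class" is a reserved word in Python, so the value is stored under another name.
    parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
    parser.add_argument("--port", type=int, required=True, help="Port to listen on")
    # Assumption: --ppid is optional, and omitting it disables the watchdog.
    parser.add_argument("--ppid", type=int, default=None, help="Parent PID to monitor")
    return parser


args = build_parser().parse_args([
    "--module", "cjm_transcription_plugin_whisper.plugin",
    "--class", "WhisperLocalPlugin",
    "--port", "12345",
    "--ppid", "1234",
])
print(args.module, args.class_name, args.port, args.ppid)
```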