Universal Worker

FastAPI server that runs inside isolated capability environments

The Universal Worker is a FastAPI server that:

  1. Dynamically loads a capability class specified via CLI arguments
  2. Exposes HTTP endpoints for capability lifecycle and execution
  3. Monitors the parent process (“Suicide Pact”) to prevent zombie workers
  4. Reports telemetry for resource scheduling decisions

Host Process                    Worker Process (Isolated Env)
┌──────────────┐               ┌─────────────────────────────┐
│ RemoteProxy  │──HTTP/JSON───▶│ Universal Worker (FastAPI)  │
│              │               │   ├─ /health                │
│  --ppid ─────│───────────────│──▶│ ├─ /initialize          │
│              │               │   ├─ /execute               │
└──────────────┘               │   ├─ /execute_stream        │
                               │   └─ PID Watchdog (Thread)  │
                               └─────────────────────────────┘
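
Step 1 above (dynamic loading) can be illustrated with the standard library alone. The following is a minimal sketch, assuming the worker resolves a module path and class name with importlib; the helper name load_capability_class is hypothetical and is not taken from the worker's source.

```python
import importlib


def load_capability_class(module_name: str, class_name: str) -> type:
    """Import `module_name` and return the class named `class_name` from it.

    Hypothetical helper for illustration; the real worker may differ.
    """
    module = importlib.import_module(module_name)
    try:
        cls = getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"{module_name!r} has no attribute {class_name!r}"
        ) from exc
    if not isinstance(cls, type):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return cls


# A standard-library class stands in for a real capability here.
cls = load_capability_class("collections", "OrderedDict")
instance = cls()
print(type(instance).__name__)  # prints: OrderedDict
```

Failing early with a clear error when the module or class cannot be resolved keeps a misconfigured worker from starting an HTTP server it cannot serve.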

EnhancedJSONEncoder

Custom JSON encoder that handles dataclasses and other common Python types that need serialization for HTTP responses.


EnhancedJSONEncoder


def EnhancedJSONEncoder(
    skipkeys:bool=False, ensure_ascii:bool=True, check_circular:bool=True, allow_nan:bool=True, sort_keys:bool=False,
    indent:NoneType=None, separators:NoneType=None, default:NoneType=None
):

JSON encoder that handles dataclasses and other common types.

SG-52: datetime support added so JobError.occurred_at serializes cleanly when emitted via the typed _job_error terminal chunk in /execute_stream.

# Test EnhancedJSONEncoder
import json
from dataclasses import dataclass

@dataclass
class SampleConfig:
    name: str
    value: int

# A dataclass instance is not JSON-serializable by default; the encoder converts it.
cfg = SampleConfig(name="test", value=42)
result = json.dumps(cfg, cls=EnhancedJSONEncoder)
print(f"Encoded: {result}")
assert json.loads(result) == {"name": "test", "value": 42}

Encoded: {"name": "test", "value": 42}
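
For readers who want to see how an encoder of this kind can work, here is a minimal sketch covering only the two cases the text names (dataclasses and datetime). The class name SketchJSONEncoder is hypothetical; the real EnhancedJSONEncoder may handle more types or handle these differently.

```python
import dataclasses
import json
from dataclasses import dataclass
from datetime import datetime, timezone


class SketchJSONEncoder(json.JSONEncoder):
    """Illustrative encoder: dataclass instances become dicts, datetimes ISO 8601."""

    def default(self, o):
        # is_dataclass() is also True for dataclass *types*, so exclude those.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        if isinstance(o, datetime):
            return o.isoformat()
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


@dataclass
class SampleEvent:
    name: str
    occurred_at: datetime


event = SampleEvent("done", datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
encoded = json.dumps(event, cls=SketchJSONEncoder)
print(encoded)
# prints: {"name": "done", "occurred_at": "2024-01-02T03:04:05+00:00"}
```

The datetime nested inside the dataclass is handled because json calls default() again for any non-serializable value found in the dict that asdict() returns.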

PID Watchdog

The “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes consuming resources.


parent_monitor


def parent_monitor(
    ppid:int, # Parent process ID to monitor
)->None:

Monitor parent process and terminate self if parent dies.

This implements the “Suicide Pact” pattern: if the Host process dies, the Worker must terminate itself to prevent zombie processes.

The watchdog runs in a daemon thread and checks once per second whether the parent process is still alive. When the parent dies (or becomes a zombie), the worker terminates itself using terminate_self(), which handles cross-platform differences (SIGTERM on Unix, os._exit() on Windows).
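
The polling loop described above can be sketched as follows. This is an illustration under stated assumptions, not the worker's implementation: the names parent_alive and start_watchdog are hypothetical, the liveness probe uses os.kill(pid, 0) and is therefore POSIX-only (on Windows, os.kill with an arbitrary signal value terminates the target), and it does not detect a parent that has become a zombie. The termination action is passed in as a callback where the real worker calls terminate_self().

```python
import os
import threading
import time
from typing import Callable


def parent_alive(ppid: int) -> bool:
    """Return True if a process with this PID exists (POSIX only)."""
    try:
        os.kill(ppid, 0)  # signal 0 only performs error checking
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def start_watchdog(
    ppid: int,
    on_parent_death: Callable[[], None],
    interval: float = 1.0,
    is_alive: Callable[[int], bool] = parent_alive,
) -> threading.Thread:
    """Poll `ppid` from a daemon thread; call `on_parent_death` once it is gone."""

    def _loop() -> None:
        while is_alive(ppid):
            time.sleep(interval)
        on_parent_death()

    thread = threading.Thread(target=_loop, daemon=True, name="pid-watchdog")
    thread.start()
    return thread


# Demonstration with an injected probe that reports the parent as already dead.
fired = threading.Event()
start_watchdog(1, fired.set, interval=0.01, is_alive=lambda pid: False)
print(fired.wait(timeout=2.0))  # prints: True
```

Running the loop in a daemon thread means the watchdog never keeps the worker alive on its own: if the main thread exits, the watchdog goes with it.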

Application Factory

Creates the FastAPI application with all endpoints for capability communication.

Factory reshape (NB-2, stage 2)

create_app was previously a single 439-line closure, which exceeded the 8K-char cell-split hard trigger. It is now a thin assembler over module-level helpers grouped by concern (the NB-2 “internal-helpers” option): capability loading, lifespan, and five endpoint-registration groups. Endpoint bodies are carried over verbatim from the original closure (each _register_* re-creates the same closure over capability_instance). The only behavioral change is the typed wire envelope (wire_encode) at the two result-serialization sites in the task group.


create_app


def create_app(
    module_name:str, # Python module path (e.g., "my_capability.capability")
    class_name:str, # Capability class name (e.g., "WhisperCapability")
    adapter_specs:NoneType=None, # CR-17 pt 2: "module:ClassName" adapter impl specs to bind in-worker
)->FastAPI: # Configured FastAPI application

Create FastAPI app that hosts the specified capability.

NB-2 reshape (stage 2): a thin assembler that loads the capability, builds the lifespan, and registers the endpoint groups. Endpoint behavior lives in the module-level _register_* helpers above.

Endpoint Summary

| Endpoint | Method | Purpose |
|---|---|---|
| /health | GET | Health check with PID and capability identity |
| /stats | GET | Process telemetry (CPU, memory) for scheduler |
| /initialize | POST | Configure/reconfigure capability |
| /prefetch | POST | CR-4: eager resource acquisition (SG-19 hook) |
| /reconfigure | POST | CR-4: hot-reload via reconfigure(old, new); declarative RELOAD_TRIGGER dispatch |
| /config_schema | GET | JSON Schema for UI generation |
| /config | GET | Current configuration values |
| /execute | POST | Execute capability, return JSON. CR-4: CapabilityCancelledError → 409 (operator cancellation) vs 500 (real failure) |
| /execute_stream | POST | Execute with streaming NDJSON response. SG-51: resets _cancel_requested at start of iteration. SG-52: errors emit as terminal {"_job_error": <JobError dict>} chunk with CR-5 typed classification (replaces undocumented {"error": str(e)} shape). |
| /cancel | POST | Request cancellation of running execution. CR-4: default ToolCapability.cancel() sets _cancel_requested flag + fires registered callbacks |
| /progress | GET | Get current execution progress and status |
| /on_disable | POST | CR-2: signal capability to release heavy resources (worker stays alive) |
| /on_enable | POST | CR-2: signal capability to re-acquire resources |
| /get_system_status | POST | CR-3: typed MonitorToolProtocol.get_system_status(); returns SystemStats.to_dict(); 404 if not a monitor, 501 if monitor raises NotImplementedError |
| /list_processes | POST | CR-3: typed MonitorToolProtocol.list_processes(); returns list of ProcessStats.to_dict(); same 404/501 taxonomy |
| /cleanup | POST | Release capability resources |
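
The status-code rules in the summary (409 versus 500 for /execute, 404 versus 501 for the monitor endpoints) can be written down as two small decision functions. This is a sketch for illustration only: both function names are hypothetical, CapabilityCancelledError is redefined locally as a stand-in for the real exception type, and the 500 returned for other monitor failures is an assumption the summary does not state.

```python
from typing import Optional


class CapabilityCancelledError(Exception):
    """Local stand-in for the real exception type (illustration only)."""


def status_for_execute_error(exc: BaseException) -> int:
    """Map an exception raised during /execute to an HTTP status code."""
    if isinstance(exc, CapabilityCancelledError):
        return 409  # operator cancellation
    return 500  # real failure


def status_for_monitor_call(
    is_monitor: bool, exc: Optional[BaseException] = None
) -> int:
    """Map a monitor-endpoint outcome to an HTTP status code."""
    if not is_monitor:
        return 404  # capability is not a monitor
    if isinstance(exc, NotImplementedError):
        return 501  # monitor does not implement this call
    if exc is not None:
        return 500  # assumption: other failures are treated as server errors
    return 200


print(status_for_execute_error(CapabilityCancelledError("stopped")))  # prints: 409
print(status_for_monitor_call(True, NotImplementedError()))  # prints: 501
```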

/execute_stream wire-format contract (SG-52)

Each NDJSON line is one of:

  • Capability output chunk: a JSON object yielded by the capability’s execute_stream (or wrapped single result from execute). Shape is capability-defined.
  • Terminal error chunk: {"_job_error": {<JobError dict>}}. Emitted at most once and only as the last chunk. The _job_error sentinel key never appears in capability output chunks. The nested dict carries CR-5’s full JobError structure: category, message, retriable, original_exc_repr, traceback, retry_after_seconds, fields_invalid, resource_shortfall, capability_name, capability_instance_id, occurred_at (ISO 8601 string).

CapabilityCancelledError is identifiable in the error chunk by category == "transient" combined with an original_exc_repr that starts with "CapabilityCancelledError". The proxy uses this to raise the typed exception client-side rather than yielding the error chunk to downstream consumers.
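
A consumer of this contract might look like the sketch below. It is an illustration based only on the rules stated above, not the proxy's actual code: iter_stream_chunks and RemoteJobError are hypothetical names, and CapabilityCancelledError is redefined locally as a stand-in for the real exception type.

```python
import json
from typing import Any, Dict, Iterable, Iterator


class CapabilityCancelledError(Exception):
    """Local stand-in for the real typed exception (illustration only)."""


class RemoteJobError(Exception):
    """Hypothetical wrapper that carries the decoded JobError dict."""

    def __init__(self, job_error: Dict[str, Any]):
        super().__init__(job_error.get("message", ""))
        self.job_error = job_error


def iter_stream_chunks(lines: Iterable[str]) -> Iterator[Any]:
    """Yield capability chunks; raise when the terminal error chunk arrives."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between NDJSON records
        chunk = json.loads(line)
        if isinstance(chunk, dict) and "_job_error" in chunk:
            err = chunk["_job_error"]
            cancelled = err.get("category") == "transient" and str(
                err.get("original_exc_repr", "")
            ).startswith("CapabilityCancelledError")
            if cancelled:
                raise CapabilityCancelledError(err.get("message", ""))
            raise RemoteJobError(err)
        yield chunk


# Two output chunks followed by a terminal error chunk.
stream = [
    '{"text": "hello"}',
    '{"text": "world"}',
    '{"_job_error": {"category": "transient", "message": "cancelled", '
    '"original_exc_repr": "CapabilityCancelledError(\'cancelled\')"}}',
]
received = []
try:
    for chunk in iter_stream_chunks(stream):
        received.append(chunk)
except CapabilityCancelledError as exc:
    print(f"cancelled after {len(received)} chunks: {exc}")
# prints: cancelled after 2 chunks: cancelled
```

Because the sentinel key never appears in capability output, checking for "_job_error" is enough to tell the terminal chunk apart from ordinary output.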

CLI Entry Point

The worker is launched as a subprocess, with the capability module, class, and port specified via command-line arguments.


run_worker


def run_worker()->None:

CLI entry point for running the worker.

Usage

The worker is typically launched by the RemoteCapabilityProxy:

# Example: Launch a Whisper capability worker
python -m cjm_substrate.core.worker \
    --module cjm_transcription_capability_whisper.capability \
    --class WhisperLocalCapability \
    --port 12345 \
    --ppid 1234

The --ppid argument enables the suicide pact: if process 1234 dies, this worker terminates.
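
The four flags in the example can be parsed with argparse as sketched below. Only the flag names come from the usage above; the helper name build_parser, the help strings, and the choice to make --ppid optional are assumptions, not taken from run_worker itself.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for the flags shown in the usage example (sketch)."""
    parser = argparse.ArgumentParser(description="Universal Worker (sketch)")
    parser.add_argument("--module", required=True, help="Python module path")
    # `class` is a reserved word, so the value is stored under `class_name`.
    parser.add_argument(
        "--class", dest="class_name", required=True, help="Capability class name"
    )
    parser.add_argument("--port", type=int, required=True, help="Port to listen on")
    # Assumption: the watchdog is optional and is skipped when --ppid is absent.
    parser.add_argument("--ppid", type=int, default=None, help="Parent PID to monitor")
    return parser


args = build_parser().parse_args(
    [
        "--module", "cjm_transcription_capability_whisper.capability",
        "--class", "WhisperLocalCapability",
        "--port", "12345",
        "--ppid", "1234",
    ]
)
print(args.class_name, args.port, args.ppid)
# prints: WhisperLocalCapability 12345 1234
```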