# Universal Worker


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

The Universal Worker is a FastAPI server that:

1.  **Dynamically loads** a capability class specified via CLI arguments
2.  **Exposes HTTP endpoints** for capability lifecycle and execution
3.  **Monitors the parent process** (“Suicide Pact”) so that workers
    exit with the host instead of lingering as orphans
4.  **Reports telemetry** for resource scheduling decisions

<!-- -->

    Host Process                    Worker Process (Isolated Env)
    ┌──────────────┐               ┌─────────────────────────────┐
    │ RemoteProxy  │──HTTP/JSON───▶│ Universal Worker (FastAPI)  │
    │              │               │   ├─ /health                │
    │  --ppid ─────│───────────────│──▶│ ├─ /initialize          │
    │              │               │   ├─ /execute               │
    └──────────────┘               │   ├─ /execute_stream        │
                                   │   └─ PID Watchdog (Thread)  │
                                   └─────────────────────────────┘

## EnhancedJSONEncoder

Custom JSON encoder that handles dataclasses and other common Python
types that need serialization for HTTP responses.

------------------------------------------------------------------------

### EnhancedJSONEncoder

``` python

def EnhancedJSONEncoder(
    skipkeys:bool=False, ensure_ascii:bool=True, check_circular:bool=True, allow_nan:bool=True, sort_keys:bool=False,
    indent:NoneType=None, separators:NoneType=None, default:NoneType=None
):

```

*JSON encoder that handles dataclasses and other common types.*

SG-52: datetime support was added so that `JobError.occurred_at`
serializes cleanly when emitted via the typed `_job_error` terminal
chunk in `/execute_stream`.

``` python
# Test EnhancedJSONEncoder (assumes EnhancedJSONEncoder is already in scope)
import json
from dataclasses import dataclass

@dataclass
class SampleConfig:
    name: str
    value: int

cfg = SampleConfig(name="test", value=42)
result = json.dumps(cfg, cls=EnhancedJSONEncoder)
print(f"Encoded: {result}")
assert json.loads(result) == {"name": "test", "value": 42}
```

    Encoded: {"name": "test", "value": 42}
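
To illustrate how an encoder of this kind can cover both dataclasses and datetimes, here is a minimal sketch. It is not the actual `EnhancedJSONEncoder` source: `SketchJSONEncoder` and `SampleError` are hypothetical names, and the real class may handle more types or use different output formats.

```python
import dataclasses
import json
from dataclasses import dataclass
from datetime import datetime, timezone


class SketchJSONEncoder(json.JSONEncoder):
    """Hypothetical stand-in; not the real EnhancedJSONEncoder."""

    def default(self, o):
        # Dataclass instances (not dataclass types) become plain dicts.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        # Datetimes become ISO 8601 strings.
        if isinstance(o, datetime):
            return o.isoformat()
        # Everything else falls through to the base class, which raises TypeError.
        return super().default(o)


@dataclass
class SampleError:  # hypothetical record carrying a timestamp
    message: str
    occurred_at: datetime


err = SampleError("boom", datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
encoded = json.dumps(err, cls=SketchJSONEncoder)
print(encoded)
```

Because `json` re-encodes whatever `default` returns, the nested `datetime` inside the dataclass dict is routed through `default` a second time and comes out as a string.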

## PID Watchdog

The “Suicide Pact” pattern: if the Host process dies, the Worker must
terminate itself so that it does not linger as an orphaned process
consuming resources.

------------------------------------------------------------------------

### parent_monitor

``` python

def parent_monitor(
    ppid:int, # Parent process ID to monitor
)->None:

```

*Monitor parent process and terminate self if parent dies.*

The watchdog runs in a daemon thread and checks once per second whether
the parent process is still alive. When the parent dies (or becomes a
zombie), the worker terminates itself using `terminate_self()`, which
handles cross-platform differences (SIGTERM on Unix, `os._exit()` on
Windows).
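
The polling loop described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the `parent_monitor` source: `pid_alive` and `start_watchdog` are hypothetical names, the liveness probe is Unix-only, and the termination step is an injected callback rather than the real `terminate_self()`.

```python
import os
import threading
import time
from typing import Callable


def pid_alive(pid: int) -> bool:
    """Unix-only probe: signal 0 checks existence without delivering a signal."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def start_watchdog(
    ppid: int,
    on_parent_death: Callable[[], None],
    interval: float = 1.0,
    is_alive: Callable[[int], bool] = pid_alive,
) -> threading.Thread:
    """Poll `ppid` from a daemon thread and call `on_parent_death` once it is gone."""

    def _loop() -> None:
        while is_alive(ppid):
            time.sleep(interval)
        on_parent_death()

    thread = threading.Thread(target=_loop, daemon=True, name="pid-watchdog")
    thread.start()
    return thread


# Demo with a fake probe that reports the parent as already gone.
parent_gone = threading.Event()
start_watchdog(os.getppid(), parent_gone.set, interval=0.01, is_alive=lambda pid: False)
parent_gone.wait(timeout=2.0)
print("callback fired:", parent_gone.is_set())
```

Signal 0 still succeeds for a zombie parent, so this sketch does not cover the zombie case mentioned above; detecting it would need a process-status lookup, for example through a library such as `psutil`.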

## Application Factory

Creates the FastAPI application with all endpoints for capability
communication.

### Factory reshape (NB-2, stage 2)

`create_app` was previously a single 439-line closure, which exceeded
the 8K-char cell-split hard trigger. It is now a thin assembler over
module-level helpers grouped by concern (the NB-2 “internal-helpers”
option): capability loading, lifespan, and five endpoint-registration
groups. Endpoint bodies are carried over verbatim from the closure, and
each `_register_*` helper re-creates the same closure over
`capability_instance`. The only behavioral change is the typed wire
envelope (`wire_encode`) at the two result-serialization sites in the
task group.

------------------------------------------------------------------------

### create_app

``` python

def create_app(
    module_name:str, # Python module path (e.g., "my_capability.capability")
    class_name:str, # Capability class name (e.g., "WhisperCapability")
    adapter_specs:NoneType=None, # CR-17 pt 2: "module:ClassName" adapter impl specs to bind in-worker
)->FastAPI: # Configured FastAPI application

```

*Create FastAPI app that hosts the specified capability.*

NB-2 reshape (stage 2): `create_app` is a thin assembler that loads the
capability, builds the lifespan, and registers the endpoint groups.
Endpoint behavior lives in the module-level `_register_*` helpers.
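
The capability-loading step takes a module path and a class name, and `adapter_specs` uses a `"module:ClassName"` form. A minimal sketch of how such names can be resolved with `importlib` follows. `load_class` and `split_spec` are hypothetical helpers, not functions from this module, and the real loader may validate more (for example, that the class implements the capability interface).

```python
import importlib
from typing import Tuple


def load_class(module_name: str, class_name: str) -> type:
    """Import `module_name` and return the class named `class_name` from it."""
    module = importlib.import_module(module_name)
    try:
        cls = getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(f"{module_name!r} has no attribute {class_name!r}") from exc
    if not isinstance(cls, type):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return cls


def split_spec(spec: str) -> Tuple[str, str]:
    """Split a 'module:ClassName' spec into its two parts."""
    module_name, sep, class_name = spec.partition(":")
    if not sep or not module_name or not class_name:
        raise ValueError(f"expected 'module:ClassName', got {spec!r}")
    return module_name, class_name


# A standard-library class stands in for a capability so the sketch runs anywhere.
decoder_cls = load_class(*split_spec("json:JSONDecoder"))
print(decoder_cls.__name__)
```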

### Endpoint Summary

<table>
<colgroup>
<col style="width: 15%" />
<col style="width: 12%" />
<col style="width: 71%" />
</colgroup>
<thead>
<tr>
<th>Endpoint</th>
<th>Method</th>
<th>Purpose</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>/health</code></td>
<td>GET</td>
<td>Health check with PID and capability identity</td>
</tr>
<tr>
<td><code>/stats</code></td>
<td>GET</td>
<td>Process telemetry (CPU, memory) for scheduler</td>
</tr>
<tr>
<td><code>/initialize</code></td>
<td>POST</td>
<td>Configure/reconfigure capability</td>
</tr>
<tr>
<td><code>/prefetch</code></td>
<td>POST</td>
<td>CR-4: eager resource acquisition (SG-19 hook)</td>
</tr>
<tr>
<td><code>/reconfigure</code></td>
<td>POST</td>
<td>CR-4: hot-reload via reconfigure(old, new); declarative
<code>RELOAD_TRIGGER</code> dispatch</td>
</tr>
<tr>
<td><code>/config_schema</code></td>
<td>GET</td>
<td>JSON Schema for UI generation</td>
</tr>
<tr>
<td><code>/config</code></td>
<td>GET</td>
<td>Current configuration values</td>
</tr>
<tr>
<td><code>/execute</code></td>
<td>POST</td>
<td>Execute capability, return JSON. CR-4:
<code>CapabilityCancelledError</code> → 409 (operator cancellation) vs
500 (real failure)</td>
</tr>
<tr>
<td><code>/execute_stream</code></td>
<td>POST</td>
<td>Execute with streaming NDJSON response. SG-51: resets
<code>_cancel_requested</code> at start of iteration. SG-52: errors emit
as terminal <code>{"_job_error": &lt;JobError dict&gt;}</code> chunk
with CR-5 typed classification (replaces undocumented
<code>{"error": str(e)}</code> shape).</td>
</tr>
<tr>
<td><code>/cancel</code></td>
<td>POST</td>
<td>Request cancellation of running execution. CR-4: default
<code>ToolCapability.cancel()</code> sets <code>_cancel_requested</code>
flag + fires registered callbacks</td>
</tr>
<tr>
<td><code>/progress</code></td>
<td>GET</td>
<td>Get current execution progress and status</td>
</tr>
<tr>
<td><code>/on_disable</code></td>
<td>POST</td>
<td>CR-2: signal capability to release heavy resources (worker stays
alive)</td>
</tr>
<tr>
<td><code>/on_enable</code></td>
<td>POST</td>
<td>CR-2: signal capability to re-acquire resources</td>
</tr>
<tr>
<td><code>/get_system_status</code></td>
<td>POST</td>
<td>CR-3: typed <code>MonitorToolProtocol.get_system_status()</code> —
returns <code>SystemStats.to_dict()</code>; 404 if not a monitor, 501 if
monitor raises <code>NotImplementedError</code></td>
</tr>
<tr>
<td><code>/list_processes</code></td>
<td>POST</td>
<td>CR-3: typed <code>MonitorToolProtocol.list_processes()</code> —
returns list of <code>ProcessStats.to_dict()</code>; same 404/501
taxonomy</td>
</tr>
<tr>
<td><code>/cleanup</code></td>
<td>POST</td>
<td>Release capability resources</td>
</tr>
</tbody>
</table>

#### `/execute_stream` wire-format contract (SG-52)

Each NDJSON line is one of:

- **Capability output chunk**: a JSON object yielded by the capability’s
  `execute_stream` (or wrapped single result from `execute`). Shape is
  capability-defined.
- **Terminal error chunk**: `{"_job_error": {<JobError dict>}}`. Emitted
  at most once and only as the last chunk. The `_job_error` sentinel key
  never appears in capability output chunks. The nested dict carries
  CR-5’s full `JobError` structure: `category`, `message`, `retriable`,
  `original_exc_repr`, `traceback`, `retry_after_seconds`,
  `fields_invalid`, `resource_shortfall`, `capability_name`,
  `capability_instance_id`, `occurred_at` (ISO 8601 string).

`CapabilityCancelledError` is identifiable in the error chunk via
`category == "transient"` combined with an `original_exc_repr` that
starts with `"CapabilityCancelledError"`. The proxy uses this to raise
the typed exception client-side instead of yielding the error chunk to
downstream consumers.
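
A consumer of this contract might look like the sketch below. It only illustrates the rules stated above: the exception classes `StreamJobError` and `StreamCancelled` and the function `iter_stream_chunks` are hypothetical, and the actual proxy raises its own typed exceptions.

```python
import json
from typing import Any, Dict, Iterable, Iterator


class StreamJobError(Exception):
    """Hypothetical client-side error wrapping the `_job_error` dict."""

    def __init__(self, job_error: Dict[str, Any]):
        super().__init__(job_error.get("message", ""))
        self.job_error = job_error


class StreamCancelled(StreamJobError):
    """Hypothetical client-side counterpart of CapabilityCancelledError."""


def iter_stream_chunks(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield capability output chunks; raise when the terminal error chunk arrives."""
    for line in lines:
        if not line.strip():
            continue  # tolerate blank keep-alive lines
        chunk = json.loads(line)
        if isinstance(chunk, dict) and "_job_error" in chunk:
            err = chunk["_job_error"]
            exc_repr = str(err.get("original_exc_repr") or "")
            cancelled = (
                err.get("category") == "transient"
                and exc_repr.startswith("CapabilityCancelledError")
            )
            if cancelled:
                raise StreamCancelled(err)
            raise StreamJobError(err)
        yield chunk


# Sample stream: one output chunk followed by a cancellation error chunk.
sample = [
    json.dumps({"text": "hello"}),
    json.dumps({"_job_error": {
        "category": "transient",
        "message": "cancelled by operator",
        "original_exc_repr": "CapabilityCancelledError('cancelled by operator')",
    }}),
]
received = []
outcome = "completed"
try:
    for chunk in iter_stream_chunks(sample):
        received.append(chunk)
except StreamCancelled as exc:
    outcome = f"cancelled: {exc}"
print(received, outcome)
```

Because the error chunk is terminal, raising immediately loses no output: every capability chunk has already been yielded by the time the sentinel key is seen.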

## CLI Entry Point

The worker is launched as a subprocess, with the capability module,
class, and port specified via command-line arguments.

------------------------------------------------------------------------

### run_worker

``` python

def run_worker()->None:

```

*CLI entry point for running the worker.*
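
For orientation, the four flags shown in the usage example can be parsed with `argparse` roughly as follows. This is a sketch, not the body of `run_worker`: `build_parser` is a hypothetical helper, and treating `--ppid` as optional is an assumption.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser covering the flags shown in the usage example."""
    parser = argparse.ArgumentParser(prog="worker")
    parser.add_argument("--module", required=True, help="Python module path of the capability")
    # `class` is a Python keyword, so the value is stored under `class_name`.
    parser.add_argument("--class", dest="class_name", required=True, help="Capability class name")
    parser.add_argument("--port", type=int, required=True, help="Port for the HTTP server")
    # Assumption: the watchdog is skipped when no parent PID is given.
    parser.add_argument("--ppid", type=int, default=None, help="Parent PID to monitor")
    return parser


args = build_parser().parse_args([
    "--module", "cjm_transcription_capability_whisper.capability",
    "--class", "WhisperLocalCapability",
    "--port", "12345",
    "--ppid", "1234",
])
print(args.module, args.class_name, args.port, args.ppid)
```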

### Usage

The worker is typically launched by the `RemoteCapabilityProxy`:

``` bash
# Example: Launch a Whisper capability worker
python -m cjm_substrate.core.worker \
    --module cjm_transcription_capability_whisper.capability \
    --class WhisperLocalCapability \
    --port 12345 \
    --ppid 1234
```

The `--ppid` argument enables the suicide pact: if process 1234 dies,
this worker terminates.
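
From the host side, the same command line can be assembled in Python, with the host's own PID passed as `--ppid`. The sketch below only builds the argument list. `build_worker_command` is a hypothetical helper and does not reflect how `RemoteCapabilityProxy` is implemented.

```python
import os
import sys
from typing import List


def build_worker_command(module: str, class_name: str, port: int, ppid: int) -> List[str]:
    """Assemble the argv for launching a worker with the flags shown above."""
    return [
        sys.executable, "-m", "cjm_substrate.core.worker",
        "--module", module,
        "--class", class_name,
        "--port", str(port),
        "--ppid", str(ppid),
    ]


cmd = build_worker_command(
    module="cjm_transcription_capability_whisper.capability",
    class_name="WhisperLocalCapability",
    port=12345,
    ppid=os.getpid(),  # the launching process becomes the monitored parent
)
print(" ".join(cmd))
# subprocess.Popen(cmd) would start the worker; it is left out here so the
# sketch runs without the packages installed.
```

Using `sys.executable` runs the worker under the same interpreter as the host. A host that isolates workers in separate environments would substitute that environment's interpreter path.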
