# Typed Wire Layer


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## FileBackedDTO Protocol

The `FileBackedDTO` protocol defines objects that can serialize
themselves to disk, so that large payloads pass between Host and Worker
processes as a file path rather than as request-body bytes (the
"zero-copy" transfer). When the Proxy detects an argument implementing
this protocol, it calls `to_temp_file()` and sends the returned file
path instead of the data.
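The proxy-side substitution described above can be sketched as follows. This is a minimal sketch under assumptions: the protocol is taken to be a `typing.Protocol` marked `@runtime_checkable` (the `isinstance` checks in this section imply a runtime-checkable protocol) with a single `to_temp_file() -> str` member. `FileBackedDTOSketch`, `Blob`, and `substitute_file_backed` are hypothetical names; the real proxy logic lives in `_prepare_payload` and may differ.

```python
import os
import tempfile
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTOSketch(Protocol):
    """Assumed shape: anything with a to_temp_file() -> str method."""

    def to_temp_file(self) -> str: ...


class Blob:
    """Toy DTO that spills its bytes to a temp file."""

    def __init__(self, data: bytes):
        self.data = data

    def to_temp_file(self) -> str:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
            f.write(self.data)
            return f.name


def substitute_file_backed(kwargs: dict) -> dict:
    """Hypothetical helper: replace file-backed values with their file paths."""
    out = {}
    for name, value in kwargs.items():
        if isinstance(value, FileBackedDTOSketch):
            out[name] = value.to_temp_file()  # only the path crosses the wire
        else:
            out[name] = value  # everything else is sent as-is
    return out


payload = substitute_file_backed({"audio": Blob(b"\x00\x01"), "lang": "en"})
print(payload["lang"])                     # en
print(os.path.getsize(payload["audio"]))   # 2
os.remove(payload["audio"])  # delete=False leaves cleanup to us
```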

------------------------------------------------------------------------

### FileBackedDTO

``` python

@runtime_checkable
class FileBackedDTO(Protocol):
    def to_temp_file(self) -> str: ...

```

*Protocol for Data Transfer Objects that serialize to disk for zero-copy
transfer.*

``` python
# Test FileBackedDTO protocol detection
import os
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""

    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        """Save to a temp file and return its path (the caller must delete it)."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
            return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
path = audio.to_temp_file()
print(f"Temp file path: {path}")
os.remove(path)  # delete=False leaves cleanup to us

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
```

    MockAudioData implements FileBackedDTO: True
    Temp file path: /tmp/tmpchnx7qag.wav
    str implements FileBackedDTO: False

## Typed result envelope

The typed result envelope is how task results keep their type across the
worker HTTP/JSON boundary. Without it, a worker serializes `execute()`
results via `EnhancedJSONEncoder` (`dataclasses.asdict`), which destroys
the type information at the boundary: every consumer re-extracts fields
from anonymous dicts, and each workflow core pays the dict-or-object
tolerance tax again (`field_of`; evidence E5/D10/C7 plus 10 inline
e2e-script copies).
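The loss is easy to reproduce with the standard library alone: `dataclasses.asdict` plus a JSON round trip hands the consumer a plain dict, and the class identity is gone. `Transcript` is a made-up DTO and `get_field` is an illustrative stand-in for the `field_of` tolerance pattern, not the library’s helper.

```python
import dataclasses
import json


@dataclasses.dataclass
class Transcript:  # illustrative DTO, not part of the library
    text: str
    confidence: float


sent = Transcript(text="hello", confidence=0.9)

# Worker side: what an asdict-based encoder effectively does.
body = json.dumps(dataclasses.asdict(sent))

# Host side: the consumer only ever sees an anonymous dict.
received = json.loads(body)
print(type(received).__name__)            # dict
print(isinstance(received, Transcript))   # False


def get_field(obj, name):
    """Illustrative dict-or-object accessor (the tolerance tax)."""
    return obj[name] if isinstance(obj, dict) else getattr(obj, name)


print(get_field(received, "text"), get_field(sent, "text"))  # hello hello
```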

Design (serialization precedent: the `cjm-context-graph-primitives`
tagged-dict law, stage-1 ledger P5/P6):

- **Dict-primary, self-describing**:
  `{"__wire__": "<kind>", "data": {...}}`.
- **Encode is exact-type**: only DTO classes registered via `@wire_type`
  get the envelope; everything else (plain dicts from dispatcher
  capabilities, primitives, unregistered dataclasses) passes through to
  the `EnhancedJSONEncoder` path exactly as before — adoption is
  per-DTO, not flag-day.
- **Decode is strict on known kinds, tolerant on unknown kinds**: a
  registered kind reconstructs through the DTO’s `from_dict` (a missing
  required field raises loudly); an unknown kind passes through
  unchanged with the envelope intact, so a host without the result’s
  interface library degrades to today’s dict behavior instead of
  failing.
- **One deliberate divergence from the P6 law**: results are a transport
  TERMINUS (consumed, not re-persisted), so extra keys on a known kind
  are dropped with a debug log rather than raising — version skew
  between a worker env (snapshot installs) and a host env must not
  hard-fail a run. Surface-drift visibility is the manifest
  live-companion’s job, not the result path’s.

Direction note: this is the RESULT direction (worker → host). Typed task
ARGUMENTS (host → worker) are left to the CR-17 adapter routing work
(stage 4); until then, `_prepare_payload` / `FileBackedDTO` cover the
input direction.

------------------------------------------------------------------------

### flat_from_dict

``` python

def flat_from_dict(
    cls, # The wire DTO dataclass being reconstructed
    d:dict, # The envelope's "data" payload
):

```

*Default reconstruction for FLAT wire DTOs (no nested-DTO fields).*

Filters the payload to the dataclass’s declared fields (unknown extras
are dropped with a debug log — transport-terminus tolerance, see the
envelope design note) and lets the constructor enforce required fields
(a missing required field raises TypeError loudly). DTOs with nested DTO
fields (e.g. a result carrying a list of typed items) must define their
own `from_dict` classmethod; `@wire_type` only attaches this default
when the class has none.

------------------------------------------------------------------------

### wire_type

``` python

def wire_type(
    kind:str, # Stable wire discriminator, e.g. "transcription.result"
)->Callable: # Class decorator

```

*Register a dataclass as a typed wire DTO under `kind`.*

- The class must be a dataclass (encode falls back to
  `dataclasses.asdict` when it defines no `to_dict`).
- If the class defines no `from_dict`, the flat default
  (`flat_from_dict`) is attached; nested DTOs define their own.
- Re-registering the same LOGICAL class (qualname match; the module is
  ignored because nbdev’s literate workflow defines each class twice —
  in-notebook `__main__` + the exported module) replaces the decode
  entry; a DIFFERENT class claiming an already-registered kind raises
  ValueError.

------------------------------------------------------------------------

### wire_encode

``` python

def wire_encode(
    obj:Any, # A task result (any shape)
)->Any: # Tagged envelope dict for registered DTOs; `obj` unchanged otherwise

```

*Wrap a registered wire DTO in its tagged envelope (worker side).*

Exact-type lookup: subclasses are NOT encoded under the parent’s kind
(they pass through unregistered, preserving today’s behavior). Payload
preference: the DTO’s own `to_dict()` when defined, else
`dataclasses.asdict` (recursive — nested dataclasses flatten).
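Both encode rules (exact-type lookup, `to_dict()` preferred over `dataclasses.asdict`) fit in a few lines. A sketch under stated assumptions: the envelope keys follow the `{"__wire__": "<kind>", "data": {...}}` shape from the design notes, and `_ENCODE` is a hypothetical stand-in for whatever `@wire_type` populates.

```python
import dataclasses
import json
from typing import Any

_ENCODE: dict = {}  # hypothetical class -> kind registry


def wire_encode_sketch(obj: Any) -> Any:
    kind = _ENCODE.get(type(obj))  # exact type: subclasses are NOT matched
    if kind is None:
        return obj  # unregistered: falls through to the plain JSON path unchanged
    to_dict = getattr(obj, "to_dict", None)
    data = to_dict() if callable(to_dict) else dataclasses.asdict(obj)
    return {"__wire__": kind, "data": data}


@dataclasses.dataclass
class Parent:
    text: str


@dataclasses.dataclass
class Child(Parent):
    pass


@dataclasses.dataclass
class Compact:
    text: str

    def to_dict(self) -> dict:
        return {"t": self.text}  # the DTO's own payload shape wins over asdict


_ENCODE[Parent] = "demo.parent"
_ENCODE[Compact] = "demo.compact"

print(json.dumps(wire_encode_sketch(Parent("hi"))))
# {"__wire__": "demo.parent", "data": {"text": "hi"}}
print(wire_encode_sketch(Compact("x")))
# {'__wire__': 'demo.compact', 'data': {'t': 'x'}}
child = Child("hi")
print(wire_encode_sketch(child) is child)  # True: the subclass passes through
```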

------------------------------------------------------------------------

### wire_decode

``` python

def wire_decode(
    obj:Any, # A JSON-decoded response body (any shape)
)->Any: # The typed DTO for known kinds; `obj` unchanged otherwise

```

*Reconstruct a typed result from its tagged envelope (host side).*

Known kind → the registered class’s `from_dict` (strict: a missing
required field raises). Unknown kind → the dict passes through
UNCHANGED with the envelope intact (tolerant degradation for hosts
without the result’s interface library). Untagged values pass through.
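The three decode branches can be sketched as follows, again assuming the `__wire__` / `data` key names from the design notes and a hypothetical `_DECODE` registry of kind → class. Each registered class is assumed to expose a `from_dict` classmethod, as `@wire_type` guarantees; `Greeting` is illustrative.

```python
import dataclasses
import json
from typing import Any

_DECODE: dict = {}  # hypothetical kind -> class registry


def wire_decode_sketch(obj: Any) -> Any:
    if not (isinstance(obj, dict) and "__wire__" in obj):
        return obj  # untagged: pass through
    cls = _DECODE.get(obj["__wire__"])
    if cls is None:
        return obj  # unknown kind: unchanged, envelope intact
    return cls.from_dict(obj.get("data", {}))  # known kind: strict reconstruction


@dataclasses.dataclass
class Greeting:
    text: str

    @classmethod
    def from_dict(cls, d: dict) -> "Greeting":
        names = {f.name for f in dataclasses.fields(cls)}
        # Extras are dropped; a missing "text" makes the constructor raise TypeError.
        return cls(**{k: v for k, v in d.items() if k in names})


_DECODE["demo.greeting"] = Greeting

wire = json.loads(json.dumps({"__wire__": "demo.greeting", "data": {"text": "hi"}}))
print(wire_decode_sketch(wire))  # Greeting(text='hi')
foreign = {"__wire__": "some.future/kind", "data": {"x": 1}}
print(wire_decode_sketch(foreign) is foreign)  # True
```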

``` python
# Wire-format executable tests (the cross-boundary payload discipline):
# round-trips simulate the real boundary — encode -> json.dumps -> json.loads
# -> decode — never in-memory shortcuts.
import json as _json
from dataclasses import dataclass, field
from typing import List, Optional


@wire_type("test.flat")
@dataclass
class _FlatResult:
    text: str
    confidence: Optional[float] = None
    metadata: dict = field(default_factory=dict)


@dataclass
class _Item:
    text: str
    start_time: float
    end_time: float


@wire_type("test.nested")
@dataclass
class _NestedResult:
    items: List[_Item]
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, d: dict) -> "_NestedResult":
        return cls(items=[_Item(**i) for i in d.get("items", [])],
                   metadata=d.get("metadata", {}) or {})


def _roundtrip(obj):
    return wire_decode(_json.loads(_json.dumps(wire_encode(obj))))


# 1. Flat DTO round-trips typed through the simulated boundary.
flat = _FlatResult(text="hello", confidence=0.9, metadata={"lang": "en"})
back = _roundtrip(flat)
assert isinstance(back, _FlatResult) and back == flat, back

# 2. Nested DTO round-trips through its custom from_dict (items re-typed).
nested = _NestedResult(items=[_Item("a", 0.0, 1.0), _Item("b", 1.0, 2.0)])
nback = _roundtrip(nested)
assert isinstance(nback, _NestedResult) and isinstance(nback.items[0], _Item)
assert nback == nested

# 3. Unregistered objects pass through encode unchanged (plain dicts /
#    untyped capability results keep today's behavior).
plain = {"rows": [[1, 2]], "count": 2}
assert wire_encode(plain) is plain
assert wire_decode(plain) is plain

# 4. Unknown kind passes through decode UNCHANGED, envelope intact
#    (tolerant degradation; lossless if ever re-serialized).
foreign = {WIRE_KIND_KEY: "some.future/kind", WIRE_DATA_KEY: {"x": 1}}
assert wire_decode(foreign) is foreign

# 5. Exact-type encode: a subclass is NOT encoded under the parent's kind.
@dataclass
class _FlatSubclass(_FlatResult):
    pass

sub = _FlatSubclass(text="sub")
assert wire_encode(sub) is sub

# 6. Duplicate-kind guard: a DIFFERENT class claiming a taken kind raises.
#    (Re-registering the same logical class, i.e. same qualname with the module
#    ignored because nbdev defines classes twice, replaces quietly.)
try:
    @wire_type("test.flat")
    @dataclass
    class _Imposter:
        y: int = 0
    raise AssertionError("duplicate kind must raise")
except ValueError:
    pass

# 7. Transport-terminus tolerance: extras dropped (debug-logged), missing
#    required raises loudly.
tolerant = wire_decode({WIRE_KIND_KEY: "test.flat",
                        WIRE_DATA_KEY: {"text": "t", "new_field_from_future": 1}})
assert tolerant == _FlatResult(text="t")
try:
    wire_decode({WIRE_KIND_KEY: "test.flat", WIRE_DATA_KEY: {"confidence": 0.5}})
    raise AssertionError("missing required field must raise")
except TypeError:
    pass

print("typed wire envelope tests OK")
```

    typed wire envelope tests OK

## Per-call envelope (CR-14 / the surviving CR-15 control channel)

The per-call envelope is the substrate-owned per-call identity + control
block. Without it, `job_id` reaches the worker only as a domain kwarg
that polite capability authors echo into log messages, and the
observational pass found that failures (exactly the case diagnosis
needs) carry NO identity at all. The envelope makes call identity a
wire-level concern:

- **Host side**: the queue sets the contextvar around each job
  execution; the proxy injects `"envelope": {...}` as a TOP-LEVEL body
  key on `/execute`, `/execute_stream`, and `/task` (never inside
  `kwargs` — capability code never sees it; old workers ignore unknown
  top-level keys).
- **Worker side**: the endpoint decodes it back into the SAME
  contextvar, so the `DiagnosticsLogHandler` stamps every record with
  exact job identity (`contextvars.copy_context().run(...)` carries it
  into the executor thread — `run_in_executor` does not copy context by
  itself).
- **`control`** carries per-call flags (`force` cache-bypass et al — the
  4th CR-15 kwarg category) so they stop riding the typed task args.

Tolerant both directions: unknown envelope keys are ignored on decode
(forward compat); an absent envelope leaves the contextvar `None`
(records honestly unattributed, never heuristically attributed).
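The worker-side pattern from the second bullet can be sketched with `asyncio` alone. The contextvar name `CALL_ENVELOPE`, the `handle_execute` coroutine, and the plain-dict envelope are hypothetical stand-ins. The point is that `loop.run_in_executor` does not propagate contextvars, so the endpoint wraps the call in `contextvars.copy_context().run`.

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical stand-in for the substrate's envelope contextvar.
CALL_ENVELOPE = contextvars.ContextVar("call_envelope", default=None)


def capability() -> Optional[str]:
    """Runs in an executor thread; reads identity the way a log handler would."""
    env = CALL_ENVELOPE.get()
    return env.get("job_id") if env else None


async def handle_execute(body: dict) -> dict:
    # Top-level "envelope" key, never inside kwargs; absent -> None (unattributed).
    CALL_ENVELOPE.set(body.get("envelope"))
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()  # snapshot AFTER setting the envelope
    with_ctx = await loop.run_in_executor(None, ctx.run, capability)
    without_ctx = await loop.run_in_executor(None, capability)
    return {"with_copy_context": with_ctx, "without": without_ctx}


print(asyncio.run(handle_execute({"envelope": {"job_id": "j-1"}, "kwargs": {}})))
# {'with_copy_context': 'j-1', 'without': None}
```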

------------------------------------------------------------------------

### get_call_envelope

``` python

def get_call_envelope()->Optional:

```

*The current call envelope, or None outside any call span.*

------------------------------------------------------------------------

### reset_call_envelope

``` python

def reset_call_envelope(
    token:Token
)->None:

```

*Restore the prior envelope (always pair with `set_call_envelope` via
`try`/`finally`).*

------------------------------------------------------------------------

### set_call_envelope

``` python

def set_call_envelope(
    env:Optional
)->Token:

```

*Set the current call envelope; returns the token for
`reset_call_envelope`.*
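The three accessors above follow the standard `ContextVar` get/set/reset pattern. A minimal sketch, assuming a module-level `ContextVar` with a `None` default; `_ENVELOPE`, the `*_sketch` functions, and `run_job` are illustrative names, not the library’s.

```python
import contextvars
from contextvars import Token
from typing import Any, Optional

_ENVELOPE = contextvars.ContextVar("call_envelope", default=None)


def get_call_envelope_sketch() -> Optional[Any]:
    return _ENVELOPE.get()  # None outside any call span


def set_call_envelope_sketch(env: Optional[Any]) -> Token:
    return _ENVELOPE.set(env)


def reset_call_envelope_sketch(token: Token) -> None:
    _ENVELOPE.reset(token)  # restores whatever was current before the set


def run_job(env: Any) -> Optional[Any]:
    """Host-side pairing around one job, as a queue might do it."""
    token = set_call_envelope_sketch(env)
    try:
        return get_call_envelope_sketch()  # the job body would run here
    finally:
        reset_call_envelope_sketch(token)  # always restore, even on error


print(run_job({"job_id": "j-1"}))  # {'job_id': 'j-1'}
print(get_call_envelope_sketch())  # None
```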

------------------------------------------------------------------------

### CallEnvelope

``` python

@dataclass
class CallEnvelope:
    job_id: Optional = None
    run_id: Optional = None
    composition_id: Optional = None
    node_id: Optional = None
    actor: Optional = None
    control: dict = field(default_factory=dict)

```

*Substrate-owned per-call identity + control block (CR-14 / CR-15).*

Travels as a top-level `"envelope"` key on every proxy→worker call body.
All fields optional — an envelope-less call (direct proxy use, old
hosts) simply yields unattributed records, never a failure.
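The tests that follow exercise `to_wire()` and `from_wire()`. One way to satisfy them, sketched under assumptions: `None` fields are omitted on encode (as the first test’s expected wire form shows), unknown keys and a `None` input are tolerated on decode, and the identity fields are assumed to be `str`. `CallEnvelopeSketch` is an illustrative name, not the library class.

```python
import dataclasses
import json
from typing import Optional


@dataclasses.dataclass
class CallEnvelopeSketch:
    job_id: Optional[str] = None  # str is an assumption; the rendered signature shows only Optional
    run_id: Optional[str] = None
    composition_id: Optional[str] = None
    node_id: Optional[str] = None
    actor: Optional[str] = None
    control: dict = dataclasses.field(default_factory=dict)

    def to_wire(self) -> dict:
        # Drop None fields so the wire form stays minimal.
        return {k: v for k, v in dataclasses.asdict(self).items() if v is not None}

    @classmethod
    def from_wire(cls, d: Optional[dict]) -> "CallEnvelopeSketch":
        d = d or {}  # absent envelope -> all-None (honestly unattributed)
        names = {f.name for f in dataclasses.fields(cls)}
        known = {k: v for k, v in d.items() if k in names}  # ignore future keys
        known["control"] = dict(known.get("control") or {})
        return cls(**known)


env = CallEnvelopeSketch(job_id="j-1", control={"force": True})
wire = json.loads(json.dumps(env.to_wire()))
print(wire)  # {'job_id': 'j-1', 'control': {'force': True}}
print(CallEnvelopeSketch.from_wire(wire) == env)  # True
print(CallEnvelopeSketch.from_wire({"job_id": "j-2", "tenant_id": "x"}).job_id)  # j-2
```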

``` python
# Envelope wire-format executable tests (same boundary-simulation discipline).
import contextvars as _cv

# 1. Round-trip through the simulated JSON boundary; None fields dropped.
env = CallEnvelope(job_id="j-1", run_id="r-1", control={"force": True})
wire_form = _json.loads(_json.dumps(env.to_wire()))
assert wire_form == {"job_id": "j-1", "run_id": "r-1", "control": {"force": True}}
back = CallEnvelope.from_wire(wire_form)
assert back == env

# 2. Tolerant decode: unknown keys from a future host are ignored.
future = CallEnvelope.from_wire({"job_id": "j-2", "tenant_id": "future-key"})
assert future.job_id == "j-2" and future.control == {}

# 3. Empty/absent envelope decodes to all-None (honestly unattributed).
assert CallEnvelope.from_wire({}) == CallEnvelope()
assert CallEnvelope.from_wire(None) == CallEnvelope()

# 4. Contextvar set/reset pairing + executor-thread propagation via
#    copy_context (the run_in_executor gotcha the worker endpoints handle).
assert get_call_envelope() is None
token = set_call_envelope(env)
try:
    assert get_call_envelope() is env
    seen = {}
    ctx = _cv.copy_context()
    import threading
    t = threading.Thread(target=lambda: seen.update(
        inside=ctx.run(get_call_envelope)))
    t.start(); t.join()
    assert seen["inside"] is env  # copy_context carries it into the thread
    bare = {}
    t2 = threading.Thread(target=lambda: bare.update(inside=get_call_envelope()))
    t2.start(); t2.join()
    assert bare["inside"] is None  # without copy_context the thread sees None
finally:
    reset_call_envelope(token)
assert get_call_envelope() is None

print("call envelope tests OK")
```

    call envelope tests OK

## In-worker accounts (CR-14 follow-up)

This is the worker-side half of the substrate-family account events
(TASK_ACCOUNT / RESULT_SAVED / CACHE_HIT, the `SubstrateEventType`
reserved vocabulary). The derived-not-authored rule holds: workers never
write the journal. They RECORD accounts during a call span, and the
records ride the wire back as a response **header**, where the HOST
journals them with `worker_reported=True`. This is the
single-writer-class rule: a compromised worker still cannot write the
audit record directly, and the host stamps the receiving-side identity.

Why a header and not a body wrapper: the response body is the RESULT’s
contract (typed wire envelope / capability dict), and wrapping it would
break any old host talking to a new worker. A header is invisible to old
proxies in both skew directions (the K6 version-skew posture). The error
path attaches the same header to the `_job_error` 500 response, so a
FAILED call still reports the accounts recorded before the failure (e.g.
a save that succeeded before a later crash).
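A sketch of that header round trip, using only the standard library. The header name `X-Worker-Accounts`, the helper names, and the journal row shape are hypothetical; the text above only fixes that accounts travel as a response header (including on the 500 error path) and that the host journals them with `worker_reported=True`. `json.dumps` escapes non-ASCII by default (`ensure_ascii=True`), which keeps the value safe to send as a header.

```python
import json

ACCOUNTS_HEADER = "X-Worker-Accounts"  # hypothetical header name


def attach_accounts(headers: dict, accounts: list) -> dict:
    """Worker side: add the drained accounts without touching the result body."""
    if accounts:
        headers[ACCOUNTS_HEADER] = json.dumps(accounts)  # ensure_ascii=True by default
    return headers


def journal_from_response(headers: dict) -> list:
    """Host side: turn the header into journal rows; absent header -> no rows."""
    # A plain dict stands in for a real (case-insensitive) header mapping.
    raw = headers.get(ACCOUNTS_HEADER)
    if not raw:
        return []  # old worker, or nothing recorded
    return [dict(acct, worker_reported=True) for acct in json.loads(raw)]


# A failed call still carries what was recorded before the crash.
error_headers = attach_accounts(
    {"content-type": "application/json"},
    [{"event_type": "result_saved", "payload": {"text_hash": "sha256:abc", "note": "café"}}],
)
print(error_headers[ACCOUNTS_HEADER].isascii())                     # True
print(journal_from_response(error_headers)[0]["worker_reported"])  # True
print(journal_from_response({}))                                    # []
```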

Honest limit: accounts ride the UNARY paths (`/execute`, `/task`) only.
`/execute_stream` responses have sent their headers before execution
ends; a terminal sentinel chunk (the `_job_error` dialect) is the
seam-admitted carrier if a streaming adopter ever needs accounts.

Writers: the worker itself (TASK_ACCOUNT at `/task` completion — zero
capability cooperation) and the interface-lib storage helpers (T29
`save_with_logging` → RESULT_SAVED, `get_cached` → CACHE_HIT — recorded
at the helper boundary, so every capability using the helpers reports
for free). `record_account` outside a capture span is a silent no-op
(standalone/dev runs, host-process imports).

------------------------------------------------------------------------

### drain_accounts

``` python

def drain_accounts()->list:

```

*Return + clear the current span’s recorded accounts (`[]` outside a
span or when nothing was recorded).* The worker response path calls this
once when building the response headers.

------------------------------------------------------------------------

### record_account

``` python

def record_account(
    event_type:str, # SubstrateEventType value (task_account / result_saved / cache_hit / ...)
    payload:Optional=None, # Structured detail (references + hashes, never content)
)->None:

```

*Record one substrate-family account for the current call span.*

Called by the worker itself and by interface-lib storage helpers. Silent
no-op outside a capture span (standalone runs, host imports) — the
envelope-less-call posture applied to accounts.

------------------------------------------------------------------------

### begin_account_capture

``` python

def begin_account_capture()->None:

```

*Start a fresh account list for the current call span (at worker
endpoint entry).* Same no-reset semantics as the envelope: the ASGI
request task’s context dies with the request.
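One plausible implementation of the capture span, sketched to match the behavior the tests below check: a `ContextVar` holding a *mutable* list, so an executor thread that received a `copy_context()` snapshot after `begin` appends to the same list the request task later drains. All names are illustrative `*_sketch` stand-ins; `save_result_sketch` mimics a storage helper that records at the helper boundary.

```python
import contextvars
import threading
from typing import Optional

_ACCOUNTS = contextvars.ContextVar("accounts", default=None)  # None = no span


def begin_account_capture_sketch() -> None:
    _ACCOUNTS.set([])  # fresh list; no reset token (dies with the request context)


def record_account_sketch(event_type: str, payload: Optional[dict] = None) -> None:
    accounts = _ACCOUNTS.get()
    if accounts is None:
        return  # outside a span: silent no-op
    accounts.append({"event_type": event_type, "payload": payload or {}})


def drain_accounts_sketch() -> list:
    accounts = _ACCOUNTS.get()
    if not accounts:
        return []
    drained = list(accounts)
    accounts.clear()  # in place, so every context sharing the list sees it empty
    return drained


def save_result_sketch(job_id: str) -> None:
    """Hypothetical storage helper: saves, then records at the helper boundary."""
    record_account_sketch("result_saved", {"row_job_id": job_id})


record_account_sketch("cache_hit")  # no span yet: ignored
begin_account_capture_sketch()
ctx = contextvars.copy_context()  # snapshot AFTER begin shares the same list
worker = threading.Thread(target=ctx.run, args=(save_result_sketch, "j-1"))
worker.start()
worker.join()
print(drain_accounts_sketch())  # [{'event_type': 'result_saved', 'payload': {'row_job_id': 'j-1'}}]
print(drain_accounts_sketch())  # []
```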

``` python
# Account-capture executable tests (capture span + executor-thread sharing +
# the no-span no-op + drain-once semantics).
import threading as _thr

# 1. No capture span: record_account is a silent no-op; drain yields [].
record_account("result_saved", {"row_job_id": "x"})
assert drain_accounts() == []

# 2. Within a span: records accumulate and drain ONCE.
begin_account_capture()
record_account("cache_hit", {"row_job_id": "j-1"})
record_account("result_saved")  # payload defaults to {}
got = drain_accounts()
assert got == [
    {"event_type": "cache_hit", "payload": {"row_job_id": "j-1"}},
    {"event_type": "result_saved", "payload": {}},
]
assert drain_accounts() == []  # drained — second call yields nothing

# 3. Executor-thread sharing: copy_context AFTER begin gives the thread the
#    SAME list object (the worker's ctx.run pattern); appends are visible to
#    the draining request task.
begin_account_capture()
ctx = _cv.copy_context()
t = _thr.Thread(target=lambda: ctx.run(
    record_account, "task_account", {"task": "t", "ok": True}))
t.start(); t.join()
assert drain_accounts() == [
    {"event_type": "task_account", "payload": {"task": "t", "ok": True}}]

# 4. Header round-trip shape: ASCII JSON survives the simulated boundary.
begin_account_capture()
record_account("result_saved", {"text_hash": "sha256:abc", "n": 1})
hdr = _json.dumps(drain_accounts())
assert hdr.isascii()
assert _json.loads(hdr)[0]["payload"]["text_hash"] == "sha256:abc"

print("account capture tests OK")
```

    account capture tests OK
