Typed Wire Layer

Typed data transfer at the worker boundary — the zero-copy FileBackedDTO

FileBackedDTO Protocol

The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.


FileBackedDTO


def FileBackedDTO(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer.

# Test FileBackedDTO protocol detection
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""
    
    def __init__(self, data: bytes):
        self._data = data
    
    def to_temp_file(self) -> str:
        """Save to temp file and return path."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
            return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
print(f"Temp file path: {audio.to_temp_file()}")

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
MockAudioData implements FileBackedDTO: True
Temp file path: /tmp/tmpchnx7qag.wav
str implements FileBackedDTO: False
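
The substitution step described above (detect the protocol, call to_temp_file(), send the path) can be sketched as follows. This is an illustration under stated assumptions, not the Proxy's actual code: `FileBackedDTOSketch`, `Clip`, and `prepare_kwargs` are hypothetical names, and the protocol is redeclared locally so the block runs on its own.

```python
import os
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedDTOSketch(Protocol):
    """Local stand-in for the protocol (hypothetical name)."""

    def to_temp_file(self) -> str: ...


class Clip:
    """Hypothetical payload object that can write itself to disk."""

    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
            f.write(self._data)
        return f.name


def prepare_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Swap protocol-implementing values for file paths; keep everything else."""
    return {
        key: value.to_temp_file() if isinstance(value, FileBackedDTOSketch) else value
        for key, value in kwargs.items()
    }


payload = prepare_kwargs({"audio": Clip(b"\x00\x01"), "language": "en"})
path = payload["audio"]          # a string path now travels instead of the bytes
with open(path, "rb") as fh:
    sent_bytes = fh.read()       # what the receiving process would read back
os.remove(path)                  # the sketch cleans up after itself
```

Only the value that implements the protocol is replaced; the plain string argument is passed through untouched.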

Typed result envelope

The typed result envelope is how task results keep their type across the worker HTTP/JSON boundary. Today a worker serializes execute() results via EnhancedJSONEncoder (dataclasses.asdict), which destroys the type information at the boundary: every consumer re-extracts fields from anonymous dicts, and each workflow core re-pays the dict-or-object tolerance tax (field_of; evidence E5/D10/C7 plus 10 inline e2e-script copies).

Design (serialization precedent: the cjm-context-graph-primitives tagged-dict law, stage-1 ledger P5/P6):

  • Dict-primary, self-describing: {"__wire__": "<kind>", "data": {...}}.
  • Encode is exact-type: only DTO classes registered via @wire_type get the envelope; everything else (plain dicts from dispatcher capabilities, primitives, unregistered dataclasses) passes through to the EnhancedJSONEncoder path exactly as before — adoption is per-DTO, not flag-day.
  • Decode is strict on known kinds, tolerant on unknown kinds: a registered kind reconstructs through the DTO’s from_dict (a missing required field raises loudly); an unknown kind passes through unchanged with the envelope intact, so a host without the result’s interface library degrades to today’s dict behavior instead of failing.
  • One deliberate divergence from the P6 law: results are a transport TERMINUS (consumed, not re-persisted), so extra keys on a known kind are dropped with a debug log rather than raising — version skew between a worker env (snapshot installs) and a host env must not hard-fail a run. Surface-drift visibility is the manifest live-companion’s job, not the result path’s.

Direction note: this is the RESULT direction (worker → host). Typed task ARGUMENTS (host → worker) ride the CR-17 adapter routing work (stage 4); until then _prepare_payload / FileBackedDTO cover the input direction.
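
The envelope shape and the encode/decode asymmetry described above can be sketched with a toy registry. Everything here is an assumption made for illustration (`register`, `encode`, `decode`, and `Transcript` are hypothetical names). The real functions are documented below. The sketch leaves out from_dict and the extra-key tolerance.

```python
import dataclasses
import json
from dataclasses import dataclass
from typing import Any, Dict

KIND_KEY, DATA_KEY = "__wire__", "data"   # envelope keys from the design above
_BY_TYPE: Dict[type, str] = {}            # exact class -> kind (encode side)
_BY_KIND: Dict[str, type] = {}            # kind -> class (decode side)


def register(kind: str, cls: type) -> None:
    _BY_TYPE[cls] = kind
    _BY_KIND[kind] = cls


def encode(obj: Any) -> Any:
    kind = _BY_TYPE.get(type(obj))        # exact-type lookup: subclasses miss
    if kind is None:
        return obj                        # unregistered values pass through
    return {KIND_KEY: kind, DATA_KEY: dataclasses.asdict(obj)}


def decode(obj: Any) -> Any:
    if not (isinstance(obj, dict) and KIND_KEY in obj):
        return obj                        # untagged values pass through
    cls = _BY_KIND.get(obj[KIND_KEY])
    if cls is None:
        return obj                        # unknown kind: envelope left intact
    return cls(**obj[DATA_KEY])           # known kind: constructor enforces fields


@dataclass
class Transcript:
    text: str
    confidence: float = 0.0


register("transcription.result", Transcript)
wire_text = json.dumps(encode(Transcript("hi", 0.9)))   # what crosses the boundary
typed = decode(json.loads(wire_text))
foreign = {KIND_KEY: "unknown.kind", DATA_KEY: {"x": 1}}
```

A host that has never registered "unknown.kind" gets `foreign` back as the same dict object, which is the degradation path the design calls for.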


flat_from_dict


def flat_from_dict(
    cls, # The wire DTO dataclass being reconstructed
    d:dict, # The envelope's "data" payload
):

Default reconstruction for FLAT wire DTOs (no nested-DTO fields).

Filters the payload to the dataclass’s declared fields (unknown extras are dropped with a debug log — transport-terminus tolerance, see the envelope design note) and lets the constructor enforce required fields (a missing required field raises TypeError loudly). DTOs with nested DTO fields (e.g. a result carrying a list of typed items) must define their own from_dict classmethod; @wire_type only attaches this default when the class has none.
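
A minimal sketch of the filtering behaviour described above, assuming it is built on dataclasses.fields. `flat_from_dict_sketch` and `Summary` are hypothetical names, and the logger name is illustrative.

```python
import dataclasses
import logging
from dataclasses import dataclass, field
from typing import Optional

log = logging.getLogger("wire_sketch")


def flat_from_dict_sketch(cls, d: dict):
    """Keep only declared fields; let the constructor reject missing ones."""
    declared = {f.name for f in dataclasses.fields(cls)}
    extras = sorted(set(d) - declared)
    if extras:
        log.debug("dropping unknown fields for %s: %s", cls.__name__, extras)
    return cls(**{k: v for k, v in d.items() if k in declared})


@dataclass
class Summary:
    text: str
    score: Optional[float] = None
    tags: list = field(default_factory=list)


# An extra key from a newer worker is dropped, not fatal.
ok = flat_from_dict_sketch(Summary, {"text": "t", "added_later": 1})

# A missing required field surfaces as the constructor's TypeError.
try:
    flat_from_dict_sketch(Summary, {"score": 0.5})
    missing_error = None
except TypeError as exc:
    missing_error = exc
```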


wire_type


def wire_type(
    kind:str, # Stable wire discriminator, e.g. "transcription.result"
)->Callable: # Class decorator

Register a dataclass as a typed wire DTO under kind.

  • The class must be a dataclass (encode falls back to dataclasses.asdict when it defines no to_dict).
  • If the class defines no from_dict, the flat default (flat_from_dict) is attached; nested DTOs define their own.
  • Re-registering the same LOGICAL class (qualname match; the module is ignored because nbdev’s literate workflow defines each class twice — in-notebook __main__ + the exported module) replaces the decode entry; a DIFFERENT class claiming an already-registered kind raises ValueError.

wire_encode


def wire_encode(
    obj:Any, # A task result (any shape)
)->Any: # Tagged envelope dict for registered DTOs; `obj` unchanged otherwise

Wrap a registered wire DTO in its tagged envelope (worker side).

Exact-type lookup: subclasses are NOT encoded under the parent’s kind (they pass through unregistered, preserving today’s behavior). Payload preference: the DTO’s own to_dict() when defined, else dataclasses.asdict (recursive — nested dataclasses flatten).


wire_decode


def wire_decode(
    obj:Any, # A JSON-decoded response body (any shape)
)->Any: # The typed DTO for known kinds; `obj` unchanged otherwise

Reconstruct a typed result from its tagged envelope (host side).

Known kind -> the registered class’s from_dict (strict: a missing required field raises). Unknown kind -> the dict passes through UNCHANGED with the envelope intact (tolerant degradation for hosts without the result’s interface library). Untagged values pass through.

# Wire-format executable tests (the cross-boundary payload discipline):
# round-trips simulate the real boundary — encode -> json.dumps -> json.loads
# -> decode — never in-memory shortcuts.
import json as _json
from dataclasses import dataclass, field
from typing import List, Optional


@wire_type("test.flat")
@dataclass
class _FlatResult:
    text: str
    confidence: Optional[float] = None
    metadata: dict = field(default_factory=dict)


@dataclass
class _Item:
    text: str
    start_time: float
    end_time: float


@wire_type("test.nested")
@dataclass
class _NestedResult:
    items: List[_Item]
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, d: dict) -> "_NestedResult":
        return cls(items=[_Item(**i) for i in d.get("items", [])],
                   metadata=d.get("metadata", {}) or {})


def _roundtrip(obj):
    return wire_decode(_json.loads(_json.dumps(wire_encode(obj))))


# 1. Flat DTO round-trips typed through the simulated boundary.
flat = _FlatResult(text="hello", confidence=0.9, metadata={"lang": "en"})
back = _roundtrip(flat)
assert isinstance(back, _FlatResult) and back == flat, back

# 2. Nested DTO round-trips through its custom from_dict (items re-typed).
nested = _NestedResult(items=[_Item("a", 0.0, 1.0), _Item("b", 1.0, 2.0)])
nback = _roundtrip(nested)
assert isinstance(nback, _NestedResult) and isinstance(nback.items[0], _Item)
assert nback == nested

# 3. Unregistered objects pass through encode unchanged (plain dicts /
#    untyped capability results keep today's behavior).
plain = {"rows": [[1, 2]], "count": 2}
assert wire_encode(plain) is plain
assert wire_decode(plain) is plain

# 4. Unknown kind passes through decode UNCHANGED, envelope intact
#    (tolerant degradation; lossless if ever re-serialized).
foreign = {WIRE_KIND_KEY: "some.future/kind", WIRE_DATA_KEY: {"x": 1}}
assert wire_decode(foreign) is foreign

# 5. Exact-type encode: a subclass is NOT encoded under the parent's kind.
@dataclass
class _FlatSubclass(_FlatResult):
    pass

sub = _FlatSubclass(text="sub")
assert wire_encode(sub) is sub

# 6. Duplicate-kind guard: a DIFFERENT class claiming a taken kind raises;
#    re-registering the same logical class (same qualname; module ignored — nbdev defines classes twice) replaces quietly.
try:
    @wire_type("test.flat")
    @dataclass
    class _Imposter:
        y: int = 0
    raise AssertionError("duplicate kind must raise")
except ValueError:
    pass

# 7. Transport-terminus tolerance: extras dropped (debug-logged), missing
#    required raises loudly.
tolerant = wire_decode({WIRE_KIND_KEY: "test.flat",
                        WIRE_DATA_KEY: {"text": "t", "new_field_from_future": 1}})
assert tolerant == _FlatResult(text="t")
try:
    wire_decode({WIRE_KIND_KEY: "test.flat", WIRE_DATA_KEY: {"confidence": 0.5}})
    raise AssertionError("missing required field must raise")
except TypeError:
    pass

print("typed wire envelope tests OK")
typed wire envelope tests OK

Per-call envelope (CR-14 / the surviving CR-15 control channel)

The substrate-owned per-call identity and control block. Today job_id reaches the worker only as a domain kwarg that polite capability authors echo into log messages; the observational pass found that failures, the very case diagnosis needs, carry no identity at all. The envelope makes call identity a wire-level concern:

  • Host side: the queue sets the contextvar around each job execution; the proxy injects "envelope": {...} as a TOP-LEVEL body key on /execute, /execute_stream, and /task (never inside kwargs — capability code never sees it; old workers ignore unknown top-level keys).
  • Worker side: the endpoint decodes it back into the SAME contextvar, so the DiagnosticsLogHandler stamps every record with exact job identity (contextvars.copy_context().run(...) carries it into the executor thread — run_in_executor does not copy context by itself).
  • control carries per-call flags (force cache-bypass and similar; the 4th CR-15 kwarg category) so they stop riding the typed task args.

Tolerant in both directions: unknown envelope keys are ignored on decode (forward compatibility); an absent envelope leaves the contextvar None, so records stay honestly unattributed and are never heuristically attributed.
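
The host-side injection and the old-worker tolerance can be sketched as below. The body layout ("task", "kwargs") and all function names are assumptions made for illustration. Only the top-level "envelope" key comes from the description above.

```python
import contextvars
import json
from typing import Any, Dict

_current = contextvars.ContextVar("call_envelope_sketch", default=None)


def build_body(task: str, kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Host side: the envelope sits next to kwargs, never inside them."""
    body = {"task": task, "kwargs": kwargs}
    env = _current.get()
    if env is not None:
        body["envelope"] = {k: v for k, v in env.items() if v is not None}
    return body


def new_worker_handle(body: Dict[str, Any]) -> Dict[str, Any]:
    """Envelope-aware worker: restores identity, hands only kwargs onward."""
    token = _current.set(body.get("envelope"))
    try:
        env = _current.get() or {}
        return {"seen_job_id": env.get("job_id"),
                "capability_kwargs": body["kwargs"]}
    finally:
        _current.reset(token)   # reset only so this single-process sketch stays clean


def old_worker_handle(body: Dict[str, Any]) -> Dict[str, Any]:
    """Envelope-unaware worker: reads the keys it knows, ignores the rest."""
    return {"capability_kwargs": body["kwargs"]}


token = _current.set({"job_id": "j-1", "run_id": None, "control": {"force": True}})
try:
    body = json.loads(json.dumps(build_body("transcribe", {"path": "/tmp/a.wav"})))
finally:
    _current.reset(token)

new_view = new_worker_handle(body)
old_view = old_worker_handle(body)
bare_body = build_body("transcribe", {})   # no call span: no envelope key at all
```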


get_call_envelope


def get_call_envelope(
    
)->Optional:

The current call envelope, or None outside any call span.


reset_call_envelope


def reset_call_envelope(
    token:Token
)->None:

Restore the prior envelope (always pair with set_call_envelope in finally).


set_call_envelope


def set_call_envelope(
    env:Optional
)->Token:

Set the current call envelope; returns the token for reset_call_envelope.
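
One way to guarantee the set/reset pairing is a small context manager. The sketch below uses its own ContextVar rather than the library's, and `call_span` is a hypothetical helper, not part of the documented API.

```python
import contextvars
from contextlib import contextmanager

_envelope = contextvars.ContextVar("envelope_sketch", default=None)


@contextmanager
def call_span(env):
    """Set the envelope for the duration of a block; always restore the prior one."""
    token = _envelope.set(env)
    try:
        yield env
    finally:
        _envelope.reset(token)


observed = []
with call_span({"job_id": "outer"}):
    observed.append(_envelope.get()["job_id"])
    try:
        with call_span({"job_id": "inner"}):
            observed.append(_envelope.get()["job_id"])
            raise RuntimeError("job failed")
    except RuntimeError:
        pass
    # The failed inner span still restored the outer envelope.
    observed.append(_envelope.get()["job_id"])
after = _envelope.get()
```

Because reset uses the token, nested spans unwind to the previous value rather than to None.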


CallEnvelope


def CallEnvelope(
    job_id:Optional=None, run_id:Optional=None, composition_id:Optional=None, node_id:Optional=None,
    actor:Optional=None, control:dict=<factory>
)->None:

Substrate-owned per-call identity + control block (CR-14 / CR-15).

Travels as a top-level "envelope" key on every proxy→worker call body. All fields optional — an envelope-less call (direct proxy use, old hosts) simply yields unattributed records, never a failure.

# Envelope wire-format executable tests (same boundary-simulation discipline).
import contextvars as _cv

# 1. Round-trip through the simulated JSON boundary; None fields dropped.
env = CallEnvelope(job_id="j-1", run_id="r-1", control={"force": True})
wire_form = _json.loads(_json.dumps(env.to_wire()))
assert wire_form == {"job_id": "j-1", "run_id": "r-1", "control": {"force": True}}
back = CallEnvelope.from_wire(wire_form)
assert back == env

# 2. Tolerant decode: unknown keys from a future host are ignored.
future = CallEnvelope.from_wire({"job_id": "j-2", "tenant_id": "future-key"})
assert future.job_id == "j-2" and future.control == {}

# 3. Empty/absent envelope decodes to all-None (honestly unattributed).
assert CallEnvelope.from_wire({}) == CallEnvelope()
assert CallEnvelope.from_wire(None) == CallEnvelope()

# 4. Contextvar set/reset pairing + executor-thread propagation via
#    copy_context (the run_in_executor gotcha the worker endpoints handle).
assert get_call_envelope() is None
token = set_call_envelope(env)
try:
    assert get_call_envelope() is env
    seen = {}
    ctx = _cv.copy_context()
    import threading
    t = threading.Thread(target=lambda: seen.update(
        inside=ctx.run(get_call_envelope)))
    t.start(); t.join()
    assert seen["inside"] is env  # copy_context carries it into the thread
    bare = {}
    t2 = threading.Thread(target=lambda: bare.update(inside=get_call_envelope()))
    t2.start(); t2.join()
    assert bare["inside"] is None  # without copy_context the thread sees None
finally:
    reset_call_envelope(token)
assert get_call_envelope() is None

print("call envelope tests OK")
call envelope tests OK
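
The run_in_executor behaviour that the worker endpoints compensate for can be reproduced with asyncio directly. This is a standalone sketch with hypothetical names (`_job`, `blocking_capability`, `handle_request`). It assumes a standard CPython build, where executor threads do not inherit the caller's context.

```python
import asyncio
import contextvars

_job = contextvars.ContextVar("job_sketch", default=None)


def blocking_capability():
    """Runs in an executor thread; reports which job identity it can see."""
    return _job.get()


async def handle_request(job_id):
    _job.set(job_id)
    loop = asyncio.get_running_loop()
    # Plain run_in_executor: the thread runs outside the request's context.
    bare = await loop.run_in_executor(None, blocking_capability)
    # Copy the context and run inside it: the identity travels with the call.
    ctx = contextvars.copy_context()
    carried = await loop.run_in_executor(None, ctx.run, blocking_capability)
    return bare, carried


bare, carried = asyncio.run(handle_request("j-42"))
```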

In-worker accounts (CR-14 follow-up)

The worker-side half of the substrate-family account events (TASK_ACCOUNT / RESULT_SAVED / CACHE_HIT — the SubstrateEventType reserved vocabulary). The derived-not-authored rule holds: workers never write the journal; they RECORD accounts during a call span and the records ride the wire back as a response header, where the HOST journals them with worker_reported=True (single-writer-class rule — a compromised worker still cannot write the audit record directly, and the host stamps the receiving-side identity).

Why a header and not a body wrapper: the response body is the RESULT’s contract (typed wire envelope / capability dict) — wrapping it would break any old host talking to a new worker. A header is invisible to old proxies in both skew directions (the K6 version-skew posture). The error path attaches the same header on the _job_error 500 body — a FAILED call still reports the accounts recorded before the failure (e.g. a save that succeeded before a later crash).

Honest limit: accounts ride the UNARY paths (/execute, /task) only. /execute_stream responses have sent their headers before execution ends; a terminal sentinel chunk (the _job_error dialect) is the seam-admitted carrier if a streaming adopter ever needs accounts.

Writers: the worker itself (TASK_ACCOUNT at /task completion — zero capability cooperation) and the interface-lib storage helpers (T29 save_with_logging → RESULT_SAVED, get_cached → CACHE_HIT — recorded at the helper boundary, so every capability using the helpers reports for free). record_account outside a capture span is a silent no-op (standalone/dev runs, host-process imports).
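
The flow above (record during the span, ship as a response header, journal on the host) can be sketched end to end. The header name, the error body shape, and every function name here are assumptions for illustration. The source does not specify them.

```python
import contextvars
import json

_accounts = contextvars.ContextVar("accounts_sketch", default=None)
HEADER = "X-Worker-Accounts"   # hypothetical header name


def begin_capture():
    _accounts.set([])


def record(event_type, payload=None):
    bucket = _accounts.get()
    if bucket is None:
        return                 # outside a capture span: silent no-op
    bucket.append({"event_type": event_type, "payload": payload or {}})


def drain():
    bucket = _accounts.get()
    if not bucket:
        return []
    out = list(bucket)
    bucket.clear()
    return out


def worker_response(run):
    """Unary path: the header is attached on success and on failure alike."""
    begin_capture()
    try:
        status, body = 200, run()
    except Exception as exc:
        status, body = 500, {"error": str(exc)}   # hypothetical error body
    return status, {HEADER: json.dumps(drain())}, body


def host_journal(headers):
    """Host side: the host writes the journal and marks the source."""
    reported = json.loads(headers.get(HEADER, "[]"))
    return [dict(account, worker_reported=True) for account in reported]


def save_then_crash():
    record("result_saved", {"text_hash": "sha256:abc"})
    raise RuntimeError("later step failed")


record("cache_hit")            # no span yet, so nothing is recorded
status, headers, body = worker_response(save_then_crash)
journal = host_journal(headers)
```

The failed call still reports the save that completed before the crash, and a response without the header (an old worker) journals nothing.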


drain_accounts


def drain_accounts(
    
)->list:

Return + clear the current span’s recorded accounts ([] outside a span or when nothing was recorded). The worker response path calls this once when building the response headers.


record_account


def record_account(
    event_type:str, # SubstrateEventType value (task_account / result_saved / cache_hit / ...)
    payload:Optional=None, # Structured detail (references + hashes, never content)
)->None:

Record one substrate-family account for the current call span.

Called by the worker itself and by interface-lib storage helpers. Silent no-op outside a capture span (standalone runs, host imports) — the envelope-less-call posture applied to accounts.


begin_account_capture


def begin_account_capture(
    
)->None:

Start a fresh account list for the current call span. Called at worker endpoint entry; like the envelope, it is never explicitly reset, because the ASGI request task's context dies with the request.

# Account-capture executable tests (capture span + executor-thread sharing +
# the no-span no-op + drain-once semantics).
import threading as _thr

# 1. No capture span: record_account is a silent no-op; drain yields [].
record_account("result_saved", {"row_job_id": "x"})
assert drain_accounts() == []

# 2. Within a span: records accumulate and drain ONCE.
begin_account_capture()
record_account("cache_hit", {"row_job_id": "j-1"})
record_account("result_saved")  # payload defaults to {}
got = drain_accounts()
assert got == [
    {"event_type": "cache_hit", "payload": {"row_job_id": "j-1"}},
    {"event_type": "result_saved", "payload": {}},
]
assert drain_accounts() == []  # drained — second call yields nothing

# 3. Executor-thread sharing: copy_context AFTER begin gives the thread the
#    SAME list object (the worker's ctx.run pattern); appends are visible to
#    the draining request task.
begin_account_capture()
ctx = _cv.copy_context()
t = _thr.Thread(target=lambda: ctx.run(
    record_account, "task_account", {"task": "t", "ok": True}))
t.start(); t.join()
assert drain_accounts() == [
    {"event_type": "task_account", "payload": {"task": "t", "ok": True}}]

# 4. Header round-trip shape: ASCII JSON survives the simulated boundary.
begin_account_capture()
record_account("result_saved", {"text_hash": "sha256:abc", "n": 1})
hdr = _json.dumps(drain_accounts())
assert hdr.isascii()
assert _json.loads(hdr)[0]["payload"]["text_hash"] == "sha256:abc"

print("account capture tests OK")
account capture tests OK