Typed data transfer at the worker boundary — the zero-copy FileBackedDTO
FileBackedDTO Protocol
The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.
Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer.
# Test FileBackedDTO protocol detection
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""
    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        """Save to temp file and return path."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
        return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
print(f"Temp file path: {audio.to_temp_file()}")

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
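The substitution step described above (detect the protocol, call to_temp_file(), send the path) can be sketched roughly as follows. This is an illustration only: FileBackedSketch, prepare_kwargs, and Clip are hypothetical names, and the real Proxy may detect and rewrite arguments differently.

```python
import os
import tempfile
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class FileBackedSketch(Protocol):
    """Hypothetical stand-in for FileBackedDTO: a structural check on to_temp_file."""

    def to_temp_file(self) -> str: ...


def prepare_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace file-backed arguments with the path of their temp file (assumed behavior)."""
    prepared: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if isinstance(value, FileBackedSketch):
            prepared[name] = value.to_temp_file()  # only the path crosses the boundary
        else:
            prepared[name] = value
    return prepared


class Clip:
    """Hypothetical DTO holding raw bytes."""

    def __init__(self, data: bytes):
        self._data = data

    def to_temp_file(self) -> str:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
        return f.name


payload = prepare_kwargs({"audio": Clip(b"fake audio data"), "language": "en"})

# The receiving side would open the path; here we read it back in-process.
with open(payload["audio"], "rb") as fh:
    roundtrip = fh.read()
os.remove(payload["audio"])
```

Only the path string is serialized into the request body; the bytes themselves stay on disk, which is the sense in which the transfer avoids copying through the JSON payload.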
The typed result envelope is how task results keep their type across the worker HTTP/JSON boundary. Today a worker serializes execute() results via EnhancedJSONEncoder (dataclasses.asdict) — the type information is destroyed at the boundary, every consumer re-extracts fields from anonymous dicts, and each workflow core re-pays the dict-or-object tolerance tax (field_of; evidence E5/D10/C7 + 10 inline e2e-script copies).
Design (serialization precedent: the cjm-context-graph-primitives tagged-dict law, stage-1 ledger P5/P6):
Encode is exact-type: only DTO classes registered via @wire_type get the envelope; everything else (plain dicts from dispatcher capabilities, primitives, unregistered dataclasses) passes through to the EnhancedJSONEncoder path exactly as before — adoption is per-DTO, not flag-day.
Decode is strict on known kinds, tolerant on unknown kinds: a registered kind reconstructs through the DTO’s from_dict (a missing required field raises loudly); an unknown kind passes through unchanged with the envelope intact, so a host without the result’s interface library degrades to today’s dict behavior instead of failing.
One deliberate divergence from the P6 law: results are a transport TERMINUS (consumed, not re-persisted), so extra keys on a known kind are dropped with a debug log rather than raising — version skew between a worker env (snapshot installs) and a host env must not hard-fail a run. Surface-drift visibility is the manifest live-companion’s job, not the result path’s.
Direction note: this is the RESULT direction (worker → host). Typed task ARGUMENTS (host → worker) ride the CR-17 adapter routing work (stage 4); until then _prepare_payload / FileBackedDTO cover the input direction.
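The encode and decode rules above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: KIND_KEY and DATA_KEY are hypothetical stand-ins for WIRE_KIND_KEY and WIRE_DATA_KEY (whose actual values are not shown here), and register, encode, and decode are illustrative names.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

KIND_KEY = "__kind__"  # hypothetical stand-in for WIRE_KIND_KEY
DATA_KEY = "data"      # hypothetical stand-in for WIRE_DATA_KEY

_KIND_BY_CLASS: Dict[type, str] = {}
_CLASS_BY_KIND: Dict[str, type] = {}


def register(kind: str, cls: type) -> type:
    """Record the kind for both directions (encode by class, decode by kind)."""
    _KIND_BY_CLASS[cls] = kind
    _CLASS_BY_KIND[kind] = cls
    return cls


def encode(obj: Any) -> Any:
    kind = _KIND_BY_CLASS.get(type(obj))  # exact type, not isinstance
    if kind is None:
        return obj  # unregistered values pass through untouched
    return {KIND_KEY: kind, DATA_KEY: asdict(obj)}


def decode(obj: Any) -> Any:
    if not (isinstance(obj, dict) and KIND_KEY in obj and DATA_KEY in obj):
        return obj  # untagged value
    cls = _CLASS_BY_KIND.get(obj[KIND_KEY])
    if cls is None:
        return obj  # unknown kind: envelope stays intact
    return cls(**obj[DATA_KEY])  # a missing required field raises TypeError


@dataclass
class Transcript:
    text: str
    confidence: Optional[float] = None


register("demo.transcript", Transcript)


def over_the_wire(value: Any) -> Any:
    """Simulate the HTTP/JSON boundary: encode, dump, load, decode."""
    return decode(json.loads(json.dumps(encode(value))))


typed = over_the_wire(Transcript("hello", 0.9))
plain = over_the_wire({"rows": [[1, 2]], "count": 2})
foreign = decode({KIND_KEY: "some.future/kind", DATA_KEY: {"x": 1}})
```

A host that lacks the class for a kind receives the same tagged dict a pre-envelope consumer would have had to handle, which is what makes per-DTO adoption possible.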
flat_from_dict
def flat_from_dict(
    cls, # The wire DTO dataclass being reconstructed
    d:dict, # The envelope's "data" payload
):
Default reconstruction for FLAT wire DTOs (no nested-DTO fields).
Filters the payload to the dataclass’s declared fields (unknown extras are dropped with a debug log — transport-terminus tolerance, see the envelope design note) and lets the constructor enforce required fields (a missing required field raises TypeError loudly). DTOs with nested DTO fields (e.g. a result carrying a list of typed items) must define their own from_dict classmethod; @wire_type only attaches this default when the class has none.
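A rough sketch of the filtering behavior described above, assuming a standard-library dataclass and the logging module. flat_from_dict_sketch and Result are hypothetical names; the real helper may log or filter differently.

```python
import dataclasses
import logging
from dataclasses import dataclass, field
from typing import Optional

log = logging.getLogger("wire_sketch")  # hypothetical logger name


def flat_from_dict_sketch(cls, d: dict):
    """Keep only declared fields; let the constructor enforce required ones."""
    declared = {f.name for f in dataclasses.fields(cls)}
    extras = sorted(set(d) - declared)
    if extras:
        log.debug("dropping unknown fields for %s: %s", cls.__name__, extras)
    return cls(**{k: v for k, v in d.items() if k in declared})


@dataclass
class Result:
    text: str
    confidence: Optional[float] = None
    metadata: dict = field(default_factory=dict)


# Extra key from a newer worker is dropped, not fatal.
tolerant = flat_from_dict_sketch(Result, {"text": "t", "new_field_from_future": 1})

# Missing required field: the dataclass constructor raises TypeError.
try:
    flat_from_dict_sketch(Result, {"confidence": 0.5})
    missing_raised = False
except TypeError:
    missing_raised = True
```

Leaving required-field enforcement to the generated constructor keeps the helper free of per-class validation logic.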
wire_type
def wire_type(
    kind:str, # Stable wire discriminator, e.g. "transcription.result"
)->Callable: # Class decorator
Register a dataclass as a typed wire DTO under kind.
The class must be a dataclass (encode falls back to dataclasses.asdict when it defines no to_dict).
If the class defines no from_dict, the flat default (flat_from_dict) is attached; nested DTOs define their own.
Re-registering the same LOGICAL class (qualname match; the module is ignored because nbdev’s literate workflow defines each class twice — in-notebook __main__ + the exported module) replaces the decode entry; a DIFFERENT class claiming an already-registered kind raises ValueError.
wire_encode
def wire_encode(
    obj:Any, # A task result (any shape)
)->Any: # Tagged envelope dict for registered DTOs; `obj` unchanged otherwise
Wrap a registered wire DTO in its tagged envelope (worker side).
Exact-type lookup: subclasses are NOT encoded under the parent’s kind (they pass through unregistered, preserving today’s behavior). Payload preference: the DTO’s own to_dict() when defined, else dataclasses.asdict (recursive — nested dataclasses flatten).
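Two standard-library behaviors sit behind this paragraph: dataclasses.asdict recurses into nested dataclasses (so nested items arrive as plain dicts and need a custom from_dict to be re-typed), and a dictionary lookup on type(obj) does not match subclasses. The class names and the registered mapping below are hypothetical.

```python
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class Item:
    text: str
    start_time: float


@dataclass
class Parent:
    items: List[Item]


@dataclass
class Child(Parent):
    pass


# asdict flattens nested dataclasses (including those inside lists) to dicts.
payload = asdict(Parent(items=[Item("a", 0.0)]))

# Exact-type lookup: the subclass is not found under the parent's entry.
registered: Dict[type, str] = {Parent: "demo.parent"}  # hypothetical registry
parent_kind = registered.get(type(Parent(items=[])))
child_kind = registered.get(type(Child(items=[])))

# An isinstance-based lookup WOULD match the subclass, which is what is avoided.
child_is_parent = isinstance(Child(items=[]), Parent)
```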
wire_decode
def wire_decode(
    obj:Any, # A JSON-decoded response body (any shape)
)->Any: # The typed DTO for known kinds; `obj` unchanged otherwise
Reconstruct a typed result from its tagged envelope (host side).
Known kind -> the registered class’s from_dict (strict: a missing required field raises). Unknown kind -> the dict passes through UNCHANGED with the envelope intact (tolerant degradation for hosts without the result’s interface library). Untagged values pass through.
# Wire-format executable tests (the cross-boundary payload discipline):
# round-trips simulate the real boundary (encode -> json.dumps -> json.loads
# -> decode), never in-memory shortcuts.
import json as _json
from dataclasses import dataclass, field
from typing import List, Optional

@wire_type("test.flat")
@dataclass
class _FlatResult:
    text: str
    confidence: Optional[float] = None
    metadata: dict = field(default_factory=dict)

@dataclass
class _Item:
    text: str
    start_time: float
    end_time: float

@wire_type("test.nested")
@dataclass
class _NestedResult:
    items: List[_Item]
    metadata: dict = field(default_factory=dict)

    @classmethod
    def from_dict(cls, d: dict) -> "_NestedResult":
        return cls(items=[_Item(**i) for i in d.get("items", [])],
                   metadata=d.get("metadata", {}) or {})

def _roundtrip(obj):
    return wire_decode(_json.loads(_json.dumps(wire_encode(obj))))

# 1. Flat DTO round-trips typed through the simulated boundary.
flat = _FlatResult(text="hello", confidence=0.9, metadata={"lang": "en"})
back = _roundtrip(flat)
assert isinstance(back, _FlatResult) and back == flat, back

# 2. Nested DTO round-trips through its custom from_dict (items re-typed).
nested = _NestedResult(items=[_Item("a", 0.0, 1.0), _Item("b", 1.0, 2.0)])
nback = _roundtrip(nested)
assert isinstance(nback, _NestedResult) and isinstance(nback.items[0], _Item)
assert nback == nested

# 3. Unregistered objects pass through encode unchanged (plain dicts /
# untyped capability results keep today's behavior).
plain = {"rows": [[1, 2]], "count": 2}
assert wire_encode(plain) is plain
assert wire_decode(plain) is plain

# 4. Unknown kind passes through decode UNCHANGED, envelope intact
# (tolerant degradation; lossless if ever re-serialized).
foreign = {WIRE_KIND_KEY: "some.future/kind", WIRE_DATA_KEY: {"x": 1}}
assert wire_decode(foreign) is foreign

# 5. Exact-type encode: a subclass is NOT encoded under the parent's kind.
@dataclass
class _FlatSubclass(_FlatResult):
    pass

sub = _FlatSubclass(text="sub")
assert wire_encode(sub) is sub

# 6. Duplicate-kind guard: a DIFFERENT class claiming a taken kind raises;
# re-registering the same logical class (same qualname; module ignored,
# because nbdev defines classes twice) replaces quietly.
try:
    @wire_type("test.flat")
    @dataclass
    class _Imposter:
        y: int = 0
    raise AssertionError("duplicate kind must raise")
except ValueError:
    pass

# 7. Transport-terminus tolerance: extras dropped (debug-logged), missing
# required raises loudly.
tolerant = wire_decode({WIRE_KIND_KEY: "test.flat",
                        WIRE_DATA_KEY: {"text": "t", "new_field_from_future": 1}})
assert tolerant == _FlatResult(text="t")
try:
    wire_decode({WIRE_KIND_KEY: "test.flat", WIRE_DATA_KEY: {"confidence": 0.5}})
    raise AssertionError("missing required field must raise")
except TypeError:
    pass

print("typed wire envelope tests OK")
typed wire envelope tests OK
Per-call envelope (CR-14 / the surviving CR-15 control channel)
The substrate-owned per-call identity + control block. Today job_id reaches the worker only as a domain kwarg that polite capability authors echo into log messages; the observational pass found that failures, the very case diagnosis needs, carry NO identity at all. The envelope makes call identity a wire-level concern:
Host side: the queue sets the contextvar around each job execution; the proxy injects "envelope": {...} as a TOP-LEVEL body key on /execute, /execute_stream, and /task (never inside kwargs — capability code never sees it; old workers ignore unknown top-level keys).
Worker side: the endpoint decodes it back into the SAME contextvar, so the DiagnosticsLogHandler stamps every record with exact job identity (contextvars.copy_context().run(...) carries it into the executor thread — run_in_executor does not copy context by itself).
control carries per-call flags (force cache-bypass et al — the 4th CR-15 kwarg category) so they stop riding the typed task args.
Tolerant both directions: unknown envelope keys are ignored on decode (forward compat); an absent envelope leaves the contextvar None (records honestly unattributed, never heuristically attributed).
get_call_envelope
def get_call_envelope()->Optional:
The current call envelope, or None outside any call span.
reset_call_envelope
def reset_call_envelope( token:Token)->None:
Restore the prior envelope (always pair with set_call_envelope in finally).
set_call_envelope
def set_call_envelope( env:Optional)->Token:
Set the current call envelope; returns the token for reset_call_envelope.
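One way to guarantee the set/reset pairing is to wrap it in a context manager. The sketch below uses its own ContextVar so that it is self-contained; call_span and _envelope are hypothetical names, and the real module may not provide such a helper.

```python
import contextvars
from contextlib import contextmanager

_envelope = contextvars.ContextVar("envelope_sketch", default=None)


@contextmanager
def call_span(env):
    """Set the envelope for the duration of the block; always restore the prior one."""
    token = _envelope.set(env)
    try:
        yield env
    finally:
        _envelope.reset(token)  # restores whatever was set before, not just None


observed = []
with call_span({"job_id": "outer"}):
    with call_span({"job_id": "inner"}):
        observed.append(_envelope.get()["job_id"])
    observed.append(_envelope.get()["job_id"])  # outer envelope is back

try:
    with call_span({"job_id": "failing"}):
        raise RuntimeError("job failed")
except RuntimeError:
    pass
after_failure = _envelope.get()  # reset ran despite the exception
```

Resetting through the token rather than setting None is what makes nested spans restore correctly.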
CallEnvelope
Substrate-owned per-call identity + control block (CR-14 / CR-15).
Travels as a top-level "envelope" key on every proxy→worker call body. All fields optional — an envelope-less call (direct proxy use, old hosts) simply yields unattributed records, never a failure.
# Envelope wire-format executable tests (same boundary-simulation discipline).
import contextvars as _cv

# 1. Round-trip through the simulated JSON boundary; None fields dropped.
env = CallEnvelope(job_id="j-1", run_id="r-1", control={"force": True})
wire_form = _json.loads(_json.dumps(env.to_wire()))
assert wire_form == {"job_id": "j-1", "run_id": "r-1", "control": {"force": True}}
back = CallEnvelope.from_wire(wire_form)
assert back == env

# 2. Tolerant decode: unknown keys from a future host are ignored.
future = CallEnvelope.from_wire({"job_id": "j-2", "tenant_id": "future-key"})
assert future.job_id == "j-2" and future.control == {}

# 3. Empty/absent envelope decodes to all-None (honestly unattributed).
assert CallEnvelope.from_wire({}) == CallEnvelope()
assert CallEnvelope.from_wire(None) == CallEnvelope()

# 4. Contextvar set/reset pairing + executor-thread propagation via
# copy_context (the run_in_executor gotcha the worker endpoints handle).
assert get_call_envelope() is None
token = set_call_envelope(env)
try:
    assert get_call_envelope() is env
    seen = {}
    ctx = _cv.copy_context()
    import threading
    t = threading.Thread(target=lambda: seen.update(
        inside=ctx.run(get_call_envelope)))
    t.start(); t.join()
    assert seen["inside"] is env  # copy_context carries it into the thread
    bare = {}
    t2 = threading.Thread(target=lambda: bare.update(inside=get_call_envelope()))
    t2.start(); t2.join()
    assert bare["inside"] is None  # without copy_context the thread sees None
finally:
    reset_call_envelope(token)
assert get_call_envelope() is None
print("call envelope tests OK")
call envelope tests OK
In-worker accounts (CR-14 follow-up)
The worker-side half of the substrate-family account events (TASK_ACCOUNT / RESULT_SAVED / CACHE_HIT — the SubstrateEventType reserved vocabulary). The derived-not-authored rule holds: workers never write the journal; they RECORD accounts during a call span and the records ride the wire back as a response header, where the HOST journals them with worker_reported=True (single-writer-class rule — a compromised worker still cannot write the audit record directly, and the host stamps the receiving-side identity).
Why a header and not a body wrapper: the response body is the RESULT’s contract (typed wire envelope / capability dict) — wrapping it would break any old host talking to a new worker. A header is invisible to old proxies in both skew directions (the K6 version-skew posture). The error path attaches the same header on the _job_error 500 body — a FAILED call still reports the accounts recorded before the failure (e.g. a save that succeeded before a later crash).
Honest limit: accounts ride the UNARY paths (/execute, /task) only. /execute_stream responses have sent their headers before execution ends; a terminal sentinel chunk (the _job_error dialect) is the seam-admitted carrier if a streaming adopter ever needs accounts.
Writers: the worker itself (TASK_ACCOUNT at /task completion — zero capability cooperation) and the interface-lib storage helpers (T29 save_with_logging → RESULT_SAVED, get_cached → CACHE_HIT — recorded at the helper boundary, so every capability using the helpers reports for free). record_account outside a capture span is a silent no-op (standalone/dev runs, host-process imports).
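The header transport can be sketched as a JSON list carried in a single response header. The header name below is hypothetical (the actual name is not given here), as are attach_accounts and journal_entries; the sketch assumes the host adds worker_reported=True when it journals the entries.

```python
import json
from typing import Any, Dict, List

ACCOUNTS_HEADER = "X-Worker-Accounts"  # hypothetical header name


def attach_accounts(headers: Dict[str, str], accounts: List[dict]) -> Dict[str, str]:
    """Worker side: serialize drained accounts into one ASCII-safe header."""
    if accounts:
        # json.dumps escapes non-ASCII by default, so the value is header-safe.
        headers[ACCOUNTS_HEADER] = json.dumps(accounts)
    return headers


def journal_entries(headers: Dict[str, str]) -> List[Dict[str, Any]]:
    """Host side: decode and stamp; a missing header simply means no accounts."""
    raw = headers.get(ACCOUNTS_HEADER)
    if raw is None:
        return []
    return [{**account, "worker_reported": True} for account in json.loads(raw)]


recorded = [
    {"event_type": "result_saved", "payload": {"text_hash": "sha256:abc"}},
    {"event_type": "task_account", "payload": {"task": "résumé", "ok": False}},
]
response_headers = attach_accounts({"content-type": "application/json"}, recorded)
entries = journal_entries(response_headers)

# Version skew: an old worker sends no header; the new host journals nothing.
from_old_worker = journal_entries({"content-type": "application/json"})
```

Because the body is untouched, an old host talking to a new worker just ignores the extra header, and a new host talking to an old worker sees an empty list.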
drain_accounts
def drain_accounts()->list:
Return + clear the current span’s recorded accounts ([] outside a span or when nothing was recorded). The worker response path calls this once when building the response headers.
record_account
Record one substrate-family account for the current call span.
Called by the worker itself and by interface-lib storage helpers. Silent no-op outside a capture span (standalone runs, host imports) — the envelope-less-call posture applied to accounts.
begin_account_capture
def begin_account_capture()->None:
Start a fresh account list for the current call span (worker endpoint entry; same no-reset semantics as the envelope — the ASGI request task’s context dies with the request).
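A minimal sketch of how a capture span of this kind can be built on a ContextVar holding a mutable list. The names begin_capture, record, and drain are hypothetical stand-ins; the real functions may store or clear accounts differently.

```python
import contextvars
import threading

# Default None means "no capture span": recording is then a silent no-op.
_accounts = contextvars.ContextVar("accounts_sketch", default=None)


def begin_capture() -> None:
    _accounts.set([])  # fresh list for this call span


def record(event_type: str, payload=None) -> None:
    bucket = _accounts.get()
    if bucket is None:
        return  # outside a span: nothing to record into
    bucket.append({"event_type": event_type, "payload": payload or {}})


def drain() -> list:
    bucket = _accounts.get()
    if not bucket:
        return []
    drained = list(bucket)
    bucket.clear()  # clear in place so every context sharing the list sees it
    return drained


record("result_saved")  # no span yet
outside = drain()

begin_capture()
ctx = contextvars.copy_context()  # copied AFTER begin: shares the same list object
worker = threading.Thread(
    target=lambda: ctx.run(record, "task_account", {"ok": True})
)
worker.start()
worker.join()
first = drain()
second = drain()
```

The copied context holds a reference to the same list, so appends made in the executor thread are visible to the request task that drains it.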
# Account-capture executable tests (capture span + executor-thread sharing +
# the no-span no-op + drain-once semantics).
import threading as _thr

# 1. No capture span: record_account is a silent no-op; drain yields [].
record_account("result_saved", {"row_job_id": "x"})
assert drain_accounts() == []

# 2. Within a span: records accumulate and drain ONCE.
begin_account_capture()
record_account("cache_hit", {"row_job_id": "j-1"})
record_account("result_saved")  # payload defaults to {}
got = drain_accounts()
assert got == [
    {"event_type": "cache_hit", "payload": {"row_job_id": "j-1"}},
    {"event_type": "result_saved", "payload": {}},
]
assert drain_accounts() == []  # drained: second call yields nothing

# 3. Executor-thread sharing: copy_context AFTER begin gives the thread the
# SAME list object (the worker's ctx.run pattern); appends are visible to
# the draining request task.
begin_account_capture()
ctx = _cv.copy_context()
t = _thr.Thread(target=lambda: ctx.run(
    record_account, "task_account", {"task": "t", "ok": True}))
t.start(); t.join()
assert drain_accounts() == [
    {"event_type": "task_account", "payload": {"task": "t", "ok": True}}]

# 4. Header round-trip shape: ASCII JSON survives the simulated boundary.
begin_account_capture()
record_account("result_saved", {"text_hash": "sha256:abc", "n": 1})
hdr = _json.dumps(drain_accounts())
assert hdr.isascii()
assert _json.loads(hdr)[0]["payload"]["text_hash"] == "sha256:abc"
print("account capture tests OK")