# A convert -> align pipe: mixed static and bound kwargs.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])
assert comp.fail_fast is True
assert isinstance(comp.nodes[1].kwargs["audio"], OutputRef)
assert comp.nodes[1].kwargs["audio"].node == "convert"
print("composition model OK")
Composition Ports
The composition model
A composition is a static DAG of capability-invocation nodes submitted to the JobQueue as one unit. It REPLACES the pre-bound submit_sequence batch model (pass-2 Thread 4: the old model could not pipe step outputs to step inputs — _advance_sequence built every next job from kwargs frozen at submit time).
Three load-bearing properties:
- Bindings live inside kwargs. A node’s kwargs mix static values with OutputRef(node, field) markers. There is no separate edge list: the DAG topology is derived by scanning kwargs for markers (the derive-from-code dialect applied to the composition itself), so declaration and behavior cannot drift apart.
- Member jobs are created lazily. A node’s Job is constructed only when all of its dependencies have completed, because its kwargs do not exist until the upstream results do. This is the heart of execution-time binding.
- Fan-out is host-constructed. The evidenced map shape (one list output → N downstream invocations) is built by the host: it reads the producer’s list and constructs one composition of N independent subgraphs with static per-item kwargs (stage-3 ledger, ratified amendment 2). A dynamically expanding map node is seam-admitted but not built, because no current adopter needs the list to stay hidden from the host.
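The host-constructed fan-out can be sketched as follows. This is a minimal illustration under stated assumptions: the three dataclasses are simplified stand-ins mirroring only the documented OutputRef, CompositionNode and Composition fields, and build_fanout is a hypothetical host helper, not part of the substrate. Each item of an already-read list becomes its own convert -> align subgraph with static per-item kwargs.

```python
from dataclasses import dataclass, field as dc_field
from typing import Any, Dict, List, Optional, Tuple

# Simplified stand-ins for the documented classes (assumption, not the real API).
@dataclass(frozen=True)
class OutputRef:
    node: str
    field: Optional[str] = None

@dataclass
class CompositionNode:
    id: str
    capability_instance_id: str
    kwargs: Dict[str, Any] = dc_field(default_factory=dict)

@dataclass
class Composition:
    nodes: List[CompositionNode]
    fail_fast: bool = True

def build_fanout(items: List[Tuple[str, str]]) -> Composition:
    """Hypothetical host helper: one convert -> align subgraph per (path, text) item.

    The host has already read the producer's list, so every per-item value is
    a static kwarg; only the intra-subgraph edge uses an OutputRef marker.
    """
    nodes: List[CompositionNode] = []
    for i, (path, text) in enumerate(items):
        conv_id, align_id = f"convert-{i}", f"align-{i}"
        nodes.append(CompositionNode(conv_id, "media-converter", {
            "action": "convert", "input_path": path, "output_format": "wav",
        }))
        nodes.append(CompositionNode(align_id, "forced-aligner", {
            "audio": OutputRef(conv_id, "output_path"),  # binds within subgraph i only
            "text": text,
        }))
    return Composition(nodes=nodes)

comp = build_fanout([("/tmp/a.mp3", "hello"), ("/tmp/b.mp3", "world")])
```

Because no marker crosses subgraphs, the N subgraphs stay independent: under fail_fast=False, a failure in one item's convert node skips only that item's align node.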
The pure data + validation + binding-resolution layer lives here; the executor (multi-lane ready-set dispatch + resource-derived admission) lives in core/queue.ipynb.
NodeState
def NodeState(
args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):
State of one composition node (and, for the terminal subset, of a whole composition run).
skipped is composition-specific: a node whose transitive dependencies failed/cancelled can never run (its inputs will never exist) and is recorded as skipped rather than getting a Job at all. Composition-level status uses the running/completed/failed/cancelled subset.
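A minimal sketch of how the state set and its two subsets might be expressed. It assumes NodeState is a str-valued Enum (the rendered defaults such as <NodeState.pending: 'pending'> suggest string values); the str mixin and the TERMINAL / COMPOSITION_STATUSES names are illustrative assumptions.

```python
from enum import Enum

class NodeState(str, Enum):  # assumption: str-valued Enum
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"
    skipped = "skipped"  # composition-specific: never gets a Job

# Node-level terminal states: nothing further can happen to the node.
TERMINAL = frozenset({NodeState.completed, NodeState.failed,
                      NodeState.cancelled, NodeState.skipped})

# Composition-level status uses only this subset (never pending or skipped).
COMPOSITION_STATUSES = frozenset({NodeState.running, NodeState.completed,
                                  NodeState.failed, NodeState.cancelled})
```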
OutputRef
def OutputRef(
node:str, field:Optional=None
)->None:
Binding marker: this kwarg’s value comes from an upstream node’s result.
Placed directly in a CompositionNode.kwargs value position. field=None binds the WHOLE result (for fan-in folds); a field name extracts one field via extract_output_field (dict key or typed-result attribute). Frozen, so markers are hashable and safe to share across nodes.
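Why frozen matters can be sketched with a stand-in dataclass carrying the same two fields (an assumption: the real class may differ in detail). A frozen dataclass gets a field-based __hash__ and rejects attribute assignment, so one marker can sit in several nodes' kwargs, or in a set, without aliasing surprises.

```python
from dataclasses import dataclass, FrozenInstanceError
from typing import Optional

@dataclass(frozen=True)
class OutputRef:  # stand-in mirroring the documented fields
    node: str
    field: Optional[str] = None

whole = OutputRef("convert")               # field=None: bind the whole result
one = OutputRef("convert", "output_path")  # bind a single field

# Equal markers hash equally, so duplicates collapse in a set.
markers = {one, OutputRef("convert", "output_path"), whole}

try:
    one.node = "align"  # frozen dataclasses reject assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```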
CompositionNode
def CompositionNode(
id:str, capability_instance_id:str, kwargs:Dict=<factory>, priority:int=0, task_name:Optional=None,
method:Optional=None, control:Dict=<factory>
)->None:
One capability invocation in a composition.
kwargs mixes static values with OutputRef markers; the markers are scanned to derive the node’s dependencies. Only top-level values are scanned: nested containers are deliberately not searched, because the evidence needs only single-position bindings, and a nested-marker grammar is seam-admitted for later.
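The top-level-only scan can be sketched as a set comprehension over the kwargs values. node_deps is a hypothetical helper name and OutputRef is a stand-in for the documented marker; the nested marker in the example is deliberately invisible to the scan, matching the stated design.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Set

@dataclass(frozen=True)
class OutputRef:  # stand-in for the documented marker
    node: str
    field: Optional[str] = None

def node_deps(kwargs: Dict[str, Any]) -> Set[str]:
    """Upstream node ids referenced by TOP-LEVEL OutputRef values only."""
    return {v.node for v in kwargs.values() if isinstance(v, OutputRef)}

kwargs = {
    "audio": OutputRef("convert", "output_path"),  # top-level marker: counted
    "text": "hello world",                         # static value: ignored
    "extra": [OutputRef("vad")],                   # nested marker: NOT searched
}
deps = node_deps(kwargs)
```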
Composition
def Composition(
nodes:List, fail_fast:bool=True, priority:int=0, run_id:Optional=None, actor:Optional=None
)->None:
A static DAG of capability-invocation nodes, submitted as one unit.
fail_fast=True (default, matching the audit-locked sequence default): on a member failure, pending independent members are cancelled, in-flight members run to completion, transitive dependents are skipped, and the composition lands failed. fail_fast=False is best-effort: independent members continue; only transitive dependents of the failure are skipped.
run_id / actor (CR-14 follow-up) are host-tier correlation tags stamped onto every lazily-created member Job (the submit(run_id=, actor=) analog for compositions) — NOT the composition run’s own id, which the queue assigns at submit.
Validation — derived edges + cycle check
validate_composition runs at submit time (mirroring submit_sequence’s upfront-validation precedent) and returns the derived dependency map that the run record carries for the rest of the composition’s life. It rejects duplicate node ids, references to unknown nodes, and cycles (Kahn’s algorithm; a self-reference is a cycle of length one).
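The three checks can be sketched over plain (node_id, kwargs) pairs. This is an illustration, not the substrate's implementation: OutputRef and ValidationError are stand-ins (the latter for CompositionValidationError), derive_deps is a hypothetical name, and the error messages are invented. The cycle check is Kahn's algorithm: any node never reaching in-degree zero sits on, or downstream of, a cycle, which also catches a self-reference.

```python
from collections import Counter, deque
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Set, Tuple

@dataclass(frozen=True)
class OutputRef:  # stand-in for the documented marker
    node: str
    field: Optional[str] = None

class ValidationError(ValueError):
    """Stand-in for CompositionValidationError."""

def derive_deps(nodes: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, Set[str]]:
    """Validate (node_id, kwargs) pairs and return node_id -> upstream ids."""
    ids = [nid for nid, _ in nodes]
    dupes = sorted(i for i, c in Counter(ids).items() if c > 1)
    if dupes:
        raise ValidationError(f"duplicate node ids: {dupes}")
    known = set(ids)
    deps: Dict[str, Set[str]] = {}
    for nid, kwargs in nodes:
        ups = {v.node for v in kwargs.values() if isinstance(v, OutputRef)}
        unknown = ups - known
        if unknown:
            raise ValidationError(f"node {nid!r} references unknown nodes {sorted(unknown)}")
        deps[nid] = ups
    # Kahn's algorithm: peel off nodes whose upstreams have all been peeled.
    indegree = {n: len(ups) for n, ups in deps.items()}
    dependents: Dict[str, List[str]] = {n: [] for n in deps}
    for n, ups in deps.items():
        for u in ups:
            dependents[u].append(n)
    queue = deque(n for n, d in indegree.items() if d == 0)
    peeled = 0
    while queue:
        n = queue.popleft()
        peeled += 1
        for d in dependents[n]:
            indegree[d] -= 1
            if indegree[d] == 0:
                queue.append(d)
    if peeled != len(deps):
        # Anything left has an upstream on (or downstream of) a cycle.
        blocked = sorted(n for n, d in indegree.items() if d > 0)
        raise ValidationError(f"dependency cycle blocks nodes {blocked}")
    return deps
```

Python's standard library offers graphlib.TopologicalSorter with its own CycleError; the hand-rolled loop is used here only so the Kahn steps are visible.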
CompositionValidationError
def CompositionValidationError(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
A composition failed submit-time validation (duplicate ids, unresolved OutputRef targets, or a dependency cycle).
validate_composition
def validate_composition(
comp:Composition, # Composition to validate
)->Dict: # node_id -> set of upstream node ids (the derived DAG)
Validate a composition and return its derived dependency map.
Raises CompositionValidationError on duplicate node ids, OutputRef targets that name no node in the composition, or dependency cycles. An empty composition is valid (returns {}) — the queue completes it at submit, mirroring the empty-sequence totality precedent.
# Self-contained (nbdev-prepare may parallelize test execution): the
# qwen3-e2e pipe shape, redefined locally.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])
# Happy path: derived edges match the markers.
deps = validate_composition(comp)
assert deps == {"convert": set(), "align": {"convert"}}
# Duplicate ids rejected.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {}), CompositionNode("a", "p", {})]))
    raise AssertionError("expected duplicate-id failure")
except CompositionValidationError as e:
    assert "duplicate" in str(e)
# Unknown ref rejected.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("ghost")})]))
    raise AssertionError("expected unknown-ref failure")
except CompositionValidationError as e:
    assert "ghost" in str(e)
# Cycle rejected (a <-> b), and self-reference is a cycle of one.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("b")}),
        CompositionNode("b", "p", {"y": OutputRef("a")})]))
    raise AssertionError("expected cycle failure")
except CompositionValidationError as e:
    assert "cycle" in str(e)
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("a")})]))
    raise AssertionError("expected self-cycle failure")
except CompositionValidationError as e:
    assert "cycle" in str(e)
# Empty composition is valid.
assert validate_composition(Composition(nodes=[])) == {}
print("validation OK")
Binding resolution — the field_of successor
extract_output_field is the SINGLE substrate-owned field resolver: dict results resolve by key (the multi-op tools, ffmpeg and graph, return dicts until their adapters are split at stages 4/8), and typed wire DTOs resolve by attribute. Stage 2 retired the per-consumer field_of dict-or-object tolerance copies (E5/D10/C7 + 10 inline e2e sites); this is the one legitimate reincarnation, living at the binding seam where the substrate, not each consumer, owns the extraction.
CompositionBindingError
def CompositionBindingError(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
An OutputRef could not be resolved against the producer’s recorded result at execution time (missing producer result, missing key/attribute).
extract_output_field
def extract_output_field(
result:Any, # The producer node's recorded result (typed DTO or capability-side dict)
field:Optional, # Field to extract; None returns the whole result
producer:str='?', # Producer node id (error messages only)
)->Any: # The bound value
Extract a field from an upstream result for binding into a kwarg.
The single substrate-owned successor of the retired field_of helpers: dicts resolve by KEY (intended for capability-side dict results), everything else by ATTRIBUTE (typed wire DTOs). Missing fields raise CompositionBindingError loudly, since silent shape-shifting is exactly what stage 2 retired (F12 fail-loudly posture).
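The key-versus-attribute rule can be sketched as below. BindingError stands in for CompositionBindingError, extract_field and AlignResult are hypothetical names, and the sketch assumes "dict" means a plain dict instance. Resolving dicts by key only, with no attribute fallback, keeps a result key such as "items" from being confused with the dict.items method.

```python
from dataclasses import dataclass
from typing import Any, Optional

class BindingError(LookupError):
    """Stand-in for CompositionBindingError."""

_MISSING = object()  # sentinel: distinguishes "absent" from a stored None

def extract_field(result: Any, field: Optional[str], producer: str = "?") -> Any:
    if field is None:
        return result  # whole-result binding (fan-in folds)
    if isinstance(result, dict):
        value = result.get(field, _MISSING)       # dicts: by KEY only
    else:
        value = getattr(result, field, _MISSING)  # typed DTOs: by ATTRIBUTE
    if value is _MISSING:
        raise BindingError(f"producer {producer!r} result has no field {field!r}")
    return value

@dataclass
class AlignResult:  # hypothetical typed wire DTO
    text: str = "hi"
```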
resolve_node_kwargs
def resolve_node_kwargs(
node:CompositionNode, # Node whose kwargs are being materialized
results:Dict, # Completed producers' results, keyed by node id
)->Dict: # kwargs with every OutputRef replaced by its bound value
Materialize a node’s kwargs by resolving its OutputRef markers.
Called by the executor at the moment a node becomes ready (all dependencies completed) — this is where execution-time binding actually happens. Static kwargs pass through untouched.
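A sketch of the materialization step, assuming results maps each completed producer's node id to its recorded result. OutputRef and BindingError are stand-ins, resolve_kwargs is a hypothetical name, and field extraction is inlined with the same key-or-attribute rule described for extract_output_field.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass(frozen=True)
class OutputRef:  # stand-in for the documented marker
    node: str
    field: Optional[str] = None

class BindingError(LookupError):
    """Stand-in for CompositionBindingError."""

def resolve_kwargs(kwargs: Dict[str, Any], results: Dict[str, Any]) -> Dict[str, Any]:
    resolved: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if not isinstance(value, OutputRef):
            resolved[name] = value  # static kwarg: passed through untouched
            continue
        if value.node not in results:
            # Called only once every producer completed, so this is an ordering bug.
            raise BindingError(f"no recorded result for producer {value.node!r}")
        result = results[value.node]
        if value.field is None:
            resolved[name] = result  # whole-result binding
        elif isinstance(result, dict) and value.field in result:
            resolved[name] = result[value.field]  # dict: by key
        elif not isinstance(result, dict) and hasattr(result, value.field):
            resolved[name] = getattr(result, value.field)  # DTO: by attribute
        else:
            raise BindingError(f"producer {value.node!r} result lacks field {value.field!r}")
    return resolved
```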
from dataclasses import dataclass as _dc
# Dict results resolve by key (the ffmpeg shape).
assert extract_output_field({"output_path": "/x.wav"}, "output_path") == "/x.wav"
# None binds the whole result.
whole = {"a": 1}
assert extract_output_field(whole, None) is whole
# Typed results resolve by attribute (the wire-DTO shape).
@_dc
class _FakeResult:
    text: str = "hi"
assert extract_output_field(_FakeResult(), "text") == "hi"
# Missing key / attribute fail loudly.
for bad_result in ({"other": 1}, _FakeResult()):
    try:
        extract_output_field(bad_result, "nope", producer="conv")
        raise AssertionError("expected binding failure")
    except CompositionBindingError as e:
        assert "conv" in str(e) and "nope" in str(e)
# resolve_node_kwargs: markers replaced, statics untouched.
n = CompositionNode("align", "qwen3", {
    "audio": OutputRef("convert", "output_path"), "text": "hello"})
out = resolve_node_kwargs(n, {"convert": {"output_path": "/x.wav"}})
assert out == {"audio": "/x.wav", "text": "hello"}
# Unrecorded producer is an executor-ordering bug and raises.
try:
    resolve_node_kwargs(n, {})
    raise AssertionError("expected missing-producer failure")
except CompositionBindingError as e:
    assert "convert" in str(e)
print("binding resolution OK")
binding resolution OK
Run state — CompositionNodeRun / CompositionRun
The run record tracks per-node state for the composition’s whole life. The advancement logic (which nodes are ready, what gets skipped on failure, what terminal status the run derives) lives HERE as pure methods so it is unit-testable without a queue — the executor in queue.ipynb stays a thin integration: create lazy Jobs for ready nodes, record member outcomes, and re-scan.
CompositionNodeRun
def CompositionNodeRun(
node_id:str, state:NodeState=<NodeState.pending: 'pending'>, job_id:Optional=None, result:Any=None,
error:Optional=None
)->None:
Live state of one node within a composition run.
CompositionRun
def CompositionRun(
id:str, composition:Composition, deps:Dict, dependents:Dict, nodes_by_id:Dict, node_runs:Dict,
status:NodeState=<NodeState.running: 'running'>, cancel_requested:bool=False, submitted_at:datetime=<factory>,
completed_at:Optional=None
)->None:
Tracks a submitted composition through execution (lives in JobQueue._compositions).
Carries the validated dependency map (and its reverse) so advancement decisions are O(edges) lookups for the rest of the run. Composition-level status reuses the NodeState terminal subset: starts running, transitions to completed / failed / cancelled via derive_terminal_status once all_terminal().
cancel_requested records USER cancel intent (cancel_composition), distinguishing it from the executor’s fail-fast HOUSEKEEPING cancels of independent pending members after a failure — without the flag, a failure-driven run would derive cancelled instead of failed because its housekeeping cancels would dominate.
new_composition_run
def new_composition_run(
comp:Composition, # Composition to run (validated here)
run_id:str, # Run UUID (assigned by the queue)
)->CompositionRun: # Fresh run record with derived topology
Validate a composition and build its run record.
ready_nodes
def ready_nodes(
)->List: # Node ids that are pending with all dependencies completed
Nodes whose member Jobs can be created right now.
Scan order follows the composition’s node order (readability + deterministic dispatch among equally-ready nodes).
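The readiness rule can be sketched as a pure function. Assumptions: states are plain strings rather than NodeState members, and ready_nodes is written here as a free function over (order, deps, states) instead of a CompositionRun method.

```python
from typing import Dict, List, Set

def ready_nodes(order: List[str], deps: Dict[str, Set[str]],
                states: Dict[str, str]) -> List[str]:
    """Pending nodes whose upstreams have all completed, in composition order."""
    return [
        n for n in order
        if states[n] == "pending" and all(states[u] == "completed" for u in deps[n])
    ]

order = ["convert", "align", "vad"]
deps = {"convert": set(), "align": {"convert"}, "vad": set()}
```

Iterating over the composition's node list, rather than over a set, is what makes dispatch among equally-ready nodes deterministic.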
record_started
def record_started(
node_id:str, # Node whose member Job was just created/enqueued
job_id:str, # The member Job's id
)->None:
Mark a node running and bind it to its member Job.
record_result
def record_result(
node_id:str, # Node whose member Job reached terminal status
state:NodeState, # completed / failed / cancelled
result:Any=None, # Member job result (if completed)
error:Optional=None, # Structured failure (if failed/cancelled)
)->None:
Record a member job’s terminal outcome on its node.
skip_dependents
def skip_dependents(
node_id:str, # Node whose failure/cancellation poisons its downstream
)->List: # Node ids newly marked skipped (transitive)
Mark every still-pending transitive dependent of node_id skipped.
Skipped nodes never get a Job — their inputs can never exist. Runs regardless of fail_fast (dependents are unrunnable either way; fail_fast only governs INDEPENDENT pending members, which the executor cancels).
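The split between skipping and cancelling can be sketched in one hypothetical helper, failure_fallout, over plain string states. In the documented design the two halves live in different places (skip_dependents on the run, the cancels in the executor); the helper only illustrates which nodes land where. Transitive dependents come from a depth-first walk over the reverse dependency map, and in-flight ("running") members are left alone, matching "in-flight members run to completion".

```python
from typing import Dict, Set, Tuple

def failure_fallout(failed: str, deps: Dict[str, Set[str]],
                    states: Dict[str, str], fail_fast: bool) -> Tuple[Set[str], Set[str]]:
    """Return (to_skip, to_cancel) after `failed` fails or is cancelled."""
    # Reverse map: node -> nodes that consume its output.
    dependents: Dict[str, Set[str]] = {n: set() for n in deps}
    for n, ups in deps.items():
        for u in ups:
            dependents[u].add(n)
    # Transitive dependents via an explicit stack (depth-first walk).
    reach: Set[str] = set()
    stack = [failed]
    while stack:
        for d in dependents[stack.pop()]:
            if d not in reach:
                reach.add(d)
                stack.append(d)
    # Dependents are unrunnable either way: skip them regardless of fail_fast.
    to_skip = {n for n in reach if states[n] == "pending"}
    # fail_fast governs only INDEPENDENT pending members.
    to_cancel = ({n for n, s in states.items() if s == "pending" and n not in reach}
                 if fail_fast else set())
    return to_skip, to_cancel

deps = {"a": set(), "b": {"a"}, "c": {"b"}, "d": set(), "e": set()}
states = {"a": "failed", "b": "pending", "c": "pending", "d": "pending", "e": "running"}
```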
all_terminal
def all_terminal(
)->bool: # True when every node is in a terminal state
Whether the composition has nothing left to run or wait for.
derive_terminal_status
def derive_terminal_status(
)->NodeState: # cancelled / failed / completed
Derive the composition-level terminal status from member outcomes.
Precedence:
1. USER cancel intent (cancel_requested) dominates everything.
2. A member failure under fail_fast lands the run failed; the executor’s housekeeping cancels of independent pending members do NOT flip it to cancelled (that is what cancel_requested distinguishes).
3. A directly-cancelled member (job-level cancel, no failure, no composition intent) lands the run cancelled.
4. Otherwise completed, including best-effort (fail_fast=False) runs with failed members: “we attempted everything”, matching the sequence semantics this replaces.
Per-node outcomes stay inspectable on node_runs either way. (skipped never appears without a failed or cancelled member upstream of it, so it needs no clause of its own.)
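One reading of that precedence list, written as a pure function over plain string states. derive_status is a hypothetical name, and the real method may differ in edge cases the list does not spell out; the test cases mirror the run-state examples in this notebook.

```python
from typing import Dict

def derive_status(states: Dict[str, str], cancel_requested: bool, fail_fast: bool) -> str:
    """Composition-level terminal status from terminal member states."""
    values = set(states.values())
    failed = "failed" in values
    if cancel_requested:             # 1. user intent dominates everything
        return "cancelled"
    if failed and fail_fast:         # 2. housekeeping cancels don't flip this
        return "failed"
    if "cancelled" in values and not failed:
        return "cancelled"           # 3. direct member cancel, no failure
    return "completed"               # 4. includes best-effort runs with failures
```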
results_by_node
def results_by_node(
)->Dict: # node_id -> result, for completed nodes only
Completed members’ results keyed by node id (what host folds consume, and what resolve_node_kwargs reads at advancement time).
import dataclasses
# Self-contained (nbdev-prepare may parallelize test execution): the
# qwen3-e2e pipe shape, redefined locally.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])
# --- Pipe progression: align becomes ready only after convert completes.
run = new_composition_run(comp, "run-1")
assert run.ready_nodes() == ["convert"]
run.record_started("convert", "job-1")
assert run.ready_nodes() == [] # nothing ready while convert runs
run.record_result("convert", NodeState.completed, result={"output_path": "/x.wav"})
assert run.ready_nodes() == ["align"]
resolved = resolve_node_kwargs(run.nodes_by_id["align"], run.results_by_node())
assert resolved["audio"] == "/x.wav"
run.record_started("align", "job-2")
run.record_result("align", NodeState.completed, result={"items": []})
assert run.all_terminal() and run.derive_terminal_status() == NodeState.completed
# --- Parallel fan-in: VAD ∥ FA both ready immediately (D8 shape).
par = Composition(nodes=[
CompositionNode("vad", "silero", {"media_path": "/seg.wav"}),
CompositionNode("fa", "qwen3", {"audio": "/seg.wav", "text": "t"}),
])
prun = new_composition_run(par, "run-2")
assert prun.ready_nodes() == ["vad", "fa"]
# --- Failure: dependents skipped transitively; independents untouched.
diamond = Composition(nodes=[  # a -> b -> c chain plus an independent d
CompositionNode("a", "p", {}),
CompositionNode("b", "p", {"x": OutputRef("a")}),
CompositionNode("c", "p", {"x": OutputRef("b")}),
CompositionNode("d", "p", {}), # independent of a/b/c
])
drun = new_composition_run(diamond, "run-3")
drun.record_started("a", "j-a")
drun.record_result("a", NodeState.failed, error=None)
assert sorted(drun.skip_dependents("a")) == ["b", "c"]
assert drun.node_runs["d"].state == NodeState.pending # executor decides d's fate (fail_fast)
drun.record_result("d", NodeState.cancelled) # fail_fast executor HOUSEKEEPING cancel
assert drun.all_terminal()
# Housekeeping cancels do NOT flip a failure-driven run to cancelled.
assert drun.derive_terminal_status() == NodeState.failed
# --- USER cancel intent dominates, including over failures.
crun = new_composition_run(diamond, "run-3b")
crun.record_result("a", NodeState.failed)
crun.skip_dependents("a")
crun.cancel_requested = True
crun.record_result("d", NodeState.cancelled)
assert crun.derive_terminal_status() == NodeState.cancelled
# --- Direct member cancel (no failure, no intent) lands cancelled.
mrun = new_composition_run(diamond, "run-3c")
mrun.record_result("a", NodeState.cancelled)
mrun.skip_dependents("a")
mrun.record_result("d", NodeState.completed)
assert mrun.derive_terminal_status() == NodeState.cancelled
# --- fail_fast derivation without cancellation: failed + skipped -> failed.
drun2 = new_composition_run(diamond, "run-4")
drun2.record_result("a", NodeState.failed)
drun2.skip_dependents("a")
drun2.record_result("d", NodeState.completed)
assert drun2.derive_terminal_status() == NodeState.failed
# --- Best-effort: failures recorded, run still lands completed.
be = dataclasses.replace(diamond, fail_fast=False)
brun = new_composition_run(be, "run-5")
brun.record_result("a", NodeState.failed)
brun.skip_dependents("a")
brun.record_result("d", NodeState.completed)
assert brun.derive_terminal_status() == NodeState.completed
# --- Empty composition: immediately terminal, completed.
erun = new_composition_run(Composition(nodes=[]), "run-6")
assert erun.all_terminal() and erun.derive_terminal_status() == NodeState.completed
print("run-state machinery OK")
run-state machinery OK