# Composition Ports


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## The composition model

A **composition** is a static DAG of capability-invocation nodes
submitted to the `JobQueue` as one unit. It REPLACES the pre-bound
`submit_sequence` batch model (pass-2 Thread 4: the old model could not
pipe step outputs to step inputs — `_advance_sequence` built every next
job from kwargs frozen at submit time).

Three load-bearing properties:

- **Bindings live inside kwargs.** A node’s kwargs mix static values
  with `OutputRef(node, field)` markers. There is no separate edge list
  — the DAG topology is *derived* by scanning kwargs for markers (the
  derive-from-code dialect applied to the composition itself), so
  declaration and behavior cannot drift apart.
- **Member jobs are created lazily.** A node’s `Job` is constructed only
  when all of its dependencies have completed, because its kwargs do not
  *exist* until the upstream results do. This is the heart of
  execution-time binding.
- **Fan-out is host-constructed.** The evidenced map shape (one list
  output → N downstream invocations) is built by the *host*: it reads
  the producer’s list and constructs one composition of N independent
  subgraphs with static per-item kwargs (stage-3 ledger, ratified
  amendment 2). A dynamically expanding map node is seam-admitted but
  not built, because no adopter requires the list to stay hidden from
  the host.
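
The host-constructed fan-out from the last bullet can be sketched as follows. This is a minimal illustration, not the module's code: `OutputRef` and `Node` are simplified local stand-ins for the real classes, and the capability ids (`"transcriber"`, `"summarizer"`), node ids, and field names are hypothetical.

```python
import dataclasses
from typing import Any, Dict, List, Optional


# Local stand-ins for illustration only; the real classes are documented below.
@dataclasses.dataclass(frozen=True)
class OutputRef:
    node: str
    field: Optional[str] = None


@dataclasses.dataclass
class Node:
    id: str
    capability_instance_id: str
    kwargs: Dict[str, Any] = dataclasses.field(default_factory=dict)


def build_fan_out(segments: List[str]) -> List[Node]:
    """Host-side map: one independent two-node subgraph per list item."""
    nodes: List[Node] = []
    for i, path in enumerate(segments):
        # Static per-item kwarg: the host has already read the producer's list.
        nodes.append(Node(f"transcribe-{i}", "transcriber", {"audio": path}))
        # Bound kwarg: resolved at execution time from this item's own upstream node.
        nodes.append(Node(f"summarize-{i}", "summarizer",
                          {"text": OutputRef(f"transcribe-{i}", "text")}))
    return nodes


fan_out_nodes = build_fan_out(["/seg0.wav", "/seg1.wav", "/seg2.wav"])
```

Each item's subgraph references only its own upstream node, so the N subgraphs share no edges and can progress independently.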

The pure data + validation + binding-resolution layer lives here; the
executor (multi-lane ready-set dispatch + resource-derived admission)
lives in `core/queue.ipynb`.

------------------------------------------------------------------------

### NodeState

``` python

def NodeState(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

```

*State of one composition node (and, for the terminal subset, of a
whole composition run).*

`skipped` is composition-specific: a node whose transitive dependencies
failed/cancelled can never run (its inputs will never exist) and is
recorded as skipped rather than getting a Job at all. Composition-level
status uses the running/completed/failed/cancelled subset.
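
The two subsets described above can be made concrete with a small sketch. It assumes `NodeState` is a plain `Enum` whose member values equal their names; only `pending` and `running` are confirmed by the rendered defaults elsewhere on this page, so the remaining values are assumptions, and the class and set names here are hypothetical.

```python
from enum import Enum


class NodeStateSketch(Enum):
    """Illustrative stand-in for NodeState; member values are assumed."""
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"
    skipped = "skipped"


# States in which a node has nothing left to run or wait for.
NODE_TERMINAL = frozenset({
    NodeStateSketch.completed, NodeStateSketch.failed,
    NodeStateSketch.cancelled, NodeStateSketch.skipped,
})

# Subset reused for the composition-level status; `skipped` is node-only.
RUN_STATUSES = frozenset({
    NodeStateSketch.running, NodeStateSketch.completed,
    NodeStateSketch.failed, NodeStateSketch.cancelled,
})
```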

------------------------------------------------------------------------

### OutputRef

``` python

def OutputRef(
    node:str, field:Optional=None
)->None:

```

*Binding marker: this kwarg’s value comes from an upstream node’s
result.*

Placed directly in a `CompositionNode.kwargs` value position.
`field=None` binds the WHOLE result (fan-in folds); a field name
extracts one field via `extract_output_field` (dict key or typed-result
attribute). Frozen so markers are hashable + safely shareable across
nodes.
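
Why freezing matters can be shown with a short sketch. It assumes the marker is a frozen dataclass with the documented `node` / `field` attributes; `OutputRefSketch` is a local stand-in, not the library class.

```python
import dataclasses
from typing import Optional


# Illustrative stand-in mirroring the documented signature.
@dataclasses.dataclass(frozen=True)
class OutputRefSketch:
    node: str
    field: Optional[str] = None


whole = OutputRefSketch("convert")                # field=None: bind the whole result
path = OutputRefSketch("convert", "output_path")  # bind a single field

# frozen=True together with the default eq=True generates __hash__, so
# markers can live in sets or key dicts, and equal markers collapse.
unique_markers = {path, OutputRefSketch("convert", "output_path"), whole}

# Mutation is rejected, so one marker can be shared by several nodes' kwargs.
try:
    path.node = "other"
    mutated = True
except dataclasses.FrozenInstanceError:
    mutated = False
```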

------------------------------------------------------------------------

### CompositionNode

``` python

def CompositionNode(
    id:str, capability_instance_id:str, kwargs:Dict=<factory>, priority:int=0, task_name:Optional=None,
    method:Optional=None, control:Dict=<factory>
)->None:

```

*One capability invocation in a composition.*

`kwargs` mixes static values with `OutputRef` markers. To derive the
node’s dependencies, only top-level values are scanned for markers;
nested containers are not searched. This is by design: the evidence so
far needs only single-position bindings, and a nested-marker grammar is
seam-admitted for later.
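
The top-level-only scan can be sketched in a few lines. `derive_deps` is a hypothetical helper name and `OutputRef` is a local stand-in; the point is only that a marker nested inside a list or dict contributes no dependency edge.

```python
import dataclasses
from typing import Any, Dict, Optional, Set


@dataclasses.dataclass(frozen=True)
class OutputRef:  # local stand-in for illustration
    node: str
    field: Optional[str] = None


def derive_deps(kwargs: Dict[str, Any]) -> Set[str]:
    """Collect upstream node ids from top-level kwarg values only."""
    return {v.node for v in kwargs.values() if isinstance(v, OutputRef)}


# A marker in a value position is found...
top_level = derive_deps({"audio": OutputRef("convert", "output_path"), "text": "hi"})
# ...while a marker nested inside a container is not searched for.
nested = derive_deps({"inputs": [OutputRef("convert", "output_path")]})
```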

------------------------------------------------------------------------

### Composition

``` python

def Composition(
    nodes:List, fail_fast:bool=True, priority:int=0, run_id:Optional=None, actor:Optional=None
)->None:

```

*A static DAG of capability-invocation nodes, submitted as one unit.*

`fail_fast=True` (default, matching the audit-locked sequence default):
on a member failure, pending independent members are cancelled,
in-flight members run to completion, transitive dependents are skipped,
and the composition lands `failed`. `fail_fast=False` is best-effort:
independent members continue; only transitive dependents of the failure
are skipped.

`run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags
stamped onto every lazily-created member Job (the
`submit(run_id=, actor=)` analog for compositions) — NOT the composition
run’s own id, which the queue assigns at submit.

``` python
# The convert -> align pipe shape: mixed static + bound kwargs.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])
assert comp.fail_fast is True
assert isinstance(comp.nodes[1].kwargs["audio"], OutputRef)
assert comp.nodes[1].kwargs["audio"].node == "convert"
print("composition model OK")
```

## Validation — derived edges + cycle check

`validate_composition` runs at submit time (mirroring
`submit_sequence`’s upfront-validation precedent) and returns the
derived dependency map that the run record carries for the rest of the
composition’s life. It rejects duplicate node ids, references to unknown
nodes, and cycles (Kahn’s algorithm; a self-reference is a cycle of
length one).
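
For readers unfamiliar with Kahn's algorithm, here is a minimal sketch of the cycle check over a `node_id -> upstream ids` map. `topo_order` is a hypothetical name, it raises `ValueError` rather than `CompositionValidationError`, and it assumes unknown references were already rejected (every upstream id is a key of the map).

```python
from collections import deque
from typing import Dict, List, Set


def topo_order(deps: Dict[str, Set[str]]) -> List[str]:
    """Kahn's algorithm: return a topological order or raise on a cycle."""
    indegree = {n: len(ups) for n, ups in deps.items()}
    dependents: Dict[str, List[str]] = {n: [] for n in deps}
    for n, ups in deps.items():
        for up in ups:
            dependents[up].append(n)

    # Start from nodes with no unresolved upstream dependencies.
    queue = deque(n for n, d in indegree.items() if d == 0)
    order: List[str] = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for child in dependents[n]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # Nodes never reaching indegree 0 sit on (or behind) a cycle.
    if len(order) != len(deps):
        stuck = sorted(set(deps) - set(order))
        raise ValueError(f"dependency cycle among nodes: {stuck}")
    return order


pipe_order = topo_order({"convert": set(), "align": {"convert"}})
```

A self-reference such as `{"a": {"a"}}` starts with indegree 1 and never enters the queue, which is why it is caught as a cycle of length one without a special case.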

------------------------------------------------------------------------

### CompositionValidationError

``` python

def CompositionValidationError(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

```

*A composition failed submit-time validation (duplicate ids, unresolved
`OutputRef` targets, or a dependency cycle).*

------------------------------------------------------------------------

### validate_composition

``` python

def validate_composition(
    comp:Composition, # Composition to validate
)->Dict: # node_id -> set of upstream node ids (the derived DAG)

```

*Validate a composition and return its derived dependency map.*

Raises `CompositionValidationError` on duplicate node ids, `OutputRef`
targets that name no node in the composition, or dependency cycles. An
empty composition is valid (returns `{}`) — the queue completes it at
submit, mirroring the empty-sequence totality precedent.

``` python
# Self-contained (nbdev-prepare may parallelize test execution): the
# qwen3-e2e pipe shape, redefined locally.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])

# Happy path: derived edges match the markers.
deps = validate_composition(comp)
assert deps == {"convert": set(), "align": {"convert"}}

# Duplicate ids rejected.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {}), CompositionNode("a", "p", {})]))
    raise AssertionError("expected duplicate-id failure")
except CompositionValidationError as e:
    assert "duplicate" in str(e)

# Unknown ref rejected.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("ghost")})]))
    raise AssertionError("expected unknown-ref failure")
except CompositionValidationError as e:
    assert "ghost" in str(e)

# Cycle rejected (a <-> b), and self-reference is a cycle of one.
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("b")}),
        CompositionNode("b", "p", {"y": OutputRef("a")})]))
    raise AssertionError("expected cycle failure")
except CompositionValidationError as e:
    assert "cycle" in str(e)
try:
    validate_composition(Composition(nodes=[
        CompositionNode("a", "p", {"x": OutputRef("a")})]))
    raise AssertionError("expected self-cycle failure")
except CompositionValidationError as e:
    assert "cycle" in str(e)

# Empty composition is valid.
assert validate_composition(Composition(nodes=[])) == {}
print("validation OK")
```

## Binding resolution — the `field_of` successor

`extract_output_field` is the SINGLE substrate-owned field resolver:
dict results resolve by key (the multi-op tools — ffmpeg, graph — return
dicts until their adapter splits at stages 4/8), typed wire DTOs resolve
by attribute. Stage 2 retired the per-consumer `field_of` dict-or-object
tolerance copies (E5/D10/C7 + 10 inline e2e sites); this is the one
legitimate reincarnation, living at the binding seam where the substrate
— not each consumer — owns the extraction.

------------------------------------------------------------------------

### CompositionBindingError

``` python

def CompositionBindingError(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

```

*An `OutputRef` could not be resolved against the producer’s recorded
result at execution time (missing producer result, missing
key/attribute).*

------------------------------------------------------------------------

### extract_output_field

``` python

def extract_output_field(
    result:Any, # The producer node's recorded result (typed DTO or capability-side dict)
    field:Optional, # Field to extract; None returns the whole result
    producer:str='?', # Producer node id (error messages only)
)->Any: # The bound value

```

*Extract a field from an upstream result for binding into a kwarg.*

The single substrate-owned successor of the retired `field_of` helpers:
dicts resolve by KEY (intent for capability-side dict results),
everything else by ATTRIBUTE (typed wire DTOs). Missing fields raise
`CompositionBindingError` loudly — silent shape-shifting is what stage 2
retired (F12 fail-loudly posture).
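
The key-versus-attribute dispatch described above might look like the sketch below. It is an illustration under assumptions, not the module's implementation: `extract_field`, `BindingError`, and `Transcript` are hypothetical names, and the error wording is invented.

```python
from dataclasses import dataclass
from typing import Any, Optional


class BindingError(Exception):
    """Hypothetical stand-in for CompositionBindingError."""


def extract_field(result: Any, field: Optional[str], producer: str = "?") -> Any:
    if field is None:
        return result  # whole-result binding
    if isinstance(result, dict):
        # Dicts resolve by KEY only; dict attributes are never consulted.
        if field not in result:
            raise BindingError(f"node {producer!r}: result has no key {field!r}")
        return result[field]
    # Everything else resolves by ATTRIBUTE.
    if not hasattr(result, field):
        raise BindingError(f"node {producer!r}: result has no attribute {field!r}")
    return getattr(result, field)


@dataclass
class Transcript:  # hypothetical typed result
    text: str = "hi"


by_key = extract_field({"output_path": "/x.wav"}, "output_path")
by_attr = extract_field(Transcript(), "text")
```

Checking the dict case first matters: a field named `items` on a dict result without that key fails loudly instead of silently returning the bound `dict.items` method.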

------------------------------------------------------------------------

### resolve_node_kwargs

``` python

def resolve_node_kwargs(
    node:CompositionNode, # Node whose kwargs are being materialized
    results:Dict, # Completed producers' results, keyed by node id
)->Dict: # kwargs with every OutputRef replaced by its bound value

```

*Materialize a node’s kwargs by resolving its `OutputRef` markers.*

Called by the executor at the moment a node becomes ready (all
dependencies completed) — this is where execution-time binding actually
happens. Static kwargs pass through untouched.
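
As a rough sketch of that materialization step, assuming a local `OutputRef` stand-in and a hypothetical `resolve_kwargs` helper (it raises the built-in `LookupError` / `KeyError` / `AttributeError` rather than `CompositionBindingError`, purely to stay self-contained):

```python
import dataclasses
from typing import Any, Dict, Optional


@dataclasses.dataclass(frozen=True)
class OutputRef:  # local stand-in for illustration
    node: str
    field: Optional[str] = None


def resolve_kwargs(kwargs: Dict[str, Any], results: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with every top-level marker replaced by its value."""
    resolved: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if not isinstance(value, OutputRef):
            resolved[name] = value  # static kwarg passes through
            continue
        if value.node not in results:
            raise LookupError(f"no recorded result for producer {value.node!r}")
        produced = results[value.node]
        if value.field is None:
            resolved[name] = produced                       # whole result
        elif isinstance(produced, dict):
            resolved[name] = produced[value.field]          # by key
        else:
            resolved[name] = getattr(produced, value.field)  # by attribute
    return resolved


declared = {"audio": OutputRef("convert", "output_path"), "text": "hello"}
materialized = resolve_kwargs(declared, {"convert": {"output_path": "/x.wav"}})
```

The sketch builds a fresh dict, so the declared kwargs keep their markers after resolution.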

``` python
from dataclasses import dataclass as _dc

# Dict results resolve by key (the ffmpeg shape).
assert extract_output_field({"output_path": "/x.wav"}, "output_path") == "/x.wav"
# None binds the whole result.
whole = {"a": 1}
assert extract_output_field(whole, None) is whole

# Typed results resolve by attribute (the wire-DTO shape).
@_dc
class _FakeResult:
    text: str = "hi"
assert extract_output_field(_FakeResult(), "text") == "hi"

# Missing key / attribute fail loudly.
for bad_result in ({"other": 1}, _FakeResult()):
    try:
        extract_output_field(bad_result, "nope", producer="conv")
        raise AssertionError("expected binding failure")
    except CompositionBindingError as e:
        assert "conv" in str(e) and "nope" in str(e)

# resolve_node_kwargs: markers replaced, statics untouched.
n = CompositionNode("align", "qwen3", {
    "audio": OutputRef("convert", "output_path"), "text": "hello"})
out = resolve_node_kwargs(n, {"convert": {"output_path": "/x.wav"}})
assert out == {"audio": "/x.wav", "text": "hello"}

# Unrecorded producer is an executor-ordering bug and raises.
try:
    resolve_node_kwargs(n, {})
    raise AssertionError("expected missing-producer failure")
except CompositionBindingError as e:
    assert "convert" in str(e)
print("binding resolution OK")
```

    binding resolution OK

## Run state — `CompositionNodeRun` / `CompositionRun`

The run record tracks per-node state for the composition’s whole life.
The advancement logic (which nodes are ready, what gets skipped on
failure, what terminal status the run derives) lives HERE as pure
methods so it is unit-testable without a queue — the executor in
`queue.ipynb` stays a thin integration: create lazy Jobs for ready
nodes, record member outcomes, and re-scan.

------------------------------------------------------------------------

### CompositionNodeRun

``` python

def CompositionNodeRun(
    node_id:str, state:NodeState=<NodeState.pending: 'pending'>, job_id:Optional=None, result:Any=None,
    error:Optional=None
)->None:

```

*Live state of one node within a composition run.*

------------------------------------------------------------------------

### CompositionRun

``` python

def CompositionRun(
    id:str, composition:Composition, deps:Dict, dependents:Dict, nodes_by_id:Dict, node_runs:Dict,
    status:NodeState=<NodeState.running: 'running'>, cancel_requested:bool=False, submitted_at:datetime=<factory>,
    completed_at:Optional=None
)->None:

```

*Tracks a submitted composition through execution (lives in
`JobQueue._compositions`).*

Carries the validated dependency map (and its reverse) so advancement
decisions are O(edges) lookups for the rest of the run.
Composition-level `status` reuses the NodeState terminal subset: starts
`running`, transitions to completed / failed / cancelled via
`derive_terminal_status` once `all_terminal()`.

`cancel_requested` records USER cancel intent (`cancel_composition`),
distinguishing it from the executor’s fail-fast HOUSEKEEPING cancels of
independent pending members after a failure — without the flag, a
failure-driven run would derive `cancelled` instead of `failed` because
its housekeeping cancels would dominate.

------------------------------------------------------------------------

### new_composition_run

``` python

def new_composition_run(
    comp:Composition, # Composition to run (validated here)
    run_id:str, # Run UUID (assigned by the queue)
)->CompositionRun: # Fresh run record with derived topology

```

*Validate a composition and build its run record.*

------------------------------------------------------------------------

### ready_nodes

``` python

def ready_nodes(
    
)->List: # Node ids that are pending with all dependencies completed

```

*Nodes whose member Jobs can be created right now.*

Scan order follows the composition’s node order (readability +
deterministic dispatch among equally-ready nodes).
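
The ready-set rule (pending, with every dependency completed, in declaration order) can be sketched over plain dicts. `ready` is a hypothetical free function and states are plain strings here; the real method reads the run record's own fields.

```python
from typing import Dict, List, Set


def ready(order: List[str], deps: Dict[str, Set[str]], state: Dict[str, str]) -> List[str]:
    """Pending nodes whose upstream nodes have all completed, in node order."""
    return [
        n for n in order
        if state[n] == "pending" and all(state[up] == "completed" for up in deps[n])
    ]


order = ["convert", "align"]
deps = {"convert": set(), "align": {"convert"}}
state = {"convert": "pending", "align": "pending"}

before = ready(order, deps, state)   # only the root is ready
state["convert"] = "running"
during = ready(order, deps, state)   # nothing new while the root runs
state["convert"] = "completed"
after = ready(order, deps, state)    # the dependent becomes ready
```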

------------------------------------------------------------------------

### record_started

``` python

def record_started(
    node_id:str, # Node whose member Job was just created/enqueued
    job_id:str, # The member Job's id
)->None:

```

*Mark a node running and bind it to its member Job.*

------------------------------------------------------------------------

### record_result

``` python

def record_result(
    node_id:str, # Node whose member Job reached terminal status
    state:NodeState, # completed / failed / cancelled
    result:Any=None, # Member job result (if completed)
    error:Optional=None, # Structured failure (if failed/cancelled)
)->None:

```

*Record a member job’s terminal outcome on its node.*

------------------------------------------------------------------------

### skip_dependents

``` python

def skip_dependents(
    node_id:str, # Node whose failure/cancellation poisons its downstream
)->List: # Node ids newly marked skipped (transitive)

```

*Mark every still-pending transitive dependent of `node_id` skipped.*

Skipped nodes never get a Job — their inputs can never exist. Runs
regardless of fail_fast (dependents are unrunnable either way; fail_fast
only governs INDEPENDENT pending members, which the executor cancels).
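
A transitive skip is a breadth-first walk over the reverse dependency map. The sketch below assumes a `dependents` map (`node_id -> direct downstream ids`) and string states; `skip_downstream` is a hypothetical name.

```python
from collections import deque
from typing import Dict, List


def skip_downstream(source: str, dependents: Dict[str, List[str]],
                    state: Dict[str, str]) -> List[str]:
    """Mark still-pending transitive dependents of `source` as skipped."""
    skipped: List[str] = []
    queue = deque(dependents.get(source, []))
    while queue:
        n = queue.popleft()
        if state[n] != "pending":
            continue  # already skipped (or otherwise settled); do not revisit
        state[n] = "skipped"
        skipped.append(n)
        queue.extend(dependents.get(n, []))
    return skipped


# Chain a -> b -> c plus an independent d.
dependents = {"a": ["b"], "b": ["c"], "c": [], "d": []}
state = {"a": "failed", "b": "pending", "c": "pending", "d": "pending"}
newly_skipped = skip_downstream("a", dependents, state)
```

The independent node `d` is left `pending`: deciding its fate is the executor's job under `fail_fast`, not this walk's.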

------------------------------------------------------------------------

### all_terminal

``` python

def all_terminal(
    
)->bool: # True when every node is in a terminal state

```

*Whether the composition has nothing left to run or wait for.*

------------------------------------------------------------------------

### derive_terminal_status

``` python

def derive_terminal_status(
    
)->NodeState: # cancelled / failed / completed

```

*Derive the composition-level terminal status from member outcomes.*

Precedence:

1. USER cancel intent (`cancel_requested`) dominates everything.
2. A member failure under fail_fast lands the run `failed`. The
   executor’s housekeeping cancels of independent pending members do
   NOT flip it to cancelled (that’s what `cancel_requested`
   distinguishes).
3. A directly-cancelled member (job-level cancel, no failure, no
   composition intent) lands the run `cancelled`.
4. Otherwise `completed`, including best-effort (fail_fast=False) runs
   with failed members: “we attempted everything”, matching the sequence
   semantics this replaces.

Per-node outcomes stay inspectable on `node_runs` either way. (`skipped`
never appears without a failed or cancelled member upstream of it, so it
needs no clause of its own.)
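
The precedence rules can be written as a small pure function. This is a sketch under one stated interpretation: "no failure" in the directly-cancelled rule is read as "no failed member anywhere in the run", which the text does not spell out. `terminal_status` is a hypothetical name and states are plain strings.

```python
from typing import Iterable


def terminal_status(states: Iterable[str], fail_fast: bool,
                    cancel_requested: bool) -> str:
    """Derive a run-level status from terminal per-node states."""
    seen = set(states)
    if cancel_requested:                # user intent dominates everything
        return "cancelled"
    if fail_fast and "failed" in seen:  # housekeeping cancels do not flip this
        return "failed"
    if "cancelled" in seen and "failed" not in seen:  # direct member cancel (assumed reading)
        return "cancelled"
    return "completed"                  # includes best-effort runs with failures


failure_with_housekeeping = terminal_status(
    ["failed", "skipped", "skipped", "cancelled"], fail_fast=True, cancel_requested=False)
user_cancel = terminal_status(
    ["failed", "skipped", "skipped", "cancelled"], fail_fast=True, cancel_requested=True)
best_effort = terminal_status(
    ["failed", "skipped", "skipped", "completed"], fail_fast=False, cancel_requested=False)
```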

------------------------------------------------------------------------

### results_by_node

``` python

def results_by_node(
    
)->Dict: # node_id -> result, for completed nodes only

```

*Completed members’ results keyed by node id (what host folds consume,
and what `resolve_node_kwargs` reads at advancement time).*

``` python
import dataclasses

# Self-contained (nbdev-prepare may parallelize test execution): the
# qwen3-e2e pipe shape, redefined locally.
comp = Composition(nodes=[
    CompositionNode("convert", "media-converter", {
        "action": "convert", "input_path": "/tmp/a.mp3",
        "output_format": "wav", "sample_rate": 16000, "channels": 1,
    }),
    CompositionNode("align", "forced-aligner", {
        "audio": OutputRef("convert", "output_path"),
        "text": "hello world",
    }),
])


# --- Pipe progression: align becomes ready only after convert completes.
run = new_composition_run(comp, "run-1")
assert run.ready_nodes() == ["convert"]
run.record_started("convert", "job-1")
assert run.ready_nodes() == []  # nothing ready while convert runs
run.record_result("convert", NodeState.completed, result={"output_path": "/x.wav"})
assert run.ready_nodes() == ["align"]
resolved = resolve_node_kwargs(run.nodes_by_id["align"], run.results_by_node())
assert resolved["audio"] == "/x.wav"
run.record_started("align", "job-2")
run.record_result("align", NodeState.completed, result={"items": []})
assert run.all_terminal() and run.derive_terminal_status() == NodeState.completed

# --- Parallel fan-in: VAD ∥ FA both ready immediately (D8 shape).
par = Composition(nodes=[
    CompositionNode("vad", "silero", {"media_path": "/seg.wav"}),
    CompositionNode("fa", "qwen3", {"audio": "/seg.wav", "text": "t"}),
])
prun = new_composition_run(par, "run-2")
assert prun.ready_nodes() == ["vad", "fa"]

# --- Failure: dependents skipped transitively; independents untouched.
# (Despite the name, `diamond` is a chain a -> b -> c plus an independent d.)
diamond = Composition(nodes=[
    CompositionNode("a", "p", {}),
    CompositionNode("b", "p", {"x": OutputRef("a")}),
    CompositionNode("c", "p", {"x": OutputRef("b")}),
    CompositionNode("d", "p", {}),  # independent of a/b/c
])
drun = new_composition_run(diamond, "run-3")
drun.record_started("a", "j-a")
drun.record_result("a", NodeState.failed, error=None)
assert sorted(drun.skip_dependents("a")) == ["b", "c"]
assert drun.node_runs["d"].state == NodeState.pending  # executor decides d's fate (fail_fast)
drun.record_result("d", NodeState.cancelled)  # fail_fast executor HOUSEKEEPING cancel
assert drun.all_terminal()
# Housekeeping cancels do NOT flip a failure-driven run to cancelled.
assert drun.derive_terminal_status() == NodeState.failed

# --- USER cancel intent dominates, including over failures.
crun = new_composition_run(diamond, "run-3b")
crun.record_result("a", NodeState.failed)
crun.skip_dependents("a")
crun.cancel_requested = True
crun.record_result("d", NodeState.cancelled)
assert crun.derive_terminal_status() == NodeState.cancelled

# --- Direct member cancel (no failure, no intent) lands cancelled.
mrun = new_composition_run(diamond, "run-3c")
mrun.record_result("a", NodeState.cancelled)
mrun.skip_dependents("a")
mrun.record_result("d", NodeState.completed)
assert mrun.derive_terminal_status() == NodeState.cancelled

# --- fail_fast derivation without cancellation: failed + skipped -> failed.
drun2 = new_composition_run(diamond, "run-4")
drun2.record_result("a", NodeState.failed)
drun2.skip_dependents("a")
drun2.record_result("d", NodeState.completed)
assert drun2.derive_terminal_status() == NodeState.failed

# --- Best-effort: failures recorded, run still lands completed.
be = dataclasses.replace(diamond, fail_fast=False)
brun = new_composition_run(be, "run-5")
brun.record_result("a", NodeState.failed)
brun.skip_dependents("a")
brun.record_result("d", NodeState.completed)
assert brun.derive_terminal_status() == NodeState.completed

# --- Empty composition: immediately terminal, completed.
erun = new_composition_run(Composition(nodes=[]), "run-6")
assert erun.all_terminal() and erun.derive_terminal_status() == NodeState.completed
print("run-state machinery OK")
```

    run-state machinery OK
