Bootstrap

One-call factory that assembles a PluginManager, a JobQueue, and the plugin bindings. It replaces the demo-app boilerplate that an audit found duplicated across 5 substrate consumers.

Pipeline

Container returned by create_pipeline. Holds the assembled PluginManager, JobQueue, and a bindings dict mapping plugin names to PluginBinding views for the plugins that loaded successfully. Implements async context-manager semantics for ergonomic teardown:

async with create_pipeline(plugins=["whisper", "sys-mon"]) as pipe:
    result = await pipe.bindings["whisper"].execute_async(audio=path)

The context-manager exit calls queue.stop() + manager.unload_all().
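The enter/exit behaviour described above can be sketched with stand-in classes. This is a minimal illustration, not the library's implementation: `FakeQueue`, `FakeManager`, and `SketchPipeline` are hypothetical names, and the sketch assumes that the queue's `start()` and `stop()` are coroutines, that `unload_all()` is synchronous, and that entering the context starts the queue.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


class FakeQueue:
    """Hypothetical stand-in for JobQueue; records lifecycle calls."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    async def start(self) -> None:  # assumption: start is a coroutine
        self.log.append("queue.start")

    async def stop(self) -> None:  # assumption: stop is a coroutine
        self.log.append("queue.stop")


class FakeManager:
    """Hypothetical stand-in for PluginManager."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    def unload_all(self) -> None:  # assumption: unload_all is synchronous
        self.log.append("manager.unload_all")


@dataclass
class SketchPipeline:
    manager: FakeManager
    queue: FakeQueue
    bindings: Dict[str, Any] = field(default_factory=dict)

    async def __aenter__(self) -> "SketchPipeline":
        await self.queue.start()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # try/finally so plugins are unloaded even if stopping the queue raises.
        try:
            await self.queue.stop()
        finally:
            self.manager.unload_all()


async def demo() -> List[str]:
    log: List[str] = []
    async with SketchPipeline(FakeManager(log), FakeQueue(log)):
        log.append("body")
    return log


calls = asyncio.run(demo())
print(calls)
```

Because `__aexit__` returns `None`, an exception raised inside the `async with` body still propagates after teardown has run.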


Pipeline


def Pipeline(
    manager:PluginManager, queue:JobQueue, bindings:Dict=<factory>
)->None:

Assembled substrate stack: manager + queue + plugin bindings.

Spec normalization

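Callers can describe each plugin entry in three forms: a bare name (use defaults), a (name, config) tuple, or a mapping with name + config keys. A one-element (name,) tuple and a mapping without a config key are also accepted and leave the config as None. The internal helper normalizes all of these into a (name, config) pair.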
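A normalizer with this behaviour could look roughly like the sketch below. It is reconstructed from the accepted forms and the error messages shown in the regression output on this page, not copied from the library; `normalize_spec_sketch` is a hypothetical name, and the order of the type checks is an assumption.

```python
from collections.abc import Mapping
from typing import Any, Dict, Optional, Tuple

Spec = Tuple[str, Optional[Dict[str, Any]]]


def normalize_spec_sketch(spec: Any) -> Spec:
    """Normalize a plugin spec into a (name, config) pair (illustrative only)."""
    # Bare name: use default config.
    if isinstance(spec, str):
        return spec, None
    # Tuple: (name,) or (name, config); any other length is rejected.
    if isinstance(spec, tuple):
        if len(spec) == 1:
            return spec[0], None
        if len(spec) == 2:
            return spec[0], spec[1]
        raise ValueError(
            f"plugin spec tuple must be (name,) or (name, config); got {spec!r}"
        )
    # Mapping: 'name' is required, 'config' is optional.
    if isinstance(spec, Mapping):
        if "name" not in spec:
            raise ValueError(
                f"plugin spec mapping missing 'name' key: {dict(spec)!r}"
            )
        return spec["name"], spec.get("config")
    raise TypeError(f"unsupported plugin spec type: {type(spec).__name__}")


print(normalize_spec_sketch("whisper"))
print(normalize_spec_sketch(("whisper", {"model": "large"})))
print(normalize_spec_sketch({"name": "whisper"}))
```

Checking `str` before the other forms keeps a plain plugin name from being treated as a sequence of characters.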

create_pipeline

The canonical assembly factory. Substitutes for the ~70-line boilerplate that 5 audited demos have been repeating. Returns a Pipeline whose queue hasn’t been started yet — callers either use it as an async context manager or call await pipeline.start() explicitly.


create_pipeline


def create_pipeline(
    plugins:Optional=None, # Plugins to discover + load
    scheduler:Optional=None, # Resource policy (default: permissive)
    system_monitor:Optional=None, # Plugin name to register as system monitor
    search_paths:Optional=None, # Custom manifest search paths
    queue_kwargs:Optional=None, # Extra kwargs forwarded to JobQueue
    strict:bool=True, # SG-5 strict config validation on each load
)->Pipeline: # Assembled stack ready to start

Assemble a PluginManager + JobQueue + plugin bindings in one call.

Steps performed:

1. Construct PluginManager with the given scheduler + search paths
2. discover_manifests()
3. For each spec in plugins: load the plugin and create a PluginBinding
4. If system_monitor is set, register that plugin as the sys-mon
5. Construct JobQueue (NOT started; the caller starts it via the context manager)

Plugins that fail to load are logged but do not raise; their entries are omitted from Pipeline.bindings. Use the returned Pipeline.manager to inspect which loads succeeded.
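The log-and-skip policy for failed loads can be illustrated with a small stand-alone loop. This is a sketch under stated assumptions, not the factory's actual code: `load_bindings_sketch` and `fake_load` are hypothetical, and the real loader, logger name, and exception types may differ.

```python
import logging
from typing import Any, Callable, Dict, Iterable

logger = logging.getLogger("pipeline_sketch")  # hypothetical logger name


def load_bindings_sketch(
    names: Iterable[str], load: Callable[[str], Any]
) -> Dict[str, Any]:
    """Load each plugin; log failures and omit them from the result."""
    bindings: Dict[str, Any] = {}
    for name in names:
        try:
            bindings[name] = load(name)
        except Exception as exc:  # a failed load must not abort assembly
            logger.warning("plugin %r failed to load: %s", name, exc)
    return bindings


def fake_load(name: str) -> str:
    """Hypothetical loader that fails for one plugin."""
    if name == "broken":
        raise RuntimeError("manifest not found")
    return f"binding:{name}"


requested = ["whisper", "broken", "sys-mon"]
bindings = load_bindings_sketch(requested, fake_load)
missing = [name for name in requested if name not in bindings]
print(sorted(bindings), missing)
```

Since failures do not raise, a caller that requires a particular plugin can check `name in pipe.bindings` after the factory returns and fail fast itself.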

# SG-18 regression: spec normalization handles all three accepted forms.
from typing import Any, cast

# Bare string
assert _normalize_spec("whisper") == ("whisper", None)

# Tuple — one-element
assert _normalize_spec(("whisper",)) == ("whisper", None)

# Tuple — name + config
assert _normalize_spec(("whisper", {"model": "large"})) == ("whisper", {"model": "large"})

# Mapping with config
assert _normalize_spec({"name": "whisper", "config": {"model": "tiny"}}) == (
    "whisper", {"model": "tiny"},
)

# Mapping without config
assert _normalize_spec({"name": "whisper"}) == ("whisper", None)

# Bad forms raise
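try:
    _normalize_spec(cast(Any, 42))
except TypeError as e:
    print(f"✓ rejected int spec: {e}")
else:
    raise AssertionError("int spec was not rejected")

try:
    _normalize_spec(("a", "b", "c"))  # type: ignore
except ValueError as e:
    print(f"✓ rejected 3-tuple: {e}")
else:
    raise AssertionError("3-tuple spec was not rejected")

try:
    _normalize_spec({"config": {}})  # type: ignore
except ValueError as e:
    print(f"✓ rejected mapping without name: {e}")
else:
    raise AssertionError("mapping without name was not rejected")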

# Pipeline dataclass round-trips
from unittest.mock import MagicMock
mgr = MagicMock(spec=PluginManager)
q = MagicMock(spec=JobQueue)
pipe = Pipeline(manager=mgr, queue=q)
assert pipe.bindings == {}
assert pipe.manager is mgr
assert pipe.queue is q

print("SG-18 spec normalization + Pipeline shape: PASS")
✓ rejected int spec: unsupported plugin spec type: int
✓ rejected 3-tuple: plugin spec tuple must be (name,) or (name, config); got ('a', 'b', 'c')
✓ rejected mapping without name: plugin spec mapping missing 'name' key: {'config': {}}
SG-18 spec normalization + Pipeline shape: PASS