One-call factory that assembles a PluginManager + JobQueue + plugin bindings — closes the demo-app boilerplate duplication audited across 5 substrate consumers.
Pipeline
Container returned by create_pipeline. Holds the assembled PluginManager, JobQueue, and a bindings dict mapping plugin names to PluginBinding views for the plugins that loaded successfully. Implements async context-manager semantics for ergonomic teardown:
asyncwith create_pipeline(plugins=["whisper", "sys-mon"]) as pipe: result =await pipe.bindings["whisper"].execute_async(audio=path)
The context-manager exit calls queue.stop() + manager.unload_all().
Callers can describe each plugin entry in three forms — a bare name (use defaults), a (name, config) tuple, or a mapping with name + config keys. The internal helper normalizes all three into a (name, config) pair.
create_pipeline
The canonical assembly factory. Substitutes for the ~70-line boilerplate that 5 audited demos have been repeating. Returns a Pipeline whose queue hasn’t been started yet — callers either use it as an async context manager or call await pipeline.start() explicitly.
create_pipeline
def create_pipeline( plugins:Optional=None, # Plugins to discover + load scheduler:Optional=None, # Resource policy (default: permissive) system_monitor:Optional=None, # Plugin name to register as system monitor search_paths:Optional=None, # Custom manifest search paths queue_kwargs:Optional=None, # Extra kwargs forwarded to JobQueue strict:bool=True, # SG-5 strict config validation on each load)->Pipeline: # Assembled stack ready to start
Assemble a PluginManager + JobQueue + plugin bindings in one call.
Steps performed: 1. Construct PluginManager with the given scheduler + search paths 2. discover_manifests() 3. For each spec in plugins: load the plugin and create a PluginBinding 4. If system_monitor is set, register that plugin as the sys-mon 5. Construct JobQueue (NOT started — caller starts via context manager)
Plugins that fail to load are logged but do not raise; their entries are omitted from Pipeline.bindings. Use the returned Pipeline.manager to inspect which loads succeeded.
# SG-18 regression: spec normalization handles all three accepted forms.from typing import cast# Bare stringassert _normalize_spec("whisper") == ("whisper", None)# Tuple — one-elementassert _normalize_spec(("whisper",)) == ("whisper", None)# Tuple — name + configassert _normalize_spec(("whisper", {"model": "large"})) == ("whisper", {"model": "large"})# Mapping with configassert _normalize_spec({"name": "whisper", "config": {"model": "tiny"}}) == ("whisper", {"model": "tiny"},)# Mapping without configassert _normalize_spec({"name": "whisper"}) == ("whisper", None)# Bad forms raisetry: _normalize_spec(cast(Any, 42))exceptTypeErroras e:print(f"✓ rejected int spec: {e}")try: _normalize_spec(("a", "b", "c")) # type: ignoreexceptValueErroras e:print(f"✓ rejected 3-tuple: {e}")try: _normalize_spec({"config": {}}) # type: ignoreexceptValueErroras e:print(f"✓ rejected mapping without name: {e}")# Pipeline dataclass round-tripsfrom unittest.mock import MagicMockmgr = MagicMock(spec=PluginManager)q = MagicMock(spec=JobQueue)pipe = Pipeline(manager=mgr, queue=q)assert pipe.bindings == {}assert pipe.manager is mgrassert pipe.queue is qprint("SG-18 spec normalization + Pipeline shape: PASS")
✓ rejected int spec: unsupported plugin spec type: int
✓ rejected 3-tuple: plugin spec tuple must be (name,) or (name, config); got ('a', 'b', 'c')
✓ rejected mapping without name: plugin spec mapping missing 'name' key: {'config': {}}
SG-18 spec normalization + Pipeline shape: PASS