# Job Queue


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

The `JobQueue` provides a resource-aware **multi-lane** job queue for
capability execution (stage 3 / CR-16 rework):

    ┌──────────────────────────────────────────────────────────────┐
    │                    User Application                          │
    │  ┌──────────────────────────────────────────────────────┐    │
    │  │                    JobQueue                          │    │
    │  │  ┌───────────┐  ┌─────────────────┐  ┌────────────┐  │    │
    │  │  │ Pending   │  │ Running Jobs    │  │ History    │  │    │
    │  │  │ Jobs      │→ │ (lanes; gated   │→ │ (completed/│  │    │
    │  │  │ (heap)    │  │  by admission)  │  │ cancelled) │  │    │
    │  │  └───────────┘  └─────────────────┘  └────────────┘  │    │
    │  │        │                 ▲                           │    │
    │  │        ▼                 │ lazy member Jobs          │    │
    │  │  ┌─────────────────────────────────┐                 │    │
    │  │  │ Compositions (DAGs w/ bindings) │                 │    │
    │  │  └─────────────────────────────────┘                 │    │
    │  └──────────────────────────────────────────────────────┘    │
    │         │                │                                   │
    │         ▼                ▼                                   │
    │  ┌──────────────┐  ┌─────────────┐                           │
    │  │ Empirical    │  │ Capability  │                           │
    │  │ store+sysmon │  │ Manager     │                           │
    │  └──────────────┘  └─────────────┘                           │
    └──────────────────────────────────────────────────────────────┘

Key features:

- **Priority queue**: Higher-priority jobs are dispatched first (FIFO
  within the same priority)
- **Multi-lane dispatch (stage 3)**: independent jobs run concurrently
  when the admission ladder allows — derived from empirical resource
  records + live sysmon telemetry, never from declared metadata; no
  evidence → exclusive (the first run is the measurement run)
- **Compositions (stage 3 / CR-16)**: DAGs of capability invocations
  with execution-time-bound inputs (`OutputRef` markers); member Jobs
  created lazily as dependencies complete — replaces `submit_sequence`
- **Cancellation**: Cancel pending or running jobs with graceful
  fallback to force termination
- **Progress tracking**: Poll progress and status messages during
  execution
- **Observable**: push-based event bus + queue state for UI integration

## Enums

`JobStatus` captures the lifecycle a job moves through. `JobEventType`
enumerates the push-based events emitted on the CR-6 multi-subscriber
event bus. `CancelPhase` surfaces the substrate’s cancel state machine
(cooperative → force → reloading → completed) so UIs can render cancel
progress.
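Below is a minimal sketch of how a UI could turn the cancel phases into a progress indicator. `CancelPhaseSketch`, its string values, and the forward-only rule are assumptions for illustration, not the substrate's definition. Under that rule a phase may be skipped (for example, a cooperative cancel that succeeds might go straight to `completed`) but is never revisited.

```python
from enum import Enum


class CancelPhaseSketch(str, Enum):
    """Hypothetical stand-in for CancelPhase; member values are assumed."""
    cooperative = "cooperative"
    force = "force"
    reloading = "reloading"
    completed = "completed"


# Phase order as listed in the docs; Enum iteration preserves definition order.
_ORDER = list(CancelPhaseSketch)


def can_advance(current: CancelPhaseSketch, nxt: CancelPhaseSketch) -> bool:
    """Assumed rule: phases only move forward, but may skip ahead."""
    return _ORDER.index(nxt) > _ORDER.index(current)


def cancel_progress(phase: CancelPhaseSketch) -> float:
    """Fraction of the cancel state machine reached, for a progress bar."""
    return (_ORDER.index(phase) + 1) / len(_ORDER)
```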

------------------------------------------------------------------------

### CancelPhase

``` python

def CancelPhase(
    *args, **kwds
):

```

*Phase of a cancellation in progress (CR-6 + CR-4 pairing).*

Surfaces the substrate’s cancel state machine. Stage 4 wires the
transitions; Stage 1 reserves the enum so `Job.cancel_phase` can be
typed correctly without dangling forward references.

------------------------------------------------------------------------

### JobEventType

``` python

def JobEventType(
    *args, **kwds
):

```

*Push-based job event types (CR-6; stage-3 composition rework; CR-14
journal-primary emission).*

Emitted by JobQueue through the single emission path (`_publish_event`):
journal-class events become durable journal rows AND fan out to live
subscribers; liveness-class events (`LIVENESS_EVENT_TYPES` in
`core.journal_store`) fan out only — their final values ride the
terminal STATE_TRANSITION row. Consumers subscribe via
`queue.events(job_id)` / `queue.events_for_composition(comp_id)` /
`queue.all_events()` and receive `JobEvent` instances asynchronously.

COMPOSITION_ADVANCED replaced the retired SEQUENCE_ADVANCED at execution
stage 3 (CR-16: compositions replace sequences outright): it fires when
a member job’s completion unlocks downstream composition nodes — payload
carries the completed node id + the newly enqueued node ids.

The LOG_APPENDED type (reserved, never emitted) was RETIRED at stage 7 (CR-14):
log-follow is a diagnostics-store cursor read (`get_job_diagnostics`),
not a push event — there are no log files or byte offsets anymore.
Non-job substrate events (worker lifecycle, config, runs) live in
`core.journal_store.SubstrateEventType`.
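The journal/liveness split in the single emission path can be sketched as follows. `EventKind`, its `PROGRESS` member, and the contents of `LIVENESS` are hypothetical stand-ins for `JobEventType` and `LIVENESS_EVENT_TYPES`. Only the routing rule comes from the text above: journal-class events are recorded and fanned out, while liveness-class events are only fanned out.

```python
from dataclasses import dataclass, field
from enum import Enum


class EventKind(str, Enum):
    """Hypothetical subset of JobEventType (PROGRESS is an assumed name)."""
    STATE_TRANSITION = "state_transition"
    COMPOSITION_ADVANCED = "composition_advanced"
    PROGRESS = "progress"
    RESOURCE_SNAPSHOT = "resource_snapshot"


# Assumed membership; the real set is LIVENESS_EVENT_TYPES in core.journal_store.
LIVENESS = {EventKind.PROGRESS, EventKind.RESOURCE_SNAPSHOT}


@dataclass
class Emitter:
    journal: list = field(default_factory=list)    # stand-in for durable journal rows
    delivered: list = field(default_factory=list)  # stand-in for live subscriber fan-out

    def publish(self, kind: EventKind, payload: dict) -> None:
        """Single emission path: journal-class events are also recorded;
        every event is fanned out to live subscribers."""
        if kind not in LIVENESS:
            self.journal.append((kind, payload))
        self.delivered.append((kind, payload))
```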

------------------------------------------------------------------------

### JobStatus

``` python

def JobStatus(
    *args, **kwds
):

```

*Status of a job in the queue.*

## JobQueueDependencies

The CR-6 substrate dependency Protocol that JobQueue requires.
`CapabilityManager` satisfies this structurally, so existing hosts
continue to work — passing a `CapabilityManager` to `JobQueue(deps=...)`
still type-checks. The Protocol enables clean future extraction of
JobQueue into a separate library per the OQ-5 resolution. The audit’s
locked design listed 3 methods, but investigation showed that the execute
path actually consumes more (`get_capability_meta`, `get_capability`,
`execute_capability_async`, `reload_capability`); CR-14 retired the
original flat-log accessor, so it dropped out of the surface. The
stage-3 admission methods and the stage-4 task-channel method are
included too, so the Protocol captures the full surface the queue
actually uses.

------------------------------------------------------------------------

### JobQueueDependencies

``` python

def JobQueueDependencies(
    *args, **kwargs
):

```

*Substrate dependencies the JobQueue requires (CR-6 + stage 3).*

CapabilityManager satisfies this structurally; the Protocol exists so
JobQueue can be tested in isolation (with a lightweight test double) and
so a future extraction into a separate library has no API constraint
locked in.

The first 4 methods are the CR-6 execute-path surface (CR-14 retired the
original flat-log accessor; log retrieval is now a diagnostics-store
query). The stage-3 additions (CR-16 multi-lane admission) are consumed
DEFENSIVELY via getattr: a deps implementation without them yields no
admission evidence, so every job runs exclusively, which is exactly the
pre-stage-3 single-lane behavior. Older test doubles keep working
unchanged. CR-14 also reads the `journal_store` / `diagnostics_store`
ATTRIBUTES via getattr when the queue isn’t constructed with explicit
stores; a deps implementation without them (e.g. a test double) simply
yields no journaling.
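The sketch below relies only on the standard `typing.Protocol` machinery. It shows why a plain test double satisfies the dependency Protocol structurally, and how an optional stage-3 method can be read defensively. `DepsSketch` is a pared-down stand-in that omits the stage-3 and stage-4 methods, and `get_empirical_record` is a hypothetical method name.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class DepsSketch(Protocol):
    """Pared-down stand-in for JobQueueDependencies (execute-path surface only)."""
    def get_capability_meta(self, name_or_id: str) -> Any: ...
    def get_capability(self, name_or_id: str) -> Any: ...
    async def execute_capability_async(self, name_or_id: str, *args, **kwargs) -> Any: ...
    def reload_capability(self, name_or_id: str) -> None: ...


class MinimalDouble:
    """Test double: satisfies the Protocol structurally, no inheritance needed."""
    def get_capability_meta(self, name_or_id): return None
    def get_capability(self, name_or_id): return object()
    async def execute_capability_async(self, name_or_id, *args, **kwargs): return "ok"
    def reload_capability(self, name_or_id): pass


def admission_evidence(deps: Any, instance_id: str) -> Optional[Any]:
    """Defensive read of an optional method (hypothetical name). A missing
    method means no evidence, so the job would run exclusive."""
    getter = getattr(deps, "get_empirical_record", None)
    return getter(instance_id) if callable(getter) else None
```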

## Job / JobEvent / QueueStats / \_Subscription

`Job` is the queued execution unit (CR-6 reshape:
`capability_instance_id` per CR-10, `JobError` typed error per CR-5,
datetime timestamps; stage 3 replaced the sequence tags with composition
tags — `composition_id`/`node_id` — alongside the cancel-phase +
block-reason + retry fields). `JobEvent` is the push-bus payload — every
event carries full tag context (`job_id`, optional
`composition_id`/`node_id`, `capability_instance_id`) so consumers
subscribed to any filter view get the same shape. `QueueStats` is the
aggregate-counts shape returned by `get_stats()`. `_Subscription` is the
internal per-subscriber bounded-queue + drop-counter holder.

`Job` keeps backward-compat `@property` aliases for `capability_name`
(returns `capability_instance_id`) and `created_at` (returns
`submitted_at.timestamp()`), tagged for SG-48 removal after the consumer
cascade migrates.
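The two back-compat aliases amount to read-only properties over the reshaped fields. `JobSketch` is a hypothetical, trimmed-down dataclass, and the timezone-aware UTC default for `submitted_at` is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class JobSketch:
    """Hypothetical, trimmed-down Job showing only the back-compat aliases."""
    id: str
    capability_instance_id: str
    submitted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def capability_name(self) -> str:
        # Legacy alias (slated for SG-48 removal): the instance id per CR-10.
        return self.capability_instance_id

    @property
    def created_at(self) -> float:
        # Legacy alias: POSIX timestamp derived from the typed datetime field.
        return self.submitted_at.timestamp()
```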

------------------------------------------------------------------------

### QueueStats

``` python

def QueueStats(
    total_pending:int, total_completed:int, total_failed:int, total_cancelled:int
)->None:

```

*Aggregate counts returned by `JobQueue.get_stats()` (CR-6).*

------------------------------------------------------------------------

### JobEvent

``` python

def JobEvent(
    type:JobEventType, job_id:str, capability_instance_id:str, composition_id:Optional=None, node_id:Optional=None,
    run_id:Optional=None, actor:Optional=None, timestamp:datetime=<factory>, payload:Dict=<factory>
)->None:

```

*A push-based job event (CR-6; stage-3 composition tags).*

Carries full tag context so a subscriber to `all_events()`,
`events(job_id)`, or `events_for_composition(comp_id)` receives
identically-shaped instances. `payload` is a per-event-type structured
dict (e.g., STATE_TRANSITION carries
`{"from": "pending", "to": "running"}`; COMPOSITION_ADVANCED carries
`{"completed_node": ..., "enqueued_nodes": [...]}`).

`run_id` / `actor` (CR-14 follow-up) ride from the Job so journal rows
written by the single emission path carry the host-tier correlation.

------------------------------------------------------------------------

### Job

``` python

def Job(
    id:str, capability_instance_id:str, args:Tuple, kwargs:Dict, status:JobStatus=<JobStatus.pending: 'pending'>,
    priority:int=0, submitted_at:datetime=<factory>, started_at:Optional=None, completed_at:Optional=None,
    progress:float=0.0, status_message:str='', result:Any=None, error:Optional=None, composition_id:Optional=None,
    node_id:Optional=None, task_name:Optional=None, method:Optional=None, run_id:Optional=None, actor:Optional=None,
    control:Dict=<factory>, cancel_requested_at:Optional=None, cancel_phase:Optional=None,
    block_reason:Optional=None, retry_count:int=0, last_resource_snapshot:Optional=None
)->None:

```

*A queued capability execution request (CR-6 reshape; stage-3
composition rework renamed the sequence tags to composition tags).*

`composition_id` / `node_id` are set when the job is a lazily-created
member of a composition (CR-16) — they ride every JobEvent so
`events_for_composition` subscribers see member lifecycle events.

`run_id` / `actor` (CR-14 follow-up) are host-tier correlation tags:
cores pass their run-manifest id + initiating actor at submit so every
journal row for this job links back to the run record (run manifest ↔
journal linkage) and carries who/what initiated it.

## ResourceSnapshot

Point-in-time worker + GPU stats for one job, populated by sampling the
worker proxy’s `get_stats()` and (optionally) the configured
system-monitor capability’s CR-3 typed `get_system_status()` +
`list_processes()` methods. Distinct from CR-7’s
`EmpiricalResourceRecord` which aggregates samples across runs —
`ResourceSnapshot` is the substrate’s “what’s happening right now”
surface. Field shape mirrors `cjm-fasthtml-job-monitor`’s existing
`ResourceSnapshot` so the cascade migration to the substrate-owned type
is structural rather than semantic.

------------------------------------------------------------------------

### ResourceSnapshot

``` python

def ResourceSnapshot(
    timestamp:datetime, worker_pid:int=0, cpu_percent:float=0.0, memory_rss_mb:float=0.0, gpu_index:Optional=None,
    gpu_memory_mb:Optional=None, gpu_type:Optional=None, gpu_total_mb:Optional=None, gpu_load_percent:Optional=None
)->None:

```

*Point-in-time resource usage for one job (CR-6 Stage 3).*

Worker stats (cpu_percent, memory_rss_mb) come from the capability
proxy’s `get_stats()`. GPU fields come from the configured
system-monitor capability (when set on JobQueue) via CR-3’s typed
`list_processes()` (per-PID matching) and `get_system_status()` (global
GPU stats). All GPU fields are Optional: None if no sysmon is configured
or the worker isn’t running on a GPU.

Distinct from CR-7’s `EmpiricalResourceRecord` (aggregated profile
across runs); this is “what’s happening right now” for one job.

## JobQueue

Main queue class — submission, execution lifecycle, multi-subscriber
event bus, observability. CR-6 reshape: takes
`deps: JobQueueDependencies` (Protocol) instead of
`manager: CapabilityManager` (concrete); adds bounded-queue
multi-subscriber event-bus state.

------------------------------------------------------------------------

### JobQueue

``` python

def JobQueue(
    deps:JobQueueDependencies, # Substrate dependencies (CapabilityManager satisfies structurally)
    max_history:int=100, # Max completed jobs to retain
    cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
    progress_poll_interval:float=1.0, # Seconds between progress polls
    sysmon_capability_name:Optional=None, # CR-3 monitor capability instance for GPU stats (None = no GPU info)
    resource_snapshot_cadence_polls:int=4, # Sample resources every Nth progress poll
    max_concurrent_lanes:int=4, # Stage 3: max in-flight jobs (admission still gates each)
    gpu_headroom_fraction:float=0.9, # Stage 3: blunt GPU admission margin (budget = total * fraction)
    journal:Optional=None, # CR-14: journal sink; defaults to deps.journal_store (the manager's)
    diagnostics:Optional=None, # CR-14: diagnostics sink for get_job_diagnostics; defaults to deps.diagnostics_store
):

```

*Resource-aware multi-lane job queue with journal-primary observability
(CR-6 + CR-14; stage-3 CR-16 rework: ready-set dispatch +
resource-derived admission + composition execution).*

### Job Submission

------------------------------------------------------------------------

### submit

``` python

async def submit(
    capability_instance_id:str, # Target capability instance (per CR-10)
    *args, # Positional args forwarded to the capability
    priority:int=0, # Higher = more urgent
    task:Optional=None, # Task-channel address: adapter task name (stage 4)
    method:Optional=None, # Task-channel address: adapter method (set with task)
    run_id:Optional=None, # Host-tier run correlation (CR-14 follow-up; reserved name, never a capability kwarg)
    actor:Optional=None, # Who/what initiated (CR-14 follow-up; reserved name)
    control:Optional=None, # Per-call control flags (force/cache-bypass); reserved name, never a capability kwarg
    **kwargs # Keyword args forwarded to the capability
)->str: # Returns job_id

```

*Submit a job to the queue.*

CR-2: rejects jobs for disabled capabilities at submit time (typed
`CapabilityDisabledError`) so the failure surface matches the disabled
gate in `CapabilityManager.execute_capability`. Without this check, a job
submitted to a disabled capability would sit in the queue until execution
and only then raise; checking at submit gives operators an actionable
signal immediately.

CR-6: no STATE_TRANSITION event fires at submit because the job is
already in `pending` state at construction — there’s no transition to
publish. The first STATE_TRANSITION fires when the processor loop moves
the job pending → running.

CR-14: refuses loudly when the journal is wedged (see
`_check_journal_wedge`). `run_id`/`actor` join
`priority`/`task`/`method`/`control` as reserved keyword names (they
never reach capability kwargs): cores pass their run-manifest id +
initiating actor so every journal row for this job carries the host-tier
correlation.
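The reserved names are keyword-only parameters placed before `**kwargs`, so Python binds them to those parameters and they can never land in the capability kwargs. The hypothetical helper below mirrors `submit`'s signature to illustrate this. It also reproduces the task-without-method refusal exercised by the task-channel test at the end of this page; the real queue raises `CapabilityInputError`, and `ValueError` stands in for it here.

```python
from typing import Any, Dict, Optional, Tuple


def split_submit_call(
    capability_instance_id: str,
    *args: Any,
    priority: int = 0,
    task: Optional[str] = None,
    method: Optional[str] = None,
    run_id: Optional[str] = None,
    actor: Optional[str] = None,
    control: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Hypothetical helper mirroring submit's signature: reserved names bind to
    their own parameters, so only capability args/kwargs are returned."""
    if task is not None and method is None:
        # The real queue raises CapabilityInputError; ValueError stands in here.
        raise ValueError("task-channel jobs need both task and method")
    return args, kwargs
```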

### Job Control

------------------------------------------------------------------------

### reorder

``` python

def reorder(
    job_id:str, # Job to move
    new_priority:int, # New priority value
)->bool: # True if reordered

```

*Change the priority of a pending job.*

------------------------------------------------------------------------

### cancel

``` python

async def cancel(
    job_id:str, # Job to cancel
)->bool: # True if cancelled

```

*Cancel a pending or running job.*

CR-6: publishes STATE_TRANSITION when a pending job moves directly to
cancelled (no transition through running). Running-job cancellation
publishes CANCEL_PHASE_CHANGED events from `_execute_with_cancellation`.

Stage 3: when the cancelled job is a composition member,
`_advance_composition` runs after the lock is released so the cancelled
status propagates to the composition run (dependents skip; the run
finalizes when all nodes are terminal). Lock release is required because
`_advance_composition` may need to enqueue downstream members in some
flows; asyncio.Lock is not re-entrant.
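A reduced sketch of the release-then-advance ordering follows. `CancelFlowSketch` and its two-node graph are hypothetical. `_advance` acquires the same `asyncio.Lock`, so calling it while `cancel` still held the lock would deadlock rather than re-enter. Only direct dependents are skipped in this sketch.

```python
import asyncio


class CancelFlowSketch:
    """Hypothetical reduction of cancel(): mutate state under the lock, then
    call the advancer only after releasing it (asyncio.Lock is not re-entrant)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.status = {"a": "pending", "b": "pending"}  # node -> status
        self.deps = {"b": ["a"]}                         # b depends on a

    async def cancel(self, node: str) -> bool:
        async with self._lock:
            if self.status.get(node) != "pending":
                return False
            self.status[node] = "cancelled"
        # Lock released here, so the advancer can acquire it without deadlocking.
        await self._advance()
        return True

    async def _advance(self) -> None:
        async with self._lock:
            for node, upstream in self.deps.items():
                if self.status[node] == "pending" and any(
                    self.status[u] == "cancelled" for u in upstream
                ):
                    self.status[node] = "skipped"  # direct dependents of a cancelled node skip
```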

### Observation

CR-6 split the legacy `get_state()` UI-shaped dict into typed accessors:
`get_pending()`, `get_running_jobs()`, `get_history(limit)`,
`get_stats()`. Both the `get_state()` shim and the single-lane
`get_running()` affordance were removed at SG-48; stage 3’s multi-lane
queue exposes `get_running_jobs()`.

**CR-14 (stage 7)** replaced the log-retrieval surface outright:
`get_job_logs` + the `_slice_log_by_job_window` timestamp-window
heuristic (structurally unsound under stage-3 same-worker concurrency —
interleaved windows) are DELETED; `get_job_diagnostics(job_id)` reads
EXACT job-stamped records from the diagnostics store, and
`get_history_from_journal(limit)` rehydrates the durable job history
from terminal STATE_TRANSITION journal rows (the `_history` migration
rider — restart-surviving, unlimited depth).

------------------------------------------------------------------------

### get_job

``` python

def get_job(
    job_id:str, # Job to retrieve
)->Optional: # Job or None

```

*Get a job by ID.*

------------------------------------------------------------------------

### wait_for_job

``` python

async def wait_for_job(
    job_id:str, # Job to wait for
    timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job

```

*Wait for a job to complete.*

Independent of the CR-6 event bus — uses a per-job `asyncio.Event` for
the simple block-until-done affordance. Streaming consumers should use
`events(job_id)` instead.
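Here is a minimal sketch of the per-job `asyncio.Event` affordance. It assumes one event per job, which the executor sets when the job reaches a terminal status. `DoneSignals` is hypothetical, and in this sketch a timeout surfaces as the `asyncio.TimeoutError` raised by `asyncio.wait_for`.

```python
import asyncio
from typing import Dict, Optional


class DoneSignals:
    """Hypothetical per-job completion signals, independent of any event bus."""

    def __init__(self) -> None:
        self._events: Dict[str, asyncio.Event] = {}
        self.results: Dict[str, str] = {}

    def register(self, job_id: str) -> None:
        self._events[job_id] = asyncio.Event()

    def finish(self, job_id: str, status: str) -> None:
        self.results[job_id] = status
        self._events[job_id].set()  # wakes every waiter; later waiters return at once

    async def wait(self, job_id: str, timeout: Optional[float] = None) -> str:
        # asyncio.wait_for raises asyncio.TimeoutError if the timeout elapses first.
        await asyncio.wait_for(self._events[job_id].wait(), timeout)
        return self.results[job_id]
```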

------------------------------------------------------------------------

### get_pending

``` python

def get_pending(
    
)->List: # Pending jobs, priority-sorted

```

*Get pending jobs, priority-sorted (higher priority first, then FIFO).*
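The architecture diagram shows pending jobs held in a heap. A common way to get higher-priority-first order with FIFO tie-breaking from Python's min-heap `heapq` is to push `(-priority, sequence, job_id)` tuples. `PendingHeap` is a hypothetical sketch; the queue's actual entry layout is not documented here.

```python
import heapq
import itertools
from typing import List, Tuple


class PendingHeap:
    """Higher priority first, FIFO within a priority. heapq is a min-heap, so
    priority is negated; a monotonic sequence number breaks ties by arrival."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._seq = itertools.count()

    def push(self, job_id: str, priority: int = 0) -> None:
        heapq.heappush(self._heap, (-priority, next(self._seq), job_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def snapshot(self) -> List[str]:
        # Sorting the tuples yields the order pop() would, without mutating the heap.
        return [job_id for _, _, job_id in sorted(self._heap)]
```

With the submissions from the usage example below (`job3` at `priority=10`), `snapshot()` lists `job3` ahead of `job1` and `job2`.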

------------------------------------------------------------------------

### get_running_jobs

``` python

def get_running_jobs(
    
)->List: # All currently-executing jobs

```

*All in-flight jobs (stage 3: the queue is multi-lane).*

------------------------------------------------------------------------

### get_history

``` python

def get_history(
    limit:Optional=None, # Max jobs to return (most recent N); None = all
)->List: # Completed/failed/cancelled jobs, most recent first

```

*Get completed jobs, most recent first.*

If `limit` is provided, returns the most recent N. The internal history
list is capped at `max_history`: once it is full, the oldest entries are
evicted first.
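Bounded eviction of this kind maps directly onto `collections.deque(maxlen=...)`. The sketch below assumes jobs are appended as they reach a terminal status; `HistorySketch` is hypothetical.

```python
from collections import deque
from typing import List, Optional


class HistorySketch:
    """Bounded history: appending past max_history evicts the oldest entry."""

    def __init__(self, max_history: int = 100) -> None:
        self._items: deque = deque(maxlen=max_history)

    def record(self, job_id: str) -> None:
        self._items.append(job_id)

    def get(self, limit: Optional[int] = None) -> List[str]:
        newest_first = list(reversed(self._items))
        return newest_first if limit is None else newest_first[:limit]
```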

------------------------------------------------------------------------

### get_stats

``` python

def get_stats(
    
)->QueueStats: # Aggregate counts

```

*Get aggregate queue stats — total counts by terminal status.*

------------------------------------------------------------------------

### get_history_from_journal

``` python

def get_history_from_journal(
    limit:Optional=None, # Most recent N terminal jobs (None = all)
)->List: # Rehydrated job records, most recent first

```

*Durable job history (the CR-14 `_history` migration rider).*

Rehydrates Job records from terminal STATE_TRANSITION journal rows —
restart-surviving and unbounded, unlike the in-memory `get_history`
working set (`max_history` eviction). Rehydrated Jobs are RECORDS:
args/kwargs/result are not journaled (results live in capability DBs;
parameters in run manifests) — identity, timing, status, error,
composition/task fields are present.

Falls back to the in-memory history when no journal is configured.
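Rehydration from terminal STATE_TRANSITION rows can be sketched as a filter over journal rows. The dict row shape (`seq`, `type`, `job_id`, `payload`) and the `"state_transition"` type string are assumptions. The `{"from": ..., "to": ...}` payload follows the JobEvent description above, and the terminal statuses are the completed/failed/cancelled set that `wait_for_job` returns.

```python
from typing import Any, Dict, List, Optional

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


def rehydrate_history(rows: List[Dict[str, Any]],
                      limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Hypothetical rehydration: keep terminal STATE_TRANSITION rows, newest first.
    Only identity and status are rebuilt; args/kwargs/result are never journaled."""
    terminal = [
        r for r in rows
        if r["type"] == "state_transition"
        and r["payload"].get("to") in TERMINAL_STATUSES
    ]
    terminal.sort(key=lambda r: r["seq"], reverse=True)
    records = [{"job_id": r["job_id"], "status": r["payload"]["to"]} for r in terminal]
    return records if limit is None else records[:limit]
```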

------------------------------------------------------------------------

### get_job_diagnostics

``` python

def get_job_diagnostics(
    job_id:str, # Job whose diagnostic records to read
    limit:Optional=200, # Max records (None = all)
    after_seq:Optional=None, # Tail cursor for follow-style reads
)->List: # Job-stamped records, oldest first

```

*EXACT per-job diagnostics (CR-14; replaces `get_job_logs`).*

Records were stamped with the job id IN THE WORKER via the call-envelope
contextvar — no timestamp windows, no over-fetch, correct under stage-3
same-worker concurrency and across multi-instance capabilities (both of
which the deleted `_slice_log_by_job_window` heuristic got wrong).
Follow-style consumers poll with `after_seq` (the LOG_APPENDED
replacement).

Returns `[]` when no diagnostics store is configured.
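A follow-style read amounts to an exact job filter plus a sequence cursor. `DiagRecord` and `read_job_records` are hypothetical stand-ins for the diagnostics store. The `seq` field name and the oldest-first truncation under `limit` are assumptions.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class DiagRecord:
    seq: int       # monotonically increasing store sequence (assumed field name)
    job_id: str    # stamped in the worker via the call-envelope contextvar
    message: str


def read_job_records(store: List[DiagRecord], job_id: str,
                     limit: Optional[int] = 200,
                     after_seq: Optional[int] = None) -> List[DiagRecord]:
    """Exact per-job filter plus tail cursor, oldest first (hypothetical store)."""
    rows = [r for r in store
            if r.job_id == job_id and (after_seq is None or r.seq > after_seq)]
    rows.sort(key=lambda r: r.seq)
    return rows if limit is None else rows[:limit]
```

A follower remembers the `seq` of the last record it saw and passes it as `after_seq` on its next poll.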

### Event Bus

The CR-6 push-based event surface. Three subscriber views
(`events(job_id)`, `events_for_composition(comp_id)`, `all_events()`)
all share one fan-out path: each subscriber owns a bounded
`asyncio.Queue`, and the publisher routes every event to all matching
subscriber keys (`"all"`, `"job:<id>"`, `"comp:<id>"`). Slow subscribers
backpressure themselves — the publisher never blocks. Composition events
are *tagged* job events (each carries optional `composition_id` +
`node_id`) rather than a separate topic, so a single subscription
captures the full per-composition narrative including the
`COMPOSITION_ADVANCED` aggregate signal.
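The fan-out rule can be sketched with one bounded `asyncio.Queue` per subscriber and `put_nowait`, so publishing never awaits. `BusSketch` and `Sub` are hypothetical. Dropping the newest event when a subscriber's queue is full is an assumed policy, consistent with the drop counter on `_Subscription`.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Sub:
    """Per-subscriber bounded queue plus drop counter (cf. _Subscription)."""
    queue: asyncio.Queue
    dropped: int = 0


class BusSketch:
    def __init__(self) -> None:
        self._subs: Dict[str, List[Sub]] = {}

    def subscribe(self, key: str, maxsize: int = 256) -> Sub:
        sub = Sub(asyncio.Queue(maxsize=maxsize))
        self._subs.setdefault(key, []).append(sub)
        return sub

    def publish(self, job_id: str, event: dict,
                composition_id: Optional[str] = None) -> None:
        # Route to every matching key: firehose, per-job, and (if tagged) per-composition.
        keys = ["all", f"job:{job_id}"]
        if composition_id is not None:
            keys.append(f"comp:{composition_id}")
        for key in keys:
            for sub in self._subs.get(key, []):
                try:
                    sub.queue.put_nowait(event)  # never awaits, so never blocks
                except asyncio.QueueFull:
                    sub.dropped += 1             # slow consumer loses this event
```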

------------------------------------------------------------------------

### all_events

``` python

def all_events(
    
)->AsyncIterator:

```

*Subscribe to all events (firehose; async generator).*

Useful for global dashboards, audit logs, and telemetry sinks that need
the complete event stream rather than a filtered view.

------------------------------------------------------------------------

### events_for_composition

``` python

def events_for_composition(
    composition_id:str, # Filter to events tagged with this composition
)->AsyncIterator:

```

*Subscribe to events for all member jobs of a composition (async
generator).*

Yields the unified per-composition narrative: member-job lifecycle
events interleaved with `COMPOSITION_ADVANCED` aggregate signals (stage
3).

------------------------------------------------------------------------

### events

``` python

def events(
    job_id:str, # Filter to events for this job
)->AsyncIterator:

```

*Subscribe to events for a single job (async generator).*

Yields events as they fire. Multiple concurrent subscribers to the same
job_id each get their own independent stream — useful for multi-tab UIs.
Late subscribers catch up exactly via the journal: query
`journal.query(job_id=..., after_seq=cursor)` then follow live — the bus
is a tail of the journal, not the record itself (CR-14).

### Composition Submission & Control

Stage 3 (CR-16): compositions — DAGs of capability-invocation nodes with
execution-time-bound inputs — REPLACE the pre-bound `submit_sequence`
batch model outright (the pass-2 Thread-4 decision; the 2026-05-31
opt-in framing was superseded). The data model, validation, and binding
resolution live in `core/ports.ipynb`; the queue owns submission, lazy
member-Job creation, advancement, cancellation, and finalization.

------------------------------------------------------------------------

### submit_composition

``` python

async def submit_composition(
    comp:Composition, # Composition to run (validated at submit)
)->str: # composition run id

```

*Submit a composition: a DAG of capability-invocation nodes with
execution-time-bound inputs (stage 3; replaces `submit_sequence`).*

Validates upfront: structural validation via `new_composition_run`
(duplicate ids / unknown refs / cycles → `CompositionValidationError`)
and the disabled-capability gate across all nodes
(`CapabilityDisabledError`), matching the sequence-era precedent. Member
Jobs are created LAZILY — only dependency-free nodes have Jobs at
submit; downstream nodes get their kwargs materialized from upstream
results at advancement time.

CR-14: refuses loudly when the journal is wedged (the same gate as
`submit`).

Consumers wait via `wait_for_composition`, observe via
`events_for_composition`, inspect via `get_composition`.
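Structural validation and the lazy seeding of member Jobs can be sketched with Kahn's algorithm, one standard cycle check; the text does not say which algorithm `new_composition_run` uses. `validate_and_seed`, the `edges` mapping (node → upstream nodes), and `CompositionError` are hypothetical stand-ins.

```python
from collections import deque
from typing import Dict, List, Set


class CompositionError(ValueError):
    """Stand-in for CompositionValidationError."""


def validate_and_seed(nodes: List[str], edges: Dict[str, List[str]]) -> List[str]:
    """`edges[node]` lists the nodes it depends on. Returns the dependency-free
    nodes, i.e. the only ones that would get Jobs at submit time."""
    if len(set(nodes)) != len(nodes):
        raise CompositionError("duplicate node ids")
    known: Set[str] = set(nodes)
    for node, upstream in edges.items():
        if node not in known or any(u not in known for u in upstream):
            raise CompositionError(f"unknown reference at {node!r}")
    # Kahn's algorithm: if the topological walk cannot consume every node, there is a cycle.
    indegree = {n: len(edges.get(n, [])) for n in nodes}
    dependents: Dict[str, List[str]] = {n: [] for n in nodes}
    for node, upstream in edges.items():
        for u in upstream:
            dependents[u].append(node)
    ready = deque(n for n in nodes if indegree[n] == 0)
    roots = list(ready)
    seen = 0
    while ready:
        n = ready.popleft()
        seen += 1
        for d in dependents[n]:
            indegree[d] -= 1
            if indegree[d] == 0:
                ready.append(d)
    if seen != len(nodes):
        raise CompositionError("cycle detected")
    return roots
```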

------------------------------------------------------------------------

### wait_for_composition

``` python

async def wait_for_composition(
    composition_id:str, # Composition to wait for
    timeout:Optional=None, # Max seconds to wait
)->CompositionRun: # Terminal run record

```

*Block until a composition reaches terminal status (the `wait_for_job`
analog for compositions).*

------------------------------------------------------------------------

### cancel_composition

``` python

async def cancel_composition(
    composition_id:str, # Composition to cancel
)->bool: # True if cancellation was recorded

```

*Cancel an in-flight composition (USER intent: the run lands
`cancelled`).*

Records intent FIRST (`cancel_requested`) so member-cancel callbacks
racing through `_advance_composition` derive the right terminal status;
then marks never-started nodes cancelled and cancels every member whose
Job is still pending or running (in-flight members resolve through the
per-job cooperative-cancel machinery and finalize the run on the way
out). Returns False if the composition is unknown or already terminal.

------------------------------------------------------------------------

### get_composition

``` python

def get_composition(
    composition_id:str, # Composition to retrieve
)->Optional: # CompositionRun or None

```

*Get a composition run by id (read-only inspection).*

### Resource Snapshots

`get_resource_snapshot(job_id)` returns a point-in-time
`ResourceSnapshot` for a job, composing worker proxy `get_stats()` with
the configured sysmon capability’s CR-3 typed `list_processes()`
(per-PID matching) and `get_system_status()` (global GPU stats).
`_sample_resource_snapshot` is the internal helper invoked both from
this method and from `_poll_progress` (which emits `RESOURCE_SNAPSHOT`
events at `resource_snapshot_cadence_polls` cadence). All sysmon calls
are best-effort — failures are swallowed and the snapshot returns with
worker stats only.
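The best-effort sampler below is a sketch under assumptions. The worker-stats dict keys, the `pid` / `gpu_memory_mb` / `gpu_total_mb` keys in sysmon results, and `SnapshotSketch` itself are hypothetical; the real `ResourceSnapshot` carries more GPU fields. Any sysmon exception is swallowed, so the snapshot still carries the worker stats.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List, Optional


@dataclass
class SnapshotSketch:
    timestamp: datetime
    worker_pid: int = 0
    cpu_percent: float = 0.0
    memory_rss_mb: float = 0.0
    gpu_memory_mb: Optional[float] = None
    gpu_total_mb: Optional[float] = None


def sample(worker_stats: Dict[str, Any],
           list_processes: Optional[Callable[[], List[Dict[str, Any]]]],
           get_system_status: Optional[Callable[[], Dict[str, Any]]]) -> SnapshotSketch:
    snap = SnapshotSketch(timestamp=datetime.now(timezone.utc), **worker_stats)
    if list_processes is None or get_system_status is None:
        return snap  # no sysmon configured: worker stats only
    try:
        # Per-PID matching for this worker, then global GPU stats.
        mine = [p for p in list_processes() if p.get("pid") == snap.worker_pid]
        if mine:
            snap.gpu_memory_mb = mine[0].get("gpu_memory_mb")
        snap.gpu_total_mb = get_system_status().get("gpu_total_mb")
    except Exception:
        pass  # best-effort: a failing sysmon never breaks the snapshot
    return snap
```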

------------------------------------------------------------------------

### get_resource_snapshot

``` python

def get_resource_snapshot(
    job_id:str, # Job to sample resources for
)->Optional:

```

*Get a point-in-time resource snapshot for a job (CR-6 Stage 3).*

Returns None if the job is unknown or the worker proxy doesn’t expose
`get_stats`. Composes worker stats with sysmon GPU stats when the queue
is configured with a `sysmon_capability_name`.

### Lifecycle

------------------------------------------------------------------------

### start

``` python

async def start(
    
)->None:

```

*Start the queue processor.*

CR-6 Stage 4: installs the substrate-side retry observer on `_deps`
(typically a CapabilityManager). When CR-7’s reactive-retry path fires
for a running job, the observer updates `Job.retry_count` and publishes
a RETRY_STARTED event tagged with the in-flight job. Previous observer
value (if any) is saved + restored in stop() for cooperative
coexistence.
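The save-and-restore handshake on the retry-observer slot can be sketched synchronously, although the real `start`/`stop` are async. `ObserverHost`, `QueueLifecycleSketch`, and the one-argument observer signature are assumptions. Only the `_on_retry` slot name comes from the text.

```python
from typing import Any, Callable, Dict, Optional


class ObserverHost:
    """Stand-in for deps (e.g. a manager) exposing a single retry-observer slot."""
    _on_retry: Optional[Callable[..., Any]] = None


class QueueLifecycleSketch:
    def __init__(self, deps: ObserverHost) -> None:
        self._deps = deps
        self._saved: Optional[Callable[..., Any]] = None
        self.retries: Dict[str, int] = {}

    def _observe_retry(self, job_id: str) -> None:
        self.retries[job_id] = self.retries.get(job_id, 0) + 1

    def start(self) -> None:
        self._saved = getattr(self._deps, "_on_retry", None)  # remember whoever was installed
        self._deps._on_retry = self._observe_retry

    def stop(self) -> None:
        self._deps._on_retry = self._saved  # hand the slot back as we found it
        self._saved = None
```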

------------------------------------------------------------------------

### stop

``` python

async def stop(
    
)->None:

```

*Stop the queue processor gracefully.*

Stage 3: in-flight job tasks now run as detached lanes, so they are
drained with the same 5s budget as the processor task; leftovers are
cancelled.

CR-6 Stage 4: restores the previous `_on_retry` observer on deps to
leave the manager in the state we found it (cooperative with other queue
instances, tests, etc.).
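The drain-then-cancel shutdown maps onto `asyncio.wait` with a timeout. `drain_lanes` is a hypothetical helper. It awaits the cancelled tasks afterwards so their cleanup runs before shutdown continues.

```python
import asyncio
from typing import Set


async def drain_lanes(tasks: Set[asyncio.Task], budget: float = 5.0) -> int:
    """Give in-flight lane tasks `budget` seconds to finish, then cancel the rest.
    Returns how many tasks had to be cancelled."""
    if not tasks:
        return 0
    _done, pending = await asyncio.wait(tasks, timeout=budget)
    for t in pending:
        t.cancel()
    # Let the cancellations propagate; return_exceptions keeps CancelledError from escaping.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```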

### Internal Methods

### Multi-lane dispatch + resource-derived admission

Stage 3 replaced the pre-overhaul single-lane loop (one inline
`_execute_job` at a time, a single `self._running` slot) with
**ready-set dispatch**: the loop launches every job the admission ladder
allows, as independent tasks. Admission derives from the CR-7 empirical
store (per `(instance_id, config_hash)` measured peaks) + live sysmon
telemetry — never from declared metadata. A job with no empirical
evidence runs EXCLUSIVE: its first run is its measurement run, after
which the store’s keying graduates it automatically. Worst case (empty
store, no sysmon) degrades to exactly the old single-lane behavior; CR-7
reactive retry backstops admission misses.
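A single admission decision might look like the sketch below. `admit`, `Evidence`, and `peak_gpu_mb` are hypothetical. Two rules come from the text: no evidence means an exclusive run, and the GPU budget is `total * gpu_headroom_fraction`. Falling back to single-lane when live telemetry is missing is an assumption.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class Evidence:
    peak_gpu_mb: float  # measured peak from past runs (hypothetical field)


def admit(key: Tuple[str, str],
          evidence: Dict[Tuple[str, str], Evidence],
          running: int,
          running_exclusive: bool,
          gpu_used_mb: Optional[float],
          gpu_total_mb: Optional[float],
          max_concurrent_lanes: int = 4,
          gpu_headroom_fraction: float = 0.9) -> bool:
    """Hypothetical admission check for one job keyed by (instance_id, config_hash).
    Declared metadata is never consulted, only measured evidence and live telemetry."""
    if running_exclusive or running >= max_concurrent_lanes:
        return False
    record = evidence.get(key)
    if record is None:
        return running == 0  # no evidence: exclusive measurement run
    if gpu_used_mb is None or gpu_total_mb is None:
        return running == 0  # assumed: without live telemetry, stay single-lane
    budget = gpu_total_mb * gpu_headroom_fraction
    return gpu_used_mb + record.peak_gpu_mb <= budget
```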

## Usage Example

``` python
from cjm_substrate.core.manager import CapabilityManager
from cjm_substrate.core.queue import JobQueue, JobStatus
from cjm_substrate.core.scheduling import QueueScheduler

# Setup (the top-level `await` calls below assume a notebook/IPython session)
manager = CapabilityManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_capability(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")

# Create queue
queue = JobQueue(manager)
await queue.start()

# Submit jobs
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)

# Monitor queue state (typed accessors)
print(f"Running: {queue.get_running_jobs()}")
print(f"Pending: {len(queue.get_pending())} jobs")

# Cancel a job
await queue.cancel(job2_id)

# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
    print(job1.result)

# Cleanup
await queue.stop()
manager.unload_all()
```

``` python
# Task-channel routing test (stage 4, CR-17 pt 2): submit validation + the
# executor routing task-addressed jobs via execute_capability_task_async while
# execute-channel jobs stay on execute_capability_async.
import asyncio as _aio


class _TaskChannelDeps:
    def __init__(self):
        self.task_calls = []
        self.exec_calls = []

    def get_capability_meta(self, name_or_id):
        return None

    def get_capability(self, name_or_id):
        return object()  # non-None: the executor's loaded-check passes; no cancel paths used

    async def execute_capability_async(self, name_or_id, *args, **kwargs):
        self.exec_calls.append((name_or_id, args, kwargs))
        return {"channel": "execute"}

    async def execute_capability_task_async(self, name_or_id, task_name, method, **kwargs):
        self.task_calls.append((name_or_id, task_name, method, kwargs))
        return {"channel": "task"}

    def reload_capability(self, name_or_id):
        pass


async def _run_task_channel_test():
    deps = _TaskChannelDeps()
    q = JobQueue(deps)
    await q.start()
    try:
        jid = await q.submit("graph", task="graph-storage", method="query_nodes",
                             query={"type": "node_query"})
        job = await q.wait_for_job(jid, timeout=10)
        assert job.status == JobStatus.completed, job.error
        assert job.result == {"channel": "task"}
        assert deps.task_calls == [
            ("graph", "graph-storage", "query_nodes", {"query": {"type": "node_query"}})]
        # execute channel unchanged
        jid2 = await q.submit("graph", "posarg", key="val")
        job2 = await q.wait_for_job(jid2, timeout=10)
        assert job2.result == {"channel": "execute"}
        assert deps.exec_calls == [("graph", ("posarg",), {"key": "val"})]
        # validation: task without method refuses at submit
        from cjm_substrate.core.errors import CapabilityInputError as _PIE
        try:
            await q.submit("graph", task="graph-storage")
            raise AssertionError("should have raised")
        except _PIE:
            pass
    finally:
        await q.stop()

_aio.run(_run_task_channel_test())
```
