Job Queue

Resource-aware multi-lane job queue for capability execution with cancellation support

The JobQueue provides a resource-aware multi-lane job queue for capability execution (stage 3 / CR-16 rework):

┌──────────────────────────────────────────────────────────────┐
│                    User Application                          │
│  ┌──────────────────────────────────────────────────────┐    │
│  │                    JobQueue                          │    │
│  │  ┌───────────┐  ┌─────────────────┐  ┌────────────┐  │    │
│  │  │ Pending   │  │ Running Jobs    │  │ History    │  │    │
│  │  │ Jobs      │→ │ (lanes; gated   │→ │ (completed/│  │    │
│  │  │ (heap)    │  │  by admission)  │  │ cancelled) │  │    │
│  │  └───────────┘  └─────────────────┘  └────────────┘  │    │
│  │        │                 ▲                           │    │
│  │        ▼                 │ lazy member Jobs          │    │
│  │  ┌─────────────────────────────────┐                 │    │
│  │  │ Compositions (DAGs w/ bindings) │                 │    │
│  │  └─────────────────────────────────┘                 │    │
│  └──────────────────────────────────────────────────────┘    │
│         │                │                                   │
│         ▼                ▼                                   │
│  ┌──────────────┐  ┌─────────────┐                           │
│  │ Empirical    │  │ Capability  │                           │
│  │ store+sysmon │  │ Manager     │                           │
│  └──────────────┘  └─────────────┘                           │
└──────────────────────────────────────────────────────────────┘

Enums

JobStatus captures the lifecycle a job moves through. JobEventType enumerates the push-based events emitted on the CR-6 multi-subscriber event bus. CancelPhase surfaces the substrate’s cancel state machine (cooperative → force → reloading → completed) so UIs can render cancel progress.


CancelPhase


def CancelPhase(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Phase of a cancellation in progress (CR-6 + CR-4 pairing).

Surfaces the substrate’s cancel state machine. Stage 4 wires the transitions; Stage 1 reserves the enum so Job.cancel_phase can be typed correctly without dangling forward references.
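
As a rough illustration of how a UI might consume the cancel state machine, the sketch below models the four phases named above as a strictly linear progression. The enum name, its member names, and the helper functions are hypothetical stand-ins, and the real transitions may skip phases (for example, when a cooperative cancel succeeds outright).

```python
from enum import Enum
from typing import Optional


class CancelPhaseSketch(Enum):
    """Hypothetical stand-in for CancelPhase; member names are assumed."""
    cooperative = "cooperative"
    force = "force"
    reloading = "reloading"
    completed = "completed"


# Linear order taken from the description above (an assumption of this sketch).
_ORDER = list(CancelPhaseSketch)


def next_phase(phase: CancelPhaseSketch) -> Optional[CancelPhaseSketch]:
    """Return the phase that follows `phase`, or None once cancellation is done."""
    idx = _ORDER.index(phase)
    return _ORDER[idx + 1] if idx + 1 < len(_ORDER) else None


def render(phase: Optional[CancelPhaseSketch]) -> str:
    """Label a UI could show for a job's cancel_phase (None = no cancel in progress)."""
    return "not cancelling" if phase is None else f"cancelling: {phase.value}"
```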


JobEventType


def JobEventType(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Push-based job event types (CR-6; stage-3 composition rework; CR-14 journal-primary emission).

Emitted by JobQueue through the single emission path (_publish_event): journal-class events become durable journal rows AND fan out to live subscribers; liveness-class events (LIVENESS_EVENT_TYPES in core.journal_store) fan out only — their final values ride the terminal STATE_TRANSITION row. Consumers subscribe via queue.events(job_id) / queue.events_for_composition(comp_id) / queue.all_events() and receive JobEvent instances asynchronously.

COMPOSITION_ADVANCED replaced the retired SEQUENCE_ADVANCED at execution stage 3 (CR-16: compositions replace sequences outright): it fires when a member job’s completion unlocks downstream composition nodes — payload carries the completed node id + the newly enqueued node ids.

The reserved-never-emitted LOG_APPENDED was RETIRED at stage 7 (CR-14): log-follow is a diagnostics-store cursor read (get_job_diagnostics), not a push event — there are no log files or byte offsets anymore. Non-job substrate events (worker lifecycle, config, runs) live in core.journal_store.SubstrateEventType.


JobStatus


def JobStatus(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Status of a job in the queue.

JobQueueDependencies

The CR-6 substrate dependency Protocol that JobQueue requires. CapabilityManager satisfies this structurally, so existing hosts continue to work: passing a CapabilityManager to JobQueue(deps=...) still type-checks. The Protocol enables clean future extraction of JobQueue into a separate library per the OQ-5 resolution. The audit’s locked design listed 3 methods; investigation showed that the execute path actually consumes more (get_capability_meta, get_capability, execute_capability_async, reload_capability). CR-14 retired the original flat-log accessor, so it dropped out of the surface. The stage-3 admission methods and the stage-4 task-channel method are included too, so the Protocol genuinely captures the surface the queue uses.


JobQueueDependencies


def JobQueueDependencies(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Substrate dependencies the JobQueue requires (CR-6 + stage 3).

CapabilityManager satisfies this structurally; the Protocol exists so JobQueue can be tested in isolation (with a lightweight test double) and so a future extraction into a separate library has no API constraint locked in.

The first 4 methods are the CR-6 execute-path surface (CR-14 retired the original flat-log accessor — log retrieval is a diagnostics-store query now). The stage-3 additions (CR-16 multi-lane admission) are consumed DEFENSIVELY via getattr — a deps implementation without them yields no admission evidence, so every job runs exclusive = exact pre-stage-3 single-lane behavior. Older test doubles keep working unchanged. CR-14 also reads journal_store / diagnostics_store ATTRIBUTES via getattr when the queue isn’t constructed with explicit stores — a deps without them (test doubles) simply yields no journaling.
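
The structural-typing and defensive-getattr ideas above can be sketched with the standard library alone. ExecuteDeps below is a hypothetical, trimmed-down stand-in for JobQueueDependencies (only the four execute-path method names come from this page; the signatures are guesses), and resolve_journal is an illustrative helper, not the queue's actual code.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class ExecuteDeps(Protocol):
    """Hypothetical, trimmed-down stand-in for JobQueueDependencies."""

    def get_capability_meta(self, name_or_id: str) -> Any: ...
    def get_capability(self, name_or_id: str) -> Any: ...
    async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any: ...
    def reload_capability(self, name_or_id: str) -> None: ...


class MinimalDouble:
    """Test double with only the execute-path methods: no admission, no stores."""

    def get_capability_meta(self, name_or_id: str) -> Any:
        return None

    def get_capability(self, name_or_id: str) -> Any:
        return object()

    async def execute_capability_async(self, name_or_id: str, *args: Any, **kwargs: Any) -> Any:
        return {"ok": True}

    def reload_capability(self, name_or_id: str) -> None:
        pass


def resolve_journal(deps: Any, explicit: Optional[Any] = None) -> Optional[Any]:
    """Prefer an explicit store; otherwise read the optional attribute defensively."""
    return explicit if explicit is not None else getattr(deps, "journal_store", None)
```

MinimalDouble never inherits from ExecuteDeps, yet isinstance(MinimalDouble(), ExecuteDeps) holds because the method names match, and resolve_journal(MinimalDouble()) returns None, which corresponds to the "no journaling" outcome described above.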

Job / JobEvent / QueueStats / _Subscription

Job is the queued execution unit (CR-6 reshape: capability_instance_id per CR-10, JobError typed error per CR-5, datetime timestamps; stage 3 replaced the sequence tags with composition tags — composition_id/node_id — alongside the cancel-phase + block-reason + retry fields). JobEvent is the push-bus payload — every event carries full tag context (job_id, optional composition_id/node_id, capability_instance_id) so consumers subscribed to any filter view get the same shape. QueueStats is the aggregate-counts shape returned by get_stats(). _Subscription is the internal per-subscriber bounded-queue + drop-counter holder.

Job keeps backward-compat @property aliases for capability_name (returns capability_instance_id) and created_at (returns submitted_at.timestamp()), tagged for SG-48 removal after the consumer cascade migrates.


QueueStats


def QueueStats(
    total_pending:int, total_completed:int, total_failed:int, total_cancelled:int
)->None:

Aggregate counts returned by JobQueue.get_stats() (CR-6).


JobEvent


def JobEvent(
    type:JobEventType, job_id:str, capability_instance_id:str, composition_id:Optional=None, node_id:Optional=None,
    run_id:Optional=None, actor:Optional=None, timestamp:datetime=<factory>, payload:Dict=<factory>
)->None:

A push-based job event (CR-6; stage-3 composition tags).

Carries full tag context so a subscriber to all_events(), events(job_id), or events_for_composition(comp_id) receives identically-shaped instances. payload is a per-event-type structured dict (e.g., STATE_TRANSITION carries {"from": "pending", "to": "running"}; COMPOSITION_ADVANCED carries {"completed_node": ..., "enqueued_nodes": [...]}).

run_id / actor (CR-14 follow-up) ride from the Job so journal rows written by the single emission path carry the host-tier correlation.


Job


def Job(
    id:str, capability_instance_id:str, args:Tuple, kwargs:Dict, status:JobStatus=<JobStatus.pending: 'pending'>,
    priority:int=0, submitted_at:datetime=<factory>, started_at:Optional=None, completed_at:Optional=None,
    progress:float=0.0, status_message:str='', result:Any=None, error:Optional=None, composition_id:Optional=None,
    node_id:Optional=None, task_name:Optional=None, method:Optional=None, run_id:Optional=None, actor:Optional=None,
    control:Dict=<factory>, cancel_requested_at:Optional=None, cancel_phase:Optional=None,
    block_reason:Optional=None, retry_count:int=0, last_resource_snapshot:Optional=None
)->None:

A queued capability execution request (CR-6 reshape; stage-3 composition rework renamed the sequence tags to composition tags).

composition_id / node_id are set when the job is a lazily-created member of a composition (CR-16) — they ride every JobEvent so events_for_composition subscribers see member lifecycle events.

run_id / actor (CR-14 follow-up) are host-tier correlation tags: cores pass their run-manifest id + initiating actor at submit so every journal row for this job links back to the run record (run manifest ↔︎ journal linkage) and carries who/what initiated it.

ResourceSnapshot

Point-in-time worker + GPU stats for one job, populated by sampling the worker proxy’s get_stats() and (optionally) the configured system-monitor capability’s CR-3 typed get_system_status() + list_processes() methods. Distinct from CR-7’s EmpiricalResourceRecord which aggregates samples across runs — ResourceSnapshot is the substrate’s “what’s happening right now” surface. Field shape mirrors cjm-fasthtml-job-monitor’s existing ResourceSnapshot so the cascade migration to the substrate-owned type is structural rather than semantic.


ResourceSnapshot


def ResourceSnapshot(
    timestamp:datetime, worker_pid:int=0, cpu_percent:float=0.0, memory_rss_mb:float=0.0, gpu_index:Optional=None,
    gpu_memory_mb:Optional=None, gpu_type:Optional=None, gpu_total_mb:Optional=None, gpu_load_percent:Optional=None
)->None:

Point-in-time resource usage for one job (CR-6 Stage 3).

Worker stats (cpu_percent, memory_rss_mb) come from the capability proxy’s get_stats(). GPU fields come from the configured system-monitor capability (when set on JobQueue) via CR-3’s typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). All GPU fields are Optional: each is None if no sysmon is configured or the worker isn’t running on a GPU.

Distinct from CR-7’s EmpiricalResourceRecord (aggregated profile across runs); this is “what’s happening right now” for one job.

JobQueue

Main queue class — submission, execution lifecycle, multi-subscriber event bus, observability. CR-6 reshape: takes deps: JobQueueDependencies (Protocol) instead of manager: CapabilityManager (concrete); adds bounded-queue multi-subscriber event-bus state.


JobQueue


def JobQueue(
    deps:JobQueueDependencies, # Substrate dependencies (CapabilityManager satisfies structurally)
    max_history:int=100, # Max completed jobs to retain
    cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
    progress_poll_interval:float=1.0, # Seconds between progress polls
    sysmon_capability_name:Optional=None, # CR-3 monitor capability instance for GPU stats (None = no GPU info)
    resource_snapshot_cadence_polls:int=4, # Sample resources every Nth progress poll
    max_concurrent_lanes:int=4, # Stage 3: max in-flight jobs (admission still gates each)
    gpu_headroom_fraction:float=0.9, # Stage 3: blunt GPU admission margin (budget = total * fraction)
    journal:Optional=None, # CR-14: journal sink; defaults to deps.journal_store (the manager's)
    diagnostics:Optional=None, # CR-14: diagnostics sink for get_job_diagnostics; defaults to deps.diagnostics_store
):

Resource-aware multi-lane job queue with journal-primary observability (CR-6 + CR-14; stage-3 CR-16 rework: ready-set dispatch + resource-derived admission + composition execution).

Job Submission


submit


async def submit(
    capability_instance_id:str, # Target capability instance (per CR-10)
    args:VAR_POSITIONAL, priority:int=0, # Higher = more urgent
    task:Optional=None, # Task-channel address: adapter task name (stage 4)
    method:Optional=None, # Task-channel address: adapter method (set with task)
    run_id:Optional=None, # Host-tier run correlation (CR-14 follow-up; reserved name, never a capability kwarg)
    actor:Optional=None, # Who/what initiated (CR-14 follow-up; reserved name)
    control:Optional=None, # Per-call control flags (force/cache-bypass); reserved name, never a capability kwarg
    kwargs:VAR_KEYWORD
)->str: # Returns job_id

Submit a job to the queue.

CR-2: rejects jobs for disabled capabilities at submit time (typed CapabilityDisabledError) so the failure surface matches CapabilityManager.execute_capability’s disabled gate. A job submitted to a disabled capability would otherwise sit in the queue until execution and only then raise; moving the check earlier gives operators an actionable signal immediately.

CR-6: no STATE_TRANSITION event fires at submit because the job is already in pending state at construction — there’s no transition to publish. The first STATE_TRANSITION fires when the processor loop moves the job pending → running.

CR-14: refuses loudly when the journal is wedged (see _check_journal_wedge). run_id/actor join priority/task/method as reserved keyword names (they never reach capability kwargs): cores pass their run-manifest id + initiating actor so every journal row for this job carries the host-tier correlation.

Job Control


reorder


def reorder(
    job_id:str, # Job to move
    new_priority:int, # New priority value
)->bool: # True if reordered

Change the priority of a pending job.


cancel


async def cancel(
    job_id:str, # Job to cancel
)->bool: # True if cancelled

Cancel a pending or running job.

CR-6: publishes STATE_TRANSITION when a pending job moves directly to cancelled (no transition through running). Running-job cancellation publishes CANCEL_PHASE_CHANGED events from _execute_with_cancellation.

Stage 3: when the cancelled job is a composition member, _advance_composition runs after the lock is released so the cancelled status propagates to the composition run (dependents skip; the run finalizes when all nodes are terminal). Lock release is required because _advance_composition may need to enqueue downstream members in some flows; asyncio.Lock is not re-entrant.
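
The lock-release requirement follows from asyncio.Lock not being re-entrant: a coroutine that already holds the lock would wait forever if it called a helper that acquires the same lock. A minimal sketch of the pattern, using a hypothetical TinyQueue whose only resemblance to JobQueue is the locking shape:

```python
import asyncio
from typing import List


class TinyQueue:
    """Hypothetical miniature; only the locking pattern mirrors the text above."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.log: List[str] = []

    async def _advance(self, job_id: str) -> None:
        # Takes the lock itself, so the caller must not be holding it.
        async with self._lock:
            self.log.append(f"advanced after {job_id}")

    async def cancel(self, job_id: str) -> bool:
        async with self._lock:
            self.log.append(f"cancelled {job_id}")
            needs_advance = True  # decided while state is consistent
        # Lock released here; awaiting _advance inside the block would deadlock.
        if needs_advance:
            await self._advance(job_id)
        return True


async def demo_cancel() -> List[str]:
    q = TinyQueue()
    await q.cancel("job-1")
    return q.log
```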

Observation

CR-6 split the legacy get_state() UI-shaped dict into typed accessors: get_pending(), get_running_jobs(), get_history(limit), get_stats(). Both the get_state() shim and the single-lane get_running() affordance were removed at SG-48; stage 3’s multi-lane queue exposes get_running_jobs().

CR-14 (stage 7) replaced the log-retrieval surface outright: get_job_logs + the _slice_log_by_job_window timestamp-window heuristic (structurally unsound under stage-3 same-worker concurrency — interleaved windows) are DELETED; get_job_diagnostics(job_id) reads EXACT job-stamped records from the diagnostics store, and get_history_from_journal(limit) rehydrates the durable job history from terminal STATE_TRANSITION journal rows (the _history migration rider — restart-surviving, unlimited depth).


get_job


def get_job(
    job_id:str, # Job to retrieve
)->Optional: # Job or None

Get a job by ID.


wait_for_job


async def wait_for_job(
    job_id:str, # Job to wait for
    timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job

Wait for a job to complete.

Independent of the CR-6 event bus — uses a per-job asyncio.Event for the simple block-until-done affordance. Streaming consumers should use events(job_id) instead.
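
A block-until-done affordance built on a per-job asyncio.Event might look like the sketch below. DoneSignals and its methods are hypothetical, and the timeout behaviour is an assumption: this sketch returns False on timeout, whereas the real wait_for_job returns a Job and its timeout behaviour is not documented on this page.

```python
import asyncio
from typing import Dict, Optional, Tuple


class DoneSignals:
    """Hypothetical per-job completion signals keyed by job id."""

    def __init__(self) -> None:
        self._events: Dict[str, asyncio.Event] = {}

    def _event(self, job_id: str) -> asyncio.Event:
        return self._events.setdefault(job_id, asyncio.Event())

    def mark_done(self, job_id: str) -> None:
        self._event(job_id).set()

    async def wait(self, job_id: str, timeout: Optional[float] = None) -> bool:
        """Return True once the job is done, False if the timeout expires first."""
        try:
            await asyncio.wait_for(self._event(job_id).wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo_wait() -> Tuple[bool, bool]:
    signals = DoneSignals()
    # Job "a" finishes shortly; job "b" never does.
    asyncio.get_running_loop().call_later(0.01, signals.mark_done, "a")
    finished = await signals.wait("a", timeout=1.0)
    timed_out = await signals.wait("b", timeout=0.01)
    return finished, timed_out
```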


get_pending


def get_pending(
    
)->List: # Pending jobs, priority-sorted

Get pending jobs, priority-sorted (higher priority first, then FIFO).
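
The "higher priority first, then FIFO" ordering can be reproduced with heapq by negating the priority and adding a submission counter as the tie-breaker. This is a minimal sketch of the ordering rule only; the tuple layout and helper names are assumptions, not the queue's internal representation.

```python
import heapq
import itertools
from typing import List, Tuple

_counter = itertools.count()  # submission order, used as the FIFO tie-breaker


def push(heap: List[Tuple[int, int, str]], job_id: str, priority: int = 0) -> None:
    # heapq is a min-heap, so negate priority to surface the highest first.
    heapq.heappush(heap, (-priority, next(_counter), job_id))


def pending_in_order(heap: List[Tuple[int, int, str]]) -> List[str]:
    """Non-destructive view: sort a copy instead of popping the live heap."""
    return [job_id for _, _, job_id in sorted(heap)]


heap: List[Tuple[int, int, str]] = []
push(heap, "job1")
push(heap, "job2")
push(heap, "job3", priority=10)
print(pending_in_order(heap))  # ['job3', 'job1', 'job2']
```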


get_running_jobs


def get_running_jobs(
    
)->List: # All currently-executing jobs

All in-flight jobs (stage 3: the queue is multi-lane).


get_history


def get_history(
    limit:Optional=None, # Max jobs to return (most recent N); None = all
)->List: # Completed/failed/cancelled jobs, most recent first

Get completed jobs, most recent first.

If limit is provided, returns the most recent N. The internal history list grows append-only up to max_history, so older jobs are evicted in submission order (oldest first).
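
The retention rule can be pictured as an append-only list with a cap, read back newest first. The helper names below are hypothetical and the sketch stores bare job ids rather than Job objects.

```python
from typing import List, Optional


def record(history: List[str], job_id: str, max_history: int) -> None:
    """Append a finished job; evict the oldest entry once the cap is exceeded."""
    history.append(job_id)
    if len(history) > max_history:
        del history[0]


def recent(history: List[str], limit: Optional[int] = None) -> List[str]:
    """Most recent first; limit=None returns everything retained."""
    newest_first = history[::-1]
    return newest_first if limit is None else newest_first[:limit]


history: List[str] = []
for job_id in ["a", "b", "c", "d"]:
    record(history, job_id, max_history=3)
print(recent(history, limit=2))  # ['d', 'c']
```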


get_stats


def get_stats(
    
)->QueueStats: # Aggregate counts

Get aggregate queue stats — total counts by terminal status.


get_history_from_journal


def get_history_from_journal(
    limit:Optional=None, # Most recent N terminal jobs (None = all)
)->List: # Rehydrated job records, most recent first

Durable job history (the CR-14 _history migration rider).

Rehydrates Job records from terminal STATE_TRANSITION journal rows — restart-surviving and unbounded, unlike the in-memory get_history working set (max_history eviction). Rehydrated Jobs are RECORDS: args/kwargs/result are not journaled (results live in capability DBs; parameters in run manifests) — identity, timing, status, error, composition/task fields are present.

Falls back to the in-memory history when no journal is configured.


get_job_diagnostics


def get_job_diagnostics(
    job_id:str, # Job whose diagnostic records to read
    limit:Optional=200, # Max records (None = all)
    after_seq:Optional=None, # Tail cursor for follow-style reads
)->List: # Job-stamped records, oldest first

EXACT per-job diagnostics (CR-14; replaces get_job_logs).

Records were stamped with the job id IN THE WORKER via the call-envelope contextvar — no timestamp windows, no over-fetch, correct under stage-3 same-worker concurrency and across multi-instance capabilities (both of which the deleted _slice_log_by_job_window heuristic got wrong). Follow-style consumers poll with after_seq (the LOG_APPENDED replacement).

Returns [] when no diagnostics store is configured.
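
Follow-style reads with after_seq amount to keeping a cursor and asking only for records past it. The sketch below uses a hypothetical in-memory FakeDiagnostics store; the record fields ("seq", "job_id", "message") are assumptions, and only the limit/after_seq parameter names come from the signature above.

```python
from typing import Dict, List, Optional, Tuple


class FakeDiagnostics:
    """Hypothetical in-memory stand-in; record fields are assumed."""

    def __init__(self) -> None:
        self._rows: List[Dict] = []

    def append(self, job_id: str, message: str) -> None:
        self._rows.append({"seq": len(self._rows) + 1, "job_id": job_id, "message": message})

    def read(self, job_id: str, limit: Optional[int] = 200,
             after_seq: Optional[int] = None) -> List[Dict]:
        # Exact match on the job stamp; no timestamp windows involved.
        rows = [r for r in self._rows
                if r["job_id"] == job_id and (after_seq is None or r["seq"] > after_seq)]
        return rows if limit is None else rows[:limit]


def follow_once(store: FakeDiagnostics, job_id: str,
                cursor: Optional[int]) -> Tuple[List[Dict], Optional[int]]:
    """One poll: return the new records and the advanced cursor."""
    rows = store.read(job_id, after_seq=cursor)
    return rows, (rows[-1]["seq"] if rows else cursor)
```

A consumer would call follow_once in a loop, passing the returned cursor back in, so each poll yields only records it has not seen yet.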

Event Bus

The CR-6 push-based event surface. Three subscriber views (events(job_id), events_for_composition(comp_id), all_events()) all share one fan-out path: each subscriber owns a bounded asyncio.Queue, and the publisher routes every event to all matching subscriber keys ("all", "job:<id>", "comp:<id>"). Slow subscribers backpressure themselves — the publisher never blocks. Composition events are tagged job events (each carries optional composition_id + node_id) rather than a separate topic, so a single subscription captures the full per-composition narrative including the COMPOSITION_ADVANCED aggregate signal.
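
The fan-out described above can be sketched as one bounded asyncio.Queue per subscriber plus a non-blocking put. TinyBus and Subscription are hypothetical names, events are plain dicts rather than JobEvent instances, and the overflow policy (drop the newest event and count it) is an assumption; only the routing keys come from the text.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class Subscription:
    """Hypothetical holder: one bounded queue plus a drop counter."""

    def __init__(self, maxsize: int) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0


class TinyBus:
    def __init__(self) -> None:
        self._subs: Dict[str, List[Subscription]] = defaultdict(list)

    def subscribe(self, key: str, maxsize: int = 2) -> Subscription:
        sub = Subscription(maxsize)
        self._subs[key].append(sub)
        return sub

    def publish(self, event: Dict[str, Any]) -> None:
        # Every event goes to the firehose and its job key; composition key if tagged.
        keys = ["all", f"job:{event['job_id']}"]
        comp: Optional[str] = event.get("composition_id")
        if comp is not None:
            keys.append(f"comp:{comp}")
        for key in keys:
            for sub in self._subs.get(key, []):
                try:
                    sub.queue.put_nowait(event)  # never awaits, so the publisher never blocks
                except asyncio.QueueFull:
                    sub.dropped += 1  # assumed policy: drop the newest event


def demo_bus() -> Tuple[int, int, int]:
    bus = TinyBus()
    firehose = bus.subscribe("all", maxsize=2)
    comp = bus.subscribe("comp:c1", maxsize=8)
    for i in range(3):
        bus.publish({"job_id": f"j{i}", "composition_id": "c1" if i == 0 else None})
    return firehose.queue.qsize(), firehose.dropped, comp.queue.qsize()
```

In the demo, the firehose subscriber never drains its two-slot queue, so the third event is counted as dropped, while the composition subscriber sees only the one event tagged with its id.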


all_events


def all_events(
    
)->AsyncIterator:

Subscribe to all events (firehose; async generator).

Useful for global dashboards, audit logs, and telemetry sinks that need the complete event stream rather than a filtered view.


events_for_composition


def events_for_composition(
    composition_id:str, # Filter to events tagged with this composition
)->AsyncIterator:

Subscribe to events for all member jobs of a composition (async generator).

Yields the unified per-composition narrative: member-job lifecycle events interleaved with COMPOSITION_ADVANCED aggregate signals (stage 3).


events


def events(
    job_id:str, # Filter to events for this job
)->AsyncIterator:

Subscribe to events for a single job (async generator).

Yields events as they fire. Multiple concurrent subscribers to the same job_id each get their own independent stream, which is useful for multi-tab UIs. Late subscribers catch up exactly via the journal: call journal.query(job_id=..., after_seq=cursor), then follow live. The bus is a tail of the journal, not the record itself (CR-14).

Composition Submission & Control

Stage 3 (CR-16): compositions — DAGs of capability-invocation nodes with execution-time-bound inputs — REPLACE the pre-bound submit_sequence batch model outright (the pass-2 Thread-4 decision; the 2026-05-31 opt-in framing was superseded). The data model, validation, and binding resolution live in core/ports.ipynb; the queue owns submission, lazy member-Job creation, advancement, cancellation, and finalization.


submit_composition


async def submit_composition(
    comp:Composition, # Composition to run (validated at submit)
)->str: # composition run id

Submit a composition — a DAG of capability-invocation nodes with execution-time-bound inputs (stage 3; replaces submit_sequence).

Validates upfront: structural validation via new_composition_run (duplicate ids / unknown refs / cycles → CompositionValidationError) and the disabled-capability gate across all nodes (CapabilityDisabledError), matching the sequence-era precedent. Member Jobs are created LAZILY — only dependency-free nodes have Jobs at submit; downstream nodes get their kwargs materialized from upstream results at advancement time.
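
Lazy member creation can be illustrated with a plain dependency map: at submit only nodes with no upstream dependencies are ready, and each completion makes further nodes ready. The node names and the ready_nodes helper are hypothetical, and the sketch omits binding resolution, failure handling, and skipping entirely.

```python
from typing import Dict, List, Set


def ready_nodes(deps: Dict[str, List[str]], done: Set[str], started: Set[str]) -> List[str]:
    """Nodes whose upstream nodes are all done and that have no job yet."""
    return [n for n, ups in deps.items()
            if n not in started and all(u in done for u in ups)]


# Hypothetical three-node composition: transcribe -> summarize -> index.
deps = {"transcribe": [], "summarize": ["transcribe"], "index": ["transcribe", "summarize"]}
done: Set[str] = set()
started: Set[str] = set()

first = ready_nodes(deps, done, started)   # only dependency-free nodes get jobs at submit
started.update(first)
done.add("transcribe")                     # a member job completes
second = ready_nodes(deps, done, started)  # advancement creates the next member job
print(first, second)  # ['transcribe'] ['summarize']
```

"index" stays without a job after the first advancement because one of its upstream nodes has not finished, which is the point at which its inputs could not yet be materialized.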

CR-14: refuses loudly when the journal is wedged (the same gate as submit).

Consumers wait via wait_for_composition, observe via events_for_composition, inspect via get_composition.


wait_for_composition


async def wait_for_composition(
    composition_id:str, # Composition to wait for
    timeout:Optional=None, # Max seconds to wait
)->CompositionRun: # Terminal run record

Block until a composition reaches terminal status (the wait_for_job analog for compositions).


cancel_composition


async def cancel_composition(
    composition_id:str, # Composition to cancel
)->bool: # True if cancellation was recorded

Cancel an in-flight composition (USER intent — the run lands cancelled).

Records intent FIRST (cancel_requested) so member-cancel callbacks racing through _advance_composition derive the right terminal status; then marks never-started nodes cancelled and cancels every member whose Job is still pending or running (in-flight members resolve through the per-job cooperative-cancel machinery and finalize the run on the way out). Returns False if the composition is unknown or already terminal.


get_composition


def get_composition(
    composition_id:str, # Composition to retrieve
)->Optional: # CompositionRun or None

Get a composition run by id (read-only inspection).

Resource Snapshots

get_resource_snapshot(job_id) returns a point-in-time ResourceSnapshot for a job, composing worker proxy get_stats() with the configured sysmon capability’s CR-3 typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). _sample_resource_snapshot is the internal helper invoked both from this method and from _poll_progress (which emits RESOURCE_SNAPSHOT events at resource_snapshot_cadence_polls cadence). All sysmon calls are best-effort — failures are swallowed and the snapshot returns with worker stats only.


get_resource_snapshot


def get_resource_snapshot(
    job_id:str, # Job to sample resources for
)->Optional:

Get a point-in-time resource snapshot for a job (CR-6 Stage 3).

Returns None if the job is unknown or the worker proxy doesn’t expose get_stats. Composes worker stats with sysmon GPU stats when the queue is configured with a sysmon_capability_name.

Lifecycle


start


async def start(
    
)->None:

Start the queue processor.

CR-6 Stage 4: installs the substrate-side retry observer on _deps (typically a CapabilityManager). When CR-7’s reactive-retry path fires for a running job, the observer updates Job.retry_count and publishes a RETRY_STARTED event tagged with the in-flight job. Previous observer value (if any) is saved + restored in stop() for cooperative coexistence.


stop


async def stop(
    
)->None:

Stop the queue processor gracefully.

Stage 3: in-flight job tasks are detached lanes now — drain them with the same 5s budget as the processor task; leftovers are cancelled.

CR-6 Stage 4: restores the previous _on_retry observer on deps to leave the manager in the state we found it (cooperative with other queue instances, tests, etc.).
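
The save-and-restore behaviour of start() and stop() can be sketched as follows. FakeManager, ObserverGuard, and the two observer functions are hypothetical; only the _on_retry attribute name comes from the text above, and the observer's call signature is assumed.

```python
from typing import Callable, Optional

Observer = Callable[[str], None]


class FakeManager:
    """Hypothetical deps object exposing the observer slot as `_on_retry`."""
    _on_retry: Optional[Observer] = None


class ObserverGuard:
    def __init__(self, deps: FakeManager, observer: Observer) -> None:
        self._deps = deps
        self._observer = observer
        self._previous: Optional[Observer] = None

    def start(self) -> None:
        # Remember whatever was installed before, then install ours.
        self._previous = getattr(self._deps, "_on_retry", None)
        self._deps._on_retry = self._observer

    def stop(self) -> None:
        # Leave deps in the state we found it.
        self._deps._on_retry = self._previous


def earlier_observer(job_id: str) -> None:
    """Stands in for an observer installed by another queue or a test."""


def queue_observer(job_id: str) -> None:
    """Stands in for the queue's own retry observer."""
```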

Internal Methods

Multi-lane dispatch + resource-derived admission

Stage 3 replaced the pre-overhaul single-lane loop (one inline _execute_job at a time, a single self._running slot) with ready-set dispatch: the loop launches every job the admission ladder allows, as independent tasks. Admission derives from the CR-7 empirical store (per (instance_id, config_hash) measured peaks) + live sysmon telemetry — never from declared metadata. A job with no empirical evidence runs EXCLUSIVE: its first run is its measurement run, after which the store’s keying graduates it automatically. Worst case (empty store, no sysmon) degrades to exactly the old single-lane behavior; CR-7 reactive retry backstops admission misses.
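
One way to picture the admission decision is the simplified ladder below. It is a sketch under loud assumptions: the function name, its arguments, and the rule of summing measured peaks against total * fraction are illustrative, and the real ladder also consults live sysmon telemetry that is not modelled here. A peak of None stands for "no empirical evidence".

```python
from typing import List, Optional


def admit(peak_mb: Optional[float], running_peaks_mb: List[Optional[float]],
          gpu_total_mb: Optional[float], headroom_fraction: float = 0.9,
          max_lanes: int = 4) -> bool:
    """Decide whether a candidate job may start now (hypothetical ladder)."""
    if not running_peaks_mb:
        return True   # idle queue: anything may start
    if len(running_peaks_mb) >= max_lanes:
        return False  # lane cap reached
    if peak_mb is None or gpu_total_mb is None:
        return False  # no evidence or no telemetry: the job must run alone
    if any(p is None for p in running_peaks_mb):
        return False  # an exclusive (unmeasured) job is in flight
    budget = gpu_total_mb * headroom_fraction
    return sum(running_peaks_mb) + peak_mb <= budget
```

With a 24000 MB GPU and the default fraction the budget is 21600 MB, so a job measured at 8000 MB can join one measured at 12000 MB, while a 10000 MB job cannot. With an empty store every peak is None, and the function only ever admits into an idle queue, which matches the single-lane worst case described above.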

Usage Example

from cjm_substrate.core.manager import CapabilityManager
from cjm_substrate.core.queue import JobQueue, JobStatus
from cjm_substrate.core.scheduling import QueueScheduler

# Setup
manager = CapabilityManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_capability(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")

# Create queue
queue = JobQueue(manager)
await queue.start()

# Submit jobs
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)

# Monitor queue state (typed accessors)
print(f"Running: {queue.get_running_jobs()}")
print(f"Pending: {len(queue.get_pending())} jobs")

# Cancel a job
await queue.cancel(job2_id)

# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
    print(job1.result)

# Cleanup
await queue.stop()
manager.unload_all()

# Task-channel routing test (stage 4, CR-17 pt 2): submit validation + the
# executor routing task-addressed jobs via execute_capability_task_async while
# execute-channel jobs stay on execute_capability_async.
import asyncio as _aio


class _TaskChannelDeps:
    def __init__(self):
        self.task_calls = []
        self.exec_calls = []

    def get_capability_meta(self, name_or_id):
        return None

    def get_capability(self, name_or_id):
        return object()  # non-None: the executor's loaded-check passes; no cancel paths used

    async def execute_capability_async(self, name_or_id, *args, **kwargs):
        self.exec_calls.append((name_or_id, args, kwargs))
        return {"channel": "execute"}

    async def execute_capability_task_async(self, name_or_id, task_name, method, **kwargs):
        self.task_calls.append((name_or_id, task_name, method, kwargs))
        return {"channel": "task"}

    def reload_capability(self, name_or_id):
        pass


async def _run_task_channel_test():
    deps = _TaskChannelDeps()
    q = JobQueue(deps)
    await q.start()
    try:
        jid = await q.submit("graph", task="graph-storage", method="query_nodes",
                             query={"type": "node_query"})
        job = await q.wait_for_job(jid, timeout=10)
        assert job.status == JobStatus.completed, job.error
        assert job.result == {"channel": "task"}
        assert deps.task_calls == [
            ("graph", "graph-storage", "query_nodes", {"query": {"type": "node_query"}})]
        # execute channel unchanged
        jid2 = await q.submit("graph", "posarg", key="val")
        job2 = await q.wait_for_job(jid2, timeout=10)
        assert job2.result == {"channel": "execute"}
        assert deps.exec_calls == [("graph", ("posarg",), {"key": "val"})]
        # validation: task without method refuses at submit
        from cjm_substrate.core.errors import CapabilityInputError as _PIE
        try:
            await q.submit("graph", task="graph-storage")
            raise AssertionError("should have raised")
        except _PIE:
            pass
    finally:
        await q.stop()

_aio.run(_run_task_channel_test())