Job Queue

Resource-aware job queue for sequential plugin execution with cancellation support

The JobQueue provides a resource-aware job queue for plugin execution:

┌─────────────────────────────────────────────────────────────┐
│                    User Application                         │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    JobQueue                         │    │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────────┐    │    │
│  │  │ Pending   │  │ Running   │  │ History       │    │    │
│  │  │ Jobs      │→ │ Job       │→ │ (completed/   │    │    │
│  │  │ (heap)    │  │           │  │  cancelled)   │    │    │
│  │  └───────────┘  └───────────┘  └───────────────┘    │    │
│  └─────────────────────────────────────────────────────┘    │
│         │                │                                  │
│         ▼                ▼                                  │
│  ┌─────────────┐  ┌─────────────┐                           │
│  │ Scheduler   │  │ Plugin      │                           │
│  │ (policy)    │  │ Manager     │                           │
│  └─────────────┘  └─────────────┘                           │
└─────────────────────────────────────────────────────────────┘


Enums

JobStatus captures the lifecycle a job moves through. JobEventType enumerates the push-based events emitted on the CR-6 multi-subscriber event bus. CancelPhase surfaces the substrate’s cancel state machine (cooperative → force → reloading → completed) so UIs can render cancel progress.


CancelPhase


def CancelPhase(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Phase of a cancellation in progress (CR-6 + CR-4 pairing).

Surfaces the substrate’s cancel state machine. Stage 4 wires the transitions; Stage 1 reserves the enum so Job.cancel_phase can be typed correctly without dangling forward references.
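As a rough illustration of how a UI might render cancel progress from this state machine, the sketch below maps each phase to a step number and label. The member names are assumptions inferred from the documented phase order (cooperative, force, reloading, completed); the real CancelPhase members may be spelled differently, and the labels are hypothetical.

```python
from enum import Enum


class CancelPhaseSketch(str, Enum):
    # Hypothetical member names, inferred from the documented phase order.
    cooperative = "cooperative"
    force = "force"
    reloading = "reloading"
    completed = "completed"


# Hypothetical UI labels; not part of the library.
PHASE_LABELS = {
    CancelPhaseSketch.cooperative: "Asking the plugin to stop",
    CancelPhaseSketch.force: "Force-stopping the worker",
    CancelPhaseSketch.reloading: "Reloading the plugin",
    CancelPhaseSketch.completed: "Cancelled",
}


def cancel_progress(phase):
    """Return (step, total, label) for a cancel progress indicator."""
    order = list(CancelPhaseSketch)  # Enum iteration follows definition order
    return order.index(phase) + 1, len(order), PHASE_LABELS[phase]
```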


JobEventType


def JobEventType(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Push-based job event types (CR-6).

Emitted by JobQueue on a multi-subscriber event bus. Consumers subscribe via queue.events(job_id) / queue.events_for_sequence(seq_id) / queue.all_events() and receive JobEvent instances asynchronously.

Stage 1 wires STATE_TRANSITION + PROGRESS_CHANGED at the existing execute-path lifecycle points. The remaining types are reserved for Stages 2-4 (sequences / resources + logs / cancel-phase + retry + block-reason). Reserving the enum values up front keeps later stages additive — new event sources publish without changing the enum.


JobStatus


def JobStatus(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Status of a job in the queue.

JobQueueDependencies

The CR-6 substrate dependency Protocol that JobQueue requires. PluginManager satisfies it structurally, so existing hosts continue to work: passing a PluginManager to JobQueue(deps=...) still type-checks. The Protocol enables clean future extraction of JobQueue into a separate library per the OQ-5 resolution. The audit’s locked design listed 3 methods; investigation surfaced 2 more that the current execute path consumes. All 5 are included (among them get_plugin, reload_plugin, and get_plugin_logs) so the Protocol genuinely captures the surface the queue uses.


JobQueueDependencies


def JobQueueDependencies(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Substrate dependencies the JobQueue requires (CR-6).

PluginManager satisfies this structurally; the Protocol exists so JobQueue can be tested in isolation (with a lightweight test double) and so a future extraction into a separate library has no API constraint locked in.

Method surface chosen empirically — these are the exact 5 methods the queue’s execute path calls today. Adding new dependencies to the Protocol (e.g., a typed system-monitor reader when Stage 3 wires resource snapshots) is an additive change.
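To make the "satisfies this structurally" point concrete, here is a minimal sketch of structural typing with a test double. Only three of the five method names appear in this document, so only those are shown, and every signature below is an assumption rather than the library's actual Protocol definition.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DepsSketch(Protocol):
    # Method names come from the text above; parameters and return types are assumed.
    def get_plugin(self, plugin_instance_id: str) -> Any: ...
    def reload_plugin(self, plugin_instance_id: str) -> bool: ...
    def get_plugin_logs(self, plugin_instance_id: str, lines: int = 100) -> str: ...


class FakeDeps:
    """Lightweight test double. It does not inherit from DepsSketch."""

    def __init__(self):
        self.reloaded = []

    def get_plugin(self, plugin_instance_id):
        return object()

    def reload_plugin(self, plugin_instance_id):
        self.reloaded.append(plugin_instance_id)
        return True

    def get_plugin_logs(self, plugin_instance_id, lines=100):
        return ""


deps = FakeDeps()
# runtime_checkable only verifies that the methods exist, not their signatures.
is_structural_match = isinstance(deps, DepsSketch)
```

Because the match is structural, the double needs no import of the queue's own types, which is what makes isolated testing cheap.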

Job / JobEvent / QueueStats / _Subscription

Job is the queued execution unit (CR-6 reshape: plugin_instance_id per CR-10, JobError typed error per CR-5, datetime timestamps, sequence + cancel-phase + block-reason + retry fields reserved for later stages). JobEvent is the push-bus payload — every event carries full tag context (job_id, optional sequence_id/sequence_index, plugin_instance_id) so consumers subscribed to any filter view get the same shape. QueueStats is the aggregate-counts shape returned by get_stats(). _Subscription is the internal per-subscriber bounded-queue + drop-counter holder.

Job keeps backward-compat @property aliases for plugin_name (returns plugin_instance_id) and created_at (returns submitted_at.timestamp()), tagged for SG-48 removal after the consumer cascade migrates.
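The alias pattern described above can be sketched with a cut-down dataclass. JobSketch is a hypothetical stand-in with only the fields needed for the illustration; the timezone-aware timestamp is an assumption made so the example is deterministic.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class JobSketch:
    id: str
    plugin_instance_id: str
    submitted_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    @property
    def plugin_name(self) -> str:
        # Legacy alias: old consumers read plugin_name.
        return self.plugin_instance_id

    @property
    def created_at(self) -> float:
        # Legacy alias: old consumers expect a unix float.
        return self.submitted_at.timestamp()


job = JobSketch(
    id="job-1",
    plugin_instance_id="whisper",
    submitted_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
)
```

Read-only properties keep the old attribute names working without adding constructor parameters, so existing call sites and the new field names can coexist until the aliases are removed.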


QueueStats


def QueueStats(
    total_pending:int, total_completed:int, total_failed:int, total_cancelled:int
)->None:

Aggregate counts returned by JobQueue.get_stats() (CR-6).


JobEvent


def JobEvent(
    type:JobEventType, job_id:str, plugin_instance_id:str, sequence_id:Optional=None, sequence_index:Optional=None,
    timestamp:datetime=<factory>, payload:Dict=<factory>
)->None:

A push-based job event (CR-6).

Carries full tag context so a subscriber to all_events(), events(job_id), or events_for_sequence(seq_id) receives identically-shaped instances. payload is a per-event-type structured dict (e.g., STATE_TRANSITION carries {"from": "pending", "to": "running"}; PROGRESS_CHANGED carries {"progress": 0.42, "status_message": "..."}).
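A consumer typically branches on the event type and reads the matching payload keys. The sketch below uses hypothetical stand-in classes (EventTypeSketch, EventSketch) with the two payload shapes quoted above; the enum values are assumptions.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict


class EventTypeSketch(Enum):
    STATE_TRANSITION = "state_transition"  # values are assumed
    PROGRESS_CHANGED = "progress_changed"


@dataclass
class EventSketch:
    type: EventTypeSketch
    job_id: str
    plugin_instance_id: str
    payload: Dict[str, Any] = field(default_factory=dict)


def describe(event):
    """Turn an event into a one-line status string."""
    if event.type is EventTypeSketch.STATE_TRANSITION:
        return f"{event.job_id}: {event.payload['from']} -> {event.payload['to']}"
    if event.type is EventTypeSketch.PROGRESS_CHANGED:
        pct = round(event.payload["progress"] * 100)
        message = event.payload.get("status_message", "")
        return f"{event.job_id}: {pct}% {message}".rstrip()
    return f"{event.job_id}: {event.type.name}"
```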


Job


def Job(
    id:str, plugin_instance_id:str, args:Tuple, kwargs:Dict, status:JobStatus=<JobStatus.pending: 'pending'>,
    priority:int=0, submitted_at:datetime=<factory>, started_at:Optional=None, completed_at:Optional=None,
    progress:float=0.0, status_message:str='', result:Any=None, error:Optional=None, sequence_id:Optional=None,
    sequence_index:Optional=None, cancel_requested_at:Optional=None, cancel_phase:Optional=None,
    block_reason:Optional=None, retry_count:int=0, last_resource_snapshot:Optional=None
)->None:

A queued plugin execution request (CR-6 reshape).

Stage 1 lands the full field set so subsequent stages are purely additive on the population paths. Fields tagged “Stage N” are typed but unpopulated by Stage 1’s execute path; their default values keep the dataclass safely constructible from existing call sites.

Sequences

SequenceStep is the user-facing per-step definition. StepResult is one member-job’s outcome in the sequence’s results list. JobSequence is the internal tracker — registered in JobQueue._sequences and traversed by the substrate as member jobs complete. Sequences ship the audit-locked design’s “fail-fast default + opt-in best-effort” semantics: fail_fast=True halts on the first failure; fail_fast=False records each step’s outcome (success or failure) and continues. Cancellation always halts the sequence regardless of fail_fast.


JobSequence


def JobSequence(
    id:str, steps:List, fail_fast:bool=True, priority:int=0, current_index:int=0, current_job_id:Optional=None,
    results:List=<factory>, status:JobStatus=<JobStatus.running: 'running'>, submitted_at:datetime=<factory>,
    completed_at:Optional=None
)->None:

Internal: tracks a multi-step job sequence (CR-6 Stage 2).

Lives in JobQueue._sequences. State machine: starts running, transitions to completed (all steps attempted) / failed (fail_fast halted on a failed member) / cancelled (cancel_sequence called or a member was cancelled). Once terminal, completed_at is set and current_job_id is cleared.


StepResult


def StepResult(
    job_id:str, success:bool, result:Any=None, error:Optional=None
)->None:

Result of one step in a job sequence (CR-6 Stage 2).

success=True means the member-job completed with JobStatus.completed. Failure modes (failed / cancelled) capture the JobError in error. For best-effort sequences (fail_fast=False), failure rows are recorded in JobSequence.results and the sequence continues.
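The difference between fail-fast and best-effort can be sketched with a synchronous loop. This is a simplification: the real substrate advances sequences asynchronously as member jobs finish, and records a JobError rather than the plain string used here. All names are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class StepResultSketch:
    job_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None  # stand-in for the typed JobError


def run_steps(steps: List[Callable[[], Any]], fail_fast: bool = True):
    """Run steps in order; return (final_status, results)."""
    results = []
    status = "completed"  # all steps attempted
    for index, step in enumerate(steps):
        try:
            results.append(StepResultSketch(f"job-{index}", True, result=step()))
        except Exception as exc:
            results.append(StepResultSketch(f"job-{index}", False, error=str(exc)))
            if fail_fast:
                status = "failed"  # halt on the first failed member
                break
    return status, results


def ok():
    return "done"


def boom():
    raise ValueError("step failed")
```

With `[ok, boom, ok]`, fail-fast stops after two results, while best-effort records all three and still ends as completed.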


SequenceStep


def SequenceStep(
    plugin_instance_id:str, args:Tuple=(), kwargs:Dict=<factory>, priority:int=0
)->None:

A single step in a job sequence (CR-6 Stage 2).

User-facing — supplied to JobQueue.submit_sequence. The substrate constructs the actual Job from this at advance time, tagging the job with sequence_id + sequence_index so it appears in events_for_sequence subscriptions.

ResourceSnapshot

Point-in-time worker + GPU stats for one job, populated by sampling the worker proxy’s get_stats() and (optionally) the configured system-monitor plugin’s CR-3 typed get_system_status() + list_processes() methods. Distinct from CR-7’s EmpiricalResourceRecord which aggregates samples across runs — ResourceSnapshot is the substrate’s “what’s happening right now” surface. Field shape mirrors cjm-fasthtml-job-monitor’s existing ResourceSnapshot so the cascade migration to the substrate-owned type is structural rather than semantic.


ResourceSnapshot


def ResourceSnapshot(
    timestamp:datetime, worker_pid:int=0, cpu_percent:float=0.0, memory_rss_mb:float=0.0, gpu_index:Optional=None,
    gpu_memory_mb:Optional=None, gpu_type:Optional=None, gpu_total_mb:Optional=None, gpu_load_percent:Optional=None
)->None:

Point-in-time resource usage for one job (CR-6 Stage 3).

Worker stats (cpu_percent, memory_rss_mb) come from the plugin proxy’s get_stats(). GPU fields come from the configured system-monitor plugin (when set on JobQueue) via CR-3’s typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). All GPU fields are Optional — None if no sysmon configured or the worker isn’t running on a GPU.

Distinct from CR-7’s EmpiricalResourceRecord (aggregated profile across runs); this is “what’s happening right now” for one job.

JobQueue

Main queue class — submission, execution lifecycle, multi-subscriber event bus, observability. CR-6 reshape: takes deps: JobQueueDependencies (Protocol) instead of manager: PluginManager (concrete); adds bounded-queue multi-subscriber event-bus state.


JobQueue


def JobQueue(
    deps:JobQueueDependencies, # Substrate dependencies (PluginManager satisfies structurally)
    max_history:int=100, # Max completed jobs to retain
    cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
    progress_poll_interval:float=1.0, # Seconds between progress polls
    sysmon_plugin_name:Optional=None, # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
    resource_snapshot_cadence_polls:int=4, # Sample resources every Nth progress poll
):

Resource-aware job queue with push-based observability (CR-6).

Job Submission


submit


async def submit(
    plugin_instance_id:str, # Target plugin instance (per CR-10)
    args:VAR_POSITIONAL, priority:int=0, # Higher = more urgent
    kwargs:VAR_KEYWORD
)->str: # Returns job_id

Submit a job to the queue.

CR-2: rejects jobs for disabled plugins at submit time (typed PluginDisabledError) so the failure surface matches PluginManager.execute_plugin’s disabled gate. Without the early check, a job for a disabled plugin would sit in the queue until execution and only then raise; moving the check to submit time gives operators an actionable signal immediately.

CR-6: no STATE_TRANSITION event fires at submit because the job is already in pending state at construction — there’s no transition to publish. The first STATE_TRANSITION fires when the processor loop moves the job pending → running.

Job Control


reorder


def reorder(
    job_id:str, # Job to move
    new_priority:int, # New priority value
)->bool: # True if reordered

Change the priority of a pending job.


cancel


async def cancel(
    job_id:str, # Job to cancel
)->bool: # True if cancelled

Cancel a pending or running job.

CR-6: publishes STATE_TRANSITION when a pending job moves directly to cancelled (no transition through running). Running-job cancellation publishes CANCEL_PHASE_CHANGED events from _execute_with_cancellation when Stage 4 wires the phase transitions; Stage 1 just records the cancel-request marker.

Stage 2: when the cancelled job is a sequence member, _advance_sequence runs after the lock is released so the cancelled status propagates to the sequence registry (marks sequence cancelled, no further submission). Lock release is required because _advance_sequence may need to enqueue a next member in some flows; asyncio.Lock is not re-entrant.
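The lock-release requirement follows from asyncio.Lock not being re-entrant: a coroutine that awaits a helper needing the same lock while still holding it would wait forever. A minimal sketch of the pattern, using a hypothetical TinyQueue:

```python
import asyncio


class TinyQueue:
    def __init__(self):
        self._lock = asyncio.Lock()
        self.log = []

    async def _advance(self):
        # Takes the lock itself, so it must be called with the lock released.
        async with self._lock:
            self.log.append("advanced")

    async def cancel(self):
        async with self._lock:
            self.log.append("cancelled")
            needs_advance = True  # decide under the lock
        # The lock is released here. Awaiting _advance() inside the block
        # above would deadlock.
        if needs_advance:
            await self._advance()
        return True


async def _demo():
    queue = TinyQueue()
    ok = await asyncio.wait_for(queue.cancel(), timeout=1.0)
    return ok, queue.log


cancel_result = asyncio.run(_demo())
```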

Observation

CR-6 splits the legacy get_state() UI-shaped dict into four typed accessors: get_pending(), get_running(), get_history(limit), get_stats(). The legacy get_state() remains as a REMOVE-AFTER-OVERHAUL shim (see the “Legacy get_state shim” section below) so existing UI consumers continue to work until SG-48 sweeps them. get_job_logs still delegates to JobQueueDependencies.get_plugin_logs; Stage 3 scopes the returned log slice to the job’s (started_at, completed_at) window.


get_job


def get_job(
    job_id:str, # Job to retrieve
)->Optional: # Job or None

Get a job by ID.


wait_for_job


async def wait_for_job(
    job_id:str, # Job to wait for
    timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job

Wait for a job to complete.

Independent of the CR-6 event bus — uses a per-job asyncio.Event for the simple block-until-done affordance. Streaming consumers should use events(job_id) instead.
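The per-job asyncio.Event approach can be sketched as follows. WaiterSketch and its methods are hypothetical, and raising asyncio.TimeoutError on timeout is an assumption about behaviour rather than something this document states.

```python
import asyncio


class WaiterSketch:
    def __init__(self):
        self._done = {}   # job_id -> asyncio.Event
        self._status = {}

    def register(self, job_id):
        self._done[job_id] = asyncio.Event()
        self._status[job_id] = "pending"

    def finish(self, job_id, status):
        self._status[job_id] = status
        self._done[job_id].set()  # wakes every waiter for this job

    async def wait_for_job(self, job_id, timeout=None):
        # timeout=None waits indefinitely; otherwise asyncio.TimeoutError is raised.
        await asyncio.wait_for(self._done[job_id].wait(), timeout)
        return self._status[job_id]


async def _demo():
    waiter = WaiterSketch()
    waiter.register("job-1")
    # Simulate the job finishing shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, waiter.finish, "job-1", "completed")
    return await waiter.wait_for_job("job-1", timeout=1.0)


wait_result = asyncio.run(_demo())
```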


get_pending


def get_pending(
    
)->List: # Pending jobs, priority-sorted

Get pending jobs, priority-sorted (higher priority first, then FIFO).
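One common way to get "higher priority first, then FIFO" out of a heap is to push a negated priority plus a monotonically increasing counter. The sketch below shows that technique in isolation; it is not taken from the library's source, and the names are hypothetical.

```python
import heapq
import itertools

_counter = itertools.count()
heap = []


def push(job_id, priority=0):
    # heapq is a min-heap, so the priority is negated; the counter breaks
    # ties in submission (FIFO) order.
    heapq.heappush(heap, (-priority, next(_counter), job_id))


def pending_sorted():
    """Return job ids in execution order without mutating the heap."""
    return [job_id for _, _, job_id in sorted(heap)]


push("job-1")
push("job-2")
push("job-3", priority=10)
```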


get_running


def get_running(
    
)->Optional: # Running job or None

Get the currently-executing job, or None if idle.


get_history


def get_history(
    limit:Optional=None, # Max jobs to return (most recent N); None = all
)->List: # Completed/failed/cancelled jobs, most recent first

Get completed jobs, most recent first.

If limit is provided, returns the most recent N. The internal history list grows append-only up to max_history, so older jobs are evicted in submission order (oldest first).
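A bounded history with "most recent first" reads can be sketched with collections.deque. Using a deque with maxlen is an assumption made for brevity; the text above only says the internal list is append-only and capped at max_history.

```python
from collections import deque


class HistorySketch:
    def __init__(self, max_history=100):
        # When full, appending evicts the oldest entry automatically.
        self._history = deque(maxlen=max_history)

    def record(self, job_id):
        self._history.append(job_id)

    def get_history(self, limit=None):
        newest_first = list(reversed(self._history))
        return newest_first if limit is None else newest_first[:limit]


history = HistorySketch(max_history=3)
for i in range(5):
    history.record(f"job-{i}")
```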


get_stats


def get_stats(
    
)->QueueStats: # Aggregate counts

Get aggregate queue stats — total counts by terminal status.


get_job_logs


def get_job_logs(
    job_id:str, # Job to get logs for
    lines:int=100, # Max lines to return
)->str: # Log content scoped to this job's execution window

Get logs for a job, scoped to its (started_at, completed_at) window.

CR-6 Stage 3: replaces the legacy whole-plugin-log behavior. The substrate over-fetches the plugin log (lines * 5) and slices by the job’s execution window — the job-monitor library’s --- Starting marker heuristic in _filter_current_session becomes obsolete in cascade.

Falls back gracefully when timestamps are unparseable or the job’s window isn’t known: returns the raw tail.
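The window slicing and its fallback can be sketched as below. The log line format (an ISO-8601 timestamp as the first space-separated token) is an assumption chosen for the example; the actual plugin log format is not described in this document.

```python
from datetime import datetime


def slice_logs(raw, started_at, completed_at, lines=100):
    """Keep only lines stamped inside [started_at, completed_at]."""
    all_lines = raw.splitlines()
    if started_at is None:
        return "\n".join(all_lines[-lines:])  # window unknown: raw tail
    end = completed_at or datetime.max  # still running: open-ended window
    kept = []
    try:
        for line in all_lines:
            stamp = datetime.fromisoformat(line.split(" ", 1)[0])
            if started_at <= stamp <= end:
                kept.append(line)
    except ValueError:
        return "\n".join(all_lines[-lines:])  # unparseable timestamp: raw tail
    return "\n".join(kept[-lines:])


raw_log = "\n".join([
    "2024-01-01T10:00:00 worker booted",
    "2024-01-01T10:05:00 job started",
    "2024-01-01T10:06:00 job finished",
    "2024-01-01T10:10:00 unrelated line",
])
```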

Event Bus

The CR-6 push-based event surface. Three subscriber views (events(job_id), events_for_sequence(seq_id), all_events()) share one fan-out path: each subscriber owns a bounded asyncio.Queue, and the publisher routes every event to all matching subscriber keys ("all", "job:<id>", "seq:<id>"). A slow subscriber only affects itself: when its queue is full, its own drop counter increments and the publisher never blocks. Sequence events are tagged job events (each carries optional sequence_id + sequence_index) rather than a separate topic, so a single subscription captures the full per-sequence narrative, including the SEQUENCE_ADVANCED aggregate signal that Stage 2 will wire.
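The fan-out path can be sketched with plain dicts and bounded queues. The key format matches the one quoted above; everything else (BusSketch, dict-shaped events, dropping the newest event when a queue is full) is an assumption for illustration.

```python
import asyncio
from collections import defaultdict


class BusSketch:
    def __init__(self, maxsize=2):
        self._subs = defaultdict(list)  # key -> list of subscriptions
        self._maxsize = maxsize

    def subscribe(self, key):
        sub = {"queue": asyncio.Queue(maxsize=self._maxsize), "dropped": 0}
        self._subs[key].append(sub)
        return sub

    def publish(self, event):
        keys = ["all", f"job:{event['job_id']}"]
        if event.get("sequence_id") is not None:
            keys.append(f"seq:{event['sequence_id']}")
        for key in keys:
            for sub in self._subs.get(key, []):
                try:
                    sub["queue"].put_nowait(event)  # never awaits
                except asyncio.QueueFull:
                    sub["dropped"] += 1  # slow subscriber loses this event


async def _demo():
    bus = BusSketch(maxsize=2)
    firehose = bus.subscribe("all")
    one_job = bus.subscribe("job:j1")
    for n in range(3):
        bus.publish({"job_id": "j1", "sequence_id": None, "n": n})
    bus.publish({"job_id": "j2", "sequence_id": "s1", "n": 3})
    return (firehose["queue"].qsize(), firehose["dropped"],
            one_job["queue"].qsize(), one_job["dropped"])


bus_result = asyncio.run(_demo())
```

Because publish uses put_nowait, a full subscriber queue costs the publisher one exception and a counter increment, never a wait.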


all_events


def all_events(
    
)->AsyncIterator:

Subscribe to all events (firehose; async generator).

Useful for global dashboards, audit logs, and telemetry sinks that need the complete event stream rather than a filtered view.


events_for_sequence


def events_for_sequence(
    sequence_id:str, # Filter to events tagged with this sequence
)->AsyncIterator:

Subscribe to events for all jobs in a sequence (async generator).

Yields the unified per-sequence narrative: member-job lifecycle events interleaved with SEQUENCE_ADVANCED aggregate signals (wired in Stage 2).


events


def events(
    job_id:str, # Filter to events for this job
)->AsyncIterator:

Subscribe to events for a single job (async generator).

Yields events as they fire. Multiple concurrent subscribers to the same job_id each get their own independent stream — useful for multi-tab UIs.

Sequence Submission & Control

submit_sequence registers a multi-step run and submits the first member job (tagged with sequence_id + sequence_index=0 before enqueue so its first STATE_TRANSITION already routes to sequence subscribers). submit_uniform_sequence is sugar for the common “same plugin, many arg sets” case. cancel_sequence halts the sequence and cancels the active member. get_sequence returns the live JobSequence (read-only inspection — substrate keeps it). _advance_sequence is the internal advancement helper invoked from _execute_job finalization (Stage 2 wiring in the internal cell) and from cancel()’s pending-member path.


submit_sequence


async def submit_sequence(
    steps:List, # Sequence steps to run in order
    priority:int=0, # Sequence-level priority (per-step override possible)
    fail_fast:bool=True, # Halt on first failure (audit-locked default)
)->str: # sequence_id

Submit a multi-step job sequence (CR-6 Stage 2).

Validates all step plugins upfront (PluginDisabledError if any are disabled) so operators get an immediate failure signal rather than discovering the problem mid-sequence. The first member-job is tagged with sequence fields BEFORE enqueue, so its first STATE_TRANSITION already routes through sequence-tagged subscribers.

Returns the sequence_id; consumers subscribe via queue.events_for_sequence(sequence_id) or query state via queue.get_sequence(sequence_id).


submit_uniform_sequence


async def submit_uniform_sequence(
    plugin_instance_id:str, # Plugin every step targets
    args_list:List, # Per-step positional args
    kwargs_list:Optional=None, # Per-step keyword args (default: empty)
    priority:int=0, # Sequence-level priority
    fail_fast:bool=True, # Halt on first failure (audit-locked default)
)->str: # sequence_id

Submit a sequence where every step targets the same plugin (CR-6 Stage 2).

Sugar over submit_sequence for the common “same plugin, many arg sets” case — multi-source transcription / batch processing / parameter sweeps.
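The expansion this sugar performs can be sketched as a pure function that builds one step per argument set. StepSketch is a hypothetical stand-in for SequenceStep, and the length check is an assumption about how mismatched lists might be handled.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass
class StepSketch:
    plugin_instance_id: str
    args: Tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)


def uniform_steps(plugin_instance_id, args_list, kwargs_list=None):
    """Build one step per entry in args_list, all targeting the same plugin."""
    if kwargs_list is None:
        kwargs_list = [{} for _ in args_list]  # default: no keyword args
    if len(kwargs_list) != len(args_list):
        raise ValueError("args_list and kwargs_list must have the same length")
    return [
        StepSketch(plugin_instance_id, tuple(args), dict(kwargs))
        for args, kwargs in zip(args_list, kwargs_list)
    ]


steps = uniform_steps("whisper", [("a.mp3",), ("b.mp3",)])
```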


cancel_sequence


async def cancel_sequence(
    sequence_id:str, # Sequence to cancel
)->bool: # True if cancellation was recorded

Cancel an in-flight sequence (CR-6 Stage 2).

Marks the sequence as cancelled (no further advancement) and cancels the current member job. Returns False if the sequence is unknown or already terminal; True if cancellation was recorded.

Sets seq.status = JobStatus.cancelled BEFORE issuing the member-job cancel. This prevents a race where the member completes between the intent record and the cancel call — _advance_sequence checks seq.status == cancelled first and halts regardless of member outcome.


get_sequence


def get_sequence(
    sequence_id:str, # Sequence to retrieve
)->Optional: # JobSequence or None

Get a job sequence by ID (read-only inspection).

Resource Snapshots

get_resource_snapshot(job_id) returns a point-in-time ResourceSnapshot for a job, composing worker proxy get_stats() with the configured sysmon plugin’s CR-3 typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). _sample_resource_snapshot is the internal helper invoked both from this method and from _poll_progress (which emits RESOURCE_SNAPSHOT events at resource_snapshot_cadence_polls cadence). All sysmon calls are best-effort — failures are swallowed and the snapshot returns with worker stats only.
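The best-effort composition and the poll cadence can be sketched together. All names here are hypothetical, the worker stats keys are assumed, and gpu_reader stands in for whatever the sysmon plugin calls would return.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class SnapshotSketch:
    cpu_percent: float = 0.0
    memory_rss_mb: float = 0.0
    gpu_memory_mb: Optional[float] = None


def sample(worker_stats: dict, gpu_reader: Optional[Callable[[], float]] = None):
    """Compose worker stats with optional GPU stats; GPU failures are swallowed."""
    snap = SnapshotSketch(
        cpu_percent=worker_stats.get("cpu_percent", 0.0),
        memory_rss_mb=worker_stats.get("memory_rss_mb", 0.0),
    )
    if gpu_reader is not None:
        try:
            snap.gpu_memory_mb = gpu_reader()
        except Exception:
            pass  # best-effort: keep the worker-only snapshot
    return snap


def should_sample(poll_count: int, cadence: int = 4) -> bool:
    """True on every Nth progress poll (poll_count starts at 1)."""
    return cadence > 0 and poll_count % cadence == 0


def broken_reader():
    raise RuntimeError("sysmon unavailable")
```

With the default progress_poll_interval of 1.0 seconds and a cadence of 4, this scheme would sample roughly once every four seconds.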


get_resource_snapshot


def get_resource_snapshot(
    job_id:str, # Job to sample resources for
)->Optional:

Get a point-in-time resource snapshot for a job (CR-6 Stage 3).

Returns None if the job is unknown or the worker proxy doesn’t expose get_stats. Composes worker stats with sysmon GPU stats when the queue is configured with a sysmon_plugin_name.

Lifecycle


start


async def start(
    
)->None:

Start the queue processor.

CR-6 Stage 4: installs the substrate-side retry observer on _deps (typically a PluginManager). When CR-7’s reactive-retry path fires for a running job, the observer updates Job.retry_count and publishes a RETRY_STARTED event tagged with the in-flight job. Previous observer value (if any) is saved + restored in stop() for cooperative coexistence.


stop


async def stop(
    
)->None:

Stop the queue processor gracefully.

CR-6 Stage 4: restores the previous _on_retry observer on deps to leave the manager in the state we found it (cooperative with other queue instances, tests, etc.).
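The save-and-restore handshake between start() and stop() can be sketched as follows. The _on_retry attribute name comes from the text above; the classes, the synchronous methods, and the observer signature are simplifications made for this illustration.

```python
class ManagerSketch:
    _on_retry = None  # observer slot on the dependency object


class QueueSketch:
    def __init__(self, deps):
        self._deps = deps
        self._previous_observer = None
        self.retry_count = 0

    def _observe_retry(self, *args, **kwargs):
        self.retry_count += 1

    def start(self):
        # Remember whatever observer was installed before us.
        self._previous_observer = getattr(self._deps, "_on_retry", None)
        self._deps._on_retry = self._observe_retry

    def stop(self):
        # Leave the manager in the state we found it.
        self._deps._on_retry = self._previous_observer
        self._previous_observer = None


def existing_observer(*args, **kwargs):
    pass


manager = ManagerSketch()
manager._on_retry = existing_observer
queue = QueueSketch(manager)
queue.start()
manager._on_retry()  # simulate the retry path firing once
queue.stop()
```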

Internal Methods

Legacy get_state shim (REMOVE-AFTER-OVERHAUL)

The pre-CR-6 dict shape that existing UI consumers (notably the cjm-fasthtml-job-monitor library) read today. Reshape into typed accessors (get_pending / get_running / get_history / get_stats) is canonical; this shim converts those back to the legacy dict shape until the consumer cascade migrates. SG-48 removes it.


get_state


def get_state(
    
)->Dict: # Queue state for UI (legacy shape)

Get the current queue state in the pre-CR-6 dict shape.

REMOVE-AFTER-OVERHAUL: kept so existing UI consumers (notably cjm-fasthtml-job-monitor) continue to work until they migrate to the typed get_pending / get_running / get_history / get_stats accessors. Reads from the new accessors and reshapes:

  • Legacy keys (plugin_name, created_at) read from the renamed fields via the Job dataclass’s backward-compat @property aliases.
  • Legacy error was a bare string; new is JobError. Shim returns error.message to preserve the string shape.
  • Legacy timestamps were unix floats; new are datetime. Shim returns .timestamp() to preserve float shape.
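The error and timestamp flattening listed above can be sketched as a small conversion function. The stand-in classes are hypothetical, and the legacy dict keys other than plugin_name and created_at are assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ErrorSketch:
    message: str  # stand-in for the typed JobError


@dataclass
class FinishedJobSketch:
    id: str
    plugin_instance_id: str
    submitted_at: datetime
    completed_at: Optional[datetime] = None
    error: Optional[ErrorSketch] = None


def _ts(value):
    # datetime -> unix float, preserving None
    return value.timestamp() if value is not None else None


def to_legacy(job):
    """Flatten typed fields back into the pre-CR-6 shapes."""
    return {
        "id": job.id,
        "plugin_name": job.plugin_instance_id,
        "created_at": _ts(job.submitted_at),
        "completed_at": _ts(job.completed_at),
        "error": job.error.message if job.error is not None else None,
    }


legacy = to_legacy(FinishedJobSketch(
    id="j1",
    plugin_instance_id="whisper",
    submitted_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    error=ErrorSketch("boom"),
))
```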

Usage Example

from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.queue import JobQueue, JobStatus
from cjm_plugin_system.core.scheduling import QueueScheduler

# Setup
manager = PluginManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_plugin(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")

# Create queue
queue = JobQueue(manager)
await queue.start()

# Submit jobs
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)

# Monitor queue state via the typed accessors (get_state() is a legacy shim)
running = queue.get_running()
pending = queue.get_pending()
print(f"Running: {running.id if running else None}")
print(f"Pending: {len(pending)} jobs")

# Cancel a job
await queue.cancel(job2_id)

# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
    print(job1.result)

# Cleanup
await queue.stop()
manager.unload_all()