Job Queue
The JobQueue provides a resource-aware job queue for plugin execution:
┌─────────────────────────────────────────────────────────────┐
│                      User Application                       │
│   ┌─────────────────────────────────────────────────────┐   │
│   │                      JobQueue                       │   │
│   │   ┌───────────┐  ┌───────────┐  ┌───────────────┐   │   │
│   │   │  Pending  │  │  Running  │  │    History    │   │   │
│   │   │   Jobs    │→ │    Job    │→ │  (completed/  │   │   │
│   │   │  (heap)   │  │           │  │   cancelled)  │   │   │
│   │   └───────────┘  └───────────┘  └───────────────┘   │   │
│   └─────────────────────────────────────────────────────┘   │
│                 │                         │                 │
│                 ▼                         ▼                 │
│          ┌─────────────┐           ┌─────────────┐          │
│          │  Scheduler  │           │   Plugin    │          │
│          │  (policy)   │           │   Manager   │          │
│          └─────────────┘           └─────────────┘          │
└─────────────────────────────────────────────────────────────┘
Key features:
- Priority queue: Higher priority jobs execute first (FIFO within same priority)
- Resource-aware: Waits for resources, triggers eviction of idle plugins
- Cancellation: Cancel pending or running jobs with graceful fallback to force termination
- Progress tracking: Poll progress and status messages during execution
- Observable: Get queue state for UI integration
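The ordering rule in the first feature (higher priority first, FIFO within the same priority) can be sketched with the standard-library heapq module. This is a minimal illustration, not the library's implementation: PendingHeap and its methods are hypothetical names, and the tie-breaking counter is an assumption about how FIFO order could be preserved.

```python
import heapq
import itertools


class PendingHeap:
    """Hypothetical stand-in for the pending-jobs heap; not the library's class."""

    def __init__(self) -> None:
        self._heap = []                    # entries: (-priority, seq, job_id)
        self._counter = itertools.count()  # monotonically increasing tie-breaker

    def push(self, job_id: str, priority: int = 0) -> None:
        # heapq is a min-heap, so the priority is negated; seq keeps FIFO order on ties.
        heapq.heappush(self._heap, (-priority, next(self._counter), job_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


pending = PendingHeap()
pending.push("job-a")
pending.push("job-b")
pending.push("job-c", priority=10)
order = [pending.pop() for _ in range(len(pending))]
print(order)
```

The high-priority job is popped first, and the two default-priority jobs keep their submission order.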
Enums
JobStatus captures the lifecycle a job moves through. JobEventType enumerates the push-based events emitted on the CR-6 multi-subscriber event bus. CancelPhase surfaces the substrate’s cancel state machine (cooperative → force → reloading → completed) so UIs can render cancel progress.
CancelPhase
def CancelPhase(
args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):
Phase of a cancellation in progress (CR-6 + CR-4 pairing).
Surfaces the substrate’s cancel state machine. Stage 4 wires the transitions; Stage 1 reserves the enum so Job.cancel_phase can be typed correctly without dangling forward references.
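As a rough illustration of the phase sequence named above (cooperative, force, reloading, completed), the sketch below models the phases as an enum with a helper that steps through them. CancelPhaseSketch, next_phase, the lowercase member names, and the strictly linear ordering are all assumptions; the real state machine may skip phases, for example when a cooperative cancel succeeds.

```python
from enum import Enum
from typing import Optional


class CancelPhaseSketch(str, Enum):
    """Hypothetical mirror of the four phases named in the text."""
    cooperative = "cooperative"
    force = "force"
    reloading = "reloading"
    completed = "completed"


# Assumed linear ordering (definition order); the real machine may skip phases.
_ORDER = list(CancelPhaseSketch)


def next_phase(phase: CancelPhaseSketch) -> Optional[CancelPhaseSketch]:
    """Return the phase after `phase`, or None once the cancel has completed."""
    index = _ORDER.index(phase)
    return _ORDER[index + 1] if index + 1 < len(_ORDER) else None


walk = []
phase: Optional[CancelPhaseSketch] = CancelPhaseSketch.cooperative
while phase is not None:
    walk.append(phase.value)
    phase = next_phase(phase)
print(walk)
```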
JobEventType
def JobEventType(
args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):
Push-based job event types (CR-6).
Emitted by JobQueue on a multi-subscriber event bus. Consumers subscribe via queue.events(job_id) / queue.events_for_sequence(seq_id) / queue.all_events() and receive JobEvent instances asynchronously.
Stage 1 wires STATE_TRANSITION + PROGRESS_CHANGED at the existing execute-path lifecycle points. The remaining types are reserved for Stages 2-4 (sequences / resources + logs / cancel-phase + retry + block-reason). Reserving the enum values up front keeps later stages additive — new event sources publish without changing the enum.
JobStatus
def JobStatus(
args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):
Status of a job in the queue.
JobQueueDependencies
The CR-6 substrate dependency Protocol that JobQueue requires. PluginManager satisfies this structurally, so existing hosts continue to work: passing a PluginManager to JobQueue(deps=...) still type-checks. The Protocol enables clean future extraction of JobQueue into a separate library per OQ-5 resolution. The audit’s locked design listed 3 methods; investigation surfaced 2 more among those consumed in the current execute path (the calls seen there include get_plugin, reload_plugin, and get_plugin_logs). All 5 are included so the Protocol genuinely captures the surface the queue uses.
JobQueueDependencies
def JobQueueDependencies(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Substrate dependencies the JobQueue requires (CR-6).
PluginManager satisfies this structurally; the Protocol exists so JobQueue can be tested in isolation (with a lightweight test double) and so a future extraction into a separate library has no API constraint locked in.
Method surface chosen empirically — these are the exact 5 methods the queue’s execute path calls today. Adding new dependencies to the Protocol (e.g., a typed system-monitor reader when Stage 3 wires resource snapshots) is an additive change.
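Structural satisfaction can be illustrated with typing.Protocol. The sketch below is not the library's Protocol: QueueDepsSketch covers only the three method names mentioned in this section, and every signature and return type is an assumption. FakeDeps shows the kind of lightweight test double the text refers to.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class QueueDepsSketch(Protocol):
    """Illustrative subset; signatures are assumptions, not the library's."""

    def get_plugin(self, plugin_instance_id: str) -> Any: ...
    def reload_plugin(self, plugin_instance_id: str) -> bool: ...
    def get_plugin_logs(self, plugin_instance_id: str, lines: int = 100) -> str: ...


class FakeDeps:
    """Test double: it never inherits from the Protocol, it only matches its shape."""

    def __init__(self) -> None:
        self.reloaded: List[str] = []

    def get_plugin(self, plugin_instance_id: str) -> Any:
        return object()

    def reload_plugin(self, plugin_instance_id: str) -> bool:
        self.reloaded.append(plugin_instance_id)
        return True

    def get_plugin_logs(self, plugin_instance_id: str, lines: int = 100) -> str:
        return ""


deps = FakeDeps()
# runtime_checkable isinstance checks only that the methods exist, not their signatures.
satisfies = isinstance(deps, QueueDepsSketch)
print(satisfies)
```

A static type checker performs the stricter signature comparison; the runtime check here is only a presence test.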
Job / JobEvent / QueueStats / _Subscription
Job is the queued execution unit (CR-6 reshape: plugin_instance_id per CR-10, JobError typed error per CR-5, datetime timestamps, sequence + cancel-phase + block-reason + retry fields reserved for later stages). JobEvent is the push-bus payload — every event carries full tag context (job_id, optional sequence_id/sequence_index, plugin_instance_id) so consumers subscribed to any filter view get the same shape. QueueStats is the aggregate-counts shape returned by get_stats(). _Subscription is the internal per-subscriber bounded-queue + drop-counter holder.
Job keeps backward-compat @property aliases for plugin_name (returns plugin_instance_id) and created_at (returns submitted_at.timestamp()), tagged for SG-48 removal after the consumer cascade migrates.
QueueStats
def QueueStats(
total_pending:int, total_completed:int, total_failed:int, total_cancelled:int
)->None:
Aggregate counts returned by JobQueue.get_stats() (CR-6).
JobEvent
def JobEvent(
type:JobEventType, job_id:str, plugin_instance_id:str, sequence_id:Optional=None, sequence_index:Optional=None,
timestamp:datetime=<factory>, payload:Dict=<factory>
)->None:
A push-based job event (CR-6).
Carries full tag context so a subscriber to all_events(), events(job_id), or events_for_sequence(seq_id) receives identically-shaped instances. payload is a per-event-type structured dict (e.g., STATE_TRANSITION carries {"from": "pending", "to": "running"}; PROGRESS_CHANGED carries {"progress": 0.42, "status_message": "..."}).
Job
def Job(
id:str, plugin_instance_id:str, args:Tuple, kwargs:Dict, status:JobStatus=<JobStatus.pending: 'pending'>,
priority:int=0, submitted_at:datetime=<factory>, started_at:Optional=None, completed_at:Optional=None,
progress:float=0.0, status_message:str='', result:Any=None, error:Optional=None, sequence_id:Optional=None,
sequence_index:Optional=None, cancel_requested_at:Optional=None, cancel_phase:Optional=None,
block_reason:Optional=None, retry_count:int=0, last_resource_snapshot:Optional=None
)->None:
A queued plugin execution request (CR-6 reshape).
Stage 1 lands the full field set so subsequent stages are purely additive on the population paths. Fields tagged “Stage N” are typed but unpopulated by Stage 1’s execute path; their default values keep the dataclass safely constructible from existing call sites.
Sequences
SequenceStep is the user-facing per-step definition. StepResult is one member-job’s outcome in the sequence’s results list. JobSequence is the internal tracker — registered in JobQueue._sequences and traversed by the substrate as member jobs complete. Sequences ship the audit-locked design’s “fail-fast default + opt-in best-effort” semantics: fail_fast=True halts on the first failure; fail_fast=False records each step’s outcome (success or failure) and continues. Cancellation always halts the sequence regardless of fail_fast.
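The difference between the two modes can be shown with a synchronous toy runner. This is only a sketch of the semantics described above: StepOutcome and run_steps are hypothetical names, steps are plain callables rather than queued jobs, and cancellation is not modelled.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class StepOutcome:
    """Hypothetical stand-in for StepResult (error kept as a plain string here)."""
    success: bool
    result: Any = None
    error: Optional[str] = None


def run_steps(steps: List[Callable[[], Any]], fail_fast: bool = True) -> List[StepOutcome]:
    """Run steps in order; halt on the first failure only when fail_fast is True."""
    outcomes: List[StepOutcome] = []
    for step in steps:
        try:
            outcomes.append(StepOutcome(True, result=step()))
        except Exception as exc:
            outcomes.append(StepOutcome(False, error=str(exc)))
            if fail_fast:
                break  # fail-fast: remaining steps are never attempted
    return outcomes


def ok() -> str:
    return "ok"


def boom() -> str:
    raise RuntimeError("boom")


halted = run_steps([ok, boom, ok], fail_fast=True)
best_effort = run_steps([ok, boom, ok], fail_fast=False)
print(len(halted), [o.success for o in best_effort])
```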
JobSequence
def JobSequence(
id:str, steps:List, fail_fast:bool=True, priority:int=0, current_index:int=0, current_job_id:Optional=None,
results:List=<factory>, status:JobStatus=<JobStatus.running: 'running'>, submitted_at:datetime=<factory>,
completed_at:Optional=None
)->None:
Internal: tracks a multi-step job sequence (CR-6 Stage 2).
Lives in JobQueue._sequences. State machine: starts running, transitions to completed (all steps attempted) / failed (fail_fast halted on a failed member) / cancelled (cancel_sequence called or a member was cancelled). Once terminal, completed_at is set and current_job_id is cleared.
StepResult
def StepResult(
job_id:str, success:bool, result:Any=None, error:Optional=None
)->None:
Result of one step in a job sequence (CR-6 Stage 2).
success=True means the member-job completed with JobStatus.completed. Failure modes (failed / cancelled) capture the JobError in error. For best-effort sequences (fail_fast=False), failure rows are recorded in JobSequence.results and the sequence continues.
SequenceStep
def SequenceStep(
plugin_instance_id:str, args:Tuple=(), kwargs:Dict=<factory>, priority:int=0
)->None:
A single step in a job sequence (CR-6 Stage 2).
User-facing — supplied to JobQueue.submit_sequence. The substrate constructs the actual Job from this at advance time, tagging the job with sequence_id + sequence_index so it appears in events_for_sequence subscriptions.
ResourceSnapshot
Point-in-time worker + GPU stats for one job, populated by sampling the worker proxy’s get_stats() and (optionally) the configured system-monitor plugin’s CR-3 typed get_system_status() + list_processes() methods. Distinct from CR-7’s EmpiricalResourceRecord which aggregates samples across runs — ResourceSnapshot is the substrate’s “what’s happening right now” surface. Field shape mirrors cjm-fasthtml-job-monitor’s existing ResourceSnapshot so the cascade migration to the substrate-owned type is structural rather than semantic.
ResourceSnapshot
def ResourceSnapshot(
timestamp:datetime, worker_pid:int=0, cpu_percent:float=0.0, memory_rss_mb:float=0.0, gpu_index:Optional=None,
gpu_memory_mb:Optional=None, gpu_type:Optional=None, gpu_total_mb:Optional=None, gpu_load_percent:Optional=None
)->None:
Point-in-time resource usage for one job (CR-6 Stage 3).
Worker stats (cpu_percent, memory_rss_mb) come from the plugin proxy’s get_stats(). GPU fields come from the configured system-monitor plugin (when set on JobQueue) via CR-3’s typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). All GPU fields are Optional — None if no sysmon configured or the worker isn’t running on a GPU.
Distinct from CR-7’s EmpiricalResourceRecord (aggregated profile across runs); this is “what’s happening right now” for one job.
JobQueue
Main queue class — submission, execution lifecycle, multi-subscriber event bus, observability. CR-6 reshape: takes deps: JobQueueDependencies (Protocol) instead of manager: PluginManager (concrete); adds bounded-queue multi-subscriber event-bus state.
JobQueue
def JobQueue(
deps:JobQueueDependencies, # Substrate dependencies (PluginManager satisfies structurally)
max_history:int=100, # Max completed jobs to retain
cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
progress_poll_interval:float=1.0, # Seconds between progress polls
sysmon_plugin_name:Optional=None, # CR-3 MonitorPlugin instance for GPU stats (None = no GPU info)
resource_snapshot_cadence_polls:int=4, # Sample resources every Nth progress poll
):
Resource-aware job queue with push-based observability (CR-6).
Job Submission
submit
async def submit(
plugin_instance_id:str, # Target plugin instance (per CR-10)
args:VAR_POSITIONAL, priority:int=0, # Higher = more urgent
kwargs:VAR_KEYWORD
)->str: # Returns job_id
Submit a job to the queue.
CR-2: rejects jobs for disabled plugins at submit time (typed PluginDisabledError) so the failure surface matches PluginManager.execute_plugin’s disabled gate. Without this check, a job submitted to a disabled plugin would sit in the queue until execution and only then raise; checking at submit time gives operators an actionable signal immediately.
CR-6: no STATE_TRANSITION event fires at submit because the job is already in pending state at construction — there’s no transition to publish. The first STATE_TRANSITION fires when the processor loop moves the job pending → running.
Job Control
reorder
def reorder(
job_id:str, # Job to move
new_priority:int, # New priority value
)->bool: # True if reordered
Change the priority of a pending job.
cancel
async def cancel(
job_id:str, # Job to cancel
)->bool: # True if cancelled
Cancel a pending or running job.
CR-6: publishes STATE_TRANSITION when a pending job moves directly to cancelled (no transition through running). Running-job cancellation publishes CANCEL_PHASE_CHANGED events from _execute_with_cancellation when Stage 4 wires the phase transitions; Stage 1 just records the cancel-request marker.
Stage 2: when the cancelled job is a sequence member, _advance_sequence runs after the lock is released so the cancelled status propagates to the sequence registry (marks sequence cancelled, no further submission). Lock release is required because _advance_sequence may need to enqueue a next member in some flows; asyncio.Lock is not re-entrant.
Observation
CR-6 splits the legacy get_state() UI-shaped dict into four typed accessors: get_pending(), get_running(), get_history(limit), get_stats(). The legacy get_state() remains as a REMOVE-AFTER-OVERHAUL shim (see the shim cell below this section) so existing UI consumers continue to work until SG-48 sweeps them. get_job_logs keeps the same delegation to JobQueueDependencies.get_plugin_logs — Stage 3 will scope log slicing by (started_at, completed_at).
get_job
def get_job(
job_id:str, # Job to retrieve
)->Optional: # Job or None
Get a job by ID.
wait_for_job
async def wait_for_job(
job_id:str, # Job to wait for
timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job
Wait for a job to complete.
Independent of the CR-6 event bus — uses a per-job asyncio.Event for the simple block-until-done affordance. Streaming consumers should use events(job_id) instead.
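The per-job asyncio.Event pattern mentioned above can be sketched as follows. DoneTracker and its methods are hypothetical, and the timeout handling (letting asyncio.wait_for raise asyncio.TimeoutError) is an assumption; the source does not say what wait_for_job does when the timeout expires.

```python
import asyncio
from typing import Dict, Optional


class DoneTracker:
    """Hypothetical holder of one asyncio.Event per job."""

    def __init__(self) -> None:
        self._done: Dict[str, asyncio.Event] = {}
        self._results: Dict[str, str] = {}

    def register(self, job_id: str) -> None:
        self._done[job_id] = asyncio.Event()

    def finish(self, job_id: str, result: str) -> None:
        # Store the result first, then wake every waiter blocked on this job.
        self._results[job_id] = result
        self._done[job_id].set()

    async def wait_for(self, job_id: str, timeout: Optional[float] = None) -> str:
        # Raises asyncio.TimeoutError if the job does not finish in time.
        await asyncio.wait_for(self._done[job_id].wait(), timeout=timeout)
        return self._results[job_id]


async def demo() -> str:
    tracker = DoneTracker()
    tracker.register("j1")
    # Simulate the processor finishing the job shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, tracker.finish, "j1", "done")
    return await tracker.wait_for("j1", timeout=1.0)


result = asyncio.run(demo())
print(result)
```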
get_pending
def get_pending(
)->List: # Pending jobs, priority-sorted
Get pending jobs, priority-sorted (higher priority first, then FIFO).
get_running
def get_running(
)->Optional: # Running job or None
Get the currently-executing job, or None if idle.
get_history
def get_history(
limit:Optional=None, # Max jobs to return (most recent N); None = all
)->List: # Completed/failed/cancelled jobs, most recent first
Get finished jobs (completed, failed, or cancelled), most recent first.
If limit is provided, returns the most recent N. The internal history list grows append-only up to max_history, so older jobs are evicted in submission order (oldest first).
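A bounded history with most-recent-first reads can be sketched with collections.deque. This is an illustration under assumptions: HistorySketch is a hypothetical name, and the source describes a list capped at max_history rather than a deque, so the container choice here is a substitution that gives the same oldest-first eviction.

```python
from collections import deque
from typing import Deque, List, Optional


class HistorySketch:
    """Hypothetical bounded history; oldest entries fall off once the cap is reached."""

    def __init__(self, max_history: int = 100) -> None:
        self._items: Deque[str] = deque(maxlen=max_history)

    def record(self, job_id: str) -> None:
        self._items.append(job_id)  # deque(maxlen=...) evicts from the left

    def get_history(self, limit: Optional[int] = None) -> List[str]:
        newest_first = list(reversed(self._items))
        return newest_first if limit is None else newest_first[:limit]


history = HistorySketch(max_history=3)
for n in range(5):
    history.record(f"job-{n}")
print(history.get_history(), history.get_history(limit=2))
```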
get_stats
def get_stats(
)->QueueStats: # Aggregate counts
Get aggregate queue stats — total counts by terminal status.
get_job_logs
def get_job_logs(
job_id:str, # Job to get logs for
lines:int=100, # Max lines to return
)->str: # Log content scoped to this job's execution window
Get logs for a job, scoped to its (started_at, completed_at) window.
CR-6 Stage 3: replaces the legacy whole-plugin-log behavior. The substrate over-fetches the plugin log (lines * 5) and slices by the job’s execution window — the job-monitor library’s --- Starting marker heuristic in _filter_current_session becomes obsolete in cascade.
Falls back gracefully when timestamps are unparseable or the job’s window isn’t known: returns the raw tail.
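The window slicing and its fallback can be sketched as below. The log line format (an ISO-8601 timestamp, a space, then the message) is an assumption made for this example, as are the slice_logs name and its signature; the source does not describe the plugin log format.

```python
from datetime import datetime
from typing import List, Optional


def slice_logs(
    raw_lines: List[str],
    started_at: Optional[datetime],
    completed_at: Optional[datetime] = None,
    lines: int = 100,
) -> str:
    """Keep lines stamped inside the job window; return the raw tail if that fails."""
    tail = "\n".join(raw_lines[-lines:])
    if started_at is None:
        return tail  # window unknown: fall back to the raw tail
    end = completed_at or datetime.max  # a still-running job has an open-ended window
    kept = []
    for line in raw_lines:
        try:
            # Assumed format: "<ISO timestamp> <message>"
            stamp = datetime.fromisoformat(line.split(" ", 1)[0])
        except ValueError:
            return tail  # unparseable timestamp: fall back to the raw tail
        if started_at <= stamp <= end:
            kept.append(line)
    return "\n".join(kept[-lines:])


raw = [
    "2024-01-01T10:00:00 previous job",
    "2024-01-01T10:05:00 job start",
    "2024-01-01T10:06:00 job end",
    "2024-01-01T10:10:00 next job",
]
window = slice_logs(raw, datetime(2024, 1, 1, 10, 5), datetime(2024, 1, 1, 10, 6))
fallback = slice_logs(["no timestamp here"], datetime(2024, 1, 1, 10, 5))
print(window)
```

In this sketch the caller would pass an over-fetched list (the text mentions lines * 5) so that the window is likely to be covered before slicing.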
Event Bus
The CR-6 push-based event surface. Three subscriber views (events(job_id), events_for_sequence(seq_id), all_events()) all share one fan-out path: each subscriber owns a bounded asyncio.Queue, and the publisher routes every event to all matching subscriber keys ("all", "job:<id>", "seq:<id>"). Slow subscribers backpressure themselves — the publisher never blocks. Sequence events are tagged job events (each carries optional sequence_id + sequence_index) rather than a separate topic, so a single subscription captures the full per-sequence narrative including the SEQUENCE_ADVANCED aggregate signal that Stage 2 will wire.
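The fan-out described above can be sketched with one bounded asyncio.Queue per subscriber and a non-blocking publish. EventBusSketch and Subscription are hypothetical names, events are plain dicts rather than JobEvent instances, and the policy of dropping the newest event when a subscriber's queue is full is an assumption; the source only says the publisher never blocks and that a drop counter exists.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List


class Subscription:
    """Hypothetical per-subscriber holder: a bounded queue plus a drop counter."""

    def __init__(self, maxsize: int) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0


class EventBusSketch:
    def __init__(self, maxsize: int = 2) -> None:
        self._subs: Dict[str, List[Subscription]] = defaultdict(list)
        self._maxsize = maxsize

    def subscribe(self, key: str) -> Subscription:
        sub = Subscription(self._maxsize)
        self._subs[key].append(sub)
        return sub

    def publish(self, event: Dict[str, Any]) -> None:
        # Route to every matching key: the firehose, the job, and (if tagged) the sequence.
        keys = ["all", f"job:{event['job_id']}"]
        if event.get("sequence_id") is not None:
            keys.append(f"seq:{event['sequence_id']}")
        for key in keys:
            for sub in self._subs.get(key, []):
                try:
                    sub.queue.put_nowait(event)  # never awaits, so publish cannot block
                except asyncio.QueueFull:
                    sub.dropped += 1  # assumed policy: drop the newest event


async def demo():
    bus = EventBusSketch(maxsize=2)
    firehose = bus.subscribe("all")
    one_job = bus.subscribe("job:j1")
    for n in range(3):
        bus.publish({"type": "PROGRESS_CHANGED", "job_id": "j1",
                     "sequence_id": None, "payload": {"progress": n / 2}})
    bus.publish({"type": "STATE_TRANSITION", "job_id": "j2", "sequence_id": None,
                 "payload": {"from": "pending", "to": "running"}})
    return firehose, one_job


firehose, one_job = asyncio.run(demo())
print(firehose.dropped, one_job.dropped)
```

Neither subscriber was drained here, so each queue fills at two events and the remainder are counted as drops without slowing the publisher.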
all_events
def all_events(
)->AsyncIterator:
Subscribe to all events (firehose; async generator).
Useful for global dashboards, audit logs, and telemetry sinks that need the complete event stream rather than a filtered view.
events_for_sequence
def events_for_sequence(
sequence_id:str, # Filter to events tagged with this sequence
)->AsyncIterator:
Subscribe to events for all jobs in a sequence (async generator).
Yields the unified per-sequence narrative: member-job lifecycle events interleaved with SEQUENCE_ADVANCED aggregate signals (wired in Stage 2).
events
def events(
job_id:str, # Filter to events for this job
)->AsyncIterator:
Subscribe to events for a single job (async generator).
Yields events as they fire. Multiple concurrent subscribers to the same job_id each get their own independent stream — useful for multi-tab UIs.
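Independent per-subscriber streams can be sketched with an async generator that owns its own queue. StreamSketch is a hypothetical name, events are plain strings, and two behaviours are assumptions made for the demo: the stream ends after a "completed" event, and the subscriber is registered lazily on first iteration (the real implementation may register eagerly).

```python
import asyncio
from typing import AsyncIterator, Dict, List


class StreamSketch:
    """Hypothetical per-job event stream; each subscriber gets its own queue."""

    def __init__(self) -> None:
        self._queues: Dict[str, List[asyncio.Queue]] = {}

    def publish(self, job_id: str, event: str) -> None:
        for q in self._queues.get(job_id, []):
            q.put_nowait(event)

    async def events(self, job_id: str) -> AsyncIterator[str]:
        q: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(job_id, []).append(q)
        try:
            while True:
                event = await q.get()
                yield event
                if event == "completed":  # assumed terminal marker for this demo
                    return
        finally:
            self._queues[job_id].remove(q)  # unsubscribe when the consumer stops


async def collect(stream: StreamSketch, job_id: str) -> List[str]:
    return [event async for event in stream.events(job_id)]


async def demo() -> List[List[str]]:
    stream = StreamSketch()
    tab_a = asyncio.create_task(collect(stream, "j1"))
    tab_b = asyncio.create_task(collect(stream, "j1"))
    await asyncio.sleep(0)  # let both consumers start and register their queues
    for event in ("running", "completed"):
        stream.publish("j1", event)
    return await asyncio.gather(tab_a, tab_b)


streams = asyncio.run(demo())
print(streams)
```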
Sequence Submission & Control
submit_sequence registers a multi-step run and submits the first member job (tagged with sequence_id + sequence_index=0 before enqueue so its first STATE_TRANSITION already routes to sequence subscribers). submit_uniform_sequence is sugar for the common “same plugin, many arg sets” case. cancel_sequence halts the sequence and cancels the active member. get_sequence returns the live JobSequence (read-only inspection — substrate keeps it). _advance_sequence is the internal advancement helper invoked from _execute_job finalization (Stage 2 wiring in the internal cell) and from cancel()’s pending-member path.
submit_sequence
async def submit_sequence(
steps:List, # Sequence steps to run in order
priority:int=0, # Sequence-level priority (per-step override possible)
fail_fast:bool=True, # Halt on first failure (audit-locked default)
)->str: # sequence_id
Submit a multi-step job sequence (CR-6 Stage 2).
Validates all step plugins upfront (PluginDisabledError if any are disabled) so operators get an immediate failure signal rather than discovering the problem mid-sequence. The first member-job is tagged with sequence fields BEFORE enqueue, so its first STATE_TRANSITION already routes through sequence-tagged subscribers.
Returns the sequence_id; consumers subscribe via queue.events_for_sequence(sequence_id) or query state via queue.get_sequence(sequence_id).
submit_uniform_sequence
async def submit_uniform_sequence(
plugin_instance_id:str, # Plugin every step targets
args_list:List, # Per-step positional args
kwargs_list:Optional=None, # Per-step keyword args (default: empty)
priority:int=0, # Sequence-level priority
fail_fast:bool=True, # Halt on first failure (audit-locked default)
)->str: # sequence_id
Submit a sequence where every step targets the same plugin (CR-6 Stage 2).
Sugar over submit_sequence for the common “same plugin, many arg sets” case — multi-source transcription / batch processing / parameter sweeps.
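The expansion from parallel argument lists into per-step definitions might look like the sketch below. StepSketch and build_uniform_steps are hypothetical names, and the length check on kwargs_list is an assumption; the source does not say how a mismatched kwargs_list is handled.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class StepSketch:
    """Hypothetical stand-in for SequenceStep (priority omitted)."""
    plugin_instance_id: str
    args: Tuple = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)


def build_uniform_steps(
    plugin_instance_id: str,
    args_list: List[Tuple],
    kwargs_list: Optional[List[Dict[str, Any]]] = None,
) -> List[StepSketch]:
    """Pair each args tuple with its kwargs dict, all targeting the same plugin."""
    if kwargs_list is None:
        kwargs_list = [{} for _ in args_list]  # default: empty kwargs per step
    if len(kwargs_list) != len(args_list):
        # Assumed validation; the real method may behave differently.
        raise ValueError("kwargs_list must match args_list in length")
    return [
        StepSketch(plugin_instance_id, tuple(args), dict(kwargs))
        for args, kwargs in zip(args_list, kwargs_list)
    ]


steps = build_uniform_steps("whisper", [("a.mp3",), ("b.mp3",)])
print([step.args for step in steps])
```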
cancel_sequence
async def cancel_sequence(
sequence_id:str, # Sequence to cancel
)->bool: # True if cancellation was recorded
Cancel an in-flight sequence (CR-6 Stage 2).
Marks the sequence as cancelled (no further advancement) and cancels the current member job. Returns False if the sequence is unknown or already terminal; True if cancellation was recorded.
Sets seq.status = JobStatus.cancelled BEFORE issuing the member-job cancel. This prevents a race where the member completes between the intent record and the cancel call — _advance_sequence checks seq.status == cancelled first and halts regardless of member outcome.
get_sequence
def get_sequence(
sequence_id:str, # Sequence to retrieve
)->Optional: # JobSequence or None
Get a job sequence by ID (read-only inspection).
Resource Snapshots
get_resource_snapshot(job_id) returns a point-in-time ResourceSnapshot for a job, composing worker proxy get_stats() with the configured sysmon plugin’s CR-3 typed list_processes() (per-PID matching) and get_system_status() (global GPU stats). _sample_resource_snapshot is the internal helper invoked both from this method and from _poll_progress (which emits RESOURCE_SNAPSHOT events at resource_snapshot_cadence_polls cadence). All sysmon calls are best-effort — failures are swallowed and the snapshot returns with worker stats only.
get_resource_snapshot
def get_resource_snapshot(
job_id:str, # Job to sample resources for
)->Optional:
Get a point-in-time resource snapshot for a job (CR-6 Stage 3).
Returns None if the job is unknown or the worker proxy doesn’t expose get_stats. Composes worker stats with sysmon GPU stats when the queue is configured with a sysmon_plugin_name.
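The best-effort composition (worker stats always, GPU stats only when the monitor call succeeds) can be sketched as follows. SnapshotSketch keeps only a few of the ResourceSnapshot fields, and sample_snapshot, the worker_stats dict keys, and the gpu_lookup callable are all hypothetical stand-ins for the proxy and sysmon calls.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, Dict, Optional


@dataclass
class SnapshotSketch:
    """Reduced stand-in for ResourceSnapshot (only a few of its fields)."""
    timestamp: datetime
    worker_pid: int = 0
    cpu_percent: float = 0.0
    memory_rss_mb: float = 0.0
    gpu_memory_mb: Optional[float] = None


def sample_snapshot(
    worker_stats: Dict[str, float],
    gpu_lookup: Optional[Callable[[int], float]] = None,
) -> SnapshotSketch:
    """Build a snapshot from worker stats, adding GPU memory when the lookup works."""
    snap = SnapshotSketch(
        timestamp=datetime.now(timezone.utc),
        worker_pid=int(worker_stats.get("pid", 0)),
        cpu_percent=float(worker_stats.get("cpu_percent", 0.0)),
        memory_rss_mb=float(worker_stats.get("memory_rss_mb", 0.0)),
    )
    if gpu_lookup is not None:
        try:
            snap.gpu_memory_mb = gpu_lookup(snap.worker_pid)
        except Exception:
            pass  # best-effort: keep worker stats, leave GPU fields as None
    return snap


def broken_sysmon(pid: int) -> float:
    raise RuntimeError("sysmon unavailable")


stats = {"pid": 4242, "cpu_percent": 37.5, "memory_rss_mb": 512.0}
without_gpu = sample_snapshot(stats, gpu_lookup=broken_sysmon)
with_gpu = sample_snapshot(stats, gpu_lookup=lambda pid: 2048.0)
print(without_gpu.gpu_memory_mb, with_gpu.gpu_memory_mb)
```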
Lifecycle
start
async def start(
)->None:
Start the queue processor.
CR-6 Stage 4: installs the substrate-side retry observer on _deps (typically a PluginManager). When CR-7’s reactive-retry path fires for a running job, the observer updates Job.retry_count and publishes a RETRY_STARTED event tagged with the in-flight job. Previous observer value (if any) is saved + restored in stop() for cooperative coexistence.
stop
async def stop(
)->None:
Stop the queue processor gracefully.
CR-6 Stage 4: restores the previous _on_retry observer on deps to leave the manager in the state we found it (cooperative with other queue instances, tests, etc.).
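The save-and-restore handling of the retry observer across start() and stop() can be sketched as below. ManagerSketch and QueueSketch are hypothetical classes, the synchronous start/stop and the single-argument observer signature are simplifications, and only the _on_retry attribute name comes from the text.

```python
from typing import Callable, List, Optional


class ManagerSketch:
    """Hypothetical dependency exposing a single retry-observer slot."""

    def __init__(self) -> None:
        self._on_retry: Optional[Callable[[str], None]] = None


class QueueSketch:
    def __init__(self, deps: ManagerSketch) -> None:
        self._deps = deps
        self._previous: Optional[Callable[[str], None]] = None
        self.retries: List[str] = []

    def start(self) -> None:
        # Remember whatever observer was installed, then install our own.
        self._previous = self._deps._on_retry
        self._deps._on_retry = self._handle_retry

    def stop(self) -> None:
        # Leave the dependency in the state we found it.
        self._deps._on_retry = self._previous
        self._previous = None

    def _handle_retry(self, job_id: str) -> None:
        self.retries.append(job_id)


def host_observer(job_id: str) -> None:
    """Observer installed by some other component before the queue starts."""


manager = ManagerSketch()
manager._on_retry = host_observer
queue = QueueSketch(manager)
queue.start()
manager._on_retry("job-1")  # the manager reports a retry while the queue is running
installed = manager._on_retry == queue._handle_retry
queue.stop()
print(installed, queue.retries, manager._on_retry is host_observer)
```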
Internal Methods
Legacy get_state shim (REMOVE-AFTER-OVERHAUL)
The pre-CR-6 dict shape that existing UI consumers (notably the cjm-fasthtml-job-monitor library) read today. Reshape into typed accessors (get_pending / get_running / get_history / get_stats) is canonical; this shim converts those back to the legacy dict shape until the consumer cascade migrates. SG-48 removes it.
get_state
def get_state(
)->Dict: # Queue state for UI (legacy shape)
Get the current queue state in the pre-CR-6 dict shape.
REMOVE-AFTER-OVERHAUL: kept so existing UI consumers (notably cjm-fasthtml-job-monitor) continue to work until they migrate to the typed get_pending / get_running / get_history / get_stats accessors. Reads from the new accessors and reshapes:
- Legacy keys (plugin_name, created_at) read from the renamed fields via the Job dataclass’s backward-compat @property aliases.
- Legacy error was a bare string; new is JobError. Shim returns error.message to preserve the string shape.
- Legacy timestamps were unix floats; new are datetime. Shim returns .timestamp() to preserve float shape.
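The three reshaping rules above can be sketched as a small conversion function. ErrorSketch, JobSketch, and to_legacy_dict are hypothetical names, and the dict keys other than plugin_name, created_at, and error are assumptions about the legacy shape.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class ErrorSketch:
    """Stand-in for JobError; only `message` is used here."""
    message: str


@dataclass
class JobSketch:
    """Reduced stand-in for Job with the renamed fields."""
    id: str
    plugin_instance_id: str
    submitted_at: datetime
    started_at: Optional[datetime] = None
    error: Optional[ErrorSketch] = None


def to_legacy_dict(job: JobSketch) -> Dict[str, Any]:
    """Reshape typed fields into the older string/float shapes."""
    return {
        "id": job.id,
        "plugin_name": job.plugin_instance_id,           # renamed field, legacy key
        "created_at": job.submitted_at.timestamp(),      # datetime -> unix float
        "started_at": job.started_at.timestamp() if job.started_at else None,
        "error": job.error.message if job.error else None,  # typed error -> bare string
    }


job = JobSketch(
    id="j1",
    plugin_instance_id="whisper",
    submitted_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
    error=ErrorSketch("out of memory"),
)
legacy = to_legacy_dict(job)
print(legacy["plugin_name"], legacy["error"])
```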
Usage Example
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.queue import JobQueue, JobStatus
from cjm_plugin_system.core.scheduling import QueueScheduler
# Setup
manager = PluginManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_plugin(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")
# Create queue
queue = JobQueue(manager)
await queue.start()
# Submit jobs
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)
# Monitor queue state
state = queue.get_state()
print(f"Running: {state['running']}")
print(f"Pending: {len(state['pending'])} jobs")
# Cancel a job
await queue.cancel(job2_id)
# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
    print(job1.result)
# Cleanup
await queue.stop()
manager.unload_all()