Job Queue

Resource-aware job queue for sequential plugin execution with cancellation support

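The JobQueue sits between the user application and the PluginManager. Pending jobs wait in a priority heap, one job runs at a time, and finished jobs (completed or cancelled) move to a history list capped at max_history entries: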

┌─────────────────────────────────────────────────────────────┐
│                    User Application                         │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                    JobQueue                         │    │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────────┐    │    │
│  │  │ Pending   │  │ Running   │  │ History       │    │    │
│  │  │ Jobs      │→ │ Job       │→ │ (completed/   │    │    │
│  │  │ (heap)    │  │           │  │  cancelled)   │    │    │
│  │  └───────────┘  └───────────┘  └───────────────┘    │    │
│  └─────────────────────────────────────────────────────┘    │
│         │                │                                  │
│         ▼                ▼                                  │
│  ┌─────────────┐  ┌─────────────┐                           │
│  │ Scheduler   │  │ Plugin      │                           │
│  │ (policy)    │  │ Manager     │                           │
│  └─────────────┘  └─────────────┘                           │
└─────────────────────────────────────────────────────────────┘

Key features:

JobStatus

Enumeration of possible job states.


JobStatus


def JobStatus(
    *args, **kwds
):

Status of a job in the queue.
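
As a rough illustration of what such an enumeration might look like, here is a minimal sketch. Only the `pending` member (with value `'pending'`) and the `completed` member name appear on this page. The remaining members and all values other than `'pending'` are assumptions inferred from the diagram and from the `wait_for_job` return comment, and `is_terminal` is a hypothetical helper, not part of the documented API.

```python
from enum import Enum


class JobStatus(Enum):
    """Sketch only: members other than `pending`/`completed` are assumed."""
    pending = "pending"        # confirmed by the Job signature default
    running = "running"        # assumed
    completed = "completed"    # member name confirmed by the usage example
    failed = "failed"          # assumed from the wait_for_job return comment
    cancelled = "cancelled"    # assumed from the diagram's History box


# Hypothetical helper: states after which a job no longer changes.
TERMINAL_STATES = {JobStatus.completed, JobStatus.failed, JobStatus.cancelled}


def is_terminal(status: JobStatus) -> bool:
    """Return True once a job has finished, failed, or been cancelled."""
    return status in TERMINAL_STATES
```

A caller polling a job could stop as soon as `is_terminal(job.status)` returns True.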

Job

Dataclass representing a queued plugin execution.


Job


def Job(
    id:str, plugin_name:str, args:Tuple, kwargs:Dict, status:JobStatus=JobStatus.pending,
    priority:int=0, created_at:float=<factory>, started_at:Optional=None, completed_at:Optional=None,
    result:Any=None, error:Optional=None, progress:float=0.0, status_message:str=''
)->None:

A queued plugin execution request.
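
To make the field list easier to read, the sketch below mirrors the signature as a standalone dataclass. It is not the library's class: the name `JobSketch`, the element types inside `Optional[...]`, the use of `time.time` as the `created_at` factory, and the `run_seconds` helper are all assumptions, and `status` is a plain string here so the example stays self-contained.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple


@dataclass
class JobSketch:
    """Stand-in for Job; field names follow the signature above."""
    id: str
    plugin_name: str
    args: Tuple
    kwargs: Dict
    status: str = "pending"                 # the real field is a JobStatus
    priority: int = 0
    created_at: float = field(default_factory=time.time)  # assumed factory
    started_at: Optional[float] = None      # element type assumed
    completed_at: Optional[float] = None    # element type assumed
    result: Any = None
    error: Optional[str] = None             # element type assumed
    progress: float = 0.0
    status_message: str = ""


def run_seconds(job: JobSketch) -> Optional[float]:
    """Hypothetical helper: wall-clock run time, or None if not finished."""
    if job.started_at is None or job.completed_at is None:
        return None
    return job.completed_at - job.started_at
```

Keeping the three timestamps separate makes it possible to report both time spent waiting in the queue and time spent executing.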

JobQueue

Main queue class that manages job submission, execution, and lifecycle.


JobQueue


def JobQueue(
    manager:PluginManager, # Plugin manager instance
    max_history:int=100, # Max completed jobs to retain
    cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
    progress_poll_interval:float=1.0, # Seconds between progress polls
):

Resource-aware job queue for sequential plugin execution.
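
The `max_history` parameter caps how many finished jobs are retained. One common way to get that behavior, shown here as a sketch and not necessarily how JobQueue stores its history, is a `collections.deque` with `maxlen`, which silently drops the oldest entry when a new one is appended.

```python
from collections import deque

max_history = 3  # small value for illustration; the documented default is 100

# A bounded container: appending to a full deque evicts the oldest item.
history = deque(maxlen=max_history)

for job_id in ["job-1", "job-2", "job-3", "job-4"]:
    history.append(job_id)

print(list(history))  # ['job-2', 'job-3', 'job-4']
```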

Job Submission


submit


def submit(
    plugin_name:str, # Target plugin
    *args, priority:int=0, # Higher = more urgent
    **kwargs
)->str: # Returns job_id

Submit a job to the queue.
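
Since higher priority values are more urgent and the pending jobs live in a heap, the ordering can be sketched with `heapq`, which is a min-heap, by storing the negated priority. The helper names `push` and `pop_next` are hypothetical, and the first-in-first-out tie-break among equal priorities is an assumption that this page does not confirm.

```python
import heapq
import itertools

_sequence = itertools.count()  # monotonically increasing submission counter
pending = []                   # heap of (-priority, sequence, job_id)


def push(job_id: str, priority: int = 0) -> None:
    """Add a job; negating priority makes larger values pop first."""
    heapq.heappush(pending, (-priority, next(_sequence), job_id))


def pop_next() -> str:
    """Remove and return the most urgent pending job id."""
    return heapq.heappop(pending)[2]


push("job1")
push("job2")
push("job3", priority=10)

order = [pop_next() for _ in range(3)]
print(order)  # ['job3', 'job1', 'job2']
```

In a live queue the first submitted job may already be running by the time later jobs arrive, so priority only affects jobs that are still pending.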

Job Control


reorder


def reorder(
    job_id:str, # Job to move
    new_priority:int, # New priority value
)->bool: # True if reordered

Change the priority of a pending job.
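
Changing the priority of an entry already inside a heap requires restoring the heap invariant afterwards. The sketch below does this by rewriting the entry and calling `heapq.heapify`. It assumes heap entries of the form `(-priority, sequence, job_id)`, which is an illustrative layout and not taken from the library, and `reorder_pending` is a hypothetical standalone function.

```python
import heapq
from typing import List, Tuple

Entry = Tuple[int, int, str]  # (-priority, submission sequence, job_id)


def reorder_pending(pending: List[Entry], job_id: str, new_priority: int) -> bool:
    """Update a pending job's priority; return False if it is not pending."""
    for i, (_, seq, jid) in enumerate(pending):
        if jid == job_id:
            pending[i] = (-new_priority, seq, jid)
            heapq.heapify(pending)  # O(n) rebuild restores the heap invariant
            return True
    return False


pending = [(0, 0, "a"), (0, 1, "b"), (0, 2, "c")]
heapq.heapify(pending)

moved = reorder_pending(pending, "c", new_priority=5)
missing = reorder_pending(pending, "not-queued", new_priority=1)
next_job = heapq.heappop(pending)[2]
print(moved, missing, next_job)  # True False c
```

The linear scan is acceptable for short queues; a job that is already running or finished is simply not found, which matches the `True if reordered` return comment.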


cancel


def cancel(
    job_id:str, # Job to cancel
)->bool: # True if cancelled

Cancel a pending or running job.
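
The `cancel_timeout` parameter of JobQueue suggests a two-stage approach for running jobs: ask the job to stop, wait briefly, then force it. The sketch below illustrates that general pattern with plain asyncio tasks. How JobQueue actually signals or terminates a plugin is not described on this page, so `request_cancel`, the `asyncio.Event` flag, and both worker coroutines are hypothetical.

```python
import asyncio


async def request_cancel(task: asyncio.Task, flag: asyncio.Event,
                         cancel_timeout: float = 3.0) -> bool:
    """Return True if the task stopped cooperatively, False if it was forced."""
    flag.set()  # stage 1: ask the worker to stop on its own
    done, _ = await asyncio.wait({task}, timeout=cancel_timeout)
    if task in done:
        return True
    task.cancel()  # stage 2: the worker ignored the request, so force it
    try:
        await task
    except asyncio.CancelledError:
        pass
    return False


async def polite_worker(flag: asyncio.Event) -> None:
    while not flag.is_set():  # checks the flag between units of work
        await asyncio.sleep(0.01)


async def stubborn_worker() -> None:
    await asyncio.sleep(60)  # never checks any flag


async def demo():
    flag = asyncio.Event()
    polite = asyncio.create_task(polite_worker(flag))
    cooperative = await request_cancel(polite, flag, cancel_timeout=1.0)

    stubborn = asyncio.create_task(stubborn_worker())
    forced = not await request_cancel(stubborn, asyncio.Event(), cancel_timeout=0.05)
    return cooperative, forced, stubborn.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, True, True)
```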

Observation


get_job_logs


def get_job_logs(
    job_id:str, # Job to get logs for
    lines:int=100, # Max lines to return
)->str: # Log content

Get logs for a job from the plugin’s log file.
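
The `lines` parameter caps how much of the log is returned. A simple way to read only the last N lines of a text file is to stream it through a bounded deque, sketched below. The function name `tail_lines` and the temporary log file are illustrative, and how JobQueue locates a plugin's log file or filters it by job is not covered on this page.

```python
import os
import tempfile
from collections import deque


def tail_lines(path: str, lines: int = 100) -> str:
    """Return at most the last `lines` lines of a text file as one string."""
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        # The deque keeps only the newest `lines` items while streaming.
        return "".join(deque(f, maxlen=lines))


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "plugin.log")  # stand-in for a plugin log
    with open(log_path, "w", encoding="utf-8") as f:
        f.write("".join(f"line {i}\n" for i in range(5)))
    last_two = tail_lines(log_path, lines=2)

print(repr(last_two))  # 'line 3\nline 4\n'
```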


get_state


def get_state(
    
)->Dict: # Queue state for UI

Get the current queue state.


wait_for_job


def wait_for_job(
    job_id:str, # Job to wait for
    timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job

Wait for a job to complete.
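
Waiting with an optional timeout is commonly built on `asyncio.wait_for`, where `timeout=None` means wait indefinitely. The sketch below shows that pattern with a per-job `asyncio.Event`. What `wait_for_job` itself does when the timeout expires (raise, or return the unfinished job) is not stated here, so the boolean return value and the name `wait_until_done` are assumptions.

```python
import asyncio
from typing import Optional


async def wait_until_done(done: asyncio.Event,
                          timeout: Optional[float] = None) -> bool:
    """Return True if `done` was set in time, False on timeout."""
    try:
        await asyncio.wait_for(done.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    finished = asyncio.Event()
    # Simulate a job that completes shortly after we start waiting.
    asyncio.get_running_loop().call_later(0.01, finished.set)
    completed = await wait_until_done(finished, timeout=1.0)

    never = asyncio.Event()  # a job that does not finish in time
    timed_out = not await wait_until_done(never, timeout=0.02)
    return completed, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, True)
```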


get_job


def get_job(
    job_id:str, # Job to retrieve
)->Optional: # Job or None

Get a job by ID.

Lifecycle


stop


def stop(
    
)->None:

Stop the queue processor gracefully.


start


def start(
    
)->None:

Start the queue processor.

Internal Methods

Usage Example

from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.queue import JobQueue, JobStatus
from cjm_plugin_system.core.scheduling import QueueScheduler

# Setup
manager = PluginManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_plugin(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")

# Create queue (the awaits below assume an async context such as a notebook cell)
queue = JobQueue(manager)
await queue.start()

# Submit jobs (a higher priority value is more urgent, so job3 is scheduled ahead of other pending jobs)
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)

# Monitor queue state
state = queue.get_state()
print(f"Running: {state['running']}")
print(f"Pending: {len(state['pending'])} jobs")

# Cancel a job
await queue.cancel(job2_id)

# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
    print(job1.result)

# Cleanup
await queue.stop()
manager.unload_all()