Job Queue
Resource-aware job queue for sequential plugin execution with cancellation support
The JobQueue provides a resource-aware job queue for plugin execution:
┌─────────────────────────────────────────────────────────────┐
│ User Application │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ JobQueue │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────────┐ │ │
│ │ │ Pending │ │ Running │ │ History │ │ │
│ │ │ Jobs │→ │ Job │→ │ (completed/ │ │ │
│ │ │ (heap) │ │ │ │ cancelled) │ │ │
│ │ └───────────┘ └───────────┘ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Scheduler │ │ Plugin │ │
│ │ (policy) │ │ Manager │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
Key features:
- Priority queue: Higher priority jobs execute first (FIFO within same priority)
- Resource-aware: Waits for resources, triggers eviction of idle plugins
- Cancellation: Cancel pending or running jobs with graceful fallback to force termination
- Progress tracking: Poll progress and status messages during execution
- Observable: Get queue state for UI integration
JobStatus
Enumeration of possible job states.
JobStatus
def JobStatus(
args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):
Status of a job in the queue.
Job
Dataclass representing a queued plugin execution.
Job
def Job(
id:str, plugin_name:str, args:Tuple, kwargs:Dict, status:JobStatus=<JobStatus.pending: 'pending'>,
priority:int=0, created_at:float=<factory>, started_at:Optional=None, completed_at:Optional=None,
result:Any=None, error:Optional=None, progress:float=0.0, status_message:str=''
)->None:
A queued plugin execution request.
JobQueue
Main queue class that manages job submission, execution, and lifecycle.
JobQueue
def JobQueue(
manager:PluginManager, # Plugin manager instance
max_history:int=100, # Max completed jobs to retain
cancel_timeout:float=3.0, # Seconds to wait for cooperative cancel
progress_poll_interval:float=1.0, # Seconds between progress polls
):
Resource-aware job queue for sequential plugin execution.
Job Submission
submit
def submit(
plugin_name:str, # Target plugin
args:VAR_POSITIONAL, priority:int=0, # Higher = more urgent
kwargs:VAR_KEYWORD
)->str: # Returns job_id
Submit a job to the queue.
Job Control
reorder
def reorder(
job_id:str, # Job to move
new_priority:int, # New priority value
)->bool: # True if reordered
Change the priority of a pending job.
cancel
def cancel(
job_id:str, # Job to cancel
)->bool: # True if cancelled
Cancel a pending or running job.
Observation
get_job_logs
def get_job_logs(
job_id:str, # Job to get logs for
lines:int=100, # Max lines to return
)->str: # Log content
Get logs for a job from the plugin’s log file.
get_state
def get_state(
)->Dict: # Queue state for UI
Get the current queue state.
wait_for_job
def wait_for_job(
job_id:str, # Job to wait for
timeout:Optional=None, # Max seconds to wait
)->Job: # Completed/failed/cancelled job
Wait for a job to complete.
get_job
def get_job(
job_id:str, # Job to retrieve
)->Optional: # Job or None
Get a job by ID.
Lifecycle
stop
def stop(
)->None:
Stop the queue processor gracefully.
start
def start(
)->None:
Start the queue processor.
Internal Methods
Usage Example
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.queue import JobQueue, JobStatus
from cjm_plugin_system.core.scheduling import QueueScheduler
# Setup
manager = PluginManager(scheduler=QueueScheduler())
manager.discover_manifests()
manager.load_plugin(manager.get_discovered_meta("sys-mon"))
manager.register_system_monitor("sys-mon")
# Create queue
queue = JobQueue(manager)
await queue.start()
# Submit jobs
job1_id = await queue.submit("whisper", audio="/path/to/audio1.mp3")
job2_id = await queue.submit("gemini-vision", image="/path/to/image.png")
job3_id = await queue.submit("whisper", audio="/path/to/audio2.mp3", priority=10)
# Monitor queue state
state = queue.get_state()
print(f"Running: {state['running']}")
print(f"Pending: {len(state['pending'])} jobs")
# Cancel a job
await queue.cancel(job2_id)
# Wait for a job to complete
job1 = await queue.wait_for_job(job1_id)
if job1.status == JobStatus.completed:
print(job1.result)
# Cleanup
await queue.stop()
manager.unload_all()