Scheduling
ResourceScheduler
Abstract base class that defines the scheduling policy interface. Schedulers decide whether a plugin can execute based on its resource requirements and current system state.
The separation of Mechanism (plugins reporting stats) from Policy (schedulers deciding allocation) allows the same plugin ecosystem to serve:
- Interactive web apps: Use
SafetySchedulerto prevent OOM crashes - Batch processing: Use
QueueSchedulerto wait for resources - Development: Use
PermissiveSchedulerto run everything
ResourceScheduler
def ResourceScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Abstract base class for resource allocation policies.
PermissiveScheduler
Default scheduler that allows all executions. Use this for development, scripting, and batch processing where you want maximum throughput without safety checks.
PermissiveScheduler
def PermissiveScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Scheduler that allows all executions (Default / Dev Mode).
SafetyScheduler
Production scheduler that blocks execution if resources are insufficient. Checks GPU VRAM and system RAM against plugin requirements defined in the manifest.
Resource requirements are read from the plugin manifest:
{
"resources": {
"requires_gpu": true,
"min_gpu_vram_mb": 4096,
"min_system_ram_mb": 8192
}
}System stats are provided by a System Monitor plugin implementing the cjm-infra-plugin-system interface.
SafetyScheduler
def SafetyScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Scheduler that prevents execution if resources are insufficient.
QueueScheduler
Batch processing scheduler that waits for resources to become available. Polls the system monitor until resources are free or timeout is reached.
- Sync path: Uses
time.sleep()for blocking wait (scripts, batch jobs) - Async path: Uses
await asyncio.sleep()for non-blocking wait (FastHTML, async apps) - Active tracking: Tracks which plugins have running executions via
get_active_plugins()
The active plugin tracking enables smart eviction: plugins that are currently executing should never be evicted, while idle plugins can be safely released to free resources.
QueueScheduler
def QueueScheduler(
timeout:float=300.0, # Max seconds to wait for resources
poll_interval:float=2.0, # Seconds between resource checks
):
Scheduler that waits for resources to become available.
Usage Examples
Development Mode (Default)
from cjm_plugin_system.core.manager import PluginManager
# Default: PermissiveScheduler allows everything
manager = PluginManager()Production Mode with Safety Checks
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.scheduling import SafetyScheduler
# Create manager with safety scheduler
manager = PluginManager(scheduler=SafetyScheduler())
# Load plugins
manager.load_all()
# Register a system monitor plugin for real-time stats
manager.register_system_monitor("sys-mon-nvidia")
# Execution will now check resources before running
try:
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")
except RuntimeError as e:
print(f"Blocked: {e}")Batch Processing with Queue
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.scheduling import QueueScheduler
# Create manager with queue scheduler (waits up to 5 minutes)
manager = PluginManager(scheduler=QueueScheduler(timeout=300.0, poll_interval=5.0))
manager.load_all()
manager.register_system_monitor("sys-mon-nvidia")
# Will block until resources are available or timeout
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")Async App with Queue (FastHTML)
from cjm_plugin_system.core.manager import PluginManager
from cjm_plugin_system.core.scheduling import QueueScheduler
manager = PluginManager(scheduler=QueueScheduler())
manager.load_all()
manager.register_system_monitor("sys-mon-nvidia")
# Non-blocking wait using asyncio.sleep
result = await manager.execute_plugin_async("whisper-local", audio="/path/to/audio.wav")