Scheduling
ResourceScheduler
Abstract base class that defines the scheduling policy interface. Schedulers decide whether a capability can execute based on its resource requirements and current system state.
The separation of Mechanism (capabilities reporting stats) from Policy (schedulers deciding allocation) allows the same capability ecosystem to serve:
- Interactive web apps: Use
SafetySchedulerto prevent OOM crashes - Batch processing: Use
QueueSchedulerto wait for resources - Development: Use
PermissiveSchedulerto run everything
ResourceScheduler
def ResourceScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Abstract base class for resource allocation policies.
PermissiveScheduler
Default scheduler that allows all executions. Use this for development, scripting, and batch processing where you want maximum throughput without safety checks.
PermissiveScheduler
def PermissiveScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Scheduler that allows all executions (Default / Dev Mode).
SafetyScheduler
Production scheduler that blocks execution if resources are insufficient. Checks GPU VRAM and system RAM against capability requirements defined in the manifest.
Resource requirements are read from the capability manifest:
{
"resources": {
"requires_gpu": true,
"min_gpu_vram_mb": 4096,
"min_system_ram_mb": 8192
}
}System stats are provided by a System Monitor capability implementing the MonitorToolProtocol contract (cjm-capability-primitives.monitoring).
SafetyScheduler
def SafetyScheduler(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Scheduler that prevents execution if resources are insufficient.
QueueScheduler
Batch processing scheduler that waits for resources to become available. Polls the system monitor until resources are free or timeout is reached.
- Sync path: Uses
time.sleep()for blocking wait (scripts, batch jobs) - Async path: Uses
await asyncio.sleep()for non-blocking wait (FastHTML, async apps) - Active tracking: Tracks which capabilities have running executions via
get_active_capabilities()
The active capability tracking enables smart eviction: capabilities that are currently executing should never be evicted, while idle capabilities can be safely released to free resources.
QueueScheduler
def QueueScheduler(
timeout:float=300.0, # Max seconds to wait for resources
poll_interval:float=2.0, # Seconds between resource checks
):
Scheduler that waits for resources to become available.
QueueScheduler.get_active_capabilities
def get_active_capabilities(
)->Set: # Set of currently executing capability names
Get the set of capabilities with active executions.
Usage Examples
Development Mode (Default)
from cjm_substrate.core.manager import CapabilityManager
# Default: PermissiveScheduler allows everything
manager = CapabilityManager()Production Mode with Safety Checks
from cjm_substrate.core.manager import CapabilityManager
from cjm_substrate.core.scheduling import SafetyScheduler
# Create manager with safety scheduler
manager = CapabilityManager(scheduler=SafetyScheduler())
# Load capabilities
manager.load_all()
# Register a system monitor capability for real-time stats
manager.register_system_monitor("sys-mon-nvidia")
# Execution will now check resources before running
try:
result = manager.execute_capability("whisper-local", audio="/path/to/audio.wav")
except RuntimeError as e:
print(f"Blocked: {e}")Batch Processing with Queue
from cjm_substrate.core.manager import CapabilityManager
from cjm_substrate.core.scheduling import QueueScheduler
# Create manager with queue scheduler (waits up to 5 minutes)
manager = CapabilityManager(scheduler=QueueScheduler(timeout=300.0, poll_interval=5.0))
manager.load_all()
manager.register_system_monitor("sys-mon-nvidia")
# Will block until resources are available or timeout
result = manager.execute_capability("whisper-local", audio="/path/to/audio.wav")Async App with Queue (FastHTML)
from cjm_substrate.core.manager import CapabilityManager
from cjm_substrate.core.scheduling import QueueScheduler
manager = CapabilityManager(scheduler=QueueScheduler())
manager.load_all()
manager.register_system_monitor("sys-mon-nvidia")
# Non-blocking wait using asyncio.sleep
result = await manager.execute_capability_async("whisper-local", audio="/path/to/audio.wav")