Resource Manager

Track worker processes and detect resource conflicts for GPU/CPU usage

Resource Types and Status

The resource manager tracks different types of resources (GPU memory, system memory) and their availability status.



ResourceType

 ResourceType (value, names=None, module=None, qualname=None, type=None,
               start=1, boundary=None)

Types of resources to monitor.



ResourceStatus

 ResourceStatus (value, names=None, module=None, qualname=None, type=None,
                 start=1, boundary=None)

Status of resource availability.
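The signature shown for both classes is the standard `Enum` call signature: calling an enum class with a value looks up the member that has that value. A minimal sketch of that behavior follows. The class names and every member name and value in it are assumptions made for illustration, since the real members are not listed on this page.

```python
from enum import Enum


class SketchResourceType(Enum):
    """Stand-in for ResourceType; member names and values are assumptions."""
    GPU_MEMORY = "gpu_memory"
    SYSTEM_MEMORY = "system_memory"


class SketchResourceStatus(Enum):
    """Stand-in for ResourceStatus; member names and values are assumptions."""
    AVAILABLE = "available"
    IN_USE = "in_use"


# Calling an Enum class with a value returns the member holding that value.
looked_up = SketchResourceType("gpu_memory")
print(looked_up is SketchResourceType.GPU_MEMORY)

# Iterating an Enum class yields its members in definition order.
status_names = [status.name for status in SketchResourceStatus]
print(status_names)
```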

Data Structures

Data classes for tracking resource conflicts and worker states.



ResourceConflict

 ResourceConflict (resource_type:__main__.ResourceType,
                   status:__main__.ResourceStatus, app_pids:List[int],
                   external_pids:List[int],
                   app_processes:List[Dict[str,Any]],
                   external_processes:List[Dict[str,Any]])

Information about a resource conflict.
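To make the field layout concrete, here is a sketch of a stand-in dataclass with the same field names as the `ResourceConflict` signature. The enum members, the shape of the process dictionaries, and the `has_external_users` helper are all hypothetical; only the field names and types come from the signature above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List


class SketchResourceType(Enum):
    GPU_MEMORY = "gpu_memory"  # assumed member


class SketchResourceStatus(Enum):
    IN_USE_BY_EXTERNAL = "in_use_by_external"  # assumed member


@dataclass
class SketchResourceConflict:
    """Stand-in that mirrors the field names of ResourceConflict."""
    resource_type: SketchResourceType
    status: SketchResourceStatus
    app_pids: List[int]
    external_pids: List[int]
    app_processes: List[Dict[str, Any]]
    external_processes: List[Dict[str, Any]]


def has_external_users(conflict: SketchResourceConflict) -> bool:
    """Hypothetical helper: True when a non-application process holds the resource."""
    return len(conflict.external_pids) > 0


conflict = SketchResourceConflict(
    resource_type=SketchResourceType.GPU_MEMORY,
    status=SketchResourceStatus.IN_USE_BY_EXTERNAL,
    app_pids=[],
    external_pids=[4242],
    app_processes=[],
    # The keys of a process record are an assumption made for this sketch.
    external_processes=[{"pid": 4242, "name": "other-trainer"}],
)
print(has_external_users(conflict))
```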



WorkerState

 WorkerState (pid:int, worker_type:str, job_id:Optional[str]=None,
              plugin_id:Optional[str]=None,
              plugin_name:Optional[str]=None,
              loaded_plugin_resource:Optional[str]=None,
              config:Optional[Dict[str,Any]]=None, status:str='idle',
              execution_mode:Optional[str]=None,
              child_pids:List[int]=<factory>,
              container_id:Optional[str]=None,
              conda_env:Optional[str]=None, is_remote:bool=False,
              remote_resource:Optional[Dict[str,Any]]=None)

State of a worker process.

Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins.

# Example: Creating a worker state
worker = WorkerState(
    pid=12345,
    worker_type="transcription",
    plugin_name="whisper",
    status="idle"
)

print(f"Worker PID: {worker.pid}")
print(f"Worker type: {worker.worker_type}")
print(f"Plugin: {worker.plugin_name}")
print(f"Status: {worker.status}")

Worker PID: 12345
Worker type: transcription
Plugin: whisper
Status: idle

Enhanced Worker State

The WorkerState dataclass now supports lifecycle-aware and cloud-aware plugins:

- Lifecycle fields: execution_mode, child_pids, container_id, conda_env
- Cloud fields: is_remote, remote_resource

These fields are optional and only used when working with plugins from cjm-fasthtml-plugins that implement the LifecycleAwarePlugin or CloudAwarePlugin protocols.
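The `child_pids:List[int]=<factory>` entry in the signature indicates a dataclass field built with a default factory, so each instance gets its own list. The sketch below shows that behavior on a reduced stand-in class. `SketchWorkerState` is hypothetical and carries only a subset of the real fields.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchWorkerState:
    """Reduced stand-in for WorkerState; not the library's class."""
    pid: int
    worker_type: str
    status: str = "idle"
    execution_mode: Optional[str] = None
    # default_factory creates a fresh list for every instance.
    child_pids: List[int] = field(default_factory=list)
    is_remote: bool = False


first = SketchWorkerState(pid=1, worker_type="transcription")
second = SketchWorkerState(pid=2, worker_type="llm")

# Appending to one worker's children leaves the other worker untouched.
first.child_pids.append(99001)
print(first.child_pids)   # [99001]
print(second.child_pids)  # []
```

A plain `child_pids: List[int] = []` default would be rejected by `dataclasses` with a `ValueError`, which is why the factory form is used for mutable defaults.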

Configuration Keys

Common configuration keys that indicate the plugin resource being used.
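The actual key list is not shown on this page. As an illustration of the idea, the sketch below scans a plugin config for a few candidate keys and returns the first value it finds. The constant `SKETCH_RESOURCE_KEYS`, the helper `find_plugin_resource`, and every key other than `model_id` (which appears in the usage example on this page) are assumptions.

```python
from typing import Any, Dict, Optional, Sequence

# Hypothetical candidate keys; only "model_id" appears in this page's examples.
SKETCH_RESOURCE_KEYS: Sequence[str] = ("model_id", "model", "model_name")


def find_plugin_resource(
    config: Optional[Dict[str, Any]],
    keys: Sequence[str] = SKETCH_RESOURCE_KEYS,
) -> Optional[str]:
    """Return the first non-empty value stored under one of the candidate keys."""
    if not config:
        return None
    for key in keys:
        value = config.get(key)
        if value:
            return str(value)
    return None


print(find_plugin_resource({"model_id": "whisper-large-v3"}))
print(find_plugin_resource({"language": "en"}))
```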

ResourceManager

The ResourceManager class tracks worker processes and provides methods to check resource availability and detect conflicts.



ResourceManager

 ResourceManager (gpu_memory_threshold_percent:float=45.0)

Manages resource tracking and conflict detection for the application. Tracks PIDs associated with application workers (transcription, LLM, etc.) and provides methods to check resource availability and conflicts. Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins.

| Parameter | Type | Default | Details |
|---|---|---|---|
| gpu_memory_threshold_percent | float | 45.0 | GPU memory usage threshold; external processes using more than this percentage are considered conflicts |
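The threshold rule can be sketched as a single comparison. This is an illustration only: the helper name `exceeds_gpu_threshold` and the choice to measure a process's usage against total device memory are assumptions, not the library's implementation.

```python
def exceeds_gpu_threshold(
    used_mb: float,
    total_mb: float,
    threshold_percent: float = 45.0,
) -> bool:
    """Return True when used_mb is strictly more than threshold_percent of total_mb."""
    if total_mb <= 0:
        raise ValueError("total_mb must be positive")
    used_percent = (used_mb / total_mb) * 100.0
    return used_percent > threshold_percent


# An external process holding 12 GB of a 24 GB card uses 50 percent.
print(exceeds_gpu_threshold(12000, 24000))
# One holding 6 GB of the same card uses 25 percent.
print(exceeds_gpu_threshold(6000, 24000))
```

Raising the threshold makes the check more tolerant of other GPU users; lowering it flags smaller external allocations as conflicts.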

Example: Basic Usage

# Create a resource manager
manager = ResourceManager(gpu_memory_threshold_percent=45.0)

# Register a worker
manager.register_worker(
    pid=12345,
    worker_type="transcription",
    plugin_name="whisper",
    config={"model_id": "whisper-large-v3"}
)

# Get worker info
worker = manager.get_worker_by_pid(12345)
print(f"Worker type: {worker.worker_type}")
print(f"Plugin: {worker.plugin_name}")
print(f"Status: {worker.status}")

# Update worker state
manager.update_worker_state(12345, status="running", job_id="job-123")
worker = manager.get_worker_by_pid(12345)
print(f"Updated status: {worker.status}")
print(f"Job ID: {worker.job_id}")

# Get worker by job
worker_by_job = manager.get_worker_by_job("job-123")
print(f"Worker for job-123: PID {worker_by_job.pid}")

# Check active worker types
print(f"Active worker types: {manager.get_active_worker_types()}")

# Unregister worker
manager.unregister_worker(12345)
print(f"Workers after unregister: {len(manager.get_all_workers())}")

Worker type: transcription
Plugin: whisper
Status: idle
Updated status: running
Job ID: job-123
Worker for job-123: PID 12345
Active worker types: {'transcription'}
Workers after unregister: 0

# Example: Mock lifecycle-aware plugin
from enum import Enum


class MockMode(Enum):
    """Mock execution mode returned by the mock plugin."""
    SUBPROCESS = "subprocess"


class MockVLLMPlugin:
    """Simulates a vLLM plugin that spawns child processes."""
    def get_execution_mode(self):
        return MockMode.SUBPROCESS
    
    def get_child_pids(self):
        return [99001, 99002, 99003]  # Mock child PIDs
    
    def get_managed_resources(self):
        return {
            'server_url': 'http://localhost:8000',
            'is_running': True
        }
    
    def force_cleanup(self):
        pass

# Register worker with lifecycle-aware plugin
manager = ResourceManager()
mock_plugin = MockVLLMPlugin()

# The manager will automatically detect child processes
manager.register_worker(
    pid=12345,
    worker_type="transcription",
    plugin_id="transcription_voxtral_vllm",
    plugin_name="voxtral_vllm",
    plugin_instance=mock_plugin
)

worker = manager.get_worker_by_pid(12345)
print(f"Worker PID: {worker.pid}")
print(f"Execution mode: {worker.execution_mode}")
print(f"Child PIDs: {worker.child_pids}")
print(f"All related PIDs: {manager.get_all_related_pids(12345)}")
print(f"All app PIDs (including children): {sorted(manager.get_all_app_pids_including_children())}")

Worker PID: 12345
Execution mode: subprocess
Child PIDs: [99001, 99002, 99003]
All related PIDs: [12345, 99001, 99002, 99003]
All app PIDs (including children): [12345, 99001, 99002, 99003]
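The PID bookkeeping in the output above can be sketched without the library. The registry layout and both function names below are hypothetical, and PID 22222 is an invented second worker added to show the union across workers. The sketch only illustrates the idea of combining each worker PID with the child PIDs its plugin reports.

```python
from typing import Dict, List, Set

# Hypothetical registry: worker PID -> child PIDs reported by its plugin.
sketch_workers: Dict[int, List[int]] = {
    12345: [99001, 99002, 99003],
    22222: [],
}


def related_pids(workers: Dict[int, List[int]], pid: int) -> List[int]:
    """Worker PID followed by its child PIDs."""
    return [pid, *workers[pid]]


def all_app_pids(workers: Dict[int, List[int]]) -> Set[int]:
    """Union of every worker PID and every child PID."""
    pids: Set[int] = set()
    for pid, children in workers.items():
        pids.add(pid)
        pids.update(children)
    return pids


print(related_pids(sketch_workers, 12345))   # [12345, 99001, 99002, 99003]
print(sorted(all_app_pids(sketch_workers)))  # [12345, 22222, 99001, 99002, 99003]
```

Collecting children into the application's PID set matters for conflict detection: a child process that holds GPU memory should count as the application's own usage rather than as an external process.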

Example: Enhanced Usage with Lifecycle/Cloud Plugins

When using plugins from cjm-fasthtml-plugins, the resource manager automatically detects and tracks:

- Child processes spawned by plugins
- Cloud/remote resources
- Container IDs and conda environments
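One way such detection could work is structural typing: check whether a plugin object exposes the lifecycle methods before asking it for child PIDs. The sketch below uses `typing.Protocol` with `runtime_checkable`. The protocol `SketchLifecycleAware` is a hypothetical stand-in whose method names are taken from the mock plugin on this page; it is not the `LifecycleAwarePlugin` protocol from cjm-fasthtml-plugins, and `collect_child_pids` is an invented helper.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class SketchLifecycleAware(Protocol):
    """Hypothetical protocol; method names follow the mock plugin on this page."""
    def get_execution_mode(self) -> Any: ...
    def get_child_pids(self) -> List[int]: ...
    def get_managed_resources(self) -> Dict[str, Any]: ...
    def force_cleanup(self) -> None: ...


class PlainPlugin:
    """A plugin with no lifecycle methods."""


class ChildSpawningPlugin:
    """A plugin that reports the processes it started."""
    def get_execution_mode(self) -> str:
        return "subprocess"

    def get_child_pids(self) -> List[int]:
        return [99001, 99002]

    def get_managed_resources(self) -> Dict[str, Any]:
        return {"is_running": True}

    def force_cleanup(self) -> None:
        pass


def collect_child_pids(plugin: object) -> List[int]:
    """Return child PIDs for lifecycle-aware plugins, else an empty list."""
    # isinstance on a runtime_checkable Protocol only checks that the methods exist.
    if isinstance(plugin, SketchLifecycleAware):
        return list(plugin.get_child_pids())
    return []


print(collect_child_pids(ChildSpawningPlugin()))  # [99001, 99002]
print(collect_child_pids(PlainPlugin()))          # []
```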