cjm-fasthtml-resources

Resource monitoring and management system for tracking GPU/CPU usage and worker processes in FastHTML applications.

Install

pip install cjm_fasthtml_resources

Project Structure

nbs/
├── core/ (4)
│   ├── management_config.ipynb  # Configuration schema for resource management settings including GPU thresholds
│   ├── manager.ipynb            # Track worker processes and detect resource conflicts for GPU/CPU usage
│   ├── monitoring_config.ipynb  # Configuration schema for resource monitoring refresh intervals and SSE settings
│   └── validation.ipynb         # Validate resource availability before job execution and determine appropriate actions
└── utils/ (2)
    ├── plugin_utils.ipynb   # Utilities for analyzing plugin configurations and resource requirements
    └── route_helpers.ipynb  # Helper utilities for resource monitoring route handlers

Total: 6 notebooks across 2 directories

Module Dependencies

graph LR
    core_management_config[core.management_config<br/>Management Configuration]
    core_manager[core.manager<br/>Resource Manager]
    core_monitoring_config[core.monitoring_config<br/>Monitoring Configuration]
    core_validation[core.validation<br/>Resource Validation]
    utils_plugin_utils[utils.plugin_utils<br/>Plugin Resource Utilities]
    utils_route_helpers[utils.route_helpers<br/>Route Helpers]

    core_validation --> core_manager
    utils_plugin_utils --> core_manager

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Management Configuration (management_config.ipynb)

Configuration schema for resource management settings including GPU thresholds

Import

from cjm_fasthtml_resources.core.management_config import (
    RESOURCE_MANAGEMENT_SCHEMA
)

Variables

RESOURCE_MANAGEMENT_SCHEMA = {7 items}

Resource Manager (manager.ipynb)

Track worker processes and detect resource conflicts for GPU/CPU usage

Import

from cjm_fasthtml_resources.core.manager import (
    PLUGIN_RESOURCE_CONFIG_KEYS,
    ResourceType,
    ResourceStatus,
    ResourceConflict,
    WorkerState,
    ResourceManager
)

Classes

class ResourceType(Enum):
    "Types of resources to monitor."
class ResourceStatus(Enum):
    "Status of resource availability."
@dataclass
class ResourceConflict:
    "Information about a resource conflict."
    
    resource_type: ResourceType
    status: ResourceStatus
    app_pids: List[int]  # PIDs from this application using the resource
    external_pids: List[int]  # External PIDs using the resource
    app_processes: List[Dict[str, Any]]  # Detailed info about app processes
    external_processes: List[Dict[str, Any]]  # Detailed info about external processes
@dataclass
class WorkerState:
    """
    State of a worker process.
    
    Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins.
    """
    
    pid: int
    worker_type: str  # e.g., "transcription", "llm"
    job_id: Optional[str] = None
    plugin_id: Optional[str] = None
    plugin_name: Optional[str] = None
    loaded_plugin_resource: Optional[str] = None  # The plugin resource identifier currently loaded
    config: Optional[Dict[str, Any]] = None  # Current plugin configuration
    status: str = 'idle'  # idle, running, busy
    execution_mode: Optional[str] = None  # Execution mode (in_process, subprocess, cloud_gpu, etc.)
    child_pids: List[int] = field(...)  # PIDs of child processes
    container_id: Optional[str] = None  # Docker container ID if applicable
    conda_env: Optional[str] = None  # Conda environment name if applicable
    is_remote: bool = False  # Whether this worker uses remote/cloud resources
    remote_resource: Optional[Dict[str, Any]] = None  # Remote resource info (serialized RemoteResourceInfo)
class ResourceManager:
    def __init__(
        self,
        gpu_memory_threshold_percent:float=45.0 # GPU memory usage threshold; external processes using more than this percentage are considered conflicts
    )
    "Manages resource tracking and conflict detection for the application. Tracks PIDs associated with application workers (transcription, LLM, etc.) and provides methods to check resource availability and conflicts. Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins."
    
    def __init__(
            self,
            gpu_memory_threshold_percent:float=45.0 # GPU memory usage threshold; external processes using more than this percentage are considered conflicts
        )
        "Initialize the resource manager."
    
    def register_worker(
            self,
            pid:int, # Process ID of the worker
            worker_type:str, # Type of worker (e.g., "transcription", "llm")
            job_id:Optional[str]=None, # Optional job ID if worker is processing a job
            plugin_id:Optional[str]=None, # Optional plugin unique ID
            plugin_name:Optional[str]=None, # Optional plugin name
            loaded_plugin_resource:Optional[str]=None, # Optional identifier of the loaded plugin resource
            config:Optional[Dict[str, Any]]=None, # Optional plugin configuration
            plugin_instance:Optional[Any]=None # Optional plugin instance for lifecycle/cloud protocol detection
        )
        "Register a worker process with the resource manager."
    
    def get_all_related_pids(
            self,
            parent_pid:int # Parent worker PID
        ) -> List[int]: # List of all PIDs (parent + children)
        "Get parent PID and all child PIDs managed by this worker."
    
    def update_worker_state(
            self,
            pid:int, # Process ID of the worker
            job_id:Optional[str]=None, # Optional job ID to update
            plugin_id:Optional[str]=None, # Optional plugin ID to update
            plugin_name:Optional[str]=None, # Optional plugin name to update
            loaded_plugin_resource:Optional[str]=None, # Optional loaded plugin resource to update
            config:Optional[Dict[str, Any]]=None, # Optional config to update
            status:Optional[str]=None # Optional status to update
        )
        "Update the state of a registered worker."
    
    def unregister_worker(
            self,
            pid:int # Process ID of the worker to unregister
        )
        "Unregister a worker process."
    
    def get_worker_by_pid(
            self,
            pid:int # Process ID
        ) -> Optional[WorkerState]: # Worker state or None
        "Get worker state by PID."
    
    def get_worker_by_job(
            self,
            job_id:str # Job ID
        ) -> Optional[WorkerState]: # Worker state or None
        "Get worker state by job ID."
    
    def get_all_workers(self) -> List[WorkerState]: # List of all registered workers
        "Get all registered workers."
    
    def get_app_pids(self) -> Set[int]: # Set of all PIDs managed by this application (parents only)
        "Get all PIDs managed by this application (parents only)."
    
    def get_all_app_pids_including_children(self) -> Set[int]: # Set of all PIDs (parents and children)
        "Get all PIDs managed by this application including child processes."
    
    def get_workers_by_type(
            self,
            worker_type:str # Type of worker (e.g., "transcription", "llm", "ollama")
        ) -> List[WorkerState]: # List of workers matching the type
        "Get all workers of a specific type."
    
    def get_active_worker_types(self) -> Set[str]: # Set of worker type strings
        "Get set of all active worker types."
    
    def has_worker_type(
            self,
            worker_type:str # Type of worker to check
        ) -> bool: # True if at least one worker of this type exists
        "Check if a worker of the specified type exists."
    
    def get_cloud_workers(self) -> List[WorkerState]: # List of workers with is_remote=True
        "Get all workers using cloud/remote resources."
    
    def estimate_total_cloud_cost(
            self,
            duration_hours:float=1.0 # Duration to estimate for
        ) -> float: # Total estimated cost in USD
        "Estimate total cost of all running cloud resources."
    
    def check_gpu_availability(self) -> ResourceConflict: # ResourceConflict with details about GPU usage
        "Check GPU availability and identify conflicts. Uses the configurable GPU memory threshold to determine whether external processes are using significant GPU resources. Enhanced to detect child processes from lifecycle-aware plugins."
    
    def check_memory_availability(
            self,
            threshold_percent:float=90.0 # Memory usage threshold to consider as conflict
        ) -> ResourceConflict: # ResourceConflict with details about memory usage
        "Check system memory availability."

Variables

PLUGIN_RESOURCE_CONFIG_KEYS = [5 items]

Monitoring Configuration (monitoring_config.ipynb)

Configuration schema for resource monitoring refresh intervals and SSE settings

Import

from cjm_fasthtml_resources.core.monitoring_config import (
    RESOURCE_MONITOR_SCHEMA,
    LAST_UPDATE_TIMES,
    SSE_CONFIG
)

Variables

RESOURCE_MONITOR_SCHEMA = {7 items}
LAST_UPDATE_TIMES = {7 items}
SSE_CONFIG = {3 items}

Plugin Resource Utilities (plugin_utils.ipynb)

Utilities for analyzing plugin configurations and resource requirements

Import

from cjm_fasthtml_resources.utils.plugin_utils import (
    is_local_plugin,
    uses_gpu_device,
    get_plugin_resource_identifier,
    compare_plugin_resources,
    get_plugin_resource_requirements
)

Functions

def is_local_plugin(
    plugin_meta # Plugin metadata with config_schema attribute
) -> bool: # True if plugin is local, False if API-based
    "Check if a plugin is local (vs API-based)."
def uses_gpu_device(...)
    "Check if a plugin is configured to use GPU."
def get_plugin_resource_identifier(
    config # Plugin configuration
) -> str: # Plugin resource identifier
    "Extract the plugin resource identifier from a plugin configuration. Checks common plugin resource configuration keys like 'resource_id', 'model_id', 'model', 'model_name', etc."
def compare_plugin_resources(
    config1:Dict[str, Any], # First plugin configuration
    config2:Dict[str, Any] # Second plugin configuration
) -> bool: # True if both configs specify the same plugin resource, False otherwise
    "Compare two plugin configurations to see if they use the same plugin resource."
def get_plugin_resource_requirements(
    plugin_id:str, # Unique plugin ID
    plugin_registry, # Plugin registry instance with get_plugin, load_plugin_config methods
    plugin_config:Optional[Dict[str, Any]]=None # Optional plugin configuration
) -> Dict[str, Any]: # Dictionary with resource requirement information (is_local, uses_gpu, plugin_resource, device)
    "Get resource requirements for a plugin."

Route Helpers (route_helpers.ipynb)

Helper utilities for resource monitoring route handlers

Import

from cjm_fasthtml_resources.utils.route_helpers import (
    wrap_card_in_container,
    create_card_update
)

Functions

def wrap_card_in_container(
    content, # Card content to wrap
    html_id, # HTML ID for the container
    card_cls=None, # Card class (optional, can be provided via DaisyUI)
    bg_cls=None, # Background class (optional, can be provided via DaisyUI)
    shadow_cls=None, # Shadow class (optional, can be provided via Tailwind)
    **kwargs # Additional attributes for the Div
): # Wrapped card container (Div)
    "Wrap card content in a Div container with standard styling. This consolidates the common pattern of wrapping monitoring cards in styled containers."
def create_card_update(
    render_fn:Callable, # Function to render the card
    info:Dict[str, Any], # Info dictionary to pass to render function
    target_id:str, # Target HTML ID for the swap
    swap_type:str="outerHTML" # Type of swap
): # OOB swap element
    "Create an OOB swap update for a card. This consolidates the pattern of creating OOB swaps for card updates in SSE streaming."

Resource Validation (validation.ipynb)

Validate resource availability before job execution and determine appropriate actions

Import

from cjm_fasthtml_resources.core.validation import (
    ValidationAction,
    ValidationResult,
    validate_resources_for_job,
    validation_result_to_error
)

Functions

def validate_resources_for_job(
    resource_manager, # ResourceManager instance
    plugin_registry, # Plugin registry protocol (has get_plugin, load_plugin_config methods)
    get_plugin_resource_requirements, # Function: (plugin_id, config) -> Dict with requirements
    compare_plugin_resources, # Function: (config1, config2) -> bool (same resource?)
    get_plugin_resource_identifier, # Function: (config) -> str (resource ID)
    plugin_id:str, # Unique plugin ID
    plugin_config:Optional[Dict[str, Any]]=None, # Plugin configuration (will load if not provided)
    worker_pid:Optional[int]=None, # PID of the worker that will run the job (if known)
    worker_type:str="transcription", # Type of worker (e.g., "transcription", "llm", "ollama")
    verbose:bool=False # Whether to print verbose logging
) -> ValidationResult: # ValidationResult with action to take
    "Validate whether resources are available to run a job with the specified plugin. Helper functions are passed in as arguments (dependency injection) to avoid tight coupling with specific plugin registry implementations."
def validation_result_to_error(
    result: ValidationResult,  # Validation result to convert
    plugin_id: Optional[str] = None,  # Plugin ID for error context
    job_id: Optional[str] = None,  # Job ID for error context
    worker_pid: Optional[int] = None,  # Worker PID for error context
    **extra_context  # Additional context fields
) -> Optional[Exception]:  # Structured error based on validation action, or None if no error needed
    "Convert a ValidationResult into a structured error. Requires cjm-error-handling library."

Classes

class ValidationAction(Enum):
    "Actions that can be taken based on validation results."
@dataclass
class ValidationResult:
    "Result of resource validation."
    
    action: ValidationAction
    can_proceed: bool
    message: str
    conflict: Optional[ResourceConflict]
    current_worker: Optional[WorkerState]
    plugin_name_to_reload: Optional[str]  # Plugin name to reload
    new_config: Optional[Dict[str, Any]]  # Config for reload