cjm-fasthtml-resources
Resource monitoring and management system for tracking GPU/CPU usage and worker processes in FastHTML applications.
Install
pip install cjm_fasthtml_resourcesProject Structure
nbs/
├── core/ (4)
│ ├── management_config.ipynb # Configuration schema for resource management settings including GPU thresholds
│ ├── manager.ipynb # Track worker processes and detect resource conflicts for GPU/CPU usage
│ ├── monitoring_config.ipynb # Configuration schema for resource monitoring refresh intervals and SSE settings
│ └── validation.ipynb # Validate resource availability before job execution and determine appropriate actions
└── utils/ (2)
├── plugin_utils.ipynb # Utilities for analyzing plugin configurations and resource requirements
└── route_helpers.ipynb # Helper utilities for resource monitoring route handlers
Total: 6 notebooks across 2 directories
Module Dependencies
graph LR
core_management_config[core.management_config<br/>Management Configuration]
core_manager[core.manager<br/>Resource Manager]
core_monitoring_config[core.monitoring_config<br/>Monitoring Configuration]
core_validation[core.validation<br/>Resource Validation]
utils_plugin_utils[utils.plugin_utils<br/>Plugin Resource Utilities]
utils_route_helpers[utils.route_helpers<br/>Route Helpers]
core_validation --> core_manager
utils_plugin_utils --> core_manager
2 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Management Configuration (management_config.ipynb)
Configuration schema for resource management settings including GPU thresholds
Import
from cjm_fasthtml_resources.core.management_config import (
RESOURCE_MANAGEMENT_SCHEMA
)Variables
RESOURCE_MANAGEMENT_SCHEMA = {7 items}Resource Manager (manager.ipynb)
Track worker processes and detect resource conflicts for GPU/CPU usage
Import
from cjm_fasthtml_resources.core.manager import (
PLUGIN_RESOURCE_CONFIG_KEYS,
ResourceType,
ResourceStatus,
ResourceConflict,
WorkerState,
ResourceManager
)Classes
class ResourceType(Enum):
"Types of resources to monitor."class ResourceStatus(Enum):
"Status of resource availability."@dataclass
class ResourceConflict:
"Information about a resource conflict."
resource_type: ResourceType
status: ResourceStatus
app_pids: List[int] # PIDs from this application using the resource
external_pids: List[int] # External PIDs using the resource
app_processes: List[Dict[str, Any]] # Detailed info about app processes
external_processes: List[Dict[str, Any]] # Detailed info about external processes@dataclass
class WorkerState:
"""
State of a worker process.
Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins.
"""
pid: int
worker_type: str # e.g., "transcription", "llm"
job_id: Optional[str]
plugin_id: Optional[str]
plugin_name: Optional[str]
loaded_plugin_resource: Optional[str] # The plugin resource identifier currently loaded
config: Optional[Dict[str, Any]] # Current plugin configuration
status: str = 'idle' # idle, running, busy
execution_mode: Optional[str] # Execution mode (in_process, subprocess, cloud_gpu, etc.)
child_pids: List[int] = field(...) # PIDs of child processes
container_id: Optional[str] # Docker container ID if applicable
conda_env: Optional[str] # Conda environment name if applicable
is_remote: bool = False # Whether this worker uses remote/cloud resources
remote_resource: Optional[Dict[str, Any]] # Remote resource info (serialized RemoteResourceInfo)class ResourceManager:
def __init__(
self,
gpu_memory_threshold_percent:float=45.0 # GPU memory usage threshold; external processes using more than this percentage are considered conflicts
)
"Manages resource tracking and conflict detection for the application. Tracks PIDs associated with application workers (transcription, LLM, etc.) and provides methods to check resource availability and conflicts. Enhanced to support lifecycle-aware and cloud-aware plugins from cjm-fasthtml-plugins."
def __init__(
self,
gpu_memory_threshold_percent:float=45.0 # GPU memory usage threshold; external processes using more than this percentage are considered conflicts
)
"Initialize the resource manager."
def register_worker(
self,
pid:int, # Process ID of the worker
worker_type:str, # Type of worker (e.g., "transcription", "llm")
job_id:Optional[str]=None, # Optional job ID if worker is processing a job
plugin_id:Optional[str]=None, # Optional plugin unique ID
plugin_name:Optional[str]=None, # Optional plugin name
loaded_plugin_resource:Optional[str]=None, # Optional identifier of the loaded plugin resource
config:Optional[Dict[str, Any]]=None, # Optional plugin configuration
plugin_instance:Optional[Any]=None # Optional plugin instance for lifecycle/cloud protocol detection
)
"Register a worker process with the resource manager."
def get_all_related_pids(
self,
parent_pid:int # Parent worker PID
) -> List[int]: # List of all PIDs (parent + children)
"Get parent PID and all child PIDs managed by this worker."
def update_worker_state(
self,
pid:int, # Process ID of the worker
job_id:Optional[str]=None, # Optional job ID to update
plugin_id:Optional[str]=None, # Optional plugin ID to update
plugin_name:Optional[str]=None, # Optional plugin name to update
loaded_plugin_resource:Optional[str]=None, # Optional loaded plugin resource to update
config:Optional[Dict[str, Any]]=None, # Optional config to update
status:Optional[str]=None # Optional status to update
)
"Update the state of a registered worker."
def unregister_worker(
self,
pid:int # Process ID of the worker to unregister
)
"Unregister a worker process."
def get_worker_by_pid(
self,
pid:int # Process ID
) -> Optional[WorkerState]: # Worker state or None
"Get worker state by PID."
def get_worker_by_job(
self,
job_id:str # Job ID
) -> Optional[WorkerState]: # Worker state or None
"Get worker state by job ID."
def get_all_workers(self) -> List[WorkerState]: # List of all registered workers
"""Get all registered workers."""
return list(self._worker_states.values())
def get_app_pids(self) -> Set[int]: # Set of all PIDs managed by this application (parents only)
"Get all registered workers."
def get_app_pids(self) -> Set[int]: # Set of all PIDs managed by this application (parents only)
"""Get all PIDs managed by this application (parents only)."""
return set(self._worker_states.keys())
def get_all_app_pids_including_children(self) -> Set[int]: # Set of all PIDs (parents and children)
"Get all PIDs managed by this application (parents only)."
def get_all_app_pids_including_children(self) -> Set[int]: # Set of all PIDs (parents and children)
"""Get all PIDs managed by this application including child processes."""
all_pids = set(self._worker_states.keys())
for worker in self._worker_states.values()
"Get all PIDs managed by this application including child processes."
def get_workers_by_type(
self,
worker_type:str # Type of worker (e.g., "transcription", "llm", "ollama")
) -> List[WorkerState]: # List of workers matching the type
"Get all workers of a specific type."
def get_active_worker_types(self) -> Set[str]: # Set of worker type strings
"""Get set of all active worker types."""
return {w.worker_type for w in self._worker_states.values()}
def has_worker_type(
self,
worker_type:str # Type of worker to check
) -> bool: # True if at least one worker of this type exists
"Get set of all active worker types."
def has_worker_type(
self,
worker_type:str # Type of worker to check
) -> bool: # True if at least one worker of this type exists
"Check if a worker of the specified type exists."
def get_cloud_workers(self) -> List[WorkerState]: # List of workers with is_remote=True
"""Get all workers using cloud/remote resources."""
return [w for w in self._worker_states.values() if w.is_remote]
def estimate_total_cloud_cost(
self,
duration_hours:float=1.0 # Duration to estimate for
) -> float: # Total estimated cost in USD
"Get all workers using cloud/remote resources."
def estimate_total_cloud_cost(
self,
duration_hours:float=1.0 # Duration to estimate for
) -> float: # Total estimated cost in USD
"Estimate total cost of all running cloud resources."
def check_gpu_availability(self) -> ResourceConflict: # ResourceConflict with details about GPU usage
"""Check GPU availability and identify conflicts. Uses configurable GPU memory threshold to determine if external processes are using significant GPU resources. Enhanced to detect child processes from lifecycle-aware plugins."""
try
"Check GPU availability and identify conflicts. Uses configurable GPU memory threshold to determine if external processes are using significant GPU resources. Enhanced to detect child processes from lifecycle-aware plugins."
def check_memory_availability(
self,
threshold_percent:float=90.0 # Memory usage threshold to consider as conflict
) -> ResourceConflict: # ResourceConflict with details about memory usage
"Check system memory availability."Variables
PLUGIN_RESOURCE_CONFIG_KEYS = [5 items]Monitoring Configuration (monitoring_config.ipynb)
Configuration schema for resource monitoring refresh intervals and SSE settings
Import
from cjm_fasthtml_resources.core.monitoring_config import (
RESOURCE_MONITOR_SCHEMA,
LAST_UPDATE_TIMES,
SSE_CONFIG
)Variables
RESOURCE_MONITOR_SCHEMA = {7 items}
LAST_UPDATE_TIMES = {7 items}
SSE_CONFIG = {3 items}Plugin Resource Utilities (plugin_utils.ipynb)
Utilities for analyzing plugin configurations and resource requirements
Import
from cjm_fasthtml_resources.utils.plugin_utils import (
is_local_plugin,
uses_gpu_device,
get_plugin_resource_identifier,
compare_plugin_resources,
get_plugin_resource_requirements
)Functions
def is_local_plugin(
plugin_meta # Plugin metadata with config_schema attribute
) -> bool: # True if plugin is local, False if API-based
"Check if a plugin is local (vs API-based)."def uses_gpu_device(
"Check if a plugin is configured to use GPU."def get_plugin_resource_identifier(
"Extract the plugin resource identifier from plugin configuration. Checks common plugin resource configuration keys like 'resource_id', 'model_id', 'model', 'model_name', etc."def compare_plugin_resources(
config1:Dict[str, Any], # First plugin configuration
config2:Dict[str, Any] # Second plugin configuration
) -> bool: # True if both configs specify the same plugin resource, False otherwise
"Compare two plugin configurations to see if they use the same plugin resource."def get_plugin_resource_requirements(
plugin_id:str, # Unique plugin ID
plugin_registry, # Plugin registry instance with get_plugin, load_plugin_config methods
plugin_config:Optional[Dict[str, Any]]=None # Optional plugin configuration
) -> Dict[str, Any]: # Dictionary with resource requirement information (is_local, uses_gpu, plugin_resource, device)
"Get resource requirements for a plugin."Route Helpers (route_helpers.ipynb)
Helper utilities for resource monitoring route handlers
Import
from cjm_fasthtml_resources.utils.route_helpers import (
wrap_card_in_container,
create_card_update
)Functions
def wrap_card_in_container(
content, # Card content to wrap
html_id, # HTML ID for the container
card_cls=None, # Card class (optional, can be provided via DaisyUI)
bg_cls=None, # Background class (optional, can be provided via DaisyUI)
shadow_cls=None, # Shadow class (optional, can be provided via Tailwind)
**kwargs # Additional attributes for the Div
): # Wrapped card container (Div)
"Wrap card content in a Div container with standard styling. This consolidates the common pattern of wrapping monitoring cards in styled containers."def create_card_update(
render_fn:Callable, # Function to render the card
info:Dict[str, Any], # Info dictionary to pass to render function
target_id:str, # Target HTML ID for the swap
swap_type:str="outerHTML" # Type of swap
): # OOB swap element
"Create an OOB swap update for a card. This consolidates the pattern of creating OOB swaps for card updates in SSE streaming."Resource Validation (validation.ipynb)
Validate resource availability before job execution and determine appropriate actions
Import
from cjm_fasthtml_resources.core.validation import (
ValidationAction,
ValidationResult,
validate_resources_for_job,
validation_result_to_error
)Functions
def validate_resources_for_job(
resource_manager, # ResourceManager instance
plugin_registry, # Plugin registry protocol (has get_plugin, load_plugin_config methods)
get_plugin_resource_requirements, # Function: (plugin_id, config) -> Dict with requirements
compare_plugin_resources, # Function: (config1, config2) -> bool (same resource?)
get_plugin_resource_identifier, # Function: (config) -> str (resource ID)
plugin_id:str, # Unique plugin ID
plugin_config:Optional[Dict[str, Any]]=None, # Plugin configuration (will load if not provided)
worker_pid:Optional[int]=None, # PID of the worker that will run the job (if known)
worker_type:str="transcription", # Type of worker (e.g., "transcription", "llm", "ollama")
verbose:bool=False # Whether to print verbose logging
) -> ValidationResult: # ValidationResult with action to take
"Validate if resources are available to run a job with the specified plugin. This function is dependency-injected with helper functions to avoid tight coupling with specific plugin registry implementations."def validation_result_to_error(
result: ValidationResult, # Validation result to convert
plugin_id: Optional[str] = None, # Plugin ID for error context
job_id: Optional[str] = None, # Job ID for error context
worker_pid: Optional[int] = None, # Worker PID for error context
**extra_context # Additional context fields
) -> Optional[Exception]: # Structured error based on validation action, or None if no error needed
"Convert a ValidationResult into a structured error. Requires cjm-error-handling library."Classes
class ValidationAction(Enum):
"Actions that can be taken based on validation results."@dataclass
class ValidationResult:
"Result of resource validation."
action: ValidationAction
can_proceed: bool
message: str
conflict: Optional[ResourceConflict]
current_worker: Optional[WorkerState]
plugin_name_to_reload: Optional[str] # Plugin name to reload
new_config: Optional[Dict[str, Any]] # Config for reload