cjm-fasthtml-workers
Background worker system for FastHTML with multiprocess job execution, cancellation support, and streaming capabilities.
Install
pip install cjm_fasthtml_workers
Project Structure
nbs/
├── core/ (4)
│ ├── adapters.ipynb # Adapter utilities for making plugin managers compatible with the worker system.
│ ├── config.ipynb # Configuration for worker processes including restart policies, timeouts, and queue sizes.
│ ├── protocol.ipynb # Protocol definitions for worker communication and plugin manager integration.
│ └── worker.ipynb # Generic worker process for executing plugin-based jobs in isolated subprocesses.
├── extensions/ (2)
│ ├── adapters.ipynb # Common adapters for integrating popular FastHTML libraries with the worker system.
│ └── protocols.ipynb # Optional integration protocols for plugin registries, resource management, and event broadcasting.
└── managers/ (1)
└── base.ipynb # Abstract base class for managing background jobs with worker processes.
Total: 7 notebooks across 3 directories
Module Dependencies
graph LR
core_adapters[core.adapters<br/>adapters]
core_config[core.config<br/>config]
core_protocol[core.protocol<br/>protocol]
core_worker[core.worker<br/>worker]
extensions_adapters[extensions.adapters<br/>adapters]
extensions_protocols[extensions.protocols<br/>protocols]
managers_base[managers.base<br/>base]
core_adapters --> core_protocol
core_worker --> core_protocol
extensions_adapters --> extensions_protocols
managers_base --> core_protocol
managers_base --> extensions_protocols
managers_base --> core_config
6 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
adapters (core/adapters.ipynb)
Adapter utilities for making plugin managers compatible with the worker system.
Import
from cjm_fasthtml_workers.core.adapters import (
create_simple_adapter,
default_result_adapter
)
Functions
def create_simple_adapter(
plugin_manager:Any, # The plugin manager instance to adapt
result_adapter:Optional[callable]=None # Optional function to convert plugin results to dict
) -> PluginManagerAdapter: # Adapter that satisfies PluginManagerAdapter protocol
"Create a simple adapter for a plugin manager."
def default_result_adapter(
result:Any # Plugin execution result
) -> Dict[str, Any]: # Dictionary with text and metadata
"Default adapter for converting plugin results to dictionaries."
adapters (extensions/adapters.ipynb)
Common adapters for integrating popular FastHTML libraries with the worker system.
Import
from cjm_fasthtml_workers.extensions.adapters import (
UnifiedPluginRegistryAdapter,
ResourceManagerAdapter,
SSEBroadcasterAdapter,
create_standard_adapters
)
Functions
def create_standard_adapters(
plugin_registry=None, # UnifiedPluginRegistry instance (optional)
resource_manager=None, # ResourceManager instance (optional)
sse_manager=None # SSEBroadcastManager instance (optional)
) -> tuple: # (plugin_adapter, resource_adapter, sse_adapter)
"Create standard adapters for common FastHTML libraries."
Classes
class UnifiedPluginRegistryAdapter:
def __init__(
self,
registry # UnifiedPluginRegistry from cjm-fasthtml-plugins
)
"Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry."
def __init__(
self,
registry # UnifiedPluginRegistry from cjm-fasthtml-plugins
)
"Initialize adapter with UnifiedPluginRegistry instance."
def get_plugins_by_category(self, category) -> list:
"""Get all plugins in a specific category."""
return self._registry.get_plugins_by_category(category)
def get_plugin(self, plugin_id: str)
"Get a specific plugin by ID."
def get_plugin(self, plugin_id: str):
"""Get a specific plugin by ID."""
return self._registry.get_plugin(plugin_id)
def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]
"Load configuration for a plugin."
def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]
"Load configuration for a plugin."
class ResourceManagerAdapter:
def __init__(
self,
resource_manager # ResourceManager from cjm-fasthtml-resources
)
"Adapter for cjm-fasthtml-resources ResourceManager."
def __init__(
self,
resource_manager # ResourceManager from cjm-fasthtml-resources
)
"Initialize adapter with ResourceManager instance."
def register_worker(self, pid: int, worker_type: str) -> None:
"""Register a new worker process."""
self._resource_manager.register_worker(pid, worker_type)
def unregister_worker(self, pid: int) -> None
"Unregister a worker process."
def unregister_worker(self, pid: int) -> None:
"""Unregister a worker process."""
self._resource_manager.unregister_worker(pid)
def update_worker_state(
self,
pid: int,
status: Optional[str] = None,
job_id: Optional[str] = None,
plugin_name: Optional[str] = None,
plugin_id: Optional[str] = None,
loaded_plugin_resource: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
) -> None
"Update worker state information."
def update_worker_state(
self,
pid: int,
status: Optional[str] = None,
job_id: Optional[str] = None,
plugin_name: Optional[str] = None,
plugin_id: Optional[str] = None,
loaded_plugin_resource: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
) -> None
"Update worker state information."
def check_gpu_availability(self):
"""Check GPU availability and identify conflicts."""
return self._resource_manager.check_gpu_availability()
def get_worker_by_pid(self, pid: int)
"Get worker state by PID."
def get_worker_by_pid(self, pid: int)
"Get worker state by PID."
class SSEBroadcasterAdapter:
def __init__(
self,
sse_manager # SSEBroadcastManager from cjm-fasthtml-sse
)
"Adapter for cjm-fasthtml-sse SSEBroadcastManager."
def __init__(
self,
sse_manager # SSEBroadcastManager from cjm-fasthtml-sse
)
"Initialize adapter with SSE broadcast manager."
async def broadcast(
self,
event_type: str, # Event type identifier (e.g., 'transcription:started')
data: Dict[str, Any] # Event data payload
) -> None
"Broadcast an event to all connected SSE clients."
base (base.ipynb)
Abstract base class for managing background jobs with worker processes.
Import
from cjm_fasthtml_workers.managers.base import (
JobType,
BaseJob,
BaseJobManager
)
Functions
@patch
@abstractmethod
def create_job(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
**kwargs # Domain-specific job parameters
) -> JobType: # Created job instance
"Factory method for creating domain-specific jobs."
@patch
@abstractmethod
def get_worker_entry_point(
self:BaseJobManager
) -> Callable: # Worker process entry point function
"Return the worker process function for this manager."
@patch
@abstractmethod
def prepare_execute_request(
self:BaseJobManager,
job:JobType # The job to prepare for execution
) -> Dict[str, Any]: # Dictionary of parameters for the worker execute request
"Convert job to worker execute request parameters."
@patch
@abstractmethod
def extract_job_result(
self:BaseJobManager,
job:JobType, # The job that was executed
result_data:Dict[str, Any] # Raw result data from worker
) -> Dict[str, Any]: # Formatted result for storage
"Extract and format job result from worker response."
@patch
def _extract_plugin_resource_identifier(
self:BaseJobManager,
config:Dict[str, Any] # Plugin configuration dictionary
) -> str: # Plugin resource identifier string
"Extract plugin resource identifier from plugin configuration."
@patch
async def _validate_resources(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
plugin_config:Dict[str, Any] # Plugin configuration
) -> Optional[str]: # Error message if validation fails, None if successful
"Validate resources before starting a job."
@patch
def _on_job_completed(
self:BaseJobManager,
job_id:str # ID of the completed job
) -> None
"Hook called when a job completes successfully."
@patch
def _start_worker(self:BaseJobManager):
"""Start the worker process and result monitor."""
if self.worker_process and self.worker_process.is_alive()
"Start the worker process and result monitor."
@patch
def _init_worker(self:BaseJobManager):
"""Send initialization message to worker with plugin configurations."""
# Only send plugin configs if plugin registry is available
plugin_configs = {}
if self.plugin_registry
"Send initialization message to worker with plugin configurations."
@patch
def _restart_worker(self:BaseJobManager):
"""Restart the worker process after an error or cancellation."""
# Track restart
self.restart_count += 1
self.last_restart_time = time.time()
# Stop result monitor thread FIRST (before cleaning up queues)
# The monitor thread holds references to the queues, so we must stop it first
self.monitor_running = False
if self.result_monitor_thread
"Restart the worker process after an error or cancellation."
@patch
def _monitor_results(self:BaseJobManager):
"""Monitor the result queue in a background thread."""
while self.monitor_running
"Monitor the result queue in a background thread."
@patch
def _handle_job_result(
self:BaseJobManager,
result:Dict[str, Any] # Result data from worker
)
"Handle a job result from the worker."
@patch
def _handle_stream_chunk(
self:BaseJobManager,
chunk_data:Dict[str, Any] # Chunk data from worker
)
"Handle a streaming chunk from the worker."
@patch
def _handle_worker_error(self:BaseJobManager):
"""Handle worker fatal error based on restart policy."""
policy = self.worker_config.restart_policy
if policy == RestartPolicy.NEVER
"Handle worker fatal error based on restart policy."
@patch
def get_plugin_name(
self:BaseJobManager,
plugin_id:str # Plugin unique identifier
) -> Optional[str]: # Plugin name or None
"Get plugin name from plugin ID (requires plugin registry)."
@patch
async def unload_plugin(
self:BaseJobManager,
plugin_name:str # Name of the plugin to unload
) -> bool: # True if successful, False otherwise
"Unload a plugin from the worker to free resources."
@patch
async def reload_plugin(
self:BaseJobManager,
plugin_name:str, # Name of the plugin to reload
config:Dict[str, Any] # New configuration
) -> bool: # True if successful, False otherwise
"Reload a plugin with new configuration."
@patch
async def start_job(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
**kwargs # Domain-specific job parameters
) -> JobType: # Created and started job
"Start a new job."
@patch
async def cancel_job(
self:BaseJobManager,
job_id:str # ID of the job to cancel
) -> bool: # True if cancellation successful
"Cancel a running job by terminating the worker process."
@patch
def get_job(
self:BaseJobManager,
job_id:str # Unique job identifier
) -> Optional[JobType]: # Job object or None
"Get a job by ID."
@patch
def get_all_jobs(
self:BaseJobManager
) -> List[JobType]: # List of all jobs
"Get all jobs."
@patch
def get_job_result(
self:BaseJobManager,
job_id:str # Unique job identifier
) -> Optional[Dict[str, Any]]: # Job result or None
"Get job result."
@patch
def clear_completed_jobs(
self:BaseJobManager
) -> int: # Number of jobs cleared
"Clear completed, failed, and cancelled jobs."
@patch
async def broadcast_event(
self:BaseJobManager,
event_type:str, # Event type identifier
data:Dict[str, Any] # Event data payload
)
"Broadcast an event to all connected SSE clients (requires event broadcaster)."
@patch
def check_streaming_support(
self:BaseJobManager,
plugin_id:str # Plugin unique identifier
) -> bool: # True if streaming supported
"Check if a plugin supports streaming."
@patch
def shutdown(self:BaseJobManager):
"""Shutdown the manager and cleanup resources."""
# Stop result monitor
self.monitor_running = False
if self.result_monitor_thread
"Shutdown the manager and cleanup resources."
Classes
@dataclass
class BaseJob:
"Base class for all job types."
id: str # Unique job identifier
plugin_id: str # Plugin identifier for this job
status: str = 'pending' # Job status: pending, running, completed, failed, cancelled
created_at: str = field(...) # ISO format timestamp
started_at: Optional[str] # When job started executing
completed_at: Optional[str] # When job finished
result: Optional[Dict[str, Any]] # Job result data
error: Optional[str] # Error message if failed
metadata: Dict[str, Any] = field(...) # Additional job metadata
worker_pid: Optional[int] # Process ID of worker handling this job
class BaseJobManager:
def __init__(
self,
worker_type:str, # Type identifier (e.g., "transcription", "llm", "image-gen")
category:Any, # Plugin category this manager handles
supports_streaming:bool=False, # Whether this manager supports streaming jobs
worker_config:Optional[WorkerConfig]=None, # Worker configuration (uses defaults if None)
plugin_registry:Optional[PluginRegistryProtocol]=None, # Optional plugin registry integration
resource_manager:Optional[ResourceManagerProtocol]=None, # Optional resource manager integration
event_broadcaster:Optional[EventBroadcasterProtocol]=None, # Optional SSE event broadcaster
)
"Abstract base class for managing jobs using worker processes."
def __init__(
self,
worker_type:str, # Type identifier (e.g., "transcription", "llm", "image-gen")
category:Any, # Plugin category this manager handles
supports_streaming:bool=False, # Whether this manager supports streaming jobs
worker_config:Optional[WorkerConfig]=None, # Worker configuration (uses defaults if None)
plugin_registry:Optional[PluginRegistryProtocol]=None, # Optional plugin registry integration
resource_manager:Optional[ResourceManagerProtocol]=None, # Optional resource manager integration
event_broadcaster:Optional[EventBroadcasterProtocol]=None, # Optional SSE event broadcaster
)
"Initialize the job manager."
config (config.ipynb)
Configuration for worker processes including restart policies, timeouts, and queue sizes.
Import
from cjm_fasthtml_workers.core.config import (
RestartPolicy,
WorkerConfig
)
Classes
class RestartPolicy(Enum):
"Policy for restarting worker processes after failures."
@dataclass
class WorkerConfig:
"Configuration for worker process behavior."
request_queue_size: int = 0 # 0 = unlimited
result_queue_size: int = 100 # Larger for streaming results
response_queue_size: int = 10 # For synchronous command responses
restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION
max_restart_attempts: int = 3
restart_backoff_base_seconds: float = 1.0 # Base delay for exponential backoff
restart_backoff_max_seconds: float = 60.0 # Max delay for backoff
max_workers: int = 1 # Currently only 1 is supported
worker_start_timeout_seconds: float = 30.0
reload_timeout_seconds: float = 30.0
unload_timeout_seconds: float = 10.0
shutdown_timeout_seconds: float = 5.0
result_monitor_poll_interval_seconds: float = 0.5
protocol (protocol.ipynb)
Protocol definitions for worker communication and plugin manager integration.
Import
from cjm_fasthtml_workers.core.protocol import (
WorkerRequestType,
WorkerResponseType,
WorkerRequest,
WorkerResponse,
WorkerStreamChunk,
WorkerResult,
PluginManagerAdapter
)
Classes
class WorkerRequestType(Enum):
"Types of requests sent to worker process."
class WorkerResponseType(Enum):
"Types of responses from worker process."
@dataclass
class WorkerRequest:
"Base structure for worker requests."
type: WorkerRequestType
data: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for queue serialization."""
return {
'type': self.type.value,
"Convert to dictionary for queue serialization."
def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':
"""Create from dictionary received from queue."""
req_type = WorkerRequestType(data['type'])
request_data = {k: v for k, v in data.items() if k != 'type'}
"Create from dictionary received from queue."
@dataclass
class WorkerResponse:
"Base structure for worker responses."
type: WorkerResponseType
data: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for queue serialization."""
return {
'type': self.type.value,
"Convert to dictionary for queue serialization."
def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':
"""Create from dictionary received from queue."""
resp_type = WorkerResponseType(data['type'])
response_data = {k: v for k, v in data.items() if k != 'type'}
"Create from dictionary received from queue."
@dataclass
class WorkerStreamChunk:
"Structure for streaming job results."
job_id: str # Unique identifier for the job
chunk: str # Text chunk from streaming output
is_final: bool = False # Whether this is the final chunk
metadata: Optional[Dict[str, Any]] # Optional metadata
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for queue serialization."""
return {
'type': WorkerResponseType.STREAM_CHUNK.value,
"Convert to dictionary for queue serialization."
@dataclass
class WorkerResult:
"Structure for job execution results."
job_id: str # Unique identifier for the job
status: str # 'success' or 'error'
data: Optional[Dict[str, Any]] # Result data on success
error: Optional[str] # Error message on failure
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for queue serialization."""
result = {
'type': WorkerResponseType.RESULT.value,
"Convert to dictionary for queue serialization."
class PluginManagerAdapter(Protocol):
"""
Protocol that plugin managers must satisfy for worker integration.
Uses structural subtyping (duck typing) - plugin managers don't need to
explicitly inherit from this, they just need to implement these methods.
"""
def discover_plugins(self) -> list: # List of plugin metadata/data objects
"""Discover available plugins."""
...
def load_plugin(
self,
plugin_data:Any, # Plugin metadata/data from discovery
config:Dict[str, Any] # Plugin configuration dictionary
) -> None
"Load a plugin with configuration."
def load_plugin(
self,
plugin_data:Any, # Plugin metadata/data from discovery
config:Dict[str, Any] # Plugin configuration dictionary
) -> None
"Load a plugin with configuration."
def execute_plugin(
self,
plugin_name:str, # Name of the plugin to execute
**params # Plugin-specific parameters
) -> Any: # Plugin execution result
"Execute a plugin with given parameters."
def execute_plugin_stream(
self,
plugin_name:str, # Name of the plugin to execute
**params # Plugin-specific parameters
) -> Iterator[str]: # String chunks from plugin execution
"Execute a plugin with streaming output."
def reload_plugin(
self,
plugin_name:str, # Name of the plugin to reload
config:Optional[Dict[str, Any]]=None # New configuration (None to unload)
) -> None
"Reload a plugin with new configuration."
def unload_plugin(
self,
plugin_name:str # Name of the plugin to unload
) -> None
"Unload a plugin to free resources."
def check_streaming_support(
self,
plugin_name:str # Name of the plugin to check
) -> bool: # True if plugin supports streaming
"Check if a plugin supports streaming execution."
protocols (protocols.ipynb)
Optional integration protocols for plugin registries, resource management, and event broadcasting.
Import
from cjm_fasthtml_workers.extensions.protocols import (
PluginRegistryProtocol,
ResourceManagerProtocol,
EventBroadcasterProtocol
)
Classes
class PluginRegistryProtocol(Protocol):
"Protocol for plugin registry integration."
def get_plugins_by_category(
self,
category:Any # Plugin category (can be enum, string, etc.)
) -> list: # List of plugin metadata objects
"Get all plugins in a specific category."
def get_plugin(
self,
plugin_id:str # Unique plugin identifier
) -> Any: # Plugin metadata object or None
"Get a specific plugin by ID."
def load_plugin_config(
self,
plugin_id:str # Unique plugin identifier
) -> Dict[str, Any]: # Plugin configuration dictionary
"Load configuration for a plugin."
class ResourceManagerProtocol(Protocol):
"Protocol for resource management integration."
def register_worker(
self,
pid:int, # Worker process ID
worker_type:str # Type of worker (e.g., 'transcription', 'llm')
) -> None
"Register a new worker process."
def unregister_worker(
self,
pid:int # Process ID of the worker to unregister
) -> None
"Unregister a worker process."
def update_worker_state(
self,
pid:int, # Worker process ID
status:Optional[str]=None, # Worker status: 'idle', 'running', etc.
job_id:Optional[str]=None, # Current job ID (None if idle)
plugin_name:Optional[str]=None, # Currently loaded plugin name
plugin_id:Optional[str]=None, # Currently loaded plugin ID
loaded_plugin_resource:Optional[str]=None, # Currently loaded plugin resource identifier
config:Optional[Dict[str, Any]]=None, # Current plugin configuration
) -> None
"Update worker state information."
def check_gpu_availability(
self
) -> Any: # Returns ResourceConflict object
"Check GPU availability and identify conflicts."
def get_worker_by_pid(
self,
pid:int # Worker process ID
) -> Optional[Any]: # Returns WorkerState object or None
"Get worker state by PID."
class EventBroadcasterProtocol(Protocol):
"Protocol for SSE event broadcasting."
async def broadcast(
self,
event_type:str, # Event type identifier
data:Dict[str, Any] # Event data payload
) -> None
"Broadcast an event to all connected clients."
worker (worker.ipynb)
Generic worker process for executing plugin-based jobs in isolated subprocesses.
Import
from cjm_fasthtml_workers.core.worker import (
base_worker_process
)
Functions
def base_worker_process(
request_queue:multiprocessing.Queue, # Queue for receiving job requests from parent
result_queue:multiprocessing.Queue, # Queue for sending job results back to parent
response_queue:multiprocessing.Queue, # Queue for sending command responses back to parent
plugin_manager_factory:Callable[[], PluginManagerAdapter], # Factory function that creates a plugin manager instance
result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None, # Optional function to adapt plugin results to dict format
supports_streaming:bool=False # Whether this worker supports streaming execution
)
"Generic long-lived worker process that handles job execution."