cjm-fasthtml-workers

Background worker system for FastHTML with multiprocess job execution, cancellation support, and streaming capabilities.

Install

pip install cjm_fasthtml_workers

Project Structure

nbs/
├── core/ (4)
│   ├── adapters.ipynb  # Adapter utilities for making plugin managers compatible with the worker system.
│   ├── config.ipynb    # Configuration for worker processes including restart policies, timeouts, and queue sizes.
│   ├── protocol.ipynb  # Protocol definitions for worker communication and plugin manager integration.
│   └── worker.ipynb    # Generic worker process for executing plugin-based jobs in isolated subprocesses.
├── extensions/ (2)
│   ├── adapters.ipynb   # Common adapters for integrating popular FastHTML libraries with the worker system.
│   └── protocols.ipynb  # Optional integration protocols for plugin registries, resource management, and event broadcasting.
└── managers/ (1)
    └── base.ipynb  # Abstract base class for managing background jobs with worker processes.

Total: 7 notebooks across 3 directories

Module Dependencies

graph LR
    core_adapters[core.adapters<br/>adapters]
    core_config[core.config<br/>config]
    core_protocol[core.protocol<br/>protocol]
    core_worker[core.worker<br/>worker]
    extensions_adapters[extensions.adapters<br/>adapters]
    extensions_protocols[extensions.protocols<br/>protocols]
    managers_base[managers.base<br/>base]

    core_adapters --> core_protocol
    core_worker --> core_protocol
    extensions_adapters --> extensions_protocols
    managers_base --> core_protocol
    managers_base --> extensions_protocols
    managers_base --> core_config

6 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

adapters (core/adapters.ipynb)

Adapter utilities for making plugin managers compatible with the worker system.

Import

from cjm_fasthtml_workers.core.adapters import (
    create_simple_adapter,
    default_result_adapter
)

Functions

def create_simple_adapter(
    plugin_manager:Any,  # The plugin manager instance to adapt
    result_adapter:Optional[callable]=None  # Optional function to convert plugin results to dict
) -> PluginManagerAdapter:  # Adapter that satisfies PluginManagerAdapter protocol
    "Create a simple adapter for a plugin manager."
def default_result_adapter(
    result:Any  # Plugin execution result
) -> Dict[str, Any]:  # Dictionary with text and metadata
    "Default adapter for converting plugin results to dictionaries."

adapters (extensions/adapters.ipynb)

Common adapters for integrating popular FastHTML libraries with the worker system.

Import

from cjm_fasthtml_workers.extensions.adapters import (
    UnifiedPluginRegistryAdapter,
    ResourceManagerAdapter,
    SSEBroadcasterAdapter,
    create_standard_adapters
)

Functions

def create_standard_adapters(
    plugin_registry=None,  # UnifiedPluginRegistry instance (optional)
    resource_manager=None,  # ResourceManager instance (optional)
    sse_manager=None  # SSEBroadcastManager instance (optional)
) -> tuple:  # (plugin_adapter, resource_adapter, sse_adapter)
    "Create standard adapters for common FastHTML libraries."

Classes

class UnifiedPluginRegistryAdapter:
    def __init__(
        self,
        registry  # UnifiedPluginRegistry from cjm-fasthtml-plugins
    )
    "Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry."
    
    def __init__(
            self,
            registry  # UnifiedPluginRegistry from cjm-fasthtml-plugins
        )
        "Initialize adapter with UnifiedPluginRegistry instance."
    
    def get_plugins_by_category(self, category) -> list:
        """Get all plugins in a specific category."""
        return self._registry.get_plugins_by_category(category)
    
    def get_plugin(self, plugin_id: str):
        """Get a specific plugin by ID."""
        return self._registry.get_plugin(plugin_id)
    
    def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]
        "Load configuration for a plugin."
class ResourceManagerAdapter:
    def __init__(
        self,
        resource_manager  # ResourceManager from cjm-fasthtml-resources
    )
    "Adapter for cjm-fasthtml-resources ResourceManager."
    
    def __init__(
            self,
            resource_manager  # ResourceManager from cjm-fasthtml-resources
        )
        "Initialize adapter with ResourceManager instance."
    
    def register_worker(self, pid: int, worker_type: str) -> None:
        """Register a new worker process."""
        self._resource_manager.register_worker(pid, worker_type)
    
    def unregister_worker(self, pid: int) -> None:
        """Unregister a worker process."""
        self._resource_manager.unregister_worker(pid)
    
    def update_worker_state(
            self,
            pid: int,
            status: Optional[str] = None,
            job_id: Optional[str] = None,
            plugin_name: Optional[str] = None,
            plugin_id: Optional[str] = None,
            loaded_plugin_resource: Optional[str] = None,
            config: Optional[Dict[str, Any]] = None,
        ) -> None
        "Update worker state information."
    
    def check_gpu_availability(self):
        """Check GPU availability and identify conflicts."""
        return self._resource_manager.check_gpu_availability()
    
    def get_worker_by_pid(self, pid: int)
        "Get worker state by PID."
class SSEBroadcasterAdapter:
    def __init__(
        self,
        sse_manager  # SSEBroadcastManager from cjm-fasthtml-sse
    )
    "Adapter for cjm-fasthtml-sse SSEBroadcastManager."
    
    def __init__(
            self,
            sse_manager  # SSEBroadcastManager from cjm-fasthtml-sse
        )
        "Initialize adapter with SSE broadcast manager."
    
    async def broadcast(
            self,
            event_type: str,  # Event type identifier (e.g., 'transcription:started')
            data: Dict[str, Any]  # Event data payload
        ) -> None
        "Broadcast an event to all connected SSE clients."

base (base.ipynb)

Abstract base class for managing background jobs with worker processes.

Import

from cjm_fasthtml_workers.managers.base import (
    JobType,
    BaseJob,
    BaseJobManager
)

Functions

@patch
@abstractmethod
def create_job(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    **kwargs  # Domain-specific job parameters
) -> JobType:  # Created job instance
    "Factory method for creating domain-specific jobs."
@patch
@abstractmethod
def get_worker_entry_point(
    self:BaseJobManager
) -> Callable:  # Worker process entry point function
    "Return the worker process function for this manager."
@patch
@abstractmethod
def prepare_execute_request(
    self:BaseJobManager,
    job:JobType  # The job to prepare for execution
) -> Dict[str, Any]:  # Dictionary of parameters for the worker execute request
    "Convert job to worker execute request parameters."
@patch
@abstractmethod
def extract_job_result(
    self:BaseJobManager,
    job:JobType,  # The job that was executed
    result_data:Dict[str, Any]  # Raw result data from worker
) -> Dict[str, Any]:  # Formatted result for storage
    "Extract and format job result from worker response."
@patch
def _extract_plugin_resource_identifier(
    self:BaseJobManager,
    config:Dict[str, Any]  # Plugin configuration dictionary
) -> str:  # Plugin resource identifier string
    "Extract plugin resource identifier from plugin configuration."
@patch
async def _validate_resources(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    plugin_config:Dict[str, Any]  # Plugin configuration
) -> Optional[str]:  # Error message if validation fails, None if successful
    "Validate resources before starting a job."
@patch
def _on_job_completed(
    self:BaseJobManager,
    job_id:str  # ID of the completed job
) -> None
    "Hook called when a job completes successfully."
@patch
def _start_worker(self:BaseJobManager):
    """Start the worker process and result monitor."""
    if self.worker_process and self.worker_process.is_alive():
        ...  # remainder of the body is truncated in this generated listing
@patch
def _init_worker(self:BaseJobManager):
    """Send initialization message to worker with plugin configurations."""
    # Only send plugin configs if plugin registry is available
    plugin_configs = {}

    if self.plugin_registry:
        ...  # remainder of the body is truncated in this generated listing
@patch
def _restart_worker(self:BaseJobManager):
    """Restart the worker process after an error or cancellation."""
    # Track restart
    self.restart_count += 1
    self.last_restart_time = time.time()

    # Stop result monitor thread FIRST (before cleaning up queues)
    # The monitor thread holds references to the queues, so we must stop it first
    self.monitor_running = False
    if self.result_monitor_thread:
        ...  # remainder of the body is truncated in this generated listing
@patch
def _monitor_results(self:BaseJobManager):
    """Monitor the result queue in a background thread."""
    while self.monitor_running:
        ...  # loop body is truncated in this generated listing
@patch
def _handle_job_result(
    self:BaseJobManager, 
    result:Dict[str, Any]  # Result data from worker
)
    "Handle a job result from the worker."
@patch
def _handle_stream_chunk(
    self:BaseJobManager, 
    chunk_data:Dict[str, Any]  # Chunk data from worker
)
    "Handle a streaming chunk from the worker."
@patch
def _handle_worker_error(self:BaseJobManager):
    """Handle worker fatal error based on restart policy."""
    policy = self.worker_config.restart_policy

    if policy == RestartPolicy.NEVER:
        ...  # remainder of the body is truncated in this generated listing
@patch
def get_plugin_name(
    self:BaseJobManager,
    plugin_id:str  # Plugin unique identifier
) -> Optional[str]:  # Plugin name or None
    "Get plugin name from plugin ID (requires plugin registry)."
@patch
async def unload_plugin(
    self:BaseJobManager,
    plugin_name:str  # Name of the plugin to unload
) -> bool:  # True if successful, False otherwise
    "Unload a plugin from the worker to free resources."
@patch
async def reload_plugin(
    self:BaseJobManager,
    plugin_name:str,  # Name of the plugin to reload
    config:Dict[str, Any]  # New configuration
) -> bool:  # True if successful, False otherwise
    "Reload a plugin with new configuration."
@patch
async def start_job(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    **kwargs  # Domain-specific job parameters
) -> JobType:  # Created and started job
    "Start a new job."
@patch
async def cancel_job(
    self:BaseJobManager,
    job_id:str  # ID of the job to cancel
) -> bool:  # True if cancellation successful
    "Cancel a running job by terminating the worker process."
@patch
def get_job(
    self:BaseJobManager,
    job_id:str  # Unique job identifier
) -> Optional[JobType]:  # Job object or None
    "Get a job by ID."
@patch
def get_all_jobs(
    self:BaseJobManager
) -> List[JobType]:  # List of all jobs
    "Get all jobs."
@patch
def get_job_result(
    self:BaseJobManager,
    job_id:str  # Unique job identifier
) -> Optional[Dict[str, Any]]:  # Job result or None
    "Get job result."
@patch
def clear_completed_jobs(
    self:BaseJobManager
) -> int:  # Number of jobs cleared
    "Clear completed, failed, and cancelled jobs."
@patch
async def broadcast_event(
    self:BaseJobManager,
    event_type:str,  # Event type identifier
    data:Dict[str, Any]  # Event data payload
)
    "Broadcast an event to all connected SSE clients (requires event broadcaster)."
@patch
def check_streaming_support(
    self:BaseJobManager,
    plugin_id:str  # Plugin unique identifier
) -> bool:  # True if streaming supported
    "Check if a plugin supports streaming."
@patch
def shutdown(self:BaseJobManager):
    """Shutdown the manager and cleanup resources."""
    # Stop result monitor
    self.monitor_running = False
    if self.result_monitor_thread:
        ...  # remainder of the body is truncated in this generated listing

Classes

@dataclass
class BaseJob:
    "Base class for all job types."
    
    id: str  # Unique job identifier
    plugin_id: str  # Plugin identifier for this job
    status: str = 'pending'  # Job status: pending, running, completed, failed, cancelled
    created_at: str = field(...)  # ISO format timestamp
    started_at: Optional[str]  # When job started executing
    completed_at: Optional[str]  # When job finished
    result: Optional[Dict[str, Any]]  # Job result data
    error: Optional[str]  # Error message if failed
    metadata: Dict[str, Any] = field(...)  # Additional job metadata
    worker_pid: Optional[int]  # Process ID of worker handling this job
class BaseJobManager:
    def __init__(
        self,
        worker_type:str,  # Type identifier (e.g., "transcription", "llm", "image-gen")
        category:Any,  # Plugin category this manager handles
        supports_streaming:bool=False,  # Whether this manager supports streaming jobs
        worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)
        plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration
        resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration
        event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster
    )
    "Abstract base class for managing jobs using worker processes."
    
    def __init__(
            self,
            worker_type:str,  # Type identifier (e.g., "transcription", "llm", "image-gen")
            category:Any,  # Plugin category this manager handles
            supports_streaming:bool=False,  # Whether this manager supports streaming jobs
            worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)
            plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration
            resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration
            event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster
        )
        "Initialize the job manager."

config (config.ipynb)

Configuration for worker processes including restart policies, timeouts, and queue sizes.

Import

from cjm_fasthtml_workers.core.config import (
    RestartPolicy,
    WorkerConfig
)

Classes

class RestartPolicy(Enum):
    "Policy for restarting worker processes after failures."
@dataclass
class WorkerConfig:
    "Configuration for worker process behavior."
    
    request_queue_size: int = 0  # 0 = unlimited
    result_queue_size: int = 100  # Larger for streaming results
    response_queue_size: int = 10  # For synchronous command responses
    restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION
    max_restart_attempts: int = 3
    restart_backoff_base_seconds: float = 1.0  # Base delay for exponential backoff
    restart_backoff_max_seconds: float = 60.0  # Max delay for backoff
    max_workers: int = 1  # Currently only 1 is supported
    worker_start_timeout_seconds: float = 30.0
    reload_timeout_seconds: float = 30.0
    unload_timeout_seconds: float = 10.0
    shutdown_timeout_seconds: float = 5.0
    result_monitor_poll_interval_seconds: float = 0.5
    

protocol (protocol.ipynb)

Protocol definitions for worker communication and plugin manager integration.

Import

from cjm_fasthtml_workers.core.protocol import (
    WorkerRequestType,
    WorkerResponseType,
    WorkerRequest,
    WorkerResponse,
    WorkerStreamChunk,
    WorkerResult,
    PluginManagerAdapter
)

Classes

class WorkerRequestType(Enum):
    "Types of requests sent to worker process."
class WorkerResponseType(Enum):
    "Types of responses from worker process."
@dataclass
class WorkerRequest:
    "Base structure for worker requests."
    
    type: WorkerRequestType
    data: Dict[str, Any]
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization."""
        return {
            'type': self.type.value,
            # ... remaining entries are truncated in this generated listing
        }
    
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':
        """Create from dictionary received from queue."""
        req_type = WorkerRequestType(data['type'])
        request_data = {k: v for k, v in data.items() if k != 'type'}
        ...  # remainder of the body is truncated in this generated listing
@dataclass
class WorkerResponse:
    "Base structure for worker responses."
    
    type: WorkerResponseType
    data: Dict[str, Any]
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization."""
        return {
            'type': self.type.value,
            # ... remaining entries are truncated in this generated listing
        }
    
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':
        """Create from dictionary received from queue."""
        resp_type = WorkerResponseType(data['type'])
        response_data = {k: v for k, v in data.items() if k != 'type'}
        ...  # remainder of the body is truncated in this generated listing
@dataclass
class WorkerStreamChunk:
    "Structure for streaming job results."
    
    job_id: str  # Unique identifier for the job
    chunk: str  # Text chunk from streaming output
    is_final: bool = False  # Whether this is the final chunk
    metadata: Optional[Dict[str, Any]]  # Optional metadata
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization."""
        return {
            'type': WorkerResponseType.STREAM_CHUNK.value,
            # ... remaining entries are truncated in this generated listing
        }
@dataclass
class WorkerResult:
    "Structure for job execution results."
    
    job_id: str  # Unique identifier for the job
    status: str  # 'success' or 'error'
    data: Optional[Dict[str, Any]]  # Result data on success
    error: Optional[str]  # Error message on failure
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for queue serialization."""
        result = {
            'type': WorkerResponseType.RESULT.value,
            # ... remaining entries are truncated in this generated listing
        }
        ...  # remainder of the body is truncated in this generated listing
class PluginManagerAdapter(Protocol):
    """
    Protocol that plugin managers must satisfy for worker integration.
    
    Uses structural subtyping (duck typing) - plugin managers don't need to
    explicitly inherit from this, they just need to implement these methods.
    """
    
    def discover_plugins(self) -> list:  # List of plugin metadata/data objects
        """Discover available plugins."""
        ...
    
    def load_plugin(
            self, 
            plugin_data:Any,  # Plugin metadata/data from discovery
            config:Dict[str, Any]  # Plugin configuration dictionary
        ) -> None
        "Load a plugin with configuration."
    
    def execute_plugin(
            self, 
            plugin_name:str,  # Name of the plugin to execute
            **params  # Plugin-specific parameters
        ) -> Any:  # Plugin execution result
        "Execute a plugin with given parameters."
    
    def execute_plugin_stream(
            self, 
            plugin_name:str,  # Name of the plugin to execute
            **params  # Plugin-specific parameters
        ) -> Iterator[str]:  # String chunks from plugin execution
        "Execute a plugin with streaming output."
    
    def reload_plugin(
            self, 
            plugin_name:str,  # Name of the plugin to reload
            config:Optional[Dict[str, Any]]=None  # New configuration (None to unload)
        ) -> None
        "Reload a plugin with new configuration."
    
    def unload_plugin(
            self, 
            plugin_name:str  # Name of the plugin to unload
        ) -> None
        "Unload a plugin to free resources."
    
    def check_streaming_support(
            self, 
            plugin_name:str  # Name of the plugin to check
        ) -> bool:  # True if plugin supports streaming
        "Check if a plugin supports streaming execution."

protocols (protocols.ipynb)

Optional integration protocols for plugin registries, resource management, and event broadcasting.

Import

from cjm_fasthtml_workers.extensions.protocols import (
    PluginRegistryProtocol,
    ResourceManagerProtocol,
    EventBroadcasterProtocol
)

Classes

class PluginRegistryProtocol(Protocol):
    "Protocol for plugin registry integration."
    
    def get_plugins_by_category(
            self, 
            category:Any  # Plugin category (can be enum, string, etc.)
        ) -> list:  # List of plugin metadata objects
        "Get all plugins in a specific category."
    
    def get_plugin(
            self, 
            plugin_id:str  # Unique plugin identifier
        ) -> Any:  # Plugin metadata object or None
        "Get a specific plugin by ID."
    
    def load_plugin_config(
            self, 
            plugin_id:str  # Unique plugin identifier
        ) -> Dict[str, Any]:  # Plugin configuration dictionary
        "Load configuration for a plugin."
class ResourceManagerProtocol(Protocol):
    "Protocol for resource management integration."
    
    def register_worker(
            self,
            pid:int,  # Worker process ID
            worker_type:str  # Type of worker (e.g., 'transcription', 'llm')
        ) -> None
        "Register a new worker process."
    
    def unregister_worker(
            self, 
            pid:int  # Process ID of the worker to unregister
        ) -> None
        "Unregister a worker process."
    
    def update_worker_state(
            self,
            pid:int,  # Worker process ID
            status:Optional[str]=None,  # Worker status: 'idle', 'running', etc.
            job_id:Optional[str]=None,  # Current job ID (None if idle)
            plugin_name:Optional[str]=None,  # Currently loaded plugin name
            plugin_id:Optional[str]=None,  # Currently loaded plugin ID
            loaded_plugin_resource:Optional[str]=None,  # Currently loaded plugin resource identifier
            config:Optional[Dict[str, Any]]=None,  # Current plugin configuration
        ) -> None
        "Update worker state information."
    
    def check_gpu_availability(
            self
        ) -> Any:  # Returns ResourceConflict object
        "Check GPU availability and identify conflicts."
    
    def get_worker_by_pid(
            self, 
            pid:int  # Worker process ID
        ) -> Optional[Any]:  # Returns WorkerState object or None
        "Get worker state by PID."
class EventBroadcasterProtocol(Protocol):
    "Protocol for SSE event broadcasting."
    
    async def broadcast(
            self,
            event_type:str,  # Event type identifier
            data:Dict[str, Any]  # Event data payload
        ) -> None
        "Broadcast an event to all connected clients."

worker (worker.ipynb)

Generic worker process for executing plugin-based jobs in isolated subprocesses.

Import

from cjm_fasthtml_workers.core.worker import (
    base_worker_process
)

Functions

def base_worker_process(
    request_queue:multiprocessing.Queue,  # Queue for receiving job requests from parent
    result_queue:multiprocessing.Queue,  # Queue for sending job results back to parent
    response_queue:multiprocessing.Queue,  # Queue for sending command responses back to parent
    plugin_manager_factory:Callable[[], PluginManagerAdapter],  # Factory function that creates a plugin manager instance
    result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None,  # Optional function to adapt plugin results to dict format
    supports_streaming:bool=False  # Whether this worker supports streaming execution
)
    "Generic long-lived worker process that handles job execution."