base
BaseJob Dataclass
BaseJob
BaseJob (id:str, plugin_id:str, status:str='pending', created_at:str=<factory>, started_at:Optional[str]=None, completed_at:Optional[str]=None, result:Optional[Dict[str,Any]]=None, error:Optional[str]=None, metadata:Dict[str,Any]=<factory>, worker_pid:Optional[int]=None)
Base class for all job types.
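The <factory> defaults in the signature above are how dataclass fields declared with default_factory are rendered. The sketch below is a stand-in that mirrors the documented fields; it is not the library's class. It assumes created_at is filled with an ISO-8601 timestamp, which the signature does not specify. It shows that each job gets its own metadata dict rather than a shared one.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional


@dataclass
class BaseJobSketch:
    """Stand-in mirroring the documented BaseJob fields (not the library class)."""
    id: str
    plugin_id: str
    status: str = 'pending'
    # Assumption: created_at defaults to an ISO-8601 timestamp taken at construction
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    # default_factory gives every job its own dict instead of one shared object
    metadata: Dict[str, Any] = field(default_factory=dict)
    worker_pid: Optional[int] = None


a = BaseJobSketch(id="job-1", plugin_id="my-plugin")
b = BaseJobSketch(id="job-2", plugin_id="my-plugin")
a.metadata["source"] = "upload"
print(a.status, b.metadata)  # pending {}
```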
BaseJobManager Class
The BaseJobManager is an abstract base class that provides the core infrastructure for managing background jobs with worker processes.
Key Features:
- Jobs are processed sequentially in a subprocess
- Plugin resources are loaded once and reused
- True cancellation via subprocess termination
- Automatic worker restart based on policy
- An isolated worker process avoids duplicating web app initialization
- Optional streaming support for incremental results
- Optional dependency injection for plugin registry, resource manager, and event broadcaster
Responsibilities:
- Worker process lifecycle (start, restart, shutdown)
- Job queuing and execution
- Result monitoring
- Resource management integration (optional)
- Event broadcasting (optional)
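The cancellation and restart ideas in the lists above can be illustrated with the standard library alone. This is a minimal sketch that assumes nothing about the library's internals. subprocess.Popen stands in for the worker process, and the replacement worker only models the idea of starting a fresh process after one is terminated.

```python
import subprocess
import sys

# Stand-in "worker": a child Python process busy with a long-running task
worker = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Cancelling by killing the process interrupts even code that never
# checks a cancel flag, which is what makes the cancellation "true"
worker.terminate()
worker.wait(timeout=10)
print(worker.returncode != 0)  # True: the process ended because it was terminated

# A fresh worker replaces the terminated one so later jobs can still run
replacement = subprocess.Popen(
    [sys.executable, "-c", "print('ready')"],
    stdout=subprocess.PIPE,
    text=True,
)
output, _ = replacement.communicate(timeout=10)
print(output.strip())  # ready
```

On POSIX systems terminate() sends SIGTERM, so the return code is negative. On Windows the process is ended with a non-zero exit code.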
BaseJobManager
BaseJobManager (worker_type:str, category:Any, supports_streaming:bool=False, worker_config:Optional[cjm_fasthtml_workers.core.config.WorkerConfig]=None, plugin_registry:Optional[cjm_fasthtml_workers.extensions.protocols.PluginRegistryProtocol]=None, resource_manager:Optional[cjm_fasthtml_workers.extensions.protocols.ResourceManagerProtocol]=None, event_broadcaster:Optional[cjm_fasthtml_workers.extensions.protocols.EventBroadcasterProtocol]=None)
Abstract base class for managing jobs using worker processes.
| | Type | Default | Details |
|---|---|---|---|
| worker_type | str | | Type identifier (e.g., “transcription”, “llm”, “image-gen”) |
| category | Any | | Plugin category this manager handles |
| supports_streaming | bool | False | Whether this manager supports streaming jobs |
| worker_config | Optional | None | Worker configuration (uses defaults if None) |
| plugin_registry | Optional | None | Optional plugin registry integration |
| resource_manager | Optional | None | Optional resource manager integration |
| event_broadcaster | Optional | None | Optional SSE event broadcaster |
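The three integration parameters default to None, so the manager has to work with or without them. Below is a minimal sketch of that guard pattern. EventBroadcasterLike, its broadcast method, ManagerSketch, and the event name are all hypothetical. They are not the signatures of the library's EventBroadcasterProtocol.

```python
from typing import Any, Dict, List, Optional, Protocol


class EventBroadcasterLike(Protocol):
    """Hypothetical broadcaster shape; the real protocol may differ."""
    def broadcast(self, event_type: str, data: Dict[str, Any]) -> None: ...


class ManagerSketch:
    def __init__(self, event_broadcaster: Optional[EventBroadcasterLike] = None):
        self.event_broadcaster = event_broadcaster

    def notify(self, event_type: str, data: Dict[str, Any]) -> bool:
        # The integration is optional: without a broadcaster this is a no-op
        if self.event_broadcaster is None:
            return False
        self.event_broadcaster.broadcast(event_type, data)
        return True


class ListBroadcaster:
    """Satisfies the protocol structurally, without inheriting from it."""
    def __init__(self) -> None:
        self.sent: List[tuple] = []

    def broadcast(self, event_type: str, data: Dict[str, Any]) -> None:
        self.sent.append((event_type, data))


print(ManagerSketch().notify("job:started", {"job_id": "1"}))  # False
recorder = ListBroadcaster()
print(ManagerSketch(recorder).notify("job:started", {"job_id": "1"}))  # True
```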
Abstract Methods
Subclasses must implement these methods to customize job handling for their specific use case.
BaseJobManager.create_job
BaseJobManager.create_job (plugin_id:str, **kwargs)
Factory method for creating domain-specific jobs.
| | Type | Details |
|---|---|---|
| plugin_id | str | Plugin unique identifier |
| kwargs | VAR_KEYWORD | |
| Returns | JobType | Created job instance |
BaseJobManager.get_worker_entry_point
BaseJobManager.get_worker_entry_point ()
Return the worker process function for this manager.
BaseJobManager.prepare_execute_request
BaseJobManager.prepare_execute_request (job:~JobType)
Convert job to worker execute request parameters.
| | Type | Details |
|---|---|---|
| job | JobType | The job to prepare for execution |
| Returns | Dict | Dictionary of parameters for the worker execute request |
BaseJobManager.extract_job_result
BaseJobManager.extract_job_result (job:~JobType, result_data:Dict[str,Any])
Extract and format job result from worker response.
| | Type | Details |
|---|---|---|
| job | JobType | The job that was executed |
| result_data | Dict | Raw result data from worker |
| Returns | Dict | Formatted result for storage |
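One plausible way the four abstract methods fit together is a template method. The job is created, converted to a request, run by the worker, and its result is then formatted. The sketch below assumes that order and calls the worker inline instead of in a subprocess. JobFlowSketch, UpperFlow, and upper_worker are hypothetical names, and jobs are plain dicts for brevity.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict


class JobFlowSketch(ABC):
    """Hypothetical outline of how the abstract methods could be chained."""

    @abstractmethod
    def create_job(self, plugin_id: str, **kwargs) -> Dict[str, Any]: ...

    @abstractmethod
    def get_worker_entry_point(self) -> Callable: ...

    @abstractmethod
    def prepare_execute_request(self, job: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    def extract_job_result(self, job: Dict[str, Any], result_data: Dict[str, Any]) -> Dict[str, Any]: ...

    def run(self, plugin_id: str, **kwargs) -> Dict[str, Any]:
        job = self.create_job(plugin_id, **kwargs)
        request = self.prepare_execute_request(job)
        # The real manager runs the worker in a subprocess; here it is called inline
        raw = self.get_worker_entry_point()(request)
        return self.extract_job_result(job, raw)


def upper_worker(request: Dict[str, Any]) -> Dict[str, Any]:
    return {"text": request["text"].upper()}


class UpperFlow(JobFlowSketch):
    def create_job(self, plugin_id, **kwargs):
        return {"id": str(uuid.uuid4()), "plugin_id": plugin_id, "text": kwargs.get("text", "")}

    def get_worker_entry_point(self):
        return upper_worker

    def prepare_execute_request(self, job):
        return {"text": job["text"]}

    def extract_job_result(self, job, result_data):
        return {"job_id": job["id"], **result_data}


result = UpperFlow().run("demo-plugin", text="hello")
print(result["text"])  # HELLO
```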
Hook Methods
These methods provide extension points for subclasses to customize behavior without overriding core functionality.
The _extract_plugin_resource_identifier hook extracts a plugin resource identifier from the plugin configuration for resource tracking. The default implementation tries common keys (resource_id, model_id, model, model_name), but subclasses can override it to match their specific configuration structure.
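Below is a standalone sketch of the key lookup described above. It checks the documented keys in order. Returning the first non-empty value and falling back to "unknown" are assumptions borrowed from the example override. They are not confirmed behavior of the default implementation.

```python
from typing import Any, Dict

# Keys named in the text, checked in this order
RESOURCE_KEYS = ("resource_id", "model_id", "model", "model_name")


def extract_plugin_resource_identifier(config: Dict[str, Any]) -> str:
    """Sketch of the documented lookup; the 'unknown' fallback is an assumption."""
    for key in RESOURCE_KEYS:
        value = config.get(key)
        if value:  # skip missing keys and empty values
            return str(value)
    return "unknown"


print(extract_plugin_resource_identifier({"model": "whisper-small", "device": "cuda"}))  # whisper-small
```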
Example override:
```python
def _extract_plugin_resource_identifier(self, config):
    # Custom logic for your plugin system
    return config.get('resource_path', 'unknown')
```
The _validate_resources hook allows you to implement custom validation before starting a job. Return None if validation passes, or an error message string if it fails.
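The None-or-message contract lets the caller refuse a job before any worker is involved. This sketch assumes the manager raises an exception when validation fails. The names validate_resources and start_job_sketch, the cuda_available config key, and the choice of RuntimeError are illustrative only.

```python
import asyncio
from typing import Any, Dict, Optional


async def validate_resources(plugin_id: str, plugin_config: Dict[str, Any]) -> Optional[str]:
    # Hypothetical check: None means "OK", a string explains the refusal
    if plugin_config.get("device") == "cuda" and not plugin_config.get("cuda_available", False):
        return "CUDA not available"
    return None


async def start_job_sketch(plugin_id: str, plugin_config: Dict[str, Any]) -> str:
    error = await validate_resources(plugin_id, plugin_config)
    if error is not None:
        # Refuse before any worker is started or any job is queued
        raise RuntimeError(f"Cannot start job for {plugin_id}: {error}")
    return "started"


print(asyncio.run(start_job_sketch("my-plugin", {"device": "cpu"})))  # started
```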
Example override:
```python
import torch  # needed for the CUDA availability check below

async def _validate_resources(self, plugin_id, plugin_config):
    # Check if another job is already running
    if self.worker_process and self.current_plugin_id:
        return "Another job is already running"
    # Check GPU availability
    if plugin_config.get('device') == 'cuda':
        if not torch.cuda.is_available():
            return "CUDA not available"
    return None  # Validation passed
```
The _on_job_completed hook is called automatically when a job completes successfully. Use it to implement post-processing logic such as saving results, sending notifications, or triggering follow-up tasks.
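Example override:
```python
import json
import os

def _on_job_completed(self, job_id):
    job = self.get_job(job_id)
    result = self.get_job_result(job_id)
    # Save result to disk, creating the output directory if needed
    os.makedirs("results", exist_ok=True)
    output_path = f"results/{job_id}.json"
    with open(output_path, 'w') as f:
        json.dump(result, f)
    # Track completion count (attribute is created on first use)
    self.completed_count = getattr(self, 'completed_count', 0) + 1
    print(f"Job {job_id} ({job.plugin_id}) completed and saved to {output_path}")
```
Error Handling Integration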
When the cjm-error-handling library is installed, BaseJobManager uses structured errors for better error tracking and debugging:
- PluginError: Raised when plugin operations fail (plugin not found, initialization errors)
- WorkerError: Raised for worker process issues (timeouts, communication failures, validation errors)
These structured errors provide:
- User-friendly messages for display
- Debug information for developers
- Rich context (job_id, plugin_id, worker_pid, etc.)
- Error serialization for crossing process boundaries
Without the library, BaseJobManager falls back to standard Python exceptions (ValueError, RuntimeError, etc.).
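Because the structured errors are optional, calling code may not know which exception type it will receive. Below is a small sketch of helpers that use get_user_message and is_retryable when they are present and fall back to str(exc) otherwise. FakeWorkerError is a hypothetical stand-in, not the library's WorkerError.

```python
def user_message(exc: BaseException) -> str:
    """Prefer the structured user message when available, else use str()."""
    get_msg = getattr(exc, "get_user_message", None)
    if callable(get_msg):
        return get_msg()
    return str(exc)


def is_retryable(exc: BaseException) -> bool:
    # Plain exceptions carry no retry hint, so treat them as non-retryable
    return bool(getattr(exc, "is_retryable", False))


class FakeWorkerError(RuntimeError):
    """Hypothetical stand-in for a structured error; not the real WorkerError."""
    is_retryable = True

    def get_user_message(self) -> str:
        return "The worker timed out. Please try again."


print(user_message(ValueError("plugin not found")))  # plugin not found
print(user_message(FakeWorkerError("timeout after 30s")))  # The worker timed out. Please try again.
print(is_retryable(FakeWorkerError()), is_retryable(RuntimeError()))  # True False
```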
Usage:
```python
# PluginError and WorkerError are provided by the cjm-error-handling library
async def start_with_error_handling(manager):
    try:
        return await manager.start_job(plugin_id="my-plugin")
    except PluginError as e:
        print(f"Plugin error: {e.get_user_message()}")
        print(f"Debug: {e.get_debug_message()}")
        print(f"Plugin ID: {e.plugin_id}")
    except WorkerError as e:
        print(f"Worker error: {e.get_user_message()}")
        if e.is_retryable:
            # Retry logic
            pass
```
Worker Lifecycle Management
Result Monitoring and Handling
Plugin Management Methods
BaseJobManager.get_plugin_name
BaseJobManager.get_plugin_name (plugin_id:str)
Get plugin name from plugin ID (requires plugin registry).
| | Type | Details |
|---|---|---|
| plugin_id | str | Plugin unique identifier |
| Returns | Optional | Plugin name or None |
BaseJobManager.unload_plugin
BaseJobManager.unload_plugin (plugin_name:str)
Unload a plugin from the worker to free resources.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin to unload |
| Returns | bool | True if successful, False otherwise |
BaseJobManager.reload_plugin
BaseJobManager.reload_plugin (plugin_name:str, config:Dict[str,Any])
Reload a plugin with new configuration.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin to reload |
| config | Dict | New configuration |
| Returns | bool | True if successful, False otherwise |
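These docs do not say whether the plugin management methods are plain functions or coroutines, so the hypothetical helper below accepts either. It resolves a plugin ID with get_plugin_name and then calls reload_plugin. The names swap_plugin_config, _maybe_await, and FakeManager are illustrative only.

```python
import asyncio
import inspect
from typing import Any, Dict


async def _maybe_await(value: Any) -> Any:
    # Handles both sync and async manager methods
    if inspect.isawaitable(value):
        return await value
    return value


async def swap_plugin_config(manager: Any, plugin_id: str, new_config: Dict[str, Any]) -> bool:
    """Hypothetical helper: resolve the plugin name, then reload it with new settings."""
    name = await _maybe_await(manager.get_plugin_name(plugin_id))
    if name is None:
        # get_plugin_name returns None when the name cannot be resolved
        return False
    return bool(await _maybe_await(manager.reload_plugin(name, new_config)))


class FakeManager:
    """Test double with one sync and one async method."""
    def get_plugin_name(self, plugin_id):
        return {"p1": "whisper"}.get(plugin_id)

    async def reload_plugin(self, plugin_name, config):
        return True


print(asyncio.run(swap_plugin_config(FakeManager(), "p1", {"model": "small"})))  # True
print(asyncio.run(swap_plugin_config(FakeManager(), "missing", {})))  # False
```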
Job Management Methods
BaseJobManager.start_job
BaseJobManager.start_job (plugin_id:str, **kwargs)
Start a new job.
| | Type | Details |
|---|---|---|
| plugin_id | str | Plugin unique identifier |
| kwargs | VAR_KEYWORD | |
| Returns | JobType | Created and started job |
BaseJobManager.cancel_job
BaseJobManager.cancel_job (job_id:str)
Cancel a running job by terminating the worker process.
| | Type | Details |
|---|---|---|
| job_id | str | ID of the job to cancel |
| Returns | bool | True if cancellation successful |
BaseJobManager.get_job
BaseJobManager.get_job (job_id:str)
Get a job by ID.
| | Type | Details |
|---|---|---|
| job_id | str | Unique job identifier |
| Returns | Optional | Job object or None |
BaseJobManager.get_all_jobs
BaseJobManager.get_all_jobs ()
Get all jobs.
BaseJobManager.get_job_result
BaseJobManager.get_job_result (job_id:str)
Get job result.
| | Type | Details |
|---|---|---|
| job_id | str | Unique job identifier |
| Returns | Optional | Job result or None |
BaseJobManager.clear_completed_jobs
BaseJobManager.clear_completed_jobs ()
Clear completed, failed, and cancelled jobs.
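Clearing jobs amounts to dropping every job whose status is terminal. This sketch assumes three things: jobs are kept in a dict keyed by job ID, the status strings match the words used above, and a simplified Job class is enough for illustration. The "running" status in the sample data is also an assumption, since only "pending" appears in the BaseJob signature.

```python
from dataclasses import dataclass
from typing import Dict

# Statuses assumed to count as finished, following the description above
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}


@dataclass
class Job:
    id: str
    status: str


def clear_finished(jobs: Dict[str, Job]) -> int:
    """Remove finished jobs in place and return how many were removed."""
    finished = [job_id for job_id, job in jobs.items() if job.status in TERMINAL_STATUSES]
    for job_id in finished:
        del jobs[job_id]
    return len(finished)


jobs = {j.id: j for j in [Job("a", "completed"), Job("b", "running"),
                          Job("c", "failed"), Job("d", "pending")]}
removed = clear_finished(jobs)
print(removed, sorted(jobs))  # 2 ['b', 'd']
```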
Utility Methods
Usage Example
To use BaseJobManager, create a concrete subclass that implements the required abstract methods. Here’s a minimal example structure:
```python
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Dict

from cjm_fasthtml_workers.core.config import WorkerConfig
from cjm_fasthtml_workers.managers.base import BaseJobManager, BaseJob


@dataclass
class MyJob(BaseJob):
    """Custom job type with domain-specific fields."""
    input_text: str = ""


class MyJobManager(BaseJobManager[MyJob]):
    """Concrete job manager implementation."""

    def create_job(self, plugin_id: str, **kwargs) -> MyJob:
        """Create a job instance."""
        return MyJob(
            id=str(uuid.uuid4()),
            plugin_id=plugin_id,
            input_text=kwargs.get('input_text', '')
        )

    def get_worker_entry_point(self) -> Callable:
        """Return the worker function."""
        return my_worker_process  # your worker function, defined elsewhere

    def prepare_execute_request(self, job: MyJob) -> Dict[str, Any]:
        """Convert job to execution parameters."""
        return {'text': job.input_text}

    def extract_job_result(self, job: MyJob, result_data: Dict[str, Any]) -> Dict[str, Any]:
        """Format the result."""
        return result_data


# Create and use the manager
manager = MyJobManager(
    worker_type="text_processing",
    category="processing",
    worker_config=WorkerConfig()
)

# Start a job (start_job is awaited, so call it from an async function or route handler)
job = await manager.start_job(plugin_id="my-plugin", input_text="Hello World")
```
For a complete working example, see the demo_app.py file in the repository.
BaseJobManager.broadcast_event
BaseJobManager.broadcast_event (event_type:str, data:Dict[str,Any])
Broadcast an event to all connected SSE clients (requires event broadcaster).
| | Type | Details |
|---|---|---|
| event_type | str | Event type identifier |
| data | Dict | Event data payload |
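Server-Sent Events use a simple text framing. An optional event: line names the event, one or more data: lines carry the payload, and a blank line ends the message. The sketch below encodes an event_type and data payload that way, using JSON for the payload. Whether the library's broadcaster uses this exact framing is an assumption, and job:completed is a made-up event name.

```python
import json
from typing import Any, Dict


def format_sse(event_type: str, data: Dict[str, Any]) -> str:
    """Encode one SSE message: a named event plus a single JSON data line."""
    payload = json.dumps(data)  # default json.dumps output contains no newlines
    # The trailing blank line terminates the message on the wire
    return f"event: {event_type}\ndata: {payload}\n\n"


print(format_sse("job:completed", {"job_id": "abc"}), end="")
# event: job:completed
# data: {"job_id": "abc"}
```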
BaseJobManager.check_streaming_support
BaseJobManager.check_streaming_support (plugin_id:str)
Check if a plugin supports streaming.
| | Type | Details |
|---|---|---|
| plugin_id | str | Plugin unique identifier |
| Returns | bool | True if streaming supported |
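A caller can use the answer from check_streaming_support to choose between showing partial results as they arrive and waiting for one final result. In this sketch the boolean passed to run_job plays the role of that answer. The plugin stream and all names are hypothetical, and for brevity the non-streaming branch drains the same stand-in stream.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_plugin_stream() -> AsyncIterator[str]:
    # Stand-in for a plugin that yields partial output, e.g. transcript segments
    for chunk in ["Hello", " streaming", " world"]:
        await asyncio.sleep(0)
        yield chunk


async def run_job(streaming: bool) -> List[str]:
    """Return the sequence of results a UI would display."""
    updates: List[str] = []
    if streaming:
        text = ""
        async for chunk in fake_plugin_stream():
            text += chunk
            updates.append(text)  # push each partial result as it arrives
    else:
        chunks = [chunk async for chunk in fake_plugin_stream()]
        updates.append("".join(chunks))  # only the final result is shown
    return updates


print(asyncio.run(run_job(True)))   # ['Hello', 'Hello streaming', 'Hello streaming world']
print(asyncio.run(run_job(False)))  # ['Hello streaming world']
```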
BaseJobManager.shutdown
BaseJobManager.shutdown ()
Shutdown the manager and cleanup resources.