base

Abstract base class for managing background jobs with worker processes.

BaseJob Dataclass


source

BaseJob

 BaseJob (id:str, plugin_id:str, status:str='pending',
          created_at:str=<factory>, started_at:Optional[str]=None,
          completed_at:Optional[str]=None,
          result:Optional[Dict[str,Any]]=None, error:Optional[str]=None,
          metadata:Dict[str,Any]=<factory>, worker_pid:Optional[int]=None)

Base class for all job types.

BaseJobManager Class

The BaseJobManager is an abstract base class that provides the core infrastructure for managing background jobs with worker processes.

Key Features: - Jobs processed sequentially in subprocess - Plugin resources loaded once and reused - True cancellation via subprocess termination - Automatic worker restart based on policy - Isolated worker process avoids duplicating web app initialization - Optional streaming support for incremental results - Optional dependency injection for plugin registry, resource manager, and event broadcaster

Responsibilities: - Worker process lifecycle (start, restart, shutdown) - Job queuing and execution - Result monitoring - Resource management integration (optional) - Event broadcasting (optional)


source

BaseJobManager

 BaseJobManager (worker_type:str, category:Any,
                 supports_streaming:bool=False, worker_config:Optional[cjm
                 _fasthtml_workers.core.config.WorkerConfig]=None, plugin_
                 registry:Optional[cjm_fasthtml_workers.extensions.protoco
                 ls.PluginRegistryProtocol]=None, resource_manager:Optiona
                 l[cjm_fasthtml_workers.extensions.protocols.ResourceManag
                 erProtocol]=None, event_broadcaster:Optional[cjm_fasthtml
                 _workers.extensions.protocols.EventBroadcasterProtocol]=N
                 one)

Abstract base class for managing jobs using worker processes.

Type Default Details
worker_type str Type identifier (e.g., “transcription”, “llm”, “image-gen”)
category Any Plugin category this manager handles
supports_streaming bool False Whether this manager supports streaming jobs
worker_config Optional None Worker configuration (uses defaults if None)
plugin_registry Optional None Optional plugin registry integration
resource_manager Optional None Optional resource manager integration
event_broadcaster Optional None Optional SSE event broadcaster

Abstract Methods

Subclasses must implement these methods to customize job handling for their specific use case.


source

BaseJobManager.create_job

 BaseJobManager.create_job (plugin_id:str, **kwargs)

Factory method for creating domain-specific jobs.

Type Details
plugin_id str Plugin unique identifier
kwargs VAR_KEYWORD
Returns JobType Created job instance

source

BaseJobManager.get_worker_entry_point

 BaseJobManager.get_worker_entry_point ()

Return the worker process function for this manager.


source

BaseJobManager.prepare_execute_request

 BaseJobManager.prepare_execute_request (job:~JobType)

Convert job to worker execute request parameters.

Type Details
job JobType The job to prepare for execution
Returns Dict Dictionary of parameters for the worker execute request

source

BaseJobManager.extract_job_result

 BaseJobManager.extract_job_result (job:~JobType,
                                    result_data:Dict[str,Any])

Extract and format job result from worker response.

Type Details
job JobType The job that was executed
result_data Dict Raw result data from worker
Returns Dict Formatted result for storage

Hook Methods

These methods provide extension points for subclasses to customize behavior without overriding core functionality.

This hook extracts a plugin resource identifier from the plugin configuration for resource tracking. The default implementation tries common keys (resource_id, model_id, model, model_name) but subclasses can override this to match their specific configuration structure.

Example override:

def _extract_plugin_resource_identifier(self, config):
    # Custom logic for your plugin system
    return config.get('resource_path', 'unknown')

This hook allows you to implement custom validation before starting a job. Return None if validation passes, or an error message string if it fails.

Example override:

async def _validate_resources(self, plugin_id, plugin_config):
    # Check if another job is already running
    if self.worker_process and self.current_plugin_id:
        return "Another job is already running"
    
    # Check GPU availability
    if plugin_config.get('device') == 'cuda':
        if not torch.cuda.is_available():
            return "CUDA not available"
    
    return None  # Validation passed

This hook is called automatically when a job completes successfully. Use it to implement post-processing logic like saving results, sending notifications, or triggering follow-up tasks.

Example override:

def _on_job_completed(self, job_id):
    job = self.get_job(job_id)
    result = self.get_job_result(job_id)
    
    # Save result to disk
    output_path = f"results/{job_id}.json"
    with open(output_path, 'w') as f:
        json.dump(result, f)
    
    # Track completion count
    self.completed_count += 1
    
    print(f"Job {job_id} completed and saved to {output_path}")

Error Handling Integration

When the cjm-error-handling library is installed, BaseJobManager uses structured errors for better error tracking and debugging:

  • PluginError: Raised when plugin operations fail (plugin not found, initialization errors)
  • WorkerError: Raised for worker process issues (timeouts, communication failures, validation errors)

These structured errors provide: - User-friendly messages for display - Debug information for developers - Rich context (job_id, plugin_id, worker_pid, etc.) - Error serialization for crossing process boundaries

Without the library: Falls back to standard Python exceptions (ValueError, RuntimeError, etc.)

Usage:

try:
    job = await manager.start_job(plugin_id="my-plugin")
except PluginError as e:
    print(f"Plugin error: {e.get_user_message()}")
    print(f"Debug: {e.get_debug_message()}")
    print(f"Plugin ID: {e.plugin_id}")
except WorkerError as e:
    print(f"Worker error: {e.get_user_message()}")
    if e.is_retryable:
        # Retry logic
        pass

Worker Lifecycle Management

Result Monitoring and Handling

Plugin Management Methods


source

BaseJobManager.get_plugin_name

 BaseJobManager.get_plugin_name (plugin_id:str)

Get plugin name from plugin ID (requires plugin registry).

Type Details
plugin_id str Plugin unique identifier
Returns Optional Plugin name or None

source

BaseJobManager.unload_plugin

 BaseJobManager.unload_plugin (plugin_name:str)

Unload a plugin from the worker to free resources.

Type Details
plugin_name str Name of the plugin to unload
Returns bool True if successful, False otherwise

source

BaseJobManager.reload_plugin

 BaseJobManager.reload_plugin (plugin_name:str, config:Dict[str,Any])

Reload a plugin with new configuration.

Type Details
plugin_name str Name of the plugin to reload
config Dict New configuration
Returns bool True if successful, False otherwise

Job Management Methods


source

BaseJobManager.start_job

 BaseJobManager.start_job (plugin_id:str, **kwargs)

Start a new job.

Type Details
plugin_id str Plugin unique identifier
kwargs VAR_KEYWORD
Returns JobType Created and started job

source

BaseJobManager.cancel_job

 BaseJobManager.cancel_job (job_id:str)

Cancel a running job by terminating the worker process.

Type Details
job_id str ID of the job to cancel
Returns bool True if cancellation successful

source

BaseJobManager.get_job

 BaseJobManager.get_job (job_id:str)

Get a job by ID.

Type Details
job_id str Unique job identifier
Returns Optional Job object or None

source

BaseJobManager.get_all_jobs

 BaseJobManager.get_all_jobs ()

Get all jobs.


source

BaseJobManager.get_job_result

 BaseJobManager.get_job_result (job_id:str)

Get job result.

Type Details
job_id str Unique job identifier
Returns Optional Job result or None

source

BaseJobManager.clear_completed_jobs

 BaseJobManager.clear_completed_jobs ()

Clear completed, failed, and cancelled jobs.

Utility Methods

Usage Example

To use BaseJobManager, create a concrete subclass that implements the required abstract methods. Here’s a minimal example structure:

from dataclasses import dataclass
from cjm_fasthtml_workers.managers.base import BaseJobManager, BaseJob

@dataclass
class MyJob(BaseJob):
    """Custom job type with domain-specific fields."""
    input_text: str = ""
    
class MyJobManager(BaseJobManager[MyJob]):
    """Concrete job manager implementation."""
    
    def create_job(self, plugin_id: str, **kwargs) -> MyJob:
        """Create a job instance."""
        return MyJob(
            id=str(uuid.uuid4()),
            plugin_id=plugin_id,
            input_text=kwargs.get('input_text', '')
        )
    
    def get_worker_entry_point(self) -> Callable:
        """Return the worker function."""
        return my_worker_process
    
    def prepare_execute_request(self, job: MyJob) -> Dict[str, Any]:
        """Convert job to execution parameters."""
        return {'text': job.input_text}
    
    def extract_job_result(self, job: MyJob, result_data: Dict[str, Any]) -> Dict[str, Any]:
        """Format the result."""
        return result_data

# Create and use the manager
manager = MyJobManager(
    worker_type="text_processing",
    category="processing",
    worker_config=WorkerConfig()
)

# Start a job
job = await manager.start_job(plugin_id="my-plugin", input_text="Hello World")

For a complete working example, see the demo_app.py file in the repository.


source

BaseJobManager.broadcast_event

 BaseJobManager.broadcast_event (event_type:str, data:Dict[str,Any])

Broadcast an event to all connected SSE clients (requires event broadcaster).

Type Details
event_type str Event type identifier
data Dict Event data payload

source

BaseJobManager.check_streaming_support

 BaseJobManager.check_streaming_support (plugin_id:str)

Check if a plugin supports streaming.

Type Details
plugin_id str Plugin unique identifier
Returns bool True if streaming supported

source

BaseJobManager.shutdown

 BaseJobManager.shutdown ()

Shutdown the manager and cleanup resources.