worker

Generic worker process for executing plugin-based jobs in isolated subprocesses.

Worker Process

The base_worker_process function is the entry point for worker subprocesses. It:

  1. Receives plugin configurations through an init message
  2. Creates a plugin manager instance (isolated from the main process)
  3. Loads configured plugins
  4. Processes job execution requests from the request queue
  5. Sends results back through the result queue
  6. Handles plugin reload/unload commands
  7. Supports streaming execution for incremental results


base_worker_process

 base_worker_process (request_queue:multiprocessing.Queue,
                      result_queue:multiprocessing.Queue,
                      response_queue:multiprocessing.Queue,
                      plugin_manager_factory:Callable[[],cjm_fasthtml_workers.core.protocol.PluginManagerAdapter],
                      result_adapter:Optional[Callable[[Any],Dict[str,Any]]]=None,
                      supports_streaming:bool=False)

Generic long-lived worker process that handles job execution.

Parameter               Type      Default  Details
request_queue           Queue              Queue for receiving job requests from parent
result_queue            Queue              Queue for sending job results back to parent
response_queue          Queue              Queue for sending command responses back to parent
plugin_manager_factory  Callable           Factory function that creates a plugin manager instance
result_adapter          Optional  None     Optional function to adapt plugin results to dict format
supports_streaming      bool      False    Whether this worker supports streaming execution
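
To illustrate the result_adapter parameter, here is a minimal sketch of a callable matching the Callable[[Any], Dict[str, Any]] type from the signature above. The TranscriptionResult class, the function name example_result_adapter, and the "value" fallback key are hypothetical and exist only for this example; the library may expect a different dict layout.

```python
from dataclasses import dataclass, asdict, is_dataclass
from typing import Any, Dict


@dataclass
class TranscriptionResult:
    """Hypothetical plugin result type, used only for this illustration."""
    text: str
    confidence: float


def example_result_adapter(result: Any) -> Dict[str, Any]:
    """Convert a plugin result into a plain dict that is easy to send over a queue."""
    if isinstance(result, dict):
        # Already in dict form: pass it through unchanged.
        return result
    if is_dataclass(result) and not isinstance(result, type):
        # Dataclass instance: convert its fields to a dict.
        return asdict(result)
    # Fallback (an assumption of this sketch): wrap any other value under one key.
    return {"value": result}


adapted = example_result_adapter(TranscriptionResult(text="hi", confidence=0.5))
```

A function like this would be passed as result_adapter=example_result_adapter. Returning plain dicts keeps the payload picklable without requiring the parent process to import the plugin's own result classes.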

Communication Protocol

The worker communicates with the parent process using three queues:

Request Queue (Parent → Worker):

  - INIT: Initialize plugin manager with configurations
  - EXECUTE: Execute a job with specified plugin and parameters
  - RELOAD: Reload a plugin with new configuration
  - UNLOAD: Unload a plugin to free resources
  - GET_STATE: Query current worker state
  - STOP: Gracefully shut down the worker

Result Queue (Worker → Parent):

  - READY: Worker initialized successfully
  - RESULT: Job execution completed (success or error)
  - STREAM_CHUNK: Streaming output chunk
  - ERROR: Fatal worker error

Response Queue (Worker → Parent):

  - RESPONSE: Synchronous response to commands (reload, unload)
  - STATE: Worker state information
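
The interplay of STREAM_CHUNK and RESULT on the result queue can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: the message field names (type, job_id, chunk, status, data, error) and the helper names stream_job and fake_streaming_plugin are hypothetical, and a queue.Queue stands in for the multiprocessing queue so the example runs in a single process.

```python
import queue
from typing import Any, Iterator


def stream_job(job_id: str, chunks: Iterator[Any], result_queue) -> None:
    """Forward each chunk as a STREAM_CHUNK message, then send one final RESULT."""
    collected = []
    try:
        for chunk in chunks:
            collected.append(chunk)
            result_queue.put({"type": "STREAM_CHUNK", "job_id": job_id, "chunk": chunk})
        # The final RESULT tells the parent that the stream is complete.
        result_queue.put({"type": "RESULT", "job_id": job_id,
                          "status": "success", "data": collected})
    except Exception as exc:
        # A failing job still produces a RESULT, marked as an error.
        result_queue.put({"type": "RESULT", "job_id": job_id,
                          "status": "error", "error": str(exc)})


def fake_streaming_plugin(text: str) -> Iterator[str]:
    """Hypothetical plugin that yields its output one word at a time."""
    for word in text.split():
        yield word


stream_results = queue.Queue()
stream_job("job-1", fake_streaming_plugin("one two three"), stream_results)
stream_messages = [stream_results.get_nowait() for _ in range(4)]
```

Tagging every message with the job identifier lets the parent associate chunks with the job that produced them. Whether the real worker does this is not stated in this page.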

Worker Lifecycle

  1. Initialization: Parent sends INIT message with plugin configurations
  2. Discovery: Worker discovers available plugins
  3. Loading: Worker loads configured plugins
  4. Execution Loop: Worker processes EXECUTE requests
  5. Shutdown: Parent sends STOP or terminates the process
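
The lifecycle steps above can be pictured with a toy worker loop. This is a simplified sketch and not the code of base_worker_process: the dict-based message layout, the function name toy_worker_loop, and the use of plain callables as "plugins" are all assumptions made for illustration. It uses queue.Queue and runs the loop in the current process so that it is easy to try; the real worker runs in a subprocess and uses multiprocessing queues, which offer the same put/get interface.

```python
import queue
from typing import Any, Callable, Dict


def toy_worker_loop(request_queue, result_queue, response_queue,
                    plugins: Dict[str, Callable[..., Any]]) -> None:
    """Handle INIT, EXECUTE, GET_STATE and STOP messages until told to stop."""
    available = dict(plugins)                 # Discovery: what could be loaded
    loaded: Dict[str, Callable[..., Any]] = {}
    while True:
        msg = request_queue.get()
        kind = msg["type"]
        if kind == "INIT":
            # Loading: keep only the plugins named in the init message.
            for name in msg["plugins"]:
                loaded[name] = available[name]
            result_queue.put({"type": "READY"})
        elif kind == "EXECUTE":
            try:
                output = loaded[msg["plugin"]](**msg["params"])
                result_queue.put({"type": "RESULT", "job_id": msg["job_id"],
                                  "status": "success", "data": output})
            except Exception as exc:
                # Job errors are reported as a RESULT; the loop keeps running.
                result_queue.put({"type": "RESULT", "job_id": msg["job_id"],
                                  "status": "error", "error": str(exc)})
        elif kind == "GET_STATE":
            response_queue.put({"type": "STATE", "loaded": sorted(loaded)})
        elif kind == "STOP":
            break


requests, results, responses = queue.Queue(), queue.Queue(), queue.Queue()
for message in [
    {"type": "INIT", "plugins": ["upper"]},
    {"type": "EXECUTE", "job_id": "job-1", "plugin": "upper", "params": {"text": "hello"}},
    {"type": "GET_STATE"},
    {"type": "STOP"},
]:
    requests.put(message)

toy_worker_loop(requests, results, responses, {"upper": lambda text: text.upper()})
ready_msg = results.get_nowait()
result_msg = results.get_nowait()
state_msg = responses.get_nowait()
```

In this sketch a failing job yields a RESULT with an error status instead of ending the loop, which matches the "success or error" wording used for RESULT in the protocol description.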

Model Reuse

The worker keeps models loaded in memory between jobs. When switching plugins, it unloads the old plugin first to free GPU memory before loading the new one.
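
The unload-before-load behaviour can be sketched with a small helper that holds at most one loaded plugin at a time. The class name PluginSlot, its methods, and the load/unload callbacks are hypothetical; the actual worker delegates this to its plugin manager, whose interface is not shown in this page.

```python
from typing import Any, Callable, List, Optional


class PluginSlot:
    """Keep one plugin loaded; reuse it across jobs and swap it only when needed."""

    def __init__(self, load: Callable[[str], Any],
                 unload: Callable[[str, Any], None]) -> None:
        self._load = load
        self._unload = unload
        self.current_name: Optional[str] = None
        self.current: Any = None

    def ensure_loaded(self, name: str) -> Any:
        if self.current_name == name:
            # Same plugin as the previous job: reuse the model already in memory.
            return self.current
        if self.current_name is not None:
            # Free the old plugin's resources before loading the new one.
            self._unload(self.current_name, self.current)
            self.current_name, self.current = None, None
        self.current = self._load(name)
        self.current_name = name
        return self.current


events: List[str] = []


def fake_load(name: str) -> str:
    events.append(f"load {name}")
    return f"<{name} model>"


def fake_unload(name: str, model: Any) -> None:
    events.append(f"unload {name}")


slot = PluginSlot(fake_load, fake_unload)
slot.ensure_loaded("a")   # first job: loads a
slot.ensure_loaded("a")   # second job, same plugin: no reload
slot.ensure_loaded("b")   # switch: unloads a, then loads b
```

Unloading before loading means the two models never occupy memory at the same time, at the cost of a reload whenever jobs alternate between plugins.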