protocol

Protocol definitions for worker communication and plugin manager integration.

Request and Response Types

These enums define the message types used for communication between the parent process and worker processes. The parent sends requests and the worker responds with various response types.


source

WorkerRequestType

 WorkerRequestType (value, names=None, module=None, qualname=None,
                    type=None, start=1, boundary=None)

Types of requests sent to worker process.


source

WorkerResponseType

 WorkerResponseType (value, names=None, module=None, qualname=None,
                     type=None, start=1, boundary=None)

Types of responses from worker process.

Data Structures

These dataclasses define the structure of messages passed through multiprocessing queues. They provide to_dict() and from_dict() methods for serialization since objects passed through queues need to be picklable.


source

WorkerRequest

 WorkerRequest (type:__main__.WorkerRequestType, data:Dict[str,Any])

Base structure for worker requests.

# Test WorkerRequest serialization
request = WorkerRequest(
    type=WorkerRequestType.EXECUTE,
    data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'}
)
request
WorkerRequest(type=<WorkerRequestType.EXECUTE: 'execute'>, data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'})
# Test to_dict conversion
request_dict = request.to_dict()
request_dict
{'type': 'execute',
 'job_id': 'test-123',
 'plugin_name': 'test_plugin',
 'param1': 'value1'}
# Test from_dict deserialization
restored_request = WorkerRequest.from_dict(request_dict)
restored_request
WorkerRequest(type=<WorkerRequestType.EXECUTE: 'execute'>, data={'job_id': 'test-123', 'plugin_name': 'test_plugin', 'param1': 'value1'})

source

WorkerResponse

 WorkerResponse (type:__main__.WorkerResponseType, data:Dict[str,Any])

Base structure for worker responses.

# Test WorkerResponse
response = WorkerResponse(
    type=WorkerResponseType.RESULT,
    data={'job_id': 'test-123', 'status': 'success', 'data': {'output': 'result'}}
)
response_dict = response.to_dict()
response_dict
{'type': 'result',
 'job_id': 'test-123',
 'status': 'success',
 'data': {'output': 'result'}}

source

WorkerStreamChunk

 WorkerStreamChunk (job_id:str, chunk:str, is_final:bool=False,
                    metadata:Optional[Dict[str,Any]]=None)

Structure for streaming job results.

# Test WorkerStreamChunk
chunk = WorkerStreamChunk(
    job_id='test-456',
    chunk='This is a streaming chunk',
    is_final=False,
    metadata={'index': 1}
)
chunk.to_dict()
{'type': 'stream_chunk',
 'job_id': 'test-456',
 'chunk': 'This is a streaming chunk',
 'is_final': False,
 'metadata': {'index': 1}}

source

WorkerResult

 WorkerResult (job_id:str, status:str, data:Optional[Dict[str,Any]]=None,
               error:Optional[str]=None)

Structure for job execution results.

# Test WorkerResult - success case
result = WorkerResult(
    job_id='test-789',
    status='success',
    data={'text': 'Job completed successfully', 'metadata': {'time': 1.5}}
)
result.to_dict()
{'type': 'result',
 'job_id': 'test-789',
 'status': 'success',
 'data': {'text': 'Job completed successfully', 'metadata': {'time': 1.5}}}
# Test WorkerResult - error case
error_result = WorkerResult(
    job_id='test-999',
    status='error',
    error='Plugin execution failed: Invalid input'
)
error_result.to_dict()
{'type': 'result',
 'job_id': 'test-999',
 'status': 'error',
 'error': 'Plugin execution failed: Invalid input'}

Plugin Manager Protocol

The PluginManagerAdapter is a Protocol class that uses structural subtyping (duck typing). Any class that implements these methods will satisfy the protocol, even without explicit inheritance.

This protocol defines the interface that plugin managers must implement to work with the worker system. It handles: - Plugin discovery and loading - Plugin execution (both standard and streaming) - Plugin lifecycle management (reload/unload) - Streaming capability detection


source

PluginManagerAdapter

 PluginManagerAdapter (*args, **kwargs)

*Protocol that plugin managers must satisfy for worker integration.

Uses structural subtyping (duck typing) - plugin managers don’t need to explicitly inherit from this, they just need to implement these methods.*