These protocols define optional integrations that can be injected into the BaseJobManager. All integrations are optional; the worker system functions without them.
Plugin Registry Protocol
source
PluginRegistryProtocol
PluginRegistryProtocol (*args, **kwargs)
Protocol for plugin registry integration.
Resource Manager Protocol
The ResourceManagerProtocol tracks worker processes and their resource usage. Key methods:
register_worker / unregister_worker : Manage worker lifecycle
update_worker_state : Update worker status, current job, and loaded plugin information
check_gpu_availability : Returns a ResourceConflict object with:
status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)
app_pids: List of application PIDs using GPU
external_pids: List of external PIDs using GPU
app_processes: Detailed info about app processes
external_processes: Detailed info about external processes
get_worker_by_pid : Returns a WorkerState object with:
pid: Worker process ID
worker_type: Type of worker
status: Worker status ('idle', 'running', 'busy')
job_id: Current job ID (None if idle)
plugin_name: Currently loaded plugin name
plugin_id: Currently loaded plugin ID
loaded_plugin_resource: Currently loaded resource identifier
config: Current plugin configuration
source
ResourceManagerProtocol
ResourceManagerProtocol (*args, **kwargs)
Protocol for resource management integration.
Event Broadcaster Protocol
source
EventBroadcasterProtocol
EventBroadcasterProtocol (*args, **kwargs)
Protocol for SSE event broadcasting.
Usage Examples
These protocols enable flexible integration:
# Without any integrations: all three optional integration arguments are omitted
# (MyJobManager is presumably a BaseJobManager subclass defined elsewhere)
manager = MyJobManager(
worker_type= "my_worker" ,
category= "processing"
)
# With plugin registry only
# (my_registry is assumed to be an object satisfying PluginRegistryProtocol)
manager = MyJobManager(
worker_type= "my_worker" ,
category= "processing" ,
plugin_registry= my_registry
)
# With all integrations: plugin registry, resource manager, and event broadcaster
manager = MyJobManager(
worker_type= "my_worker" ,
category= "processing" ,
plugin_registry= my_registry,
resource_manager= my_resource_mgr,
event_broadcaster= my_sse_manager
)
ResourceManagerProtocol Example
The update_worker_state method accepts explicit optional parameters:
class MyResourceManager:
    """Minimal resource manager example that only logs what it is told.

    It keeps no state: every method simply prints a message describing the
    call. ``update_worker_state`` accepts the full set of optional fields but
    reports only ``status``, ``job_id`` and ``loaded_plugin_resource``.
    """

    def register_worker(self, pid: int, worker_type: str) -> None:
        """Log that a worker of ``worker_type`` with process ID ``pid`` was registered."""
        print(f"Registered {worker_type} worker with PID {pid}")

    def unregister_worker(self, pid: int) -> None:
        """Log that the worker with process ID ``pid`` was unregistered."""
        print(f"Unregistered worker with PID {pid}")

    def update_worker_state(
        self,
        pid: int,
        status: Optional[str] = None,
        job_id: Optional[str] = None,
        plugin_name: Optional[str] = None,
        plugin_id: Optional[str] = None,
        loaded_plugin_resource: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log the fields of a worker-state update that were actually provided.

        Args:
            pid: Process ID of the worker being updated.
            status: New worker status, or None to leave it unreported.
            job_id: ID of the job being run, or None to leave it unreported.
            plugin_name: Accepted for interface compatibility; not logged.
            plugin_id: Accepted for interface compatibility; not logged.
            loaded_plugin_resource: Identifier of the loaded resource, or None
                to leave it unreported.
            config: Accepted for interface compatibility; not logged.
        """
        # Compare against None rather than relying on truthiness, so that a
        # deliberately supplied falsy value (e.g. an empty string) still
        # counts as "provided".
        if status is not None:
            print(f"Worker {pid} status: {status}")
        if job_id is not None:
            print(f"Worker {pid} running job: {job_id}")
        if loaded_plugin_resource is not None:
            print(f"Worker {pid} loaded resource: {loaded_plugin_resource}")
# Usage
resource_mgr = MyResourceManager()
# Update only status; all other fields are left at their default of None
resource_mgr.update_worker_state(pid= 12345 , status= "running" )
# Update multiple fields in a single call
resource_mgr.update_worker_state(
pid= 12345 ,
status= "running" ,
job_id= "abc123" ,
loaded_plugin_resource= "whisper-large-v3"
)
# Clear fields (set to None)
# NOTE(review): None is also the default value of these parameters, so an
# implementation cannot tell "clear this field" apart from "argument omitted";
# confirm how clearing is meant to be signalled.
resource_mgr.update_worker_state(
pid= 12345 ,
status= "idle" ,
job_id= None , # No job running
loaded_plugin_resource= None # Resource unloaded
)
Test Implementations
Let’s create working implementations of these protocols to demonstrate their usage:
# Test PluginRegistryProtocol implementation
class SimplePluginRegistry:
    """In-memory plugin registry backed by two hard-coded mock plugins.

    ``self.plugins`` maps plugin ID to a metadata object exposing ``id``,
    ``name`` and ``category`` attributes; ``self.configs`` maps plugin ID to
    that plugin's configuration dict.
    """

    def __init__(self):
        """Populate the registry with mock plugin metadata and configurations."""
        # Mock plugin data
        self.plugins = {
            'plugin-1': self._make_meta(
                id='plugin-1',
                name='Text Processor',
                category='processing',
            ),
            'plugin-2': self._make_meta(
                id='plugin-2',
                name='Image Analyzer',
                category='vision',
            ),
        }
        self.configs = {
            'plugin-1': {'model': 'gpt-3.5', 'max_tokens': 100},
            'plugin-2': {'model': 'resnet-50', 'device': 'cuda'},
        }

    @staticmethod
    def _make_meta(**fields):
        """Return an instance of an ad-hoc ``PluginMeta`` class carrying ``fields`` as attributes."""
        return type('PluginMeta', (), fields)()

    def get_plugins_by_category(self, category):
        """Return a list of every plugin whose ``category`` equals ``category``."""
        return [p for p in self.plugins.values() if p.category == category]

    def get_plugin(self, plugin_id):
        """Return the metadata object for ``plugin_id``, or None if it is unknown."""
        return self.plugins.get(plugin_id)

    def load_plugin_config(self, plugin_id):
        """Return the configuration for ``plugin_id`` (empty dict if unknown).

        A shallow copy is returned so that callers who modify the result do
        not silently alter the configuration stored in the registry.
        """
        return dict(self.configs.get(plugin_id, {}))
# Test the implementation
registry = SimplePluginRegistry()
# Look up every plugin registered under the 'processing' category
registry.get_plugins_by_category('processing' )
# Test getting a specific plugin and its config
plugin = registry.get_plugin('plugin-1' )
config = registry.load_plugin_config('plugin-1' )
# Show the plugin's display name and its configuration dict
print (f"Plugin: { plugin. name} " )
print (f"Config: { config} " )
Plugin: Text Processor
Config: {'model': 'gpt-3.5', 'max_tokens': 100}
# Test ResourceManagerProtocol implementation
class SimpleResourceManager:
    """In-memory worker tracker used to exercise ResourceManagerProtocol.

    Worker records are plain dicts stored in ``self.workers``, keyed by PID.
    """

    def __init__(self):
        """Start with no registered workers."""
        self.workers = {}

    def register_worker(self, pid, worker_type):
        """Create (or overwrite) the record for ``pid`` in the 'idle' state."""
        self.workers[pid] = {
            'type': worker_type,
            'status': 'idle',
            'job_id': None,
            'plugin_name': None,
            'loaded_plugin_resource': None,
        }
        print(f"Registered {worker_type} worker with PID {pid}")

    def unregister_worker(self, pid):
        """Remove the record for ``pid``; unknown PIDs are ignored silently."""
        if pid in self.workers:
            del self.workers[pid]
            print(f"Unregistered worker PID {pid}")

    def update_worker_state(self, pid, status=None, job_id=None,
                            plugin_name=None, plugin_id=None,
                            loaded_plugin_resource=None, config=None):
        """Merge the provided fields into the record for ``pid``.

        A value of None means "leave this field unchanged", so this method
        cannot reset a field back to None. Updates for an unregistered PID
        are ignored.

        Args:
            pid: Process ID of the worker to update.
            status: New worker status.
            job_id: ID of the job the worker is running.
            plugin_name: Name of the currently loaded plugin.
            plugin_id: ID of the currently loaded plugin.
            loaded_plugin_resource: Identifier of the currently loaded resource.
            config: Current plugin configuration.
        """
        worker = self.workers.get(pid)
        if worker is None:
            return
        provided = {
            'status': status,
            'job_id': job_id,
            'plugin_name': plugin_name,
            'loaded_plugin_resource': loaded_plugin_resource,
            # plugin_id and config are stored only once they are supplied,
            # so records that never receive them keep their original shape.
            'plugin_id': plugin_id,
            'config': config,
        }
        worker.update(
            {field: value for field, value in provided.items() if value is not None}
        )
        print(f"Updated worker {pid}: {worker}")
# Test the implementation
resource_mgr = SimpleResourceManager()
# Register a worker; its record starts out with status 'idle'
resource_mgr.register_worker(12345 , 'transcription' )
Registered transcription worker with PID 12345
# Update worker state - running a job
# (plugin_id and config are not passed, so they keep their None defaults)
resource_mgr.update_worker_state(
pid= 12345 ,
status= 'running' ,
job_id= 'job-abc123' ,
plugin_name= 'whisper' ,
loaded_plugin_resource= 'whisper-large-v3'
)
Updated worker 12345: {'type': 'transcription', 'status': 'running', 'job_id': 'job-abc123', 'plugin_name': 'whisper', 'loaded_plugin_resource': 'whisper-large-v3'}
# Test EventBroadcasterProtocol implementation
class SimpleEventBroadcaster:
    """Event broadcaster that records every event in memory and prints it.

    Recorded events accumulate in ``self.events`` in the order they were
    broadcast, each as a ``{'type': ..., 'data': ...}`` dict.
    """

    def __init__(self):
        """Start with an empty event log."""
        self.events = []

    async def broadcast(self, event_type, data):
        """Record one event and print a one-line summary of it.

        Args:
            event_type: Name of the event (e.g. 'job:started').
            data: Payload associated with the event.
        """
        event = {'type': event_type, 'data': data}
        self.events.append(event)
        print(f"Broadcast: {event_type} - {data}")
# Test the implementation
# NOTE: top-level `await` only works in an async-aware shell such as
# Jupyter/IPython; in a plain script, wrap the call in asyncio.run(...) instead.
import asyncio
broadcaster = SimpleEventBroadcaster()
await broadcaster.broadcast('job:started' , {'job_id' : 'job-123' , 'plugin' : 'test' })
Broadcast: job:started - {'job_id': 'job-123', 'plugin': 'test'}
# Broadcast a second event
await broadcaster.broadcast('job:completed' , {'job_id' : 'job-123' , 'status' : 'success' })
# View all recorded events (each broadcast() call appends one entry)
broadcaster.events
Broadcast: job:completed - {'job_id': 'job-123', 'status': 'success'}
[{'type': 'job:started', 'data': {'job_id': 'job-123', 'plugin': 'test'}},
{'type': 'job:completed', 'data': {'job_id': 'job-123', 'status': 'success'}}]