protocols

Optional integration protocols for plugin registries, resource management, and event broadcasting.

These protocols define optional integrations that can be injected into the BaseJobManager. All integrations are optional - the worker system functions without them.

Plugin Registry Protocol


source

PluginRegistryProtocol

 PluginRegistryProtocol (*args, **kwargs)

Protocol for plugin registry integration.

Resource Manager Protocol

The ResourceManagerProtocol tracks worker processes and their resource usage. Key methods:

  • register_worker / unregister_worker: Manage worker lifecycle
  • update_worker_state: Update worker status, current job, and loaded plugin information
  • check_gpu_availability: Returns a ResourceConflict object with:
    • status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)
    • app_pids: List of application PIDs using GPU
    • external_pids: List of external PIDs using GPU
    • app_processes: Detailed info about app processes
    • external_processes: Detailed info about external processes
  • get_worker_by_pid: Returns a WorkerState object with:
    • pid: Worker process ID
    • worker_type: Type of worker
    • status: Worker status (‘idle’, ‘running’, ‘busy’)
    • job_id: Current job ID (None if idle)
    • plugin_name: Currently loaded plugin name
    • plugin_id: Currently loaded plugin ID
    • loaded_plugin_resource: Currently loaded resource identifier
    • config: Current plugin configuration

source

ResourceManagerProtocol

 ResourceManagerProtocol (*args, **kwargs)

Protocol for resource management integration.

Event Broadcaster Protocol


source

EventBroadcasterProtocol

 EventBroadcasterProtocol (*args, **kwargs)

Protocol for SSE event broadcasting.

Usage Examples

These protocols enable flexible integration:

# Without any integrations
manager = MyJobManager(
    worker_type="my_worker",
    category="processing"
)

# With plugin registry only
manager = MyJobManager(
    worker_type="my_worker",
    category="processing",
    plugin_registry=my_registry
)

# With all integrations
manager = MyJobManager(
    worker_type="my_worker",
    category="processing",
    plugin_registry=my_registry,
    resource_manager=my_resource_mgr,
    event_broadcaster=my_sse_manager
)

ResourceManagerProtocol Example

The update_worker_state method accepts explicit optional parameters:

class MyResourceManager:
    def register_worker(self, pid: int, worker_type: str) -> None:
        print(f"Registered {worker_type} worker with PID {pid}")
    
    def unregister_worker(self, pid: int) -> None:
        print(f"Unregistered worker with PID {pid}")
    
    def update_worker_state(
        self,
        pid: int,
        status: Optional[str] = None,
        job_id: Optional[str] = None,
        plugin_name: Optional[str] = None,
        plugin_id: Optional[str] = None,
        loaded_plugin_resource: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Update only the provided fields
        if status:
            print(f"Worker {pid} status: {status}")
        if job_id:
            print(f"Worker {pid} running job: {job_id}")
        if loaded_plugin_resource:
            print(f"Worker {pid} loaded resource: {loaded_plugin_resource}")

# Usage
resource_mgr = MyResourceManager()

# Update only status
resource_mgr.update_worker_state(pid=12345, status="running")

# Update multiple fields
resource_mgr.update_worker_state(
    pid=12345,
    status="running",
    job_id="abc123",
    loaded_plugin_resource="whisper-large-v3"
)

# Clear fields (set to None)
resource_mgr.update_worker_state(
    pid=12345,
    status="idle",
    job_id=None,  # No job running
    loaded_plugin_resource=None  # Resource unloaded
)

Test Implementations

Let’s create working implementations of these protocols to demonstrate their usage:

# Test PluginRegistryProtocol implementation
class SimplePluginRegistry:
    def __init__(self):
        # Mock plugin data
        self.plugins = {
            'plugin-1': type('PluginMeta', (), {
                'id': 'plugin-1',
                'name': 'Text Processor',
                'category': 'processing'
            })(),
            'plugin-2': type('PluginMeta', (), {
                'id': 'plugin-2',
                'name': 'Image Analyzer',
                'category': 'vision'
            })()
        }
        self.configs = {
            'plugin-1': {'model': 'gpt-3.5', 'max_tokens': 100},
            'plugin-2': {'model': 'resnet-50', 'device': 'cuda'}
        }
    
    def get_plugins_by_category(self, category):
        return [p for p in self.plugins.values() if p.category == category]
    
    def get_plugin(self, plugin_id):
        return self.plugins.get(plugin_id)
    
    def load_plugin_config(self, plugin_id):
        return self.configs.get(plugin_id, {})

# Test the implementation
registry = SimplePluginRegistry()
registry.get_plugins_by_category('processing')
[<__main__.PluginMeta>]
# Test getting a specific plugin and its config
plugin = registry.get_plugin('plugin-1')
config = registry.load_plugin_config('plugin-1')
print(f"Plugin: {plugin.name}")
print(f"Config: {config}")
Plugin: Text Processor
Config: {'model': 'gpt-3.5', 'max_tokens': 100}
# Test ResourceManagerProtocol implementation
class SimpleResourceManager:
    def __init__(self):
        self.workers = {}
    
    def register_worker(self, pid, worker_type):
        self.workers[pid] = {
            'type': worker_type,
            'status': 'idle',
            'job_id': None,
            'plugin_name': None,
            'loaded_plugin_resource': None
        }
        print(f"Registered {worker_type} worker with PID {pid}")
    
    def unregister_worker(self, pid):
        if pid in self.workers:
            del self.workers[pid]
            print(f"Unregistered worker PID {pid}")
    
    def update_worker_state(self, pid, status=None, job_id=None, 
                           plugin_name=None, plugin_id=None, 
                           loaded_plugin_resource=None, config=None):
        if pid not in self.workers:
            return
        
        if status:
            self.workers[pid]['status'] = status
        if job_id is not None:
            self.workers[pid]['job_id'] = job_id
        if plugin_name is not None:
            self.workers[pid]['plugin_name'] = plugin_name
        if loaded_plugin_resource is not None:
            self.workers[pid]['loaded_plugin_resource'] = loaded_plugin_resource
        
        print(f"Updated worker {pid}: {self.workers[pid]}")

# Test the implementation
resource_mgr = SimpleResourceManager()
resource_mgr.register_worker(12345, 'transcription')
Registered transcription worker with PID 12345
# Update worker state - running a job
resource_mgr.update_worker_state(
    pid=12345,
    status='running',
    job_id='job-abc123',
    plugin_name='whisper',
    loaded_plugin_resource='whisper-large-v3'
)
Updated worker 12345: {'type': 'transcription', 'status': 'running', 'job_id': 'job-abc123', 'plugin_name': 'whisper', 'loaded_plugin_resource': 'whisper-large-v3'}
# Test EventBroadcasterProtocol implementation
class SimpleEventBroadcaster:
    def __init__(self):
        self.events = []
    
    async def broadcast(self, event_type, data):
        event = {'type': event_type, 'data': data}
        self.events.append(event)
        print(f"Broadcast: {event_type} - {data}")

# Test the implementation
import asyncio

broadcaster = SimpleEventBroadcaster()
await broadcaster.broadcast('job:started', {'job_id': 'job-123', 'plugin': 'test'})
Broadcast: job:started - {'job_id': 'job-123', 'plugin': 'test'}
# Broadcast multiple events
await broadcaster.broadcast('job:completed', {'job_id': 'job-123', 'status': 'success'})

# View all broadcasted events
broadcaster.events
Broadcast: job:completed - {'job_id': 'job-123', 'status': 'success'}
[{'type': 'job:started', 'data': {'job_id': 'job-123', 'plugin': 'test'}},
 {'type': 'job:completed', 'data': {'job_id': 'job-123', 'status': 'success'}}]