Plugin Manager

Plugin discovery, loading, and lifecycle management system

PluginManager

The PluginManager orchestrates the complete lifecycle of plugins in the process-isolated architecture:

  • Discovery: Finds plugin manifests in local (.cjm/manifests/) or global (~/.cjm/manifests/) directories
  • Loading: Creates RemotePluginProxy instances that spawn isolated Worker subprocesses
  • Execution: Forwards calls to Workers via HTTP, supports both sync and async
  • Lifecycle: Handles initialization, configuration updates, and cleanup
PluginManager                           Worker Subprocesses
┌─────────────────┐                    ┌─────────────────────┐
│ discover_       │                    │ Conda Env: Whisper  │
│   manifests()   │     HTTP/JSON      │   └─ WhisperPlugin  │
│                 │◄──────────────────▶│                     │
│ plugins:        │                    └─────────────────────┘
│   whisper ──────┼──► RemoteProxy     ┌─────────────────────┐
│   gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini   │
│                 │                    │   └─ GeminiPlugin   │
└─────────────────┘                    └─────────────────────┘

source

PluginManager


def PluginManager(
    plugin_interface:Type=PluginInterface, # Base interface for type checking
    search_paths:Optional=None, # Custom manifest search paths
    scheduler:Optional=None, # Resource allocation policy
):

Manages plugin discovery, loading, and lifecycle via process isolation.

Key features:

  • Local-first discovery: Manifests in .cjm/manifests/ shadow global ones in ~/.cjm/manifests/
  • Process isolation: Each plugin runs in its own subprocess with a dedicated Python interpreter
  • Dual execution modes: execute_plugin() for sync, execute_plugin_async() for async
  • Automatic cleanup: unload_all() terminates all Worker processes

Configuration Management

Methods for managing plugin configuration. These forward to the RemotePluginProxy which communicates with the Worker over HTTP.


source

get_plugin_stats


def get_plugin_stats(
    plugin_name:str, # Name of the plugin
)->Optional: # Resource telemetry or None

Get resource usage stats for a plugin’s Worker process.


source

reload_plugin


def reload_plugin(
    plugin_name:str, # Name of the plugin
    config:Optional=None, # Optional new configuration
)->bool: # True if successful

Reload a plugin by terminating and restarting its Worker.


source

update_plugin_config


def update_plugin_config(
    plugin_name:str, # Name of the plugin
    config:Dict, # New configuration values
)->bool: # True if successful

Update a plugin’s configuration (hot-reload without restart).


source

get_all_plugin_configs


def get_all_plugin_configs(
    
)->Dict: # Plugin name -> config mapping

Get current configuration for all loaded plugins.


source

get_plugin_config_schema


def get_plugin_config_schema(
    plugin_name:str, # Name of the plugin
)->Optional: # JSON Schema or None

Get the configuration JSON Schema for a plugin.


source

get_plugin_config


def get_plugin_config(
    plugin_name:str, # Name of the plugin
)->Optional: # Current configuration or None

Get the current configuration of a plugin.

Streaming Execution

Async streaming support for real-time results (e.g., transcription word-by-word).


source

execute_plugin_stream


def execute_plugin_stream(
    plugin_name:str, # Name of the plugin
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results

Execute a plugin with streaming response.

Usage Examples

Basic Usage

import logging
from cjm_plugin_system.core.manager import PluginManager

logging.basicConfig(level=logging.INFO)

# Create manager
manager = PluginManager()

# Discover and load all plugins from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")

# Execute a plugin
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")

# Update configuration (hot-reload)
manager.update_plugin_config("whisper-local", {"model": "large-v3"})

# Clean up all workers
manager.unload_all()

Async Usage (FastHTML)

async def transcribe(audio_path: str):
    manager = PluginManager()
    manager.load_all()
    
    try:
        result = await manager.execute_plugin_async("whisper-local", audio=audio_path)
        return result
    finally:
        manager.unload_all()

# Streaming
async def transcribe_stream(audio_path: str):
    manager = PluginManager()
    manager.load_all()
    
    try:
        async for chunk in manager.execute_plugin_stream("whisper-local", audio=audio_path):
            yield chunk
    finally:
        manager.unload_all()