import logging
from cjm_plugin_system.core.interface import PluginInterface

# Create a simple test plugin
class TestPlugin(PluginInterface):
    def __init__(self):
        self.config = {}

    entry_point_group = 'your.plugins'

    @property
    def name(self) -> str:
        return "test_plugin"

    @property
    def version(self) -> str:
        return "1.0.0"

    @staticmethod
    def get_config_schema():
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "slow"],
                    "default": "fast"
                }
            },
            "required": ["mode"]
        }

    def get_current_config(self):
        defaults = self.get_config_defaults()
        return {**defaults, **self.config}

    def initialize(self, config=None):
        defaults = self.get_config_defaults()
        self.config = {**defaults, **(config or {})}
        print(f"Initialized {self.name} with config: {self.config}")

    def execute(self, data):
        return f"Processed '{data}' in {self.config['mode']} mode"

    def is_available(self):
        return True

    def cleanup(self):
        print(f"Cleaning up {self.name}")

Plugin Manager
PluginManager
The PluginManager class handles the complete lifecycle of plugins:

- Discovery via entry points or direct module loading
- Loading and initialization
- Configuration management
- Execution with optional streaming support
- Enable/disable/reload functionality
- Cleanup and unloading
Error Handling Integration
When the cjm-error-handling library is installed, PluginManager uses structured errors for better error tracking:
- PluginError: Raised for plugin lifecycle failures (discovery, loading, initialization, execution, unloading)
- ValidationError: Raised for invalid plugin configuration

These structured errors provide:

- User-friendly messages for display
- Debug information for developers
- Rich context (plugin_name, operation, package_name, etc.)
- Error chaining with original exception cause
Without the library, PluginManager falls back to returning False or raising standard exceptions (ValueError).
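An optional dependency like this is commonly detected at import time. The following is a minimal sketch of that pattern, not the library's actual code. It assumes the package is importable as `cjm_error_handling`; `FallbackPluginError` and `report_failure` are hypothetical names used only so the example runs on its own.

```python
import importlib.util

# Assumption: the optional package is importable under this module name.
HAS_ERROR_HANDLING = importlib.util.find_spec("cjm_error_handling") is not None


class FallbackPluginError(Exception):
    """Hypothetical stand-in for a structured error type."""


def report_failure(message: str, structured: bool = HAS_ERROR_HANDLING) -> bool:
    """Raise a structured error when enabled, otherwise signal failure with False."""
    if structured:
        raise FallbackPluginError(message)
    return False


# Without structured errors the caller only gets a boolean back.
print(report_failure("could not load plugin", structured=False))
```

Callers that must work in both modes can catch a broad exception type and also check the boolean return value.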
Usage:
try:
    manager.load_plugin(plugin_meta, config)
except PluginError as e:
    print(f"Plugin error: {e.get_user_message()}")
    print(f"Debug: {e.get_debug_message()}")
    print(f"Plugin: {e.plugin_id}")
except ValidationError as e:
    print(f"Config error: {e.get_user_message()}")
    if e.validation_errors:
        print(f"Validation details: {e.validation_errors}")

PluginManager
PluginManager (plugin_interface:Type[cjm_plugin_system.core.interface.PluginInterface]=<class 'cjm_plugin_system.core.interface.PluginInterface'>, entry_point_group:Optional[str]=None)
Manages plugin discovery, loading, and lifecycle.
| | Type | Default | Details |
|---|---|---|---|
| plugin_interface | Type | PluginInterface | Base class/interface plugins must implement |
| entry_point_group | Optional | None | Optional override for entry point group name |
This is a generic plugin manager that works with any PluginInterface subclass. It provides:

- Automatic discovery via entry points
- Manual loading from module files
- Configuration management and validation
- Plugin enable/disable/reload
- Streaming support detection
By default, the manager uses the entry_point_group defined on the plugin interface: if entry_point_group is not passed explicitly, it is read from plugin_interface.entry_point_group.
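The resolution rule above, followed by a lookup of installed entry points, can be sketched with the standard library alone. This is an illustration under assumptions, not the manager's implementation: `DemoInterface`, `resolve_group`, and `list_entry_point_names` are hypothetical names, and the group `your.plugins` is taken from the examples on this page.

```python
from importlib.metadata import entry_points
from typing import List, Optional


class DemoInterface:
    """Hypothetical stand-in for a PluginInterface subclass."""
    entry_point_group = "your.plugins"


def resolve_group(plugin_interface: type, entry_point_group: Optional[str] = None) -> Optional[str]:
    """Prefer the explicit argument, otherwise fall back to the class attribute."""
    if entry_point_group is not None:
        return entry_point_group
    return getattr(plugin_interface, "entry_point_group", None)


def list_entry_point_names(group: Optional[str]) -> List[str]:
    """Return the names of installed entry points registered under `group`."""
    if group is None:
        return []
    eps = entry_points()
    # Python 3.10+ exposes .select(); older versions return a dict of lists.
    selected = eps.select(group=group) if hasattr(eps, "select") else eps.get(group, [])
    return sorted(ep.name for ep in selected)


group = resolve_group(DemoInterface)
print(group)
print(list_entry_point_names(group))
```

The second call prints an empty list unless a package that registers entry points under that group is installed in the environment.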
Configuration Management
Methods for managing plugin configuration.
reload_plugin
reload_plugin (plugin_name:str, config:Optional[Dict[str,Any]]=None)
Reload a plugin with optional new configuration.
| | Type | Default | Details |
|---|---|---|---|
| plugin_name | str | | Name of the plugin to reload |
| config | Optional | None | Optional new configuration |
| Returns | bool | | True if successful, False otherwise |
get_all_plugin_schemas
get_all_plugin_schemas ()
Get configuration schemas for all loaded plugins.
validate_plugin_config
validate_plugin_config (plugin_name:str, config:Dict[str,Any])
Validate a configuration dictionary for a plugin without applying it.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin |
| config | Dict | Configuration to validate |
| Returns | Tuple | (is_valid, error_message) |
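To make the (is_valid, error_message) return shape concrete, here is a deliberately simplified validator that only understands the `required` and `enum` keywords. It is a sketch for illustration; `validate_config` is a hypothetical helper, and the real method may delegate to a full JSON Schema validator instead.

```python
from typing import Any, Dict, Optional, Tuple


def validate_config(schema: Dict[str, Any], config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """Check required keys and enum constraints; return (is_valid, error_message)."""
    for key in schema.get("required", []):
        if key not in config:
            return False, f"'{key}' is a required property"
    for key, value in config.items():
        allowed = schema.get("properties", {}).get(key, {}).get("enum")
        if allowed is not None and value not in allowed:
            return False, f"{value!r} is not one of {allowed!r}"
    return True, None


schema = {
    "type": "object",
    "properties": {"mode": {"type": "string", "enum": ["fast", "slow"], "default": "fast"}},
    "required": ["mode"],
}

print(validate_config(schema, {"mode": "fast"}))     # (True, None)
print(validate_config(schema, {"mode": "invalid"}))  # (False, "'invalid' is not one of ['fast', 'slow']")
print(validate_config(schema, {}))                   # (False, "'mode' is a required property")
```

Returning a tuple rather than raising lets a caller check a candidate configuration before committing to it.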
update_plugin_config
update_plugin_config (plugin_name:str, config:Dict[str,Any], merge:bool=True)
Update a plugin’s configuration and reinitialize it.
| | Type | Default | Details |
|---|---|---|---|
| plugin_name | str | | Name of the plugin |
| config | Dict | | New configuration |
| merge | bool | True | Whether to merge with existing config or replace entirely |
| Returns | bool | | True if successful, False otherwise |
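The difference between merging and replacing can be shown with plain dictionaries. The sketch below assumes a shallow merge, which the source does not confirm; `next_config` and the `verbose` key are hypothetical and exist only for this illustration.

```python
from typing import Any, Dict


def next_config(current: Dict[str, Any], new: Dict[str, Any], merge: bool = True) -> Dict[str, Any]:
    """Return the configuration that would be applied on update (shallow merge assumed)."""
    return {**current, **new} if merge else dict(new)


current = {"mode": "slow", "verbose": True}
print(next_config(current, {"mode": "fast"}))               # {'mode': 'fast', 'verbose': True}
print(next_config(current, {"mode": "fast"}, merge=False))  # {'mode': 'fast'}
```

With merge=True, keys missing from the new configuration keep their current values. With merge=False, they are dropped.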
get_plugin_config
get_plugin_config (plugin_name:str)
Get the current configuration of a plugin.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin |
| Returns | Optional | Current configuration or None if plugin not found |
get_plugin_config_schema
get_plugin_config_schema (plugin_name:str)
Get the configuration schema for a plugin.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin |
| Returns | Optional | Configuration schema or None if plugin not found |
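The example plugins on this page call get_config_defaults(), whose implementation is not shown here. One plausible reading is that defaults are collected from the `default` entries of the schema's properties. The sketch below illustrates that idea with a hypothetical `defaults_from_schema` helper and should not be taken as the library's code.

```python
from typing import Any, Dict, Optional


def defaults_from_schema(schema: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect the 'default' value of every property that declares one."""
    properties = (schema or {}).get("properties", {})
    return {name: spec["default"] for name, spec in properties.items() if "default" in spec}


schema = {
    "type": "object",
    "properties": {
        "mode": {"type": "string", "enum": ["fast", "slow"], "default": "fast"},
        "label": {"type": "string"},  # hypothetical property without a default
    },
}

print(defaults_from_schema(schema))  # {'mode': 'fast'}
print(defaults_from_schema(None))    # {}
```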
Streaming Support
Methods for managing plugins with streaming capabilities.
get_streaming_plugins
get_streaming_plugins ()
Get a list of all loaded plugins that support streaming.
check_streaming_support
check_streaming_support (plugin_name:str)
Check if a plugin supports streaming execution.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin to check |
| Returns | bool | True if plugin supports streaming |
execute_plugin_stream
execute_plugin_stream (plugin_name:str, *args, **kwargs)
Execute a plugin with streaming support if available.
| | Type | Details |
|---|---|---|
| plugin_name | str | Name of the plugin to execute |
| args | VAR_POSITIONAL | Arguments to pass to the plugin |
| kwargs | VAR_KEYWORD | |
| Returns | Generator | Generator yielding partial results, returns final result |
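A generator that yields partial results and also returns a final value can be hard to picture, so here is a self-contained sketch of the idea, including a fallback to regular execution. The classes, the `execute_stream` method name, and the helpers `run_stream` and `drain` are assumptions made for this example, not the library's API.

```python
from typing import Any, Generator, List, Tuple


class BatchPlugin:
    """Hypothetical plugin with regular execution only."""
    def execute(self, data: str) -> str:
        return data.upper()


class StreamingPlugin(BatchPlugin):
    """Hypothetical plugin that also streams word by word."""
    def execute_stream(self, data: str) -> Generator[str, None, str]:
        for word in data.split():
            yield word.upper()
        return data.upper()


def run_stream(plugin: Any, *args: Any, **kwargs: Any) -> Generator[Any, None, Any]:
    """Stream if the plugin can, otherwise yield the regular result once."""
    stream = getattr(plugin, "execute_stream", None)
    if callable(stream):
        # `yield from` forwards each chunk and captures the generator's return value.
        result = yield from stream(*args, **kwargs)
    else:
        result = plugin.execute(*args, **kwargs)
        yield result
    return result


def drain(gen: Generator[Any, None, Any]) -> Tuple[List[Any], Any]:
    """Collect all chunks plus the final value carried by StopIteration."""
    chunks = []
    while True:
        try:
            chunks.append(next(gen))
        except StopIteration as stop:
            return chunks, stop.value


print(drain(run_stream(StreamingPlugin(), "stream data")))  # (['STREAM', 'DATA'], 'STREAM DATA')
print(drain(run_stream(BatchPlugin(), "stream data")))      # (['STREAM DATA'], 'STREAM DATA')
```

A plain `for` loop over the generator sees only the yielded chunks. The final value is available through `StopIteration.value` or through `yield from` in a wrapping generator.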
Example: Using the PluginManager
# Initialize plugin manager
logging.basicConfig(level=logging.INFO)
manager = PluginManager()
print(f"Entry point group: {manager.entry_point_group}")
print(f"Plugin interface: {manager.plugin_interface.__name__}")

Entry point group: None
Plugin interface: PluginInterface
# Since we don't have actual installed plugins, let's save a demo plugin to a file
# and load it from the file
import tempfile
import os

# Create a temporary plugin file
plugin_code = '''
from cjm_plugin_system.core.interface import PluginInterface
from typing import Dict, Any, Optional

class DemoPlugin(PluginInterface):
    def __init__(self):
        self.config = {}

    entry_point_group = 'your.plugins'

    @property
    def name(self) -> str:
        return "demo_plugin"

    @property
    def version(self) -> str:
        return "1.0.0"

    @staticmethod
    def get_config_schema():
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "slow"],
                    "default": "fast"
                }
            },
            "required": ["mode"]
        }

    def get_current_config(self):
        defaults = self.get_config_defaults()
        return {**defaults, **self.config}

    def initialize(self, config=None):
        defaults = self.get_config_defaults()
        self.config = {**defaults, **(config or {})}

    def execute(self, data):
        return f"Demo: Processed '{data}' in {self.config['mode']} mode"

    def is_available(self):
        return True
'''

# Write to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
    f.write(plugin_code)
    temp_plugin_path = f.name

print(f"Created temporary plugin at: {temp_plugin_path}")

Created temporary plugin at: /tmp/tmp0m3_cnbj.py
# Load the plugin from file
success = manager.load_plugin_from_module(temp_plugin_path, config={"mode": "slow"})
print(f"\nPlugin loaded: {success}")

# List loaded plugins
plugins = manager.list_plugins()
print(f"\nLoaded plugins: {len(plugins)}")
for p in plugins:
    print(f"  - {p.name} v{p.version} (enabled: {p.enabled})")

INFO:__main__.PluginManager:Loaded plugin from module: demo_plugin

Plugin loaded: True

Loaded plugins: 1
  - demo_plugin v1.0.0 (enabled: True)
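Loading a plugin class from a standalone file, as in the step above, is usually built on `importlib.util`. The sketch below shows that general technique and is not the manager's implementation. It uses `abc.ABC` as a stand-in for the plugin interface so it runs without the library, and `load_subclasses` is a hypothetical helper.

```python
import importlib.util
import inspect
import os
import tempfile
from abc import ABC
from typing import List


def load_subclasses(path: str, base: type) -> List[type]:
    """Import the file at `path` and return the classes in it that subclass `base`."""
    module_name = os.path.splitext(os.path.basename(path))[0]
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return [
        obj for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, base) and obj is not base
    ]


# Stand-in plugin source; ABC plays the role of the plugin interface.
plugin_source = (
    "from abc import ABC\n"
    "\n"
    "class FilePlugin(ABC):\n"
    "    name = 'file_plugin'\n"
)

with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
    f.write(plugin_source)
    path = f.name

try:
    found = load_subclasses(path, ABC)
finally:
    os.unlink(path)  # remove the temporary file even if loading fails

print([cls.__name__ for cls in found])  # ['FilePlugin']
```

Filtering with `obj is not base` matters because the base class itself is imported into the plugin module and would otherwise be returned as a match.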
# Get and execute plugin
plugin = manager.get_plugin("demo_plugin")
if plugin:
    print(f"\nPlugin name: {plugin.name}")
    print(f"Plugin version: {plugin.version}")
    print(f"Is available: {plugin.is_available()}")

    # Execute
    result = manager.execute_plugin("demo_plugin", "test data")
    print(f"\nExecution result: {result}")

Plugin name: demo_plugin
Plugin version: 1.0.0
Is available: True

Execution result: Demo: Processed 'test data' in slow mode
# Test configuration management
import json

print("\nConfiguration Management:")

# Get current config
current_config = manager.get_plugin_config("demo_plugin")
print(f"Current config: {current_config}")

# Get schema
schema = manager.get_plugin_config_schema("demo_plugin")
print(f"\nConfig schema:")
print(json.dumps(schema, indent=2))

# Validate config
is_valid, error = manager.validate_plugin_config("demo_plugin", {"mode": "fast"})
print(f"\nValid config test: {is_valid}")

is_valid, error = manager.validate_plugin_config("demo_plugin", {"mode": "invalid"})
print(f"Invalid config test: {is_valid}, error: {error}")

Configuration Management:
Current config: {'mode': 'slow'}

Config schema:
null

Valid config test: True
Invalid config test: False, error: 'invalid' is not one of ['fast', 'slow']

Failed validating 'enum' in schema['properties']['mode']:
    {'type': 'string', 'enum': ['fast', 'slow'], 'default': 'fast'}

On instance['mode']:
    'invalid'
# Update config
print("\nUpdating configuration:")
success = manager.update_plugin_config("demo_plugin", {"mode": "fast"})
print(f"Update successful: {success}")

# Execute with new config
result = manager.execute_plugin("demo_plugin", "new data")
print(f"Result after update: {result}")

INFO:__main__.PluginManager:Updated configuration for plugin: demo_plugin

Updating configuration:
Update successful: True
Result after update: Demo: Processed 'new data' in fast mode
# Test enable/disable
print("\nEnable/Disable Test:")
manager.disable_plugin("demo_plugin")
print(f"Plugin disabled: {not manager.plugins['demo_plugin'].enabled}")

try:
    manager.execute_plugin("demo_plugin", "data")
except (ValueError, Exception) as e:
    # Handle both ValueError (without error handling) and PluginError (with error handling)
    error_msg = e.get_user_message() if hasattr(e, 'get_user_message') else str(e)
    print(f"Expected error when disabled: {error_msg}")

manager.enable_plugin("demo_plugin")
print(f"Plugin enabled: {manager.plugins['demo_plugin'].enabled}")
result = manager.execute_plugin("demo_plugin", "data")
print(f"Execution after re-enable: {result}")

Enable/Disable Test:
Plugin disabled: True
Expected error when disabled: Cannot execute plugin: disabled
Plugin enabled: True
Execution after re-enable: Demo: Processed 'data' in fast mode
# Test streaming support
print("\nStreaming Support:")
supports_streaming = manager.check_streaming_support("demo_plugin")
print(f"Plugin supports streaming: {supports_streaming}")

streaming_plugins = manager.get_streaming_plugins()
print(f"Streaming plugins: {streaming_plugins}")

# Try streaming execution (will fall back to regular execution)
print("\nStreaming execution:")
for chunk in manager.execute_plugin_stream("demo_plugin", "stream data"):
    print(f"  Chunk: {chunk}")

INFO:__main__.PluginManager:Plugin demo_plugin doesn't support streaming, using regular execution

Streaming Support:
Plugin supports streaming: False
Streaming plugins: []

Streaming execution:
  Chunk: Demo: Processed 'stream data' in fast mode
# Cleanup
print("\nUnloading plugin:")
success = manager.unload_plugin("demo_plugin")
print(f"Unload successful: {success}")
print(f"Remaining plugins: {len(manager.list_plugins())}")

# Clean up temporary file
os.unlink(temp_plugin_path)
print(f"\nCleaned up temporary plugin file")

INFO:__main__.PluginManager:Unloaded plugin: demo_plugin

Unloading plugin:
Unload successful: True
Remaining plugins: 0

Cleaned up temporary plugin file