from typing import List
# Create a domain-specific plugin interface
class TextProcessingPlugin(PluginInterface):
"""Domain-specific interface for text processing plugins."""
# REQUIRED: Define entry_point_group for this plugin type
entry_point_group = "text_processing.plugins"
@property
@abstractmethod
def supported_formats(self) -> List[str]:
"""File formats this plugin can process."""
pass
# Now create a concrete plugin that inherits from the domain-specific interface
class ExamplePlugin(TextProcessingPlugin):
"""An example text processing plugin implementation."""
# entry_point_group is inherited from TextProcessingPlugin - no need to redefine!
def __init__(self):
self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
self.config = {}
self.resource = None
@property
def name(self) -> str:
return "example_plugin"
@property
def version(self) -> str:
return "1.0.0"
@property
def supported_formats(self) -> List[str]:
return ["txt", "md"]
@staticmethod
def get_config_schema() -> Dict[str, Any]:
"""Return the configuration schema for this plugin."""
return {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["fast", "balanced", "quality"],
"default": "balanced",
"description": "Processing mode"
},
"threshold": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5,
"description": "Processing threshold"
},
"max_workers": {
"type": "integer",
"minimum": 1,
"maximum": 16,
"default": 4,
"description": "Maximum number of workers"
},
"enable_cache": {
"type": "boolean",
"default": True,
"description": "Enable result caching"
}
},
"required": ["mode"]
}
def get_current_config(self) -> Dict[str, Any]:
"""Return the current configuration."""
# Merge defaults with actual config
defaults = self.get_config_defaults()
return {**defaults, **self.config}
def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the plugin."""
if config:
is_valid, error = self.validate_config(config)
if not is_valid:
raise ValueError(f"Invalid configuration: {error}")
# Merge provided config with defaults
defaults = self.get_config_defaults()
self.config = {**defaults, **(config or {})}
self.logger.info(f"Initializing {self.name} with config: {self.config}")
# Simulate resource initialization
mode = self.config.get("mode", "balanced")
self.resource = f"Resource-{mode}"
def execute(self, input_data: Any, **kwargs) -> Any:
"""Execute the plugin's functionality."""
self.logger.info(f"Executing {self.name} with resource: {self.resource}")
self.logger.info(f"Config: {self.config}")
return f"Processed {input_data} using {self.resource}"
def is_available(self) -> bool:
"""Check availability."""
return True
def cleanup(self) -> None:
"""Clean up resources."""
self.logger.info(f"Cleaning up {self.name}")
self.resource = NonePlugin Interface
PluginInterface
The PluginInterface is a generic abstract base class that defines the contract all plugins must implement. This interface is completely domain-agnostic and can be subclassed to create specific plugin systems (e.g., transcription plugins, LLM plugins, image generation plugins).
PluginInterface
PluginInterface ()
Generic plugin interface that all plugins must implement.
This is a domain-agnostic base class. Domain-specific plugin systems should subclass this interface and add their specific requirements.
Important: Domain-specific interfaces (direct subclasses of PluginInterface) MUST define the entry_point_group class attribute. This requirement is enforced at class definition time.
The interface provides: - Abstract properties for plugin identity (name, version) - Configuration management with JSON Schema validation - Lifecycle methods (initialize, cleanup) - Dependency checking (is_available) - Main execution method (execute) - Optional streaming support via execute_stream
Streaming Support
The plugin system includes optional streaming support. Plugins can implement execute_stream() to provide real-time streaming results.
PluginInterface_execute_stream
PluginInterface_execute_stream (*args, **kwargs)
Stream execution results chunk by chunk.
| Type | Details | |
|---|---|---|
| args | VAR_POSITIONAL | Arguments for plugin execution |
| kwargs | VAR_KEYWORD | |
| Returns | Generator | Yields partial results, returns final result |
PluginInterface_supports_streaming
PluginInterface_supports_streaming ()
Check if this plugin supports streaming execution.
The default execute_stream implementation falls back to execute() without real streaming. Plugins can override this method to provide true streaming capabilities where partial results are yielded as they become available.
The supports_streaming method checks if a plugin has overridden the default execute_stream implementation, allowing callers to detect streaming support at runtime.
First, let’s create a domain-specific plugin interface that defines the entry_point_group:
# Test the example plugin
logging.basicConfig(level=logging.INFO)
plugin = ExamplePlugin()
print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"Available: {plugin.is_available()}")
print(f"Entry point group (inherited): {plugin.entry_point_group}")
# Get schema and defaults
import json
schema = plugin.get_config_schema()
print("\nConfiguration Schema:")
print(json.dumps(schema, indent=2))
defaults = plugin.get_config_defaults()
print("\nDefault Configuration:")
print(json.dumps(defaults, indent=2))Plugin: example_plugin v1.0.0
Available: True
Entry point group (inherited): text_processing.plugins
Configuration Schema:
{
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"fast",
"balanced",
"quality"
],
"default": "balanced",
"description": "Processing mode"
},
"threshold": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5,
"description": "Processing threshold"
},
"max_workers": {
"type": "integer",
"minimum": 1,
"maximum": 16,
"default": 4,
"description": "Maximum number of workers"
},
"enable_cache": {
"type": "boolean",
"default": true,
"description": "Enable result caching"
}
},
"required": [
"mode"
]
}
Default Configuration:
{
"mode": "balanced",
"threshold": 0.5,
"max_workers": 4,
"enable_cache": true
}
# This will raise a TypeError because entry_point_group is not defined
try:
class BadPlugin(PluginInterface):
"""This will fail - missing entry_point_group!"""
pass
except TypeError as e:
print("✓ Correctly caught missing entry_point_group:")
print(f" {e}")
print("\n" + "="*60 + "\n")
# This works - entry_point_group is defined
try:
class GoodPlugin(PluginInterface):
"""This works - has entry_point_group!"""
entry_point_group = "good.plugins"
print("✓ Successfully created domain-specific interface with entry_point_group")
print(f" entry_point_group = '{GoodPlugin.entry_point_group}'")
except TypeError as e:
print(f"✗ Unexpected error: {e}")✓ Correctly caught missing entry_point_group:
Domain-specific interface 'BadPlugin' must define 'entry_point_group' class attribute. Example:
class BadPlugin(PluginInterface):
entry_point_group = 'your.plugins'
============================================================
✓ Successfully created domain-specific interface with entry_point_group
entry_point_group = 'good.plugins'
Demonstrating entry_point_group Enforcement
The system enforces that domain-specific interfaces (direct subclasses of PluginInterface) must define entry_point_group:
# Initialize and execute
plugin.initialize({"mode": "quality", "threshold": 0.8})
result = plugin.execute("sample_data")
print(f"\nResult: {result}")
# Get current config
current = plugin.get_current_config()
print("\nCurrent Configuration:")
print(json.dumps(current, indent=2))
# Cleanup
plugin.cleanup()INFO:__main__.ExamplePlugin:Initializing example_plugin with config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4, 'enable_cache': True}
INFO:__main__.ExamplePlugin:Executing example_plugin with resource: Resource-quality
INFO:__main__.ExamplePlugin:Config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4, 'enable_cache': True}
INFO:__main__.ExamplePlugin:Cleaning up example_plugin
Result: Processed sample_data using Resource-quality
Current Configuration:
{
"mode": "quality",
"threshold": 0.8,
"max_workers": 4,
"enable_cache": true
}
# Test configuration validation
test_configs = [
({"mode": "fast"}, "Valid minimal config"),
({"mode": "invalid"}, "Invalid mode value"),
({"threshold": 0.5}, "Missing required 'mode' field"),
({"mode": "balanced", "threshold": 2.0}, "Threshold exceeds maximum"),
]
print("\nConfiguration Validation Tests:")
for config, description in test_configs:
is_valid, error = plugin.validate_config(config)
print(f"\n{description}:")
print(f" Config: {config}")
print(f" Valid: {is_valid}")
if error:
print(f" Error: {error[:80]}...") # Truncate long errors
Configuration Validation Tests:
Valid minimal config:
Config: {'mode': 'fast'}
Valid: True
Invalid mode value:
Config: {'mode': 'invalid'}
Valid: False
Error: 'invalid' is not one of ['fast', 'balanced', 'quality']
Failed validating 'enum...
Missing required 'mode' field:
Config: {'threshold': 0.5}
Valid: False
Error: 'mode' is a required property
Failed validating 'required' in schema:
{'typ...
Threshold exceeds maximum:
Config: {'mode': 'balanced', 'threshold': 2.0}
Valid: False
Error: 2.0 is greater than the maximum of 1.0
Failed validating 'maximum' in schema['p...
# Test streaming support
print(f"\nSupports streaming: {plugin.supports_streaming()}")
# The default implementation doesn't stream, it just yields the result once
print("\nStreaming execution:")
for chunk in plugin.execute_stream("stream_data"):
print(f" Chunk: {chunk}")INFO:__main__.ExamplePlugin:Executing example_plugin with resource: None
INFO:__main__.ExamplePlugin:Config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4, 'enable_cache': True}
Supports streaming: False
Streaming execution:
Chunk: Processed stream_data using None