from dataclasses import dataclass, field, asdict
@dataclass
class ExampleConfig:
"""Configuration for the example plugin."""
mode: str = "balanced"
threshold: float = 0.5
max_workers: int = 4
class ExamplePlugin(PluginInterface):
"""A simple example plugin implementation."""
def __init__(self):
self._config: ExampleConfig = ExampleConfig()
self._resource: Optional[str] = None
@property
def name(self) -> str:
return "example-plugin"
@property
def version(self) -> str:
return "1.0.0"
def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
"""Initialize or re-configure the plugin."""
if config is None:
config = {}
# Merge with defaults
current = asdict(self._config)
current.update(config)
self._config = ExampleConfig(**current)
# Initialize resources based on config
self._resource = f"Resource-{self._config.mode}"
def execute(self, input_data: str, **kwargs) -> str:
"""Process input data."""
return f"Processed '{input_data}' using {self._resource}"
def get_config_schema(self) -> Dict[str, Any]:
"""Return JSON Schema for configuration."""
return {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["fast", "balanced", "quality"],
"default": "balanced"
},
"threshold": {
"type": "number",
"minimum": 0.0,
"maximum": 1.0,
"default": 0.5
},
"max_workers": {
"type": "integer",
"minimum": 1,
"maximum": 16,
"default": 4
}
}
}
def get_current_config(self) -> Dict[str, Any]:
"""Return current configuration."""
return asdict(self._config)
def cleanup(self) -> None:
"""Clean up resources."""
self._resource = NonePlugin Interface
FileBackedDTO Protocol
The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.
FileBackedDTO
def FileBackedDTO(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer.
PluginInterface
The PluginInterface is an abstract base class that defines the contract all plugins must implement. This interface works for both:
- Concrete Plugins: Running in Worker processes (implement the actual logic)
- Remote Proxies: Running in Host process (forward calls over HTTP)
The interface is domain-agnostic. Domain-specific plugin systems (e.g., transcription, vision) subclass this to add their specific methods and DTOs.
PluginInterface
def PluginInterface(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Abstract base class for all plugins (both local workers and remote proxies).
The interface provides:
- Identity:
nameandversionproperties for plugin identification - Lifecycle:
initialize()for configuration andcleanup()for resource release - Execution:
execute()for main logic,execute_stream()for streaming results - Configuration:
get_config_schema()returns JSON Schema,get_current_config()returns current values - Cancellation:
cancel()for cooperative cancellation (plugins opt-in by overriding) - Progress:
report_progress()to report execution progress and status messages
The default execute_stream() implementation yields a single result from execute(). Plugins can override this for true streaming where partial results are yielded as they become available.
The cancel() and report_progress() methods have default implementations. Plugins can override cancel() to implement cooperative cancellation. During execute(), plugins can call self.report_progress(0.5, "Processing...") to report progress.
Example: Implementing a Plugin
Here’s a complete example showing how to implement a concrete plugin:
# Test the example plugin
plugin = ExamplePlugin()
plugin.initialize()
print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"\nConfig Schema:")
print(plugin.get_config_schema())
print(f"\nCurrent Config:")
print(plugin.get_current_config())Plugin: example-plugin v1.0.0
Config Schema:
{'type': 'object', 'properties': {'mode': {'type': 'string', 'enum': ['fast', 'balanced', 'quality'], 'default': 'balanced'}, 'threshold': {'type': 'number', 'minimum': 0.0, 'maximum': 1.0, 'default': 0.5}, 'max_workers': {'type': 'integer', 'minimum': 1, 'maximum': 16, 'default': 4}}}
Current Config:
{'mode': 'balanced', 'threshold': 0.5, 'max_workers': 4}
# Test execution
result = plugin.execute("sample_data")
print(f"Result: {result}")
# Test re-initialization with new config
plugin.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {plugin.get_current_config()}")
result = plugin.execute("more_data")
print(f"Result: {result}")
# Test cleanup
plugin.cleanup()
print(f"\nAfter cleanup, resource is cleared")Result: Processed 'sample_data' using Resource-balanced
After re-init config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4}
Result: Processed 'more_data' using Resource-quality
After cleanup, resource is cleared
# Test streaming (default implementation yields single result)
plugin.initialize({"mode": "balanced"})
print("Streaming execution:")
for chunk in plugin.execute_stream("stream_data"):
print(f" Chunk: {chunk}")Streaming execution:
Chunk: Processed 'stream_data' using Resource-balanced
# Test FileBackedDTO protocol detection
import tempfile
class MockAudioData:
"""Example class implementing FileBackedDTO."""
def __init__(self, data: bytes):
self._data = data
def to_temp_file(self) -> str:
"""Save to temp file and return path."""
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
f.write(self._data)
return f.name
# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
print(f"Temp file path: {audio.to_temp_file()}")
# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")MockAudioData implements FileBackedDTO: True
Temp file path: /tmp/tmphnwt2uct.wav
str implements FileBackedDTO: False