Plugin Interface

Abstract base class defining the generic plugin interface

FileBackedDTO Protocol

The FileBackedDTO protocol defines objects that can serialize themselves to disk for zero-copy transfer between Host and Worker processes. When the Proxy detects an argument implementing this protocol, it calls to_temp_file() and sends the file path instead of the data.


source

FileBackedDTO


def FileBackedDTO(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer.

PluginInterface

The PluginInterface is an abstract base class that defines the contract all plugins must implement. This interface works for both:

  • Concrete Plugins: Running in Worker processes (implement the actual logic)
  • Remote Proxies: Running in Host process (forward calls over HTTP)

The interface is domain-agnostic. Domain-specific plugin systems (e.g., transcription, vision) subclass this to add their specific methods and DTOs.


source

PluginInterface


def PluginInterface(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Abstract base class for all plugins (both local workers and remote proxies).

The interface provides:

  • Identity: name and version properties for plugin identification
  • Lifecycle: initialize() for configuration and cleanup() for resource release
  • Execution: execute() for main logic, execute_stream() for streaming results
  • Configuration: get_config_schema() returns JSON Schema, get_current_config() returns current values
  • Cancellation: cancel() for cooperative cancellation (plugins opt-in by overriding)
  • Progress: report_progress() to report execution progress and status messages

The default execute_stream() implementation yields a single result from execute(). Plugins can override this for true streaming where partial results are yielded as they become available.

The cancel() and report_progress() methods have default implementations. Plugins can override cancel() to implement cooperative cancellation. During execute(), plugins can call self.report_progress(0.5, "Processing...") to report progress.

Example: Implementing a Plugin

Here’s a complete example showing how to implement a concrete plugin:

from dataclasses import dataclass, field, asdict

@dataclass
class ExampleConfig:
    """Configuration for the example plugin."""
    mode: str = "balanced"
    threshold: float = 0.5
    max_workers: int = 4

class ExamplePlugin(PluginInterface):
    """A simple example plugin implementation."""

    def __init__(self):
        self._config: ExampleConfig = ExampleConfig()
        self._resource: Optional[str] = None

    @property
    def name(self) -> str:
        return "example-plugin"
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Initialize or re-configure the plugin."""
        if config is None:
            config = {}
        
        # Merge with defaults
        current = asdict(self._config)
        current.update(config)
        self._config = ExampleConfig(**current)
        
        # Initialize resources based on config
        self._resource = f"Resource-{self._config.mode}"

    def execute(self, input_data: str, **kwargs) -> str:
        """Process input data."""
        return f"Processed '{input_data}' using {self._resource}"

    def get_config_schema(self) -> Dict[str, Any]:
        """Return JSON Schema for configuration."""
        return {
            "type": "object",
            "properties": {
                "mode": {
                    "type": "string",
                    "enum": ["fast", "balanced", "quality"],
                    "default": "balanced"
                },
                "threshold": {
                    "type": "number",
                    "minimum": 0.0,
                    "maximum": 1.0,
                    "default": 0.5
                },
                "max_workers": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 16,
                    "default": 4
                }
            }
        }

    def get_current_config(self) -> Dict[str, Any]:
        """Return current configuration."""
        return asdict(self._config)

    def cleanup(self) -> None:
        """Clean up resources."""
        self._resource = None
# Test the example plugin
plugin = ExamplePlugin()
plugin.initialize()

print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"\nConfig Schema:")
print(plugin.get_config_schema())
print(f"\nCurrent Config:")
print(plugin.get_current_config())
Plugin: example-plugin v1.0.0

Config Schema:
{'type': 'object', 'properties': {'mode': {'type': 'string', 'enum': ['fast', 'balanced', 'quality'], 'default': 'balanced'}, 'threshold': {'type': 'number', 'minimum': 0.0, 'maximum': 1.0, 'default': 0.5}, 'max_workers': {'type': 'integer', 'minimum': 1, 'maximum': 16, 'default': 4}}}

Current Config:
{'mode': 'balanced', 'threshold': 0.5, 'max_workers': 4}
# Test execution
result = plugin.execute("sample_data")
print(f"Result: {result}")

# Test re-initialization with new config
plugin.initialize({"mode": "quality", "threshold": 0.8})
print(f"\nAfter re-init config: {plugin.get_current_config()}")
result = plugin.execute("more_data")
print(f"Result: {result}")

# Test cleanup
plugin.cleanup()
print(f"\nAfter cleanup, resource is cleared")
Result: Processed 'sample_data' using Resource-balanced

After re-init config: {'mode': 'quality', 'threshold': 0.8, 'max_workers': 4}
Result: Processed 'more_data' using Resource-quality

After cleanup, resource is cleared
# Test streaming (default implementation yields single result)
plugin.initialize({"mode": "balanced"})

print("Streaming execution:")
for chunk in plugin.execute_stream("stream_data"):
    print(f"  Chunk: {chunk}")
Streaming execution:
  Chunk: Processed 'stream_data' using Resource-balanced
# Test FileBackedDTO protocol detection
import tempfile

class MockAudioData:
    """Example class implementing FileBackedDTO."""
    
    def __init__(self, data: bytes):
        self._data = data
    
    def to_temp_file(self) -> str:
        """Save to temp file and return path."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
            f.write(self._data)
            return f.name

# Check if it implements the protocol
audio = MockAudioData(b"fake audio data")
print(f"MockAudioData implements FileBackedDTO: {isinstance(audio, FileBackedDTO)}")
print(f"Temp file path: {audio.to_temp_file()}")

# A regular string does not implement the protocol
print(f"str implements FileBackedDTO: {isinstance('hello', FileBackedDTO)}")
MockAudioData implements FileBackedDTO: True
Temp file path: /tmp/tmphnwt2uct.wav
str implements FileBackedDTO: False