Plugin Manager

Plugin discovery, loading, and lifecycle management system

PluginManager

The PluginManager orchestrates the complete lifecycle of plugins in the process-isolated architecture:

  • Discovery: Finds plugin manifests in local (.cjm/manifests/) or global (~/.cjm/manifests/) directories
  • Loading: Creates RemotePluginProxy instances that spawn isolated Worker subprocesses
  • Execution: Forwards calls to Workers via HTTP, supports both sync and async
  • Lifecycle: Handles initialization, configuration updates, and cleanup
PluginManager                           Worker Subprocesses
┌─────────────────┐                    ┌─────────────────────┐
│ discover_       │                    │ Conda Env: Whisper  │
│   manifests()   │     HTTP/JSON      │   └─ WhisperPlugin  │
│                 │◄──────────────────▶│                     │
│ plugins:        │                    └─────────────────────┘
│   whisper ──────┼──► RemoteProxy     ┌─────────────────────┐
│   gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini   │
│                 │                    │   └─ GeminiPlugin   │
└─────────────────┘                    └─────────────────────┘

PluginManager


def PluginManager(
    plugin_interface:Type=PluginInterface, # Base interface for type checking
    search_paths:Optional=None, # Custom manifest search paths
    scheduler:Optional=None, # Resource allocation policy
    config_store:Optional=None, # CR-2: persistence backend; lazy LocalPluginConfigStore default per OQ-4
    empirical_store:Optional=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
    secret_store:Optional=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
    max_retries:int=1, # CR-7: how many reactive retries to attempt on PluginResourceError (default 1 — one retry after eviction)
    sysmon_plugin_name:Optional=None, # MonitorPlugin (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
):

Manages plugin discovery, loading, and lifecycle via process isolation.

CR-3 Telemetry (System Monitor)

Bind a MonitorPlugin proxy and read system telemetry through CR-3’s typed surface. _get_global_stats* duck-types on the proxy’s typed accessor, falling back to the legacy dispatcher path for monitors that haven’t migrated.


register_system_monitor


def register_system_monitor(
    plugin_name:str, # Name of the system monitor plugin
)->None:

Bind a loaded plugin to act as the hardware system monitor.

Manifest Discovery + Categorization

Read manifests from disk, parse CR-1 taxonomy + Phase 5a resources + derived category, and filter discovered/loaded plugins by category. Stable cluster — CR-1/Phase 5a closed.


discover_manifests


def discover_manifests(
    
)->List: # List of discovered plugin metadata

Discover plugins via JSON manifests in search paths.

CR-8: reads each manifest via load_manifest, which transparently parses both v2.0 nested + legacy v1.0 flat layouts into a typed ManifestV2. meta.manifest is set to a flat-shaped dict view so existing consumers (proxy, scheduling, execute path) continue working unchanged; the typed ManifestV2 is also attached as meta.manifest_v2 so drift detection + future typed callers can read drift_tracking.config_schema_hash without re-parsing.


get_discovered_by_category


def get_discovered_by_category(
    category:str, # Category to filter by (e.g., "transcription")
)->List: # List of matching discovered plugins

Get discovered plugins filtered by category.


get_plugins_by_category


def get_plugins_by_category(
    category:str, # Category to filter by (e.g., "transcription")
)->List: # List of matching loaded plugins

Get loaded plugins filtered by category.


get_discovered_categories


def get_discovered_categories(
    
)->List: # List of unique categories

Get all unique categories among discovered plugins.


get_loaded_categories


def get_loaded_categories(
    
)->List: # List of unique categories

Get all unique categories among loaded plugins.


get_plugin_meta


def get_plugin_meta(
    plugin_name:str, # Name of the plugin
)->Optional: # Plugin metadata or None

Get metadata for a loaded plugin by name.


get_discovered_meta


def get_discovered_meta(
    plugin_name:str, # Name of the plugin
)->Optional: # Plugin metadata or None

Get metadata for a discovered (not necessarily loaded) plugin by name.

Configuration Validation + Persistence Helpers

SG-5 strict-by-default config validation against the manifest’s config_schema; SG-9 schema drift detection; CR-2 per-plugin persistence via PluginConfigStore; deferred-on-disable hook bookkeeping. Stable cluster — these gates are settled.

CR-10 Multi-Instance Helpers

Validate explicit instance_id inputs against the constrained pattern, auto-generate unique IDs for new_instance=True, and query the per-instance state in self.instances. Closed by CR-10 core.


get_instance


def get_instance(
    name_or_id:str, # Plugin name (default-loaded) or explicit instance_id
)->Optional:

Return the PluginInstance for name_or_id, or None if not loaded.

Lookup is keyed by instance_id (which equals plugin_name for default- loaded plugins). Multi-instance IDs only exist in self.instances.


list_instances


def list_instances(
    plugin_name:Optional=None, # If given, filter to this plugin's instances
)->List:

List all loaded instances, optionally filtered by underlying plugin name.


get_worker_env_status


def get_worker_env_status(
    name_or_meta:Any, # Plugin name (loaded/discovered) or a PluginMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Per-entry status dicts (secret values never returned)

CR-12: per-entry satisfaction status of a plugin’s worker-env contract.

Each entry: {name, secret, required, satisfied, label, description}. satisfied means a value is resolvable (secret present in the store, or a visible var has a default/override). Secret VALUES are never returned — only whether one is set. The plugin-config UI uses this to gate config display on required secrets being satisfied.


missing_required_env


def missing_required_env(
    name_or_meta:Any, # Plugin name or PluginMeta
    scope:Optional=None, # SG-55 forward seam
)->List: # Names of required worker-env entries with no resolvable value

CR-12: names of required worker-env entries that are unsatisfied.


set_plugin_secret


def set_plugin_secret(
    name_or_id:str, # Plugin name or instance_id whose secret to set
    key:str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
    value:str, # Secret value (stored via the SecretStore, never config/logs)
    scope:Optional=None, # SG-55 forward seam: per-principal scope
    reload:bool=True, # Respawn loaded worker(s) so the new env is injected
)->bool: # True if the secret was stored

CR-12: store a plugin secret, then respawn its worker(s) to inject it.

Secrets are keyed by the underlying PLUGIN name (not instance_id), so all instances of a plugin share one credential — set the Gemini key once and every Gemini instance gets it at (re)spawn. Because worker env is fixed at spawn, the new value only reaches a running worker via a RESPAWN, so this reloads each loaded instance of the plugin (unless reload=False, e.g. when provisioning a secret before the plugin is loaded). This is the actuation seam both the CLI (cjm-ctl set-secret) and a future config UI call. Reload failures are logged, not raised.

Load + Unload Lifecycle

Spawn / terminate plugin workers. load_plugin is the canonical entry; unload_plugin handles canonical-vs-multi-instance cleanup (drops PluginMeta only when no instances remain for the plugin). get_plugin / list_plugins are the simple-accessor surface. Mid-churn — CR-7 reactive resource management will touch load_plugin lightly to initialize empirical resource tracking.


load_plugin


def load_plugin(
    plugin_meta:PluginMeta, # Plugin metadata (with manifest attached)
    config:Optional=None, # Initial configuration
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
    instance_id:Optional=None, # CR-10: explicit instance_id; None defaults to plugin_name
    new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
    max_concurrent_requests:Optional=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
)->bool: # True if successfully loaded

Load a plugin by spawning a Worker subprocess.

CR-2: reads the persisted PluginConfigRecord from self.config_store before launching the worker. If a persisted record exists and the caller didn’t pass an explicit config, the persisted config is used as the effective input. The persisted enabled flag is applied to plugin_meta.enabled so disabled plugins stay disabled across process restarts.

CR-10: optional instance_id allows multi-instance loading. - instance_id=None, new_instance=False (default): instance_id = plugin_meta.name. Populates self.plugins[plugin_name] + self.instances [plugin_name] together (single-instance backward compat). - instance_id=“custom”: validated against [A-Za-z0-9_-]{1,64}. Populates self.instances[custom]. Persistence is keyed by plugin_name and only applied to the default instance. - instance_id=None, new_instance=True: auto-generates {name}-{6-hex}. Idempotent: re-load against an existing instance_id returns True without re-spawning.

CR-7: computes config_hash from the effective config (post-defaults + post-validation) and stores it on the PluginInstance so execute_plugin* can key empirical samples by (instance_id, config_hash). SG-33 stores max_concurrent_requests on the instance — the actual asyncio.Semaphore is lazy-created in execute_plugin_async via _get_concurrent_limiter.


load_all


def load_all(
    configs:Optional=None, # Plugin name -> config mapping
)->Dict: # Plugin name -> success mapping

Discover and load all available plugins.


unload_plugin


def unload_plugin(
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
)->bool: # True if successfully unloaded

Unload a plugin instance and terminate its Worker process (CR-10).

If name_or_id resolves to the default instance (instance_id == plugin_name) and no other instances remain for the same plugin, also removes the PluginMeta from self.plugins. Otherwise removes only the instance and clears PluginMeta.instance if it pointed at the unloaded canonical.


unload_all


def unload_all(
    
)->None:

Unload all plugin instances and terminate all Worker processes (CR-10).

Iterates self.instances (CR-10 keying) rather than self.plugins so all multi-instance entries get torn down, not just the canonical instances.


get_plugin


def get_plugin(
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
)->Optional: # Plugin proxy instance or None

Get a loaded plugin’s proxy by name or instance_id (CR-10).

Lookup order: self.instances first (covers both default plugin_name and multi-instance IDs), falling back to PluginMeta.instance for any legacy code path that populated self.plugins without self.instances (defensive — shouldn’t happen post-CR-10 since load_plugin always records the instance).


list_plugins


def list_plugins(
    
)->List: # List of loaded plugin metadata

List all loaded plugins.

Execution + Resource Eviction

The high-churn execute path. CR-7’s primary rewrite target — reactive retry + OOM recovery + bounded retries will land here without touching the stable cells above. Per-instance _running_executions tracking (keyed by instance_id) allows concurrent multi-instance executes without colliding on the deferred-disable coordination.


execute_plugin


def execute_plugin(
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result

Execute a plugin instance’s main functionality (sync).

CR-10: resolves name_or_id via self.instances; per-instance enabled flag gates execution. _running_executions tracks by instance_id so concurrent multi-instance executes don’t collide.

CR-2: raises PluginDisabledError (typed) when the instance is disabled.

CR-7: reactive retry on PluginResourceError — evicts other plugins to free resources, then ALWAYS reloads the failing plugin’s worker before the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL) needs the reload because there’s no live worker to retry on. Track B (plugin-raised PluginResourceError — worker still alive) ALSO reloads because PyTorch’s CUDA caching allocator can fragment post-OOM in ways the plugin can’t clean up from within its own process; a fresh worker is the only reliable reset. Bounded by self.max_retries (default 1). Empirical sample recorded in the finally block — best-effort, doesn’t break execute on failure.


execute_plugin_async


async def execute_plugin_async(
    name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result

Execute a plugin instance’s main functionality (async).

CR-10 + CR-2: same semantics as execute_plugin, async-flavored. Scheduler allocation goes through allocate_async for non-blocking polling.

CR-7 + SG-33: reactive retry on PluginResourceError — always reloads before retry (Track A + Track B converge on the same reload path; see sync variant docstring for the rationale). Per-instance asyncio.Semaphore enforces the max_concurrent_requests cap (None = unbounded). Empirical sample recorded in the finally block.

Enable / Disable + Logs

CR-2 enable/disable surface: per-instance enabled flag with default-instance sync to PluginMeta.enabled + persistence via PluginConfigStore + deferred-on-disable hook fire-after-in-flight-job semantics. Plus the log-file tail reader. The plugin-config UI library (Path C step 4) is the most likely consumer to extend the enable/disable behavior next.


enable_plugin


def enable_plugin(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was enabled

Enable a plugin instance (CR-10 multi-instance aware).

CR-2: persists the new state via config_store (default-instance only; persistence is per-plugin, not per-instance) and fires the plugin’s on_enable hook on state-change. Idempotent for already-enabled instances.


disable_plugin


def disable_plugin(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was disabled

Disable a plugin instance without unloading it (CR-10 multi-instance aware).

CR-2: persists the new state (default-instance only) and fires the plugin’s on_disable hook — but defers the hook until any in-flight job for THIS instance finishes (the per-instance _running_executions key is the instance_id, so a concurrent execute on a different instance of the same plugin doesn’t gate this instance’s hook).


get_plugin_logs


def get_plugin_logs(
    plugin_name:str, # Name of the plugin
    lines:int=50, # Number of lines to return
)->str: # Log content

Read the last N lines of the plugin’s log file.

Key features:

  • Local-first discovery: Manifests in .cjm/manifests/ shadow global ones in ~/.cjm/manifests/
  • Process isolation: Each plugin runs in its own subprocess with a dedicated Python interpreter
  • Dual execution modes: execute_plugin() for sync, execute_plugin_async() for async
  • Automatic cleanup: unload_all() terminates all Worker processes

Configuration Management

Methods for managing plugin configuration. These forward to the RemotePluginProxy which communicates with the Worker over HTTP.


get_plugin_config


def get_plugin_config(
    plugin_name:str, # Name of the plugin
)->Optional: # Current configuration or None

Get the current configuration of a plugin.


get_plugin_config_schema


def get_plugin_config_schema(
    plugin_name:str, # Name of the plugin
)->Optional: # JSON Schema or None

Get the configuration JSON Schema for a plugin.


get_config_options


def get_config_options(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->Dict: # CR-11: live config option domains, or {} if unavailable

Get a plugin instance’s runtime config option providers (CR-11).

Forwards to the worker’s get_config_options() - live enum domains + per-option metadata for dynamic config fields (e.g. an API model list). Kept separate from get_plugin_config_schema (static, hashed for CR-8 drift); these options are the live companion the plugin-config UI merges on top.

Degrades to {} if the instance is missing or the worker call fails - the UI then falls back to the static schema. Typed-error surfacing for the UI consumer is deferred to the plugin-config UI library (Path C Step 4).


get_all_plugin_configs


def get_all_plugin_configs(
    
)->Dict: # Plugin name -> config mapping

Get current configuration for all loaded plugins.


update_plugin_config


def update_plugin_config(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
    config:Dict, # New configuration values
    strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
)->bool: # True if successful

Update a plugin instance’s configuration (CR-10 multi-instance aware).

CR-2: on successful reconfigure, persists the new config (default instance only; multi-instance loads don’t persist). Per-instance inst.config is updated regardless. SG-5: validates against the underlying plugin’s config_schema (per-plugin, not per-instance, so all instances share the same schema).


reload_plugin


def reload_plugin(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
    config:Optional=None, # Optional new configuration
)->bool: # True if successful

Reload a plugin instance by terminating and restarting its Worker (CR-10).


get_plugin_stats


def get_plugin_stats(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->Optional: # Resource telemetry or None

Get resource usage stats for a plugin instance’s Worker process (CR-10).

Streaming Execution

Async streaming support for real-time results (e.g., transcription word-by-word).


execute_plugin_stream


def execute_plugin_stream(
    name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results

Execute a plugin instance with streaming response (CR-10 multi-instance aware).

Same per-instance resolution as execute_plugin_async; scheduler allocation keys off the PluginMeta (plugin-level), execution + bookkeeping key off the PluginInstance (per-instance).

Concurrent Load / Unload (CR-10b)

load_plugins_concurrent / unload_plugins_concurrent fan out plugin spawns + teardowns across an asyncio gather, dropping host startup time from sum-of-spawns to max-of-spawn. The sync load_plugin / unload_plugin paths are unchanged; the async variants run them via asyncio.to_thread so the existing sync _wait_for_ready doesn’t block the event loop while the worker boots.

PluginLoadSpec (defined in core.metadata) collects the per-load parameters into one dataclass so a batch can be expressed as List[PluginLoadSpec].

Result shapes:

  • load_plugins_concurrent(...) returns Dict[str, Union[str, Exception]] mapping the requested key (explicit instance_id if set, else spec.meta.name) → resolved instance_id on success or the raised Exception on failure.
  • unload_plugins_concurrent(...) returns Dict[str, Union[bool, Exception]] mapping the input name_or_idTrue on success or the raised Exception on failure.

max_concurrency=None (the default) means unbounded — every spec runs concurrently. Set an integer to cap simultaneous loads (useful when many plugins share a constrained resource like GPU memory at boot). fail_fast=False (the default) collects every result via asyncio.gather(..., return_exceptions=True); fail_fast=True re-raises the first exception and lets remaining tasks complete in the background.


load_plugin_async


async def load_plugin_async(
    plugin_meta:PluginMeta, config:Optional=None, strict:bool=True, instance_id:Optional=None,
    new_instance:bool=False
)->bool:

Async variant of load_plugin (CR-10b).

Runs the existing sync load_plugin via asyncio.to_thread so the blocking proxy spawn + _wait_for_ready doesn’t stall the event loop. Backward compat: identical behavior to the sync method, just non-blocking.


unload_plugin_async


async def unload_plugin_async(
    name_or_id:str
)->bool:

Async variant of unload_plugin (CR-10b).


load_plugins_concurrent


async def load_plugins_concurrent(
    specs:List, # Per-plugin load specifications
    max_concurrency:Optional=None, # Cap simultaneous loads; None = unbounded
    fail_fast:bool=False, # Re-raise first exception (default: collect all results)
)->Dict: # requested_key → instance_id or Exception

CR-10b: fan out plugin loads concurrently via asyncio.gather.

Each spec is loaded via load_plugin_async (asyncio.to_thread under the hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when max_concurrency=None. Capped concurrency uses an asyncio.Semaphore.

Result keys come from _spec_requested_key: explicit instance_id if set, {plugin_name}#new[{index}] for ambiguous new_instance specs, else plugin_name. Successful entries map to the resolved instance_id (string); failures map to the raised exception (caught regardless of fail_fast value for non-fail-fast mode; re-raised in fail_fast=True).


unload_plugins_concurrent


async def unload_plugins_concurrent(
    name_or_ids:List, # Plugin names or instance_ids to unload
    max_concurrency:Optional=None, fail_fast:bool=False
)->Dict: # name_or_id → True or Exception

CR-10b: fan out plugin unloads concurrently via asyncio.gather.

Same concurrency + fail_fast semantics as load_plugins_concurrent. Result keys are the input name_or_ids (deduplication is the caller’s responsibility; duplicate inputs produce one dict entry per unique key).

PluginBinding (SG-17)

A PluginBinding is a pre-bound view of one plugin through a shared PluginManager. Consumer services (8 audited in the substrate audit) have each been writing the same ~80-line wrapper class to hide the manager + plugin_name pair behind a clean instance API — whisper.execute(...) rather than manager.execute_plugin("whisper", ...).

PluginManager.bind(name, default_config=...) returns one. The binding forwards every method call to the bound manager, supplying plugin_name automatically. default_config is applied when load() is called without an explicit config.


PluginBinding


def PluginBinding(
    manager:PluginManager, plugin_name:str, default_config:Dict=<factory>
)->None:

Pre-bound view of a single plugin through a shared PluginManager.

Eliminates the wrapper-class duplication audited across 8 consumer services (SG-17). Methods forward to the manager with plugin_name pre-supplied; default_config is the fallback used when load() is called without an explicit config (matches the manifest-default behavior in load_plugin).


bind


def bind(
    plugin_name:str, # Name of the plugin to pre-bind
    default_config:Optional=None, # Default config used by binding.load()
)->PluginBinding: # Bound view ready for instance-style use

Create a PluginBinding pre-bound to this manager + plugin_name.

Taxonomy + Platform Queries (CR-1 + Phase 5a)

Substrate query API for discovering plugins by their derived taxonomy (domain, role, interface FQCN) or by current-platform compatibility. All queries are pure string-based — the substrate never imports the interface library to answer them, because _generate_manifest’s introspection script captured the strings at install time inside the plugin’s own conda env.

  • get_by_role("TranscriptionPlugin") — every discovered plugin whose taxonomy role matches (Whisper, Voxtral, Gemini, etc.).
  • get_by_domain("transcription") — every discovered plugin in the domain (multi-role: TranscriptionPlugin + ForcedAlignmentPlugin both qualify).
  • get_canonical("GraphPlugin") — the single discovered plugin for a role, or None if zero/multiple match. For substrate-internal use cases where multiple implementations would be a configuration error.
  • get_compatible_for_current_platform() — filters discovered by resources.platforms. Empty platforms list ≡ universal compatibility.

get_by_role


def get_by_role(
    role:str, # Interface class name segment of the FQCN (e.g., "TranscriptionPlugin")
)->List: # Discovered plugins matching the role

CR-1: return discovered plugins implementing the given interface role.


get_by_domain


def get_by_domain(
    domain:str, # Domain segment of the taxonomy (e.g., "transcription")
)->List: # Discovered plugins in the domain

CR-1: return discovered plugins in the given domain.


get_canonical


def get_canonical(
    role:str, # Interface class name to look up
)->Optional: # The unique matching plugin or None

CR-1: return the single canonical plugin for a role.

Returns None if zero or multiple plugins implement the role — useful for substrate-internal use cases (e.g., the graph storage plugin) where the expectation is exactly one implementation. Callers that want multi-implementation handling use get_by_role() directly.

Multi-match is logged at WARNING level because it’s a substrate-visible configuration-time surprise — without the warning, the caller’s None-handling branch can’t distinguish “no plugins installed for this role” from “multiple plugins competing for an exactly-one role.” Zero-match is silent because absence-of-optional-plugin is a normal probe outcome.


get_compatible_for_current_platform


def get_compatible_for_current_platform(
    
)->List: # Plugins compatible with current platform

Phase 5a: return discovered plugins compatible with the host platform.

Filters by resources.platforms. Plugins with an empty (or absent) platforms list are considered universally compatible — that’s the introspection-time convention when a plugin author didn’t declare a platform constraint. Plugins lacking the entire resources block (legacy / pre-Phase-5a manifests) also pass through as universal.

Does NOT filter on requires_gpu — substrate doesn’t know whether a GPU is present without invoking a system monitor plugin. Callers gate on GPU availability separately if needed.

Usage Examples

Basic Usage

import logging
from cjm_plugin_system.core.manager import PluginManager

logging.basicConfig(level=logging.INFO)

# Create manager
manager = PluginManager()

# Discover and load all plugins from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")

# Execute a plugin
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")

# Update configuration (hot-reload)
manager.update_plugin_config("whisper-local", {"model": "large-v3"})

# Clean up all workers
manager.unload_all()

Async Usage (FastHTML)

async def transcribe(audio_path: str):
    manager = PluginManager()
    manager.load_all()
    
    try:
        result = await manager.execute_plugin_async("whisper-local", audio=audio_path)
        return result
    finally:
        manager.unload_all()

# Streaming
async def transcribe_stream(audio_path: str):
    manager = PluginManager()
    manager.load_all()
    
    try:
        async for chunk in manager.execute_plugin_stream("whisper-local", audio=audio_path):
            yield chunk
    finally:
        manager.unload_all()