Plugin Manager
PluginManager
The PluginManager orchestrates the complete lifecycle of plugins in the process-isolated architecture:
- Discovery: Finds plugin manifests in local (.cjm/manifests/) or global (~/.cjm/manifests/) directories
- Loading: Creates RemotePluginProxy instances that spawn isolated Worker subprocesses
- Execution: Forwards calls to Workers via HTTP, supporting both sync and async execution
- Lifecycle: Handles initialization, configuration updates, and cleanup
PluginManager Worker Subprocesses
┌─────────────────┐ ┌─────────────────────┐
│ discover_ │ │ Conda Env: Whisper │
│ manifests() │ HTTP/JSON │ └─ WhisperPlugin │
│ │◄──────────────────▶│ │
│ plugins: │ └─────────────────────┘
│ whisper ──────┼──► RemoteProxy ┌─────────────────────┐
│ gemini ───────┼──► RemoteProxy ◄──▶│ Conda Env: Gemini │
│ │ │ └─ GeminiPlugin │
└─────────────────┘ └─────────────────────┘
PluginManager
def PluginManager(
plugin_interface:Type=PluginInterface, # Base interface for type checking
search_paths:Optional=None, # Custom manifest search paths
scheduler:Optional=None, # Resource allocation policy
config_store:Optional=None, # CR-2: persistence backend; lazy LocalPluginConfigStore default per OQ-4
empirical_store:Optional=None, # CR-7: resource-usage tracking backend; lazy LocalEmpiricalResourceStore when cfg.substrate.empirical_tracking
secret_store:Optional=None, # CR-12: secret backend; lazy LocalSecretStore default (project-local <data_dir>/secrets)
max_retries:int=1, # CR-7: how many reactive retries to attempt on PluginResourceError (default 1 — one retry after eviction)
sysmon_plugin_name:Optional=None, # MonitorPlugin (CR-3) name for GPU subtree attribution; default-None records skip GPU attribution (compute axis only)
):
Manages plugin discovery, loading, and lifecycle via process isolation.
CR-3 Telemetry (System Monitor)
Bind a MonitorPlugin proxy and read system telemetry through CR-3’s typed surface. _get_global_stats* duck-types on the proxy’s typed accessor, falling back to the legacy dispatcher path for monitors that haven’t migrated.
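The duck-typed fallback can be sketched as follows. This is a minimal illustration, not the library's code: the accessor name `get_global_stats`, the legacy `execute(command=...)` dispatcher call, and the `"get_system_status"` command string are all hypothetical stand-ins for whatever the real monitor proxies expose.

```python
from typing import Any, Dict


class TypedMonitorProxy:
    """Hypothetical monitor proxy that has migrated to a typed accessor."""

    def get_global_stats(self) -> Dict[str, Any]:
        return {"cpu_percent": 12.5, "source": "typed"}


class LegacyMonitorProxy:
    """Hypothetical monitor proxy that only exposes a generic dispatcher."""

    def execute(self, command: str) -> Dict[str, Any]:
        if command == "get_system_status":  # hypothetical legacy command
            return {"cpu_percent": 40.0, "source": "legacy"}
        raise ValueError(f"unknown command: {command}")


def read_global_stats(proxy: Any) -> Dict[str, Any]:
    # Prefer the typed accessor when the proxy provides one (duck typing)...
    accessor = getattr(proxy, "get_global_stats", None)
    if callable(accessor):
        return accessor()
    # ...otherwise fall back to the legacy dispatcher path.
    return proxy.execute(command="get_system_status")
```

Checking for the attribute rather than the proxy's class lets migrated and unmigrated monitors coexist behind one call site.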
register_system_monitor
def register_system_monitor(
plugin_name:str, # Name of the system monitor plugin
)->None:
Bind a loaded plugin to act as the hardware system monitor.
Manifest Discovery + Categorization
Read manifests from disk, parse CR-1 taxonomy + Phase 5a resources + derived category, and filter discovered/loaded plugins by category. Stable cluster — CR-1/Phase 5a closed.
discover_manifests
def discover_manifests(
)->List: # List of discovered plugin metadata
Discover plugins via JSON manifests in search paths.
CR-8: reads each manifest via load_manifest, which transparently parses both v2.0 nested + legacy v1.0 flat layouts into a typed ManifestV2. meta.manifest is set to a flat-shaped dict view so existing consumers (proxy, scheduling, execute path) continue working unchanged; the typed ManifestV2 is also attached as meta.manifest_v2 so drift detection + future typed callers can read drift_tracking.config_schema_hash without re-parsing.
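Local-first discovery can be sketched as an ordered scan in which the first manifest seen for a name wins. This assumes each manifest is a `*.json` file with a top-level `name` field, which is a simplification: the real loader goes through `load_manifest` and `ManifestV2`. The `discover` helper is hypothetical.

```python
import json
from pathlib import Path
from typing import Dict, List


def discover(search_paths: List[Path]) -> Dict[str, dict]:
    """Return name -> manifest; the earliest search path wins (local-first)."""
    found: Dict[str, dict] = {}
    for directory in search_paths:
        if not directory.is_dir():
            continue  # a missing directory is simply skipped
        for manifest_path in sorted(directory.glob("*.json")):
            manifest = json.loads(manifest_path.read_text())
            # setdefault keeps the first occurrence, so a later (global) path
            # cannot override an earlier (local) manifest with the same name.
            found.setdefault(manifest["name"], {**manifest, "path": str(manifest_path)})
    return found
```

Passing `[Path(".cjm/manifests"), Path.home() / ".cjm/manifests"]` in that order gives the shadowing behavior described for the default search paths.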
get_discovered_by_category
def get_discovered_by_category(
category:str, # Category to filter by (e.g., "transcription")
)->List: # List of matching discovered plugins
Get discovered plugins filtered by category.
get_plugins_by_category
def get_plugins_by_category(
category:str, # Category to filter by (e.g., "transcription")
)->List: # List of matching loaded plugins
Get loaded plugins filtered by category.
get_discovered_categories
def get_discovered_categories(
)->List: # List of unique categories
Get all unique categories among discovered plugins.
get_loaded_categories
def get_loaded_categories(
)->List: # List of unique categories
Get all unique categories among loaded plugins.
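The category filters and unique-category listings reduce to simple list operations over metadata. The `PluginMeta` dataclass below is a hypothetical stand-in carrying only `name` and `category`; the real class holds more fields, and whether the real methods sort their output is not specified, so this sketch keeps first-seen order.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class PluginMeta:
    name: str
    category: str  # derived category, e.g. "transcription"


def by_category(plugins: Iterable[PluginMeta], category: str) -> List[PluginMeta]:
    # Exact-match filter, preserving discovery order.
    return [p for p in plugins if p.category == category]


def unique_categories(plugins: Iterable[PluginMeta]) -> List[str]:
    # dict.fromkeys deduplicates while keeping first-seen order.
    return list(dict.fromkeys(p.category for p in plugins))
```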
get_plugin_meta
def get_plugin_meta(
plugin_name:str, # Name of the plugin
)->Optional: # Plugin metadata or None
Get metadata for a loaded plugin by name.
get_discovered_meta
def get_discovered_meta(
plugin_name:str, # Name of the plugin
)->Optional: # Plugin metadata or None
Get metadata for a discovered (not necessarily loaded) plugin by name.
Configuration Validation + Persistence Helpers
SG-5 strict-by-default config validation against the manifest’s config_schema; SG-9 schema drift detection; CR-2 per-plugin persistence via PluginConfigStore; deferred-on-disable hook bookkeeping. Stable cluster — these gates are settled.
CR-10 Multi-Instance Helpers
Validate explicit instance_id inputs against the constrained pattern, auto-generate unique IDs for new_instance=True, and query the per-instance state in self.instances. Closed by CR-10 core.
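The instance_id rules are spelled out under load_plugin: explicit IDs must match `[A-Za-z0-9_-]{1,64}`, and `new_instance=True` generates `{name}-{6-hex}`. A sketch of both checks follows. The helper names are hypothetical, and `secrets.token_hex(3)` is one plausible way to produce six hex characters, not necessarily what the library uses.

```python
import re
import secrets
from typing import Container

_INSTANCE_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


def validate_instance_id(instance_id: str) -> str:
    # fullmatch (not match) so inputs such as "bad id" or "x/y" are rejected.
    if not _INSTANCE_ID_RE.fullmatch(instance_id):
        raise ValueError(f"invalid instance_id: {instance_id!r}")
    return instance_id


def generate_instance_id(plugin_name: str, existing: Container[str]) -> str:
    # token_hex(3) yields 3 random bytes = 6 hex characters; retry on collision.
    while True:
        candidate = f"{plugin_name}-{secrets.token_hex(3)}"
        if candidate not in existing:
            return candidate
```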
get_instance
def get_instance(
name_or_id:str, # Plugin name (default-loaded) or explicit instance_id
)->Optional:
Return the PluginInstance for name_or_id, or None if not loaded.
Lookup is keyed by instance_id (which equals plugin_name for default-loaded plugins). Multi-instance IDs only exist in self.instances.
list_instances
def list_instances(
plugin_name:Optional=None, # If given, filter to this plugin's instances
)->List:
List all loaded instances, optionally filtered by underlying plugin name.
get_worker_env_status
def get_worker_env_status(
name_or_meta:Any, # Plugin name (loaded/discovered) or a PluginMeta
scope:Optional=None, # SG-55 forward seam
)->List: # Per-entry status dicts (secret values never returned)
CR-12: per-entry satisfaction status of a plugin’s worker-env contract.
Each entry: {name, secret, required, satisfied, label, description}. satisfied means a value is resolvable (secret present in the store, or a visible var has a default/override). Secret VALUES are never returned — only whether one is set. The plugin-config UI uses this to gate config display on required secrets being satisfied.
missing_required_env
def missing_required_env(
name_or_meta:Any, # Plugin name or PluginMeta
scope:Optional=None, # SG-55 forward seam
)->List: # Names of required worker-env entries with no resolvable value
CR-12: names of required worker-env entries that are unsatisfied.
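Per-entry status and the missing-required list could be derived as below. The sketch assumes a contract entry is a dict with `name`, `secret`, `required`, and optional `default`, `label`, and `description` keys, and that the secret store and overrides are plain mappings; those shapes are assumptions. The point it illustrates is that each status carries a `satisfied` boolean and never the secret value.

```python
from typing import Dict, List, Mapping


def env_status(contract: List[dict], secret_store: Mapping[str, str],
               overrides: Mapping[str, str]) -> List[Dict]:
    statuses = []
    for entry in contract:
        name = entry["name"]
        if entry.get("secret"):
            # Secrets are satisfied only by a value present in the secret store.
            satisfied = bool(secret_store.get(name))
        else:
            # Visible vars are satisfied by an override or a manifest default.
            satisfied = name in overrides or entry.get("default") is not None
        statuses.append({
            "name": name,
            "secret": bool(entry.get("secret")),
            "required": bool(entry.get("required")),
            "satisfied": satisfied,  # a boolean only; the value never leaves
            "label": entry.get("label", name),
            "description": entry.get("description", ""),
        })
    return statuses


def missing_required(contract: List[dict], secret_store: Mapping[str, str],
                     overrides: Mapping[str, str]) -> List[str]:
    return [s["name"] for s in env_status(contract, secret_store, overrides)
            if s["required"] and not s["satisfied"]]
```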
set_plugin_secret
def set_plugin_secret(
name_or_id:str, # Plugin name or instance_id whose secret to set
key:str, # Secret key (the env-var name, e.g. "GEMINI_API_KEY")
value:str, # Secret value (stored via the SecretStore, never config/logs)
scope:Optional=None, # SG-55 forward seam: per-principal scope
reload:bool=True, # Respawn loaded worker(s) so the new env is injected
)->bool: # True if the secret was stored
CR-12: store a plugin secret, then respawn its worker(s) to inject it.
Secrets are keyed by the underlying PLUGIN name (not instance_id), so all instances of a plugin share one credential — set the Gemini key once and every Gemini instance gets it at (re)spawn. Because worker env is fixed at spawn, the new value only reaches a running worker via a RESPAWN, so this reloads each loaded instance of the plugin (unless reload=False, e.g. when provisioning a secret before the plugin is loaded). This is the actuation seam both the CLI (cjm-ctl set-secret) and a future config UI call. Reload failures are logged, not raised.
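The keying and respawn behavior can be illustrated with an in-memory store. Everything below is hypothetical: `InMemorySecretStore`, the `instances` mapping of instance_id to plugin name, the `reload` callback, and the `do_reload` flag (standing in for the method's `reload=` parameter). It only shows why one secret reaches every instance of a plugin and why each loaded instance is respawned.

```python
import logging
from typing import Callable, Dict, Optional, Tuple

log = logging.getLogger(__name__)


class InMemorySecretStore:
    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], str] = {}

    def set(self, plugin_name: str, key: str, value: str) -> None:
        self._data[(plugin_name, key)] = value

    def get(self, plugin_name: str, key: str) -> Optional[str]:
        return self._data.get((plugin_name, key))


def set_secret(store: InMemorySecretStore, instances: Dict[str, str],
               name_or_id: str, key: str, value: str,
               reload: Callable[[str], object], do_reload: bool = True) -> bool:
    # Resolve an instance_id to its plugin name; a bare plugin name (possibly
    # not loaded yet) resolves to itself.
    plugin_name = instances.get(name_or_id, name_or_id)
    store.set(plugin_name, key, value)  # keyed by plugin, shared by instances
    if do_reload:
        # Worker env is fixed at spawn, so every loaded instance is respawned.
        for instance_id, owner in instances.items():
            if owner == plugin_name:
                try:
                    reload(instance_id)
                except Exception:
                    log.exception("reload of %s failed", instance_id)  # logged, not raised
    return True
```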
Load + Unload Lifecycle
Spawn / terminate plugin workers. load_plugin is the canonical entry; unload_plugin handles canonical-vs-multi-instance cleanup (drops PluginMeta only when no instances remain for the plugin). get_plugin / list_plugins are the simple-accessor surface. Mid-churn — CR-7 reactive resource management will touch load_plugin lightly to initialize empirical resource tracking.
load_plugin
def load_plugin(
plugin_meta:PluginMeta, # Plugin metadata (with manifest attached)
config:Optional=None, # Initial configuration
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
instance_id:Optional=None, # CR-10: explicit instance_id; None defaults to plugin_name
new_instance:bool=False, # CR-10: auto-generate `{name}-{hex}` instance_id (with instance_id=None)
max_concurrent_requests:Optional=None, # SG-33 (CR-7): per-instance async concurrency cap; None = unbounded
)->bool: # True if successfully loaded
Load a plugin by spawning a Worker subprocess.
CR-2: reads the persisted PluginConfigRecord from self.config_store before launching the worker. If a persisted record exists and the caller didn’t pass an explicit config, the persisted config is used as the effective input. The persisted enabled flag is applied to plugin_meta.enabled so disabled plugins stay disabled across process restarts.
CR-10: the optional instance_id enables multi-instance loading:
- instance_id=None, new_instance=False (default): instance_id = plugin_meta.name. Populates self.plugins[plugin_name] and self.instances[plugin_name] together (single-instance backward compat).
- instance_id="custom": validated against [A-Za-z0-9_-]{1,64}. Populates self.instances[custom]. Persistence is keyed by plugin_name and only applied to the default instance.
- instance_id=None, new_instance=True: auto-generates {name}-{6-hex}.
Idempotent: re-loading against an existing instance_id returns True without re-spawning.
CR-7: computes config_hash from the effective config (post-defaults + post-validation) and stores it on the PluginInstance so execute_plugin* can key empirical samples by (instance_id, config_hash). SG-33 stores max_concurrent_requests on the instance — the actual asyncio.Semaphore is lazy-created in execute_plugin_async via _get_concurrent_limiter.
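One way to get a stable config_hash is to serialize the effective config with sorted keys and hash the bytes, so key order cannot change the result. SHA-256 over canonical JSON and the 16-character truncation are assumptions for illustration; the hashing scheme itself is not specified here.

```python
import hashlib
import json
from typing import Any, Dict


def config_hash(effective_config: Dict[str, Any]) -> str:
    # sort_keys + compact separators give a canonical byte string, so
    # {"a": 1, "b": 2} and {"b": 2, "a": 1} hash identically.
    canonical = json.dumps(effective_config, sort_keys=True,
                           separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
```

The input should be the post-defaults config (for example `{**schema_defaults, **user_config}`), so that omitting a key and passing its default value produce the same hash and therefore share empirical samples.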
load_all
def load_all(
configs:Optional=None, # Plugin name -> config mapping
)->Dict: # Plugin name -> success mapping
Discover and load all available plugins.
unload_plugin
def unload_plugin(
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
)->bool: # True if successfully unloaded
Unload a plugin instance and terminate its Worker process (CR-10).
If name_or_id resolves to the default instance (instance_id == plugin_name) and no other instances remain for the same plugin, also removes the PluginMeta from self.plugins. Otherwise removes only the instance and clears PluginMeta.instance if it pointed at the unloaded canonical.
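The bookkeeping rule above can be sketched with two dicts: `instances` (instance_id to plugin name) and `plugins` (plugin name to a metadata dict whose `instance` field points at the canonical instance). These are simplified, hypothetical stand-ins for self.instances and self.plugins, and worker termination is omitted.

```python
from typing import Dict, Optional


def unload(instances: Dict[str, str], plugins: Dict[str, dict], name_or_id: str) -> bool:
    plugin_name: Optional[str] = instances.pop(name_or_id, None)
    if plugin_name is None:
        return False  # nothing loaded under this key
    # (terminating the worker subprocess would happen here)
    others_remain = any(owner == plugin_name for owner in instances.values())
    meta = plugins.get(plugin_name)
    if name_or_id == plugin_name and not others_remain:
        plugins.pop(plugin_name, None)  # the default instance was the last one
    elif meta is not None and meta.get("instance") == name_or_id:
        meta["instance"] = None  # the canonical pointer is no longer valid
    return True
```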
unload_all
def unload_all(
)->None:
Unload all plugin instances and terminate all Worker processes (CR-10).
Iterates self.instances (CR-10 keying) rather than self.plugins so all multi-instance entries get torn down, not just the canonical instances.
get_plugin
def get_plugin(
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
)->Optional: # Plugin proxy instance or None
Get a loaded plugin’s proxy by name or instance_id (CR-10).
Lookup order: self.instances first (covers both default plugin_name and multi-instance IDs), falling back to PluginMeta.instance for any legacy code path that populated self.plugins without self.instances (defensive — shouldn’t happen post-CR-10 since load_plugin always records the instance).
list_plugins
def list_plugins(
)->List: # List of loaded plugin metadata
List all loaded plugins.
Execution + Resource Eviction
The high-churn execute path. CR-7’s primary rewrite target — reactive retry + OOM recovery + bounded retries will land here without touching the stable cells above. Per-instance _running_executions tracking (keyed by instance_id) allows concurrent multi-instance executes without colliding on the deferred-disable coordination.
execute_plugin
def execute_plugin(
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result
Execute a plugin instance’s main functionality (sync).
CR-10: resolves name_or_id via self.instances; per-instance enabled flag gates execution. _running_executions tracks by instance_id so concurrent multi-instance executes don’t collide.
CR-2: raises PluginDisabledError (typed) when the instance is disabled.
CR-7: reactive retry on PluginResourceError — evicts other plugins to free resources, then ALWAYS reloads the failing plugin’s worker before the retry attempt. Track A (WorkerOOMError — worker died from SIGKILL) needs the reload because there’s no live worker to retry on. Track B (plugin-raised PluginResourceError — worker still alive) ALSO reloads because PyTorch’s CUDA caching allocator can fragment post-OOM in ways the plugin can’t clean up from within its own process; a fresh worker is the only reliable reset. Bounded by self.max_retries (default 1). Empirical sample recorded in the finally block — best-effort, doesn’t break execute on failure.
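The retry loop can be sketched independently of the worker machinery. `PluginResourceError` below is a local stand-in for the library's error type, and `evict_others`, `reload_worker`, and `record_sample` are hypothetical callbacks. The sketch shows the shape described above: evict, always reload, retry, bounded by `max_retries`, with a best-effort sample recorded in `finally`.

```python
import logging
import time
from typing import Any, Callable

log = logging.getLogger(__name__)


class PluginResourceError(RuntimeError):
    """Local stand-in for the library's resource-exhaustion error."""


def execute_with_retry(call: Callable[[], Any], evict_others: Callable[[], None],
                       reload_worker: Callable[[], None],
                       record_sample: Callable[[float, bool], None],
                       max_retries: int = 1) -> Any:
    attempt = 0
    ok = False
    start = time.monotonic()
    try:
        while True:
            try:
                result = call()
                ok = True
                return result
            except PluginResourceError:
                if attempt >= max_retries:
                    raise  # retries exhausted: surface the original error
                attempt += 1
                evict_others()   # free resources held by other plugins
                reload_worker()  # always respawn: dead worker or fragmented allocator
    finally:
        try:
            record_sample(time.monotonic() - start, ok)
        except Exception:
            # Best-effort: a failed sample must not break the execute call.
            log.warning("empirical sample not recorded", exc_info=True)
```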
execute_plugin_async
async def execute_plugin_async(
name_or_id:str, # Plugin name (default-loaded) or instance_id (multi-instance)
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->Any: # Plugin result
Execute a plugin instance’s main functionality (async).
CR-10 + CR-2: same semantics as execute_plugin, async-flavored. Scheduler allocation goes through allocate_async for non-blocking polling.
CR-7 + SG-33: reactive retry on PluginResourceError — always reloads before retry (Track A + Track B converge on the same reload path; see sync variant docstring for the rationale). Per-instance asyncio.Semaphore enforces the max_concurrent_requests cap (None = unbounded). Empirical sample recorded in the finally block.
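The lazily created, per-instance limiter can be sketched like this. A `None` cap means no semaphore at all, and an instance's semaphore is built on first use inside the running event loop. `ConcurrencyLimiter` and its methods are hypothetical names; only the `asyncio.Semaphore` mechanism comes from the description above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, TypeVar

T = TypeVar("T")


class ConcurrencyLimiter:
    """Hypothetical per-instance cap; None means unbounded."""

    def __init__(self) -> None:
        self._caps: Dict[str, Optional[int]] = {}
        self._sems: Dict[str, asyncio.Semaphore] = {}

    def set_cap(self, instance_id: str, cap: Optional[int]) -> None:
        self._caps[instance_id] = cap

    def _get_semaphore(self, instance_id: str) -> Optional[asyncio.Semaphore]:
        cap = self._caps.get(instance_id)
        if cap is None:
            return None  # unbounded: no semaphore is ever created
        # Lazy creation on first use, then reused for every later call.
        if instance_id not in self._sems:
            self._sems[instance_id] = asyncio.Semaphore(cap)
        return self._sems[instance_id]

    async def run(self, instance_id: str, fn: Callable[[], Awaitable[T]]) -> T:
        sem = self._get_semaphore(instance_id)
        if sem is None:
            return await fn()
        async with sem:  # at most `cap` concurrent calls for this instance
            return await fn()
```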
Enable / Disable + Logs
CR-2 enable/disable surface: a per-instance enabled flag (synced to PluginMeta.enabled for the default instance), persistence via PluginConfigStore, and on_disable hooks deferred until the instance's in-flight job finishes. Plus the log-file tail reader. The plugin-config UI library (Path C step 4) is the most likely consumer to extend the enable/disable behavior next.
enable_plugin
def enable_plugin(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was enabled
Enable a plugin instance (CR-10 multi-instance aware).
CR-2: persists the new state via config_store (default-instance only; persistence is per-plugin, not per-instance) and fires the plugin’s on_enable hook on state-change. Idempotent for already-enabled instances.
disable_plugin
def disable_plugin(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->bool: # True if instance was disabled
Disable a plugin instance without unloading it (CR-10 multi-instance aware).
CR-2: persists the new state (default-instance only) and fires the plugin’s on_disable hook — but defers the hook until any in-flight job for THIS instance finishes (the per-instance _running_executions key is the instance_id, so a concurrent execute on a different instance of the same plugin doesn’t gate this instance’s hook).
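The deferred-hook bookkeeping, keyed by instance_id, can be sketched as follows. The `DeferredDisable` class and its methods are hypothetical. `disable()` fires `on_disable` immediately only if nothing is running for that instance; otherwise the last execution to finish for that same instance fires it.

```python
import threading
from typing import Callable, Dict, Set


class DeferredDisable:
    def __init__(self, on_disable: Callable[[str], None]) -> None:
        self._on_disable = on_disable
        self._running: Dict[str, int] = {}  # instance_id -> in-flight count
        self._pending: Set[str] = set()     # instance_ids awaiting the hook
        self._lock = threading.Lock()

    def start(self, instance_id: str) -> None:
        with self._lock:
            self._running[instance_id] = self._running.get(instance_id, 0) + 1

    def finish(self, instance_id: str) -> None:
        with self._lock:
            self._running[instance_id] -= 1
            fire = self._running[instance_id] == 0 and instance_id in self._pending
            if fire:
                self._pending.discard(instance_id)
        if fire:
            self._on_disable(instance_id)  # hook runs outside the lock

    def disable(self, instance_id: str) -> None:
        with self._lock:
            busy = self._running.get(instance_id, 0) > 0
            if busy:
                self._pending.add(instance_id)
        if not busy:
            self._on_disable(instance_id)
```

Because the counters are per instance_id, a job on `gemini-a1b2c3` neither delays nor triggers the hook for `gemini`.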
get_plugin_logs
def get_plugin_logs(
plugin_name:str, # Name of the plugin
lines:int=50, # Number of lines to return
)->str: # Log content
Read the last N lines of the plugin’s log file.
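Reading the last N lines does not require holding the whole file in memory: a `collections.deque` with `maxlen` keeps a sliding window while the file streams through. This is a sketch; how the real method treats a missing log file is not documented here, so returning an empty string is an assumption.

```python
from collections import deque
from pathlib import Path


def tail_log(log_path: Path, lines: int = 50) -> str:
    if not log_path.exists():
        return ""  # assumption: a missing log reads as empty
    with log_path.open("r", encoding="utf-8", errors="replace") as fh:
        # deque(maxlen=N) discards older lines as newer ones are read.
        last = deque(fh, maxlen=lines)
    return "".join(last)
```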
Key features:
- Local-first discovery: Manifests in .cjm/manifests/ shadow global ones in ~/.cjm/manifests/
- Process isolation: Each plugin runs in its own subprocess with a dedicated Python interpreter
- Dual execution modes: execute_plugin() for sync, execute_plugin_async() for async
- Automatic cleanup: unload_all() terminates all Worker processes
Configuration Management
Methods for managing plugin configuration. These forward to the RemotePluginProxy which communicates with the Worker over HTTP.
get_plugin_config
def get_plugin_config(
plugin_name:str, # Name of the plugin
)->Optional: # Current configuration or None
Get the current configuration of a plugin.
get_plugin_config_schema
def get_plugin_config_schema(
plugin_name:str, # Name of the plugin
)->Optional: # JSON Schema or None
Get the configuration JSON Schema for a plugin.
get_config_options
def get_config_options(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->Dict: # CR-11: live config option domains, or {} if unavailable
Get a plugin instance’s runtime config option providers (CR-11).
Forwards to the worker’s get_config_options() - live enum domains + per-option metadata for dynamic config fields (e.g. an API model list). Kept separate from get_plugin_config_schema (static, hashed for CR-8 drift); these options are the live companion the plugin-config UI merges on top.
Degrades to {} if the instance is missing or the worker call fails - the UI then falls back to the static schema. Typed-error surfacing for the UI consumer is deferred to the plugin-config UI library (Path C Step 4).
get_all_plugin_configs
def get_all_plugin_configs(
)->Dict: # Plugin name -> config mapping
Get current configuration for all loaded plugins.
update_plugin_config
def update_plugin_config(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
config:Dict, # New configuration values
strict:bool=True, # SG-5: reject unknown keys against manifest config_schema (default)
)->bool: # True if successful
Update a plugin instance’s configuration (CR-10 multi-instance aware).
CR-2: on successful reconfigure, persists the new config (default instance only; multi-instance loads don’t persist). Per-instance inst.config is updated regardless. SG-5: validates against the underlying plugin’s config_schema (per-plugin, not per-instance, so all instances share the same schema).
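SG-5's strict-by-default rule can be sketched as a key check against the schema's `properties`. This simplified illustration checks only for unknown keys; full JSON Schema validation (types, enums, ranges) would normally go through a dedicated validator, which this sketch does not use. What non-strict mode does with unknown keys is not specified above, so dropping them here is an assumption.

```python
from typing import Any, Dict


def check_config(config: Dict[str, Any], schema: Dict[str, Any],
                 strict: bool = True) -> Dict[str, Any]:
    known = set(schema.get("properties", {}))
    unknown = sorted(set(config) - known)
    if unknown and strict:
        # Strict mode catches typos like "modle" instead of silently ignoring them.
        raise ValueError(f"unknown config keys: {unknown}")
    # Non-strict (assumption): drop keys the schema does not declare.
    return {k: v for k, v in config.items() if k in known}
```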
reload_plugin
def reload_plugin(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
config:Optional=None, # Optional new configuration
)->bool: # True if successful
Reload a plugin instance by terminating and restarting its Worker (CR-10).
get_plugin_stats
def get_plugin_stats(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
)->Optional: # Resource telemetry or None
Get resource usage stats for a plugin instance’s Worker process (CR-10).
Streaming Execution
Async streaming support for real-time results (e.g., transcription word-by-word).
execute_plugin_stream
def execute_plugin_stream(
name_or_id:str, # Plugin name (default instance) or instance_id (multi-instance)
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
)->AsyncGenerator: # Async generator yielding results
Execute a plugin instance with streaming response (CR-10 multi-instance aware).
Same per-instance resolution as execute_plugin_async; scheduler allocation keys off the PluginMeta (plugin-level), execution + bookkeeping key off the PluginInstance (per-instance).
Concurrent Load / Unload (CR-10b)
load_plugins_concurrent / unload_plugins_concurrent fan out plugin spawns + teardowns across an asyncio gather, dropping host startup time from sum-of-spawns to max-of-spawns. The sync load_plugin / unload_plugin paths are unchanged; the async variants run them via asyncio.to_thread so the existing sync _wait_for_ready doesn’t block the event loop while the worker boots.
PluginLoadSpec (defined in core.metadata) collects the per-load parameters into one dataclass so a batch can be expressed as List[PluginLoadSpec].
Result shapes:
- load_plugins_concurrent(...) returns Dict[str, Union[str, Exception]], mapping the requested key (explicit instance_id if set, else spec.meta.name; ambiguous new_instance specs get a {plugin_name}#new[{index}] key) → the resolved instance_id on success or the raised Exception on failure.
- unload_plugins_concurrent(...) returns Dict[str, Union[bool, Exception]], mapping the input name_or_id → True on success or the raised Exception on failure.
max_concurrency=None (the default) means unbounded — every spec runs concurrently. Set an integer to cap simultaneous loads (useful when many plugins share a constrained resource like GPU memory at boot). fail_fast=False (the default) collects every result via asyncio.gather(..., return_exceptions=True); fail_fast=True re-raises the first exception and lets remaining tasks complete in the background.
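The fan-out pattern can be sketched with a blocking stand-in for `load_plugin`. `slow_load` is hypothetical and just sleeps; the structure (`asyncio.to_thread` per spec, an optional `asyncio.Semaphore` cap, and `asyncio.gather` with or without `return_exceptions=True`) follows the description above. `asyncio.to_thread` requires Python 3.9+.

```python
import asyncio
import time
from typing import Callable, Dict, List, Optional, Union


def slow_load(name: str) -> str:
    """Hypothetical blocking loader standing in for the sync load_plugin."""
    time.sleep(0.2)  # pretend the worker takes 200 ms to boot
    if name.startswith("broken"):
        raise RuntimeError(f"{name} failed to start")
    return name  # resolved instance_id


async def load_concurrent(names: List[str], load: Callable[[str], str] = slow_load,
                          max_concurrency: Optional[int] = None,
                          fail_fast: bool = False) -> Dict[str, Union[str, Exception]]:
    sem = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    async def one(name: str) -> str:
        # to_thread keeps the blocking spawn off the event loop.
        if sem is None:
            return await asyncio.to_thread(load, name)
        async with sem:
            return await asyncio.to_thread(load, name)

    tasks = [one(n) for n in names]
    if fail_fast:
        results = await asyncio.gather(*tasks)  # first exception propagates
    else:
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return dict(zip(names, results))
```

With three 200 ms loads and no cap, the batch takes roughly 200 ms rather than 600 ms; `max_concurrency=1` would serialize them again.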
load_plugin_async
async def load_plugin_async(
plugin_meta:PluginMeta, config:Optional=None, strict:bool=True, instance_id:Optional=None,
new_instance:bool=False
)->bool:
Async variant of load_plugin (CR-10b).
Runs the existing sync load_plugin via asyncio.to_thread so the blocking proxy spawn + _wait_for_ready doesn’t stall the event loop. Backward compat: identical behavior to the sync method, just non-blocking.
unload_plugin_async
async def unload_plugin_async(
name_or_id:str
)->bool:
Async variant of unload_plugin (CR-10b).
load_plugins_concurrent
async def load_plugins_concurrent(
specs:List, # Per-plugin load specifications
max_concurrency:Optional=None, # Cap simultaneous loads; None = unbounded
fail_fast:bool=False, # Re-raise first exception (default: collect all results)
)->Dict: # requested_key → instance_id or Exception
CR-10b: fan out plugin loads concurrently via asyncio.gather.
Each spec is loaded via load_plugin_async (asyncio.to_thread under the hood). The total wall-clock drops from sum-of-spawns to max-of-spawns when max_concurrency=None. Capped concurrency uses an asyncio.Semaphore.
Result keys come from _spec_requested_key: the explicit instance_id if set, {plugin_name}#new[{index}] for ambiguous new_instance specs, else plugin_name. Successful entries map to the resolved instance_id (a string). Failed entries map to the raised exception when fail_fast=False; with fail_fast=True the first exception is re-raised instead of being returned.
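The key-selection rule can be written out directly. The `LoadSpec` fields (`meta_name`, `instance_id`, `new_instance`) are a hypothetical flattening of PluginLoadSpec, and treating every `new_instance=True` spec without an explicit ID as "ambiguous" is an assumption about what that word means here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LoadSpec:
    meta_name: str
    instance_id: Optional[str] = None
    new_instance: bool = False


def requested_key(spec: LoadSpec, index: int) -> str:
    if spec.instance_id is not None:
        return spec.instance_id                  # explicit ID wins
    if spec.new_instance:
        return f"{spec.meta_name}#new[{index}]"  # real ID is unknown until spawn
    return spec.meta_name                        # default instance
```

The batch index keeps several `new_instance` specs for the same plugin from colliding on one result key.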
unload_plugins_concurrent
async def unload_plugins_concurrent(
name_or_ids:List, # Plugin names or instance_ids to unload
max_concurrency:Optional=None, fail_fast:bool=False
)->Dict: # name_or_id → True or Exception
CR-10b: fan out plugin unloads concurrently via asyncio.gather.
Same concurrency + fail_fast semantics as load_plugins_concurrent. Result keys are the input name_or_ids (deduplication is the caller’s responsibility; duplicate inputs produce one dict entry per unique key).
PluginBinding (SG-17)
A PluginBinding is a pre-bound view of one plugin through a shared PluginManager. Consumer services (8 audited in the substrate audit) have each been writing the same ~80-line wrapper class to hide the manager + plugin_name pair behind a clean instance API — whisper.execute(...) rather than manager.execute_plugin("whisper", ...).
PluginManager.bind(name, default_config=...) returns one. The binding forwards every method call to the bound manager, supplying plugin_name automatically. default_config is applied when load() is called without an explicit config.
PluginBinding
def PluginBinding(
manager:PluginManager, plugin_name:str, default_config:Dict=<factory>
)->None:
Pre-bound view of a single plugin through a shared PluginManager.
Eliminates the wrapper-class duplication audited across 8 consumer services (SG-17). Methods forward to the manager with plugin_name pre-supplied; default_config is the fallback used when load() is called without an explicit config (matches the manifest-default behavior in load_plugin).
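A minimal sketch of the forwarding pattern, using a fake manager so it runs standalone. `FakeManager` is hypothetical, and `load`, `execute`, and `unload` are an illustrative subset of what a binding would forward. `field(default_factory=dict)` mirrors the `default_config:Dict=<factory>` default in the signature.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class FakeManager:
    """Hypothetical stand-in recording what a PluginManager would receive."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []

    def get_discovered_meta(self, plugin_name: str) -> Dict[str, str]:
        return {"name": plugin_name}

    def load_plugin(self, plugin_meta: Dict[str, str], config: Optional[dict] = None) -> bool:
        self.calls.append(("load", plugin_meta["name"], config))
        return True

    def execute_plugin(self, name_or_id: str, *args: Any, **kwargs: Any) -> str:
        self.calls.append(("execute", name_or_id, kwargs))
        return f"ran {name_or_id}"

    def unload_plugin(self, name_or_id: str) -> bool:
        self.calls.append(("unload", name_or_id))
        return True


@dataclass
class Binding:
    manager: Any
    plugin_name: str
    default_config: Dict[str, Any] = field(default_factory=dict)

    def load(self, config: Optional[Dict[str, Any]] = None) -> bool:
        meta = self.manager.get_discovered_meta(self.plugin_name)
        # Fall back to default_config only when no explicit config is given.
        effective = config if config is not None else self.default_config
        return self.manager.load_plugin(meta, config=effective)

    def execute(self, *args: Any, **kwargs: Any) -> Any:
        return self.manager.execute_plugin(self.plugin_name, *args, **kwargs)

    def unload(self) -> bool:
        return self.manager.unload_plugin(self.plugin_name)
```

Under this sketch, `bind(name, default_config)` would plausibly just return `Binding(self, name, default_config or {})`.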
bind
def bind(
plugin_name:str, # Name of the plugin to pre-bind
default_config:Optional=None, # Default config used by binding.load()
)->PluginBinding: # Bound view ready for instance-style use
Create a PluginBinding pre-bound to this manager + plugin_name.
Taxonomy + Platform Queries (CR-1 + Phase 5a)
Substrate query API for discovering plugins by their derived taxonomy (domain, role, interface FQCN) or by current-platform compatibility. All queries are pure string-based — the substrate never imports the interface library to answer them, because _generate_manifest’s introspection script captured the strings at install time inside the plugin’s own conda env.
- get_by_role("TranscriptionPlugin"): every discovered plugin whose taxonomy role matches (Whisper, Voxtral, Gemini, etc.).
- get_by_domain("transcription"): every discovered plugin in the domain (multi-role: TranscriptionPlugin and ForcedAlignmentPlugin both qualify).
- get_canonical("GraphPlugin"): the single discovered plugin for a role, or None if zero or multiple match. Intended for substrate-internal use cases where multiple implementations would be a configuration error.
- get_compatible_for_current_platform(): filters discovered by resources.platforms. An empty platforms list means universal compatibility.
get_by_role
def get_by_role(
role:str, # Interface class name segment of the FQCN (e.g., "TranscriptionPlugin")
)->List: # Discovered plugins matching the role
CR-1: return discovered plugins implementing the given interface role.
get_by_domain
def get_by_domain(
domain:str, # Domain segment of the taxonomy (e.g., "transcription")
)->List: # Discovered plugins in the domain
CR-1: return discovered plugins in the given domain.
get_canonical
def get_canonical(
role:str, # Interface class name to look up
)->Optional: # The unique matching plugin or None
CR-1: return the single canonical plugin for a role.
Returns None if zero or multiple plugins implement the role — useful for substrate-internal use cases (e.g., the graph storage plugin) where the expectation is exactly one implementation. Callers that want multi-implementation handling use get_by_role() directly.
Multi-match is logged at WARNING level because it’s a substrate-visible configuration-time surprise — without the warning, the caller’s None-handling branch can’t distinguish “no plugins installed for this role” from “multiple plugins competing for an exactly-one role.” Zero-match is silent because absence-of-optional-plugin is a normal probe outcome.
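Because the taxonomy queries are pure string matching, they can be sketched over plain dicts. The `taxonomy` shape below (a `domain` plus an `interface` FQCN whose last segment is the role) is an assumption modeled on the parameter comments above, and the FQCN strings in any example data are made up.

```python
import logging
from typing import List, Optional

log = logging.getLogger(__name__)


def role_of(plugin: dict) -> str:
    # Role = class-name segment at the end of the interface FQCN.
    return plugin["taxonomy"]["interface"].rsplit(".", 1)[-1]


def by_role(discovered: List[dict], role: str) -> List[dict]:
    return [p for p in discovered if role_of(p) == role]


def by_domain(discovered: List[dict], domain: str) -> List[dict]:
    return [p for p in discovered if p["taxonomy"]["domain"] == domain]


def canonical(discovered: List[dict], role: str) -> Optional[dict]:
    matches = by_role(discovered, role)
    if len(matches) > 1:
        # Multi-match is a configuration surprise, so it is logged...
        log.warning("%d plugins implement %s; expected exactly one: %s",
                    len(matches), role, [p["name"] for p in matches])
        return None
    # ...while zero matches is a normal "optional plugin absent" probe.
    return matches[0] if matches else None
```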
get_compatible_for_current_platform
def get_compatible_for_current_platform(
)->List: # Plugins compatible with current platform
Phase 5a: return discovered plugins compatible with the host platform.
Filters by resources.platforms. Plugins with an empty (or absent) platforms list are considered universally compatible — that’s the introspection-time convention when a plugin author didn’t declare a platform constraint. Plugins lacking the entire resources block (legacy / pre-Phase-5a manifests) also pass through as universal.
Does NOT filter on requires_gpu — substrate doesn’t know whether a GPU is present without invoking a system monitor plugin. Callers gate on GPU availability separately if needed.
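A sketch of the platform filter, assuming `resources.platforms` holds strings comparable to Python's `sys.platform` (for example `"linux"` or `"darwin"`). The actual platform vocabulary used in manifests is not specified above, so that comparison is an assumption. As documented, `requires_gpu` is deliberately ignored.

```python
import sys
from typing import List, Optional


def compatible(discovered: List[dict], platform: Optional[str] = None) -> List[dict]:
    platform = platform or sys.platform
    result = []
    for plugin in discovered:
        resources = plugin.get("resources") or {}     # legacy manifests: no block
        platforms = resources.get("platforms") or []  # empty/absent: universal
        if not platforms or platform in platforms:
            result.append(plugin)  # requires_gpu is intentionally not checked
    return result
```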
Usage Examples
Basic Usage
import logging
from cjm_plugin_system.core.manager import PluginManager
logging.basicConfig(level=logging.INFO)
# Create manager
manager = PluginManager()
# Discover and load all plugins from manifest directories
results = manager.load_all()
print(f"Loaded: {results}")
# Execute a plugin
result = manager.execute_plugin("whisper-local", audio="/path/to/audio.wav")
# Update configuration (hot-reload)
manager.update_plugin_config("whisper-local", {"model": "large-v3"})
# Clean up all workers
manager.unload_all()
Async Usage (FastHTML)
from cjm_plugin_system.core.manager import PluginManager
async def transcribe(audio_path: str):
manager = PluginManager()
manager.load_all()
try:
result = await manager.execute_plugin_async("whisper-local", audio=audio_path)
return result
finally:
manager.unload_all()
# Streaming
async def transcribe_stream(audio_path: str):
manager = PluginManager()
manager.load_all()
try:
async for chunk in manager.execute_plugin_stream("whisper-local", audio=audio_path):
yield chunk
finally:
manager.unload_all()