cjm-plugin-system
Install
pip install cjm_plugin_system

Project Structure
nbs/
├── core/ (9)
│ ├── config.ipynb # Project-level configuration for paths, runtime settings, and environment management
│ ├── interface.ipynb # Abstract base class defining the generic plugin interface
│ ├── manager.ipynb # Plugin discovery, loading, and lifecycle management system
│ ├── metadata.ipynb # Data structures for plugin metadata
│ ├── platform.ipynb # Cross-platform utilities for process management, path handling, and system detection
│ ├── proxy.ipynb # Bridge between Host application and isolated Worker processes
│ ├── queue.ipynb # Resource-aware job queue for sequential plugin execution with cancellation support
│ ├── scheduling.ipynb # Resource scheduling policies for plugin execution
│ └── worker.ipynb # FastAPI server that runs inside isolated plugin environments
├── utils/ (2)
│ ├── hashing.ipynb # Shared cryptographic hashing primitives for content integrity verification
│ └── validation.ipynb # Validation helpers for plugin configuration dataclasses
└── cli.ipynb # CLI tool for declarative plugin management
Total: 12 notebooks across 2 directories
Module Dependencies
graph LR
cli[cli<br/>cli]
core_config[core.config<br/>Configuration]
core_interface[core.interface<br/>Plugin Interface]
core_manager[core.manager<br/>Plugin Manager]
core_metadata[core.metadata<br/>Plugin Metadata]
core_platform[core.platform<br/>Platform Utilities]
core_proxy[core.proxy<br/>Remote Plugin Proxy]
core_queue[core.queue<br/>Job Queue]
core_scheduling[core.scheduling<br/>Scheduling]
core_worker[core.worker<br/>Universal Worker]
utils_hashing[utils.hashing<br/>Content Hashing Utilities]
utils_validation[utils.validation<br/>Configuration Validation]
cli --> core_platform
cli --> core_config
core_manager --> core_metadata
core_manager --> core_scheduling
core_manager --> core_config
core_manager --> core_proxy
core_manager --> core_interface
core_platform --> core_config
core_proxy --> core_platform
core_proxy --> core_interface
core_proxy --> core_config
core_queue --> core_manager
core_scheduling --> core_metadata
core_worker --> core_platform
14 cross-module dependencies detected
CLI Reference
cjm-ctl Command
Usage: cjm-ctl [OPTIONS] COMMAND [ARGS]...
CJM Plugin System CLI
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────╮
│ --cjm-config PATH Path to cjm.yaml configuration file │
│ --data-dir PATH Override data directory (manifests, logs) │
│ --conda-prefix PATH Override conda/mamba prefix path │
│ --conda-type TEXT Conda implementation: micromamba, miniforge, or conda │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to copy it or customize the │
│ installation. │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭─ Commands ─────────────────────────────────────────────────────────────────────────────────────────────╮
│ setup-runtime Download and setup micromamba runtime for project-local mode. │
│ install-all Install and register all plugins defined in plugins.yaml. │
│ setup-host Install interface libraries in the current Python environment. │
│ estimate-size Estimate disk space required for plugin environments. │
│ list List installed plugins from manifest directory. │
│ remove Remove a plugin's manifest and conda environment. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────╯
For detailed help on any command, use cjm-ctl <command> --help.
Module Overview
Detailed documentation for each module in the project:
cli (cli.ipynb)
CLI tool for declarative plugin management
Import
from cjm_plugin_system.cli import (
app,
main,
setup_runtime,
run_cmd,
install_all,
setup_host,
estimate_size,
list_plugins,
remove_plugin
)

Functions
def main(
ctx:typer.Context,
cjm_config:Annotated[Optional[Path], typer.Option(
"--cjm-config",
help="Path to cjm.yaml configuration file"
)]=None,
data_dir:Annotated[Optional[Path], typer.Option(
"--data-dir",
help="Override data directory (manifests, logs)"
)]=None,
conda_prefix:Annotated[Optional[Path], typer.Option(
"--conda-prefix",
help="Override conda/mamba prefix path"
)]=None,
conda_type:Annotated[Optional[str], typer.Option(
"--conda-type",
help="Conda implementation: micromamba, miniforge, or conda"
)]=None,
) -> None
"CJM Plugin System CLI for managing isolated plugin environments."def setup_runtime(
force:bool=typer.Option(False, "--force", "-f", help="Re-download even if binary exists")
) -> None
"Download and setup micromamba runtime for project-local mode."def _check_runtime_available() -> None:
"""Check if the configured conda runtime is available, exit with helpful message if not."""
cfg = get_config()
if not ensure_runtime_available(cfg)
"Check if the configured conda runtime is available, exit with helpful message if not."def _get_conda_cmd_str() -> str
"Get the conda/micromamba command string for shell commands."def _download_url_to_temp(
url: str, # URL to download
suffix: str = ".yml" # File suffix for temp file
) -> Optional[Path]: # Path to temp file or None if failed
"Download a URL to a temporary file. Returns None if download fails."def _resolve_env_file(
env_file: str # Path or URL to environment file
) -> tuple[str, Optional[Path]]: # (resolved_path, temp_file_to_cleanup)
"""
Resolve env_file to a local path, downloading if it's a URL.
Returns (local_path, temp_file) where temp_file is set if we created
a temporary file that should be cleaned up later.
"""def run_cmd(
cmd: str, # Shell command to execute
check: bool = True # Whether to raise on non-zero exit
) -> None
"""
Run a shell command and stream output.
Uses the platform's default shell (no hardcoded /bin/bash).
"""def _generate_manifest(
env_name: str, # Name of the Conda environment
package_name: str, # Package source string (git URL or package name)
manifest_dir: Path # Directory to write manifest JSON files
) -> None
"Run introspection script inside the target env to generate manifest."def _add_conda_env_to_manifest(
manifest_dir:Path, # Directory containing manifest files
plugin_name:str, # Plugin name (used for finding manifest file)
env_name:str # Conda environment name to add
) -> bool: # True if successfully updated
"Add conda_env field to an existing manifest file."def _conda_env_exists_configured(
env_name: str # Name of the conda environment
) -> bool: # True if environment exists
"Check if conda environment exists using configured conda command."def install_all(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
force:bool=typer.Option(False, help="Force recreation of environments")
) -> None
"Install and register all plugins defined in plugins.yaml."def setup_host(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Install interface libraries in the current Python environment."def _format_size(
size_bytes: int # Size in bytes
) -> str: # Human-readable size string
"Format bytes as human-readable string."def _get_pypi_size(
package_spec: str # Package name or git URL
) -> tuple[int, str]: # (size_bytes, package_name)
"Query PyPI for package download size."def _estimate_conda_size(
env_file: str, # Path or URL to environment.yml
env_name: str # Target environment name
) -> tuple[int, int]: # (total_bytes, package_count)
"Estimate conda package sizes using dry-run."def _estimate_pip_sizes(
packages: list[str] # List of pip package specs
) -> tuple[int, int, list[tuple[str, int]]]: # (total_bytes, found_count, [(name, size), ...])
"Estimate pip package sizes from PyPI."def estimate_size(
plugins_path:str=typer.Option("plugins.yaml", "--plugins", help="Path to plugins.yaml file"),
plugin_name:Optional[str]=typer.Option(None, "--plugin", "-p", help="Estimate for a single plugin"),
verbose:bool=typer.Option(False, "--verbose", "-v", help="Show per-package breakdown")
) -> None
"Estimate disk space required for plugin environments."def _get_conda_envs() -> set[str]: # Set of existing conda environment names
"""Get list of existing conda environment names using configured conda command."""
cfg = get_config()
cmd_parts = build_conda_command(cfg, "env", "list", "--json")
try
"Get list of existing conda environment names using configured conda command."def _get_installed_manifests(
manifest_dir:Optional[Path]=None # Directory to scan (uses config default if None)
) -> list[dict]: # List of manifest dictionaries
"Load all manifest JSON files from the manifest directory."def _extract_env_from_python_path(
python_path:str # Path like /home/user/miniforge3/envs/my-env/bin/python
) -> str: # Extracted environment name or empty string
"Extract conda environment name from python_path."def list_plugins(
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for cross-reference"),
show_envs:bool=typer.Option(False, "--envs", "-e", help="Show conda environment status")
) -> None
"List installed plugins from manifest directory."def remove_plugin(
plugin_name:str=typer.Argument(..., help="Name of the plugin to remove"),
plugins_path:Optional[str]=typer.Option(None, "--plugins", help="Path to plugins.yaml for env name lookup"),
keep_env:bool=typer.Option(False, "--keep-env", help="Keep the conda environment, only remove manifest"),
yes:bool=typer.Option(False, "--yes", "-y", help="Skip confirmation prompt")
) -> None
"Remove a plugin's manifest and conda environment."

Configuration (config.ipynb)
Project-level configuration for paths, runtime settings, and environment management
Import
from cjm_plugin_system.core.config import (
RuntimeMode,
CondaType,
RuntimeConfig,
CJMConfig,
load_config,
get_config,
set_config,
reset_config
)

Functions
def _load_from_yaml(
yaml_path:Path # Path to cjm.yaml file
) -> CJMConfig: # Parsed configuration
"Load config from YAML file, resolving relative paths."def load_config(
config_path:Optional[Path]=None, # CLI --cjm-config
data_dir:Optional[Path]=None, # CLI --data-dir
conda_prefix:Optional[Path]=None, # CLI --conda-prefix
conda_type:Optional[str]=None # CLI --conda-type
) -> CJMConfig: # Resolved configuration
"Load config with layered resolution (CLI > env vars > yaml > defaults)."def get_config() -> CJMConfig: # Current configuration
"""Get current config (loads defaults if not set)."""
global _current_config
if _current_config is None
"Get current config (loads defaults if not set)."def set_config(
config:CJMConfig # Configuration to set as current
) -> None
"Set current config (called by CLI callback)."def reset_config() -> None
"Reset to unloaded state (for testing)."

Classes
class RuntimeMode(str, Enum):
"Runtime mode for the plugin system."class CondaType(str, Enum):
"Type of conda implementation to use."@dataclass
class RuntimeConfig:
"Runtime environment configuration."
mode: RuntimeMode = RuntimeMode.SYSTEM # LOCAL for project-local, SYSTEM for global
conda_type: CondaType = CondaType.CONDA # Conda implementation to use
prefix: Optional[Path] # Path to runtime directory (LOCAL mode only)
binaries: Dict[str, Path] = field(...) # Platform-specific binary paths@dataclass
class CJMConfig:
"Main configuration for cjm-plugin-system."
runtime: RuntimeConfig = field(...) # Runtime environment settings
data_dir: Path = field(...) # Base directory for manifests, logs
plugins_config: Path = field(...) # Path to plugins.yaml file
models_dir: Optional[Path] # Directory for model downloads
def manifests_dir(self) -> Path: # Directory containing plugin manifests
"""Directory containing plugin manifests."""
return self.data_dir / "manifests"
@property
def plugin_data_dir(self) -> Path: # Directory for plugin runtime data
"Directory containing plugin manifests."
def plugin_data_dir(self) -> Path: # Directory for plugin runtime data
"""Directory for plugin runtime data (databases, caches)."""
return self.data_dir / "data"
@property
def logs_dir(self) -> Path: # Directory containing plugin logs
"Directory for plugin runtime data (databases, caches)."
def logs_dir(self) -> Path: # Directory containing plugin logs
"""Directory containing plugin logs."""
return self.data_dir / "logs"
@property
def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
"Directory containing plugin logs."
def conda_binary_path(self) -> Optional[Path]: # Path to conda/micromamba binary or None
"""Get the configured binary path for the current platform."""
# Inline platform detection to avoid circular imports
system = platform_mod.system().lower()
machine = platform_mod.machine().lower()
if system == "windows"
"Get the configured binary path for the current platform."Variables
_current_config: Optional[CJMConfig] = None

Content Hashing Utilities (hashing.ipynb)
Shared cryptographic hashing primitives for content integrity verification
Import
from cjm_plugin_system.utils.hashing import (
hash_bytes,
hash_file,
verify_hash
)

Functions
def hash_bytes(
content: bytes, # Byte content to hash
algo: str = "sha256" # Hash algorithm name (e.g., "sha256", "sha3_256")
) -> str: # Hash string in "algo:hexdigest" format
"Compute a hash of byte content."def hash_file(
path: Union[str, Path], # Path to file to hash
algo: str = "sha256", # Hash algorithm name
chunk_size: int = 8192 # Read chunk size in bytes
) -> str: # Hash string in "algo:hexdigest" format
"Stream-hash a file without loading it entirely into memory."def verify_hash(
content: bytes, # Content to verify
expected: str # Expected hash in "algo:hexdigest" format
) -> bool: # True if content matches expected hash
"Verify content against an expected hash string."

Plugin Interface (interface.ipynb)
Abstract base class defining the generic plugin interface
Import
from cjm_plugin_system.core.interface import (
FileBackedDTO,
PluginInterface
)

Classes
@runtime_checkable
class FileBackedDTO(Protocol):
"Protocol for Data Transfer Objects that serialize to disk for zero-copy transfer."
def to_temp_file(self) -> str: # Absolute path to the temporary file
"Save the data to a temporary file and return the absolute path."class PluginInterface(ABC):
"Abstract base class for all plugins (both local workers and remote proxies)."
def name(self) -> str: # Unique identifier for this plugin
"""Unique plugin identifier."""
...
@property
@abstractmethod
def version(self) -> str: # Semantic version string (e.g., "1.0.0")
"Unique plugin identifier."
def version(self) -> str: # Semantic version string (e.g., "1.0.0")
"""Plugin version."""
...
@abstractmethod
def initialize(
self,
config: Optional[Dict[str, Any]] = None # Configuration dictionary
) -> None
"Plugin version."
def initialize(
self,
config: Optional[Dict[str, Any]] = None # Configuration dictionary
) -> None
"Initialize or re-configure the plugin."
def execute(
self,
*args,
**kwargs
) -> Any: # Plugin-specific output
"Execute the plugin's main functionality."
def execute_stream(
self,
*args,
**kwargs
) -> Generator[Any, None, None]: # Yields partial results
"Stream execution results chunk by chunk."
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema for configuration
"""Return JSON Schema describing the plugin's configuration options."""
...
@abstractmethod
def get_current_config(self) -> Dict[str, Any]: # Current configuration values
"Return JSON Schema describing the plugin's configuration options."
def get_current_config(self) -> Dict[str, Any]: # Current configuration values
"""Return the current configuration state as a dictionary."""
...
@abstractmethod
def cleanup(self) -> None
"Return the current configuration state as a dictionary."
def cleanup(self) -> None:
"""Clean up resources when plugin is unloaded."""
...
def cancel(self) -> None
"Clean up resources when plugin is unloaded."
def cancel(self) -> None:
"""Cancel the current execution. Override for cooperative cancellation support."""
pass # Default: no-op (plugins opt-in to cancellation)
"Cancel the current execution. Override for cooperative cancellation support."
def report_progress(
self,
progress: float, # 0.0 to 1.0, or -1.0 for indeterminate
message: str = "" # Descriptive status message
) -> None
"Report execution progress. Call during execute() to update status."

Plugin Manager (manager.ipynb)
Plugin discovery, loading, and lifecycle management system
Import
from cjm_plugin_system.core.manager import (
PluginManager,
get_plugin_config,
get_plugin_config_schema,
get_all_plugin_configs,
update_plugin_config,
reload_plugin,
get_plugin_stats,
execute_plugin_stream
)

Functions
def get_plugin_config(
self,
plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Current configuration or None
"Get the current configuration of a plugin."def get_plugin_config_schema(
self,
plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # JSON Schema or None
"Get the configuration JSON Schema for a plugin."def get_all_plugin_configs(self) -> Dict[str, Dict[str, Any]]: # Plugin name -> config mapping
"""Get current configuration for all loaded plugins."""
return {
name: plugin.get_current_config()
"Get current configuration for all loaded plugins."def update_plugin_config(
self,
plugin_name: str, # Name of the plugin
config: Dict[str, Any] # New configuration values
) -> bool: # True if successful
"Update a plugin's configuration (hot-reload without restart)."def reload_plugin(
self,
plugin_name: str, # Name of the plugin
config: Optional[Dict[str, Any]] = None # Optional new configuration
) -> bool: # True if successful
"Reload a plugin by terminating and restarting its Worker."def get_plugin_stats(
self,
plugin_name: str # Name of the plugin
) -> Optional[Dict[str, Any]]: # Resource telemetry or None
"Get resource usage stats for a plugin's Worker process."async def execute_plugin_stream(
self,
plugin_name: str, # Name of the plugin
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Async generator yielding results
"Execute a plugin with streaming response."

Classes
class PluginManager:
def __init__(
self,
plugin_interface:Type[PluginInterface]=PluginInterface, # Base interface for type checking
search_paths:Optional[List[Path]]=None, # Custom manifest search paths
scheduler:Optional[ResourceScheduler]=None # Resource allocation policy
)
"Manages plugin discovery, loading, and lifecycle via process isolation."
def __init__(
self,
plugin_interface:Type[PluginInterface]=PluginInterface, # Base interface for type checking
search_paths:Optional[List[Path]]=None, # Custom manifest search paths
scheduler:Optional[ResourceScheduler]=None # Resource allocation policy
)
"Initialize the plugin manager."
def register_system_monitor(
self,
plugin_name:str # Name of the system monitor plugin
) -> None
"Bind a loaded plugin to act as the hardware system monitor."
def discover_manifests(self) -> List[PluginMeta]: # List of discovered plugin metadata
"""Discover plugins via JSON manifests in search paths."""
self.discovered = []
seen_plugins = set()
for base_path in self.search_paths
"Discover plugins via JSON manifests in search paths."
def get_discovered_by_category(
self,
category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching discovered plugins
"Get discovered plugins filtered by category."
def get_plugins_by_category(
self,
category:str # Category to filter by (e.g., "transcription")
) -> List[PluginMeta]: # List of matching loaded plugins
"Get loaded plugins filtered by category."
def get_discovered_categories(self) -> List[str]: # List of unique categories
"""Get all unique categories among discovered plugins."""
return list(set(meta.category for meta in self.discovered if meta.category))
def get_loaded_categories(self) -> List[str]: # List of unique categories
"Get all unique categories among discovered plugins."
def get_loaded_categories(self) -> List[str]: # List of unique categories
"""Get all unique categories among loaded plugins."""
return list(set(meta.category for meta in self.plugins.values() if meta.category))
def get_plugin_meta(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
"Get all unique categories among loaded plugins."
def get_plugin_meta(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
"Get metadata for a loaded plugin by name."
def get_discovered_meta(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginMeta]: # Plugin metadata or None
"Get metadata for a discovered (not necessarily loaded) plugin by name."
def load_plugin(
self,
plugin_meta:PluginMeta, # Plugin metadata (with manifest attached)
config:Optional[Dict[str, Any]]=None # Initial configuration
) -> bool: # True if successfully loaded
"Load a plugin by spawning a Worker subprocess."
def load_all(
self,
configs:Optional[Dict[str, Dict[str, Any]]]=None # Plugin name -> config mapping
) -> Dict[str, bool]: # Plugin name -> success mapping
"Discover and load all available plugins."
def unload_plugin(
self,
plugin_name:str # Name of the plugin to unload
) -> bool: # True if successfully unloaded
"Unload a plugin and terminate its Worker process."
def unload_all(self) -> None:
"""Unload all plugins and terminate all Worker processes."""
for name in list(self.plugins.keys())
"Unload all plugins and terminate all Worker processes."
def get_plugin(
self,
plugin_name:str # Name of the plugin
) -> Optional[PluginInterface]: # Plugin proxy instance or None
"Get a loaded plugin instance by name."
def list_plugins(self) -> List[PluginMeta]: # List of loaded plugin metadata
"""List all loaded plugins."""
return list(self.plugins.values())
def _evict_for_resources(self, needed_meta:PluginMeta) -> bool
"List all loaded plugins."
def execute_plugin(
self,
plugin_name:str, # Name of the plugin
*args,
**kwargs
) -> Any: # Plugin result
"Execute a plugin's main functionality (sync)."
async def execute_plugin_async(
self,
plugin_name:str, # Name of the plugin
*args,
**kwargs
) -> Any: # Plugin result
"Execute a plugin's main functionality (async)."
def enable_plugin(
self,
plugin_name:str # Name of the plugin
) -> bool: # True if plugin was enabled
"Enable a plugin."
def disable_plugin(
self,
plugin_name:str # Name of the plugin
) -> bool: # True if plugin was disabled
"Disable a plugin without unloading it."
def get_plugin_logs(
self,
plugin_name:str, # Name of the plugin
lines:int=50 # Number of lines to return
) -> str: # Log content
"Read the last N lines of the plugin's log file."

Plugin Metadata (metadata.ipynb)
Data structures for plugin metadata
Import
from cjm_plugin_system.core.metadata import (
PluginMeta
)

Classes
@dataclass
class PluginMeta:
"Metadata about a plugin."
name: str # Plugin's unique identifier
version: str # Plugin's version string
description: str = '' # Brief description of the plugin's functionality
author: str = '' # Plugin author's name or organization
package_name: str = '' # Python package name containing the plugin
category: str = '' # Plugin category (e.g., "transcription", "system_monitor")
interface: str = '' # Fully qualified interface class name
config_schema: Optional[Dict[str, Any]] # JSON Schema for plugin configuration
instance: Optional[Any] # Plugin instance (PluginInterface subclass)
enabled: bool = True # Whether the plugin is enabled
last_executed: float = 0.0 # Unix timestamp

Platform Utilities (platform.ipynb)
Cross-platform utilities for process management, path handling, and system detection
Import
from cjm_plugin_system.core.platform import (
MICROMAMBA_URLS,
is_windows,
is_macos,
is_linux,
is_apple_silicon,
get_current_platform,
get_python_in_env,
get_popen_isolation_kwargs,
terminate_process,
terminate_self,
run_shell_command,
conda_env_exists,
get_micromamba_download_url,
download_micromamba,
get_conda_command,
build_conda_command,
get_micromamba_binary_path,
ensure_runtime_available
)

Functions
def is_windows() -> bool:
"""Check if running on Windows."""
return platform.system() == "Windows"
def is_macos() -> bool
"Check if running on Windows."def is_macos() -> bool:
"""Check if running on macOS."""
return platform.system() == "Darwin"
def is_linux() -> bool
"Check if running on macOS."def is_linux() -> bool:
"""Check if running on Linux."""
return platform.system() == "Linux"
def is_apple_silicon() -> bool
"Check if running on Linux."def is_apple_silicon() -> bool
"Check if running on Apple Silicon Mac (for MPS detection)."def get_current_platform() -> str:
"""Get current platform string for manifest filtering.
Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
"""
system = platform.system().lower()
machine = platform.machine().lower()
# Normalize system names
if system == "darwin"
"""
Get current platform string for manifest filtering.
Returns strings like 'linux-x64', 'darwin-arm64', 'win-x64'.
"""def get_python_in_env(
env_path: Path # Path to conda environment root
) -> Path: # Path to Python executable
"""
Get the Python executable path for a conda environment.
On Windows: env_path/python.exe
On Unix: env_path/bin/python
"""def get_popen_isolation_kwargs() -> Dict[str, Any]:
"""Return kwargs for process isolation in subprocess.Popen.
On Unix: Returns {'start_new_session': True}
On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
Usage:
process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
"""
if is_windows()
"""
Return kwargs for process isolation in subprocess.Popen.
On Unix: Returns {'start_new_session': True}
On Windows: Returns {'creationflags': CREATE_NEW_PROCESS_GROUP}
Usage:
process = subprocess.Popen(cmd, **get_popen_isolation_kwargs(), ...)
"""def terminate_process(
process: subprocess.Popen, # Process to terminate
timeout: float = 2.0 # Seconds to wait before force kill
) -> None
"""
Terminate a subprocess gracefully, with fallback to force kill.
On all platforms:
1. Calls process.terminate() (SIGTERM on Unix, TerminateProcess on Windows)
2. Waits for timeout seconds
3. If still running, calls process.kill() (SIGKILL on Unix, TerminateProcess on Windows)
"""def terminate_self() -> None:
"""Terminate the current process (for worker suicide pact).
On Unix: Sends SIGTERM to self for graceful shutdown
On Windows: Calls os._exit() since Windows lacks SIGTERM
"""
if is_windows()
"""
Terminate the current process (for worker suicide pact).
On Unix: Sends SIGTERM to self for graceful shutdown
On Windows: Calls os._exit() since Windows lacks SIGTERM
"""def run_shell_command(
cmd: str, # Shell command to execute
check: bool = True, # Whether to raise on non-zero exit
capture_output: bool = False, # Whether to capture stdout/stderr
**kwargs # Additional kwargs passed to subprocess.run
) -> subprocess.CompletedProcess
"""
Run a shell command cross-platform.
Unlike using shell=True with executable='/bin/bash', this function
uses the platform's default shell:
- Linux/macOS: /bin/sh (or $SHELL)
- Windows: cmd.exe
"""def conda_env_exists(
env_name: str, # Name of the conda environment
conda_cmd: str = "conda" # Conda command (conda, mamba, micromamba)
) -> bool
"""
Check if a conda environment exists (cross-platform).
Uses 'conda env list --json' instead of piping to grep,
which doesn't work on Windows.
"""def get_micromamba_download_url(
platform_str: Optional[str] = None # Platform string (e.g., 'linux-x64'), uses current if None
) -> str: # Download URL for micromamba binary
"Get the micromamba download URL for the specified or current platform."def download_micromamba(
dest_path: Path, # Destination path for the micromamba binary
platform_str: Optional[str] = None, # Platform string, uses current if None
show_progress: bool = True # Whether to print progress messages
) -> bool: # True if download succeeded
"Download and extract micromamba binary to the specified path."def get_conda_command(
config: CJMConfig # Configuration object with runtime settings
) -> List[str]: # Base command with prefix args if needed
"Get the conda/mamba/micromamba base command with prefix args for local mode."def build_conda_command(
config: CJMConfig, # Configuration object with runtime settings
*args: str # Additional command arguments
) -> List[str]: # Complete command ready for subprocess
"Build a complete conda/mamba/micromamba command."def get_micromamba_binary_path(
config: CJMConfig # Configuration object with runtime settings
) -> Optional[Path]: # Path to micromamba binary or None
"Get the configured micromamba binary path for the current platform."def ensure_runtime_available(
config: CJMConfig # Configuration object with runtime settings
) -> bool: # True if runtime is available
"Check if the configured conda/micromamba runtime is available."

Variables
MICROMAMBA_URLS: Dict[str, str]

Remote Plugin Proxy (proxy.ipynb)
Bridge between Host application and isolated Worker processes
Import
from cjm_plugin_system.core.proxy import (
RemotePluginProxy,
execute_async,
execute_stream_sync,
execute_stream,
get_stats,
is_alive,
cancel,
cancel_async,
get_progress,
get_progress_async
)

Functions
def _maybe_serialize_input(
self,
obj: Any # Object to potentially serialize
) -> Any: # Serialized form (path string or original object)
"Convert FileBackedDTO objects to file paths for zero-copy transfer."def _prepare_payload(
self,
args: tuple, # Positional arguments
kwargs: dict # Keyword arguments
) -> Dict[str, Any]: # JSON-serializable payload
"Prepare arguments for HTTP transmission."async def execute_async(
self,
*args,
**kwargs
) -> Any: # Plugin result
"Execute the plugin asynchronously."def execute_stream_sync(self, *args, **kwargs) -> Generator[Any, None, None]
"Synchronous wrapper for streaming (blocking)."async def execute_stream(
self,
*args,
**kwargs
) -> AsyncGenerator[Any, None]: # Yields parsed JSON chunks
"Execute with streaming response (async generator)."def get_stats(self) -> Dict[str, Any]: # Process telemetry
"""Get worker process resource usage."""
with httpx.Client() as client
"Get worker process resource usage."def is_alive(self) -> bool: # True if worker is responsive
"""Check if the worker process is still running and responsive."""
if not self.process or self.process.poll() is not None
"Check if the worker process is still running and responsive."def cancel(self) -> bool: # True if cancel request was sent
"""Request cancellation of running execution."""
try
"Request cancellation of running execution."async def cancel_async(self) -> bool: # True if cancel request was sent
"""Request cancellation asynchronously."""
try
"Request cancellation asynchronously."def get_progress(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress from worker."""
try
"Get current execution progress from worker."async def get_progress_async(self) -> Dict[str, Any]: # {progress: float, message: str}
"""Get current execution progress asynchronously."""
try
"Get current execution progress asynchronously."def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb)
"Enter context manager."def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager and cleanup."""
self.cleanup()
return False
async def __aenter__(self)
"Exit context manager and cleanup."async def __aenter__(self):
"""Enter async context manager."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb)
"Enter async context manager."async def __aexit__(self, exc_type, exc_val, exc_tb)
"Exit async context manager and cleanup."

Classes
class RemotePluginProxy:
def __init__(
self,
manifest:Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
)
"Proxy that forwards plugin calls to an isolated Worker subprocess."
def __init__(
self,
manifest:Dict[str, Any] # Plugin manifest with python_path, module, class, etc.
)
"Initialize proxy and start the worker process."
def name(self) -> str: # Plugin name from manifest
"""Plugin name."""
return self.manifest.get('name', 'unknown')
@property
def version(self) -> str: # Plugin version from manifest
"Plugin name."
def version(self) -> str: # Plugin version from manifest
"""Plugin version."""
return self.manifest.get('version', '0.0.0')
def _get_free_port(self) -> int
"Plugin version."
def initialize(
self,
config:Optional[Dict[str, Any]]=None # Configuration dictionary
) -> None
"Initialize or reconfigure the plugin."
def execute(
self,
*args,
**kwargs
) -> Any: # Plugin result
"Execute the plugin synchronously."
def get_config_schema(self) -> Dict[str, Any]: # JSON Schema
"""Get the plugin's configuration schema."""
with httpx.Client() as client
"Get the plugin's configuration schema."
def get_current_config(self) -> Dict[str, Any]: # Current config values
"""Get the plugin's current configuration."""
with httpx.Client() as client
"Get the plugin's current configuration."
def cleanup(self) -> None:
"""Clean up plugin resources and terminate worker process."""
# Send cleanup request to worker
try
"Clean up plugin resources and terminate worker process."Job Queue (queue.ipynb)
Resource-aware job queue for sequential plugin execution with cancellation support
Import
from cjm_plugin_system.core.queue import (
JobStatus,
Job,
JobQueue,
submit,
cancel,
reorder,
get_job,
wait_for_job,
get_state,
get_job_logs,
start,
stop
)

Functions
async def submit(
self,
plugin_name: str, # Target plugin
*args,
priority: int = 0, # Higher = more urgent
**kwargs
) -> str: # Returns job_id
"Submit a job to the queue."async def cancel(
self,
job_id: str # Job to cancel
) -> bool: # True if cancelled
"Cancel a pending or running job."def reorder(
self,
job_id: str, # Job to move
new_priority: int # New priority value
) -> bool: # True if reordered
"Change the priority of a pending job."def get_job(
self,
job_id: str # Job to retrieve
) -> Optional[Job]: # Job or None
"Get a job by ID."async def wait_for_job(
self,
job_id: str, # Job to wait for
timeout: Optional[float] = None # Max seconds to wait
) -> Job: # Completed/failed/cancelled job
"Wait for a job to complete."def get_state(self) -> Dict[str, Any]: # Queue state for UI
"""Get the current queue state."""
running_dict = None
if self._running
"Get the current queue state."def get_job_logs(
self,
job_id: str, # Job to get logs for
lines: int = 100 # Max lines to return
) -> str: # Log content
"Get logs for a job from the plugin's log file."async def start(self) -> None:
"""Start the queue processor."""
if self._running_flag
"Start the queue processor."async def stop(self) -> None:
"""Stop the queue processor gracefully."""
self._running_flag = False
self._job_available.set() # Wake up the processor
if self._processor_task
"Stop the queue processor gracefully."def _move_to_history(self, job: Job) -> None:
"""Move a job to history, maintaining max_history limit."""
self._history.append(job)
if len(self._history) > self.max_history
"Move a job to history, maintaining max_history limit."def _signal_job_completed(self, job_id: str) -> None:
"""Signal that a job has completed."""
event = self._job_completed_events.get(job_id)
if event
"Signal that a job has completed."async def _process_loop(self) -> None:
"""Main processing loop."""
while self._running_flag
"Main processing loop."async def _execute_job(self, job: Job) -> None:
"""Execute a single job."""
self.logger.info(f"Starting job {job.id[:8]} ({job.plugin_name})")
# Mark as running
job.status = JobStatus.running
job.started_at = time.time()
self._running = job
try
"Execute a single job."async def _execute_with_cancellation(
self,
job: Job,
plugin: Any
) -> Any
"Execute job with cancellation monitoring."async def _poll_progress(
self,
job: Job,
plugin: Any
) -> None
"Poll progress from the plugin during execution."Classes
class JobStatus(str, Enum):
"Status of a job in the queue."@dataclass
class Job:
"A queued plugin execution request."
id: str # Unique job identifier (UUID)
plugin_name: str # Target plugin name
args: Tuple[Any, ...] # Positional arguments for execute()
kwargs: Dict[str, Any] # Keyword arguments for execute()
status: JobStatus = JobStatus.pending # Current job status
priority: int = 0 # Higher = more urgent
created_at: float = field(...) # Submission timestamp
started_at: Optional[float] # Execution start timestamp
completed_at: Optional[float] # Completion timestamp
result: Any # Execution result (if completed)
error: Optional[str] # Error message (if failed)
progress: float = 0.0 # 0.0 to 1.0, or -1.0 for indeterminate
status_message: str = '' # Descriptive status message
class JobQueue:
def __init__(
self,
manager: PluginManager, # Plugin manager instance
max_history: int = 100, # Max completed jobs to retain
cancel_timeout: float = 3.0, # Seconds to wait for cooperative cancel
progress_poll_interval: float = 1.0 # Seconds between progress polls
)
"Resource-aware job queue for sequential plugin execution."
def __init__(
self,
manager: PluginManager, # Plugin manager instance
max_history: int = 100, # Max completed jobs to retain
cancel_timeout: float = 3.0, # Seconds to wait for cooperative cancel
progress_poll_interval: float = 1.0 # Seconds between progress polls
)
"Initialize the job queue."Scheduling (scheduling.ipynb)
Resource scheduling policies for plugin execution
Import
from cjm_plugin_system.core.scheduling import (
ResourceScheduler,
PermissiveScheduler,
SafetyScheduler,
QueueScheduler
)

Classes
class ResourceScheduler(ABC):
"Abstract base class for resource allocation policies."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function that returns fresh stats
) -> bool: # True if execution is allowed
"Decide if a plugin can start based on its requirements and system state."
async def allocate_async(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async function returning stats
) -> bool: # True if execution is allowed
"Async allocation decision. Default delegates to sync allocate after fetching stats once."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Notify scheduler that a task started (to reserve resources)."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Notify scheduler that a task finished (to release resources)."class PermissiveScheduler(ResourceScheduler):
"Scheduler that allows all executions (Default / Dev Mode)."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Stats provider (ignored)
) -> bool: # Always returns True
"Allow all plugin executions without checking resources."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"No-op for permissive scheduler."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"No-op for permissive scheduler."class SafetyScheduler(ResourceScheduler):
"Scheduler that prevents execution if resources are insufficient."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources are available
"Check resource requirements against system state."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Called when execution starts (for future resource reservation)."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Called when execution finishes (for future resource release)."class QueueScheduler:
def __init__(
self,
timeout: float = 300.0, # Max seconds to wait for resources
poll_interval: float = 2.0 # Seconds between resource checks
)
"Scheduler that waits for resources to become available."
def __init__(
self,
timeout: float = 300.0, # Max seconds to wait for resources
poll_interval: float = 2.0 # Seconds between resource checks
)
"Initialize queue scheduler with timeout and polling settings."
def allocate(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Dict[str, Any]] # Function returning current stats
) -> bool: # True if resources become available before timeout
"Wait for resources using blocking sleep."
async def allocate_async(
self,
plugin_meta: PluginMeta, # Metadata of the plugin requesting resources
stats_provider: Callable[[], Awaitable[Dict[str, Any]]] # Async stats function
) -> bool: # True if resources become available before timeout
"Wait for resources using non-blocking async sleep."
def on_execution_start(
self,
plugin_name: str # Name of the plugin starting execution
) -> None
"Track that a plugin has started executing."
def on_execution_finish(
self,
plugin_name: str # Name of the plugin finishing execution
) -> None
"Track that a plugin has finished executing."
def get_active_plugins(self) -> Set[str]: # Set of currently executing plugin names
"Get the set of plugins with active executions."Configuration Validation (validation.ipynb)
Validation helpers for plugin configuration dataclasses
Import
from cjm_plugin_system.utils.validation import (
T,
SCHEMA_TITLE,
SCHEMA_DESC,
SCHEMA_MIN,
SCHEMA_MAX,
SCHEMA_ENUM,
SCHEMA_MIN_LEN,
SCHEMA_MAX_LEN,
SCHEMA_PATTERN,
SCHEMA_FORMAT,
validate_field_value,
validate_config,
config_to_dict,
dict_to_config,
extract_defaults,
dataclass_to_jsonschema
)

Functions
def validate_field_value(
value:Any, # Value to validate
metadata:Dict[str, Any], # Field metadata containing constraints
field_name:str="" # Field name for error messages
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate a value against field metadata constraints."def validate_config(
config:Any # Configuration dataclass instance to validate
) -> Tuple[bool, Optional[str]]: # (is_valid, error_message)
"Validate all fields in a configuration dataclass against their metadata constraints."def config_to_dict(
config:Any # Configuration dataclass instance
) -> Dict[str, Any]: # Dictionary representation of the configuration
"Convert a configuration dataclass instance to a dictionary."def dict_to_config(
config_class:Type[T], # Configuration dataclass type
data:Optional[Dict[str, Any]]=None, # Dictionary with configuration values
validate:bool=False # Whether to validate against metadata constraints
) -> T: # Instance of the configuration dataclass
"Create a configuration dataclass instance from a dictionary."def extract_defaults(
config_class:Type # Configuration dataclass type
) -> Dict[str, Any]: # Default values from the dataclass
"Extract default values from a configuration dataclass type."def _python_type_to_json_type(
python_type:type # Python type annotation to convert
) -> Dict[str, Any]: # JSON schema type definition
"Convert Python type to JSON schema type."def dataclass_to_jsonschema(
cls:type # Dataclass with field metadata
) -> Dict[str, Any]: # JSON schema dictionary
"Convert a dataclass to a JSON schema for form generation."Variables
T
SCHEMA_TITLE = 'title' # Display title for the field
SCHEMA_DESC = 'description' # Help text description
SCHEMA_MIN = 'minimum' # Minimum value for numbers
SCHEMA_MAX = 'maximum' # Maximum value for numbers
SCHEMA_ENUM = 'enum' # Allowed values for dropdowns
SCHEMA_MIN_LEN = 'minLength' # Minimum string length
SCHEMA_MAX_LEN = 'maxLength' # Maximum string length
SCHEMA_PATTERN = 'pattern' # Regex pattern for strings
SCHEMA_FORMAT = 'format' # String format (email, uri, date, etc.)

Universal Worker (worker.ipynb)
FastAPI server that runs inside isolated plugin environments
Import
from cjm_plugin_system.core.worker import (
EnhancedJSONEncoder,
parent_monitor,
create_app,
run_worker
)Functions
def parent_monitor(
ppid: int # Parent process ID to monitor
) -> None
"""
Monitor parent process and terminate self if parent dies.
This implements the "Suicide Pact" pattern: if the Host process dies,
the Worker must terminate itself to prevent zombie processes.
"""def create_app(
module_name: str, # Python module path (e.g., "my_plugin.plugin")
class_name: str # Plugin class name (e.g., "WhisperPlugin")
) -> FastAPI: # Configured FastAPI application
"Create FastAPI app that hosts the specified plugin."def run_worker() -> None:
"""CLI entry point for running the worker."""
parser = argparse.ArgumentParser(description="Universal Plugin Worker")
parser.add_argument("--module", required=True, help="Plugin module path")
parser.add_argument("--class", dest="class_name", required=True, help="Plugin class name")
parser.add_argument("--port", type=int, required=True, help="Port to listen on")
parser.add_argument("--ppid", type=int, required=False, help="Parent PID to monitor")
args = parser.parse_args()
# Start watchdog if parent PID provided
if args.ppid
"CLI entry point for running the worker."Classes
class EnhancedJSONEncoder(JSONEncoder):
"JSON encoder that handles dataclasses and other common types."
def default(
self,
o: Any # Object to encode
) -> Any: # JSON-serializable representation
"Convert non-serializable objects to serializable form."