cjm-graph-plugin-system

Defines the standardized interface and data structures for Context Graph plugins, enabling the semantic linking, decomposition, and enrichment of multi-modal content.

Install

pip install cjm_graph_plugin_system

Project Structure

nbs/
├── utils/ (2)
│   ├── mermaid.ipynb  # Convert GraphContext objects to Mermaid.js diagram strings for visualization
│   └── slices.ipynb   # Typed slice dataclasses for specifying referenced content regions in SourceRef
├── core.ipynb              # DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer
└── plugin_interface.ipynb  # Domain-specific plugin interface for Context Graphs

Total: 4 notebooks across 1 directory

Module Dependencies

graph LR
    core[core<br/>Core Data Structures]
    plugin_interface[plugin_interface<br/>Graph Plugin Interface]
    utils_mermaid[utils.mermaid<br/>Mermaid Diagram Generation]
    utils_slices[utils.slices<br/>Segment Slice Specifications]

    plugin_interface --> core
    utils_mermaid --> core
    utils_slices --> core

3 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Core Data Structures (core.ipynb)

DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer

Import

from cjm_graph_plugin_system.core import (
    SourceRef,
    GraphNode,
    GraphEdge,
    GraphContext,
    GraphQuery
)

Classes

@dataclass
class SourceRef:
    "A pointer to external data in another plugin's domain."
    
    plugin_name: str  # e.g., "cjm-transcription-plugin-voxtral-hf"
    table_name: str  # e.g., "transcriptions"
    row_id: str  # e.g., "b0ceddd3-..." (typically a job_id)
    content_hash: str  # Hash of consumed content in "algo:hexdigest" format
    segment_slice: Optional[str]  # Optional slice: "char:0-500" or "timestamp:10.5-30.0"
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
    
    def verify(
            self,
            current_content: bytes  # Current content bytes to verify against stored hash
        ) -> bool:  # True if content matches the stored hash
        "Check if referenced content still matches the stored hash."
    
    def compute_hash(
            content: bytes,  # Content to hash
            algo: str = "sha256"  # Hash algorithm name
        ) -> str:  # Hash string in "algo:hexdigest" format
        "Compute a content hash string for use in SourceRef."
@dataclass
class GraphNode:
    "Represents an entity in the Context Graph."
    
    id: str  # UUID
    label: str  # e.g., "Person", "Concept", "Correction"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    sources: List[SourceRef] = field(...)  # Links to external plugins
    created_at: Optional[float]  # Unix timestamp when created
    updated_at: Optional[float]  # Unix timestamp when last updated
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary with nested sources."
@dataclass
class GraphEdge:
    "Represents a relationship between two nodes."
    
    id: str  # UUID
    source_id: str  # Origin node UUID
    target_id: str  # Destination node UUID
    relation_type: str  # e.g., "MENTIONS", "CORRECTS", "AUTHORED_BY"
    properties: Dict[str, Any] = field(...)  # Arbitrary metadata
    created_at: Optional[float]  # Unix timestamp when created
    updated_at: Optional[float]  # Unix timestamp when last updated
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
@dataclass
class GraphContext:
    "Container for graph query results (a subgraph)."
    
    nodes: List[GraphNode]  # Nodes in the subgraph
    edges: List[GraphEdge]  # Edges in the subgraph
    metadata: Dict[str, Any] = field(...)  # Query metadata, stats, etc.
    
    def to_temp_file(self) -> str:  # Absolute path to temporary JSON file
        "Save graph data to a temp file for zero-copy transfer."
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."
    
    def from_file(
            cls,
            filepath: str  # Path to JSON file
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a JSON file."
    
    def from_dict(
            cls,
            data: Dict[str, Any]  # Dictionary with nodes, edges, metadata
        ) -> "GraphContext":  # Reconstructed GraphContext
        "Load graph context from a dictionary."
@dataclass
class GraphQuery:
    "A standardized query object for graph operations."
    
    query: str  # Raw query string (SQL, Cypher, etc.)
    parameters: Dict[str, Any] = field(...)  # Query parameters
    limit: int = 100  # Max results to return
    depth: int = 1  # Traversal depth for neighborhood queries
    
    def to_dict(self) -> Dict[str, Any]:  # Dictionary representation for JSON serialization
        "Convert to dictionary."

Mermaid Diagram Generation (mermaid.ipynb)

Convert GraphContext objects to Mermaid.js diagram strings for visualization

Import

from cjm_graph_plugin_system.utils.mermaid import (
    context_to_mermaid
)

Functions

def context_to_mermaid(
    ctx: GraphContext,  # The GraphContext to visualize
    direction: str = "TD",  # Diagram direction: "TD" (top-down) or "LR" (left-right)
    node_color_map: Optional[Dict[str, str]] = None  # Map of node labels to CSS colors
) -> str:  # Mermaid.js diagram string
    "Convert a GraphContext into a Mermaid.js diagram string."

Graph Plugin Interface (plugin_interface.ipynb)

Domain-specific plugin interface for Context Graphs

Import

from cjm_graph_plugin_system.plugin_interface import (
    GraphPlugin
)

Classes

class GraphPlugin(PluginInterface):
    "Abstract base class for all Context Graph plugins."
    
    def execute(
            self,
            action: str = "get_schema",  # Action to perform (see docstring for available actions)
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result
        """Execute a graph operation. This is the main entry point for RemotePluginProxy.

        Dispatches to the appropriate method based on `action` parameter.
        All return values are JSON-serializable dictionaries for HTTP transport."""
    
    def add_nodes(
            self,
            nodes: List[GraphNode]  # Nodes to create
        ) -> List[str]:  # Created node IDs
        "Bulk create nodes."
    
    def add_edges(
            self,
            edges: List[GraphEdge]  # Edges to create
        ) -> List[str]:  # Created edge IDs
        "Bulk create edges."
    
    def get_node(
            self,
            node_id: str  # UUID of node to retrieve
        ) -> Optional[GraphNode]:  # Node or None if not found
        "Get a single node by ID."
    
    def get_edge(
            self,
            edge_id: str  # UUID of edge to retrieve
        ) -> Optional[GraphEdge]:  # Edge or None if not found
        "Get a single edge by ID."
    
    def get_context(
            self,
            node_id: str,  # Starting node UUID
            depth: int = 1,  # Traversal depth (1 = immediate neighbors)
            filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
        ) -> GraphContext:  # Subgraph containing node and its neighborhood
        "Get the neighborhood of a specific node."
    
    def find_nodes_by_source(
            self,
            source_ref: SourceRef  # External resource reference
        ) -> List[GraphNode]:  # Nodes attached to this source
        "Find all nodes linked to a specific external resource."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label to search for
            limit: int = 100  # Max results
        ) -> List[GraphNode]:  # Matching nodes
        "Find nodes by label."
    
    def update_node(
            self,
            node_id: str,  # UUID of node to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of node properties."
    
    def update_edge(
            self,
            edge_id: str,  # UUID of edge to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of edge properties."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # UUIDs of nodes to delete
            cascade: bool = True  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Delete nodes (and optionally connected edges)."
    
    def delete_edges(
            self,
            edge_ids: List[str]  # UUIDs of edges to delete
        ) -> int:  # Number of edges deleted
        "Delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
        "Return the current ontology/schema of the graph."
    
    def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, ...}
        "Bulk import a GraphContext (e.g., from backup or another plugin)."
    
    def export_graph(
            self,
            filter_query: Optional[GraphQuery] = None  # Optional filter
        ) -> GraphContext:  # Exported subgraph or full graph
        "Export the entire graph or a filtered subset."

Segment Slice Specifications (slices.ipynb)

Typed slice dataclasses for specifying referenced content regions in SourceRef

Import

from cjm_graph_plugin_system.utils.slices import (
    SliceSpec,
    CharSlice,
    TimestampSlice,
    FrameSlice,
    PageSlice,
    LineSlice,
    FullContent,
    parse_slice
)

Functions

def parse_slice(
    s: str  # Slice string to parse (e.g., "char:0-500", "timestamp:10.5-30.0")
) -> SliceSpec:  # Parsed slice specification
    "Parse a slice string into a typed SliceSpec."

Classes

@runtime_checkable
class SliceSpec(Protocol):
    "Protocol for typed segment slice specifications."
    
    def to_slice_string(self) -> str:  # Serialized slice string for SourceRef.segment_slice
        "Serialize to a slice string."
@dataclass
class CharSlice:
    "Character-range slice for text content."
    
    start: int  # Start character index (0-indexed)
    end: int  # End character index (exclusive)
    
    def to_slice_string(self) -> str:  # e.g., "char:0-500"
        "Serialize to slice string."
@dataclass
class TimestampSlice:
    "Temporal slice for audio/video content."
    
    start: float  # Start time in seconds
    end: float  # End time in seconds
    
    def to_slice_string(self) -> str:  # e.g., "timestamp:10.5-30.0"
        "Serialize to slice string."
@dataclass
class FrameSlice:
    "Frame-range slice for video content."
    
    start: int  # Start frame number
    end: int  # End frame number
    
    def to_slice_string(self) -> str:  # e.g., "frame:0-120"
        "Serialize to slice string."
@dataclass
class PageSlice:
    "Page slice for document content (PDFs, EPUBs)."
    
    page: int  # Page number (1-indexed)
    bbox: Optional[str]  # Optional bounding box "x1,y1,x2,y2"
    
    def to_slice_string(self) -> str:  # e.g., "page:3" or "page:3:bbox:10,20,300,400"
        "Serialize to slice string."
@dataclass
class LineSlice:
    "Line-range slice for code or structured text."
    
    start: int  # Start line number (0-indexed)
    end: int  # End line number (exclusive)
    
    def to_slice_string(self) -> str:  # e.g., "line:10-25"
        "Serialize to slice string."
@dataclass
class FullContent:
    "Reference to complete content (no slicing)."
    
    content_type: str = 'text'  # Content type: "text", "audio", "image", etc.
    
    def to_slice_string(self) -> str:  # e.g., "full_text", "full_audio"
        "Serialize to slice string."