cjm-graph-plugin-system
Defines the standardized interface and data structures for Context Graph plugins, enabling the semantic linking, decomposition, and enrichment of multi-modal content.
Install
pip install cjm_graph_plugin_system
Project Structure
nbs/
├── utils/ (2)
│ ├── mermaid.ipynb # Convert GraphContext objects to Mermaid.js diagram strings for visualization
│ └── slices.ipynb # Typed slice dataclasses for specifying referenced content regions in SourceRef
├── core.ipynb # DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer
└── plugin_interface.ipynb # Domain-specific plugin interface for Context Graphs
Total: 4 notebooks across 1 directory
Module Dependencies
graph LR
core[core<br/>Core Data Structures]
plugin_interface[plugin_interface<br/>Graph Plugin Interface]
utils_mermaid[utils.mermaid<br/>Mermaid Diagram Generation]
utils_slices[utils.slices<br/>Segment Slice Specifications]
plugin_interface --> core
utils_mermaid --> core
utils_slices --> core
3 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Core Data Structures (core.ipynb)
DTOs for Context Graph operations with FileBackedDTO support for zero-copy transfer
Import
from cjm_graph_plugin_system.core import (
SourceRef,
GraphNode,
GraphEdge,
GraphContext,
GraphQuery
)
Classes
@dataclass
class SourceRef:
"A pointer to external data in another plugin's domain."
plugin_name: str # e.g., "cjm-transcription-plugin-voxtral-hf"
table_name: str # e.g., "transcriptions"
row_id: str # e.g., "b0ceddd3-..." (typically a job_id)
content_hash: str # Hash of consumed content in "algo:hexdigest" format
segment_slice: Optional[str] # Optional slice: "char:0-500" or "timestamp:10.5-30.0"
def to_dict(self) -> Dict[str, Any]: # Dictionary representation for JSON serialization
"Convert to dictionary."
def verify(
self,
current_content: bytes # Current content bytes to verify against stored hash
) -> bool: # True if content matches the stored hash
"Check if referenced content still matches the stored hash."
def compute_hash(
content: bytes, # Content to hash
algo: str = "sha256" # Hash algorithm name
) -> str: # Hash string in "algo:hexdigest" format
"Compute a content hash string for use in SourceRef."
@dataclass
class GraphNode:
"Represents an entity in the Context Graph."
id: str # UUID
label: str # e.g., "Person", "Concept", "Correction"
properties: Dict[str, Any] = field(...) # Arbitrary metadata
sources: List[SourceRef] = field(...) # Links to external plugins
created_at: Optional[float] # Unix timestamp when created
updated_at: Optional[float] # Unix timestamp when last updated
def to_dict(self) -> Dict[str, Any]: # Dictionary representation for JSON serialization
"Convert to dictionary with nested sources."
@dataclass
class GraphEdge:
"Represents a relationship between two nodes."
id: str # UUID
source_id: str # Origin node UUID
target_id: str # Destination node UUID
relation_type: str # e.g., "MENTIONS", "CORRECTS", "AUTHORED_BY"
properties: Dict[str, Any] = field(...) # Arbitrary metadata
created_at: Optional[float] # Unix timestamp when created
updated_at: Optional[float] # Unix timestamp when last updated
def to_dict(self) -> Dict[str, Any]: # Dictionary representation for JSON serialization
"Convert to dictionary."
@dataclass
class GraphContext:
"Container for graph query results (a subgraph)."
nodes: List[GraphNode] # Nodes in the subgraph
edges: List[GraphEdge] # Edges in the subgraph
metadata: Dict[str, Any] = field(...) # Query metadata, stats, etc.
def to_temp_file(self) -> str: # Absolute path to temporary JSON file
"Save graph data to a temp file for zero-copy transfer."
def to_dict(self) -> Dict[str, Any]: # Dictionary representation for JSON serialization
"Convert to dictionary."
def from_file(
cls,
filepath: str # Path to JSON file
) -> "GraphContext": # Reconstructed GraphContext
"Load graph context from a JSON file."
def from_dict(
cls,
data: Dict[str, Any] # Dictionary with nodes, edges, metadata
) -> "GraphContext": # Reconstructed GraphContext
"Load graph context from a dictionary."
@dataclass
class GraphQuery:
"A standardized query object for graph operations."
query: str # Raw query string (SQL, Cypher, etc.)
parameters: Dict[str, Any] = field(...) # Query parameters
limit: int = 100 # Max results to return
depth: int = 1 # Traversal depth for neighborhood queries
def to_dict(self) -> Dict[str, Any]: # Dictionary representation for JSON serialization
"Convert to dictionary."
Mermaid Diagram Generation (mermaid.ipynb)
Convert GraphContext objects to Mermaid.js diagram strings for visualization
Import
from cjm_graph_plugin_system.utils.mermaid import (
context_to_mermaid
)
Functions
def context_to_mermaid(
ctx: GraphContext, # The GraphContext to visualize
direction: str = "TD", # Diagram direction: "TD" (top-down) or "LR" (left-right)
node_color_map: Optional[Dict[str, str]] = None # Map of node labels to CSS colors
) -> str: # Mermaid.js diagram string
"Convert a GraphContext into a Mermaid.js diagram string."
Graph Plugin Interface (plugin_interface.ipynb)
Domain-specific plugin interface for Context Graphs
Import
from cjm_graph_plugin_system.plugin_interface import (
GraphPlugin
)
Classes
class GraphPlugin(PluginInterface):
"Abstract base class for all Context Graph plugins."
def execute(
self,
action: str = "get_schema", # Action to perform (see docstring for available actions)
**kwargs
) -> Dict[str, Any]: # JSON-serializable result
"Execute a graph operation. This is the main entry point for RemotePluginProxy.
Dispatches to the appropriate method based on `action` parameter.
All return values are JSON-serializable dictionaries for HTTP transport."
def add_nodes(
self,
nodes: List[GraphNode] # Nodes to create
) -> List[str]: # Created node IDs
"Bulk create nodes."
def add_edges(
self,
edges: List[GraphEdge] # Edges to create
) -> List[str]: # Created edge IDs
"Bulk create edges."
def get_node(
self,
node_id: str # UUID of node to retrieve
) -> Optional[GraphNode]: # Node or None if not found
"Get a single node by ID."
def get_edge(
self,
edge_id: str # UUID of edge to retrieve
) -> Optional[GraphEdge]: # Edge or None if not found
"Get a single edge by ID."
def get_context(
self,
node_id: str, # Starting node UUID
depth: int = 1, # Traversal depth (1 = immediate neighbors)
filter_labels: Optional[List[str]] = None # Only include nodes with these labels
) -> GraphContext: # Subgraph containing node and its neighborhood
"Get the neighborhood of a specific node."
def find_nodes_by_source(
self,
source_ref: SourceRef # External resource reference
) -> List[GraphNode]: # Nodes attached to this source
"Find all nodes linked to a specific external resource."
def find_nodes_by_label(
self,
label: str, # Node label to search for
limit: int = 100 # Max results
) -> List[GraphNode]: # Matching nodes
"Find nodes by label."
def update_node(
self,
node_id: str, # UUID of node to update
properties: Dict[str, Any] # Properties to merge/update
) -> bool: # True if successful
"Partial update of node properties."
def update_edge(
self,
edge_id: str, # UUID of edge to update
properties: Dict[str, Any] # Properties to merge/update
) -> bool: # True if successful
"Partial update of edge properties."
def delete_nodes(
self,
node_ids: List[str], # UUIDs of nodes to delete
cascade: bool = True # Also delete connected edges
) -> int: # Number of nodes deleted
"Delete nodes (and optionally connected edges)."
def delete_edges(
self,
edge_ids: List[str] # UUIDs of edges to delete
) -> int: # Number of edges deleted
"Delete edges."
def get_schema(self) -> Dict[str, Any]: # Graph schema/ontology
"Return the current ontology/schema of the graph."
def import_graph(
self,
graph_data: GraphContext, # Data to import
merge_strategy: str = "overwrite" # "overwrite", "skip", or "merge"
) -> Dict[str, int]: # Import statistics {nodes_created, edges_created, ...}
"Bulk import a GraphContext (e.g., from backup or another plugin)."
def export_graph(
self,
filter_query: Optional[GraphQuery] = None # Optional filter
) -> GraphContext: # Exported subgraph or full graph
"Export the entire graph or a filtered subset."
Segment Slice Specifications (slices.ipynb)
Typed slice dataclasses for specifying referenced content regions in SourceRef
Import
from cjm_graph_plugin_system.utils.slices import (
SliceSpec,
CharSlice,
TimestampSlice,
FrameSlice,
PageSlice,
LineSlice,
FullContent,
parse_slice
)
Functions
def parse_slice(
s: str # Slice string to parse (e.g., "char:0-500", "timestamp:10.5-30.0")
) -> SliceSpec: # Parsed slice specification
"Parse a slice string into a typed SliceSpec."
Classes
@runtime_checkable
class SliceSpec(Protocol):
"Protocol for typed segment slice specifications."
def to_slice_string(self) -> str: # Serialized slice string for SourceRef.segment_slice
"Serialize to a slice string."
@dataclass
class CharSlice:
"Character-range slice for text content."
start: int # Start character index (0-indexed)
end: int # End character index (exclusive)
def to_slice_string(self) -> str: # e.g., "char:0-500"
"Serialize to slice string."
@dataclass
class TimestampSlice:
"Temporal slice for audio/video content."
start: float # Start time in seconds
end: float # End time in seconds
def to_slice_string(self) -> str: # e.g., "timestamp:10.5-30.0"
"Serialize to slice string."
@dataclass
class FrameSlice:
"Frame-range slice for video content."
start: int # Start frame number
end: int # End frame number
def to_slice_string(self) -> str: # e.g., "frame:0-120"
"Serialize to slice string."
@dataclass
class PageSlice:
"Page slice for document content (PDFs, EPUBs)."
page: int # Page number (1-indexed)
bbox: Optional[str] # Optional bounding box "x1,y1,x2,y2"
def to_slice_string(self) -> str: # e.g., "page:3" or "page:3:bbox:10,20,300,400"
"Serialize to slice string."
@dataclass
class LineSlice:
"Line-range slice for code or structured text."
start: int # Start line number (0-indexed)
end: int # End line number (exclusive)
def to_slice_string(self) -> str: # e.g., "line:10-25"
"Serialize to slice string."
@dataclass
class FullContent:
"Reference to complete content (no slicing)."
content_type: str = 'text' # Content type: "text", "audio", "image", etc.
def to_slice_string(self) -> str: # e.g., "full_text", "full_audio"
"Serialize to slice string."