Graph Plugin Interface

Domain-specific plugin interface for Context Graphs

GraphPlugin

Abstract base class for all Context Graph plugins. Provides a standardized interface for:

  • CRUD Operations: Create, read, update, delete nodes and edges
  • Traversal: Get neighborhood context around a node
  • Querying: Execute raw queries or structured searches
  • Lifecycle: Import/export graph data, introspect schema

Usage via RemotePluginProxy / JobQueue:

The execute() method is the main entry point when called via HTTP proxy. It dispatches to the appropriate method based on the action parameter:

# Via JobQueue
job_id = await queue.submit(
    "cjm-graph-plugin-sqlite",
    action="add_nodes",
    nodes=[node1.to_dict(), node2.to_dict()]
)

# Via PluginManager
result = manager.execute_plugin(
    "cjm-graph-plugin-sqlite",
    action="get_context",
    node_id="abc-123",
    depth=2
)

Available Actions:

Action                Description                   Required kwargs
get_schema            Get graph ontology (default)  -
add_nodes             Create nodes                  nodes (list of dicts)
add_edges             Create edges                  edges (list of dicts)
get_node              Get single node               node_id
get_edge              Get single edge               edge_id
get_context           Neighborhood traversal        node_id, optional: depth, filter_labels
find_nodes_by_source  Find by external reference    source_ref (dict)
find_nodes_by_label   Find by label                 label, optional: limit
update_node           Update node properties        node_id, properties
update_edge           Update edge properties        edge_id, properties
delete_nodes          Delete nodes                  node_ids, optional: cascade
delete_edges          Delete edges                  edge_ids
import_graph          Bulk import                   graph_data (dict), optional: merge_strategy
export_graph          Bulk export                   optional: filter_query
query                 Raw query execution           query (string or dict)
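
The table above can double as a validation map that is checked before a request is dispatched. The following is a minimal sketch under that assumption: `REQUIRED_KWARGS` and `check_action_kwargs` are hypothetical names that do not come from the plugin system, and the required keys are simply transcribed from the table.

```python
from typing import Any, Dict, List

# Required kwargs per action, transcribed from the table above.
# REQUIRED_KWARGS and check_action_kwargs are illustrative names, not library API.
REQUIRED_KWARGS: Dict[str, List[str]] = {
    "get_schema": [],
    "add_nodes": ["nodes"],
    "add_edges": ["edges"],
    "get_node": ["node_id"],
    "get_edge": ["edge_id"],
    "get_context": ["node_id"],
    "find_nodes_by_source": ["source_ref"],
    "find_nodes_by_label": ["label"],
    "update_node": ["node_id", "properties"],
    "update_edge": ["edge_id", "properties"],
    "delete_nodes": ["node_ids"],
    "delete_edges": ["edge_ids"],
    "import_graph": ["graph_data"],
    "export_graph": [],
    "query": ["query"],
}


def check_action_kwargs(action: str, kwargs: Dict[str, Any]) -> List[str]:
    """Return the required kwargs that are missing for `action`."""
    if action not in REQUIRED_KWARGS:
        raise ValueError(f"Unknown action: {action}")
    return [key for key in REQUIRED_KWARGS[action] if key not in kwargs]


print(check_action_kwargs("update_node", {"node_id": "abc-123"}))  # ['properties']
print(check_action_kwargs("get_context", {"node_id": "abc-123", "depth": 2}))  # []
```

Rejecting a request with missing keys at this point gives the caller a clearer message than a `KeyError` raised inside the dispatcher.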

GraphPlugin


class GraphPlugin(
    *args, **kwargs
):

Abstract base class for all Context Graph plugins.

Interface Documentation

CREATE Operations


GraphPlugin.add_nodes


def add_nodes(
    nodes:List, # Nodes to create
)->List: # Created node IDs

Bulk create nodes.


GraphPlugin.add_edges


def add_edges(
    edges:List, # Edges to create
)->List: # Created edge IDs

Bulk create edges.

READ Operations


GraphPlugin.get_node


def get_node(
    node_id:str, # UUID of node to retrieve
)->Optional: # Node or None if not found

Get a single node by ID.


GraphPlugin.get_edge


def get_edge(
    edge_id:str, # UUID of edge to retrieve
)->Optional: # Edge or None if not found

Get a single edge by ID.


GraphPlugin.get_context


def get_context(
    node_id:str, # Starting node UUID
    depth:int=1, # Traversal depth (1 = immediate neighbors)
    filter_labels:Optional=None, # Only include nodes with these labels
)->GraphContext: # Subgraph containing node and its neighborhood

Get the neighborhood of a specific node.
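
One way to honor the `depth` parameter is a breadth-first expansion that stops after the requested number of hops. The sketch below is an illustration only: it uses plain dicts and tuples instead of the library's node and edge types, treats edges as undirected, and assumes that neighbors excluded by `filter_labels` are neither returned nor traversed. The interface description does not specify any of these choices.

```python
from collections import deque
from typing import Dict, List, Optional, Set, Tuple

# Stand-in data: node_id -> label, and edges as (edge_id, source_id, target_id).
# These plain structures are assumptions for illustration, not the library's types.
NODE_LABELS: Dict[str, str] = {
    "alice": "Person", "bob": "Person", "ml": "Concept", "dl": "Concept",
}
EDGES: List[Tuple[str, str, str]] = [
    ("e1", "alice", "ml"),
    ("e2", "bob", "ml"),
    ("e3", "ml", "dl"),
]


def neighborhood(start: str, depth: int = 1,
                 filter_labels: Optional[List[str]] = None
                 ) -> Tuple[List[str], List[str]]:
    """Breadth-first expansion up to `depth` hops, ignoring edge direction."""
    if start not in NODE_LABELS:
        return [], []
    visited: Set[str] = {start}
    order: List[str] = [start]
    edge_ids: List[str] = []
    frontier = deque([(start, 0)])
    while frontier:
        current, dist = frontier.popleft()
        if dist == depth:
            continue  # do not expand beyond the requested depth
        for edge_id, src, dst in EDGES:
            if current not in (src, dst):
                continue
            other = dst if src == current else src
            if other not in NODE_LABELS:
                continue
            # Assumption: filtered-out neighbors are neither returned nor traversed.
            if filter_labels is not None and NODE_LABELS[other] not in filter_labels:
                continue
            if edge_id not in edge_ids:
                edge_ids.append(edge_id)
            if other not in visited:
                visited.add(other)
                order.append(other)
                frontier.append((other, dist + 1))
    return order, edge_ids


print(neighborhood("alice", depth=1))  # (['alice', 'ml'], ['e1'])
print(neighborhood("alice", depth=2))  # (['alice', 'ml', 'bob', 'dl'], ['e1', 'e2', 'e3'])
```

The linear scan over `EDGES` keeps the sketch short; a real backend would more likely use an adjacency index or a recursive query.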


GraphPlugin.find_nodes_by_source


def find_nodes_by_source(
    source_ref:SourceRef, # External resource reference
)->List: # Nodes attached to this source

Find all nodes linked to a specific external resource.


GraphPlugin.find_nodes_by_label


def find_nodes_by_label(
    label:str, # Node label to search for
    limit:int=100, # Max results
)->List: # Matching nodes

Find nodes by label.


GraphPlugin.execute


def execute(
    action:str='get_schema', # Action to perform (see Available Actions above)
    **kwargs, # Action-specific arguments
)->Dict: # JSON-serializable result

Execute a graph operation. This is the main entry point for RemotePluginProxy.

Dispatches to the appropriate method based on action parameter. All return values are JSON-serializable dictionaries for HTTP transport.
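
A dispatcher of this kind can also be written as a lookup table instead of a chain of conditionals. The sketch below illustrates that design choice and is not how any particular plugin implements it: `TinyDispatcher` is a hypothetical stand-in that stores nodes as plain dicts and supports only three actions.

```python
import json
from typing import Any, Callable, Dict, List


class TinyDispatcher:
    """Hypothetical stand-in showing table-driven dispatch; not the GraphPlugin class."""

    def __init__(self) -> None:
        self._nodes: Dict[str, Dict[str, Any]] = {}
        # Map each action name to a handler that returns a JSON-serializable dict.
        self._handlers: Dict[str, Callable[..., Dict[str, Any]]] = {
            "get_schema": self._get_schema,
            "add_nodes": self._add_nodes,
            "get_node": self._get_node,
        }

    def execute(self, action: str = "get_schema", **kwargs: Any) -> Dict[str, Any]:
        handler = self._handlers.get(action)
        if handler is None:
            raise ValueError(f"Unknown action: {action}")
        return handler(**kwargs)

    def _get_schema(self) -> Dict[str, Any]:
        labels = sorted({n["label"] for n in self._nodes.values()})
        return {"node_labels": labels, "counts": {"nodes": len(self._nodes)}}

    def _add_nodes(self, nodes: List[Dict[str, Any]]) -> Dict[str, Any]:
        for node in nodes:
            self._nodes[node["id"]] = node
        ids = [node["id"] for node in nodes]
        return {"created_ids": ids, "count": len(ids)}

    def _get_node(self, node_id: str) -> Dict[str, Any]:
        return {"node": self._nodes.get(node_id)}


dispatcher = TinyDispatcher()
dispatcher.execute("add_nodes", nodes=[{"id": "n1", "label": "Person"}])
# json.dumps succeeds only if the result is JSON-serializable.
print(json.dumps(dispatcher.execute("get_node", node_id="n1")))
```

With a table, adding an action means adding one entry, and the set of supported actions can be listed with `sorted(self._handlers)`.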

UPDATE Operations


GraphPlugin.update_node


def update_node(
    node_id:str, # UUID of node to update
    properties:Dict, # Properties to merge/update
)->bool: # True if successful

Partial update of node properties.


GraphPlugin.update_edge


def update_edge(
    edge_id:str, # UUID of edge to update
    properties:Dict, # Properties to merge/update
)->bool: # True if successful

Partial update of edge properties.

DELETE Operations


GraphPlugin.delete_nodes


def delete_nodes(
    node_ids:List, # UUIDs of nodes to delete
    cascade:bool=True, # Also delete connected edges
)->int: # Number of nodes deleted

Delete nodes (and optionally connected edges).


GraphPlugin.delete_edges


def delete_edges(
    edge_ids:List, # UUIDs of edges to delete
)->int: # Number of edges deleted

Delete edges.

LIFECYCLE Operations


GraphPlugin.get_schema


def get_schema(
    
)->Dict: # Graph schema/ontology

Return the current ontology/schema of the graph.


GraphPlugin.import_graph


def import_graph(
    graph_data:GraphContext, # Data to import
    merge_strategy:str='overwrite', # "overwrite", "skip", or "merge"
)->Dict: # Import statistics {nodes_created, edges_created, ...}

Bulk import a GraphContext (e.g., from backup or another plugin).
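
The three `merge_strategy` values only differ when an incoming ID already exists. The sketch below shows one plausible reading of them, using plain property dicts keyed by node ID. The exact semantics are an assumption here, in particular that "merge" means a key-by-key update of existing properties; `import_nodes` and its statistics keys are hypothetical names.

```python
import copy
from typing import Any, Dict

Store = Dict[str, Dict[str, Any]]  # node_id -> properties (illustrative shape)


def import_nodes(existing: Store, incoming: Store,
                 merge_strategy: str = "overwrite") -> Dict[str, int]:
    """Apply `incoming` to `existing` in place and return per-outcome counts."""
    if merge_strategy not in ("overwrite", "skip", "merge"):
        raise ValueError(f"Unknown merge_strategy: {merge_strategy}")
    stats = {"created": 0, "overwritten": 0, "skipped": 0, "merged": 0}
    for node_id, props in incoming.items():
        if node_id not in existing:
            existing[node_id] = dict(props)
            stats["created"] += 1
        elif merge_strategy == "overwrite":
            existing[node_id] = dict(props)      # replace the stored properties
            stats["overwritten"] += 1
        elif merge_strategy == "skip":
            stats["skipped"] += 1                # keep the stored properties
        else:
            existing[node_id].update(props)      # assumed meaning of "merge"
            stats["merged"] += 1
    return stats


base = {"a": {"name": "Alice", "role": "speaker"}}
incoming = {"a": {"name": "Alice B."}, "b": {"name": "Bob"}}

for strategy in ("overwrite", "skip", "merge"):
    store = copy.deepcopy(base)
    import_nodes(store, incoming, strategy)
    print(strategy, store["a"])
# overwrite {'name': 'Alice B.'}
# skip {'name': 'Alice', 'role': 'speaker'}
# merge {'name': 'Alice B.', 'role': 'speaker'}
```

Reporting separate counts for created, overwritten, skipped, and merged items makes the import statistics easier to interpret than a single created count.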


GraphPlugin.export_graph


def export_graph(
    filter_query:Optional=None, # Optional filter
)->GraphContext: # Exported subgraph or full graph

Export the entire graph or a filtered subset.

Example Schema Return Value

The get_schema() method returns information about the current graph ontology:

{
    "node_labels": ["Person", "Concept", "Correction", "Event"],
    "edge_types": ["MENTIONS", "CORRECTS", "AUTHORED_BY", "RELATED_TO"],
    "counts": {
        "nodes": {"Person": 12, "Concept": 45, "Correction": 3},
        "edges": {"MENTIONS": 78, "CORRECTS": 5}
    },
    "properties": {
        "Person": ["name", "role", "confidence"],
        "Concept": ["name", "definition", "aliases"]
    }
}

This is useful for UIs that need to know available labels and types for filtering/querying.
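
A schema in this shape, with per-label counts and property names, can be derived in a single pass over the stored data. The following sketch assumes nodes and edges are available as plain dicts with `label`, `properties`, and `relation_type` keys; `build_schema` is a hypothetical helper, not part of the interface.

```python
from collections import Counter
from typing import Any, Dict, List, Set


def build_schema(nodes: List[Dict[str, Any]],
                 edges: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize labels, edge types, per-label counts, and property names."""
    node_counts = Counter(n["label"] for n in nodes)
    edge_counts = Counter(e["relation_type"] for e in edges)
    properties: Dict[str, Set[str]] = {}
    for n in nodes:
        # Collect the union of property names seen for each label.
        properties.setdefault(n["label"], set()).update(n.get("properties", {}))
    return {
        "node_labels": sorted(node_counts),
        "edge_types": sorted(edge_counts),
        "counts": {"nodes": dict(node_counts), "edges": dict(edge_counts)},
        "properties": {label: sorted(keys) for label, keys in properties.items()},
    }


nodes = [
    {"label": "Person", "properties": {"name": "Alice", "role": "speaker"}},
    {"label": "Person", "properties": {"name": "Bob"}},
    {"label": "Concept", "properties": {"name": "Machine Learning"}},
]
edges = [
    {"relation_type": "MENTIONS"},
    {"relation_type": "MENTIONS"},
    {"relation_type": "KNOWS"},
]
schema = build_schema(nodes, edges)
print(schema["counts"])
# {'nodes': {'Person': 2, 'Concept': 1}, 'edges': {'MENTIONS': 2, 'KNOWS': 1}}
```

Sorting the labels and property names keeps the output stable between calls, which helps when a UI caches or diffs the schema.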

Example Implementation

A minimal in-memory implementation demonstrating the interface:

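from typing import Any, Dict, List, Optional

# GraphPlugin, GraphNode, GraphEdge, GraphContext, GraphQuery, and SourceRef
# are assumed to be imported from the plugin system package.

class InMemoryGraphPlugin(GraphPlugin):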
    """Simple in-memory implementation for testing."""

    def __init__(self):
        self._nodes: Dict[str, GraphNode] = {}
        self._edges: Dict[str, GraphEdge] = {}
        self._config: Dict[str, Any] = {}

    @property
    def name(self) -> str:
        return "in-memory-graph"

    @property
    def version(self) -> str:
        return "1.0.0"

    def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
        self._config = config or {}

    def get_config_schema(self) -> Dict[str, Any]:
        return {"type": "object", "properties": {}}

    def get_current_config(self) -> Dict[str, Any]:
        return self._config

    def cleanup(self) -> None:
        self._nodes.clear()
        self._edges.clear()

    # -------------------------------------------------------------------------
    # EXECUTE - Main dispatcher for RemotePluginProxy
    # -------------------------------------------------------------------------

    def execute(self, action: str = "get_schema", **kwargs) -> Dict[str, Any]:
        """Dispatch to appropriate method based on action."""
        
        if action == "get_schema":
            return self.get_schema()
        
        elif action == "add_nodes":
            # Convert dicts to GraphNode objects
            nodes = [GraphNode(**n) if isinstance(n, dict) else n for n in kwargs.get("nodes", [])]
            ids = self.add_nodes(nodes)
            return {"created_ids": ids, "count": len(ids)}
        
        elif action == "add_edges":
            edges = [GraphEdge(**e) if isinstance(e, dict) else e for e in kwargs.get("edges", [])]
            ids = self.add_edges(edges)
            return {"created_ids": ids, "count": len(ids)}
        
        elif action == "get_node":
            node = self.get_node(kwargs["node_id"])
            return {"node": node.to_dict() if node else None}
        
        elif action == "get_edge":
            edge = self.get_edge(kwargs["edge_id"])
            return {"edge": edge.to_dict() if edge else None}
        
        elif action == "get_context":
            ctx = self.get_context(
                kwargs["node_id"],
                depth=kwargs.get("depth", 1),
                filter_labels=kwargs.get("filter_labels")
            )
            return ctx.to_dict()
        
        elif action == "find_nodes_by_source":
            ref_data = kwargs["source_ref"]
            ref = SourceRef(**ref_data) if isinstance(ref_data, dict) else ref_data
            nodes = self.find_nodes_by_source(ref)
            return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}
        
        elif action == "find_nodes_by_label":
            nodes = self.find_nodes_by_label(kwargs["label"], limit=kwargs.get("limit", 100))
            return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}
        
        elif action == "update_node":
            success = self.update_node(kwargs["node_id"], kwargs["properties"])
            return {"success": success}
        
        elif action == "update_edge":
            success = self.update_edge(kwargs["edge_id"], kwargs["properties"])
            return {"success": success}
        
        elif action == "delete_nodes":
            count = self.delete_nodes(kwargs["node_ids"], cascade=kwargs.get("cascade", True))
            return {"deleted_count": count}
        
        elif action == "delete_edges":
            count = self.delete_edges(kwargs["edge_ids"])
            return {"deleted_count": count}
        
        elif action == "import_graph":
            graph_data = kwargs["graph_data"]
            if isinstance(graph_data, dict):
                graph_data = GraphContext.from_dict(graph_data)
            stats = self.import_graph(graph_data, merge_strategy=kwargs.get("merge_strategy", "overwrite"))
            return stats
        
        elif action == "export_graph":
            ctx = self.export_graph(filter_query=kwargs.get("filter_query"))
            return ctx.to_dict()
        
        else:
            raise ValueError(f"Unknown action: {action}")

    # -------------------------------------------------------------------------
    # CREATE
    # -------------------------------------------------------------------------

    def add_nodes(self, nodes: List[GraphNode]) -> List[str]:
        ids = []
        for node in nodes:
            self._nodes[node.id] = node
            ids.append(node.id)
        return ids

    def add_edges(self, edges: List[GraphEdge]) -> List[str]:
        ids = []
        for edge in edges:
            self._edges[edge.id] = edge
            ids.append(edge.id)
        return ids

    # -------------------------------------------------------------------------
    # READ
    # -------------------------------------------------------------------------

    def get_node(self, node_id: str) -> Optional[GraphNode]:
        return self._nodes.get(node_id)

    def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
        return self._edges.get(edge_id)

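    def get_context(self, node_id: str, depth: int = 1, filter_labels: Optional[List[str]] = None) -> GraphContext:
        # Simplification: only immediate (1-hop) neighbors are collected.
        # `depth` is echoed in the metadata but does not drive the traversal.
        nodes = []
        edges = []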
        
        if node_id in self._nodes:
            nodes.append(self._nodes[node_id])
            
        for edge in self._edges.values():
            if edge.source_id == node_id or edge.target_id == node_id:
                edges.append(edge)
                other_id = edge.target_id if edge.source_id == node_id else edge.source_id
                if other_id in self._nodes:
                    neighbor = self._nodes[other_id]
                    if filter_labels is None or neighbor.label in filter_labels:
                        if neighbor not in nodes:
                            nodes.append(neighbor)
                            
        return GraphContext(nodes=nodes, edges=edges, metadata={"depth": depth})

    def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]:
        results = []
        for node in self._nodes.values():
            for src in node.sources:
                if (src.plugin_name == source_ref.plugin_name and
                    src.table_name == source_ref.table_name and
                    src.row_id == source_ref.row_id):
                    results.append(node)
                    break
        return results

    def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
        return [n for n in self._nodes.values() if n.label == label][:limit]

    # -------------------------------------------------------------------------
    # UPDATE
    # -------------------------------------------------------------------------

    def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
        if node_id not in self._nodes:
            return False
        self._nodes[node_id].properties.update(properties)
        return True

    def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
        if edge_id not in self._edges:
            return False
        self._edges[edge_id].properties.update(properties)
        return True

    # -------------------------------------------------------------------------
    # DELETE
    # -------------------------------------------------------------------------

    def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
        count = 0
        for nid in node_ids:
            if nid in self._nodes:
                del self._nodes[nid]
                count += 1
                if cascade:
                    to_remove = [eid for eid, e in self._edges.items()
                                 if e.source_id == nid or e.target_id == nid]
                    for eid in to_remove:
                        del self._edges[eid]
        return count

    def delete_edges(self, edge_ids: List[str]) -> int:
        count = 0
        for eid in edge_ids:
            if eid in self._edges:
                del self._edges[eid]
                count += 1
        return count

    # -------------------------------------------------------------------------
    # LIFECYCLE
    # -------------------------------------------------------------------------

    def get_schema(self) -> Dict[str, Any]:
        labels = set(n.label for n in self._nodes.values())
        types = set(e.relation_type for e in self._edges.values())
        return {
            "node_labels": list(labels),
            "edge_types": list(types),
            "counts": {
                "nodes": len(self._nodes),
                "edges": len(self._edges)
            }
        }

    def import_graph(self, graph_data: GraphContext, merge_strategy: str = "overwrite") -> Dict[str, int]:
        # Simplification: "merge" behaves like "overwrite", and replaced items
        # are counted as created.
        nodes_created = 0
        edges_created = 0
        
        for node in graph_data.nodes:
            if merge_strategy == "skip" and node.id in self._nodes:
                continue
            self._nodes[node.id] = node
            nodes_created += 1
            
        for edge in graph_data.edges:
            if merge_strategy == "skip" and edge.id in self._edges:
                continue
            self._edges[edge.id] = edge
            edges_created += 1
            
        return {"nodes_created": nodes_created, "edges_created": edges_created}

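    def export_graph(self, filter_query: Optional[GraphQuery] = None) -> GraphContext:
        # Simplification: filter_query is ignored and the whole graph is exported.
        return GraphContext(
            nodes=list(self._nodes.values()),
            edges=list(self._edges.values()),
            metadata={"exported_at": "now"}  # placeholder, not a real timestamp
        )

import uuid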

# Test the example plugin
plugin = InMemoryGraphPlugin()
plugin.initialize()

print(f"Plugin: {plugin.name} v{plugin.version}")

Plugin: in-memory-graph v1.0.0

# Create some nodes
alice_id = str(uuid.uuid4())
bob_id = str(uuid.uuid4())
ml_id = str(uuid.uuid4())

nodes = [
    GraphNode(id=alice_id, label="Person", properties={"name": "Alice"}),
    GraphNode(id=bob_id, label="Person", properties={"name": "Bob"}),
    GraphNode(id=ml_id, label="Concept", properties={"name": "Machine Learning"})
]

created_ids = plugin.add_nodes(nodes)
print(f"Created {len(created_ids)} nodes")

Created 3 nodes

# Create edges
edges = [
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=bob_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=bob_id, relation_type="KNOWS")
]

created_ids = plugin.add_edges(edges)
print(f"Created {len(created_ids)} edges")

Created 3 edges

# Test get_context (neighborhood traversal)
context = plugin.get_context(alice_id, depth=1)
print(f"Alice's neighborhood: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Neighbors: {[n.properties.get('name', n.label) for n in context.nodes]}")

Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Alice', 'Machine Learning', 'Bob']

# Test find_nodes_by_label
people = plugin.find_nodes_by_label("Person")
print(f"People: {[p.properties['name'] for p in people]}")

People: ['Alice', 'Bob']

# Test get_schema
schema = plugin.get_schema()
print(f"Schema: {schema}")

Schema: {'node_labels': ['Person', 'Concept'], 'edge_types': ['KNOWS', 'MENTIONS'], 'counts': {'nodes': 3, 'edges': 3}}

# Test update_node
plugin.update_node(alice_id, {"role": "speaker", "confidence": 0.95})
alice = plugin.get_node(alice_id)
print(f"Updated Alice: {alice.properties}")

Updated Alice: {'name': 'Alice', 'role': 'speaker', 'confidence': 0.95}

# Test export/import
exported = plugin.export_graph()
print(f"Exported: {len(exported.nodes)} nodes, {len(exported.edges)} edges")

# Import into a new plugin instance
new_plugin = InMemoryGraphPlugin()
new_plugin.initialize()
stats = new_plugin.import_graph(exported)
print(f"Import stats: {stats}")

Exported: 3 nodes, 3 edges
Import stats: {'nodes_created': 3, 'edges_created': 3}

# Test delete with cascade
deleted = plugin.delete_nodes([alice_id], cascade=True)
print(f"Deleted {deleted} node(s)")
print(f"Remaining: {plugin.get_schema()}")

Deleted 1 node(s)
Remaining: {'node_labels': ['Person', 'Concept'], 'edge_types': ['MENTIONS'], 'counts': {'nodes': 2, 'edges': 1}}

# Cleanup
plugin.cleanup()
new_plugin.cleanup()