class InMemoryGraphPlugin(GraphPlugin):
"""Simple in-memory implementation for testing."""
def __init__(self):
self._nodes: Dict[str, GraphNode] = {}
self._edges: Dict[str, GraphEdge] = {}
self._config: Dict[str, Any] = {}
@property
def name(self) -> str:
return "in-memory-graph"
@property
def version(self) -> str:
return "1.0.0"
def initialize(self, config: Optional[Dict[str, Any]] = None) -> None:
self._config = config or {}
def get_config_schema(self) -> Dict[str, Any]:
return {"type": "object", "properties": {}}
def get_current_config(self) -> Dict[str, Any]:
return self._config
def cleanup(self) -> None:
self._nodes.clear()
self._edges.clear()
# -------------------------------------------------------------------------
# EXECUTE - Main dispatcher for RemotePluginProxy
# -------------------------------------------------------------------------
def execute(self, action: str = "get_schema", **kwargs) -> Dict[str, Any]:
"""Dispatch to appropriate method based on action."""
if action == "get_schema":
return self.get_schema()
elif action == "add_nodes":
# Convert dicts to GraphNode objects
nodes = [GraphNode(**n) if isinstance(n, dict) else n for n in kwargs.get("nodes", [])]
ids = self.add_nodes(nodes)
return {"created_ids": ids, "count": len(ids)}
elif action == "add_edges":
edges = [GraphEdge(**e) if isinstance(e, dict) else e for e in kwargs.get("edges", [])]
ids = self.add_edges(edges)
return {"created_ids": ids, "count": len(ids)}
elif action == "get_node":
node = self.get_node(kwargs["node_id"])
return {"node": node.to_dict() if node else None}
elif action == "get_edge":
edge = self.get_edge(kwargs["edge_id"])
return {"edge": edge.to_dict() if edge else None}
elif action == "get_context":
ctx = self.get_context(
kwargs["node_id"],
depth=kwargs.get("depth", 1),
filter_labels=kwargs.get("filter_labels")
)
return ctx.to_dict()
elif action == "find_nodes_by_source":
ref_data = kwargs["source_ref"]
ref = SourceRef(**ref_data) if isinstance(ref_data, dict) else ref_data
nodes = self.find_nodes_by_source(ref)
return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}
elif action == "find_nodes_by_label":
nodes = self.find_nodes_by_label(kwargs["label"], limit=kwargs.get("limit", 100))
return {"nodes": [n.to_dict() for n in nodes], "count": len(nodes)}
elif action == "update_node":
success = self.update_node(kwargs["node_id"], kwargs["properties"])
return {"success": success}
elif action == "update_edge":
success = self.update_edge(kwargs["edge_id"], kwargs["properties"])
return {"success": success}
elif action == "delete_nodes":
count = self.delete_nodes(kwargs["node_ids"], cascade=kwargs.get("cascade", True))
return {"deleted_count": count}
elif action == "delete_edges":
count = self.delete_edges(kwargs["edge_ids"])
return {"deleted_count": count}
elif action == "import_graph":
graph_data = kwargs["graph_data"]
if isinstance(graph_data, dict):
graph_data = GraphContext.from_dict(graph_data)
stats = self.import_graph(graph_data, merge_strategy=kwargs.get("merge_strategy", "overwrite"))
return stats
elif action == "export_graph":
ctx = self.export_graph(filter_query=kwargs.get("filter_query"))
return ctx.to_dict()
else:
raise ValueError(f"Unknown action: {action}")
# -------------------------------------------------------------------------
# CREATE
# -------------------------------------------------------------------------
def add_nodes(self, nodes: List[GraphNode]) -> List[str]:
ids = []
for node in nodes:
self._nodes[node.id] = node
ids.append(node.id)
return ids
def add_edges(self, edges: List[GraphEdge]) -> List[str]:
ids = []
for edge in edges:
self._edges[edge.id] = edge
ids.append(edge.id)
return ids
# -------------------------------------------------------------------------
# READ
# -------------------------------------------------------------------------
def get_node(self, node_id: str) -> Optional[GraphNode]:
return self._nodes.get(node_id)
def get_edge(self, edge_id: str) -> Optional[GraphEdge]:
return self._edges.get(edge_id)
def get_context(self, node_id: str, depth: int = 1, filter_labels: Optional[List[str]] = None) -> GraphContext:
nodes = []
edges = []
if node_id in self._nodes:
nodes.append(self._nodes[node_id])
for edge in self._edges.values():
if edge.source_id == node_id or edge.target_id == node_id:
edges.append(edge)
other_id = edge.target_id if edge.source_id == node_id else edge.source_id
if other_id in self._nodes:
neighbor = self._nodes[other_id]
if filter_labels is None or neighbor.label in filter_labels:
if neighbor not in nodes:
nodes.append(neighbor)
return GraphContext(nodes=nodes, edges=edges, metadata={"depth": depth})
def find_nodes_by_source(self, source_ref: SourceRef) -> List[GraphNode]:
results = []
for node in self._nodes.values():
for src in node.sources:
if (src.plugin_name == source_ref.plugin_name and
src.table_name == source_ref.table_name and
src.row_id == source_ref.row_id):
results.append(node)
break
return results
def find_nodes_by_label(self, label: str, limit: int = 100) -> List[GraphNode]:
return [n for n in self._nodes.values() if n.label == label][:limit]
# -------------------------------------------------------------------------
# UPDATE
# -------------------------------------------------------------------------
def update_node(self, node_id: str, properties: Dict[str, Any]) -> bool:
if node_id not in self._nodes:
return False
self._nodes[node_id].properties.update(properties)
return True
def update_edge(self, edge_id: str, properties: Dict[str, Any]) -> bool:
if edge_id not in self._edges:
return False
self._edges[edge_id].properties.update(properties)
return True
# -------------------------------------------------------------------------
# DELETE
# -------------------------------------------------------------------------
def delete_nodes(self, node_ids: List[str], cascade: bool = True) -> int:
count = 0
for nid in node_ids:
if nid in self._nodes:
del self._nodes[nid]
count += 1
if cascade:
to_remove = [eid for eid, e in self._edges.items()
if e.source_id == nid or e.target_id == nid]
for eid in to_remove:
del self._edges[eid]
return count
def delete_edges(self, edge_ids: List[str]) -> int:
count = 0
for eid in edge_ids:
if eid in self._edges:
del self._edges[eid]
count += 1
return count
# -------------------------------------------------------------------------
# LIFECYCLE
# -------------------------------------------------------------------------
def get_schema(self) -> Dict[str, Any]:
labels = set(n.label for n in self._nodes.values())
types = set(e.relation_type for e in self._edges.values())
return {
"node_labels": list(labels),
"edge_types": list(types),
"counts": {
"nodes": len(self._nodes),
"edges": len(self._edges)
}
}
def import_graph(self, graph_data: GraphContext, merge_strategy: str = "overwrite") -> Dict[str, int]:
nodes_created = 0
edges_created = 0
for node in graph_data.nodes:
if merge_strategy == "skip" and node.id in self._nodes:
continue
self._nodes[node.id] = node
nodes_created += 1
for edge in graph_data.edges:
if merge_strategy == "skip" and edge.id in self._edges:
continue
self._edges[edge.id] = edge
edges_created += 1
return {"nodes_created": nodes_created, "edges_created": edges_created}
def export_graph(self, filter_query: Optional[GraphQuery] = None) -> GraphContext:
return GraphContext(
nodes=list(self._nodes.values()),
edges=list(self._edges.values()),
metadata={"exported_at": "now"}
)Graph Plugin Interface
GraphPlugin
Abstract base class for all Context Graph plugins. Provides a standardized interface for:
- CRUD Operations: Create, read, update, delete nodes and edges
- Traversal: Get neighborhood context around a node
- Querying: Execute raw queries or structured searches
- Lifecycle: Import/export graph data, introspect schema
Usage via RemotePluginProxy / JobQueue:
The execute() method is the main entry point when called via HTTP proxy. It dispatches to the appropriate method based on the action parameter:
# Via JobQueue
job_id = await queue.submit(
"cjm-graph-plugin-sqlite",
action="add_nodes",
nodes=[node1.to_dict(), node2.to_dict()]
)
# Via PluginManager
result = manager.execute_plugin(
"cjm-graph-plugin-sqlite",
action="get_context",
node_id="abc-123",
depth=2
)Available Actions:
| Action | Description | Required kwargs |
|---|---|---|
get_schema |
Get graph ontology (default) | - |
add_nodes |
Create nodes | nodes (list of dicts) |
add_edges |
Create edges | edges (list of dicts) |
get_node |
Get single node | node_id |
get_edge |
Get single edge | edge_id |
get_context |
Neighborhood traversal | node_id, optional: depth, filter_labels |
find_nodes_by_source |
Find by external reference | source_ref (dict) |
find_nodes_by_label |
Find by label | label, optional: limit |
update_node |
Update node properties | node_id, properties |
update_edge |
Update edge properties | edge_id, properties |
delete_nodes |
Delete nodes | node_ids, optional: cascade |
delete_edges |
Delete edges | edge_ids |
import_graph |
Bulk import | graph_data (dict), optional: merge_strategy |
export_graph |
Bulk export | optional: filter_query |
query |
Raw query execution | query (string or dict) |
GraphPlugin
def GraphPlugin(
args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):
Abstract base class for all Context Graph plugins.
Interface Documentation
CREATE Operations
GraphPlugin.add_nodes
def add_nodes(
nodes:List, # Nodes to create
)->List: # Created node IDs
Bulk create nodes.
GraphPlugin.add_edges
def add_edges(
edges:List, # Edges to create
)->List: # Created edge IDs
Bulk create edges.
READ Operations
GraphPlugin.get_node
def get_node(
node_id:str, # UUID of node to retrieve
)->Optional: # Node or None if not found
Get a single node by ID.
GraphPlugin.get_edge
def get_edge(
edge_id:str, # UUID of edge to retrieve
)->Optional: # Edge or None if not found
Get a single edge by ID.
GraphPlugin.get_context
def get_context(
node_id:str, # Starting node UUID
depth:int=1, # Traversal depth (1 = immediate neighbors)
filter_labels:Optional=None, # Only include nodes with these labels
)->GraphContext: # Subgraph containing node and its neighborhood
Get the neighborhood of a specific node.
GraphPlugin.find_nodes_by_source
def find_nodes_by_source(
source_ref:SourceRef, # External resource reference
)->List: # Nodes attached to this source
Find all nodes linked to a specific external resource.
GraphPlugin.find_nodes_by_label
def find_nodes_by_label(
label:str, # Node label to search for
limit:int=100, # Max results
)->List: # Matching nodes
Find nodes by label.
GraphPlugin.execute
def execute(
action:str='get_schema', # Action to perform (see docstring for available actions)
kwargs:VAR_KEYWORD
)->Dict: # JSON-serializable result
Execute a graph operation. This is the main entry point for RemotePluginProxy.
Dispatches to the appropriate method based on action parameter. All return values are JSON-serializable dictionaries for HTTP transport.
UPDATE Operations
GraphPlugin.update_node
def update_node(
node_id:str, # UUID of node to update
properties:Dict, # Properties to merge/update
)->bool: # True if successful
Partial update of node properties.
GraphPlugin.update_edge
def update_edge(
edge_id:str, # UUID of edge to update
properties:Dict, # Properties to merge/update
)->bool: # True if successful
Partial update of edge properties.
DELETE Operations
GraphPlugin.delete_nodes
def delete_nodes(
node_ids:List, # UUIDs of nodes to delete
cascade:bool=True, # Also delete connected edges
)->int: # Number of nodes deleted
Delete nodes (and optionally connected edges).
GraphPlugin.delete_edges
def delete_edges(
edge_ids:List, # UUIDs of edges to delete
)->int: # Number of edges deleted
Delete edges.
LIFECYCLE Operations
GraphPlugin.get_schema
def get_schema(
)->Dict: # Graph schema/ontology
Return the current ontology/schema of the graph.
GraphPlugin.import_graph
def import_graph(
graph_data:GraphContext, # Data to import
merge_strategy:str='overwrite', # "overwrite", "skip", or "merge"
)->Dict: # Import statistics {nodes_created, edges_created, ...}
Bulk import a GraphContext (e.g., from backup or another plugin).
GraphPlugin.export_graph
def export_graph(
filter_query:Optional=None, # Optional filter
)->GraphContext: # Exported subgraph or full graph
Export the entire graph or a filtered subset.
Example Schema Return Value
The get_schema() method returns information about the current graph ontology:
{
"node_labels": ["Person", "Concept", "Correction", "Event"],
"edge_types": ["MENTIONS", "CORRECTS", "AUTHORED_BY", "RELATED_TO"],
"counts": {
"nodes": {"Person": 12, "Concept": 45, "Correction": 3},
"edges": {"MENTIONS": 78, "CORRECTS": 5}
},
"properties": {
"Person": ["name", "role", "confidence"],
"Concept": ["name", "definition", "aliases"]
}
}This is useful for UIs that need to know available labels and types for filtering/querying.
Example Implementation
A minimal in-memory implementation demonstrating the interface:
import uuid
# Test the example plugin
plugin = InMemoryGraphPlugin()
plugin.initialize()
print(f"Plugin: {plugin.name} v{plugin.version}")Plugin: in-memory-graph v1.0.0
# Create some nodes
alice_id = str(uuid.uuid4())
bob_id = str(uuid.uuid4())
ml_id = str(uuid.uuid4())
nodes = [
GraphNode(id=alice_id, label="Person", properties={"name": "Alice"}),
GraphNode(id=bob_id, label="Person", properties={"name": "Bob"}),
GraphNode(id=ml_id, label="Concept", properties={"name": "Machine Learning"})
]
created_ids = plugin.add_nodes(nodes)
print(f"Created {len(created_ids)} nodes")Created 3 nodes
# Create edges
edges = [
GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=ml_id, relation_type="MENTIONS"),
GraphEdge(id=str(uuid.uuid4()), source_id=bob_id, target_id=ml_id, relation_type="MENTIONS"),
GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=bob_id, relation_type="KNOWS")
]
created_ids = plugin.add_edges(edges)
print(f"Created {len(created_ids)} edges")Created 3 edges
# Test get_context (neighborhood traversal)
context = plugin.get_context(alice_id, depth=1)
print(f"Alice's neighborhood: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Neighbors: {[n.properties.get('name', n.label) for n in context.nodes]}")Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Alice', 'Machine Learning', 'Bob']
# Test find_nodes_by_label
people = plugin.find_nodes_by_label("Person")
print(f"People: {[p.properties['name'] for p in people]}")People: ['Alice', 'Bob']
# Test get_schema
schema = plugin.get_schema()
print(f"Schema: {schema}")Schema: {'node_labels': ['Person', 'Concept'], 'edge_types': ['KNOWS', 'MENTIONS'], 'counts': {'nodes': 3, 'edges': 3}}
# Test update_node
plugin.update_node(alice_id, {"role": "speaker", "confidence": 0.95})
alice = plugin.get_node(alice_id)
print(f"Updated Alice: {alice.properties}")Updated Alice: {'name': 'Alice', 'role': 'speaker', 'confidence': 0.95}
# Test export/import
exported = plugin.export_graph()
print(f"Exported: {len(exported.nodes)} nodes, {len(exported.edges)} edges")
# Import into a new plugin instance
new_plugin = InMemoryGraphPlugin()
new_plugin.initialize()
stats = new_plugin.import_graph(exported)
print(f"Import stats: {stats}")Exported: 3 nodes, 3 edges
Import stats: {'nodes_created': 3, 'edges_created': 3}
# Test delete with cascade
deleted = plugin.delete_nodes([alice_id], cascade=True)
print(f"Deleted {deleted} node(s)")
print(f"Remaining: {plugin.get_schema()}")Deleted 1 node(s)
Remaining: {'node_labels': ['Person', 'Concept'], 'edge_types': ['MENTIONS'], 'counts': {'nodes': 2, 'edges': 1}}
# Cleanup
plugin.cleanup()
new_plugin.cleanup()