SourceRef
A pointer to external data residing in another plugin’s domain. Used to anchor graph nodes to ground truth (e.g., a specific transcript segment from a transcription plugin).
Each SourceRef carries a content_hash — a cryptographic hash of the content that was consumed at link-creation time. This enables verification that the referenced data hasn’t changed since the link was established.
The hash uses a self-describing "algo:hexdigest" format (e.g., "sha256:a3f2b8..."), making it forward-compatible if the algorithm changes.
source
SourceRef
def SourceRef(
    plugin_name: str, table_name: str, row_id: str, content_hash: str, segment_slice: Optional = None
) -> None:
A pointer to external data in another plugin’s domain.
# Test SourceRef creation with content hash
sample_text = b"Laying Plans Sun Tzu said, The art of war is of vital importance..."
content_hash = SourceRef.compute_hash(sample_text)
ref = SourceRef(
    plugin_name="cjm-transcription-plugin-voxtral-hf",
    table_name="transcriptions",
    row_id="b0ceddd3-1234-5678-9abc-def012345678",
    content_hash=content_hash,
    segment_slice="timestamp:00:10-00:30",
)
print(f"SourceRef: {ref}")
print(f"As dict: {ref.to_dict()}")
# The hash is self-describing ("algo:hexdigest"), so the algorithm name
# must be the prefix, not merely appear somewhere in the string.
assert ref.content_hash.startswith("sha256:")
SourceRef: SourceRef(plugin_name='cjm-transcription-plugin-voxtral-hf', table_name='transcriptions', row_id='b0ceddd3-1234-5678-9abc-def012345678', content_hash='sha256:90fc4487461482ec8f7022c3e058f6c026959a444b7deb6cd28dd77190a99de0', segment_slice='timestamp:00:10-00:30')
As dict: {'plugin_name': 'cjm-transcription-plugin-voxtral-hf', 'table_name': 'transcriptions', 'row_id': 'b0ceddd3-1234-5678-9abc-def012345678', 'content_hash': 'sha256:90fc4487461482ec8f7022c3e058f6c026959a444b7deb6cd28dd77190a99de0', 'segment_slice': 'timestamp:00:10-00:30'}
# Test without optional segment_slice
ref_minimal = SourceRef(
    plugin_name="cjm-transcription-plugin-whisper",
    table_name="segments",
    row_id="job-123",
    content_hash=SourceRef.compute_hash(b"some segment text"),
)
print(f"Minimal SourceRef: {ref_minimal}")
# segment_slice was omitted, so it falls back to its default of None
assert ref_minimal.segment_slice is None
Minimal SourceRef: SourceRef(plugin_name='cjm-transcription-plugin-whisper', table_name='segments', row_id='job-123', content_hash='sha256:e2ff83ecf25ea1dc835b8c9d6705ee0ad98d647fd9bcc871120eaa2f3c920edc', segment_slice=None)
# Test verify() — content matches
assert ref.verify(sample_text) == True
print("Verify with original content: True")

# Test verify() — content has been tampered with
assert ref.verify(b"TAMPERED content") == False
print("Verify with tampered content: False")

# Test compute_hash() with custom algorithm
sha512_hash = SourceRef.compute_hash(b"test content", algo="sha512")
assert sha512_hash.startswith("sha512:")
# Only the first 30 characters are shown to keep the output short
print(f"SHA-512 hash: {sha512_hash[:30]}...")

# Test round-trip: compute_hash -> SourceRef -> verify
content = b"round trip test"
ref_roundtrip = SourceRef(
    plugin_name="test-plugin",
    table_name="tests",
    row_id="rt-001",
    content_hash=SourceRef.compute_hash(content),
)
assert ref_roundtrip.verify(content) == True
print("Round-trip hash verification passed")
Verify with original content: True
Verify with tampered content: False
SHA-512 hash: sha512:0cbf4caef38047bba9a24e6...
Round-trip hash verification passed
GraphNode
Represents an entity in the Context Graph. Each node has:
- `id`: Unique identifier (UUID)
- `label`: Node type (e.g., “Person”, “Concept”, “Correction”)
- `properties`: Arbitrary key-value data
- `sources`: Links to external plugin data for provenance tracking
source
GraphNode
def GraphNode(
    id: str, label: str, properties: Dict = <factory>, sources: List = <factory>, created_at: Optional = None,
    updated_at: Optional = None
) -> None:
Represents an entity in the Context Graph.
import json
import uuid

# Test SourceRef creation
ref = SourceRef(
    plugin_name="cjm-transcription-plugin-voxtral-hf",
    table_name="transcriptions",
    row_id="b0ceddd3-1234-5678-9abc-def012345678",
    content_hash=SourceRef.compute_hash(b"transcript text for testing"),
    segment_slice="timestamp:00:10-00:30",
)
print(f"SourceRef: {ref}")
print(f"As dict: {ref.to_dict()}")

# Test GraphNode creation with sources
node = GraphNode(
    id=str(uuid.uuid4()),
    label="Person",
    properties={"name": "Alice", "role": "speaker"},
    sources=[ref],  # Link to transcript segment
)
print(f"GraphNode: {node}")
# Pretty-print the dict form as JSON to show the nested SourceRef entries
print(f"\nAs dict:\n{json.dumps(node.to_dict(), indent=2)}")
SourceRef: SourceRef(plugin_name='cjm-transcription-plugin-voxtral-hf', table_name='transcriptions', row_id='b0ceddd3-1234-5678-9abc-def012345678', content_hash='sha256:6589e1c6104bfaffa5ca3f70cba8ab552b04802da56bfee2a3527a39cee4f1c2', segment_slice='timestamp:00:10-00:30')
As dict: {'plugin_name': 'cjm-transcription-plugin-voxtral-hf', 'table_name': 'transcriptions', 'row_id': 'b0ceddd3-1234-5678-9abc-def012345678', 'content_hash': 'sha256:6589e1c6104bfaffa5ca3f70cba8ab552b04802da56bfee2a3527a39cee4f1c2', 'segment_slice': 'timestamp:00:10-00:30'}
GraphNode: GraphNode(id='ac5c36a7-b078-4cb3-84a0-fca7be67c897', label='Person', properties={'name': 'Alice', 'role': 'speaker'}, sources=[SourceRef(plugin_name='cjm-transcription-plugin-voxtral-hf', table_name='transcriptions', row_id='b0ceddd3-1234-5678-9abc-def012345678', content_hash='sha256:6589e1c6104bfaffa5ca3f70cba8ab552b04802da56bfee2a3527a39cee4f1c2', segment_slice='timestamp:00:10-00:30')], created_at=None, updated_at=None)
As dict:
{
"id": "ac5c36a7-b078-4cb3-84a0-fca7be67c897",
"label": "Person",
"properties": {
"name": "Alice",
"role": "speaker"
},
"sources": [
{
"plugin_name": "cjm-transcription-plugin-voxtral-hf",
"table_name": "transcriptions",
"row_id": "b0ceddd3-1234-5678-9abc-def012345678",
"content_hash": "sha256:6589e1c6104bfaffa5ca3f70cba8ab552b04802da56bfee2a3527a39cee4f1c2",
"segment_slice": "timestamp:00:10-00:30"
}
],
"created_at": null,
"updated_at": null
}
# Test minimal node (no properties, no sources)
concept = GraphNode(
    id=str(uuid.uuid4()),
    label="Concept",
)
# properties and sources are omitted here and come from their default factories
print(f"Minimal node: {concept.to_dict()}")
Minimal node: {'id': '7b8ea968-a2bb-4e46-9d2a-0e304c962d81', 'label': 'Concept', 'properties': {}, 'sources': [], 'created_at': None, 'updated_at': None}
GraphEdge
Represents a typed relationship between two nodes. Edges are directional (source → target) and can carry properties.
source
GraphEdge
def GraphEdge(
    id: str, source_id: str, target_id: str, relation_type: str, properties: Dict = <factory>, created_at: Optional = None,
    updated_at: Optional = None
) -> None:
Represents a relationship between two nodes.
# Create two nodes and an edge between them
person_id = str(uuid.uuid4())
concept_id = str(uuid.uuid4())

person_node = GraphNode(id=person_id, label="Person", properties={"name": "Bob"})
concept_node = GraphNode(id=concept_id, label="Concept", properties={"name": "Machine Learning"})

# The edge is directional: it points from the person node to the concept node
edge = GraphEdge(
    id=str(uuid.uuid4()),
    source_id=person_id,
    target_id=concept_id,
    relation_type="MENTIONS",
    properties={"confidence": 0.95, "timestamp": "00:15:30"},
)
print(f"Edge: {person_node.properties['name']} --[{edge.relation_type}]--> {concept_node.properties['name']}")
print(f"\nAs dict: {edge.to_dict()}")
Edge: Bob --[MENTIONS]--> Machine Learning
As dict: {'id': 'ad10aae0-ae6c-4d0b-9d2e-cacec6e12fe6', 'source_id': 'a84b5289-34ec-4038-95dd-b4fee2b7f85b', 'target_id': '9abc2c30-31c5-48c6-a449-45f8e832617e', 'relation_type': 'MENTIONS', 'properties': {'confidence': 0.95, 'timestamp': '00:15:30'}, 'created_at': None, 'updated_at': None}
GraphContext
Container for graph query results (a subgraph). Implements FileBackedDTO for zero-copy transfer of large subgraphs between Host and Worker processes.
When passed through RemotePluginProxy, large GraphContext objects are automatically serialized to temp files rather than sent inline via JSON.
source
GraphContext
def GraphContext(
    nodes: List, edges: List, metadata: Dict = <factory>
) -> None:
Container for graph query results (a subgraph).
import os

# Create two nodes and an edge between them
person_id = str(uuid.uuid4())
concept_id = str(uuid.uuid4())

person_node = GraphNode(id=person_id, label="Person", properties={"name": "Bob"})
concept_node = GraphNode(id=concept_id, label="Concept", properties={"name": "Machine Learning"})

edge = GraphEdge(
    id=str(uuid.uuid4()),
    source_id=person_id,
    target_id=concept_id,
    relation_type="MENTIONS",
    properties={"confidence": 0.95, "timestamp": "00:15:30"},
)
print(f"Edge: {person_node.properties['name']} --[{edge.relation_type}]--> {concept_node.properties['name']}")
print(f"\nAs dict: {edge.to_dict()}")

# Test GraphContext creation
context = GraphContext(
    nodes=[person_node, concept_node],
    edges=[edge],
    metadata={"query": "neighbors of Bob", "depth": 1},
)
print(f"GraphContext: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Metadata: {context.metadata}")

# Test FileBackedDTO protocol
print(f"Implements FileBackedDTO: {isinstance(context, FileBackedDTO)}")

# Test to_temp_file (this is what the Proxy calls)
temp_path = context.to_temp_file()
try:
    print(f"Saved to temp file: {temp_path}")

    # Verify file exists and content
    print(f"File exists: {os.path.exists(temp_path)}")
    print(f"File size: {os.path.getsize(temp_path)} bytes")

    # Read back and verify
    with open(temp_path) as f:
        content = json.load(f)
    print(f"\nFile content keys: {content.keys()}")
    print(f"Nodes count: {len(content['nodes'])}")
finally:
    # Clean up even if one of the checks above raises, so no temp file is left behind
    os.unlink(temp_path)
Edge: Bob --[MENTIONS]--> Machine Learning
As dict: {'id': 'f19969b9-d25a-4416-bcbb-6991382bee11', 'source_id': 'da5b5109-f2d0-4f36-b825-8944f52e82bc', 'target_id': '5d51c215-fa3a-44ce-a193-83fa3280be70', 'relation_type': 'MENTIONS', 'properties': {'confidence': 0.95, 'timestamp': '00:15:30'}, 'created_at': None, 'updated_at': None}
GraphContext: 2 nodes, 1 edges
Metadata: {'query': 'neighbors of Bob', 'depth': 1}
Implements FileBackedDTO: True
Saved to temp file: /tmp/tmpdaqlr58k.json
File exists: True
File size: 681 bytes
File content keys: dict_keys(['nodes', 'edges', 'metadata'])
Nodes count: 2
# Test from_file round-trip
temp_path = context.to_temp_file()
try:
    loaded = GraphContext.from_file(temp_path)
    print(f"Original: {len(context.nodes)} nodes, {len(context.edges)} edges")
    print(f"Loaded: {len(loaded.nodes)} nodes, {len(loaded.edges)} edges")
    # Compare element-by-element, in order, between the original and the reloaded graph
    print(f"Node labels match: {[n.label for n in context.nodes] == [n.label for n in loaded.nodes]}")
    print(f"Edge types match: {[e.relation_type for e in context.edges] == [e.relation_type for e in loaded.edges]}")
finally:
    # Remove the temp file even if loading or comparison fails
    os.unlink(temp_path)
Original: 2 nodes, 1 edges
Loaded: 2 nodes, 1 edges
Node labels match: True
Edge types match: True
# Test from_dict: rebuild a GraphContext from its dict form and report its size
ctx_dict = context.to_dict()
loaded_from_dict = GraphContext.from_dict(ctx_dict)
node_count = len(loaded_from_dict.nodes)
edge_count = len(loaded_from_dict.edges)
print(f"From dict: {node_count} nodes, {edge_count} edges")
From dict: 2 nodes, 1 edges
GraphQuery
A standardized query object that can represent:
- Raw query strings (SQL, Cypher, etc.)
- Structured search parameters
The depth parameter is used for neighborhood traversals.
source
GraphQuery
def GraphQuery(
    query: str, parameters: Dict = <factory>, limit: int = 100, depth: int = 1
) -> None:
A standardized query object for graph operations.
# Test GraphQuery with SQL-style query
# The value for the :label placeholder is supplied separately via `parameters`
sql_query = GraphQuery(
    query="SELECT * FROM nodes WHERE label = :label",
    parameters={"label": "Person"},
    limit=50,
)
print(f"SQL Query: {sql_query}")
print(f"As dict: {sql_query.to_dict()}")
SQL Query: GraphQuery(query='SELECT * FROM nodes WHERE label = :label', parameters={'label': 'Person'}, limit=50, depth=1)
As dict: {'query': 'SELECT * FROM nodes WHERE label = :label', 'parameters': {'label': 'Person'}, 'limit': 50, 'depth': 1}
# Test GraphQuery for neighborhood traversal
# `depth` is the parameter used for neighborhood traversals
traversal_query = GraphQuery(
    query="NEIGHBORS",
    parameters={"start_node": person_id},
    depth=2,
    limit=100,
)
print(f"Traversal Query: {traversal_query}")
Traversal Query: GraphQuery(query='NEIGHBORS', parameters={'start_node': '4fd70f85-b639-4d1e-bd81-4a166f2dda8f'}, limit=100, depth=2)
# Test minimal query: only `query` is given, so limit and depth take their defaults
simple = GraphQuery(query="SELECT * FROM nodes")
print(f"Simple query defaults: limit={simple.limit}, depth={simple.depth}")
Simple query defaults: limit=100, depth=1