Configuration
SQLiteGraphPluginConfig
def SQLiteGraphPluginConfig(
db_path:Optional= None , readonly:bool = False
)-> None :
Configuration for SQLite Graph Plugin.
SQLiteGraphPlugin
Local, file-backed Context Graph implementation using SQLite. Stores nodes and edges in relational tables with JSON payloads for properties.
Schema:
-- Nodes table
CREATE TABLE nodes (
id TEXT PRIMARY KEY ,
label TEXT NOT NULL ,
properties JSON,
sources JSON,
created_at REAL ,
updated_at REAL
);
-- Edges table (with foreign keys for cascade delete)
CREATE TABLE edges (
id TEXT PRIMARY KEY ,
source_id TEXT NOT NULL ,
target_id TEXT NOT NULL ,
relation_type TEXT NOT NULL ,
properties JSON,
created_at REAL ,
updated_at REAL ,
FOREIGN KEY (source_id) REFERENCES nodes(id ) ON DELETE CASCADE ,
FOREIGN KEY (target_id) REFERENCES nodes(id ) ON DELETE CASCADE
);
SQLiteGraphPlugin
def SQLiteGraphPlugin(
):
Local, file-backed Context Graph implementation using SQLite.
Testing the Plugin
import tempfile
import uuid
# Create plugin with temp database
plugin = SQLiteGraphPlugin()
# Use temp file for testing
tmp_db = tempfile.NamedTemporaryFile(suffix= ".db" , delete= False )
plugin.initialize({"db_path" : tmp_db.name})
print (f"Plugin: { plugin. name} v { plugin. version} " )
print (f"Database: { plugin. _db_path} " )
Plugin: cjm-graph-plugin-sqlite v0.0.14
Database: /tmp/tmpzyl3qjuc.db
# Test get_config_schema
schema = plugin.get_config_schema()
print (f"Config schema: { list (schema['properties' ].keys())} " )
Config schema: ['db_path', 'readonly']
# Create some nodes
alice_id = str (uuid.uuid4())
bob_id = str (uuid.uuid4())
ml_id = str (uuid.uuid4())
# Simulate consumed content and compute hash
transcript_content = b"Alice discussed machine learning with Bob in the podcast."
content_hash = SourceRef.compute_hash(transcript_content)
# Create SourceRef to link to external data (now requires content_hash)
transcript_ref = SourceRef(
plugin_name= "cjm-transcription-plugin-whisper" ,
table_name= "transcriptions" ,
row_id= "job-abc123" ,
content_hash= content_hash,
segment_slice= "full_text"
)
nodes = [
GraphNode(id = alice_id, label= "Person" , properties= {"name" : "Alice" , "role" : "speaker" }, sources= [transcript_ref]),
GraphNode(id = bob_id, label= "Person" , properties= {"name" : "Bob" }),
GraphNode(id = ml_id, label= "Concept" , properties= {"name" : "Machine Learning" , "definition" : "AI subfield" })
]
created_ids = plugin.add_nodes(nodes)
print (f"Created { len (created_ids)} nodes" )
# Create edges
edges = [
GraphEdge(id = str (uuid.uuid4()), source_id= alice_id, target_id= ml_id, relation_type= "MENTIONS" , properties= {"confidence" : 0.95 }),
GraphEdge(id = str (uuid.uuid4()), source_id= bob_id, target_id= ml_id, relation_type= "MENTIONS" ),
GraphEdge(id = str (uuid.uuid4()), source_id= alice_id, target_id= bob_id, relation_type= "KNOWS" )
]
created_ids = plugin.add_edges(edges)
print (f"Created { len (created_ids)} edges" )
# Test get_node
alice = plugin.get_node(alice_id)
print (f"Retrieved: { alice. label} - { alice. properties} " )
print (f"Sources: { [s.to_dict() for s in alice.sources]} " )
Retrieved: Person - {'name': 'Alice', 'role': 'speaker'}
Sources: [{'plugin_name': 'cjm-transcription-plugin-whisper', 'table_name': 'transcriptions', 'row_id': 'job-abc123', 'content_hash': 'sha256:f85b2165bd6e790af2cf6a2223c07f74cbf0f588434395be4607a479c7e592a3', 'segment_slice': 'full_text'}]
# Test get_context (neighborhood traversal)
context = plugin.get_context(alice_id, depth= 1 )
print (f"Alice's neighborhood: { len (context.nodes)} nodes, { len (context.edges)} edges" )
print (f"Neighbors: { [n.properties.get('name' , n.label) for n in context.nodes]} " )
Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Machine Learning', 'Alice', 'Bob']
# Test find_nodes_by_source
found = plugin.find_nodes_by_source(transcript_ref)
print (f"Nodes linked to transcript job-abc123: { [n.properties.get('name' ) for n in found]} " )
Nodes linked to transcript job-abc123: ['Alice']
# Test content hash round-trip through SQLite
alice = plugin.get_node(alice_id)
loaded_ref = alice.sources[0 ]
# Hash survived storage
print (f"Stored hash: { loaded_ref. content_hash[:40 ]} ..." )
print (f"Original hash: { content_hash[:40 ]} ..." )
assert loaded_ref.content_hash == content_hash
# verify() works after round-trip
assert loaded_ref.verify(transcript_content), "verify() should return True for original content"
assert not loaded_ref.verify(b"tampered" ), "verify() should return False for tampered content"
print ("Content hash round-trip: PASSED" )
Stored hash: sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Original hash: sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Content hash round-trip: PASSED
# Test find_nodes_by_label
people = plugin.find_nodes_by_label("Person" )
print (f"People: { [p.properties['name' ] for p in people]} " )
# Test get_schema
schema = plugin.get_schema()
print (f"Schema: { schema} " )
Schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['KNOWS', 'MENTIONS'], 'counts': {'Concept': 1, 'Person': 2}}
# Test update_node
plugin.update_node(alice_id, {"role" : "host" , "verified" : True })
alice = plugin.get_node(alice_id)
print (f"Updated Alice: { alice. properties} " )
Updated Alice: {'name': 'Alice', 'role': 'host', 'verified': True}
# Test export/import
exported = plugin.export_graph()
print (f"Exported: { len (exported.nodes)} nodes, { len (exported.edges)} edges" )
# Test FileBackedDTO (zero-copy transfer)
temp_path = exported.to_temp_file()
print (f"Saved to temp file: { temp_path} " )
# Load into new plugin
new_plugin = SQLiteGraphPlugin()
tmp_db2 = tempfile.NamedTemporaryFile(suffix= ".db" , delete= False )
new_plugin.initialize({"db_path" : tmp_db2.name})
# Load from file and import
loaded = GraphContext.from_file(temp_path)
stats = new_plugin.import_graph(loaded)
print (f"Import stats: { stats} " )
import os
os.unlink(temp_path)
Exported: 0 nodes, 0 edges
Saved to temp file: /tmp/tmpekkdqdq8.json
Import stats: {'nodes_created': 0, 'edges_created': 0, 'merge_strategy': 'overwrite'}
# Test delete with cascade
deleted = plugin.delete_nodes([alice_id], cascade= True )
print (f"Deleted { deleted} node(s)" )
print (f"Remaining schema: { plugin. get_schema()} " )
Deleted 1 node(s)
Remaining schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['MENTIONS'], 'counts': {'Concept': 1, 'Person': 1}}
# Test SG-44 @plugin_action dispatch + SG-41 smells (WAL / readonly / query / merge_strategy)
import tempfile as _tf, os as _os, sqlite3 as _sq
from cjm_plugin_system.core.errors import PluginInputError as _PIE
_d = _tf.mkdtemp(); _db = _os.path.join(_d, 'sg.db' )
_p = SQLiteGraphPlugin(); _p.initialize({'db_path' : _db})
# SG-44: supported_actions discovered from @plugin_action markers
assert len (SQLiteGraphPlugin.supported_actions) == 15
assert 'query' in SQLiteGraphPlugin.supported_actions
# SG-41(a): WAL enabled on the DB file
assert _sq.connect (_db).execute('PRAGMA journal_mode;' ).fetchone()[0 ].lower() == 'wal'
_SR = {'plugin_name' : 'pl' , 'table_name' : 't' , 'row_id' : 'r1' , 'content_hash' : 'h1' }
_p.execute('add_nodes' , nodes= [{'id' : 'n1' , 'label' : 'Doc' , 'properties' : {'x' : 1 }, 'sources' : [_SR]}])
assert _p.execute('get_node' , node_id= 'n1' )['node' ]['properties' ] == {'x' : 1 }
try : _p.execute('no_such_action' )
except _PIE: pass
else : raise AssertionError ('unknown action should raise PluginInputError' )
# SG-41(c): query is a guarded read-only parameterized SELECT
_q = _p.execute('query' , sql= 'SELECT id FROM nodes WHERE label = ?' , params= ['Doc' ])
assert _q['row_count' ] == 1 and _q['columns' ] == ['id' ] and _q['rows' ][0 ][0 ] == 'n1'
for _bad in ('UPDATE nodes SET label = "x"' , 'SELECT 1; SELECT 2' , ' ' ):
try : _p.query(_bad)
except _PIE: pass
else : raise AssertionError ('query guard should reject: %r ' % _bad)
# SG-41(d): import_graph merge_strategy="merge" unions properties + sources
_inc = {'nodes' : [{'id' : 'n1' , 'label' : 'Doc2' , 'properties' : {'y' : 2 },
'sources' : [{'plugin_name' : 'pl' , 'table_name' : 't' , 'row_id' : 'r2' , 'content_hash' : 'h2' }]}],
'edges' : []}
_p.execute('import_graph' , graph_data= _inc, merge_strategy= 'merge' )
_m = _p.execute('get_node' , node_id= 'n1' )['node' ]
assert _m['properties' ] == {'x' : 1 , 'y' : 2 } and len (_m['sources' ]) == 2
# SG-41(b): readonly config honored — reads work, writes rejected
_ro = SQLiteGraphPlugin(); _ro.initialize({'db_path' : _db, 'readonly' : True })
assert _ro.execute('get_node' , node_id= 'n1' )['node' ] is not None
try : _ro.execute('add_nodes' , nodes= [{'id' : 'z' , 'label' : 'Z' , 'properties' : {}, 'sources' : []}])
except Exception : pass
else : raise AssertionError ('readonly write should fail' )
print ('SG-44 dispatch + SG-41 (WAL / readonly / query / merge_strategy) tests passed' )
SG-44 dispatch + SG-41 (WAL / readonly / query / merge_strategy) tests passed
# Cleanup
plugin.cleanup()
new_plugin.cleanup()
# Remove temp files
os.unlink(tmp_db.name)
os.unlink(tmp_db2.name)
print ("Cleanup complete" )