SQLite Graph Plugin

A Context Graph plugin implementation backed by SQLite.

Configuration


SQLiteGraphPluginConfig


def SQLiteGraphPluginConfig(
    db_path:Optional=None, readonly:bool=False
)->None:

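Configuration for the SQLite Graph Plugin. `db_path` sets the SQLite database file to use (optional; defaults to `None`), and `readonly` (default `False`) makes the plugin read-only: reads still work, but write operations are rejected.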

SQLiteGraphPlugin

Local, file-backed Context Graph implementation using SQLite. Nodes and edges are stored in two relational tables; each row's properties (and, for nodes, its source references) are stored as JSON columns.

Schema:

-- Nodes table
CREATE TABLE nodes (
    id TEXT PRIMARY KEY,
    label TEXT NOT NULL,
    properties JSON,
    sources JSON,
    created_at REAL,
    updated_at REAL
);

-- Edges table (with foreign keys for cascade delete)
CREATE TABLE edges (
    id TEXT PRIMARY KEY,
    source_id TEXT NOT NULL,
    target_id TEXT NOT NULL,
    relation_type TEXT NOT NULL,
    properties JSON,
    created_at REAL,
    updated_at REAL,
    FOREIGN KEY(source_id) REFERENCES nodes(id) ON DELETE CASCADE,
    FOREIGN KEY(target_id) REFERENCES nodes(id) ON DELETE CASCADE
);

SQLiteGraphPlugin


def SQLiteGraphPlugin(
    
):

Local, file-backed Context Graph implementation using SQLite.

Testing the Plugin

import tempfile
import uuid

# Create plugin with temp database
plugin = SQLiteGraphPlugin()

# Use temp file for testing
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
plugin.initialize({"db_path": tmp_db.name})

print(f"Plugin: {plugin.name} v{plugin.version}")
print(f"Database: {plugin._db_path}")
Plugin: cjm-graph-plugin-sqlite v0.0.14
Database: /tmp/tmpzyl3qjuc.db
# Test get_config_schema
schema = plugin.get_config_schema()
print(f"Config schema: {list(schema['properties'].keys())}")
Config schema: ['db_path', 'readonly']
# Create some nodes
alice_id = str(uuid.uuid4())
bob_id = str(uuid.uuid4())
ml_id = str(uuid.uuid4())

# Simulate consumed content and compute hash
transcript_content = b"Alice discussed machine learning with Bob in the podcast."
content_hash = SourceRef.compute_hash(transcript_content)

# Create SourceRef to link to external data (now requires content_hash)
transcript_ref = SourceRef(
    plugin_name="cjm-transcription-plugin-whisper",
    table_name="transcriptions",
    row_id="job-abc123",
    content_hash=content_hash,
    segment_slice="full_text"
)

nodes = [
    GraphNode(id=alice_id, label="Person", properties={"name": "Alice", "role": "speaker"}, sources=[transcript_ref]),
    GraphNode(id=bob_id, label="Person", properties={"name": "Bob"}),
    GraphNode(id=ml_id, label="Concept", properties={"name": "Machine Learning", "definition": "AI subfield"})
]

created_ids = plugin.add_nodes(nodes)
print(f"Created {len(created_ids)} nodes")
Created 3 nodes
# Create edges
edges = [
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=ml_id, relation_type="MENTIONS", properties={"confidence": 0.95}),
    GraphEdge(id=str(uuid.uuid4()), source_id=bob_id, target_id=ml_id, relation_type="MENTIONS"),
    GraphEdge(id=str(uuid.uuid4()), source_id=alice_id, target_id=bob_id, relation_type="KNOWS")
]

created_ids = plugin.add_edges(edges)
print(f"Created {len(created_ids)} edges")
Created 3 edges
# Test get_node
alice = plugin.get_node(alice_id)
print(f"Retrieved: {alice.label} - {alice.properties}")
print(f"Sources: {[s.to_dict() for s in alice.sources]}")
Retrieved: Person - {'name': 'Alice', 'role': 'speaker'}
Sources: [{'plugin_name': 'cjm-transcription-plugin-whisper', 'table_name': 'transcriptions', 'row_id': 'job-abc123', 'content_hash': 'sha256:f85b2165bd6e790af2cf6a2223c07f74cbf0f588434395be4607a479c7e592a3', 'segment_slice': 'full_text'}]
# Test get_context (neighborhood traversal)
context = plugin.get_context(alice_id, depth=1)
print(f"Alice's neighborhood: {len(context.nodes)} nodes, {len(context.edges)} edges")
print(f"Neighbors: {[n.properties.get('name', n.label) for n in context.nodes]}")
Alice's neighborhood: 3 nodes, 2 edges
Neighbors: ['Machine Learning', 'Alice', 'Bob']
# Test find_nodes_by_source
found = plugin.find_nodes_by_source(transcript_ref)
print(f"Nodes linked to transcript job-abc123: {[n.properties.get('name') for n in found]}")
Nodes linked to transcript job-abc123: ['Alice']
# Test content hash round-trip through SQLite
alice = plugin.get_node(alice_id)
loaded_ref = alice.sources[0]

# Hash survived storage
print(f"Stored hash:   {loaded_ref.content_hash[:40]}...")
print(f"Original hash: {content_hash[:40]}...")
assert loaded_ref.content_hash == content_hash

# verify() works after round-trip
assert loaded_ref.verify(transcript_content), "verify() should return True for original content"
assert not loaded_ref.verify(b"tampered"), "verify() should return False for tampered content"
print("Content hash round-trip: PASSED")
Stored hash:   sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Original hash: sha256:f85b2165bd6e790af2cf6a2223c07f74c...
Content hash round-trip: PASSED
# Test find_nodes_by_label
people = plugin.find_nodes_by_label("Person")
print(f"People: {[p.properties['name'] for p in people]}")
People: ['Alice', 'Bob']
# Test get_schema
schema = plugin.get_schema()
print(f"Schema: {schema}")
Schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['KNOWS', 'MENTIONS'], 'counts': {'Concept': 1, 'Person': 2}}
# Test update_node
plugin.update_node(alice_id, {"role": "host", "verified": True})
alice = plugin.get_node(alice_id)
print(f"Updated Alice: {alice.properties}")
Updated Alice: {'name': 'Alice', 'role': 'host', 'verified': True}
# Test export/import
exported = plugin.export_graph()
print(f"Exported: {len(exported.nodes)} nodes, {len(exported.edges)} edges")

# Test FileBackedDTO (zero-copy transfer)
temp_path = exported.to_temp_file()
print(f"Saved to temp file: {temp_path}")

# Load into new plugin
new_plugin = SQLiteGraphPlugin()
tmp_db2 = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
new_plugin.initialize({"db_path": tmp_db2.name})

# Load from file and import
loaded = GraphContext.from_file(temp_path)
stats = new_plugin.import_graph(loaded)
print(f"Import stats: {stats}")

import os
os.unlink(temp_path)
Exported: 0 nodes, 0 edges
Saved to temp file: /tmp/tmpekkdqdq8.json
Import stats: {'nodes_created': 0, 'edges_created': 0, 'merge_strategy': 'overwrite'}
# Test delete with cascade
deleted = plugin.delete_nodes([alice_id], cascade=True)
print(f"Deleted {deleted} node(s)")
print(f"Remaining schema: {plugin.get_schema()}")
Deleted 1 node(s)
Remaining schema: {'node_labels': ['Concept', 'Person'], 'edge_types': ['MENTIONS'], 'counts': {'Concept': 1, 'Person': 1}}
# Test SG-44 @plugin_action dispatch + SG-41 smells (WAL / readonly / query / merge_strategy)
import tempfile as _tf, os as _os, sqlite3 as _sq
from cjm_plugin_system.core.errors import PluginInputError as _PIE

_d = _tf.mkdtemp(); _db = _os.path.join(_d, 'sg.db')
_p = SQLiteGraphPlugin(); _p.initialize({'db_path': _db})

# SG-44: supported_actions discovered from @plugin_action markers
assert len(SQLiteGraphPlugin.supported_actions) == 15
assert 'query' in SQLiteGraphPlugin.supported_actions
# SG-41(a): WAL enabled on the DB file
assert _sq.connect(_db).execute('PRAGMA journal_mode;').fetchone()[0].lower() == 'wal'

_SR = {'plugin_name': 'pl', 'table_name': 't', 'row_id': 'r1', 'content_hash': 'h1'}
_p.execute('add_nodes', nodes=[{'id': 'n1', 'label': 'Doc', 'properties': {'x': 1}, 'sources': [_SR]}])
assert _p.execute('get_node', node_id='n1')['node']['properties'] == {'x': 1}
try: _p.execute('no_such_action')
except _PIE: pass
else: raise AssertionError('unknown action should raise PluginInputError')

# SG-41(c): query is a guarded read-only parameterized SELECT
_q = _p.execute('query', sql='SELECT id FROM nodes WHERE label = ?', params=['Doc'])
assert _q['row_count'] == 1 and _q['columns'] == ['id'] and _q['rows'][0][0] == 'n1'
for _bad in ('UPDATE nodes SET label = "x"', 'SELECT 1; SELECT 2', '   '):
    try: _p.query(_bad)
    except _PIE: pass
    else: raise AssertionError('query guard should reject: %r' % _bad)

# SG-41(d): import_graph merge_strategy="merge" unions properties + sources
_inc = {'nodes': [{'id': 'n1', 'label': 'Doc2', 'properties': {'y': 2},
                   'sources': [{'plugin_name': 'pl', 'table_name': 't', 'row_id': 'r2', 'content_hash': 'h2'}]}],
        'edges': []}
_p.execute('import_graph', graph_data=_inc, merge_strategy='merge')
_m = _p.execute('get_node', node_id='n1')['node']
assert _m['properties'] == {'x': 1, 'y': 2} and len(_m['sources']) == 2

# SG-41(b): readonly config honored — reads work, writes rejected
_ro = SQLiteGraphPlugin(); _ro.initialize({'db_path': _db, 'readonly': True})
assert _ro.execute('get_node', node_id='n1')['node'] is not None
try: _ro.execute('add_nodes', nodes=[{'id': 'z', 'label': 'Z', 'properties': {}, 'sources': []}])
except Exception: pass
else: raise AssertionError('readonly write should fail')
print('SG-44 dispatch + SG-41 (WAL / readonly / query / merge_strategy) tests passed')
SG-44 dispatch + SG-41 (WAL / readonly / query / merge_strategy) tests passed
# Cleanup
plugin.cleanup()
new_plugin.cleanup()

# Remove temp files
os.unlink(tmp_db.name)
os.unlink(tmp_db2.name)

print("Cleanup complete")
Cleanup complete