Journal Store

CR-14 (stage 7): the durable account-of-action. The journal is a single SQLite store of typed observability events: substrate-derived, host-written, and never auto-deleted. It is the operational half of the attempted-vs-happened asymmetry: the graph records what HAPPENED, while the journal records what was ATTEMPTED, including everything the graph by design refuses to contain (failures, refusals, retries, admission decisions, worker lifecycle). Design ledger: claude-docs/stage-7-evidence.md.

Event vocabulary

SubstrateEventType covers journal events OUTSIDE the job-scoped JobEventType set, which stays in core.queue (the queue imports this module, never the reverse). Values are reserved up front per the reserve-enum-values discipline, but emission is progressive: a value existing here does NOT imply the substrate emits it yet.


SubstrateEventType


def SubstrateEventType(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

Journal vocabulary beyond the job-scoped JobEventType set (CR-14).

Reserved up front (emission progressive). Job-scoped types stay in core.queue.JobEventType; both serialize to plain strings in the journal’s event_type column — the journal is vocabulary-tolerant by design (unknown types round-trip; the P5/P6 tolerant-unknown law).
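The tolerant-unknown behaviour described above can be sketched roughly as follows. This is an illustration, not the module's code: `SubstrateEventTypeSketch` and `parse_event_type` are hypothetical names, the `str` mixin is an assumption, and only the `worker_spawned` value is taken from the tests further down; the other members are invented placeholders.

```python
from enum import Enum
from typing import Union


class SubstrateEventTypeSketch(str, Enum):
    """Hypothetical stand-in for SubstrateEventType (string-valued enum)."""
    WORKER_SPAWNED = "worker_spawned"        # value that appears in the tests
    WORKER_EXITED = "worker_exited"          # assumption, for illustration only
    ADMISSION_REFUSED = "admission_refused"  # assumption, for illustration only


def parse_event_type(raw: str) -> Union[SubstrateEventTypeSketch, str]:
    """Return the enum member for a known value, else the raw string unchanged."""
    try:
        return SubstrateEventTypeSketch(raw)
    except ValueError:
        # Unknown vocabulary is tolerated: it round-trips as a plain string.
        return raw


known = parse_event_type("worker_spawned")
unknown = parse_event_type("some_future_event_kind")
```

Because the journal column holds plain strings, a reader that does not recognise a value can still store, filter, and return it without loss.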

JournalEvent

One durable record. event_id is GENERATED because events are occurrences and cannot be re-derived (the asserted/decision class of the stage-5 identity rule), so EventRef(event_id) from cjm-context-graph-primitives anchors graph→journal references. seq is the store-assigned cursor (the rowid) and stays None until the event is appended; the live-tail subscription replays rows with seq > after_seq so a late subscriber catches up exactly.


JournalEvent


def JournalEvent(
    event_type:str, event_id:str=<factory>, ts:datetime=<factory>, run_id:Optional=None, job_id:Optional=None,
    composition_id:Optional=None, node_id:Optional=None, capability_instance_id:Optional=None,
    capability_name:Optional=None, config_hash:Optional=None, task_name:Optional=None, method:Optional=None,
    worker_session_id:Optional=None, actor:Optional=None, worker_reported:bool=False, payload:Dict=<factory>,
    seq:Optional=None
)->None:

One durable observability record (CR-14).

The journal never duplicates what manifests / capability DBs / the graph already record — graph-touching payloads carry REFERENCES (node ids + content hashes, verifiable via the CR-19 machinery), never content. worker_reported=True marks payloads that originated in-worker and rode a wire envelope; the HOST still wrote the row (single-writer-class rule).
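A reduced sketch of the record shape may help make the `<factory>` defaults concrete. `JournalEventSketch` is a hypothetical, trimmed-down dataclass; the UUID-based id and UTC timestamp factories are assumptions about how the generated fields could be produced, and the payload keys are illustrative references rather than a documented schema.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class JournalEventSketch:
    """Hypothetical subset of JournalEvent's fields."""
    event_type: str
    # Generated per occurrence: two events with equal content are still distinct.
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    # Timezone-aware timestamp (assumed UTC here).
    ts: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    job_id: Optional[str] = None
    worker_reported: bool = False
    payload: Dict[str, Any] = field(default_factory=dict)
    seq: Optional[int] = None  # assigned by the store on append


# Graph-touching payloads carry references (ids and hashes), never content.
a = JournalEventSketch(
    event_type="state_transition",
    job_id="job-1",
    payload={"node_id": "n-42", "content_hash": "sha256:abc123"},  # placeholder values
)
b = JournalEventSketch(event_type="state_transition", job_id="job-1")
```

Using `default_factory` for `payload` avoids sharing one mutable dict across instances, which is the usual reason a dataclass signature renders as `<factory>`.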

JournalStore protocol

The store pattern (config / empirical / secret stores) applied to the missing store. No delete API — the journal is the precious class; retention policy does not apply to the audit record.

The named tension (ratified design #13): append is synchronous, tiny (one WAL INSERT), and LOUD — it never swallows storage failures, because a wedged journal silently dropping the audit trail is the G11 failure mode. Buffering is the evidence-awaited escalation, not the default.


JournalStore


def JournalStore(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

Protocol for the durable account-of-action (CR-14).

Implementations MUST raise on append failure (loud, never silent — the audit trail does not degrade quietly) and MUST NOT expose a delete/retention surface (precious class).
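One way to picture the contract is as a structural `typing.Protocol` that simply has no delete method. The sketch below is an assumption about the shape, not the module's definition: `JournalStoreSketch` and `InMemoryJournal` are hypothetical, and the method list is borrowed from the LocalJournalStore methods documented below.

```python
from typing import Any, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class JournalStoreSketch(Protocol):
    """Hypothetical protocol: append/query/count, and deliberately no delete."""

    def append(self, event: Any) -> int: ...
    def query(self, **filters: Any) -> List[Any]: ...
    def count(self, event_type: Optional[str] = None) -> int: ...


class InMemoryJournal:
    """Toy implementation used only to exercise the protocol shape."""

    def __init__(self) -> None:
        self._rows: List[Any] = []

    def append(self, event: Any) -> int:
        # Any exception raised here propagates to the caller (loud by contract).
        self._rows.append(event)
        return len(self._rows)

    def query(self, **filters: Any) -> List[Any]:
        return list(self._rows)

    def count(self, event_type: Optional[str] = None) -> int:
        return len(self._rows)


store = InMemoryJournal()
is_store = isinstance(store, JournalStoreSketch)  # structural check on method names
has_delete = hasattr(store, "delete")
first_seq = store.append({"event_type": "x"})
```

Note that `runtime_checkable` only verifies that the methods exist; the "MUST raise on failure" half of the contract has to be enforced by tests such as the loud-failure check in the Tests section.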

LocalJournalStore

SQLite in WAL mode with per-call connections (matching the sibling stores); the schema is created lazily. seq is the INTEGER PRIMARY KEY rowid. Concurrent host processes sharing one .cjm rely on WAL plus busy_timeout (the stage-4 concurrent-writer stress precedent).


LocalJournalStore


def LocalJournalStore(
    db_path:Optional=None
):

SQLite-backed default JournalStore (CR-14).

WAL + busy_timeout for multi-process host writers; per-call connections (sibling-store convention). append raises on failure (loud) — callers never wrap it in a silent try/except.
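The connection convention described here could look something like the following. This is a minimal sketch under stated assumptions: `open_journal_connection`, the `journal` table name, its reduced column set, and the 5000 ms timeout are all hypothetical; only the use of WAL, busy_timeout, lazy schema creation, and an INTEGER PRIMARY KEY seq comes from the text.

```python
import sqlite3
import tempfile
from pathlib import Path


def open_journal_connection(db_path: Path, busy_timeout_ms: int = 5000) -> sqlite3.Connection:
    """Open a fresh connection per call, with WAL and a busy timeout applied."""
    conn = sqlite3.connect(str(db_path))
    # WAL lets readers proceed while one writer appends.
    conn.execute("PRAGMA journal_mode=WAL")
    # Writers from other processes wait up to this long instead of failing at once.
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    # Lazy schema creation: harmless if the table already exists.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS journal ("
        " seq INTEGER PRIMARY KEY,"
        " event_id TEXT NOT NULL UNIQUE,"
        " event_type TEXT NOT NULL,"
        " ts TEXT NOT NULL,"
        " payload TEXT NOT NULL)"
    )
    return conn


with tempfile.TemporaryDirectory() as td:
    conn = open_journal_connection(Path(td) / "journal.db")
    mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
    conn.close()
```

WAL mode is persistent in the database file once set, while busy_timeout is a per-connection setting, which is why a per-call connection helper would reapply it every time.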


LocalJournalStore.append


def append(
    event:JournalEvent, # Event to persist
)->int: # Store-assigned seq (cursor)

Persist one event; sets and returns event.seq.

LOUD by contract: sqlite errors propagate (the audit trail never degrades silently — ratified design #13). One tiny WAL INSERT; synchronous on purpose (G4: the dispatch fast path must stay predictable; at substrate event volume this is microseconds).
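The loud single-INSERT shape can be illustrated as below. It is a sketch, not the store's implementation: `append_event` and the reduced `journal` schema are hypothetical, and an in-memory database stands in for the real file. The point is the absence of any try/except around the write.

```python
import json
import sqlite3
import uuid
from datetime import datetime, timezone


def append_event(conn: sqlite3.Connection, event_type: str, payload: dict) -> int:
    """Insert one row and return its rowid as the seq cursor. Errors propagate."""
    with conn:  # commits on success; rolls back and re-raises on failure
        cur = conn.execute(
            "INSERT INTO journal (event_id, event_type, ts, payload) VALUES (?, ?, ?, ?)",
            (uuid.uuid4().hex, event_type,
             datetime.now(timezone.utc).isoformat(), json.dumps(payload)),
        )
    return cur.lastrowid


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE journal (seq INTEGER PRIMARY KEY, event_id TEXT, "
    "event_type TEXT, ts TEXT, payload TEXT)"
)
first = append_event(conn, "state_transition", {"from": "pending", "to": "running"})
second = append_event(conn, "worker_spawned", {"pid": 1234})

# Simulate a storage failure: with the table gone, the next append must raise.
conn.execute("DROP TABLE journal")
try:
    append_event(conn, "x", {})
    raised = False
except sqlite3.OperationalError:
    raised = True
conn.close()
```

A caller that wants buffering or retries would have to add them explicitly around this call; nothing in the append path hides the error.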


LocalJournalStore.query


def query(
    job_id:Optional=None, # Filter: job correlation
    run_id:Optional=None, # Filter: host-tier run
    composition_id:Optional=None, # Filter: composition
    capability_instance_id:Optional=None, # Filter: instance
    worker_session_id:Optional=None, # Filter: worker session
    event_type:Optional=None, # Filter: one vocabulary value
    after_seq:Optional=None, # Tail cursor: rows with seq > this
    since_ts:Optional=None, # Filter: ts >= (isoformat compare)
    until_ts:Optional=None, # Filter: ts <= (isoformat compare)
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching events, seq-ordered

Filtered read; all filters AND-combined.
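AND-combined filtering is commonly implemented by collecting one clause per supplied filter. The sketch below shows that pattern for a subset of the parameters; `build_query`, the `journal` table name, and the column names are assumptions, and the real method may assemble its SQL differently.

```python
from datetime import datetime
from typing import Any, List, Optional, Tuple


def build_query(
    job_id: Optional[str] = None,
    event_type: Optional[str] = None,
    after_seq: Optional[int] = None,
    since_ts: Optional[datetime] = None,
    limit: Optional[int] = None,
    descending: bool = False,
) -> Tuple[str, List[Any]]:
    """Build a parameterized SELECT; every supplied filter is AND-combined."""
    clauses: List[str] = []
    params: List[Any] = []
    if job_id is not None:
        clauses.append("job_id = ?")
        params.append(job_id)
    if event_type is not None:
        clauses.append("event_type = ?")
        params.append(event_type)
    if after_seq is not None:
        clauses.append("seq > ?")  # tail cursor: strictly after
        params.append(after_seq)
    if since_ts is not None:
        clauses.append("ts >= ?")  # string compare on isoformat text
        params.append(since_ts.isoformat())
    sql = "SELECT * FROM journal"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    sql += " ORDER BY seq " + ("DESC" if descending else "ASC")
    if limit is not None:
        sql += " LIMIT ?"
        params.append(int(limit))
    return sql, params


sql, params = build_query(job_id="job-1", after_seq=1, limit=10)
```

Comparing timestamps as isoformat strings orders correctly only when every stored value uses the same offset and precision, so a store relying on it would normally write all timestamps in one form, for example always UTC.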


LocalJournalStore.count


def count(
    event_type:Optional=None, # Optional per-type count
)->int: # Row count

Total journal rows, or the rows of one event type when event_type is given (used for volume regression checks).


LocalJournalStore.terminal_state_events


def terminal_state_events(
    limit:Optional=None, # Most recent N (None = all)
)->List: # Terminal STATE_TRANSITION rows, newest first

The durable job history (_history migration rider): terminal STATE_TRANSITION rows whose payload carries the job snapshot.
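The selection rule can be sketched over plain dictionaries. Everything here is illustrative: `select_terminal` is a hypothetical helper, and the terminal set `{"completed", "failed"}` is inferred from the tests below rather than taken from a documented list, so the real set may contain more states.

```python
from typing import Any, Dict, List, Optional

# Assumption: inferred from the tests; the actual terminal states may differ.
TERMINAL_STATES = {"completed", "failed"}


def select_terminal(rows: List[Dict[str, Any]], limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Keep terminal state transitions, newest (highest seq) first."""
    terminal = [
        r for r in rows
        if r["event_type"] == "state_transition"
        and r["payload"].get("to") in TERMINAL_STATES
    ]
    terminal.sort(key=lambda r: r["seq"], reverse=True)
    return terminal if limit is None else terminal[:limit]


transitions = [("j1", "running"), ("j1", "completed"), ("j2", "running"),
               ("j2", "failed"), ("j3", "running")]
rows = [
    {"seq": i, "event_type": "state_transition", "job_id": jid,
     "payload": {"from": "x", "to": to, "job_snapshot": {"id": jid}}}
    for i, (jid, to) in enumerate(transitions, start=1)
]
history = select_terminal(rows)
latest = select_terminal(rows, limit=1)
```

Because each terminal row carries the job snapshot in its payload, the history can be rebuilt from the journal alone.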

Tests

import tempfile
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    ev = JournalEvent(event_type='state_transition', job_id='job-1',
                      capability_instance_id='plug-a',
                      payload={'from': 'pending', 'to': 'running'})
    seq1 = store.append(ev)
    assert seq1 == 1 and ev.seq == 1
    ev2 = JournalEvent(event_type=SubstrateEventType.WORKER_SPAWNED.value,
                       capability_name='plug-a', worker_session_id='ws-1',
                       payload={'pid': 1234})
    seq2 = store.append(ev2)
    assert seq2 == 2
    # round-trip: typed rehydration with tz-aware ts + payload intact
    got = store.query(job_id='job-1')
    assert len(got) == 1 and got[0].event_id == ev.event_id
    assert got[0].ts.tzinfo is not None
    assert got[0].payload == {'from': 'pending', 'to': 'running'}
    assert got[0].seq == 1 and got[0].worker_reported is False
    # cursor semantics: after_seq is the live-tail catch-up
    tail = store.query(after_seq=1)
    assert [e.seq for e in tail] == [2]
    assert store.count() == 2 and store.count(event_type='worker_spawned') == 1
print('journal round-trip OK')
journal round-trip OK
with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    for jid, to in (('j1', 'running'), ('j1', 'completed'), ('j2', 'running'),
                    ('j2', 'failed'), ('j3', 'running')):
        store.append(JournalEvent(event_type='state_transition', job_id=jid,
                                  payload={'from': 'x', 'to': to,
                                           'job_snapshot': {'id': jid}}))
    term = store.terminal_state_events()
    # newest first; only terminal transitions; snapshots intact
    assert [e.job_id for e in term] == ['j2', 'j1']
    assert term[0].payload['job_snapshot'] == {'id': 'j2'}
    assert [e.job_id for e in store.terminal_state_events(limit=1)] == ['j2']
print('terminal-history query OK')
# Vocabulary tolerance: unknown event types round-trip untouched (P5/P6 law)
with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    store.append(JournalEvent(event_type='some_future_event_kind',
                              payload={'x': 1}))
    got = store.query(event_type='some_future_event_kind')
    assert len(got) == 1 and got[0].payload == {'x': 1}
# Liveness routing constants match the JobEventType values they exclude
assert LIVENESS_EVENT_TYPES == {'progress_changed', 'resource_snapshot'}
print('tolerant-unknown + routing constants OK')
# Loud-failure contract: append on an unwritable path RAISES (never silent)
_raised = False
try:
    bad = LocalJournalStore(_P('/proc/nonexistent-dir/journal.db'))
    bad.append(JournalEvent(event_type='x'))
except Exception:
    _raised = True
assert _raised, 'journal append must be LOUD on storage failure'
print('loud-failure contract OK')