# Journal Store


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Event vocabulary

`SubstrateEventType` covers journal events OUTSIDE the job-scoped
`JobEventType` set (which stays in `core.queue` — the queue imports this
module, never the reverse). Reserved up front per the
reserve-enum-values discipline; emission is progressive — a value
existing here does NOT imply the substrate emits it yet.

------------------------------------------------------------------------

### SubstrateEventType

``` python

def SubstrateEventType(
    args:VAR_POSITIONAL, kwds:VAR_KEYWORD
):

```

*Journal vocabulary beyond the job-scoped `JobEventType` set (CR-14).*

Reserved up front (emission progressive). Job-scoped types stay in
`core.queue.JobEventType`; both serialize to plain strings in the
journal’s `event_type` column — the journal is vocabulary-tolerant by
design (unknown types round-trip; the P5/P6 tolerant-unknown law).

## `JournalEvent`

One durable record. `event_id` is GENERATED (events are occurrences, not
re-derivable — the stage-5 identity rule’s asserted/decision class), so
`EventRef(event_id)` from `cjm-context-graph-primitives` anchors
graph→journal references. `seq` is the store-assigned cursor (rowid) —
`None` until appended; the live-tail subscription replays `after_seq`
for exact late-subscriber catch-up.

------------------------------------------------------------------------

### JournalEvent

``` python

def JournalEvent(
    event_type:str, event_id:str=<factory>, ts:datetime=<factory>, run_id:Optional=None, job_id:Optional=None,
    composition_id:Optional=None, node_id:Optional=None, capability_instance_id:Optional=None,
    capability_name:Optional=None, config_hash:Optional=None, task_name:Optional=None, method:Optional=None,
    worker_session_id:Optional=None, actor:Optional=None, worker_reported:bool=False, payload:Dict=<factory>,
    seq:Optional=None
)->None:

```

*One durable observability record (CR-14).*

The journal never duplicates what manifests / capability DBs / the graph
already record — graph-touching payloads carry REFERENCES (node ids +
content hashes, verifiable via the CR-19 machinery), never content.
`worker_reported=True` marks payloads that originated in-worker and rode
a wire envelope; the HOST still wrote the row (single-writer-class
rule).

## `JournalStore` protocol

The store pattern (config / empirical / secret stores) applied to the
missing store. **No delete API** — the journal is the precious class;
retention policy does not apply to the audit record.

**The named tension (ratified design \#13):** `append` is synchronous,
tiny (one WAL INSERT), and **LOUD** — it never swallows storage
failures, because a wedged journal silently dropping the audit trail is
the G11 failure mode. Buffering is the evidence-awaited escalation, not
the default.

------------------------------------------------------------------------

### JournalStore

``` python

def JournalStore(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

```

*Protocol for the durable account-of-action (CR-14).*

Implementations MUST raise on append failure (loud, never silent — the
audit trail does not degrade quietly) and MUST NOT expose a
delete/retention surface (precious class).

## `LocalJournalStore`

SQLite/WAL, per-call connections (matches the sibling stores), schema
created lazily. `seq` = `INTEGER PRIMARY KEY` rowid. Concurrent host
processes sharing one `.cjm` are WAL-held (the stage-4 concurrent-writer
stress precedent).

------------------------------------------------------------------------

### LocalJournalStore

``` python

def LocalJournalStore(
    db_path:Optional=None
):

```

*SQLite-backed default `JournalStore` (CR-14).*

WAL + busy_timeout for multi-process host writers; per-call connections
(sibling-store convention). `append` raises on failure (loud) — callers
never wrap it in a silent try/except.

------------------------------------------------------------------------

### LocalJournalStore.append

``` python

def append(
    event:JournalEvent, # Event to persist
)->int: # Store-assigned seq (cursor)

```

*Persist one event; sets and returns `event.seq`.*

LOUD by contract: sqlite errors propagate (the audit trail never
degrades silently — ratified design \#13). One tiny WAL INSERT;
synchronous on purpose (G4: the dispatch fast path must stay
predictable; at substrate event volume this is microseconds).

------------------------------------------------------------------------

### LocalJournalStore.query

``` python

def query(
    job_id:Optional=None, # Filter: job correlation
    run_id:Optional=None, # Filter: host-tier run
    composition_id:Optional=None, # Filter: composition
    capability_instance_id:Optional=None, # Filter: instance
    worker_session_id:Optional=None, # Filter: worker session
    event_type:Optional=None, # Filter: one vocabulary value
    after_seq:Optional=None, # Tail cursor: rows with seq > this
    since_ts:Optional=None, # Filter: ts >= (isoformat compare)
    until_ts:Optional=None, # Filter: ts <= (isoformat compare)
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching events, seq-ordered

```

*Filtered read; all filters AND-combined.*

------------------------------------------------------------------------

### LocalJournalStore.count

``` python

def count(
    event_type:Optional=None, # Optional per-type count
)->int: # Row count

```

*Total journal rows (volume regression checks).*

------------------------------------------------------------------------

### LocalJournalStore.terminal_state_events

``` python

def terminal_state_events(
    limit:Optional=None, # Most recent N (None = all)
)->List: # Terminal STATE_TRANSITION rows, newest first

```

*The durable job history (`_history` migration rider): terminal*
STATE_TRANSITION rows whose payload carries the job snapshot.

## Tests

``` python
import tempfile
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    ev = JournalEvent(event_type='state_transition', job_id='job-1',
                      capability_instance_id='plug-a',
                      payload={'from': 'pending', 'to': 'running'})
    seq1 = store.append(ev)
    assert seq1 == 1 and ev.seq == 1
    ev2 = JournalEvent(event_type=SubstrateEventType.WORKER_SPAWNED.value,
                       capability_name='plug-a', worker_session_id='ws-1',
                       payload={'pid': 1234})
    seq2 = store.append(ev2)
    assert seq2 == 2
    # round-trip: typed rehydration with tz-aware ts + payload intact
    got = store.query(job_id='job-1')
    assert len(got) == 1 and got[0].event_id == ev.event_id
    assert got[0].ts.tzinfo is not None
    assert got[0].payload == {'from': 'pending', 'to': 'running'}
    assert got[0].seq == 1 and got[0].worker_reported is False
    # cursor semantics: after_seq is the live-tail catch-up
    tail = store.query(after_seq=1)
    assert [e.seq for e in tail] == [2]
    assert store.count() == 2 and store.count(event_type='worker_spawned') == 1
print('journal round-trip OK')
```

    journal round-trip OK

``` python
with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    for jid, to in (('j1', 'running'), ('j1', 'completed'), ('j2', 'running'),
                    ('j2', 'failed'), ('j3', 'running')):
        store.append(JournalEvent(event_type='state_transition', job_id=jid,
                                  payload={'from': 'x', 'to': to,
                                           'job_snapshot': {'id': jid}}))
    term = store.terminal_state_events()
    # newest first; only terminal transitions; snapshots intact
    assert [e.job_id for e in term] == ['j2', 'j1']
    assert term[0].payload['job_snapshot'] == {'id': 'j2'}
    assert [e.job_id for e in store.terminal_state_events(limit=1)] == ['j2']
print('terminal-history query OK')
```

``` python
# Vocabulary tolerance: unknown event types round-trip untouched (P5/P6 law)
with tempfile.TemporaryDirectory() as _td:
    store = LocalJournalStore(_P(_td) / 'journal.db')
    store.append(JournalEvent(event_type='some_future_event_kind',
                              payload={'x': 1}))
    got = store.query(event_type='some_future_event_kind')
    assert len(got) == 1 and got[0].payload == {'x': 1}
# Liveness routing constants match the JobEventType values they exclude
assert LIVENESS_EVENT_TYPES == {'progress_changed', 'resource_snapshot'}
print('tolerant-unknown + routing constants OK')
```

``` python
# Loud-failure contract: append on an unwritable path RAISES (never silent)
_raised = False
try:
    bad = LocalJournalStore(_P('/proc/nonexistent-dir/journal.db'))
    bad.append(JournalEvent(event_type='x'))
except Exception:
    _raised = True
assert _raised, 'journal append must be LOUD on storage failure'
print('loud-failure contract OK')
```
