Diagnostics Store

CR-14 (stage 7): the disposable diagnostic-narrative class. It holds two kinds of data: structured log records written by workers (the substrate handler stamps the contextvars identity, so authors never supply attribution) and raw stream chunks pumped by the host (the zero-cooperation death-rattle floor: it needs nothing from the worker, so a dying worker's last output still lands). Retention is a QUERY, not file mechanics. Design ledger: claude-docs/stage-7-evidence.md.

Record shapes

DiagnosticRecord is one structured self.logger.* call, with level and logger name restored (the old worker basicConfig format dropped both). StreamChunk is raw subprocess stdout/stderr captured by the host pump. It is honestly attributed to the worker SESSION only, never heuristically to a job: the stage-3 multi-lane interleaving (concurrent lanes sharing one output stream) made job attribution of raw streams structurally unsound.


DiagnosticRecord


def DiagnosticRecord(
    message:str, level:str='INFO', logger_name:str='', ts:datetime=<factory>, worker_session_id:Optional=None,
    job_id:Optional=None, exc_text:Optional=None, seq:Optional=None
)->None:

One structured worker log record (CR-14 diagnostics class).


StreamChunk


def StreamChunk(
    content:str, ts:datetime=<factory>, worker_session_id:Optional=None, stream:str='stdout', seq:Optional=None
)->None:

One raw stdout/stderr line the host pump captured (death-rattle floor).
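The `<factory>` defaults in the two signatures above indicate dataclass fields built with `default_factory`. A minimal sketch of both shapes follows. It assumes `Optional[str]` for the id and text fields, `Optional[int]` for `seq`, and a timezone-aware UTC factory for `ts` (the tests do assert `ts.tzinfo is not None`); the rendered signatures only say `Optional` and `<factory>`, so the inner types and the `_utc_now` helper are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


def _utc_now() -> datetime:
    # Hypothetical factory: timezone-aware UTC "now".
    return datetime.now(timezone.utc)


@dataclass
class DiagnosticRecord:
    message: str
    level: str = 'INFO'
    logger_name: str = ''
    ts: datetime = field(default_factory=_utc_now)
    worker_session_id: Optional[str] = None
    job_id: Optional[str] = None    # stamped from the call envelope, never by authors
    exc_text: Optional[str] = None  # formatted traceback, if the call logged one
    seq: Optional[int] = None       # assigned by the store on append


@dataclass
class StreamChunk:
    content: str
    ts: datetime = field(default_factory=_utc_now)
    worker_session_id: Optional[str] = None  # session-only attribution
    stream: str = 'stdout'                   # 'stdout' or 'stderr'
    seq: Optional[int] = None
```

Keeping `seq` as `None` until the store assigns it lets the same object describe both a record about to be written and a record read back.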

DiagnosticsStore protocol

Unlike the journal, this store has many writers (every worker plus the host pump), retention IS part of the contract (it is the disposable class), and append failures degrade gracefully. A broken diagnostics sink must never take down capability execution, so handlers swallow errors; the journal's loud-failure rule applies to the journal only.


DiagnosticsStore


def DiagnosticsStore(
    *args, **kwargs
):

Protocol for the disposable diagnostic-narrative store (CR-14).
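The protocol's members are not listed on this page. As a sketch, assuming they mirror the methods documented for LocalDiagnosticsStore below, a `typing.Protocol` could look like this. `DiagnosticsStoreSketch` and the toy `ListStore` are hypothetical names; the point is that a Protocol is satisfied structurally, so any store with these methods qualifies without inheriting from anything.

```python
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class DiagnosticsStoreSketch(Protocol):
    """Hypothetical protocol mirroring LocalDiagnosticsStore's documented methods."""

    def append_record(self, record: Any) -> int: ...
    def append_chunk(self, chunk: Any) -> int: ...
    def query_records(self, job_id: Optional[str] = None,
                      worker_session_id: Optional[str] = None,
                      level: Optional[str] = None, after_seq: Optional[int] = None,
                      limit: Optional[int] = None, descending: bool = False) -> List[Any]: ...
    def query_chunks(self, worker_session_id: Optional[str] = None,
                     after_seq: Optional[int] = None, limit: Optional[int] = None,
                     descending: bool = False) -> List[Any]: ...
    def apply_retention(self, max_age_days: Optional[float] = None,
                        max_total_mb: Optional[float] = None) -> Dict[str, int]: ...


class ListStore:
    """Toy in-memory store that satisfies the protocol structurally (no subclassing)."""

    def __init__(self) -> None:
        self.records: List[Any] = []
        self.chunks: List[Any] = []

    def append_record(self, record: Any) -> int:
        self.records.append(record)
        return len(self.records)  # 1-based seq, like an autoincrement key

    def append_chunk(self, chunk: Any) -> int:
        self.chunks.append(chunk)
        return len(self.chunks)

    def query_records(self, job_id=None, worker_session_id=None, level=None,
                      after_seq=None, limit=None, descending=False) -> List[Any]:
        return list(self.records)  # filtering omitted in this toy

    def query_chunks(self, worker_session_id=None, after_seq=None,
                     limit=None, descending=False) -> List[Any]:
        return list(self.chunks)

    def apply_retention(self, max_age_days=None, max_total_mb=None) -> Dict[str, int]:
        return {'records_deleted': 0, 'chunks_deleted': 0}  # retention omitted
```

`runtime_checkable` only checks that the method names exist, not their signatures, so it is a smoke test rather than a full conformance check.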

LocalDiagnosticsStore


LocalDiagnosticsStore


def LocalDiagnosticsStore(
    db_path:Optional=None
):

SQLite-backed default DiagnosticsStore (CR-14).

Many concurrent writers (workers plus the host pump) call for WAL mode, a busy_timeout, and per-call connections: no long-held handles, so every method is safe to call from any thread. Because this is the disposable class, retention deletes are routine.
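A minimal sketch of the per-call connection pattern, using only the standard `sqlite3` module. The `connect` helper, the 5-second timeout, and the `demo` harness are illustrative assumptions, not the store's actual code. Each call opens a connection, enables WAL (readers do not block the single writer), sets a busy timeout (a writer waits for the lock instead of failing at once), commits, and always closes.

```python
import os
import sqlite3
import tempfile
import threading
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def connect(db_path: str, busy_timeout_ms: int = 5000) -> Iterator[sqlite3.Connection]:
    """Hypothetical helper: short-lived connection, commit on success, always closed."""
    conn = sqlite3.connect(db_path, timeout=busy_timeout_ms / 1000)
    try:
        conn.execute('PRAGMA journal_mode=WAL')                      # persistent per DB file
        conn.execute(f'PRAGMA busy_timeout={int(busy_timeout_ms)}')  # wait on locks
        yield conn
        conn.commit()
    except BaseException:
        conn.rollback()
        raise
    finally:
        conn.close()  # no long-held handles


def demo(n_threads: int = 4, per_thread: int = 25) -> int:
    """Several threads append through fresh connections; returns the row count."""
    with tempfile.TemporaryDirectory() as td:
        db_path = os.path.join(td, 'diag.db')
        with connect(db_path) as conn:
            conn.execute('CREATE TABLE records (seq INTEGER PRIMARY KEY, message TEXT)')

        def writer(i: int) -> None:
            for j in range(per_thread):
                with connect(db_path) as conn:  # a fresh connection for every call
                    conn.execute('INSERT INTO records (message) VALUES (?)', (f't{i}-{j}',))

        threads = [threading.Thread(target=writer, args=(i,)) for i in range(n_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        with connect(db_path) as conn:
            return conn.execute('SELECT COUNT(*) FROM records').fetchone()[0]
```

Opening a connection per call costs a little latency, but it sidesteps `sqlite3`'s default same-thread check on shared connections and keeps lock hold times short.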


LocalDiagnosticsStore.append_record


def append_record(
    record:DiagnosticRecord, # Structured record to persist
)->int: # Store-assigned seq

Persist one structured record.


LocalDiagnosticsStore.append_chunk


def append_chunk(
    chunk:StreamChunk, # Raw stream line to persist
)->int: # Store-assigned seq

Persist one raw stream line.
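Both append methods return a store-assigned seq. One plausible way to get it, sketched here with an assumed schema, is an `INTEGER PRIMARY KEY AUTOINCREMENT` column read back through `cursor.lastrowid`, with `ts` stored as a fixed-width ISO-8601 UTC string so that it sorts lexically and parses back timezone-aware. The column layout, `SCHEMA`, and `read_ts` are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

SCHEMA = '''CREATE TABLE IF NOT EXISTS records (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,  -- store-assigned, never reused
    ts TEXT NOT NULL,                       -- ISO-8601 UTC, fixed width
    level TEXT, logger_name TEXT, message TEXT,
    worker_session_id TEXT, job_id TEXT, exc_text TEXT)'''


def append_record(conn: sqlite3.Connection, message: str, level: str = 'INFO',
                  logger_name: str = '', ts: Optional[datetime] = None,
                  worker_session_id: Optional[str] = None,
                  job_id: Optional[str] = None, exc_text: Optional[str] = None) -> int:
    ts = ts or datetime.now(timezone.utc)
    stamp = ts.astimezone(timezone.utc).isoformat(timespec='microseconds')
    cur = conn.execute(
        'INSERT INTO records (ts, level, logger_name, message, worker_session_id,'
        ' job_id, exc_text) VALUES (?, ?, ?, ?, ?, ?, ?)',
        (stamp, level, logger_name, message, worker_session_id, job_id, exc_text))
    conn.commit()
    return cur.lastrowid  # the new row's seq


def read_ts(conn: sqlite3.Connection, seq: int) -> datetime:
    (text,) = conn.execute('SELECT ts FROM records WHERE seq = ?', (seq,)).fetchone()
    return datetime.fromisoformat(text)  # the '+00:00' suffix keeps it tz-aware
```

`AUTOINCREMENT` matters for tail cursors: without it SQLite may hand out a previously used rowid once the highest row has been deleted, which could make an `after_seq` cursor skip or repeat rows.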


LocalDiagnosticsStore.query_records


def query_records(
    job_id:Optional=None, # EXACT job correlation (stamped at write)
    worker_session_id:Optional=None, # Session scope
    level:Optional=None, # Level name filter
    after_seq:Optional=None, # Tail cursor
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching records, seq-ordered

Filtered structured-record read.
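A sketch of how the optional filters could compose into one parameterized query: each supplied argument becomes an `AND` clause, `after_seq` becomes a strict `seq >` cursor, and `descending` flips the sort. The column list and the helper names are assumptions, and `level` is treated here as an exact level-name match because the source says only "Level name filter" (a minimum-severity threshold would be another reasonable reading).

```python
import sqlite3
from typing import Any, List, Optional, Tuple


def build_records_query(job_id: Optional[str] = None,
                        worker_session_id: Optional[str] = None,
                        level: Optional[str] = None,
                        after_seq: Optional[int] = None,
                        limit: Optional[int] = None,
                        descending: bool = False) -> Tuple[str, List[Any]]:
    """Hypothetical builder: only supplied filters become WHERE clauses."""
    filters = [
        ('job_id = ?', job_id),                        # exact correlation, stamped at write
        ('worker_session_id = ?', worker_session_id),  # session scope
        ('level = ?', level),                          # assumed exact level-name match
        ('seq > ?', after_seq),                        # tail cursor: strictly after
    ]
    active = [(clause, value) for clause, value in filters if value is not None]
    sql = 'SELECT seq, level, message, job_id FROM records'
    if active:
        sql += ' WHERE ' + ' AND '.join(clause for clause, _ in active)
    sql += ' ORDER BY seq DESC' if descending else ' ORDER BY seq ASC'
    params: List[Any] = [value for _, value in active]
    if limit is not None:
        sql += ' LIMIT ?'
        params.append(int(limit))
    return sql, params


def run_records_query(conn: sqlite3.Connection, **filters: Any) -> List[tuple]:
    sql, params = build_records_query(**filters)
    return conn.execute(sql, params).fetchall()
```

Binding every value as a `?` parameter keeps caller-supplied ids out of the SQL text.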


LocalDiagnosticsStore.query_chunks


def query_chunks(
    worker_session_id:Optional=None, # Session scope
    after_seq:Optional=None, # Tail cursor
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching chunks, seq-ordered

Raw stream read, session-scoped.
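The `after_seq` tail cursor supports a `tail -f`-style follower: remember the last seq seen and ask only for newer rows on each poll. The sketch below is generic over a `fetch(after_seq)` callable; `follow`, `chunk_fetcher`, and the `chunks` table layout are hypothetical. Against the real store, an adapter around `query_chunks(worker_session_id=..., after_seq=...)` could play the role of `fetch`.

```python
import sqlite3
import time
from typing import Callable, Iterator, List, Optional, Tuple

Row = Tuple[int, str]  # (seq, content)


def follow(fetch: Callable[[Optional[int]], List[Row]],
           polls: int, interval_s: float = 0.0) -> Iterator[Row]:
    """Yield rows newer than the last seen seq, polling `polls` times."""
    cursor: Optional[int] = None  # None = start from the beginning
    for _ in range(polls):
        for seq, content in fetch(cursor):
            cursor = seq  # rows arrive seq-ascending, so this ends at the newest
            yield seq, content
        time.sleep(interval_s)


def chunk_fetcher(conn: sqlite3.Connection, session: str) -> Callable[[Optional[int]], List[Row]]:
    def fetch(after_seq: Optional[int]) -> List[Row]:
        return conn.execute(
            'SELECT seq, content FROM chunks WHERE worker_session_id = ? AND seq > ?'
            ' ORDER BY seq ASC',
            (session, -1 if after_seq is None else after_seq)).fetchall()
    return fetch
```

Because the cursor is a seq rather than a timestamp, two lines written in the same instant are neither dropped nor duplicated.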


LocalDiagnosticsStore.apply_retention


def apply_retention(
    max_age_days:Optional=None, # Delete rows older than this
    max_total_mb:Optional=None, # Delete oldest rows until DB under budget
)->Dict: # {'records_deleted': n, 'chunks_deleted': m}

Retention as a QUERY (the mechanical payoff of the CR-14 reframe).

Age runs first, then size: the oldest rows from both tables, interleaved by ts, are deleted in batches until the DB file is under budget. This is safe against concurrent writers because the DB runs in WAL mode and each batch is its own transaction.
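A sketch of retention expressed as SQL, assuming both tables carry a `seq` key and a fixed-width ISO-8601 UTC `ts` column (so string comparison matches time order). One SQLite detail shapes the size pass: `DELETE` moves freed pages to the freelist but does not shrink the file unless `VACUUM` or auto-vacuum runs, so this sketch measures in-use bytes as `(page_count - freelist_count) * page_size`. That metric, the batch size, and all names here are assumptions rather than the store's actual logic.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

TABLES = ('records', 'chunks')  # fixed names, so the f-strings below are not injectable


def live_bytes(conn: sqlite3.Connection) -> int:
    """Bytes in use: freed pages sit on the freelist until a VACUUM."""
    page_count = conn.execute('PRAGMA page_count').fetchone()[0]
    free = conn.execute('PRAGMA freelist_count').fetchone()[0]
    page_size = conn.execute('PRAGMA page_size').fetchone()[0]
    return (page_count - free) * page_size


def apply_retention(conn: sqlite3.Connection, max_age_days: Optional[float] = None,
                    max_total_mb: Optional[float] = None, batch: int = 500) -> Dict[str, int]:
    deleted = {'records_deleted': 0, 'chunks_deleted': 0}
    if max_age_days is not None:  # 1) age pass
        cutoff = (datetime.now(timezone.utc) - timedelta(days=max_age_days)
                  ).isoformat(timespec='microseconds')
        for table in TABLES:
            with conn:  # own transaction per table
                cur = conn.execute(f'DELETE FROM {table} WHERE ts < ?', (cutoff,))
            deleted[f'{table}_deleted'] += cur.rowcount
    if max_total_mb is not None:  # 2) size pass, oldest first across both tables
        budget = int(max_total_mb * 1024 * 1024)
        while live_bytes(conn) > budget:
            oldest = conn.execute(
                "SELECT t, seq FROM (SELECT 'records' AS t, seq, ts FROM records"
                " UNION ALL SELECT 'chunks' AS t, seq, ts FROM chunks)"
                " ORDER BY ts LIMIT ?", (batch,)).fetchall()
            if not oldest:
                break  # nothing left; the budget is below the empty-schema size
            with conn:  # each batch commits as its own transaction
                for table, seq in oldest:
                    cur = conn.execute(f'DELETE FROM {table} WHERE seq = ?', (seq,))
                    deleted[f'{table}_deleted'] += cur.rowcount
    return deleted
```

Small per-batch transactions keep the write lock short, so workers appending concurrently wait at most one batch.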

DiagnosticsLogHandler + worker installation

The handler is installed by the substrate, so authors keep calling self.logger.* unchanged (derive-from-behavior). It stamps the contextvars call identity (the per-call envelope, wire.get_call_envelope()) onto each record, so attribution is EXACT and never author-supplied. Handler failures are swallowed via handleError: diagnostics are the disposable class, and a broken sink must never break capability execution (the loud-failure rule belongs to the journal alone).
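A minimal sketch of a context-stamping `logging.Handler`. It stands in a hypothetical `current_job_id` ContextVar for the substrate's call envelope (whose API is not shown on this page) and writes plain dicts to a caller-supplied `sink`; `ContextStampingHandler` and `sink` are illustrative names. Because `emit` runs synchronously in the logging call's thread and context, reading the ContextVar there yields the caller's job id.

```python
import contextvars
import logging
from typing import Any, Callable, Dict, Optional

# Hypothetical stand-in for the substrate's per-call envelope.
current_job_id = contextvars.ContextVar('current_job_id', default=None)


class ContextStampingHandler(logging.Handler):
    """Sketch: one dict per log record, attributed from contextvars, not by the author."""

    def __init__(self, sink: Callable[[Dict[str, Any]], None],
                 worker_session_id: Optional[str] = None) -> None:
        super().__init__()
        self.sink = sink
        self.worker_session_id = worker_session_id
        self._tb_formatter = logging.Formatter()  # only used to render tracebacks

    def emit(self, record: logging.LogRecord) -> None:
        try:
            has_exc = bool(record.exc_info) and record.exc_info[0] is not None
            self.sink({
                'message': record.getMessage(),
                'level': record.levelname,
                'logger_name': record.name,
                'worker_session_id': self.worker_session_id,
                'job_id': current_job_id.get(),  # the emitting call's context
                'exc_text': self._tb_formatter.formatException(record.exc_info) if has_exc else None,
            })
        except Exception:
            # Standard hook: prints to stderr when logging.raiseExceptions is true,
            # otherwise stays silent; either way nothing propagates to the caller.
            self.handleError(record)
```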


DiagnosticsLogHandler


def DiagnosticsLogHandler(
    store:DiagnosticsStore, # Sink (LocalDiagnosticsStore in-process)
    worker_session_id:Optional=None, # Spawn-scoped session id
):

Worker-side logging handler writing DiagnosticRecords (CR-14).

Thread-safe via per-call connections. The worker runs capability execute in an executor thread, and contextvars reach that thread because the endpoint runs it under copy_context. Never raises into application code.
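Why `copy_context` is needed: `loop.run_in_executor` does not carry the caller's contextvars into the pool thread, so a handler running there would see no job id. Running the function through `ctx.run` on a snapshot fixes that. In this sketch `job_id_var`, `execute`, and `endpoint` are hypothetical names standing in for the envelope and the capability call.

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical stand-in for the per-call envelope's job id.
job_id_var = contextvars.ContextVar('job_id', default=None)


def execute() -> Optional[str]:
    """Blocking capability body; it runs in a pool thread and reads the context there."""
    return job_id_var.get()


async def endpoint(job_id: str) -> Optional[str]:
    token = job_id_var.set(job_id)
    try:
        ctx = contextvars.copy_context()  # snapshot that includes job_id_var
        loop = asyncio.get_running_loop()
        # run_in_executor alone does not propagate contextvars; ctx.run does.
        return await loop.run_in_executor(None, ctx.run, execute)
    finally:
        job_id_var.reset(token)
```

On Python 3.9+, `asyncio.to_thread` performs the same `copy_context()` plus `ctx.run` wrapping internally.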


install_worker_diagnostics


def install_worker_diagnostics()->Optional:

Configure worker-process logging (replaces the old basicConfig).

Env contract (injected by the proxy at spawn):

- CJM_DIAGNOSTICS_DB: diagnostics store path; when set, the handler is installed.
- CJM_WORKER_SESSION_ID: spawn-scoped session id stamped on records.
- CJM_LOG_LEVEL: operator level control (default INFO). The old worker hardcoded INFO with no way to change it.

Without CJM_DIAGNOSTICS_DB (a standalone or dev import), it falls back to the pre-CR-14 stdout basicConfig, so direct runs behave exactly as before. Returns the installed handler, or None on fallback.
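A sketch of the env-driven branching described above. To keep it self-contained and testable, the handler construction is passed in as a hypothetical `make_handler(db_path, session_id)` factory and the environment as a mapping. `install_worker_logging` is an illustrative name, not the module's function, and using the root logger is an assumption.

```python
import logging
import os
import sys
from typing import Callable, Mapping, Optional

HandlerFactory = Callable[[str, Optional[str]], logging.Handler]


def install_worker_logging(make_handler: HandlerFactory,
                           environ: Mapping[str, str] = os.environ) -> Optional[logging.Handler]:
    """Hypothetical setup: DB path present -> diagnostics handler, else stdout basicConfig."""
    level = environ.get('CJM_LOG_LEVEL', 'INFO').upper()
    db_path = environ.get('CJM_DIAGNOSTICS_DB')
    if not db_path:
        # Standalone/dev run: pre-CR-14 stdout logging, so direct runs are unchanged.
        logging.basicConfig(level=level, stream=sys.stdout)
        return None
    handler = make_handler(db_path, environ.get('CJM_WORKER_SESSION_ID'))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(level)  # setLevel rejects unknown level names with ValueError
    return handler
```

Injecting `environ` lets tests exercise both branches without touching the real process environment.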

Stream-line normalization (host pump helper)

The host pump captures raw subprocess output. tqdm renders progress by repeatedly rewriting one terminal line with \r frames; flattened into a file or DB, those frames made up 52% of the whisper log's bytes. normalize_stream_line keeps each line's FINAL frame (the 100% state) and drops the rest, because liveness telemetry is not durable (ratified design #3).


normalize_stream_line


def normalize_stream_line(
    raw:str, # One decoded line (may contain \r progress frames)
)->Optional: # Final frame, or None when nothing durable remains

Collapse \r progress frames to the final frame; return None when only whitespace remains.
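A sketch of the frame collapse. tqdm redraws a bar by emitting `\r` followed by the new frame, so splitting on `\r` and keeping the last frame recovers the final state. One deliberate deviation is an assumption of this sketch: it keeps the last non-blank frame, so a line ending in a bare `\r` (or a CRLF terminator) still keeps its text. It also trims trailing whitespace. `normalize_line` is a hypothetical name; the expected values in the test come from this page's own tests.

```python
from typing import Optional


def normalize_line(raw: str) -> Optional[str]:
    """Keep the last non-blank carriage-return frame of one decoded line; None if blank."""
    frames = raw.rstrip('\n').split('\r')  # each '\r' starts a redraw of the line
    for frame in reversed(frames):         # newest frame first
        if frame.strip():
            return frame.rstrip()
    return None                            # nothing durable remains
```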

Tests

import tempfile
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    r = DiagnosticRecord(message='loading model', level='INFO',
                         logger_name='plug.Whisper', worker_session_id='ws-1',
                         job_id='job-9')
    assert store.append_record(r) == 1
    store.append_record(DiagnosticRecord(message='other job line',
                                         worker_session_id='ws-1', job_id='job-10'))
    store.append_chunk(StreamChunk(content='Detected language: English',
                                   worker_session_id='ws-1'))
    # EXACT job correlation — the timestamp-window heuristic's replacement
    got = store.query_records(job_id='job-9')
    assert len(got) == 1 and got[0].logger_name == 'plug.Whisper'
    assert got[0].ts.tzinfo is not None
    chunks = store.query_chunks(worker_session_id='ws-1')
    assert len(chunks) == 1 and 'Detected language' in chunks[0].content
print('diagnostics store OK')
diagnostics store OK
# Handler stamps contextvars identity; never raises into app code
import logging
from cjm_substrate.core.wire import CallEnvelope, set_call_envelope, reset_call_envelope

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    handler = DiagnosticsLogHandler(store, worker_session_id='ws-test')
    lg = logging.getLogger('test.cr14.handler')
    lg.setLevel(logging.INFO)
    lg.propagate = False
    lg.addHandler(handler)
    try:
        token = set_call_envelope(CallEnvelope(job_id='job-ctx'))
        try:
            lg.info('inside the call span')
            try:
                raise ValueError('boom')
            except ValueError:
                lg.error('caught', exc_info=True)
        finally:
            reset_call_envelope(token)
        lg.info('outside the span')
    finally:
        lg.removeHandler(handler)
    rows = store.query_records(worker_session_id='ws-test')
    assert [r.job_id for r in rows] == ['job-ctx', 'job-ctx', None]
    assert rows[1].exc_text and 'ValueError: boom' in rows[1].exc_text
    assert rows[0].logger_name == 'test.cr14.handler'
print('handler stamping OK')
handler stamping OK
# Retention as a query: age-based delete leaves newer rows
from datetime import datetime, timedelta, timezone
with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    old = DiagnosticRecord(message='ancient',
                           ts=datetime.now(timezone.utc) - timedelta(days=30))
    store.append_record(old)
    store.append_record(DiagnosticRecord(message='fresh'))
    store.append_chunk(StreamChunk(content='ancient chunk',
                                   ts=datetime.now(timezone.utc) - timedelta(days=30)))
    out = store.apply_retention(max_age_days=7)
    assert out == {'records_deleted': 1, 'chunks_deleted': 1}
    left = store.query_records()
    assert len(left) == 1 and left[0].message == 'fresh'
print('retention OK')
# tqdm CR-frame collapse: final frame kept, spam dropped
frames = ' 24%|##4 | 2700/11257\r 49%|####9 | 5528/11257\r100%|##########| 11257/11257'
assert normalize_stream_line(frames) == '100%|##########| 11257/11257'
assert normalize_stream_line('plain line') == 'plain line'
assert normalize_stream_line('   \r  ') is None
print('normalize OK')