CR-14 (stage 7): the disposable diagnostic-narrative class. Worker-written structured log records (substrate handler stamps contextvars identity — authors never supply attribution) + the host-pumped raw stream chunks (the zero-cooperation death-rattle floor). Retention is a QUERY, not file mechanics. Design ledger: claude-docs/stage-7-evidence.md.
Record shapes
DiagnosticRecord = one structured self.logger.* call (level + logger name restored — the worker basicConfig format dropped both). StreamChunk = raw subprocess stdout/stderr the host pump captured — honestly attributed to the worker SESSION only (never heuristically to a job; the stage-3 multi-lane interleaving made job-attribution of raw streams structurally unsound).
One raw stdout/stderr line the host pump captured (death-rattle floor).
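A minimal sketch of the two shapes, assuming plain frozen dataclasses. The field names are the ones the tests on this page pass. The defaults (a timezone-aware UTC `ts` stamped at construction, `None` for every optional field) and the `...Sketch` class names are assumptions, not the module's actual definitions. The chunk shape has no `job_id` field at all, so session-only attribution is enforced by the data shape itself rather than by convention.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

def _utcnow() -> datetime:
    # Timezone-aware UTC timestamp (the tests assert ts.tzinfo is not None)
    return datetime.now(timezone.utc)

@dataclass(frozen=True)
class DiagnosticRecordSketch:
    """One structured logger call, attributed exactly via the call envelope."""
    message: str
    level: Optional[str] = None              # e.g. 'INFO'; the old basicConfig format dropped it
    logger_name: Optional[str] = None        # e.g. 'plug.Whisper'
    worker_session_id: Optional[str] = None
    job_id: Optional[str] = None             # stamped by the handler, never by the author
    exc_text: Optional[str] = None           # formatted traceback when exc_info was passed
    ts: datetime = field(default_factory=_utcnow)

@dataclass(frozen=True)
class StreamChunkSketch:
    """One raw stdout/stderr line, attributed to the worker session only."""
    content: str
    worker_session_id: Optional[str] = None
    ts: datetime = field(default_factory=_utcnow)
    # Deliberately no job_id: interleaved lanes make job attribution of raw streams unsound
```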
DiagnosticsStore protocol
Unlike the journal: many writers (every worker + the host pump), retention IS part of the contract (disposable class), and append failures degrade gracefully (a broken diagnostics sink must never take down capability execution — handlers swallow; the journal’s loud rule applies to the journal only).
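The contract can be written down as a structural `typing.Protocol`. This is a sketch under stated assumptions: the method set is inferred from the API and tests on this page (including the keyword filters of `query_records`/`query_chunks`), and both `DiagnosticsStoreLike` and `append_quietly` are hypothetical names. The helper shows the graceful-degradation rule from the caller's side: a failed append is reported, never raised.

```python
import sys
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable

@runtime_checkable
class DiagnosticsStoreLike(Protocol):
    """Hypothetical structural view of the diagnostics store contract."""
    def append_record(self, record: Any) -> int: ...
    def append_chunk(self, chunk: Any) -> int: ...
    def query_records(self, *, job_id: Optional[str] = None,
                      worker_session_id: Optional[str] = None) -> List[Any]: ...
    def query_chunks(self, *, worker_session_id: Optional[str] = None) -> List[Any]: ...
    # Retention is part of the contract for the disposable class
    def apply_retention(self, max_age_days: Optional[float] = None,
                        max_total_mb: Optional[float] = None) -> Dict[str, int]: ...

def append_quietly(store: DiagnosticsStoreLike, record: Any) -> Optional[int]:
    """Disposable-class write: a broken sink degrades to a stderr note, never an exception."""
    try:
        return store.append_record(record)
    except Exception as exc:  # deliberately broad: diagnostics must not break execution
        print(f"diagnostics append failed: {exc!r}", file=sys.stderr)
        return None
```

Note that `runtime_checkable` only checks that the methods exist, not their signatures; it is a convenience, not a conformance test.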
Many concurrent writers (workers + the host pump) -> WAL + busy_timeout + per-call connections (no long-held handles; safe from any thread). Disposable class: retention deletes are routine.
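A minimal sketch of the WAL + busy_timeout + per-call-connection pattern with the standard `sqlite3` module. The schema, the 5-second timeout and the helper names (`init_store`, `connect`, `write_lines`, `concurrent_demo`) are illustrative assumptions. WAL is a persistent property of the database file, so it is set once at init; `busy_timeout` is per connection, so every per-call connection sets it.

```python
import os
import sqlite3
import tempfile
import threading
from contextlib import closing
from typing import Tuple

def init_store(path: str) -> str:
    """One-time setup. WAL mode persists in the database file once set."""
    with closing(sqlite3.connect(path)) as conn:
        mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
        conn.execute("CREATE TABLE IF NOT EXISTS records ("
                     "seq INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)")
        conn.commit()
    return mode

def connect(path: str) -> sqlite3.Connection:
    """Per-call connection: opened, used and closed by one thread; never shared."""
    conn = sqlite3.connect(path, isolation_level=None)  # autocommit: each INSERT is its own transaction
    conn.execute("PRAGMA busy_timeout = 5000")          # wait up to 5 s for the write lock
    return conn

def write_lines(path: str, tag: str, n: int) -> None:
    # `with sqlite3.connect(...)` alone only commits/rolls back; closing() actually closes
    with closing(connect(path)) as conn:
        for i in range(n):
            conn.execute("INSERT INTO records (message) VALUES (?)", (f"{tag}-{i}",))

def concurrent_demo(writers: int = 4, per_writer: int = 50) -> Tuple[str, int]:
    """Several threads write through their own connections; returns (journal mode, row count)."""
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "diag.db")
        mode = init_store(path)
        threads = [threading.Thread(target=write_lines, args=(path, f"w{k}", per_writer))
                   for k in range(writers)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        with closing(connect(path)) as conn:
            count = conn.execute("SELECT COUNT(*) FROM records").fetchone()[0]
    return mode, count
```

Because no connection outlives a call, there is no handle to share across threads and nothing to reconnect after a fork or crash; the cost is one `connect` per write, which SQLite keeps cheap.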
LocalDiagnosticsStore.append_record
def append_record(
    record:DiagnosticRecord, # Structured record to persist
)->int: # Store-assigned seq
Persist one structured record.
LocalDiagnosticsStore.append_chunk
def append_chunk(
    chunk:StreamChunk, # Raw stream line to persist
)->int: # Store-assigned seq
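The "store-assigned seq" return value maps naturally onto SQLite's rowid. A minimal sketch, assuming one table per record class with an `INTEGER PRIMARY KEY AUTOINCREMENT` column and ISO-8601 text timestamps; the schema and the `..._sketch` function names are hypothetical, not the store's real layout. With separate tables, each class gets its own independent seq counter.

```python
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS records (
    seq INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL, level TEXT,
    logger_name TEXT, message TEXT NOT NULL, worker_session_id TEXT, job_id TEXT);
CREATE TABLE IF NOT EXISTS chunks (
    seq INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT NOT NULL,
    content TEXT NOT NULL, worker_session_id TEXT);
"""

def init_schema(path: str) -> None:
    with closing(sqlite3.connect(path)) as conn:
        conn.executescript(SCHEMA)

def _insert(path: str, sql: str, params: tuple) -> int:
    # Per-call connection: open, write one row, close
    with closing(sqlite3.connect(path)) as conn:
        with conn:  # commits on success, rolls back on error
            cur = conn.execute(sql, params)
        return cur.lastrowid  # the store-assigned seq (the INTEGER PRIMARY KEY)

def _now() -> str:
    return datetime.now(timezone.utc).isoformat()

def append_record_sketch(path: str, message: str, *, level: Optional[str] = None,
                         logger_name: Optional[str] = None,
                         worker_session_id: Optional[str] = None,
                         job_id: Optional[str] = None) -> int:
    return _insert(path,
                   "INSERT INTO records (ts, level, logger_name, message, worker_session_id, job_id) "
                   "VALUES (?, ?, ?, ?, ?, ?)",
                   (_now(), level, logger_name, message, worker_session_id, job_id))

def append_chunk_sketch(path: str, content: str, *, worker_session_id: Optional[str] = None) -> int:
    # No job_id column: raw stream lines are attributed to the worker session only
    return _insert(path,
                   "INSERT INTO chunks (ts, content, worker_session_id) VALUES (?, ?, ?)",
                   (_now(), content, worker_session_id))
```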
LocalDiagnosticsStore.apply_retention
def apply_retention(
    max_age_days:Optional=None, # Delete rows older than this
    max_total_mb:Optional=None, # Delete oldest rows until DB under budget
)->Dict: # {'records_deleted': n, 'chunks_deleted': m}
Retention as a QUERY (the CR-14 reframe’s mechanical payoff).
Age first, then size: the oldest rows (both tables, interleaved by ts) are deleted in batches until the DB file is under budget. This is safe against concurrent writers because the store runs in WAL mode and each batch is its own transaction.
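Retention expressed as plain SQL, as a minimal sketch. Assumptions, not the store's actual code: timestamps are stored as REAL epoch seconds, the batch size is an arbitrary 100, and "size" is measured as live pages, `(page_count - freelist_count) * page_size`, because a DELETE moves pages to SQLite's freelist but does not shrink the file without `VACUUM`. The size pass merges both tables by `ts` with `UNION ALL ... ORDER BY ts LIMIT`, so the globally oldest rows go first regardless of class.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

SCHEMA = """
CREATE TABLE IF NOT EXISTS records (seq INTEGER PRIMARY KEY AUTOINCREMENT, ts REAL NOT NULL, message TEXT);
CREATE TABLE IF NOT EXISTS chunks  (seq INTEGER PRIMARY KEY AUTOINCREMENT, ts REAL NOT NULL, content TEXT);
"""

def live_bytes(conn: sqlite3.Connection) -> int:
    """Bytes in use: DELETE moves pages to the freelist but does not shrink the file."""
    page_size = conn.execute("PRAGMA page_size").fetchone()[0]
    page_count = conn.execute("PRAGMA page_count").fetchone()[0]
    free = conn.execute("PRAGMA freelist_count").fetchone()[0]
    return (page_count - free) * page_size

def apply_retention(conn: sqlite3.Connection, max_age_days: Optional[float] = None,
                    max_total_mb: Optional[float] = None, batch: int = 100) -> Dict[str, int]:
    out = {"records_deleted": 0, "chunks_deleted": 0}
    # 1) Age: one DELETE per table, both in a single transaction
    if max_age_days is not None:
        cutoff = (datetime.now(timezone.utc) - timedelta(days=max_age_days)).timestamp()
        with conn:
            out["records_deleted"] += conn.execute("DELETE FROM records WHERE ts < ?", (cutoff,)).rowcount
            out["chunks_deleted"] += conn.execute("DELETE FROM chunks WHERE ts < ?", (cutoff,)).rowcount
    # 2) Size: delete the oldest rows across BOTH tables, one batch per transaction
    if max_total_mb is not None:
        budget = int(max_total_mb * 1024 * 1024)
        while live_bytes(conn) > budget:
            oldest = conn.execute(
                "SELECT 'records' AS tbl, seq, ts FROM records "
                "UNION ALL SELECT 'chunks', seq, ts FROM chunks "
                "ORDER BY ts LIMIT ?", (batch,)).fetchall()
            if not oldest:
                break  # nothing left to delete; schema pages alone exceed the budget
            with conn:
                for tbl, seq, _ts in oldest:
                    # tbl is one of the two literals above, so interpolating it is safe
                    conn.execute(f"DELETE FROM {tbl} WHERE seq = ?", (seq,))
                    out[f"{tbl}_deleted"] += 1
    return out
```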
DiagnosticsLogHandler + worker installation
The substrate-installed handler: authors keep calling self.logger.* (derive-from-behavior); the handler stamps the contextvars call identity (the per-call envelope wire.get_call_envelope()) so attribution is EXACT and never author-supplied. Handler failures swallow via handleError — diagnostics are the disposable class; a broken sink must never break capability execution (the journal’s loud rule is the journal’s alone).
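A minimal sketch of such a handler on top of the standard `logging.Handler`. Assumptions: a module-level `ContextVar` stands in for the per-call envelope (the real handler reads `wire.get_call_envelope()`), and the sink is any callable taking a row dict rather than the real store. `ContextStampingHandler` and `current_job_id` are hypothetical names. `emit()` runs synchronously in the logging caller's thread, so the context read is the caller's own.

```python
import contextvars
import logging
from typing import Callable, Optional

# Hypothetical stand-in for the per-call envelope's job identity
current_job_id = contextvars.ContextVar("current_job_id", default=None)

class ContextStampingHandler(logging.Handler):
    """Sketch: authors just call logger.*; attribution comes from context, never from them."""

    def __init__(self, sink: Callable[[dict], None], worker_session_id: Optional[str] = None):
        super().__init__()
        self.sink = sink
        self.worker_session_id = worker_session_id
        self._exc_formatter = logging.Formatter()

    def emit(self, record: logging.LogRecord) -> None:
        try:
            row = {
                "level": record.levelname,
                "logger_name": record.name,
                "message": record.getMessage(),
                "worker_session_id": self.worker_session_id,
                # Read at emit time, in the caller's context: exact, not author-supplied
                "job_id": current_job_id.get(),
                "exc_text": (self._exc_formatter.formatException(record.exc_info)
                             if record.exc_info else None),
            }
            self.sink(row)
        except Exception:
            # Standard logging behaviour: report to stderr if logging.raiseExceptions,
            # otherwise stay silent; handleError never re-raises into application code
            self.handleError(record)
```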
Thread-safe via per-call connections (the worker runs capability execute in an executor thread; contextvars propagate via copy_context at the endpoint). Never raises into application code.
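The executor-thread detail matters because contextvars do not follow work into an executor on their own. A minimal sketch of the copy_context pattern, assuming a `ThreadPoolExecutor` and a hypothetical `call_job_id` variable in place of the real envelope: the endpoint snapshots its context and runs the capability inside it, so any logging done in the worker thread sees the caller's identity.

```python
import contextvars
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Optional

# Hypothetical stand-in for the per-call envelope's job id
call_job_id = contextvars.ContextVar("call_job_id", default=None)

def execute() -> Optional[str]:
    # Runs in the executor thread; reads whatever context it was run in
    return call_job_id.get()

def submit_with_context(pool: ThreadPoolExecutor, fn, *args) -> Future:
    ctx = contextvars.copy_context()  # snapshot the caller's contextvars, including call_job_id
    return pool.submit(ctx.run, fn, *args)

def endpoint(job_id: str) -> Optional[str]:
    token = call_job_id.set(job_id)
    try:
        with ThreadPoolExecutor(max_workers=1) as pool:
            return submit_with_context(pool, execute).result()
    finally:
        call_job_id.reset(token)
```

In asyncio code the same distinction applies: `asyncio.to_thread` copies the current context for you, while `loop.run_in_executor` does not.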
install_worker_diagnostics
def install_worker_diagnostics()->Optional:
Configure worker-process logging (replaces the old basicConfig).
Env contract (injected by the proxy at spawn):
- CJM_DIAGNOSTICS_DB: diagnostics store path -> install the handler.
- CJM_WORKER_SESSION_ID: spawn-scoped session id stamped on records.
- CJM_LOG_LEVEL: operator level control (default INFO); the old worker hardcoded INFO with no surface.
Without CJM_DIAGNOSTICS_DB (standalone/dev import), it falls back to the pre-CR-14 stdout basicConfig, so nothing changes for direct runs. Returns the installed handler (None on fallback).
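A minimal sketch of the env-driven installation, assuming the handler is built by an injected factory (`make_handler(db_path, session_id)`) so the logic can be shown without the real store; `install_from_env` and the factory parameter are hypothetical. Unknown level names fall back to INFO here, which is an assumption about the real function's behaviour.

```python
import logging
import os
import sys
from typing import Callable, Mapping, Optional

HandlerFactory = Callable[[str, Optional[str]], logging.Handler]

def install_from_env(make_handler: HandlerFactory,
                     environ: Mapping[str, str] = os.environ) -> Optional[logging.Handler]:
    # CJM_LOG_LEVEL: operator control, default INFO; unrecognised names fall back to INFO
    level = logging.getLevelName(environ.get("CJM_LOG_LEVEL", "INFO").upper())
    if not isinstance(level, int):
        level = logging.INFO
    db_path = environ.get("CJM_DIAGNOSTICS_DB")
    if not db_path:
        # Standalone/dev run: pre-CR-14 behaviour, plain logging to stdout
        logging.basicConfig(level=level, stream=sys.stdout)
        return None
    handler = make_handler(db_path, environ.get("CJM_WORKER_SESSION_ID"))
    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(handler)
    return handler
```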
Stream-line normalization (host pump helper)
The host pump captures raw subprocess output. tqdm renders progress by rewriting one terminal line with \r frames; flattened into a file/DB, those frames made up 52% of the whisper log’s bytes. normalize_stream_line keeps each line’s FINAL frame (the 100% state) and drops the spam, because liveness telemetry is not durable (ratified design #3).
normalize_stream_line
def normalize_stream_line(
    raw:str, # One decoded line (may contain \r progress frames)
)->Optional: # Final frame, or None when nothing durable remains
Collapse CR progress frames to the final frame; drop empty results.
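A minimal sketch of the collapse, assuming "nothing durable" means every frame is blank after stripping whitespace; `normalize_stream_line_sketch` is a hypothetical name, not the module's implementation. It strips the line terminator, splits on `\r`, and returns the last frame that still has visible text.

```python
from typing import Optional

def normalize_stream_line_sketch(raw: str) -> Optional[str]:
    """Keep the final carriage-return frame of a line; None if nothing durable remains."""
    # Drop the line terminator, then split the terminal-rewrite frames on '\r'
    frames = raw.rstrip("\r\n").split("\r")
    # Walk back to the last frame with visible text (a line may end in blank frames)
    for frame in reversed(frames):
        if frame.strip():
            return frame.rstrip()
    return None
```

For a tqdm line such as `"  0%| 0/10\r 50%| 5/10\r100%| 10/10\n"` this keeps only `"100%| 10/10"`; ordinary lines without `\r` pass through minus their terminator.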
Tests
import tempfile
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    r = DiagnosticRecord(message='loading model', level='INFO', logger_name='plug.Whisper',
                         worker_session_id='ws-1', job_id='job-9')
    assert store.append_record(r) == 1
    store.append_record(DiagnosticRecord(message='other job line', worker_session_id='ws-1', job_id='job-10'))
    store.append_chunk(StreamChunk(content='Detected language: English', worker_session_id='ws-1'))
    # EXACT job correlation: the timestamp-window heuristic's replacement
    got = store.query_records(job_id='job-9')
    assert len(got) == 1 and got[0].logger_name == 'plug.Whisper'
    assert got[0].ts.tzinfo is not None
    chunks = store.query_chunks(worker_session_id='ws-1')
    assert len(chunks) == 1 and 'Detected language' in chunks[0].content
    print('diagnostics store OK')
diagnostics store OK
# Handler stamps contextvars identity; never raises into app code
import logging
import tempfile
from pathlib import Path as _P
from cjm_substrate.core.wire import CallEnvelope, set_call_envelope, reset_call_envelope

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    handler = DiagnosticsLogHandler(store, worker_session_id='ws-test')
    lg = logging.getLogger('test.cr14.handler')
    lg.setLevel(logging.INFO)
    lg.propagate = False
    lg.addHandler(handler)
    try:
        token = set_call_envelope(CallEnvelope(job_id='job-ctx'))
        try:
            lg.info('inside the call span')
            try:
                raise ValueError('boom')
            except ValueError:
                lg.error('caught', exc_info=True)
        finally:
            reset_call_envelope(token)
        lg.info('outside the span')
    finally:
        lg.removeHandler(handler)
    rows = store.query_records(worker_session_id='ws-test')
    assert [r.job_id for r in rows] == ['job-ctx', 'job-ctx', None]
    assert rows[1].exc_text and 'ValueError: boom' in rows[1].exc_text
    assert rows[0].logger_name == 'test.cr14.handler'
    print('handler stamping OK')
handler stamping OK
# Retention as a query: age-based delete leaves newer rows
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    old = DiagnosticRecord(message='ancient', ts=datetime.now(timezone.utc) - timedelta(days=30))
    store.append_record(old)
    store.append_record(DiagnosticRecord(message='fresh'))
    store.append_chunk(StreamChunk(content='ancient chunk', ts=datetime.now(timezone.utc) - timedelta(days=30)))
    out = store.apply_retention(max_age_days=7)
    assert out == {'records_deleted': 1, 'chunks_deleted': 1}
    left = store.query_records()
    assert len(left) == 1 and left[0].message == 'fresh'
    print('retention OK')