# Diagnostics Store


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Record shapes

A `DiagnosticRecord` is one structured `self.logger.*` call, with the level
and logger name restored (the worker `basicConfig` format dropped both).
A `StreamChunk` is raw subprocess stdout/stderr captured by the host pump.
It is attributed to the worker SESSION only, never heuristically to a job:
the stage-3 multi-lane interleaving made job attribution of raw streams
structurally unsound.

------------------------------------------------------------------------

### DiagnosticRecord

``` python

def DiagnosticRecord(
    message:str, level:str='INFO', logger_name:str='', ts:datetime=<factory>, worker_session_id:Optional=None,
    job_id:Optional=None, exc_text:Optional=None, seq:Optional=None
)->None:

```

*One structured worker log record (CR-14 diagnostics class).*

------------------------------------------------------------------------

### StreamChunk

``` python

def StreamChunk(
    content:str, ts:datetime=<factory>, worker_session_id:Optional=None, stream:str='stdout', seq:Optional=None
)->None:

```

*One raw stdout/stderr line the host pump captured (death-rattle
floor).*

## `DiagnosticsStore` protocol

This store differs from the journal in three ways: it has many writers
(every worker + the host pump), retention IS part of the contract
(disposable class), and append failures degrade gracefully. A broken
diagnostics sink must never take down capability execution, so handlers
swallow errors; the journal’s loud rule applies to the journal only.
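The contract above can be sketched with a structural `typing.Protocol` and a wrapper that swallows sink failures. This is a minimal illustration only: `AppendOnlySink`, `InMemorySink`, `BrokenSink`, and `append_quietly` are hypothetical names, records are plain dicts here, and the real protocol's method set is not shown in this document.

```python
from typing import List, Optional, Protocol, runtime_checkable


@runtime_checkable
class AppendOnlySink(Protocol):
    """Hypothetical, trimmed-down stand-in for a diagnostics store protocol."""

    def append_record(self, record: dict) -> int:
        ...


class InMemorySink:
    """Toy sink: keeps rows in a list and returns a 1-based seq."""

    def __init__(self) -> None:
        self.rows: List[dict] = []

    def append_record(self, record: dict) -> int:
        self.rows.append(record)
        return len(self.rows)


class BrokenSink:
    """Toy sink that always fails, standing in for a broken store."""

    def append_record(self, record: dict) -> int:
        raise OSError("disk full")


def append_quietly(sink: AppendOnlySink, record: dict) -> Optional[int]:
    """Return the assigned seq, or None if the sink failed; never propagates."""
    try:
        return sink.append_record(record)
    except Exception:
        # Diagnostics are disposable: losing a row beats failing the caller.
        return None
```

Any object with a matching `append_record` satisfies the protocol without inheriting from it, which is what lets workers and the host pump share one sink type.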

------------------------------------------------------------------------

### DiagnosticsStore

``` python

def DiagnosticsStore(
    args:VAR_POSITIONAL, kwargs:VAR_KEYWORD
):

```

*Protocol for the disposable diagnostic-narrative store (CR-14).*

## `LocalDiagnosticsStore`

------------------------------------------------------------------------

### LocalDiagnosticsStore

``` python

def LocalDiagnosticsStore(
    db_path:Optional=None
):

```

*SQLite-backed default `DiagnosticsStore` (CR-14).*

Many concurrent writers (workers + the host pump) call for WAL,
`busy_timeout`, and per-call connections (no long-held handles; safe from
any thread). Disposable class: retention deletes are routine.
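A minimal sketch of the per-call connection pattern described above, using only the standard-library `sqlite3` module. The helper names, the `demo_records` table, and the 5000 ms timeout are assumptions made for illustration; the store's actual schema and settings are not shown in this document.

```python
import sqlite3
from contextlib import closing
from pathlib import Path


def open_connection(db_path: Path, busy_timeout_ms: int = 5000) -> sqlite3.Connection:
    """Open a short-lived connection configured for many concurrent writers."""
    conn = sqlite3.connect(str(db_path))
    # WAL lets readers proceed while a writer is active.
    conn.execute("PRAGMA journal_mode=WAL")
    # Wait for a competing writer's lock instead of failing immediately.
    conn.execute(f"PRAGMA busy_timeout={int(busy_timeout_ms)}")
    return conn


def append_line(db_path: Path, message: str) -> int:
    """Insert one row on a fresh connection and return its assigned seq."""
    with closing(open_connection(db_path)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS demo_records ("
                "seq INTEGER PRIMARY KEY AUTOINCREMENT, message TEXT NOT NULL)"
            )
            cur = conn.execute(
                "INSERT INTO demo_records (message) VALUES (?)", (message,)
            )
            return cur.lastrowid
```

Because no connection outlives a call, there is no shared handle to guard with a lock, and each thread or process simply opens its own.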

------------------------------------------------------------------------

### LocalDiagnosticsStore.append_record

``` python

def append_record(
    record:DiagnosticRecord, # Structured record to persist
)->int: # Store-assigned seq

```

*Persist one structured record.*

------------------------------------------------------------------------

### LocalDiagnosticsStore.append_chunk

``` python

def append_chunk(
    chunk:StreamChunk, # Raw stream line to persist
)->int: # Store-assigned seq

```

*Persist one raw stream line.*

------------------------------------------------------------------------

### LocalDiagnosticsStore.query_records

``` python

def query_records(
    job_id:Optional=None, # EXACT job correlation (stamped at write)
    worker_session_id:Optional=None, # Session scope
    level:Optional=None, # Level name filter
    after_seq:Optional=None, # Tail cursor
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching records, seq-ordered

```

*Filtered structured-record read.*

------------------------------------------------------------------------

### LocalDiagnosticsStore.query_chunks

``` python

def query_chunks(
    worker_session_id:Optional=None, # Session scope
    after_seq:Optional=None, # Tail cursor
    limit:Optional=None, # Max rows
    descending:bool=False, # True = newest first
)->List: # Matching chunks, seq-ordered

```

*Raw stream read, session-scoped.*

------------------------------------------------------------------------

### LocalDiagnosticsStore.apply_retention

``` python

def apply_retention(
    max_age_days:Optional=None, # Delete rows older than this
    max_total_mb:Optional=None, # Delete oldest rows until DB under budget
)->Dict: # {'records_deleted': n, 'chunks_deleted': m}

```

*Retention as a QUERY (the CR-14 reframe’s mechanical payoff).*

Age first, then size: oldest rows (both tables, interleaved by ts)
deleted in batches until the DB file is under budget. Safe against
concurrent writers (WAL; each batch is its own transaction).
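The age-then-size order can be sketched against a single hypothetical table. Several things here are assumptions, not the store's implementation: the `demo_rows` schema, epoch-second timestamps, a byte budget instead of `max_total_mb`, and measuring size as pages in use (page count minus free-list pages), chosen because a SQLite file does not shrink on delete without a vacuum. The real method also interleaves two tables by `ts`, which this sketch omits.

```python
import sqlite3
import time
from typing import Optional


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with one hypothetical rows table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE demo_rows ("
        "seq INTEGER PRIMARY KEY AUTOINCREMENT, ts REAL NOT NULL, body TEXT NOT NULL)"
    )
    return conn


def used_bytes(conn: sqlite3.Connection) -> int:
    """Bytes in pages that still hold data (free-list pages excluded)."""
    page_size = conn.execute("PRAGMA page_size").fetchone()[0]
    page_count = conn.execute("PRAGMA page_count").fetchone()[0]
    free_pages = conn.execute("PRAGMA freelist_count").fetchone()[0]
    return (page_count - free_pages) * page_size


def retention_sketch(
    conn: sqlite3.Connection,
    max_age_days: Optional[float] = None,
    max_total_bytes: Optional[int] = None,
    batch_size: int = 50,
) -> int:
    """Delete by age first, then oldest-first in batches; return rows deleted."""
    deleted = 0
    if max_age_days is not None:
        cutoff = time.time() - max_age_days * 86400
        with conn:  # one transaction for the age pass
            deleted += conn.execute(
                "DELETE FROM demo_rows WHERE ts < ?", (cutoff,)
            ).rowcount
    if max_total_bytes is not None:
        while used_bytes(conn) > max_total_bytes:
            with conn:  # each batch is its own transaction
                n = conn.execute(
                    "DELETE FROM demo_rows WHERE seq IN ("
                    "SELECT seq FROM demo_rows ORDER BY ts, seq LIMIT ?)",
                    (batch_size,),
                ).rowcount
            if n == 0:
                break  # table is empty; nothing left to trim
            deleted += n
    return deleted
```

Small batches keep each write lock short, so concurrent appenders wait briefly rather than for the whole trim.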

## `DiagnosticsLogHandler` + worker installation

The substrate-installed handler: authors keep calling `self.logger.*`
(derive-from-behavior); the handler stamps the contextvars call identity
(the per-call envelope `wire.get_call_envelope()`) so attribution is
EXACT and never author-supplied. Handler failures swallow via
`handleError` — diagnostics are the disposable class; a broken sink must
never break capability execution (the journal’s loud rule is the
journal’s alone).

------------------------------------------------------------------------

### DiagnosticsLogHandler

``` python

def DiagnosticsLogHandler(
    store:DiagnosticsStore, # Sink (LocalDiagnosticsStore in-process)
    worker_session_id:Optional=None, # Spawn-scoped session id
):

```

*Worker-side logging handler writing `DiagnosticRecord`s (CR-14).*

Thread-safe via per-call connections (the worker runs capability execute
in an executor thread; contextvars propagate via copy_context at the
endpoint). Never raises into application code.
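The stamping idea can be illustrated with a small `logging.Handler` subclass that reads the job identity from a context variable rather than from the caller. This is a sketch under assumptions: `current_job_id` is a hypothetical stand-in for the per-call envelope, `StampingHandler` is not the real class, and the sink is any callable that accepts a dict.

```python
import contextvars
import logging
from typing import Callable

# Hypothetical stand-in for the per-call envelope's job identity.
current_job_id = contextvars.ContextVar("current_job_id", default=None)


class StampingHandler(logging.Handler):
    """Forward each log record to a sink, stamped with the ambient job id."""

    def __init__(self, sink: Callable[[dict], object]) -> None:
        super().__init__()
        self.sink = sink

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.sink(
                {
                    "message": record.getMessage(),
                    "level": record.levelname,
                    "logger_name": record.name,
                    # Read from the context, never from the log call itself.
                    "job_id": current_job_id.get(),
                }
            )
        except Exception:
            # Standard logging hook: reports to stderr (if enabled), never raises.
            self.handleError(record)
```

Since the identity comes from the context variable, a log call made outside any call span is stored with `job_id` set to `None` instead of being guessed.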

------------------------------------------------------------------------

### install_worker_diagnostics

``` python

def install_worker_diagnostics(
    
)->Optional:

```

*Configure worker-process logging (replaces the old `basicConfig`).*

Env contract (injected by the proxy at spawn):

- `CJM_DIAGNOSTICS_DB`: diagnostics store path; when set, the handler is
  installed.
- `CJM_WORKER_SESSION_ID`: spawn-scoped session id stamped on records.
- `CJM_LOG_LEVEL`: operator level control (default INFO). The old worker
  hardcoded INFO with no surface.

Without `CJM_DIAGNOSTICS_DB` (standalone/dev import), the function falls
back to the pre-CR-14 stdout `basicConfig`, so nothing changes for direct
runs. It returns the installed handler (`None` on fallback).
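A rough sketch of how an installer might honor this env contract. The three variable names come from the contract above; everything else is assumed for illustration, including the function name, the `make_handler` factory, and the injectable `env` and `logger` parameters.

```python
import logging
import os
from typing import Callable, Mapping, Optional


def install_diagnostics_sketch(
    make_handler: Callable[[str, Optional[str]], logging.Handler],
    env: Optional[Mapping[str, str]] = None,
    logger: Optional[logging.Logger] = None,
) -> Optional[logging.Handler]:
    """Install a handler when a store path is configured; else use basicConfig."""
    env = os.environ if env is None else env

    # Resolve the operator-supplied level name; unknown names fall back to INFO.
    level = logging.getLevelName(env.get("CJM_LOG_LEVEL", "INFO").upper())
    if not isinstance(level, int):
        level = logging.INFO

    db_path = env.get("CJM_DIAGNOSTICS_DB")
    if not db_path:
        # Standalone/dev run: plain stream logging, no store involved.
        logging.basicConfig(level=level)
        return None

    handler = make_handler(db_path, env.get("CJM_WORKER_SESSION_ID"))
    target = logging.getLogger() if logger is None else logger
    target.setLevel(level)
    target.addHandler(handler)
    return handler
```

Passing `env` and `logger` explicitly keeps the sketch testable without mutating the process environment or the root logger.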

## Stream-line normalization (host pump helper)

The host pump captures raw subprocess output. tqdm renders by rewriting
one terminal line with `\r` frames; flattened into a file/DB those
frames were 52% of the whisper log’s bytes. `normalize_stream_line`
keeps each line’s FINAL frame (the 100% state) and drops the spam —
liveness telemetry is not durable (ratified design \#3).

------------------------------------------------------------------------

### normalize_stream_line

``` python

def normalize_stream_line(
    raw:str, # One decoded line (may contain \r progress frames)
)->Optional: # Final frame, or None when nothing durable remains

```

*Collapse CR progress frames to the final frame; drop empty results.*
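One way the collapse could work is sketched below: split on carriage returns and keep the last non-blank frame. The function name is hypothetical, and stripping surrounding whitespace from the kept frame is an assumption; the real helper may preserve indentation or handle other control characters.

```python
from typing import Optional


def normalize_stream_line_sketch(raw: str) -> Optional[str]:
    """Return the last non-blank CR-delimited frame, or None if none exists."""
    # Walk frames newest-first; the first non-blank one is the final state.
    for frame in reversed(raw.split("\r")):
        text = frame.strip()
        if text:
            return text
    return None
```

Scanning from the end also covers lines that finish with a trailing `\r`, where the last split element is empty and the real final frame sits just before it.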

## Tests

``` python
import tempfile
from pathlib import Path as _P

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    r = DiagnosticRecord(message='loading model', level='INFO',
                         logger_name='plug.Whisper', worker_session_id='ws-1',
                         job_id='job-9')
    assert store.append_record(r) == 1
    store.append_record(DiagnosticRecord(message='other job line',
                                         worker_session_id='ws-1', job_id='job-10'))
    store.append_chunk(StreamChunk(content='Detected language: English',
                                   worker_session_id='ws-1'))
    # EXACT job correlation — the timestamp-window heuristic's replacement
    got = store.query_records(job_id='job-9')
    assert len(got) == 1 and got[0].logger_name == 'plug.Whisper'
    assert got[0].ts.tzinfo is not None
    chunks = store.query_chunks(worker_session_id='ws-1')
    assert len(chunks) == 1 and 'Detected language' in chunks[0].content
print('diagnostics store OK')
```

    diagnostics store OK

``` python
# Handler stamps contextvars identity; never raises into app code
import logging

from cjm_substrate.core.wire import CallEnvelope, set_call_envelope, reset_call_envelope

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    handler = DiagnosticsLogHandler(store, worker_session_id='ws-test')
    lg = logging.getLogger('test.cr14.handler')
    lg.setLevel(logging.INFO)
    lg.propagate = False
    lg.addHandler(handler)
    try:
        token = set_call_envelope(CallEnvelope(job_id='job-ctx'))
        try:
            lg.info('inside the call span')
            try:
                raise ValueError('boom')
            except ValueError:
                lg.error('caught', exc_info=True)
        finally:
            reset_call_envelope(token)
        lg.info('outside the span')
    finally:
        lg.removeHandler(handler)
    rows = store.query_records(worker_session_id='ws-test')
    assert [r.job_id for r in rows] == ['job-ctx', 'job-ctx', None]
    assert rows[1].exc_text and 'ValueError: boom' in rows[1].exc_text
    assert rows[0].logger_name == 'test.cr14.handler'
print('handler stamping OK')
```

    handler stamping OK

``` python
# Retention as a query: age-based delete leaves newer rows
from datetime import datetime, timedelta, timezone

with tempfile.TemporaryDirectory() as _td:
    store = LocalDiagnosticsStore(_P(_td) / 'diag.db')
    old = DiagnosticRecord(message='ancient',
                           ts=datetime.now(timezone.utc) - timedelta(days=30))
    store.append_record(old)
    store.append_record(DiagnosticRecord(message='fresh'))
    store.append_chunk(StreamChunk(content='ancient chunk',
                                   ts=datetime.now(timezone.utc) - timedelta(days=30)))
    out = store.apply_retention(max_age_days=7)
    assert out == {'records_deleted': 1, 'chunks_deleted': 1}
    left = store.query_records()
    assert len(left) == 1 and left[0].message == 'fresh'
print('retention OK')
```

``` python
# tqdm CR-frame collapse: final frame kept, spam dropped
frames = ' 24%|##4 | 2700/11257\r 49%|####9 | 5528/11257\r100%|##########| 11257/11257'
assert normalize_stream_line(frames) == '100%|##########| 11257/11257'
assert normalize_stream_line('plain line') == 'plain line'
assert normalize_stream_line('   \r  ') is None
print('normalize OK')
```
