Text Processing Storage

Standardized SQLite storage for text processing results with content hashing

TextProcessRow

A dataclass representing a single row in the standardized text_jobs table.


TextProcessRow


def TextProcessRow(
    job_id:str, input_text:str, input_hash:str, config_hash:str, spans:Optional=None, metadata:Optional=None,
    created_at:Optional=None
)->None:

A single row from the text_jobs table.

# Build a TextProcessRow by hand and read a few of its fields back.
# created_at is not passed, so it keeps the signature's default (None).
example_spans = [
    {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
    {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
]
row = TextProcessRow(
    job_id="job_abc123",
    input_text="Hello world. How are you?",
    input_hash="sha256:" + "a" * 64,
    config_hash="sha256:" + "f" * 64,
    spans=example_spans,
    metadata={"processor": "nltk"},
)

# One print call, one line per field
print(
    f"Row: job_id={row.job_id}",
    f"Input: {row.input_text}",
    f"Spans: {len(row.spans)} spans",
    sep="\n",
)

TextProcessStorage

Standardized SQLite storage that all text processing plugins should use. Defines the canonical schema for the text_jobs table with input hashing for traceability and config-based caching.

Schema:

CREATE TABLE IF NOT EXISTS text_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    input_text TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    config_hash TEXT NOT NULL DEFAULT '',
    spans JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

CREATE UNIQUE INDEX idx_text_jobs_cache
    ON text_jobs(input_hash, config_hash);

Text inputs are passed by value, so the hash of the input content is the input's identity, and the unique index on (input_hash, config_hash) serves as the cache/upsert key. get_cached(input_hash, config_hash) is therefore content-correct on its own (no separate file-freshness check is needed). Re-processing the same text with the same config replaces the previous row via INSERT OR REPLACE, while different text (a different input_hash) is stored as a separate row. For pre-existing tables, config_hash is added by an idempotent ALTER TABLE migration in __init__, which also de-duplicates rows that would collide on (input_hash, config_hash), keeping the newest, so that the unique index can be built.

The input_hash column stores a hash of the input text in "algo:hexdigest" format, enabling downstream consumers to verify that the source text hasn’t changed since processing.


TextProcessStorage


def TextProcessStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for text processing results.

Testing

import tempfile
import os

# Create storage backed by a throwaway database file.
# NamedTemporaryFile is used only to reserve a unique path. Close the handle
# right away so the file descriptor is not leaked and so the path can be
# reopened (and later removed with os.unlink) on platforms that do not allow
# a still-open temporary file to be opened a second time by name.
# delete=False keeps the file on disk after the handle is closed.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = TextProcessStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmp50hbpibj.db
# Persist one processing result for a two-sentence input
test_text = "Hello world. How are you?"
input_hash = hash_bytes(test_text.encode())
cfg1 = "sha256:" + "1" * 64   # Simulated config hash

# Assemble the keyword arguments first, then hand them to save() in one go
save_kwargs = dict(
    job_id="job_test_001",
    input_text=test_text,
    input_hash=input_hash,
    config_hash=cfg1,
    spans=[
        {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
        {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
    ],
    metadata={"processor": "nltk", "language": "english"},
)
storage.save(**save_kwargs)

print("Saved job_test_001")
print(f"Input hash: {input_hash}")
# Fetch the saved job by its ID and check each stored field round-trips
row = storage.get_by_job_id("job_test_001")
assert row is not None
assert (row.job_id, row.input_text, row.input_hash) == ("job_test_001", test_text, input_hash)
assert len(row.spans) == 2
assert row.metadata["processor"] == "nltk"
assert row.created_at is not None

print(
    f"Retrieved: {row.job_id}",
    f"Input: {row.input_text}",
    f"Spans: {len(row.spans)} spans",
    f"Input hash: {row.input_hash[:30]}...",
    sep="\n",
)

# An unknown job ID yields None
missing = storage.get_by_job_id("nonexistent")
assert missing is None
print("get_by_job_id returns None for missing job: OK")
Retrieved: job_test_001
Input: Hello world. How are you?
Spans: 2 spans
Input hash: sha256:1d473b202b6fea30ab890b1...
get_by_job_id returns None for missing job: OK
# Store a second job, then confirm list_jobs returns both, newest first
second_text = "Second text."
storage.save(
    job_id="job_test_002",
    input_text=second_text,
    input_hash=hash_bytes(second_text.encode()),
    config_hash="sha256:" + "2" * 64,
    spans=[{"text": second_text, "start_char": 0, "end_char": 12, "label": "sentence"}],
)

jobs = storage.list_jobs()
listed_ids = [j.job_id for j in jobs]
assert len(jobs) == 2
assert listed_ids[0] == "job_test_002"  # Newest first

print(f"list_jobs returned {len(jobs)} rows: {listed_ids}")
# get_cached looks up by (input_hash, config_hash); the text content is the input identity
hit = storage.get_cached(input_hash, cfg1)
assert hit is not None
assert hit.job_id == "job_test_001"

# Same text under a config hash that was never saved -> miss
unknown_cfg = "sha256:" + "0" * 64
assert storage.get_cached(input_hash, unknown_cfg) is None

# Known config but an input hash that was never saved (different text) -> miss
unknown_input = "sha256:" + "9" * 64
assert storage.get_cached(unknown_input, cfg1) is None

print("TextProcessStorage.get_cached hit + 2 miss cases: OK")
# INSERT OR REPLACE: re-processing the same (input_hash, config_hash) replaces the row.
# Only the temp file's path is needed, so close the handle immediately: this avoids
# leaking the descriptor and lets the file be reopened (and removed by os.unlink) on
# platforms where a still-open NamedTemporaryFile cannot be opened again by name.
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_repl.close()
s2 = TextProcessStorage(tmp_db_repl.name)
ih = hash_bytes(b"same text")
cfg_c = "sha256:" + "c" * 64  # one config hash shared by every save in this cell
s2.save(job_id="r1", input_text="same text", input_hash=ih,
        config_hash=cfg_c, spans=[{"text": "v1"}])
# Same (input_hash, config) replaces
s2.save(job_id="r2", input_text="same text", input_hash=ih,
        config_hash=cfg_c, spans=[{"text": "v2"}])
assert [j.job_id for j in s2.list_jobs()] == ["r2"]
# Different text (different input_hash) -> separate row
s2.save(job_id="r3", input_text="other text", input_hash=hash_bytes(b"other text"),
        config_hash=cfg_c)
assert len(s2.list_jobs()) == 2
# Check for a hit before reading .job_id so a cache miss fails as an assertion
# instead of an AttributeError on None.
cached = s2.get_cached(ih, cfg_c)
assert cached is not None and cached.job_id == "r2"
os.unlink(tmp_db_repl.name)

print("INSERT OR REPLACE on (input_hash, config_hash) — stale row replaced, no orphan: OK")
# save_with_logging (T29): success returns True + persists
# The row is read back with get_by_job_id to confirm it was actually written.
test_logger = logging.getLogger("text_storage_test")
ok = storage.save_with_logging(
    job_id="job_swl", input_text="logged", input_hash=hash_bytes(b"logged"),
    config_hash="sha256:" + "3" * 64, spans=[{"text": "logged"}], logger=test_logger,
)
assert ok is True and storage.get_by_job_id("job_swl") is not None
print("save_with_logging success path: returned True, row persisted")

# Failure path: if save() raises, the helper logs + returns False (swallowed).
# Keep a reference to the real save so it can be put back afterwards.
_orig_save = storage.save
# Stand-in for save() that always fails. It accepts keyword arguments only,
# so this presumably relies on save_with_logging calling save() by keyword.
def _boom(**kwargs):
    raise RuntimeError("simulated DB failure")
# Patch the attribute on this one storage object for the duration of the call.
storage.save = _boom
try:
    ok_fail = storage.save_with_logging(
        job_id="job_fail", input_text="x", input_hash="sha256:" + "9" * 64,
        config_hash="sha256:" + "9" * 64, logger=test_logger,
    )
finally:
    # Always restore the real save, even if the call above raised.
    storage.save = _orig_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Migration + de-dup: a pre-cache text_jobs table (no config_hash, append-only with
# duplicate-input rows) gains config_hash on open AND de-dups so the UNIQUE cache
# index can build (keep newest per (input_hash, config_hash)).
from contextlib import closing

# Only the temp file's path is needed; close the handle so the descriptor is not
# leaked and the file can be reopened / unlinked by name on every platform.
tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db_mig.close()

# sqlite3's connection context manager only commits or rolls back; it does not
# close the connection. closing() closes it, and the inner `con` commits the seed rows.
with closing(sqlite3.connect(tmp_db_mig.name)) as con, con:
    con.execute("""
        CREATE TABLE text_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_id TEXT UNIQUE NOT NULL,
            input_text TEXT NOT NULL,
            input_hash TEXT NOT NULL,
            spans JSON,
            metadata JSON,
            created_at REAL NOT NULL
        )
    """)
    # Two rows for the SAME input (collide post-migration) + one distinct row.
    con.executemany(
        "INSERT INTO text_jobs (job_id, input_text, input_hash, created_at) VALUES (?, ?, ?, ?)",
        [("dup_old", "same text", "sha256:dup", 0.0),
         ("dup_new", "same text", "sha256:dup", 1.0),
         ("uniq", "other text", "sha256:uniq", 2.0)],
    )

s3 = TextProcessStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
# Read-only schema inspection; close the connection as soon as the columns are collected.
with closing(sqlite3.connect(tmp_db_mig.name)) as con:
    cols = {r[1] for r in con.execute("PRAGMA table_info(text_jobs)")}
assert "config_hash" in cols
# De-dup kept the newest (max id) of the colliding pair, dropped the older.
assert s3.get_by_job_id("dup_old") is None       # older duplicate dropped
assert s3.get_by_job_id("dup_new") is not None    # newest kept
assert s3.get_by_job_id("uniq") is not None        # distinct row kept
assert len(s3.list_jobs()) == 2

# A real save + get_cached works post-migration
new_hash = hash_bytes(b"new text")
s3.save(job_id="new", input_text="new text", input_hash=new_hash, config_hash="sha256:cfg")
# Check for a hit before reading .job_id so a miss fails as an assertion, not AttributeError.
cached_new = s3.get_cached(new_hash, "sha256:cfg")
assert cached_new is not None and cached_new.job_id == "new"
os.unlink(tmp_db_mig.name)

print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Test input verification: True when the stored text still matches its hash,
# False after the text is altered, None when the job does not exist.
from contextlib import closing

assert storage.verify_input("job_test_001") is True
print("verify_input with unchanged text: True")

# Tamper with input text directly in DB.
# sqlite3's connection context manager only commits or rolls back; it does not
# close the connection. closing() closes it so no handle stays open on the
# database file (which is deleted later); the inner `con` commits the UPDATE.
with closing(sqlite3.connect(tmp_db.name)) as con, con:
    con.execute("UPDATE text_jobs SET input_text = 'TAMPERED' WHERE job_id = 'job_test_001'")

assert storage.verify_input("job_test_001") is False
print("verify_input after tampering: False")

# Missing job returns None
assert storage.verify_input("nonexistent") is None
print("verify_input for missing job: None")
verify_input with unchanged text: True
verify_input after tampering: False
verify_input for missing job: None
# Cleanup: delete the temporary database file that backed `storage`
db_file = tmp_db.name
os.unlink(db_file)
print("Cleanup complete")
Cleanup complete