# Test TextProcessRow creation: build a row by hand and echo some of its fields.
demo_spans = [
    dict(text="Hello world.", start_char=0, end_char=12, label="sentence"),
    dict(text="How are you?", start_char=13, end_char=25, label="sentence"),
]
row = TextProcessRow(
    job_id="job_abc123",
    input_text="Hello world. How are you?",
    input_hash="sha256:" + 64 * "a",  # placeholder digest in "algo:hexdigest" form
    config_hash="sha256:" + 64 * "f",
    spans=demo_spans,
    metadata=dict(processor="nltk"),
)
print(f"Row: job_id={row.job_id}")
print(f"Input: {row.input_text}")
print(f"Spans: {len(row.spans)} spans")
Text Processing Storage
TextProcessRow
A dataclass representing a single row in the standardized text_jobs table.
TextProcessRow
def TextProcessRow(
job_id:str, input_text:str, input_hash:str, config_hash:str, spans:Optional=None, metadata:Optional=None,
created_at:Optional=None
)->None:
A single row from the text_jobs table.
TextProcessStorage
Standardized SQLite storage that all text processing plugins should use. Defines the canonical schema for the text_jobs table with input hashing for traceability and config-based caching.
Schema:
CREATE TABLE IF NOT EXISTS text_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT UNIQUE NOT NULL,
input_text TEXT NOT NULL,
input_hash TEXT NOT NULL,
config_hash TEXT NOT NULL DEFAULT '',
spans JSON,
metadata JSON,
created_at REAL NOT NULL
);
CREATE UNIQUE INDEX idx_text_jobs_cache
ON text_jobs(input_hash, config_hash);
Text inputs are passed by value, so the input content hash is the input identity — the (input_hash, config_hash) unique index is the cache/upsert key. get_cached(input_hash, config_hash) is therefore content-correct on its own (no separate file-freshness check needed); re-processing the same text with the same config replaces the previous row via INSERT OR REPLACE, and different text (different input_hash) is a separate row. config_hash is added to pre-existing tables via an idempotent ALTER TABLE migration in __init__.
The input_hash column stores a hash of the input text in "algo:hexdigest" format, enabling downstream consumers to verify that the source text hasn’t changed since processing.
TextProcessStorage
def TextProcessStorage(
db_path:str, # Absolute path to the SQLite database file
):
Standardized SQLite storage for text processing results.
Testing
import tempfile
import os
# Create storage with temp database.
# delete=False keeps the file on disk after the handle is closed. Only the
# path is needed, so close the handle immediately: leaving it open leaks a
# file descriptor and prevents os.unlink() from removing the file on Windows.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
tmp_db.close()
storage = TextProcessStorage(tmp_db.name)
print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmp50hbpibj.db
# Save a text processing result
test_text = "Hello world. How are you?"
input_hash = hash_bytes(test_text.encode())
cfg1 = "sha256:" + "1" * 64  # Simulated config hash
storage.save(
    job_id="job_test_001",
    input_text=test_text,
    input_hash=input_hash,
    config_hash=cfg1,
    # Slice each sentence out of test_text so the span text and its offsets
    # cannot drift apart.
    spans=[
        {"text": test_text[start:end], "start_char": start, "end_char": end, "label": "sentence"}
        for start, end in ((0, 12), (13, 25))
    ],
    metadata={"processor": "nltk", "language": "english"},
)
print("Saved job_test_001")
print(f"Input hash: {input_hash}")
# Retrieve by job ID
row = storage.get_by_job_id("job_test_001")
assert row is not None
# Every field written by save() must round-trip unchanged.
assert (row.job_id, row.input_text, row.input_hash) == ("job_test_001", test_text, input_hash)
assert len(row.spans) == 2
assert row.metadata["processor"] == "nltk"
assert row.created_at is not None
print(
    f"Retrieved: {row.job_id}",
    f"Input: {row.input_text}",
    f"Spans: {len(row.spans)} spans",
    f"Input hash: {row.input_hash[:30]}...",
    sep="\n",
)
# Missing job returns None
assert storage.get_by_job_id("nonexistent") is None
print("get_by_job_id returns None for missing job: OK")
Retrieved: job_test_001
Input: Hello world. How are you?
Spans: 2 spans
Input hash: sha256:1d473b202b6fea30ab890b1...
get_by_job_id returns None for missing job: OK
# Save another and test list_jobs
storage.save(
    job_id="job_test_002",
    input_text="Second text.",
    input_hash=hash_bytes(b"Second text."),
    config_hash="sha256:" + "2" * 64,
    spans=[dict(text="Second text.", start_char=0, end_char=12, label="sentence")],
)
jobs = storage.list_jobs()
assert len(jobs) == 2
# Newest first: the job saved last must lead the listing.
assert jobs[0].job_id == "job_test_002"
print(f"list_jobs returned {len(jobs)} rows: {[j.job_id for j in jobs]}")
# get_cached: content-correct by (input_hash, config_hash) — text is the input identity
hit = storage.get_cached(input_hash, cfg1)
assert hit is not None
assert hit.job_id == "job_test_001"
# Changing either half of the (input_hash, config_hash) key must miss.
assert all(
    storage.get_cached(text_hash, cfg_hash) is None
    for text_hash, cfg_hash in (
        (input_hash, "sha256:" + "0" * 64),  # Different config -> miss
        ("sha256:" + "9" * 64, cfg1),  # Different text (different input_hash) -> miss
    )
)
print("TextProcessStorage.get_cached hit + 2 miss cases: OK")
# INSERT OR REPLACE: re-processing the same (input_hash, config_hash) replaces the row.
# Fresh database so the replace semantics are checked in isolation.
tmp_db_repl = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed; an open handle leaks a descriptor and makes the
# os.unlink() below fail on Windows.
tmp_db_repl.close()
s2 = TextProcessStorage(tmp_db_repl.name)
ih = hash_bytes(b"same text")
s2.save(job_id="r1", input_text="same text", input_hash=ih,
        config_hash="sha256:" + "c" * 64, spans=[{"text": "v1"}])
# Same (input_hash, config) replaces
s2.save(job_id="r2", input_text="same text", input_hash=ih,
        config_hash="sha256:" + "c" * 64, spans=[{"text": "v2"}])
assert [j.job_id for j in s2.list_jobs()] == ["r2"]
# Different text (different input_hash) -> separate row
s2.save(job_id="r3", input_text="other text", input_hash=hash_bytes(b"other text"),
        config_hash="sha256:" + "c" * 64)
assert len(s2.list_jobs()) == 2
assert s2.get_cached(ih, "sha256:" + "c" * 64).job_id == "r2"
os.unlink(tmp_db_repl.name)
print("INSERT OR REPLACE on (input_hash, config_hash) — stale row replaced, no orphan: OK")
# save_with_logging (T29): success returns True + persists
test_logger = logging.getLogger("text_storage_test")
ok = storage.save_with_logging(
    job_id="job_swl",
    input_text="logged",
    input_hash=hash_bytes(b"logged"),
    config_hash="sha256:" + "3" * 64,
    spans=[{"text": "logged"}],
    logger=test_logger,
)
assert ok is True
assert storage.get_by_job_id("job_swl") is not None
print("save_with_logging success path: returned True, row persisted")

# Failure path: if save() raises, the helper logs + returns False (swallowed).
_orig_save = storage.save


def _boom(**kwargs):
    raise RuntimeError("simulated DB failure")


# Swap save() for a stub that always raises; the finally clause puts the real
# method back even if save_with_logging lets the error escape.
storage.save = _boom
try:
    ok_fail = storage.save_with_logging(
        job_id="job_fail",
        input_text="x",
        input_hash="sha256:" + "9" * 64,
        config_hash="sha256:" + "9" * 64,
        logger=test_logger,
    )
finally:
    storage.save = _orig_save
assert ok_fail is False
print("save_with_logging failure path: returned False, error swallowed")
# Migration + de-dup: a pre-cache text_jobs table (no config_hash, append-only with
# duplicate-input rows) gains config_hash on open AND de-dups so the UNIQUE cache
# index can build (keep newest per (input_hash, config_hash)).
tmp_db_mig = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed; an open handle leaks a descriptor and makes the
# os.unlink() below fail on Windows.
tmp_db_mig.close()
with sqlite3.connect(tmp_db_mig.name) as con:
    # Legacy schema: identical to the current one except config_hash is absent.
    con.execute("""
CREATE TABLE text_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT UNIQUE NOT NULL,
input_text TEXT NOT NULL,
input_hash TEXT NOT NULL,
spans JSON,
metadata JSON,
created_at REAL NOT NULL
)
""")
    # Two rows for the SAME input (collide post-migration) + one distinct row.
    con.executemany(
        "INSERT INTO text_jobs (job_id, input_text, input_hash, created_at) VALUES (?, ?, ?, ?)",
        [("dup_old", "same text", "sha256:dup", 0.0),
         ("dup_new", "same text", "sha256:dup", 1.0),
         ("uniq", "other text", "sha256:uniq", 2.0)],
    )
# sqlite3's connection context manager only commits or rolls back; it never
# closes the connection, so close it explicitly before the storage opens the file.
con.close()
s3 = TextProcessStorage(tmp_db_mig.name)  # __init__ migrates + de-dups + builds the unique index
with sqlite3.connect(tmp_db_mig.name) as con:
    cols = {r[1] for r in con.execute("PRAGMA table_info(text_jobs)")}
con.close()  # release the handle so the file can be unlinked below
assert "config_hash" in cols
# De-dup kept the newest (max id) of the colliding pair, dropped the older.
assert s3.get_by_job_id("dup_old") is None  # older duplicate dropped
assert s3.get_by_job_id("dup_new") is not None  # newest kept
assert s3.get_by_job_id("uniq") is not None  # distinct row kept
assert len(s3.list_jobs()) == 2
# A real save + get_cached works post-migration
s3.save(job_id="new", input_text="new text", input_hash=hash_bytes(b"new text"), config_hash="sha256:cfg")
assert s3.get_cached(hash_bytes(b"new text"), "sha256:cfg").job_id == "new"
os.unlink(tmp_db_mig.name)
print("Pre-cache schema migrated + de-duped (UNIQUE index built; newest row kept): OK")
# Test input verification
# verify_input is three-valued in these checks (True / False / None for a
# missing job), so compare by identity rather than with ==, which would also
# accept merely truthy or falsy values.
assert storage.verify_input("job_test_001") is True
print("verify_input with unchanged text: True")
# Tamper with input text directly in DB
with sqlite3.connect(tmp_db.name) as con:
    con.execute("UPDATE text_jobs SET input_text = 'TAMPERED' WHERE job_id = 'job_test_001'")
# The with-block only commits the UPDATE; close the connection explicitly so
# no handle to the database file is left open.
con.close()
assert storage.verify_input("job_test_001") is False
print("verify_input after tampering: False")
# Missing job returns None
assert storage.verify_input("nonexistent") is None
print("verify_input for missing job: None")
verify_input with unchanged text: True
verify_input after tampering: False
verify_input for missing job: None
# Cleanup: tmp_db was created with delete=False, so the temporary database
# file must be removed by hand.
os.unlink(tmp_db.name)
print("Cleanup complete")
Cleanup complete