Text Processing Storage

Standardized SQLite storage for text processing results with content hashing

TextProcessRow

A dataclass representing a single row in the standardized text_jobs table.


TextProcessRow


def TextProcessRow(
    job_id:str, input_text:str, input_hash:str, spans:Optional=None, metadata:Optional=None,
    created_at:Optional=None
)->None:

A single row from the text_jobs table.

# Build a TextProcessRow by hand to show the shape of one stored record.
# Each span is a plain dict carrying the text plus its character offsets.
sentence_spans = [
    {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
    {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
]
row = TextProcessRow(
    job_id="job_abc123",
    input_text="Hello world. How are you?",
    input_hash="sha256:" + "a" * 64,
    spans=sentence_spans,
    metadata={"processor": "nltk"},
)

print(f"Row: job_id={row.job_id}")
print(f"Input: {row.input_text}")
print(f"Spans: {len(row.spans)} spans")
Row: job_id=job_abc123
Input: Hello world. How are you?
Spans: 2 spans

TextProcessStorage

Standardized SQLite storage that all text processing plugins should use. It defines the canonical schema for the text_jobs table and stores a hash of each input text for traceability.

Schema:

CREATE TABLE IF NOT EXISTS text_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id TEXT UNIQUE NOT NULL,
    input_text TEXT NOT NULL,
    input_hash TEXT NOT NULL,
    spans JSON,
    metadata JSON,
    created_at REAL NOT NULL
);

The input_hash column stores a hash of the input text in "algo:hexdigest" format, enabling downstream consumers to verify that the source text hasn’t changed since processing.


TextProcessStorage


def TextProcessStorage(
    db_path:str, # Absolute path to the SQLite database file
):

Standardized SQLite storage for text processing results.

Testing

import tempfile
import os

# Create storage backed by a throwaway database file.
# delete=False keeps the file on disk once the handle is closed, so the
# cleanup step at the end of this walkthrough has to remove it explicitly.
tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
# Only the path is needed from here on. Closing the handle now avoids leaking
# a file descriptor, and on Windows an open NamedTemporaryFile cannot be
# reopened by name.
tmp_db.close()
storage = TextProcessStorage(tmp_db.name)

print(f"Storage initialized at: {tmp_db.name}")
Storage initialized at: /tmp/tmpl7m8pnck.db
# Save a text processing result
test_text = "Hello world. How are you?"
# The hash is computed over the encoded bytes of the text; the printed value
# below shows the "algo:hexdigest" form that goes into the input_hash column.
input_hash = hash_bytes(test_text.encode())

storage.save(
    job_id="job_test_001",
    input_text=test_text,
    input_hash=input_hash,
    spans=[
        {"text": "Hello world.", "start_char": 0, "end_char": 12, "label": "sentence"},
        {"text": "How are you?", "start_char": 13, "end_char": 25, "label": "sentence"},
    ],
    metadata={"processor": "nltk", "language": "english"},
)

# Plain string: there is nothing to interpolate, so no f-prefix is needed.
print("Saved job_test_001")
print(f"Input hash: {input_hash}")
Saved job_test_001
Input hash: sha256:1d473b202b6fea30ab890b153d9d5fa3a79830a7bdb6d662581a95bda1a57866
# Look the saved job up again and check that every field round-tripped
row = storage.get_by_job_id("job_test_001")
assert row is not None
assert row.created_at is not None
assert row.metadata["processor"] == "nltk"
assert len(row.spans) == 2
assert row.input_hash == input_hash
assert row.input_text == test_text
assert row.job_id == "job_test_001"

# Show only the first 30 characters of the hash to keep the output short
hash_preview = row.input_hash[:30]
print(f"Retrieved: {row.job_id}")
print(f"Input: {row.input_text}")
print(f"Spans: {len(row.spans)} spans")
print(f"Input hash: {hash_preview}...")

# An unknown job ID yields None
missing_row = storage.get_by_job_id("nonexistent")
assert missing_row is None
print("get_by_job_id returns None for missing job: OK")
Retrieved: job_test_001
Input: Hello world. How are you?
Spans: 2 spans
Input hash: sha256:1d473b202b6fea30ab890b1...
get_by_job_id returns None for missing job: OK
# Store a second job (this one without metadata), then list everything
storage.save(
    job_id="job_test_002",
    input_text="Second text.",
    input_hash=hash_bytes(b"Second text."),
    spans=[
        {"text": "Second text.", "start_char": 0, "end_char": 12, "label": "sentence"},
    ],
)

jobs = storage.list_jobs()
listed_ids = [j.job_id for j in jobs]
assert len(listed_ids) == 2
assert listed_ids[0] == "job_test_002"  # Newest first

print(f"list_jobs returned {len(jobs)} rows: {listed_ids}")
list_jobs returned 2 rows: ['job_test_002', 'job_test_001']
# Test input verification.
# verify_input has three outcomes in this walkthrough (True, False, and None
# for a missing job), so the checks use identity rather than `==` to keep
# them distinct from other truthy/falsy values.
assert storage.verify_input("job_test_001") is True
print("verify_input with unchanged text: True")

# Tamper with input text directly in DB, bypassing the storage API.
# A sqlite3 connection used as a context manager only commits or rolls back
# the transaction; it does not close the connection, so close it explicitly.
con = sqlite3.connect(tmp_db.name)
try:
    with con:
        con.execute("UPDATE text_jobs SET input_text = 'TAMPERED' WHERE job_id = 'job_test_001'")
finally:
    con.close()

assert storage.verify_input("job_test_001") is False
print("verify_input after tampering: False")

# Missing job returns None
assert storage.verify_input("nonexistent") is None
print("verify_input for missing job: None")
verify_input with unchanged text: True
verify_input after tampering: False
verify_input for missing job: None
# Cleanup
# Close our temp-file handle before deleting the file: Windows refuses to
# remove a file that is still open. close() is safe to call on a handle that
# has already been closed.
tmp_db.close()
os.unlink(tmp_db.name)
print("Cleanup complete")
Cleanup complete