source

Source service for federated transcription queries via DuckDB

TranscriptionDBProvider

A SourceProvider implementation for transcription SQLite databases. Each provider instance wraps a single database file (either from a plugin or an external path).


source

TranscriptionDBProvider


def TranscriptionDBProvider(
    db_path:str, # Path to SQLite database file
    name:str, # Display name for this provider
    provider_id:Optional=None, # Unique ID (defaults to db_path)
):

SourceProvider for transcription SQLite databases.

SourceService

This service provides access to transcription data from multiple sources via DuckDB federation. It queries the SQLite databases of discovered transcription plugins directly, without loading the full plugin workers.


source

SourceService


def SourceService(
    plugin_manager:PluginManager, # Plugin manager for discovering plugin sources
    source_categories:List=None, # Plugin categories to query (default: ['transcription'])
    external_paths:List=None, # External database paths
):

Service for federated access to content sources via providers.

External Database Validation


source

validate_and_toggle_external_db


def validate_and_toggle_external_db(
    source_service:SourceService, # Source service for duplicate detection
    path:str, # Path to the .db file
    external_paths:List, # Current external database paths
    valid_extensions:List=None, # Valid file extensions (default: VALID_DB_EXTENSIONS)
)->Tuple: # (updated_paths, error_message or None)

Validate and toggle an external database path in the external paths list.

Tests

The following cells demonstrate the provider architecture. TranscriptionDBProvider can be created from plugin metadata or external paths. SourceService manages multiple providers and aggregates queries.

# Test TranscriptionDBProvider directly: build a provider from a database file
# on disk and print its identifying attributes.
from pathlib import Path  # not used in this cell; a later cell calls Path.cwd()

# Path to existing transcription database.
# NOTE(review): this absolute path is machine-specific; adjust before re-running.
whisper_db = "/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db"

# Create provider from external path.
# NOTE(review): the VAD cell further down guards the result of
# from_external_path() with a truthiness check, so the call can presumably
# return a falsy value (confirm). This cell assumes it succeeded.
provider = TranscriptionDBProvider.from_external_path(whisper_db)
print(f"Provider: {provider.provider_name}")
print(f"  ID: {provider.provider_id}")
print(f"  Type: {provider.provider_type}")
print(f"  Available: {provider.is_available()}")
Provider: External: whisper_transcriptions
  ID: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  Type: transcription_db
  Available: True
# Check that the whisper database passes the provider's schema validation.
# validate_schema() yields a (flag, message) pair; the message is only shown
# when the flag is falsy.
is_valid, error_msg = provider.validate_schema()
print(f"Schema validation: {is_valid}")
if is_valid:
    print(f"  Required columns found: {TranscriptionDBProvider.REQUIRED_COLUMNS}")
else:
    print(f"  Error: {error_msg}")
Schema validation: True
  Required columns found: frozenset({'job_id', 'text', 'created_at'})
# Test validate_schema on database with unsupported schema (e.g., VAD database).
# NOTE(review): this absolute path is machine-specific; adjust before re-running.
vad_db = "/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-media-plugin-silero-vad/vad_jobs.db"
vad_provider = TranscriptionDBProvider.from_external_path(vad_db)
# Only validate when a truthy provider object came back.
if vad_provider:
    # Rebinds is_valid / error_msg from the previous validation cell.
    is_valid, error_msg = vad_provider.validate_schema()
    print(f"VAD database schema validation: {is_valid}")
    # Printed unconditionally: this cell exists to show the failure message.
    print(f"  Error: {error_msg}")
VAD database schema validation: False
  Error: Missing 'transcriptions' table
# Test provider query_records: request records with limit=3 and print a short
# summary of each one. `records` is reused by the get_source_block cell below.
records = provider.query_records(limit=3)
print(f"Found {len(records)} records from provider")

# Each record is read by key: 'record_id', 'provider_id' and 'text'.
for rec in records:
    print(f"\n  record_id: {rec['record_id']}")
    print(f"  provider_id: {rec['provider_id']}")
    # Only the first 60 characters of the text are shown as a preview.
    print(f"  text preview: {rec['text'][:60]}...")
Found 3 records from provider

  record_id: job_9065e18d
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...

  record_id: job_a0b718cc
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...

  record_id: job_3cbae8d3
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...
# Test provider get_source_block: look up the first queried record by its
# record_id and print a summary of the returned block.
if records:
    block = provider.get_source_block(records[0]['record_id'])
    # Nothing is printed when the lookup returns a falsy value.
    if block:
        # Plain string literal: no placeholders, so no f-prefix is needed.
        print("Retrieved SourceBlock:")
        print(f"  id: {block.id}")
        print(f"  provider_id: {block.provider_id}")
        print(f"  text length: {len(block.text)} chars")
Retrieved SourceBlock:
  id: job_9065e18d
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text length: 3402 chars
# Test SourceService with providers (backward-compatible API):
# discover plugin manifests, build the service with one external database,
# then list the providers it registered.
from cjm_plugin_system.core.manager import PluginManager

# Calculate project root from notebook location (two levels above the current
# working directory). `Path` comes from the import in the first test cell.
project_root = Path.cwd().parent.parent
manifests_dir = project_root / ".cjm" / "manifests"

# Create plugin manager and scan manifests_dir for plugin manifests
manager = PluginManager(search_paths=[manifests_dir])
manager.discover_manifests()
print(f"Discovered {len(manager.discovered)} plugins")

# Initialize service with external path (backward compatible).
# `whisper_db` is the database path defined in the first test cell.
source_service = SourceService(
    plugin_manager=manager,
    external_paths=[whisper_db]
)

# Check providers: print the name and type of each registered provider
providers = source_service.get_providers()
print(f"\nRegistered providers: {len(providers)}")
for p in providers:
    print(f"  - {p.provider_name} ({p.provider_type})")
[PluginManager] Discovered manifest: cjm-transcription-plugin-voxtral-hf from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-transcription-plugin-voxtral-hf.json
[PluginManager] Discovered manifest: cjm-system-monitor-nvidia from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-system-monitor-nvidia.json
[PluginManager] Discovered manifest: cjm-transcription-plugin-whisper from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-transcription-plugin-whisper.json
[PluginManager] Discovered manifest: cjm-media-plugin-silero-vad from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-media-plugin-silero-vad.json
[PluginManager] Discovered manifest: cjm-graph-plugin-sqlite from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-graph-plugin-sqlite.json
[PluginManager] Discovered manifest: cjm-text-plugin-nltk from /mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-fasthtml-workflow-transcript-decomp/.cjm/manifests/cjm-text-plugin-nltk.json
Discovered 6 plugins

Registered providers: 1
  - External: whisper_transcriptions (transcription_db)
# Test has_provider_for_path for duplicate detection.
# The call returns a pair, unpacked here as (has_duplicate, existing_provider_name).
has_dup, existing_name = source_service.has_provider_for_path(whisper_db)
# Plain string literal: no placeholders, so no f-prefix is needed.
print("Checking if whisper_db is already loaded:")
print(f"  Has duplicate: {has_dup}")
print(f"  Existing provider: {existing_name}")

# Test with a path not yet added; only the duplicate flag is reported for it.
random_path = "/some/random/path.db"
has_dup2, existing_name2 = source_service.has_provider_for_path(random_path)
print("\nChecking random path:")
print(f"  Has duplicate: {has_dup2}")
Checking if whisper_db is already loaded:
  Has duplicate: True
  Existing provider: External: whisper_transcriptions

Checking random path:
  Has duplicate: False
# Test query_transcriptions (backward-compatible method): same query as the
# provider-level cell, but issued through SourceService. Rebinds `records`,
# which the get_source_blocks cell below reads.
records = source_service.query_transcriptions(limit=3)
print(f"Found {len(records)} transcription records via SourceService")

# Each record is read by key: 'record_id', 'provider_id' and 'text'.
for rec in records:
    print(f"\n  record_id: {rec['record_id']}")
    print(f"  provider_id: {rec['provider_id']}")
    # Only the first 60 characters of the text are shown as a preview.
    print(f"  text preview: {rec['text'][:60]}...")
Found 3 transcription records via SourceService

  record_id: job_9065e18d
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...

  record_id: job_a0b718cc
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...

  record_id: job_3cbae8d3
  provider_id: external:/mnt/SN850X_8TB_EXT4/Projects/GitHub/cj-mills/cjm-plugin-system/.cjm/data/cjm-transcription-plugin-whisper/whisper_transcriptions.db
  text preview: Laying Plans Sun Tzu said, The art of war is of vital import...
# Test get_source_blocks with multiple selections: resolve the first two
# queried records in a single call. Skipped when fewer than two records exist.
if len(records) >= 2:
    # One {'record_id', 'provider_id'} selection per record, in query order
    selections = [
        {'record_id': picked['record_id'], 'provider_id': picked['provider_id']}
        for picked in records[:2]
    ]

    blocks = source_service.get_source_blocks(selections)
    print(f"Retrieved {len(blocks)} SourceBlocks via SourceService:")
    for i, block in enumerate(blocks):
        print(f"\n  Block {i}:")
        print(f"    id: {block.id}")
        # Only the first 50 characters of the text are shown as a preview
        print(f"    text preview: {block.text[:50]}...")
Retrieved 2 SourceBlocks via SourceService:

  Block 0:
    id: job_9065e18d
    text preview: Laying Plans Sun Tzu said, The art of war is of vi...

  Block 1:
    id: job_a0b718cc
    text preview: Laying Plans Sun Tzu said, The art of war is of vi...