source_utils

Source record operations for metadata extraction, grouping, and validation

Metadata Extraction


source

extract_batch_id


def extract_batch_id(
    metadata:Any, # Metadata dict or JSON string
)->str: # Batch ID or "No Batch ID"

Extract batch_id from transcription metadata.


source

extract_model_name


def extract_model_name(
    metadata:Any, # Metadata dict or JSON string
)->str: # Formatted model name for display

Extract and format model name from transcription metadata.

Record Grouping


source

group_transcriptions


def group_transcriptions(
    transcriptions:List, # List of transcription records
    group_by:str='media_path', # Grouping mode: "media_path" or "batch_id"
)->Dict: # Grouped transcriptions

Group transcription records by the specified field.


source

group_transcriptions_by_audio


def group_transcriptions_by_audio(
    transcriptions:List, # List of transcription records
)->Dict: # Grouped by media_path

Group transcription records by their source audio file.

Selection Checks


source

is_source_selected


def is_source_selected(
    record_id:str, # Job ID to check
    provider_id:str, # Provider ID to check
    selected_sources:List, # List of selected sources
)->bool: # True if source is selected

Check if a source is in the selected list by (record_id, provider_id) pair.


source

get_selected_media_paths


def get_selected_media_paths(
    selected_sources:List, # Current selections (record_id, provider_id)
    all_transcriptions:List, # All available transcription records
)->Set: # Media paths already represented in selections

Get the set of media_paths for currently selected sources.

Filtering


source

filter_transcriptions


def filter_transcriptions(
    transcriptions:List, # List of transcription records to filter
    search_text:str, # Search term for case-insensitive substring matching
)->List: # Filtered transcription records

Filter transcriptions by substring match across record_id, media_path, and text fields.

Group Selection


source

select_all_in_group


def select_all_in_group(
    transcriptions:List, # All transcription records
    group_key:str, # Group key to match against
    grouping_mode:str, # Grouping mode: "media_path" or "batch_id"
    selected_sources:List, # Current selections
    excluded_media_paths:Optional=None, # Media paths to skip (already selected)
)->List: # Updated selections with new items appended

Add all transcriptions matching a group key to the selection list, skipping duplicates.

Selection Mutations


source

toggle_source_selection


def toggle_source_selection(
    record_id:str, # Job ID to toggle
    provider_id:str, # Plugin name for the source
    selected_sources:List, # Current selections
)->List: # Updated selections

Toggle a source in or out of the selection list by (record_id, provider_id) pair.


source

reorder_item


def reorder_item(
    selected_sources:List, # Current selections
    record_id:str, # Record ID of item to move
    provider_id:str, # Provider ID of item to move
    direction:str, # Direction: "up" or "down"
)->List: # Reordered selections

Move an item up or down in the selection list by swapping with its neighbor.


source

reorder_sources


def reorder_sources(
    selected_sources:List, # Current selections
    new_order_ids:List, # Job IDs in desired order
)->List: # Reordered selections

Reorder sources to match the given job ID order.

Tab Navigation


source

calculate_next_tab


def calculate_next_tab(
    direction:str, # Direction: "prev", "next", or a direct tab name
    current_tab:str, # Currently active tab name
    tabs:List, # Available tab names in order
)->str: # New active tab name

Calculate the next tab based on direction or direct selection.

Filesystem Checks


source

check_audio_exists


def check_audio_exists(
    media_path:str, # Path to audio file
)->bool: # True if file exists

Check if the audio file exists at the given path.


source

validate_browse_path


def validate_browse_path(
    path:str, # Path to validate
)->str: # Validated and resolved path, or home directory on error

Validate a browse path for security. Returns home directory on invalid input.

Tests

assert extract_batch_id(None) == "No Batch ID"
assert extract_batch_id({"batch_id": "batch_123"}) == "batch_123"
assert extract_batch_id('{"batch_id": "batch_456"}') == "batch_456"
assert extract_batch_id({}) == "No Batch ID"
print("extract_batch_id tests passed")
extract_batch_id tests passed
assert extract_model_name(None) == "Unknown"
assert extract_model_name({"model": "mistralai/Voxtral-Mini-3B"}) == "Voxtral-Mini-3B"
assert extract_model_name({"model": "whisper-large"}) == "whisper-large"
assert extract_model_name({}) == "Unknown"
print("extract_model_name tests passed")
extract_model_name tests passed
records = [
    {"record_id": "1", "media_path": "a.wav"},
    {"record_id": "2", "media_path": "a.wav"},
    {"record_id": "3", "media_path": "b.wav"},
]
groups = group_transcriptions(records)
assert len(groups) == 2
assert len(groups["a.wav"]) == 2
assert len(groups["b.wav"]) == 1
print("group_transcriptions tests passed")
group_transcriptions tests passed
sources = [{"record_id": "a", "provider_id": "p1"}, {"record_id": "b", "provider_id": "p2"}]
assert is_source_selected("a", "p1", sources) == True
assert is_source_selected("a", "p2", sources) == False  # Same record_id, different provider
assert is_source_selected("c", "p1", sources) == False
print("is_source_selected tests passed")
all_t = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav"},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav"},
    {"record_id": "j3", "provider_id": "p2", "media_path": "c.wav"},
]
selected = [{"record_id": "j1", "provider_id": "p1"}, {"record_id": "j3", "provider_id": "p2"}]
paths = get_selected_media_paths(selected, all_t)
assert paths == {"a.wav", "c.wav"}

# Empty selections
assert get_selected_media_paths([], all_t) == set()

# Selection not in transcriptions (stale reference)
assert get_selected_media_paths([{"record_id": "jX", "provider_id": "pX"}], all_t) == set()

print("get_selected_media_paths tests passed")
records = [
    {"record_id": "job_001", "media_path": "/data/podcast.wav", "text": "Hello world"},
    {"record_id": "job_002", "media_path": "/data/lecture.wav", "text": "Machine learning intro"},
    {"record_id": "job_003", "media_path": "/data/podcast.wav", "text": "Goodbye world"},
]
assert len(filter_transcriptions(records, "")) == 3
assert len(filter_transcriptions(records, "  ")) == 3
assert len(filter_transcriptions(records, "podcast")) == 2
assert len(filter_transcriptions(records, "PODCAST")) == 2
assert len(filter_transcriptions(records, "machine")) == 1
assert len(filter_transcriptions(records, "job_001")) == 1
assert len(filter_transcriptions(records, "nonexistent")) == 0
print("filter_transcriptions tests passed")
filter_transcriptions tests passed
transcriptions = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j3", "provider_id": "p2", "media_path": "b.wav", "metadata": '{"batch_id": "b2"}'},
]

# Select all by media_path (no exclusion)
result = select_all_in_group(transcriptions, "a.wav", "media_path", [])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Select all by batch_id
result = select_all_in_group(transcriptions, "b1", "batch_id", [])
assert len(result) == 2

# Deduplication: j1/p1 already selected
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p1"}])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Same record_id from different provider is NOT a duplicate
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p_other"}])
assert len(result) == 3  # existing + j1/p1 + j2/p1

# No matches
result = select_all_in_group(transcriptions, "nonexistent.wav", "media_path", [])
assert len(result) == 0

# With excluded_media_paths: skip sources whose audio is already represented
result = select_all_in_group(transcriptions, "a.wav", "media_path", [], excluded_media_paths={"a.wav"})
assert len(result) == 0  # All matching records share excluded media_path

# excluded_media_paths with batch_id grouping across different audio files
mixed = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav", "metadata": '{"batch_id": "b1"}'},
]
result = select_all_in_group(mixed, "b1", "batch_id", [], excluded_media_paths={"a.wav"})
assert len(result) == 1
assert result[0]["record_id"] == "j2"  # Only b.wav source added

print("select_all_in_group tests passed")
# Toggle on: add new source
sources = [{"record_id": "a", "provider_id": "p1"}]
result = toggle_source_selection("b", "p2", sources)
assert len(result) == 2
assert result[1]["record_id"] == "b"

# Toggle off: remove existing source
result = toggle_source_selection("a", "p1", sources)
assert len(result) == 0

# Same record_id but different provider: adds (not toggle off)
result = toggle_source_selection("a", "p2", sources)
assert len(result) == 2

# Original list is not mutated
assert len(sources) == 1

print("toggle_source_selection tests passed")
sources = [
    {"record_id": "a", "provider_id": "p1"}, 
    {"record_id": "b", "provider_id": "p1"}, 
    {"record_id": "c", "provider_id": "p1"}
]

# Move middle item up
result = reorder_item(sources, "b", "p1", "up")
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Move middle item down
result = reorder_item(sources, "b", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "c", "b"]

# Move first item up (no-op)
result = reorder_item(sources, "a", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Move last item down (no-op)
result = reorder_item(sources, "c", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Item not found (no-op)
result = reorder_item(sources, "x", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Original list is not mutated
assert [s["record_id"] for s in sources] == ["a", "b", "c"]

print("reorder_item tests passed")
reorder_item tests passed
sources = [{"record_id": "a"}, {"record_id": "b"}, {"record_id": "c"}]

# Normal reorder
result = reorder_sources(sources, ["c", "a", "b"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

# Empty new_order_ids returns copy
result = reorder_sources(sources, [])
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Unknown IDs in new_order are skipped
result = reorder_sources(sources, ["b", "x", "a"])
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Missing IDs from new_order are appended
result = reorder_sources(sources, ["c"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

print("reorder_sources tests passed")
reorder_sources tests passed
tabs = ["db", "files"]

# Direct tab selection
assert calculate_next_tab("db", "files", tabs) == "db"
assert calculate_next_tab("files", "db", tabs) == "files"

# Cycling forward
assert calculate_next_tab("next", "db", tabs) == "files"
assert calculate_next_tab("next", "files", tabs) == "db"

# Cycling backward
assert calculate_next_tab("prev", "db", tabs) == "files"
assert calculate_next_tab("prev", "files", tabs) == "db"

# Unknown current_tab defaults to index 0
assert calculate_next_tab("next", "unknown", tabs) == "files"

print("calculate_next_tab tests passed")
calculate_next_tab tests passed
import os
assert validate_browse_path(os.path.expanduser("~")) == os.path.expanduser("~")
assert validate_browse_path("/nonexistent/path/xyz") == os.path.expanduser("~")
print("validate_browse_path tests passed")
validate_browse_path tests passed