# source_utils


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Metadata Extraction

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L17"
target="_blank" style="float:right; font-size:smaller">source</a>

### extract_batch_id

``` python

def extract_batch_id(
    metadata:Any, # Metadata dict or JSON string
)->str: # Batch ID or "No Batch ID"

```

*Extract batch_id from transcription metadata.*
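As a rough illustration, the behavior exercised by the tests on this page can be sketched as follows. This is not the library's implementation: the name `extract_batch_id_sketch` is hypothetical, and returning the fallback string for malformed JSON is an assumption.

```python
import json
from typing import Any


def extract_batch_id_sketch(metadata: Any) -> str:
    """Hypothetical stand-in for extract_batch_id (not the library's code)."""
    # Metadata may arrive as a JSON string; decode it first.
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            # Assumption: malformed JSON is treated like missing metadata.
            return "No Batch ID"
    # None, lists, numbers, etc. carry no batch_id.
    if not isinstance(metadata, dict):
        return "No Batch ID"
    batch_id = metadata.get("batch_id")
    return str(batch_id) if batch_id else "No Batch ID"


print(extract_batch_id_sketch('{"batch_id": "batch_456"}'))  # batch_456
```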

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L35"
target="_blank" style="float:right; font-size:smaller">source</a>

### extract_model_name

``` python

def extract_model_name(
    metadata:Any, # Metadata dict or JSON string
)->str: # Formatted model name for display

```

*Extract and format model name from transcription metadata.*
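Judging from the tests on this page, "formatting" appears to mean dropping an organization prefix such as `mistralai/`. A minimal sketch under that assumption follows; the name `extract_model_name_sketch` is hypothetical, and reading the `"model"` key is inferred from the tests rather than from the source.

```python
import json
from typing import Any


def extract_model_name_sketch(metadata: Any) -> str:
    """Hypothetical stand-in for extract_model_name (not the library's code)."""
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            # Assumption: unparseable metadata yields the fallback label.
            return "Unknown"
    if not isinstance(metadata, dict):
        return "Unknown"
    model = metadata.get("model")
    if not model:
        return "Unknown"
    # Keep only the part after the last "/" ("org/name" -> "name").
    return str(model).rsplit("/", 1)[-1]


print(extract_model_name_sketch({"model": "mistralai/Voxtral-Mini-3B"}))  # Voxtral-Mini-3B
```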

## Record Grouping

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L61"
target="_blank" style="float:right; font-size:smaller">source</a>

### group_transcriptions

``` python

def group_transcriptions(
    transcriptions:List, # List of transcription records
    group_by:str='media_path', # Grouping mode: "media_path" or "batch_id"
)->Dict: # Grouped transcriptions

```

*Group transcription records by the specified field.*
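One way to picture the two grouping modes is the sketch below. It is an illustration only: `group_transcriptions_sketch` and `_batch_id_of` are hypothetical names, and deriving the batch key from each record's `metadata` field is an assumption based on the record shapes used in the tests on this page.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List


def _batch_id_of(metadata: Any) -> str:
    """Hypothetical helper: read batch_id from a dict or JSON string."""
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            return "No Batch ID"
    if isinstance(metadata, dict) and metadata.get("batch_id"):
        return str(metadata["batch_id"])
    return "No Batch ID"


def group_transcriptions_sketch(
    transcriptions: List[Dict[str, Any]],
    group_by: str = "media_path",
) -> Dict[str, List[Dict[str, Any]]]:
    """Hypothetical stand-in for group_transcriptions (not the library's code)."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for record in transcriptions:
        if group_by == "batch_id":
            key = _batch_id_of(record.get("metadata"))
        else:
            # Assumption: records without a media_path fall under an empty key.
            key = record.get("media_path", "")
        groups[key].append(record)
    # Plain dict keeps the order in which keys were first seen.
    return dict(groups)
```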

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L80"
target="_blank" style="float:right; font-size:smaller">source</a>

### group_transcriptions_by_audio

``` python

def group_transcriptions_by_audio(
    transcriptions:List, # List of transcription records
)->Dict: # Grouped by media_path

```

*Group transcription records by their source audio file.*

## Selection Checks

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L87"
target="_blank" style="float:right; font-size:smaller">source</a>

### is_source_selected

``` python

def is_source_selected(
    record_id:str, # Job ID to check
    provider_id:str, # Provider ID to check
    selected_sources:List, # List of selected sources
)->bool: # True if source is selected

```

*Check if a source is in the selected list by (record_id, provider_id)
pair.*
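The pair-based check could look roughly like this. The sketch assumes each selected source is a dict with `record_id` and `provider_id` keys, as in the tests on this page; `is_source_selected_sketch` is a hypothetical name, not the library's function.

```python
from typing import Any, Dict, List


def is_source_selected_sketch(
    record_id: str,
    provider_id: str,
    selected_sources: List[Dict[str, Any]],
) -> bool:
    """Hypothetical stand-in for is_source_selected (not the library's code)."""
    # Both fields must match: the same record_id from another provider
    # counts as a different source.
    return any(
        s.get("record_id") == record_id and s.get("provider_id") == provider_id
        for s in selected_sources
    )
```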

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L99"
target="_blank" style="float:right; font-size:smaller">source</a>

### get_selected_media_paths

``` python

def get_selected_media_paths(
    selected_sources:List, # Current selections (record_id, provider_id)
    all_transcriptions:List, # All available transcription records
)->Set: # Media paths already represented in selections

```

*Get the set of media_paths for currently selected sources.*
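A minimal sketch of this lookup, assuming selections and records are dicts keyed as in the tests on this page. The name `get_selected_media_paths_sketch` is hypothetical, and silently ignoring stale selections mirrors the tests rather than the source.

```python
from typing import Any, Dict, List, Set


def get_selected_media_paths_sketch(
    selected_sources: List[Dict[str, Any]],
    all_transcriptions: List[Dict[str, Any]],
) -> Set[str]:
    """Hypothetical stand-in for get_selected_media_paths (not the library's code)."""
    # Index the selections by (record_id, provider_id) for O(1) membership tests.
    selected_keys = {
        (s.get("record_id"), s.get("provider_id")) for s in selected_sources
    }
    # Selections with no matching record simply contribute nothing.
    return {
        t["media_path"]
        for t in all_transcriptions
        if (t.get("record_id"), t.get("provider_id")) in selected_keys
        and t.get("media_path")
    }
```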

## Filtering

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L112"
target="_blank" style="float:right; font-size:smaller">source</a>

### filter_transcriptions

``` python

def filter_transcriptions(
    transcriptions:List, # List of transcription records to filter
    search_text:str, # Search term for case-insensitive substring matching
)->List: # Filtered transcription records

```

*Filter transcriptions by substring match across record_id, media_path,
and text fields.*
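The matching rule can be sketched as below. Treating a blank or whitespace-only search term as "no filter" is inferred from the tests on this page; `filter_transcriptions_sketch` is a hypothetical name and not the library's implementation.

```python
from typing import Any, Dict, List


def filter_transcriptions_sketch(
    transcriptions: List[Dict[str, Any]],
    search_text: str,
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for filter_transcriptions (not the library's code)."""
    term = search_text.strip().lower()
    if not term:
        # Blank search: return every record (as a new list).
        return list(transcriptions)
    fields = ("record_id", "media_path", "text")
    return [
        record
        for record in transcriptions
        # A record matches if the term occurs in any of the searched fields.
        if any(term in str(record.get(field) or "").lower() for field in fields)
    ]
```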

## Group Selection

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L129"
target="_blank" style="float:right; font-size:smaller">source</a>

### select_all_in_group

``` python

def select_all_in_group(
    transcriptions:List, # All transcription records
    group_key:str, # Group key to match against
    grouping_mode:str, # Grouping mode: "media_path" or "batch_id"
    selected_sources:List, # Current selections
    excluded_media_paths:Optional=None, # Media paths to skip (already selected)
)->List: # Updated selections with new items appended

```

*Add all transcriptions matching a group key to the selection list,
skipping duplicates.*
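The interaction between group matching, deduplication, and `excluded_media_paths` is easier to see in code. The sketch below reproduces the behavior shown in the tests on this page under several assumptions: appended entries are dicts holding only `record_id` and `provider_id`, batch IDs are read from each record's `metadata`, and the names `select_all_in_group_sketch` and `_batch_id_of` are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, List, Optional


def _batch_id_of(metadata: Any) -> str:
    """Hypothetical helper: read batch_id from a dict or JSON string."""
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            return "No Batch ID"
    if isinstance(metadata, dict) and metadata.get("batch_id"):
        return str(metadata["batch_id"])
    return "No Batch ID"


def select_all_in_group_sketch(
    transcriptions: List[Dict[str, Any]],
    group_key: str,
    grouping_mode: str,
    selected_sources: List[Dict[str, Any]],
    excluded_media_paths: Optional[Iterable[str]] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for select_all_in_group (not the library's code)."""
    excluded = set(excluded_media_paths or ())
    result = list(selected_sources)  # copy, so the caller's list is untouched
    seen = {(s.get("record_id"), s.get("provider_id")) for s in result}
    for record in transcriptions:
        if grouping_mode == "batch_id":
            key = _batch_id_of(record.get("metadata"))
        else:
            key = record.get("media_path")
        if key != group_key:
            continue  # not part of the requested group
        if record.get("media_path") in excluded:
            continue  # audio already represented in the selection
        pair = (record.get("record_id"), record.get("provider_id"))
        if pair in seen:
            continue  # exact (record_id, provider_id) duplicate
        seen.add(pair)
        result.append({"record_id": pair[0], "provider_id": pair[1]})
    return result
```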

## Selection Mutations

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L165"
target="_blank" style="float:right; font-size:smaller">source</a>

### toggle_source_selection

``` python

def toggle_source_selection(
    record_id:str, # Job ID to toggle
    provider_id:str, # Plugin name for the source
    selected_sources:List, # Current selections
)->List: # Updated selections

```

*Toggle a source in or out of the selection list by (record_id,
provider_id) pair.*
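A toggle of this kind might be written as in the sketch below, which returns a new list rather than mutating its input, as the tests on this page require. The name `toggle_source_selection_sketch` is hypothetical, and the shape of the appended dict is an assumption.

```python
from typing import Any, Dict, List


def toggle_source_selection_sketch(
    record_id: str,
    provider_id: str,
    selected_sources: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for toggle_source_selection (not the library's code)."""

    def matches(source: Dict[str, Any]) -> bool:
        return (
            source.get("record_id") == record_id
            and source.get("provider_id") == provider_id
        )

    if any(matches(s) for s in selected_sources):
        # Already selected: return a copy without the matching entry.
        return [s for s in selected_sources if not matches(s)]
    # Not selected yet: return a copy with the new entry appended.
    return [*selected_sources, {"record_id": record_id, "provider_id": provider_id}]
```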

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L179"
target="_blank" style="float:right; font-size:smaller">source</a>

### reorder_item

``` python

def reorder_item(
    selected_sources:List, # Current selections
    record_id:str, # Record ID of item to move
    provider_id:str, # Provider ID of item to move
    direction:str, # Direction: "up" or "down"
)->List: # Reordered selections

```

*Move an item up or down in the selection list by swapping with its
neighbor.*
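The neighbor swap, including the no-op cases at either end of the list, can be sketched as follows. `reorder_item_sketch` is a hypothetical name, and treating an unrecognized `direction` as a no-op is an assumption not covered by the tests on this page.

```python
from typing import Any, Dict, List


def reorder_item_sketch(
    selected_sources: List[Dict[str, Any]],
    record_id: str,
    provider_id: str,
    direction: str,
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for reorder_item (not the library's code)."""
    result = list(selected_sources)  # work on a copy
    offsets = {"up": -1, "down": 1}
    if direction not in offsets:
        return result  # assumption: unknown directions change nothing
    index = next(
        (
            i
            for i, s in enumerate(result)
            if s.get("record_id") == record_id and s.get("provider_id") == provider_id
        ),
        None,
    )
    if index is None:
        return result  # item not in the list
    target = index + offsets[direction]
    if 0 <= target < len(result):
        # Swap with the neighbor; out-of-range targets leave the order as is.
        result[index], result[target] = result[target], result[index]
    return result
```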

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L204"
target="_blank" style="float:right; font-size:smaller">source</a>

### reorder_sources

``` python

def reorder_sources(
    selected_sources:List, # Current selections
    new_order_ids:List, # Job IDs in desired order
)->List: # Reordered selections

```

*Reorder sources to match the given order of record IDs (job IDs).*
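The tests on this page imply two edge-case rules: IDs that are not in the selection are skipped, and selected sources missing from the new order are appended in their original order. A minimal sketch under those assumptions (with the hypothetical name `reorder_sources_sketch`) might be:

```python
from typing import Any, Dict, List, Set


def reorder_sources_sketch(
    selected_sources: List[Dict[str, Any]],
    new_order_ids: List[str],
) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for reorder_sources (not the library's code)."""
    ordered: List[Dict[str, Any]] = []
    placed: Set[int] = set()  # positions in selected_sources already emitted
    for wanted_id in new_order_ids:
        for position, source in enumerate(selected_sources):
            if position not in placed and source.get("record_id") == wanted_id:
                ordered.append(source)
                placed.add(position)
    # Anything not mentioned in new_order_ids keeps its original relative order.
    ordered.extend(
        source
        for position, source in enumerate(selected_sources)
        if position not in placed
    )
    return ordered
```

Tracking positions instead of IDs is a design choice of this sketch: it keeps entries that share a `record_id` but come from different providers, which an ID-keyed dict would collapse.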

## Tab Navigation

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L224"
target="_blank" style="float:right; font-size:smaller">source</a>

### calculate_next_tab

``` python

def calculate_next_tab(
    direction:str, # Direction: "prev", "next", or a direct tab name
    current_tab:str, # Currently active tab name
    tabs:List, # Available tab names in order
)->str: # New active tab name

```

*Calculate the next tab based on direction or direct selection.*
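The wrap-around cycling shown in the tests on this page can be expressed with modular arithmetic. The sketch below is illustrative only: `calculate_next_tab_sketch` is a hypothetical name, and returning `current_tab` for an unrecognized direction or an empty tab list is an assumption.

```python
from typing import List


def calculate_next_tab_sketch(direction: str, current_tab: str, tabs: List[str]) -> str:
    """Hypothetical stand-in for calculate_next_tab (not the library's code)."""
    if not tabs:
        return current_tab  # assumption: nothing to navigate
    if direction in tabs:
        return direction  # direct selection by tab name
    # An unknown current tab is treated as the first tab.
    index = tabs.index(current_tab) if current_tab in tabs else 0
    if direction == "next":
        return tabs[(index + 1) % len(tabs)]
    if direction == "prev":
        # Python's % always yields a non-negative result, so -1 wraps to the end.
        return tabs[(index - 1) % len(tabs)]
    return current_tab  # assumption: unrecognized direction keeps the tab
```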

## Filesystem Checks

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L240"
target="_blank" style="float:right; font-size:smaller">source</a>

### check_audio_exists

``` python

def check_audio_exists(
    media_path:str, # Path to audio file
)->bool: # True if file exists

```

*Check if the audio file exists at the given path.*

------------------------------------------------------------------------

<a
href="https://github.com/cj-mills/cjm-transcript-source-select/blob/main/cjm_transcript_source_select/services/source_utils.py#L249"
target="_blank" style="float:right; font-size:smaller">source</a>

### validate_browse_path

``` python

def validate_browse_path(
    path:str, # Path to validate
)->str: # Validated and resolved path, or home directory on error

```

*Validate a browse path for security. Returns home directory on invalid
input.*
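This page does not spell out which checks the function performs. Purely as an illustration, a conservative validator could resolve the path and accept it only if it is an existing directory, falling back to the home directory otherwise. The name `validate_browse_path_sketch` and every check in it are assumptions, not the library's implementation.

```python
from pathlib import Path


def validate_browse_path_sketch(path: str) -> str:
    """Hypothetical stand-in for validate_browse_path (not the library's code)."""
    home = str(Path.home())
    # Reject empty or non-string input outright.
    if not isinstance(path, str) or not path:
        return home
    try:
        # resolve() normalizes ".." segments and follows symlinks.
        resolved = Path(path).expanduser().resolve()
        if resolved.is_dir():
            return str(resolved)
    except (OSError, RuntimeError, ValueError):
        # Unreadable paths, symlink loops, or embedded null bytes.
        pass
    return home
```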

## Tests

``` python
assert extract_batch_id(None) == "No Batch ID"
assert extract_batch_id({"batch_id": "batch_123"}) == "batch_123"
assert extract_batch_id('{"batch_id": "batch_456"}') == "batch_456"
assert extract_batch_id({}) == "No Batch ID"
print("extract_batch_id tests passed")
```

    extract_batch_id tests passed

``` python
assert extract_model_name(None) == "Unknown"
assert extract_model_name({"model": "mistralai/Voxtral-Mini-3B"}) == "Voxtral-Mini-3B"
assert extract_model_name({"model": "whisper-large"}) == "whisper-large"
assert extract_model_name({}) == "Unknown"
print("extract_model_name tests passed")
```

    extract_model_name tests passed

``` python
records = [
    {"record_id": "1", "media_path": "a.wav"},
    {"record_id": "2", "media_path": "a.wav"},
    {"record_id": "3", "media_path": "b.wav"},
]
groups = group_transcriptions(records)
assert len(groups) == 2
assert len(groups["a.wav"]) == 2
assert len(groups["b.wav"]) == 1
print("group_transcriptions tests passed")
```

    group_transcriptions tests passed

``` python
sources = [{"record_id": "a", "provider_id": "p1"}, {"record_id": "b", "provider_id": "p2"}]
assert is_source_selected("a", "p1", sources)
assert not is_source_selected("a", "p2", sources)  # Same record_id, different provider
assert not is_source_selected("c", "p1", sources)
print("is_source_selected tests passed")
```

``` python
all_t = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav"},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav"},
    {"record_id": "j3", "provider_id": "p2", "media_path": "c.wav"},
]
selected = [{"record_id": "j1", "provider_id": "p1"}, {"record_id": "j3", "provider_id": "p2"}]
paths = get_selected_media_paths(selected, all_t)
assert paths == {"a.wav", "c.wav"}

# Empty selections
assert get_selected_media_paths([], all_t) == set()

# Selection not in transcriptions (stale reference)
assert get_selected_media_paths([{"record_id": "jX", "provider_id": "pX"}], all_t) == set()

print("get_selected_media_paths tests passed")
```

``` python
records = [
    {"record_id": "job_001", "media_path": "/data/podcast.wav", "text": "Hello world"},
    {"record_id": "job_002", "media_path": "/data/lecture.wav", "text": "Machine learning intro"},
    {"record_id": "job_003", "media_path": "/data/podcast.wav", "text": "Goodbye world"},
]
assert len(filter_transcriptions(records, "")) == 3
assert len(filter_transcriptions(records, "  ")) == 3
assert len(filter_transcriptions(records, "podcast")) == 2
assert len(filter_transcriptions(records, "PODCAST")) == 2
assert len(filter_transcriptions(records, "machine")) == 1
assert len(filter_transcriptions(records, "job_001")) == 1
assert len(filter_transcriptions(records, "nonexistent")) == 0
print("filter_transcriptions tests passed")
```

    filter_transcriptions tests passed

``` python
transcriptions = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j3", "provider_id": "p2", "media_path": "b.wav", "metadata": '{"batch_id": "b2"}'},
]

# Select all by media_path (no exclusion)
result = select_all_in_group(transcriptions, "a.wav", "media_path", [])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Select all by batch_id
result = select_all_in_group(transcriptions, "b1", "batch_id", [])
assert len(result) == 2

# Deduplication: j1/p1 already selected
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p1"}])
assert len(result) == 2
assert result[0]["record_id"] == "j1"
assert result[1]["record_id"] == "j2"

# Same record_id from different provider is NOT a duplicate
result = select_all_in_group(transcriptions, "a.wav", "media_path", [{"record_id": "j1", "provider_id": "p_other"}])
assert len(result) == 3  # existing + j1/p1 + j2/p1

# No matches
result = select_all_in_group(transcriptions, "nonexistent.wav", "media_path", [])
assert len(result) == 0

# With excluded_media_paths: skip sources whose audio is already represented
result = select_all_in_group(transcriptions, "a.wav", "media_path", [], excluded_media_paths={"a.wav"})
assert len(result) == 0  # All matching records share excluded media_path

# excluded_media_paths with batch_id grouping across different audio files
mixed = [
    {"record_id": "j1", "provider_id": "p1", "media_path": "a.wav", "metadata": '{"batch_id": "b1"}'},
    {"record_id": "j2", "provider_id": "p1", "media_path": "b.wav", "metadata": '{"batch_id": "b1"}'},
]
result = select_all_in_group(mixed, "b1", "batch_id", [], excluded_media_paths={"a.wav"})
assert len(result) == 1
assert result[0]["record_id"] == "j2"  # Only b.wav source added

print("select_all_in_group tests passed")
```

``` python
# Toggle on: add new source
sources = [{"record_id": "a", "provider_id": "p1"}]
result = toggle_source_selection("b", "p2", sources)
assert len(result) == 2
assert result[1]["record_id"] == "b"

# Toggle off: remove existing source
result = toggle_source_selection("a", "p1", sources)
assert len(result) == 0

# Same record_id but different provider: adds (not toggle off)
result = toggle_source_selection("a", "p2", sources)
assert len(result) == 2

# Original list is not mutated
assert len(sources) == 1

print("toggle_source_selection tests passed")
```

``` python
sources = [
    {"record_id": "a", "provider_id": "p1"},
    {"record_id": "b", "provider_id": "p1"},
    {"record_id": "c", "provider_id": "p1"}
]

# Move middle item up
result = reorder_item(sources, "b", "p1", "up")
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Move middle item down
result = reorder_item(sources, "b", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "c", "b"]

# Move first item up (no-op)
result = reorder_item(sources, "a", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Move last item down (no-op)
result = reorder_item(sources, "c", "p1", "down")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Item not found (no-op)
result = reorder_item(sources, "x", "p1", "up")
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Original list is not mutated
assert [s["record_id"] for s in sources] == ["a", "b", "c"]

print("reorder_item tests passed")
```

    reorder_item tests passed

``` python
sources = [{"record_id": "a"}, {"record_id": "b"}, {"record_id": "c"}]

# Normal reorder
result = reorder_sources(sources, ["c", "a", "b"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

# Empty new_order_ids returns copy
result = reorder_sources(sources, [])
assert [s["record_id"] for s in result] == ["a", "b", "c"]

# Unknown IDs in new_order are skipped
result = reorder_sources(sources, ["b", "x", "a"])
assert [s["record_id"] for s in result] == ["b", "a", "c"]

# Missing IDs from new_order are appended
result = reorder_sources(sources, ["c"])
assert [s["record_id"] for s in result] == ["c", "a", "b"]

print("reorder_sources tests passed")
```

    reorder_sources tests passed

``` python
tabs = ["db", "files"]

# Direct tab selection
assert calculate_next_tab("db", "files", tabs) == "db"
assert calculate_next_tab("files", "db", tabs) == "files"

# Cycling forward
assert calculate_next_tab("next", "db", tabs) == "files"
assert calculate_next_tab("next", "files", tabs) == "db"

# Cycling backward
assert calculate_next_tab("prev", "db", tabs) == "files"
assert calculate_next_tab("prev", "files", tabs) == "db"

# Unknown current_tab defaults to index 0
assert calculate_next_tab("next", "unknown", tabs) == "files"

print("calculate_next_tab tests passed")
```

    calculate_next_tab tests passed

``` python
import os
assert validate_browse_path(os.path.expanduser("~")) == os.path.expanduser("~")
assert validate_browse_path("/nonexistent/path/xyz") == os.path.expanduser("~")
print("validate_browse_path tests passed")
```

    validate_browse_path tests passed
