diff --git a/README.md b/README.md index ed60808..811d567 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,10 @@ **monkeyplug** is a little script to censor profanity in audio files (intended for podcasts, but YMMV) in a few simple steps: 1. The user provides a local audio file (or a URL pointing to an audio file which is downloaded) -2. Either [Whisper](https://openai.com/research/whisper) ([GitHub](https://github.com/openai/whisper)) or the [Vosk](https://alphacephei.com/vosk/)-[API](https://github.com/alphacep/vosk-api) is used to recognize speech in the audio file +2. Either [Whisper](https://openai.com/research/whisper) ([GitHub](https://github.com/openai/whisper)) or the [Vosk](https://alphacephei.com/vosk/)-[API](https://github.com/alphacep/vosk-api) is used to recognize speech in the audio file (or a pre-generated transcript can be loaded) 3. Each recognized word is checked against a [list](./src/monkeyplug/swears.txt) of profanity or other words you'd like muted (supports text or [JSON format](./SWEARS_JSON_FORMAT.md)) 4. [`ffmpeg`](https://www.ffmpeg.org/) is used to create a cleaned audio file, muting or "bleeping" the objectional words +5. Optionally, the transcript can be saved for reuse in future processing runs You can then use your favorite media player to play the cleaned audio file. 
@@ -62,10 +63,14 @@ options: Input file (or URL) -o , --output Output file - --output-json - Output file to store transcript JSON -w , --swears text or JSON file containing profanity (default: "swears.txt") + --output-json + Output file to store transcript JSON + --input-transcript + Load existing transcript JSON instead of performing speech recognition + --save-transcript Automatically save transcript JSON alongside output audio file + --force-retranscribe Force new transcription even if transcript file exists (overrides automatic reuse) -a , --audio-params Audio parameters for ffmpeg (default depends on output audio codec) -c , --channels @@ -137,6 +142,37 @@ Alternately, a [Dockerfile](./docker/Dockerfile) is provided to allow you to run then run [`monkeyplug-docker.sh`](./docker/monkeyplug-docker.sh) inside the directory where your audio files are located. +## Transcript Workflow + +**monkeyplug** supports saving and reusing transcripts to improve workflow efficiency: + +### Save Transcript for Later Reuse + +```bash +# Generate transcript once and save it +monkeyplug -i input.mp3 -o output.mp3 --save-transcript + +# This creates output.mp3 and output_transcript.json +``` + +### Automatic Transcript Reuse + +```bash +# Second run: Automatically detects and reuses transcript (much faster, because speech recognition is skipped) +monkeyplug -i input.mp3 -o output.mp3 --save-transcript +# Finds output_transcript.json and reuses it automatically +# NOTE: the transcript is located by the output file name only; use --force-retranscribe if the input audio has changed + +# Force new transcription when needed +monkeyplug -i input.mp3 -o output.mp3 --save-transcript --force-retranscribe +``` + +### Manual Transcript Loading + +```bash +# Explicitly specify transcript to load +monkeyplug -i input.mp3 -o output_strict.mp3 --input-transcript output_transcript.json -w strict_swears.txt +``` + ## Contributing If you'd like to help improve monkeyplug, pull requests will be welcomed! 
diff --git a/input/Witch_mother1.m4b b/input/Witch_mother1.m4b new file mode 100644 index 0000000..be6842c Binary files /dev/null and b/input/Witch_mother1.m4b differ diff --git a/src/monkeyplug/monkeyplug.py b/src/monkeyplug/monkeyplug.py index 7051bb6..0e01f12 100755 --- a/src/monkeyplug/monkeyplug.py +++ b/src/monkeyplug/monkeyplug.py @@ -252,6 +252,9 @@ def __init__( oAudioFileFormat, iSwearsFileSpec, outputJson, + inputTranscript=None, + saveTranscript=False, + forceRetranscribe=False, aParams=None, aChannels=AUDIO_DEFAULT_CHANNELS, aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE, @@ -279,6 +282,8 @@ def __init__( self.forceDespiteTag = force self.debug = dbug self.outputJson = outputJson + self.inputTranscript = inputTranscript + self.saveTranscript = saveTranscript # determine input file name, or download and save file if (iFileSpec is not None) and os.path.isfile(iFileSpec): @@ -374,12 +379,33 @@ def __init__( if self.outputVideoFileFormat: self.outputFileSpec = outParts[0] + self.outputVideoFileFormat + # create output directory if it doesn't exist + self._ensure_directory_exists(self.outputFileSpec, "output directory") + # if output file already exists, remove as we'll be overwriting it anyway if os.path.isfile(self.outputFileSpec): if self.debug: mmguero.eprint(f'Removing existing destination file {self.outputFileSpec}') os.remove(self.outputFileSpec) + # If save-transcript is enabled and no explicit JSON output path, auto-generate one + if self.saveTranscript and not self.outputJson: + outputBaseName = os.path.splitext(self.outputFileSpec)[0] + self.outputJson = outputBaseName + '_transcript.json' + if self.debug: + mmguero.eprint(f'Auto-generated transcript output: {self.outputJson}') + + # Auto-detect existing transcript for reuse (unless force flag set or explicit input provided) + if self.saveTranscript and not self.inputTranscript and self.outputJson and not forceRetranscribe: + if os.path.exists(self.outputJson): + self.inputTranscript = self.outputJson + 
if self.debug:
+                    mmguero.eprint(f'Found existing transcript, reusing: {self.inputTranscript}')
+
+        # If JSON output is specified, ensure its directory exists too
+        if self.outputJson:
+            self._ensure_directory_exists(self.outputJson, "JSON output directory")
+
         # load the swears file (not actually mapping right now, but who knows, speech synthesis maybe someday?)
         if (iSwearsFileSpec is not None) and os.path.isfile(iSwearsFileSpec):
             self.swearsFileSpec = iSwearsFileSpec
@@ -396,6 +422,10 @@ def __init__(
             mmguero.eprint(f'Encode parameters: {self.aParams}')
             mmguero.eprint(f'Profanity file: {self.swearsFileSpec}')
             mmguero.eprint(f'Intermediate downloaded file: {self.tmpDownloadedFileSpec}')
+            if self.outputJson:
+                mmguero.eprint(f'Transcript output: {self.outputJson}')
+            if self.inputTranscript:
+                mmguero.eprint(f'Input transcript: {self.inputTranscript}')
             mmguero.eprint(f'Beep instead of mute: {self.beep}')
             if self.beep:
                 mmguero.eprint(f'Beep hertz: {self.beepHertz}')
@@ -411,6 +441,65 @@ def __del__(self):
             if os.path.isfile(self.tmpDownloadedFileSpec):
                 os.remove(self.tmpDownloadedFileSpec)
 
+    ######## _ensure_directory_exists #############################################
+    def _ensure_directory_exists(self, filepath, description="directory"):
+        """Ensure the directory for a file path exists, creating it if necessary"""
+        directory = os.path.dirname(filepath)
+        if directory and not os.path.exists(directory):
+            if self.debug:
+                mmguero.eprint(f'Creating {description}: {directory}')
+            os.makedirs(directory, exist_ok=True)
+        return directory
+
+    ######## LoadTranscriptFromFile ##############################################
+    def LoadTranscriptFromFile(self):
+        """Load a pre-generated transcript from the JSON file named by self.inputTranscript.
+
+        The file must hold a JSON list of word objects. Each object's "scrub" flag is
+        recomputed against the currently loaded swears map (a missing or null "word"
+        is treated as empty text), so one transcript can be reused with different
+        swears lists.
+
+        Returns:
+            True if a transcript was loaded into self.wordList; False if no input
+            transcript is configured.
+
+        Raises:
+            IOError: self.inputTranscript is set but is not an existing file.
+            ValueError: the file is not valid JSON, or is not a list of JSON objects.
+        """
+        if not self.inputTranscript:
+            return False
+
+        if not os.path.isfile(self.inputTranscript):
+            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), self.inputTranscript)
+
+        if self.debug:
+            mmguero.eprint(f'Loading transcript from: {self.inputTranscript}')
+
+        # read as UTF-8 (the JSON interchange encoding) rather than the platform's locale default
+        with open(self.inputTranscript, 'r', encoding='utf-8') as f:
+            try:
+                words = json.load(f)
+            except json.JSONDecodeError as e:
+                raise ValueError(f'{self.inputTranscript} is not valid JSON: {e}') from e
+
+        # validate before assigning self.wordList so a malformed file can't leave partial state
+        if not isinstance(words, list) or not all(isinstance(w, dict) for w in words):
+            raise ValueError(f'{self.inputTranscript} must contain a JSON list of word objects')
+
+        # Recalculate scrub flags with current swears list
+        for word in words:
+            word['scrub'] = scrubword(word.get('word') or '') in self.swearsMap
+        self.wordList = words
+
+        if self.debug:
+            mmguero.eprint(f'Loaded {len(self.wordList)} words from transcript')
+            scrubbed_count = sum(1 for w in self.wordList if w['scrub'])
+            mmguero.eprint(f'Words to censor with current swear list: {scrubbed_count}')
+
+        return True
+
     ######## _load_swears_file ####################################################
     def _load_swears_file(self):
         """Load swears from text or JSON format"""
@@ -463,7 +529,9 @@ def _load_swears_from_text(self):
 
     ######## CreateCleanMuteList #################################################
     def CreateCleanMuteList(self):
-        self.RecognizeSpeech()
+        # Try to load existing transcript first, otherwise perform speech recognition
+        if not self.LoadTranscriptFromFile():
+            self.RecognizeSpeech()
 
         self.naughtyWordList = [word for word in self.wordList if word["scrub"] is True]
         if len(self.naughtyWordList) > 0:
@@ -604,6 +672,9 @@ def __init__(
         iSwearsFileSpec,
         mDir,
         outputJson,
+        inputTranscript=None,
+        saveTranscript=False,
+        forceRetranscribe=False,
         aParams=None,
         aChannels=AUDIO_DEFAULT_CHANNELS,
         aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE,
@@ -622,22 +693,26 @@ def __init__(
         dbug=False,
     ):
         self.wavReadFramesChunk = wChunk
+        self.modelPath = None
+        self.vosk = None
+
+        # Only load model if we're actually going to transcribe
+        if not inputTranscript:
+            # make sure the VOSK model path exists
+            if (mDir is not None) and os.path.isdir(mDir):
+                self.modelPath = mDir
+            else:
+                raise IOError(
+                    errno.ENOENT,
+                    os.strerror(errno.ENOENT) + " (see 
https://alphacephei.com/vosk/models)", - mDir, - ) - - self.vosk = mmguero.dynamic_import("vosk", "vosk", debug=dbug) - if not self.vosk: - raise Exception("Unable to initialize VOSK API") - if not dbug: - self.vosk.SetLogLevel(-1) + self.vosk = mmguero.dynamic_import("vosk", "vosk", debug=dbug) + if not self.vosk: + raise Exception("Unable to initialize VOSK API") + if not dbug: + self.vosk.SetLogLevel(-1) super().__init__( iFileSpec=iFileSpec, @@ -645,6 +720,9 @@ def __init__( oAudioFileFormat=oAudioFileFormat, iSwearsFileSpec=iSwearsFileSpec, outputJson=outputJson, + inputTranscript=inputTranscript, + saveTranscript=saveTranscript, + forceRetranscribe=forceRetranscribe, aParams=aParams, aChannels=aChannels, aSampleRate=aSampleRate, @@ -665,9 +743,12 @@ def __init__( self.tmpWavFileSpec = self.inputFileParts[0] + ".wav" if self.debug: - mmguero.eprint(f'Model directory: {self.modelPath}') - mmguero.eprint(f'Intermediate audio file: {self.tmpWavFileSpec}') - mmguero.eprint(f'Read frames: {self.wavReadFramesChunk}') + if inputTranscript: + mmguero.eprint(f'Using input transcript (skipping speech recognition)') + else: + mmguero.eprint(f'Model directory: {self.modelPath}') + mmguero.eprint(f'Intermediate audio file: {self.tmpWavFileSpec}') + mmguero.eprint(f'Read frames: {self.wavReadFramesChunk}') def __del__(self): super().__del__() @@ -770,6 +851,9 @@ def __init__( mName, torchThreads, outputJson, + inputTranscript=None, + saveTranscript=False, + forceRetranscribe=False, aParams=None, aChannels=AUDIO_DEFAULT_CHANNELS, aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE, @@ -786,18 +870,24 @@ def __init__( force=False, dbug=False, ): - if torchThreads > 0: - self.torch = mmguero.dynamic_import("torch", "torch", debug=dbug) - if self.torch: - self.torch.set_num_threads(torchThreads) + self.whisper = None + self.model = None + self.torch = None - self.whisper = mmguero.dynamic_import("whisper", "openai-whisper", debug=dbug) - if not self.whisper: - raise Exception("Unable to 
initialize Whisper API") + # Only load model if we're actually going to transcribe (no input transcript provided) + if not inputTranscript: + if torchThreads > 0: + self.torch = mmguero.dynamic_import("torch", "torch", debug=dbug) + if self.torch: + self.torch.set_num_threads(torchThreads) - self.model = self.whisper.load_model(mName, download_root=mDir) - if not self.model: - raise Exception(f"Unable to load Whisper model {mName} in {mDir}") + self.whisper = mmguero.dynamic_import("whisper", "openai-whisper", debug=dbug) + if not self.whisper: + raise Exception("Unable to initialize Whisper API") + + self.model = self.whisper.load_model(mName, download_root=mDir) + if not self.model: + raise Exception(f"Unable to load Whisper model {mName} in {mDir}") super().__init__( iFileSpec=iFileSpec, @@ -805,6 +895,9 @@ def __init__( oAudioFileFormat=oAudioFileFormat, iSwearsFileSpec=iSwearsFileSpec, outputJson=outputJson, + inputTranscript=inputTranscript, + saveTranscript=saveTranscript, + forceRetranscribe=forceRetranscribe, aParams=aParams, aChannels=aChannels, aSampleRate=aSampleRate, @@ -823,8 +916,11 @@ def __init__( ) if self.debug: - mmguero.eprint(f'Model directory: {mDir}') - mmguero.eprint(f'Model name: {mName}') + if inputTranscript: + mmguero.eprint(f'Using input transcript (skipping speech recognition)') + else: + mmguero.eprint(f'Model directory: {mDir}') + mmguero.eprint(f'Model name: {mName}') def __del__(self): super().__del__() @@ -926,6 +1022,29 @@ def RunMonkeyPlug(): default=os.path.join(script_path, SWEARS_FILENAME_DEFAULT), metavar="", ) + parser.add_argument( + "--input-transcript", + dest="inputTranscript", + type=str, + default=None, + required=False, + metavar="", + help="Load existing transcript JSON instead of performing speech recognition", + ) + parser.add_argument( + "--save-transcript", + dest="saveTranscript", + action="store_true", + default=False, + help="Automatically save transcript JSON alongside output audio file", + ) + 
parser.add_argument(
+        "--force-retranscribe",
+        dest="forceRetranscribe",
+        action="store_true",
+        default=False,
+        help="Force new transcription even if transcript file exists (overrides automatic reuse)",
+    )
     parser.add_argument(
         "-a",
         "--audio-params",
@@ -1136,6 +1255,9 @@ def RunMonkeyPlug():
                 args.swears,
                 args.voskModelDir,
                 args.outputJson,
+                inputTranscript=args.inputTranscript,
+                saveTranscript=args.saveTranscript,
+                forceRetranscribe=args.forceRetranscribe,
                 aParams=args.aParams,
                 aChannels=args.aChannels,
                 aSampleRate=args.aSampleRate,
@@ -1165,6 +1287,9 @@ def RunMonkeyPlug():
                 args.whisperModelName,
                 args.torchThreads,
                 args.outputJson,
+                inputTranscript=args.inputTranscript,
+                saveTranscript=args.saveTranscript,
+                forceRetranscribe=args.forceRetranscribe,
                 aParams=args.aParams,
                 aChannels=args.aChannels,
                 aSampleRate=args.aSampleRate,
diff --git a/tests/test_transcript_save_reuse.py b/tests/test_transcript_save_reuse.py
new file mode 100644
index 0000000..181cbc8
--- /dev/null
+++ b/tests/test_transcript_save_reuse.py
@@ -0,0 +1,402 @@
+#!/usr/bin/env python3
+
+import json
+import os
+import shutil
+import tempfile
+import time
+import pytest
+from monkeyplug.monkeyplug import Plugger, scrubword
+from monkeyplug.monkeyplug import WhisperPlugger, DEFAULT_WHISPER_MODEL_DIR, DEFAULT_WHISPER_MODEL_NAME
+
+
+class MockPlugger:
+    """Minimal stand-in for Plugger that exercises LoadTranscriptFromFile without an audio file.
+
+    Only the attributes LoadTranscriptFromFile uses are populated: swearsMap (parsed
+    from a plain-text swears file, one "word" or "word|replacement" per line),
+    inputTranscript, debug and wordList.
+    """
+
+    def __init__(self, swearsFileSpec, inputTranscript=None, debug=False):
+        self.swearsFileSpec = swearsFileSpec
+        self.swearsMap = {}
+        self.inputTranscript = inputTranscript
+        self.debug = debug
+        self.wordList = []
+
+        with open(self.swearsFileSpec) as f:
+            for line in f:
+                lineMap = line.rstrip("\n").split("|")
+                self.swearsMap[scrubword(lineMap[0])] = lineMap[1] if len(lineMap) > 1 else "*****"
+
+        # bind the real method so the tests run production code, not a copy of it
+        self.LoadTranscriptFromFile = Plugger.LoadTranscriptFromFile.__get__(self)
+
+
+class TestTranscriptSaveReuse:
+    """Test suite for transcript save/reuse functionality"""
+
+    @pytest.fixture
+    def swears_file(self, tmp_path):
+        """Create a temporary swears file (pytest removes tmp_path for us)"""
+        path = tmp_path / "swears.txt"
+        path.write_text("damn\nhell\ncrap")
+        return str(path)
+
+    @pytest.fixture
+    def transcript_file(self, tmp_path):
+        """Create a temporary transcript file (pytest removes tmp_path for us)"""
+        transcript_data = [
+            {"word": "hello", "start": 0.0, "end": 0.5, "conf": 0.9},
+            {"word": "damn", "start": 0.5, "end": 1.0, "conf": 0.8},
+            {"word": "world", "start": 1.0, "end": 1.5, "conf": 0.95},
+            {"word": "hell", "start": 1.5, "end": 2.0, "conf": 0.6},
+        ]
+        path = tmp_path / "transcript.json"
+        path.write_text(json.dumps(transcript_data))
+        return str(path)
+
+    def test_load_transcript_from_file(self, swears_file, transcript_file):
+        """Test loading a transcript from JSON file"""
+        plugger = MockPlugger(swears_file, inputTranscript=transcript_file)
+
+        assert plugger.LoadTranscriptFromFile() is True
+
+        loaded = [(w['word'], w['scrub']) for w in plugger.wordList]
+        assert loaded == [("hello", False), ("damn", True), ("world", False), ("hell", True)]
+
+    def test_load_transcript_with_different_swear_list(self, tmp_path):
+        """Test that loading transcript with different swear lists affects scrub decisions"""
+        transcript = tmp_path / "transcript.json"
+        transcript.write_text(json.dumps([{"word": "damn", "start": 0.0, "end": 0.5, "conf": 0.8}]))
+        swears_with_damn = tmp_path / "swears1.txt"
+        swears_with_damn.write_text("damn")
+        swears_without_damn = tmp_path / "swears2.txt"
+        swears_without_damn.write_text("hell")
+
+        plugger1 = MockPlugger(str(swears_with_damn), inputTranscript=str(transcript))
+        plugger1.LoadTranscriptFromFile()
+        assert plugger1.wordList[0]['scrub'] is True
+
+        plugger2 = MockPlugger(str(swears_without_damn), inputTranscript=str(transcript))
+        plugger2.LoadTranscriptFromFile()
+        assert plugger2.wordList[0]['scrub'] is False
+
+    def test_load_transcript_file_not_found(self, swears_file, tmp_path):
+        """Test that loading non-existent transcript raises IOError"""
+        plugger = MockPlugger(swears_file, inputTranscript=str(tmp_path / "missing_transcript.json"))
+        with pytest.raises(IOError):
+            plugger.LoadTranscriptFromFile()
+
+    def test_load_transcript_returns_false_when_no_input(self, swears_file):
+        """Test that LoadTranscriptFromFile returns False when no input transcript specified"""
+        plugger = MockPlugger(swears_file, inputTranscript=None)
+        assert plugger.LoadTranscriptFromFile() is False
+
+    def test_automatic_transcript_detection(self, transcript_file):
+        """Test the auto-detect rule: reuse an existing transcript unless forced or overridden"""
+        # NOTE(review): this mirrors the auto-detect condition in Plugger.__init__ instead of
+        # calling it (presumably because the constructor needs a real input file), so it cannot
+        # catch a regression there; test_automatic_transcript_reuse covers the real code path.
+
+        def auto_detect(outputJson, inputTranscript, saveTranscript, forceRetranscribe):
+            if saveTranscript and not inputTranscript and outputJson and not forceRetranscribe:
+                if os.path.exists(outputJson):
+                    return outputJson
+            return inputTranscript
+
+        # existing transcript is picked up
+        assert auto_detect(transcript_file, None, True, False) == transcript_file
+        # --force-retranscribe disables reuse
+        assert auto_detect(transcript_file, None, True, True) is None
+        # an explicit --input-transcript always wins
+        assert auto_detect(transcript_file, "explicit.json", True, False) == "explicit.json"
+        # no reuse without --save-transcript
+        assert auto_detect(transcript_file, None, False, False) is None
+
+@pytest.mark.skipif(
+    not os.path.exists(os.path.join(os.path.dirname(os.path.dirname(__file__)), 'input', 'Witch_mother1.m4b')),
+    reason="Test audio file not found"
+)
+class TestTranscriptSaveReuseIntegration:
+    """Integration tests using real audio file"""
+
+    @staticmethod
+    def _run_plugger(files, **overrides):
+        """Build a WhisperPlugger for the fixture files, run CreateCleanMuteList(), return it.
+
+        Defaults match the common case in these tests (save-transcript on, no explicit
+        transcript paths, no forced retranscription); keyword overrides replace
+        individual constructor arguments.
+        """
+        kwargs = dict(
+            iFileSpec=files['input'],
+            oFileSpec=files['output'],
+            oAudioFileFormat='m4a',
+            iSwearsFileSpec=files['swears'],
+            mDir=DEFAULT_WHISPER_MODEL_DIR,
+            mName=DEFAULT_WHISPER_MODEL_NAME,
+            torchThreads=1,
+            outputJson=None,
+            inputTranscript=None,
+            saveTranscript=True,
+            forceRetranscribe=False,
+            dbug=True,
+        )
+        kwargs.update(overrides)
+        plugger = WhisperPlugger(**kwargs)
+        plugger.CreateCleanMuteList()
+        return plugger
+
+    @pytest.fixture
+    def setup_files(self):
+        """Setup test files and cleanup after test"""
+        # Paths (relative to project root)
+        project_root = os.path.dirname(os.path.dirname(__file__))
+        input_file = os.path.join(project_root, 'input', 'Witch_mother1.m4b')
+        output_dir = tempfile.mkdtemp()
+        output_file = os.path.join(output_dir, 'test_output.m4a')
+        transcript_file = os.path.join(output_dir, 'test_output_transcript.json')
+        swears_file = os.path.join(output_dir, 'test_swears.txt')
+
+        # Create simple swears file
+        with open(swears_file, 'w') as f:
+            f.write("damn\nhell\ncrap")
+
+        yield {
+            'input': input_file,
+            'output': output_file,
+            'transcript': transcript_file,
+            'swears': swears_file,
+            'dir': output_dir
+        }
+
+        # Cleanup (best effort: a leftover temp dir must not turn a passing test into an error)
+        shutil.rmtree(output_dir, ignore_errors=True)
+
+    def test_save_transcript_creates_file(self, setup_files):
+        """Test that --save-transcript creates a transcript JSON file"""
+        files = setup_files
+
+        # Run with save-transcript enabled
+        self._run_plugger(files)
+
+        assert os.path.exists(files['transcript'])
+        assert os.path.getsize(files['transcript']) > 0
+
+        # Verify transcript has valid JSON structure
+        with open(files['transcript'], 'r') as f:
+            transcript_data = json.load(f)
+        assert isinstance(transcript_data, list)
+        assert len(transcript_data) > 0
+        assert 'word' in transcript_data[0]
+        assert 'start' in transcript_data[0]
+        assert 'end' in transcript_data[0]
+
+    def test_automatic_transcript_reuse(self, setup_files):
+        """Test that existing transcript is automatically reused"""
+        files = setup_files
+
+        # First run - generate transcript
+        start_time = time.time()
+        plugger1 = self._run_plugger(files)
+        first_run_time = time.time() - start_time
+        first_wordlist = list(plugger1.wordList)
+
+        assert os.path.exists(files['transcript'])
+
+        # Second run - should auto-detect and reuse transcript
+        start_time = time.time()
+        plugger2 = self._run_plugger(files)
+        reuse_time = time.time() - start_time
+
+        assert plugger2.inputTranscript == files['transcript']
+        assert len(plugger2.wordList) == len(first_wordlist)
+
+        for first_word, reused_word in zip(first_wordlist, plugger2.wordList):
+            assert reused_word['word'] == first_word['word']
+            assert reused_word['scrub'] == first_word['scrub']
+
+        # Reported for information only: wall-clock assertions would make the test flaky
+        print(f"\nFirst run: {first_run_time:.2f}s, reuse run: {reuse_time:.2f}s")
+
+    def test_force_retranscribe_flag(self, setup_files):
+        """Test that --force-retranscribe ignores existing transcript file"""
+        files = setup_files
+
+        # Create a garbage transcript file to simulate existing transcript
+        garbage_transcript = [
+            {"word": "GARBAGE", "start": 0.0, "end": 1.0, "probability": 0.5, "scrub": False},
+            {"word": "DATA", "start": 1.0, "end": 2.0, "probability": 0.5, "scrub": False},
+        ]
+        with open(files['transcript'], 'w') as f:
+            json.dump(garbage_transcript, f)
+
+        assert os.path.exists(files['transcript'])
+
+        # Run with force flag - should ignore the garbage file and transcribe
+        plugger = self._run_plugger(files, forceRetranscribe=True)
+
+        # Verify the garbage transcript was NOT used
+        # The wordList should have real transcription, not the garbage data
+        assert len(plugger.wordList) > 2
+        assert plugger.wordList[0]['word'] != "GARBAGE"
+        assert plugger.wordList[1]['word'] != "DATA"
+
+        # ...and that the file on disk was overwritten with the new transcription
+        with open(files['transcript'], 'r') as f:
+            new_transcript = json.load(f)
+        assert len(new_transcript) > 2
+        assert new_transcript[0]['word'] != "GARBAGE"
+
+    def test_explicit_transcript_reuse(self, setup_files):
+        """Test explicit transcript loading with --input-transcript"""
+        files = setup_files
+
+        # every entry is stored with scrub=False so the test proves flags are recomputed on load
+        known_transcript = [
+            {"word": "test", "start": 0.0, "end": 0.5, "probability": 0.9, "scrub": False},
+            {"word": "damn", "start": 0.5, "end": 1.0, "probability": 0.8, "scrub": False},
+            {"word": "explicit", "start": 1.0, "end": 1.5, "probability": 0.95, "scrub": False},
+        ]
+        with open(files['transcript'], 'w') as f:
+            json.dump(known_transcript, f)
+
+        plugger = self._run_plugger(files, inputTranscript=files['transcript'], saveTranscript=False)
+
+        assert plugger.inputTranscript == files['transcript']
+        assert len(plugger.wordList) == 3
+        assert plugger.wordList[0]['word'] == "test"
+        assert plugger.wordList[1]['word'] == "damn"
+        assert plugger.wordList[1]['scrub'] is True  # "damn" should be scrubbed
+        assert plugger.wordList[2]['word'] == "explicit"
+        assert plugger.wordList[2]['scrub'] is False
+
+    def test_different_swear_lists_with_same_transcript(self, setup_files):
+        """Test that same transcript with different swear lists produces different scrub decisions"""
+        files = setup_files
+        swears_file1 = os.path.join(files['dir'], 'swears1.txt')
+        swears_file2 = os.path.join(files['dir'], 'swears2.txt')
+
+        with open(swears_file1, 'w') as f:
+            f.write("damn\nhell\ncrap")
+
+        # list 2 is a strict subset of list 1
+        with open(swears_file2, 'w') as f:
+            f.write("damn")
+
+        # First run - generate transcript with swear list 1
+        plugger1 = self._run_plugger(files, iSwearsFileSpec=swears_file1, outputJson=files['transcript'])
+        scrubbed1 = [bool(word['scrub']) for word in plugger1.wordList]
+
+        # Second run - reuse transcript with swear list 2 (fewer swears)
+        plugger2 = self._run_plugger(
+            files,
+            oFileSpec=files['output'].replace('.m4a', '_v2.m4a'),
+            iSwearsFileSpec=swears_file2,
+            inputTranscript=files['transcript'],
+            saveTranscript=False,
+        )
+        scrubbed2 = [bool(word['scrub']) for word in plugger2.wordList]
+
+        # Same transcript, so the word lists line up one-to-one
+        assert len(scrubbed1) == len(scrubbed2)
+        # Anything scrubbed by the smaller list must also be scrubbed by its superset
+        assert all(in_first for in_first, in_second in zip(scrubbed1, scrubbed2) if in_second)
+        assert sum(scrubbed1) >= sum(scrubbed2)
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])