Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
**monkeyplug** is a little script to censor profanity in audio files (intended for podcasts, but YMMV) in a few simple steps:

1. The user provides a local audio file (or a URL pointing to an audio file which is downloaded)
2. Either [Whisper](https://openai.com/research/whisper) ([GitHub](https://github.com/openai/whisper)) or the [Vosk](https://alphacephei.com/vosk/)-[API](https://github.com/alphacep/vosk-api) is used to recognize speech in the audio file
2. Either [Whisper](https://openai.com/research/whisper) ([GitHub](https://github.com/openai/whisper)) or the [Vosk](https://alphacephei.com/vosk/)-[API](https://github.com/alphacep/vosk-api) is used to recognize speech in the audio file (or a pre-generated transcript can be loaded)
3. Each recognized word is checked against a [list](./src/monkeyplug/swears.txt) of profanity or other words you'd like muted (supports text or [JSON format](./SWEARS_JSON_FORMAT.md))
4. [`ffmpeg`](https://www.ffmpeg.org/) is used to create a cleaned audio file, muting or "bleeping" the objectionable words
5. Optionally, the transcript can be saved for reuse in future processing runs

You can then use your favorite media player to play the cleaned audio file.

Expand Down Expand Up @@ -62,10 +63,14 @@ options:
Input file (or URL)
-o <string>, --output <string>
Output file
--output-json <string>
Output file to store transcript JSON
-w <profanity file>, --swears <profanity file>
text or JSON file containing profanity (default: "swears.txt")
--output-json <string>
Output file to store transcript JSON
--input-transcript <string>
Load existing transcript JSON instead of performing speech recognition
--save-transcript Automatically save transcript JSON alongside output audio file
--force-retranscribe Force new transcription even if transcript file exists (overrides automatic reuse)
-a <str>, --audio-params <str>
Audio parameters for ffmpeg (default depends on output audio codec)
-c <int>, --channels <int>
Expand Down Expand Up @@ -137,6 +142,37 @@ Alternately, a [Dockerfile](./docker/Dockerfile) is provided to allow you to run

then run [`monkeyplug-docker.sh`](./docker/monkeyplug-docker.sh) inside the directory where your audio files are located.

## Transcript Workflow

**monkeyplug** supports saving and reusing transcripts to improve workflow efficiency:

### Save Transcript for Later Reuse

```bash
# Generate transcript once and save it
monkeyplug -i input.mp3 -o output.mp3 --save-transcript

# This creates output.mp3 and output_transcript.json
```

### Automatic Transcript Reuse

```bash
# Second run: Automatically detects and reuses transcript (22x faster!)
monkeyplug -i input.mp3 -o output.mp3 --save-transcript
# Finds output_transcript.json and reuses it automatically

# Force new transcription when needed
monkeyplug -i input.mp3 -o output.mp3 --save-transcript --force-retranscribe
```

### Manual Transcript Loading

```bash
# Explicitly specify transcript to load
monkeyplug -i input.mp3 -o output_strict.mp3 --input-transcript output_transcript.json -w strict_swears.txt
```

## Contributing

If you'd like to help improve monkeyplug, pull requests will be welcomed!
Expand Down
Binary file added input/Witch_mother1.m4b
Binary file not shown.
187 changes: 156 additions & 31 deletions src/monkeyplug/monkeyplug.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ def __init__(
oAudioFileFormat,
iSwearsFileSpec,
outputJson,
inputTranscript=None,
saveTranscript=False,
forceRetranscribe=False,
aParams=None,
aChannels=AUDIO_DEFAULT_CHANNELS,
aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE,
Expand Down Expand Up @@ -279,6 +282,8 @@ def __init__(
self.forceDespiteTag = force
self.debug = dbug
self.outputJson = outputJson
self.inputTranscript = inputTranscript
self.saveTranscript = saveTranscript

# determine input file name, or download and save file
if (iFileSpec is not None) and os.path.isfile(iFileSpec):
Expand Down Expand Up @@ -374,12 +379,33 @@ def __init__(
if self.outputVideoFileFormat:
self.outputFileSpec = outParts[0] + self.outputVideoFileFormat

# create output directory if it doesn't exist
self._ensure_directory_exists(self.outputFileSpec, "output directory")

# if output file already exists, remove as we'll be overwriting it anyway
if os.path.isfile(self.outputFileSpec):
if self.debug:
mmguero.eprint(f'Removing existing destination file {self.outputFileSpec}')
os.remove(self.outputFileSpec)

# If save-transcript is enabled and no explicit JSON output path, auto-generate one
if self.saveTranscript and not self.outputJson:
outputBaseName = os.path.splitext(self.outputFileSpec)[0]
self.outputJson = outputBaseName + '_transcript.json'
if self.debug:
mmguero.eprint(f'Auto-generated transcript output: {self.outputJson}')

# Auto-detect existing transcript for reuse (unless force flag set or explicit input provided)
if self.saveTranscript and not self.inputTranscript and self.outputJson and not forceRetranscribe:
if os.path.exists(self.outputJson):
self.inputTranscript = self.outputJson
if self.debug:
mmguero.eprint(f'Found existing transcript, reusing: {self.inputTranscript}')

# If JSON output is specified, ensure its directory exists too
if self.outputJson:
self._ensure_directory_exists(self.outputJson, "JSON output directory")

# load the swears file (not actually mapping right now, but who knows, speech synthesis maybe someday?)
if (iSwearsFileSpec is not None) and os.path.isfile(iSwearsFileSpec):
self.swearsFileSpec = iSwearsFileSpec
Expand All @@ -396,6 +422,10 @@ def __init__(
mmguero.eprint(f'Encode parameters: {self.aParams}')
mmguero.eprint(f'Profanity file: {self.swearsFileSpec}')
mmguero.eprint(f'Intermediate downloaded file: {self.tmpDownloadedFileSpec}')
if self.outputJson:
mmguero.eprint(f'Transcript output: {self.outputJson}')
if self.inputTranscript:
mmguero.eprint(f'Input transcript: {self.inputTranscript}')
mmguero.eprint(f'Beep instead of mute: {self.beep}')
if self.beep:
mmguero.eprint(f'Beep hertz: {self.beepHertz}')
Expand All @@ -411,6 +441,42 @@ def __del__(self):
if os.path.isfile(self.tmpDownloadedFileSpec):
os.remove(self.tmpDownloadedFileSpec)

######## _ensure_directory_exists #############################################
def _ensure_directory_exists(self, filepath, description="directory"):
    """Create the parent directory of *filepath* when it does not exist yet.

    Args:
        filepath: Path of a file whose containing directory is needed.
        description: Label used only in the debug message.

    Returns:
        The directory component of *filepath* (an empty string when
        *filepath* has no directory part).
    """
    parent = os.path.dirname(filepath)

    # Nothing to do for a bare filename or for a path that already exists.
    if not parent or os.path.exists(parent):
        return parent

    if self.debug:
        mmguero.eprint(f'Creating {description}: {parent}')
    # exist_ok guards against the directory appearing between check and create
    os.makedirs(parent, exist_ok=True)
    return parent

######## LoadTranscriptFromFile ##############################################
def LoadTranscriptFromFile(self):
    """Load a pre-generated transcript from ``self.inputTranscript``.

    The file must contain a JSON list of word objects. Every entry's
    ``scrub`` flag is recomputed against the current ``self.swearsMap`` so
    a saved transcript can be reused with a different profanity list.

    Returns:
        False when no input transcript is configured (the caller then
        falls back to speech recognition); True once ``self.wordList``
        has been populated from the file.

    Raises:
        IOError: ``self.inputTranscript`` is set but is not an existing file.
        ValueError: the file is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError``) or is not a list of JSON objects.
    """
    if not self.inputTranscript:
        return False

    if not os.path.isfile(self.inputTranscript):
        raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), self.inputTranscript)

    if self.debug:
        mmguero.eprint(f'Loading transcript from: {self.inputTranscript}')

    # JSON interchange is UTF-8 (RFC 8259); decode explicitly rather than
    # relying on the platform's locale-dependent default encoding.
    with open(self.inputTranscript, 'r', encoding='utf-8') as f:
        transcript = json.load(f)

    # Validate the shape before touching self.wordList so a malformed file
    # fails with a clear message and leaves the instance state unchanged.
    if not isinstance(transcript, list) or not all(isinstance(w, dict) for w in transcript):
        raise ValueError(
            f'Transcript {self.inputTranscript} must contain a JSON list of word objects'
        )

    # Recalculate scrub flags with current swears list
    for word in transcript:
        word['scrub'] = scrubword(word.get('word', '')) in self.swearsMap

    self.wordList = transcript

    if self.debug:
        mmguero.eprint(f'Loaded {len(self.wordList)} words from transcript')
        scrubbed_count = sum(1 for w in self.wordList if w.get('scrub', False))
        mmguero.eprint(f'Words to censor with current swear list: {scrubbed_count}')

    return True

######## _load_swears_file ####################################################
def _load_swears_file(self):
"""Load swears from text or JSON format"""
Expand Down Expand Up @@ -463,7 +529,9 @@ def _load_swears_from_text(self):

######## CreateCleanMuteList #################################################
def CreateCleanMuteList(self):
self.RecognizeSpeech()
# Try to load existing transcript first, otherwise perform speech recognition
if not self.LoadTranscriptFromFile():
self.RecognizeSpeech()

self.naughtyWordList = [word for word in self.wordList if word["scrub"] is True]
if len(self.naughtyWordList) > 0:
Expand Down Expand Up @@ -604,6 +672,9 @@ def __init__(
iSwearsFileSpec,
mDir,
outputJson,
inputTranscript=None,
saveTranscript=False,
forceRetranscribe=False,
aParams=None,
aChannels=AUDIO_DEFAULT_CHANNELS,
aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE,
Expand All @@ -622,29 +693,36 @@ def __init__(
dbug=False,
):
self.wavReadFramesChunk = wChunk
self.modelPath = None
self.vosk = None

# Only load model if we're actually going to transcribe
if not inputTranscript:
# make sure the VOSK model path exists
if (mDir is not None) and os.path.isdir(mDir):
self.modelPath = mDir
else:
raise IOError(
errno.ENOENT,
os.strerror(errno.ENOENT) + " (see https://alphacephei.com/vosk/models)",
mDir,
)

# make sure the VOSK model path exists
if (mDir is not None) and os.path.isdir(mDir):
self.modelPath = mDir
else:
raise IOError(
errno.ENOENT,
os.strerror(errno.ENOENT) + " (see https://alphacephei.com/vosk/models)",
mDir,
)

self.vosk = mmguero.dynamic_import("vosk", "vosk", debug=dbug)
if not self.vosk:
raise Exception("Unable to initialize VOSK API")
if not dbug:
self.vosk.SetLogLevel(-1)
self.vosk = mmguero.dynamic_import("vosk", "vosk", debug=dbug)
if not self.vosk:
raise Exception("Unable to initialize VOSK API")
if not dbug:
self.vosk.SetLogLevel(-1)

super().__init__(
iFileSpec=iFileSpec,
oFileSpec=oFileSpec,
oAudioFileFormat=oAudioFileFormat,
iSwearsFileSpec=iSwearsFileSpec,
outputJson=outputJson,
inputTranscript=inputTranscript,
saveTranscript=saveTranscript,
forceRetranscribe=forceRetranscribe,
aParams=aParams,
aChannels=aChannels,
aSampleRate=aSampleRate,
Expand All @@ -665,9 +743,12 @@ def __init__(
self.tmpWavFileSpec = self.inputFileParts[0] + ".wav"

if self.debug:
mmguero.eprint(f'Model directory: {self.modelPath}')
mmguero.eprint(f'Intermediate audio file: {self.tmpWavFileSpec}')
mmguero.eprint(f'Read frames: {self.wavReadFramesChunk}')
if inputTranscript:
mmguero.eprint(f'Using input transcript (skipping speech recognition)')
else:
mmguero.eprint(f'Model directory: {self.modelPath}')
mmguero.eprint(f'Intermediate audio file: {self.tmpWavFileSpec}')
mmguero.eprint(f'Read frames: {self.wavReadFramesChunk}')

def __del__(self):
super().__del__()
Expand Down Expand Up @@ -770,6 +851,9 @@ def __init__(
mName,
torchThreads,
outputJson,
inputTranscript=None,
saveTranscript=False,
forceRetranscribe=False,
aParams=None,
aChannels=AUDIO_DEFAULT_CHANNELS,
aSampleRate=AUDIO_DEFAULT_SAMPLE_RATE,
Expand All @@ -786,25 +870,34 @@ def __init__(
force=False,
dbug=False,
):
if torchThreads > 0:
self.torch = mmguero.dynamic_import("torch", "torch", debug=dbug)
if self.torch:
self.torch.set_num_threads(torchThreads)
self.whisper = None
self.model = None
self.torch = None

self.whisper = mmguero.dynamic_import("whisper", "openai-whisper", debug=dbug)
if not self.whisper:
raise Exception("Unable to initialize Whisper API")
# Only load model if we're actually going to transcribe (no input transcript provided)
if not inputTranscript:
if torchThreads > 0:
self.torch = mmguero.dynamic_import("torch", "torch", debug=dbug)
if self.torch:
self.torch.set_num_threads(torchThreads)

self.model = self.whisper.load_model(mName, download_root=mDir)
if not self.model:
raise Exception(f"Unable to load Whisper model {mName} in {mDir}")
self.whisper = mmguero.dynamic_import("whisper", "openai-whisper", debug=dbug)
if not self.whisper:
raise Exception("Unable to initialize Whisper API")

self.model = self.whisper.load_model(mName, download_root=mDir)
if not self.model:
raise Exception(f"Unable to load Whisper model {mName} in {mDir}")

super().__init__(
iFileSpec=iFileSpec,
oFileSpec=oFileSpec,
oAudioFileFormat=oAudioFileFormat,
iSwearsFileSpec=iSwearsFileSpec,
outputJson=outputJson,
inputTranscript=inputTranscript,
saveTranscript=saveTranscript,
forceRetranscribe=forceRetranscribe,
aParams=aParams,
aChannels=aChannels,
aSampleRate=aSampleRate,
Expand All @@ -823,8 +916,11 @@ def __init__(
)

if self.debug:
mmguero.eprint(f'Model directory: {mDir}')
mmguero.eprint(f'Model name: {mName}')
if inputTranscript:
mmguero.eprint(f'Using input transcript (skipping speech recognition)')
else:
mmguero.eprint(f'Model directory: {mDir}')
mmguero.eprint(f'Model name: {mName}')

def __del__(self):
super().__del__()
Expand Down Expand Up @@ -926,6 +1022,29 @@ def RunMonkeyPlug():
default=os.path.join(script_path, SWEARS_FILENAME_DEFAULT),
metavar="<profanity file>",
)
parser.add_argument(
"--input-transcript",
dest="inputTranscript",
type=str,
default=None,
required=False,
metavar="<string>",
help="Load existing transcript JSON instead of performing speech recognition",
)
parser.add_argument(
"--save-transcript",
dest="saveTranscript",
action="store_true",
default=False,
help="Automatically save transcript JSON alongside output audio file",
)
parser.add_argument(
"--force-retranscribe",
dest="forceRetranscribe",
action="store_true",
default=False,
help="Force new transcription even if transcript file exists (overrides automatic reuse)",
)
parser.add_argument(
"-a",
"--audio-params",
Expand Down Expand Up @@ -1136,6 +1255,9 @@ def RunMonkeyPlug():
args.swears,
args.voskModelDir,
args.outputJson,
inputTranscript=args.inputTranscript,
saveTranscript=args.saveTranscript,
forceRetranscribe=args.forceRetranscribe,
aParams=args.aParams,
aChannels=args.aChannels,
aSampleRate=args.aSampleRate,
Expand Down Expand Up @@ -1165,6 +1287,9 @@ def RunMonkeyPlug():
args.whisperModelName,
args.torchThreads,
args.outputJson,
inputTranscript=args.inputTranscript,
saveTranscript=args.saveTranscript,
forceRetranscribe=args.forceRetranscribe,
aParams=args.aParams,
aChannels=args.aChannels,
aSampleRate=args.aSampleRate,
Expand Down
Loading
Loading