import whisper_timestamped as whisper_t
import whisper
import torch
import os
import demucs.separate
import re
from pydub import AudioSegment
from mutagen.easyid3 import EasyID3
import lyricsgenius
import jiwer
import shutil
import tempfile
## Get a Genius API key at https://genius.com/api-clients
## Put your key in the GENIUS_API_TOKEN environment variable, or set it manually here.
GENIUS_API_TOKEN = os.getenv("GENIUS_API_TOKEN")
genius = lyricsgenius.Genius(GENIUS_API_TOKEN, verbose=False, remove_section_headers=True)
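# For example, set the token in your shell before launching (value is a placeholder):
#   export GENIUS_API_TOKEN="your-token-here"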
#############################################################################
### just a heads up there's a bunch of curse words and racial slurs below ###
#############################################################################
# List of words to search for and mute.
# Currently we look for these words as **substrings** of each transcribed word,
# so 'fuck' covers all variants: 'fucking', 'motherfucker', 'fucked', etc.
# This method is a bit crude and can produce false positives, e.g. 'Dickens' would be censored.
# Consider using an LLM on the output for classification?
default_curse_words = {
'fuck', 'shit', 'piss', 'bitch', 'nigg', 'dyke', 'cock', 'faggot',
'cunt', 'tits', 'pussy', 'dick', 'asshole', 'whore', 'goddam',
'douche', 'chink', 'tranny', 'slut', 'jizz', 'kike', 'gook'
}
# Words for which the substring method will absolutely not work
singular_curse_words = {
'fag', 'cum', 'hell', 'spic', 'clit', 'wank', 'ass'
}
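# Illustration of the two matching rules applied in analyze_audio below
# (purely explanatory, not executed anywhere):
#   any(c in 'motherfucker' for c in default_curse_words)  -> True   (substring hit)
#   any(c in 'dickens' for c in default_curse_words)       -> True   (false positive)
#   'class' in singular_curse_words                        -> False  (exact match only)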
######################################################
# Helper functions required for the gradio interface #
######################################################
# Removes all punctuation and returns the string in lower case
def remove_punctuation(s):
s = re.sub(r'[^a-zA-Z0-9\s]', '', s)
return s.lower()
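# e.g. remove_punctuation("Hell's Bells!") -> 'hells bells'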
# For silencing the audio track at the indicated times
def silence_audio_segment(input_audio_path, output_audio_path, times):
    audio = AudioSegment.from_file(input_audio_path)
    for (start_ms, end_ms) in times:
        before_segment = audio[:start_ms]
        # pydub's `- 60` lowers the segment by 60 dB, which is effectively silent
        target_segment = audio[start_ms:end_ms] - 60
        after_segment = audio[end_ms:]
        audio = before_segment + target_segment + after_segment
audio.export(output_audio_path, format='wav')
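# Example (illustrative path/times), muting 12.0s-12.6s in place:
#   silence_audio_segment('vocals.wav', 'vocals.wav', [(12000, 12600)])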
# For combining the vocals and instrument stems once the censoring has been applied
def combine_audio(path1, path2, outpath):
audio1 = AudioSegment.from_file(path1, format='wav')
audio2 = AudioSegment.from_file(path2, format='wav')
combined_audio = audio1.overlay(audio2)
combined_audio.export(outpath, format="mp3")
# Extracts metadata from the original song
def get_metadata(original_audio_path):
try:
audio_orig = EasyID3(original_audio_path)
        metadata = {
            'title': audio_orig.get('title', [None])[0],
            'artist': audio_orig.get('artist', [None])[0],
            'album': audio_orig.get('album', [None])[0],
            'year': audio_orig.get('date', [None])[0],
        }
except Exception:
metadata = {'title': 'N/A', 'artist': 'N/A', 'album': 'N/A', 'year': 'N/A'}
return metadata
# Transfers metadata between two songs
def transfer_metadata(original_audio_path, edited_audio_path):
try:
audio_orig = EasyID3(original_audio_path)
audio_edit = EasyID3(edited_audio_path)
for key in audio_orig.keys():
audio_edit[key] = audio_orig[key]
audio_edit.save()
except Exception as e:
print(f"Could not transfer metadata: {e}")
# Converts a time in seconds to m:ss format
def seconds_to_minutes(time):
    mins = int(time // 60)
    secs = int(time % 60)
    return f"{mins}:{secs:02d}"
# Lookup url on genius of lyrics for given song
def get_genius_url(artist, song_title):
if not artist or not song_title or artist == 'N/A' or song_title == 'N/A': return None
try:
song = genius.search_song(song_title, artist)
return song.url if song else None
except Exception: return None
# Named calculate_wer, but it actually computes the Match Error Rate (MER)
def calculate_wer(ground_truth, hypothesis):
if not ground_truth or not hypothesis or "not available" in ground_truth.lower(): return None
try:
        transformation = jiwer.Compose([
            jiwer.ToLowerCase(),
            jiwer.RemovePunctuation(),
            jiwer.RemoveMultipleSpaces(),
            jiwer.Strip(),
            jiwer.ExpandCommonEnglishContractions(),
            jiwer.RemoveEmptyStrings(),
        ])
error = jiwer.mer(transformation(ground_truth), transformation(hypothesis))
return f"{error:.3f}"
except Exception: return "Error"
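# For reference: MER = (S + D + I) / (H + S + D + I), with H/S/D/I the hit,
# substitution, deletion and insertion counts; unlike WER it is bounded by 1.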
# Gets the lyrics from genius for a given song
def get_genius_lyrics(artist, song_title):
if not artist or not song_title or artist == 'N/A' or song_title == 'N/A': return "Lyrics not available (missing metadata)."
try:
song = genius.search_song(song_title, artist)
return song.lyrics if song else "Could not find lyrics on Genius."
except Exception: return "An error occurred while searching for lyrics."
##########################################################
# STEP 1: Analyze Audio, Separate Tracks, and Transcribe #
##########################################################
# Obtain a transcript of the song using Whisper; whisper_timestamped handles the splitting of segments
def analyze_audio(audio_path, model, device, fine_tuned=True, progress=None):
"""
Performs audio separation and transcription. Does NOT apply any edits.
Returns a state dictionary with paths to temp files and the transcript.
"""
if progress: progress(0, desc="Setting up temporary directory...")
run_temp_dir = tempfile.mkdtemp()
source_path = os.path.abspath(audio_path)
    # Copy the source into the temp dir under a standardized filename
temp_audio_path = os.path.join(run_temp_dir, 'temp_audio.mp3')
shutil.copy(source_path, temp_audio_path)
metadata = get_metadata(temp_audio_path)
metadata['genius_url'] = get_genius_url(metadata['artist'], metadata['title'])
metadata['genius_lyrics'] = get_genius_lyrics(metadata['artist'], metadata['title'])
if progress: progress(0.1, desc="Separating vocals with Demucs...")
demucs.separate.main(["--two-stems", "vocals", "-n", "mdx_extra", "-o", run_temp_dir, temp_audio_path])
demucs_out_name = os.path.splitext(os.path.basename(temp_audio_path))[0]
vocals_path = os.path.join(run_temp_dir, "mdx_extra", demucs_out_name, "vocals.wav")
no_vocals_path = os.path.join(run_temp_dir, "mdx_extra", demucs_out_name, "no_vocals.wav")
if progress: progress(0.6, desc="Transcribing with Whisper...")
if not fine_tuned:
result = model.transcribe(vocals_path, language='en', task='transcribe', word_timestamps=True)
word_key, prob_key = 'word', 'probability'
else:
audio = whisper_t.load_audio(vocals_path)
result = whisper_t.transcribe(model, audio, beam_size=5, best_of=5, temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0), language="en", task='transcribe')
word_key, prob_key = 'text', 'confidence'
full_transcript = []
initial_explicit_times = []
    # Certain phrases span two words, so track the previous word as well
prev_word = ''
prev_start, prev_end = 0.0, 0.0
for segment in result["segments"]:
segment_words = []
for word_info in segment.get('words', []):
word_text = word_info.get(word_key, '').strip()
if not word_text: continue
cleaned_word = remove_punctuation(word_text)
is_explicit = any(curse in cleaned_word for curse in default_curse_words)
start_time = float(word_info['start'])
end_time = float(word_info['end'])
word_data = {'text': word_text, 'start': start_time, 'end': end_time, 'prob': word_info[prob_key]}
segment_words.append(word_data)
            # Exact match only: these short words appear as substrings of
            # innocuous words (e.g. 'ass' in 'class', 'hell' in 'shell')
if cleaned_word in singular_curse_words:
initial_explicit_times.append({'start': start_time, 'end': end_time})
            # Handle two-word clusters: "god dam*", "mother fuck*", "jerk off".
            # Other candidates: "cock sucker", ... ?
            elif (('dam' in cleaned_word and prev_word == 'god')
                  or ('fuck' in cleaned_word and prev_word == 'mother')
                  or (cleaned_word == 'off' and prev_word == 'jerk')):
initial_explicit_times.append({'start': prev_start, 'end': prev_end})
initial_explicit_times.append({'start': start_time, 'end': end_time})
# The majority of censored words will come from here
elif is_explicit:
initial_explicit_times.append({'start': start_time, 'end': end_time})
prev_word = cleaned_word
prev_start, prev_end = start_time, end_time
full_transcript.append({'line_words': segment_words, 'start': segment['start'], 'end': segment['end']})
transcript_text = " ".join([word['text'] for seg in full_transcript for word in seg['line_words']])
metadata['wer_score'] = calculate_wer(metadata['genius_lyrics'], transcript_text)
if device == 'cuda': torch.cuda.empty_cache()
return {
"temp_dir": run_temp_dir,
"vocals_path": vocals_path,
"no_vocals_path": no_vocals_path,
"original_audio_path_copy": temp_audio_path,
"original_filename": os.path.basename(source_path),
"transcript": full_transcript,
"initial_explicit_times": initial_explicit_times,
"metadata": metadata
}
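# Shape of the returned transcript, for downstream consumers:
#   [{'line_words': [{'text', 'start', 'end', 'prob'}, ...], 'start': ..., 'end': ...}, ...]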
##############################################
# STEP 2: Apply Censoring and Finalize Audio #
##############################################
# Applies the censoring at the indicated times
def apply_censoring(analysis_state, times_to_censor, progress=None):
"""
Takes the state from analyze_audio and a final list of timestamps,
applies silencing, and creates the final audio file in the temp directory.
"""
if not times_to_censor:
# If there's nothing to censor, we don't need to do anything.
# The temporary directory will be cleaned up by the app logic.
return None
if progress: progress(0, desc="Applying silence to vocal track...")
times_in_ms = [(int(t['start']*1000), int(t['end']*1000)) for t in times_to_censor]
silence_audio_segment(analysis_state['vocals_path'], analysis_state['vocals_path'], times_in_ms)
base_name = os.path.splitext(analysis_state['original_filename'])[0]
    # Save the output file to the existing temporary directory
output_path = os.path.join(analysis_state['temp_dir'], f"{base_name}-edited.mp3")
if progress: progress(0.6, desc="Combining audio tracks...")
combine_audio(analysis_state['vocals_path'], analysis_state['no_vocals_path'], output_path)
transfer_metadata(analysis_state['original_audio_path_copy'], output_path)
    # The temporary directory is intentionally not removed here;
    # cleanup is handled by the main application UI logic.
return output_path
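# A minimal end-to-end sketch of the two-step flow. The model name, device
# choice, and file path below are placeholder assumptions; the real app drives
# these functions from its gradio UI instead.
if __name__ == '__main__':
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = whisper.load_model('base', device=device)
    state = analyze_audio('song.mp3', model, device, fine_tuned=False)
    edited_path = apply_censoring(state, state['initial_explicit_times'])
    print('Edited file:', edited_path)
    shutil.rmtree(state['temp_dir'])  # cleanup is normally the app's job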