Spaces:
Sleeping
Sleeping
import whisper_timestamped as whisper_t | |
import whisper | |
import torch | |
import os | |
import demucs.separate | |
import re | |
from pydub import AudioSegment | |
from mutagen.easyid3 import EasyID3 | |
import lyricsgenius | |
import jiwer | |
import shutil | |
import tempfile | |
## Get a genius API key at https://genius.com/api-clients | |
## put your key in system environment at GENIUS_API_TOKEN or set it manually here | |
GENIUS_API_TOKEN = os.getenv("GENIUS_API_TOKEN") | |
genius = lyricsgenius.Genius(GENIUS_API_TOKEN, verbose=False, remove_section_headers=True) | |
############################################################################# | |
### just a heads up there's a bunch of curse words and racial slurs below ### | |
############################################################################# | |
# List of words to search for to be muted: | |
# The way this works currently is that we look for these words as **substrings** of each transcribed word | |
# this means that 'fuck' handles all versions 'fucking', 'motherfucker', 'fucked', etc. | |
# This method is a bit crude as it can lead to some false positive, ex. 'Dickens' would be censored. | |
# Consider using an LLM on the output for classification? | |
default_curse_words = { | |
'fuck', 'shit', 'piss', 'bitch', 'nigg', 'dyke', 'cock', 'faggot', | |
'cunt', 'tits', 'pussy', 'dick', 'asshole', 'whore', 'goddam', | |
'douche', 'chink', 'tranny', 'slut', 'jizz', 'kike', 'gook' | |
} | |
# Words for which the substring method will absolutely not work | |
singular_curse_words = { | |
'fag', 'cum', 'hell', 'spic', 'clit', 'wank', 'ass' | |
} | |
###################################################### | |
# Helper functions required for the gradio interface # | |
###################################################### | |
# Removes all punctuation and returns lower case only words | |
def remove_punctuation(s): | |
s = re.sub(r'[^a-zA-Z0-9\s]', '', s) | |
return s.lower() | |
# For silencing the audio tracks at the indicated times | |
def silence_audio_segment(input_audio_path, output_audio_path, times): | |
audio = AudioSegment.from_file(input_audio_path) | |
for (start_ms, end_ms) in times: | |
before_segment = audio[:start_ms] | |
target_segment = audio[start_ms:end_ms] - 60 | |
after_segment = audio[end_ms:] | |
audio = before_segment + target_segment + after_segment | |
audio.export(output_audio_path, format='wav') | |
# For combining the vocals and instrument stems once the censoring has been applied | |
def combine_audio(path1, path2, outpath): | |
audio1 = AudioSegment.from_file(path1, format='wav') | |
audio2 = AudioSegment.from_file(path2, format='wav') | |
combined_audio = audio1.overlay(audio2) | |
combined_audio.export(outpath, format="mp3") | |
# Extracts metadata from the original song | |
def get_metadata(original_audio_path): | |
try: | |
audio_orig = EasyID3(original_audio_path) | |
metadata = {'title': audio_orig.get('title', [None])[0], 'artist': audio_orig.get('artist', [None])[0], 'album': audio_orig.get('album', [None])[0], 'year': audio_orig.get('date', [None])[0]} | |
except Exception: | |
metadata = {'title': 'N/A', 'artist': 'N/A', 'album': 'N/A', 'year': 'N/A'} | |
return metadata | |
# Transfers metadata between two songs | |
def transfer_metadata(original_audio_path, edited_audio_path): | |
try: | |
audio_orig = EasyID3(original_audio_path) | |
audio_edit = EasyID3(edited_audio_path) | |
for key in audio_orig.keys(): | |
audio_edit[key] = audio_orig[key] | |
audio_edit.save() | |
except Exception as e: | |
print(f"Could not transfer metadata: {e}") | |
# Probably overcomplicated function to convert time in seconds to mm:ss format | |
def seconds_to_minutes(time): | |
mins = int(time // 60) | |
secs = int(time % 60) | |
if secs == 0: | |
return f'{mins}:00' | |
elif secs < 10: | |
return f'{mins}:0{secs}' | |
else: | |
return f"{mins}:{secs}" | |
# Lookup url on genius of lyrics for given song | |
def get_genius_url(artist, song_title): | |
if not artist or not song_title or artist == 'N/A' or song_title == 'N/A': return None | |
try: | |
song = genius.search_song(song_title, artist) | |
return song.url if song else None | |
except Exception: return None | |
# It's called calculate_wer but I'm actually using *mer* | |
def calculate_wer(ground_truth, hypothesis): | |
if not ground_truth or not hypothesis or "not available" in ground_truth.lower(): return None | |
try: | |
transformation = jiwer.Compose([jiwer.ToLowerCase(), jiwer.RemovePunctuation(), jiwer.RemoveMultipleSpaces(), jiwer.Strip(), jiwer.ExpandCommonEnglishContractions(), jiwer.RemoveEmptyStrings()]) | |
error = jiwer.mer(transformation(ground_truth), transformation(hypothesis)) | |
return f"{error:.3f}" | |
except Exception: return "Error" | |
# Gets the lyrics from genius for a given song | |
def get_genius_lyrics(artist, song_title): | |
if not artist or not song_title or artist == 'N/A' or song_title == 'N/A': return "Lyrics not available (missing metadata)." | |
try: | |
song = genius.search_song(song_title, artist) | |
return song.lyrics if song else "Could not find lyrics on Genius." | |
except Exception: return "An error occurred while searching for lyrics." | |
########################################################## | |
# STEP 1: Analyze Audio, Separate Tracks, and Transcribe # | |
########################################################## | |
# Obtain transcript from song using Whisper. Whisper_timestamps handles all the splitting of the segments | |
def analyze_audio(audio_path, model, device, fine_tuned=True, progress=None): | |
""" | |
Performs audio separation and transcription. Does NOT apply any edits. | |
Returns a state dictionary with paths to temp files and the transcript. | |
""" | |
if progress: progress(0, desc="Setting up temporary directory...") | |
run_temp_dir = tempfile.mkdtemp() | |
source_path = os.path.abspath(audio_path) | |
# This line is changed to use the standardized filename 'temp_audio.mp3' | |
temp_audio_path = os.path.join(run_temp_dir, 'temp_audio.mp3') | |
shutil.copy(source_path, temp_audio_path) | |
metadata = get_metadata(temp_audio_path) | |
metadata['genius_url'] = get_genius_url(metadata['artist'], metadata['title']) | |
metadata['genius_lyrics'] = get_genius_lyrics(metadata['artist'], metadata['title']) | |
if progress: progress(0.1, desc="Separating vocals with Demucs...") | |
demucs.separate.main(["--two-stems", "vocals", "-n", "mdx_extra", "-o", run_temp_dir, temp_audio_path]) | |
demucs_out_name = os.path.splitext(os.path.basename(temp_audio_path))[0] | |
vocals_path = os.path.join(run_temp_dir, "mdx_extra", demucs_out_name, "vocals.wav") | |
no_vocals_path = os.path.join(run_temp_dir, "mdx_extra", demucs_out_name, "no_vocals.wav") | |
if progress: progress(0.6, desc="Transcribing with Whisper...") | |
if not fine_tuned: | |
result = model.transcribe(vocals_path, language='en', task='transcribe', word_timestamps=True) | |
word_key, prob_key = 'word', 'probability' | |
else: | |
audio = whisper_t.load_audio(vocals_path) | |
result = whisper_t.transcribe(model, audio, beam_size=5, best_of=5, temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0), language="en", task='transcribe') | |
word_key, prob_key = 'text', 'confidence' | |
full_transcript = [] | |
initial_explicit_times = [] | |
# Certain phrases can run two words, we need a previous word catcher | |
prev_word = '' | |
prev_start, prev_end = 0.0, 0.0 | |
for segment in result["segments"]: | |
segment_words = [] | |
for word_info in segment.get('words', []): | |
word_text = word_info.get(word_key, '').strip() | |
if not word_text: continue | |
cleaned_word = remove_punctuation(word_text) | |
is_explicit = any(curse in cleaned_word for curse in default_curse_words) | |
start_time = float(word_info['start']) | |
end_time = float(word_info['end']) | |
word_data = {'text': word_text, 'start': start_time, 'end': end_time, 'prob': word_info[prob_key]} | |
segment_words.append(word_data) | |
# Short words that can be substrings of nonsensitive words | |
if cleaned_word in singular_curse_words: | |
initial_explicit_times.append({'start': start_time, 'end': end_time}) | |
# Handle two word cluster "god dam*", "mother fuck*". | |
# Other ones: jerk off, cock sucker, ... ? | |
elif ('dam' in cleaned_word and prev_word == 'god') or ('fuck' in cleaned_word and prev_word == 'mother') or (cleaned_word == 'off' and prev_word == 'jerk'): | |
initial_explicit_times.append({'start': prev_start, 'end': prev_end}) | |
initial_explicit_times.append({'start': start_time, 'end': end_time}) | |
# The majority of censored words will come from here | |
elif is_explicit: | |
initial_explicit_times.append({'start': start_time, 'end': end_time}) | |
prev_word = cleaned_word | |
prev_start, prev_end = start_time, end_time | |
full_transcript.append({'line_words': segment_words, 'start': segment['start'], 'end': segment['end']}) | |
transcript_text = " ".join([word['text'] for seg in full_transcript for word in seg['line_words']]) | |
metadata['wer_score'] = calculate_wer(metadata['genius_lyrics'], transcript_text) | |
if device == 'cuda': torch.cuda.empty_cache() | |
return { | |
"temp_dir": run_temp_dir, | |
"vocals_path": vocals_path, | |
"no_vocals_path": no_vocals_path, | |
"original_audio_path_copy": temp_audio_path, | |
"original_filename": os.path.basename(source_path), | |
"transcript": full_transcript, | |
"initial_explicit_times": initial_explicit_times, | |
"metadata": metadata | |
} | |
############################################## | |
# STEP 2: Apply Censoring and Finalize Audio # | |
############################################## | |
# Applies the censoring at the indicated times | |
def apply_censoring(analysis_state, times_to_censor, progress=None): | |
""" | |
Takes the state from analyze_audio and a final list of timestamps, | |
applies silencing, and creates the final audio file in the temp directory. | |
""" | |
if not times_to_censor: | |
# If there's nothing to censor, we don't need to do anything. | |
# The temporary directory will be cleaned up by the app logic. | |
return None | |
if progress: progress(0, desc="Applying silence to vocal track...") | |
times_in_ms = [(int(t['start']*1000), int(t['end']*1000)) for t in times_to_censor] | |
silence_audio_segment(analysis_state['vocals_path'], analysis_state['vocals_path'], times_in_ms) | |
base_name = os.path.splitext(analysis_state['original_filename'])[0] | |
# MODIFIED: Save the output file to the existing temporary directory. | |
output_path = os.path.join(analysis_state['temp_dir'], f"{base_name}-edited.mp3") | |
if progress: progress(0.6, desc="Combining audio tracks...") | |
combine_audio(analysis_state['vocals_path'], analysis_state['no_vocals_path'], output_path) | |
transfer_metadata(analysis_state['original_audio_path_copy'], output_path) | |
# MODIFIED: The temporary directory is no longer removed here. | |
# Cleanup will be handled by the main application UI logic. | |
return output_path |