Spaces:
Runtime error
Runtime error
File size: 4,486 Bytes
32ca76b edb49b9 32ca76b d08f2ce 32ca76b edb49b9 32ca76b edb49b9 32ca76b edb49b9 32ca76b 3f2e082 edb49b9 32ca76b edb49b9 32ca76b edb49b9 32ca76b edb49b9 32ca76b edb49b9 32ca76b edb49b9 32ca76b 3f2e082 32ca76b edb49b9 32ca76b edb49b9 32ca76b 3f2e082 32ca76b edb49b9 32ca76b edb49b9 32ca76b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
import pdb
import time
import wavmark
import streamlit as st
import os
import torch
import uuid
import datetime
import numpy as np
import soundfile
from huggingface_hub import hf_hub_download, HfApi
from wavmark.utils import file_reader
# Function to add watermark to audio
def add_watermark(audio_path, watermark_text):
assert len(watermark_text) == 16
watermark_npy = np.array([int(i) for i in watermark_text])
# todo: 控制时间
signal, sr, audio_length_second = file_reader.read_as_single_channel_16k(audio_path, 16000)
watermarked_signal, _ = wavmark.encode_watermark(model, signal, watermark_npy, show_progress=False)
tmp_file_name = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + "_" + str(uuid.uuid4()) + ".wav"
tmp_file_path = '/tmp/' + tmp_file_name
soundfile.write(tmp_file_path, watermarked_signal, sr)
return tmp_file_path
# Function to decode watermark from audio
def decode_watermark(audio_path):
watermarked_signal, sr, audio_length_second = file_reader.read_as_single_channel_16k(audio_path, 16000)
payload_decoded, _ = wavmark.decode_watermark(model, watermarked_signal, show_progress=False)
if payload_decoded is None:
return "No Watermark"
return payload_decoded
# Main web app
def main():
max_upload_size = 20 * 1024 * 1024 # 20 MB in bytes
if "def_value" not in st.session_state:
def_val_npy = np.random.choice([0, 1], size=32 - len_start_bit)
def_val_str = "".join([str(i) for i in def_val_npy])
st.session_state.def_value = def_val_str
st.title("Neural Audio Watermark")
st.write("Choose the action you want to perform:")
action = st.selectbox("Select Action", ["Add Watermark", "Decode Watermark"])
if action == "Add Watermark":
audio_file = st.file_uploader("Upload Audio File (WAV)", type=["wav"], accept_multiple_files=False,
max_upload_size=max_upload_size)
if audio_file:
tmp_input_audio_file = os.path.join("/tmp/", audio_file.name)
with open(tmp_input_audio_file, "wb") as f:
f.write(audio_file.getbuffer())
st.audio(tmp_input_audio_file, format="audio/wav")
watermark_text = st.text_input("Enter Watermark", value=st.session_state.def_value)
add_watermark_button = st.button("Add Watermark", key="add_watermark_btn")
if add_watermark_button: # 点击按钮后执行的
if audio_file and watermark_text:
with st.spinner("Adding Watermark..."):
# add_watermark_button.empty()
# st.button("Add Watermark", disabled=True)
# st.button("Add Watermark", disabled=True, key="add_watermark_btn_disabled")
t1 = time.time()
watermarked_audio = add_watermark(tmp_input_audio_file, watermark_text)
encode_time_cost = time.time() - t1
st.write("Watermarked Audio:")
st.audio(watermarked_audio, format="audio/wav")
st.write("Time Cost:%d seconds" % encode_time_cost)
# st.button("Add Watermark", disabled=False)
elif action == "Decode Watermark":
audio_file = st.file_uploader("Upload Audio File (WAV/MP3)", type=["wav", "mp3"], accept_multiple_files=False,
max_upload_size=max_upload_size)
if audio_file:
if st.button("Decode Watermark"):
# 1.保存
tmp_file_for_decode_path = os.path.join("/tmp/", audio_file.name)
with open(tmp_file_for_decode_path, "wb") as f:
f.write(audio_file.getbuffer())
# 2.执行
with st.spinner("Decoding..."):
t1 = time.time()
decoded_watermark = decode_watermark(tmp_file_for_decode_path)
decode_cost = time.time() - t1
print("decoded_watermark", decoded_watermark)
# Display the decoded watermark
st.write("Decoded Watermark:", decoded_watermark)
st.write("Time Cost:%d seconds" % (decode_cost))
if __name__ == "__main__":
len_start_bit = 16
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = wavmark.load_model().to(device)
main()
|