litagin's picture
update
0cd70b9
raw
history blame
9.49 kB
import json
import os
import random
import warnings
import gradio as gr
import librosa
import numpy as np
from datasets import IterableDatasetDict, load_dataset
from gradio_client import Client
from loguru import logger
warnings.filterwarnings("ignore")
NUM_TAR_FILES = 115
NUM_SAMPLES = 3746131
HF_PATH_TO_DATASET = "litagin/Galgame_Speech_SER_16kHz"
hf_token = os.getenv("HF_TOKEN")
client = Client("litagin/ser_record", hf_token=hf_token)
id2label = {
0: "Angry",
1: "Disgusted",
2: "Embarrassed",
3: "Fearful",
4: "Happy",
5: "Sad",
6: "Surprised",
7: "Neutral",
8: "Sexual1",
9: "Sexual2",
}
id2rich_label = {
0: "😠 怒り (0)",
1: "😒 嫌悪 (1)",
2: "😳 恥ずかしさ・戸惑い (2)",
3: "😨 恐怖 (3)",
4: "😊 幸せ (4)",
5: "😢 悲しみ (5)",
6: "😲 驚き (6)",
7: "😐 中立 (7)",
8: "🥰 NSFW1 (8)",
9: "🍭 NSFW2 (9)",
}
current_item: dict | None = None
def _load_dataset(
*,
streaming: bool = True,
use_local_dataset: bool = False,
local_dataset_path: str | None = None,
data_dir: str = "data",
) -> IterableDatasetDict:
data_files = {
"train": [
f"galgame-speech-ser-16kHz-train-000{index:03d}.tar"
for index in range(0, NUM_TAR_FILES)
],
}
if use_local_dataset:
assert local_dataset_path is not None
path = local_dataset_path
else:
path = HF_PATH_TO_DATASET
dataset: IterableDatasetDict = load_dataset(
path=path, data_dir=data_dir, data_files=data_files, streaming=streaming
) # type: ignore
dataset = dataset.remove_columns(["__url__"])
dataset = dataset.rename_column("ogg", "audio")
return dataset
logger.info("Start loading dataset")
ds = _load_dataset(streaming=True, use_local_dataset=False)
logger.info("Dataset loaded")
seed = random.randint(0, 2**32 - 1)
logger.info(f"Seed: {seed}")
ds_iter = iter(ds["train"].shuffle(seed=seed))
# ds_iter = iter(ds["train"])
counter = 0
shortcut_js = """
<script>
function shortcuts(e) {
if (e.key === "a") {
document.getElementById("btn_skip").click();
} else if (e.key === "0") {
document.getElementById("btn_0").click();
} else if (e.key === "1") {
document.getElementById("btn_1").click();
} else if (e.key === "2") {
document.getElementById("btn_2").click();
} else if (e.key === "3") {
document.getElementById("btn_3").click();
} else if (e.key === "4") {
document.getElementById("btn_4").click();
} else if (e.key === "5") {
document.getElementById("btn_5").click();
} else if (e.key === "6") {
document.getElementById("btn_6").click();
} else if (e.key === "7") {
document.getElementById("btn_7").click();
} else if (e.key === "8") {
document.getElementById("btn_8").click();
} else if (e.key === "9") {
document.getElementById("btn_9").click();
}
}
document.addEventListener('keypress', shortcuts, false);
</script>
"""
def modify_speed(
data: tuple[int, np.ndarray], speed: float = 1.0
) -> tuple[int, np.ndarray]:
if speed == 1.0:
return data
sr, array = data
return sr, librosa.effects.time_stretch(array, rate=speed)
def parse_item(item) -> dict:
global counter
label_id = item["cls"]
sampling_rate = item["audio"]["sampling_rate"]
array = item["audio"]["array"]
return {
"key": item["__key__"],
"audio": (sampling_rate, array),
"text": item["txt"],
"label": id2rich_label[label_id],
"label_id": label_id,
"counter": counter,
}
def get_next_parsed_item() -> dict:
global counter, ds_iter
logger.info("Getting next item")
try:
next_item = next(ds_iter)
counter += 1
except StopIteration:
logger.info("StopIteration, re-initializing using new seed")
seed = random.randint(0, 2**32 - 1)
logger.info(f"New Seed: {seed}")
ds_iter = iter(ds["train"].shuffle(seed=seed))
next_item = next(ds_iter)
counter = 1
parsed = parse_item(next_item)
logger.info(
f"Next item:\nkey={parsed['key']}\ntext={parsed['text']}\nlabel={parsed['label']}"
)
return parsed
md = """
# 説明
- **性的な音声が含まれるため、18歳未満の方はご利用をお控えください**
- このアプリは [このゲームのセリフ音声データセット](https://huggingface.co/datasets/litagin/Galgame_Speech_SER_16kHz) の感情ラベルを修正して、大規模で高品質な感情音声データセットを作成するためのものです
- 「**何を言っているか**」ではなく「**どのように言っているか**」に注目して、感情ラベルを付与してください(例: 悲しそうに「とっても楽しいです…」と言っていたら、 `😊 幸せ` ではなく `😢 悲しみ` とする)
- 既存のラベルが適切であれば、そのまま「現在の感情ラベルで適切」ボタンを押してください(ショートカットキー: `A`)
- ラベルを修正する場合は、適切なボタンを押してください(ショートカットキー: `0` 〜 `9`)
# ラベル補足
- `🥰 NSFW1` は女性の性的行為中の音声(喘ぎ声等)
- `🍭 NSFW2` はキスシーンでのリップ音やフェラシーンでのしゃぶる音(チュパ音)が多く含まれている音声(セリフ+チュパ音の場合も含む)(フェラシーン中のセリフだと思われる場合はこれ)
- 感情が音声からは特に読み取れない場合(普通のテンションの声で「今日はラーメンを食べます」等)は `😐 中立` を選択してください
- 複数の感情が含まれている場合は、最も多く含まれている感情を選択してください
"""
with gr.Blocks(head=shortcut_js) as app:
gr.Markdown(md)
with gr.Row():
with gr.Column():
btn_init = gr.Button("初期化・再読み込み")
speed_slider = gr.Slider(
minimum=0.5, maximum=5.0, step=0.1, value=1.0, label="再生速度"
)
counter_info = gr.Textbox(label="進捗状況")
with gr.Column(variant="panel"):
key = gr.Textbox(label="Key")
audio = gr.Audio(
show_download_button=False,
show_share_button=False,
interactive=False,
)
text = gr.Textbox(label="Text")
label = gr.Textbox(label="感情ラベル")
label_id = gr.Textbox(visible=False)
btn_skip = gr.Button("現在の感情ラベルで適切 (A)", elem_id="btn_skip")
with gr.Column():
gr.Markdown("# 感情ラベルを修正する場合")
btn_list = [
gr.Button(id2rich_label[_id], elem_id=f"btn_{_id}") for _id in range(10)
]
def update_current_item(data: dict) -> dict:
global current_item
if current_item is None:
current_item = get_next_parsed_item()
modified_audio = modify_speed(current_item["audio"], speed=data[speed_slider])
counter_str = f"{current_item['counter']}/{NUM_SAMPLES}: {current_item['counter'] / NUM_SAMPLES * 100:.2f}%"
return {
key: current_item["key"],
audio: gr.Audio(modified_audio, autoplay=True),
text: current_item["text"],
label: current_item["label"],
label_id: current_item["label_id"],
counter_info: counter_str,
}
def set_next_item(data: dict) -> dict:
global current_item
current_item = get_next_parsed_item()
return update_current_item(data)
def put_unmodified(data: dict) -> dict:
logger.info("Putting unmodified")
current_key = data[key]
current_label_id = data[label_id]
_ = client.predict(
new_data=json.dumps(
{
"key": current_key,
"cls": int(current_label_id),
}
),
api_name="/put_data",
)
logger.info("Unmodified sent")
return set_next_item(data)
btn_init.click(
update_current_item,
inputs={speed_slider},
outputs=[key, audio, text, label, label_id, counter_info],
)
btn_skip.click(
put_unmodified,
inputs={key, label_id, speed_slider},
outputs=[key, audio, text, label, label_id, counter_info],
)
functions_list = []
for _id in range(10):
def put_label(data: dict, _id=_id) -> dict:
logger.info(f"Putting label: {id2rich_label[_id]}")
current_key = data[key]
_ = client.predict(
new_data=json.dumps(
{
"key": current_key,
"cls": _id,
}
),
api_name="/put_data",
)
logger.info("Modified sent")
return set_next_item(data)
functions_list.append(put_label)
for _id in range(10):
btn_list[_id].click(
functions_list[_id],
inputs={key, speed_slider},
outputs=[key, audio, text, label, label_id, counter_info],
)
app.launch()