{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "f808869b-b638-42a9-af0f-06ce8d6428c3",
"metadata": {},
"source": [
"# Text-to-speech generation using Bark and OpenVINO\n",
"\n",
"πΆ Bark is a transformer-based text-to-audio model created by [Suno](https://suno.ai). Bark can generate highly realistic, multilingual speech as well as other audio - including music, background noise and simple sound effects. The model can also produce nonverbal communications like laughing, sighing and crying. \n",
"\n",
"With Bark, users can also produce nonverbal communications like laughing, sighing, and crying, making it a versatile tool for a variety of applications.\n",
"\n",
"![image.png](https://user-images.githubusercontent.com/5068315/235310676-a4b3b511-90ec-4edf-8153-7ccf14905d73.png)\n",
"\n",
"Bark is a cutting-edge text-to-speech (TTS) technology that has taken the AI world by storm. Unlike the typical TTS engines that sound robotic and mechanic, Bark offers human-like voices that are highly realistic and natural sounding. Bark uses GPT-style models to generate speech with minimal tweaking, producing highly expressive and emotive voices that can capture nuances such as tone, pitch, and rhythm. It offers a fantastic experience that can leave you wondering if youβre listening to human beings.\n",
"\n",
"Notably, Bark supports multiple languages and can generate speech in Mandarin, French, Italian, Spanish, and other languages with impressive clarity and accuracy. With Bark, you can easily switch between languages and still enjoy high-quality sound effects.\n",
"\n",
"Bark is not only intelligent but also intuitive, making it an ideal tool for individuals and businesses looking to create high-quality voice content for their platforms. Whether youβre looking to create podcasts, audiobooks, video game sounds, or any other form of voice content, Bark has you covered.\n",
"\n",
"So, if youβre looking for a revolutionary text-to-speech technology that can elevate your voice content, Bark is the way to go!\n",
"In this tutorial we consider how to convert and run bark with OpenVINO.\n",
"\n",
"## About model\n",
"\n",
"Bark uses GPT-style models to generate audio from scratch, but the initial text prompt is embedded into high-level semantic tokens without the use of phonemes. This allows Bark to generalize to arbitrary instructions beyond speech that occur in the training data, such as music lyrics, sound effects, or other non-speech sounds.\n",
"\n",
"A subsequent second model is used to convert the generated semantic tokens into audio codec tokens to generate the full waveform. To enable the community to use Bark via public code, EnCodec codec from Facebook is used to act as an audio representation.\n",
"\n",
"\n",
"#### Table of contents:\n",
"\n",
"- [Prerequisites](#Prerequisites)\n",
"- [Download and Convert models](#Download-and-Convert-models)\n",
" - [Text Encoder](#Text-Encoder)\n",
" - [Coarse encoder](#Coarse-encoder)\n",
" - [Fine encoder](#Fine-encoder)\n",
" - [Prepare Inference pipeline](#Prepare-Inference-pipeline)\n",
"- [Run model inference](#Run-model-inference)\n",
" - [Select Inference device](#Select-Inference-device)\n",
"- [Interactive demo](#Interactive-demo)\n",
"\n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d8efc0ab-32aa-46d1-82b9-874b9679b14a",
"metadata": {},
"source": [
"## Prerequisites\n",
"[back to top β¬οΈ](#Table-of-contents:)\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "92a997f3-7ffd-4f9a-9756-0e3bf9d2e81c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Note: you may need to restart the kernel to use updated packages.\n",
"\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.3.2\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m24.0\u001b[0m\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
"Note: you may need to restart the kernel to use updated packages.\n",
"\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.3.2\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m24.0\u001b[0m\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n",
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"%pip install -q \"torch\" \"torchvision\" \"torchaudio\" --extra-index-url https://download.pytorch.org/whl/cpu\n",
"%pip install -q \"openvino>=2023.1.0\" \"gradio>=4.19\"\n",
"%pip install -q \"git+https://github.com/suno-ai/bark.git\" --extra-index-url https://download.pytorch.org/whl/cpu"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "79cd7579-191a-4682-8525-98d6656cb267",
"metadata": {},
"source": [
"## Download and Convert models\n",
"[back to top β¬οΈ](#Table-of-contents:)\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "ea2ab3fe-fb03-493b-9c02-c74e6246d149",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from bark.generation import load_model, codec_decode, _flatten_codebooks\n",
"\n",
"models_dir = Path(\"models\")\n",
"models_dir.mkdir(exist_ok=True)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "6cb4431b-b1d3-4181-9152-9b018c1ada0c",
"metadata": {},
"source": [
"### Text Encoder\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"Text encoder is responsible for embedding initial text prompt into high-level semantic tokens. it uses tokenizer for conversion input text to token ids and predicts semantic text tokens that capture the meaning of the text. There are some differences between text encoder behavior on first step and others. It is the reason why we need to use separated models for that."
]
},
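{
"attachments": {},
"cell_type": "markdown",
"id": "b1f0c2a4",
"metadata": {},
"source": [
"The first-step model is traced with a 513-token input because the prompt is laid out as 256 text tokens, 256 semantic-history tokens and one inference token (see `generate_text_semantic` later in this notebook). The sketch below illustrates that layout under stated assumptions: `build_prompt` is a hypothetical helper, and the three token values are placeholders, not the constants that Bark actually uses.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"# Placeholder values for illustration only; the real constants are\n",
"# imported from bark.generation later in this notebook.\n",
"TEXT_PAD = -1\n",
"HISTORY_PAD = -2\n",
"INFER = -3\n",
"\n",
"\n",
"def build_prompt(text_ids, history_ids=None, width=256):\n",
"    # Returns a (1, 2 * width + 1) int64 array: text | history | infer token\n",
"    text = np.asarray(text_ids, dtype=np.int64)[:width]  # truncate long text\n",
"    text = np.pad(text, (0, width - len(text)), constant_values=TEXT_PAD)\n",
"    if history_ids is None:\n",
"        history = np.full(width, HISTORY_PAD, dtype=np.int64)\n",
"    else:\n",
"        history = np.asarray(history_ids, dtype=np.int64)[-width:]  # keep the most recent tokens\n",
"        history = np.pad(history, (0, width - len(history)), constant_values=HISTORY_PAD)\n",
"    return np.hstack([text, history, [INFER]]).astype(np.int64)[None]\n",
"\n",
"\n",
"prompt = build_prompt([5, 6, 7])\n",
"print(prompt.shape)  # (1, 513)\n",
"```\n",
"\n",
"Because the layout has a fixed width, the first-step model always sees the same input shape, while every later step feeds a single new token plus the cache."
]
},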
{
"cell_type": "code",
"execution_count": 3,
"id": "eef45ebb-e3cc-4886-a6db-3538bee039e7",
"metadata": {},
"outputs": [],
"source": [
"text_use_small = True\n",
"\n",
"text_encoder = load_model(model_type=\"text\", use_gpu=False, use_small=text_use_small, force_reload=False)\n",
"\n",
"text_encoder_model = text_encoder[\"model\"]\n",
"tokenizer = text_encoder[\"tokenizer\"]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "61ef187e-6431-4c15-a39b-bdd5386d0450",
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import openvino as ov\n",
"\n",
"text_model_suffix = \"_small\" if text_use_small else \"\"\n",
"text_model_dir = models_dir / f\"text_encoder{text_model_suffix}\"\n",
"text_model_dir.mkdir(exist_ok=True)\n",
"text_encoder_path1 = text_model_dir / \"bark_text_encoder_1.xml\"\n",
"text_encoder_path0 = text_model_dir / \"bark_text_encoder_0.xml\""
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a04bfa97-4b76-4bf7-a2b6-2a2ea3d60df3",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:176: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!\n",
" assert(idx.shape[1] >= 256+256+1)\n",
"/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:199: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!\n",
" assert position_ids.shape == (1, t)\n",
"/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/bark/model.py:172: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!\n",
" assert t == 1\n",
"/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/torch/jit/_trace.py:160: UserWarning: The .grad attribute of a Tensor that is not a leaf Tensor is being accessed. Its .grad attribute won't be populated during autograd.backward(). If you indeed want the .grad field to be populated for a non-leaf Tensor, use .retain_grad() on the non-leaf Tensor. If you access the non-leaf Tensor by mistake, make sure you access the leaf Tensor instead. See github.com/pytorch/pytorch/pull/30531 for more informations. (Triggered internally at aten/src/ATen/core/TensorBody.h:489.)\n",
" if a.grad is not None:\n"
]
}
],
"source": [
"class TextEncoderModel(torch.nn.Module):\n",
" def __init__(self, encoder):\n",
" super().__init__()\n",
" self.encoder = encoder\n",
"\n",
" def forward(self, idx, past_kv=None):\n",
" return self.encoder(idx, merge_context=True, past_kv=past_kv, use_cache=True)\n",
"\n",
"\n",
"if not text_encoder_path0.exists() or not text_encoder_path1.exists():\n",
" text_encoder_exportable = TextEncoderModel(text_encoder_model)\n",
" ov_model = ov.convert_model(text_encoder_exportable, example_input=torch.ones((1, 513), dtype=torch.int64))\n",
" ov.save_model(ov_model, text_encoder_path0)\n",
" logits, kv_cache = text_encoder_exportable(torch.ones((1, 513), dtype=torch.int64))\n",
" ov_model = ov.convert_model(\n",
" text_encoder_exportable,\n",
" example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),\n",
" )\n",
" ov.save_model(ov_model, text_encoder_path1)\n",
" del ov_model\n",
" del text_encoder_exportable\n",
"del text_encoder_model, text_encoder"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "2cec4173",
"metadata": {},
"source": [
"### Coarse encoder\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"Coarse encoder is a causal autoregressive transformer, that takes as input the results of the text encoder model. It aims at predicting the first two audio codebooks necessary for EnCodec. Coarse encoder is autoregressive model, it means that for making prediction on next step, it uses own output from previous step. For reducing model complexity and optimization, caching key and values for attention blocks can be used. past_key_values contains set of precomputed attention keys and values for each attention module in the model from previous step as they will be not changed from step to step and allow us calculate only update for the current step and join to previous. For avoiding to have separated model for first inference, where model does not have \"past\", we will provide empty tensor on the first step."
]
},
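{
"attachments": {},
"cell_type": "markdown",
"id": "c3d9e5f7",
"metadata": {},
"source": [
"The caching idea can be checked on a toy example. The sketch below is not Bark code: it uses a single attention head with random NumPy arrays (all names and sizes are illustrative assumptions) and shows that decoding one position at a time with cached keys and values, starting from an empty cache, reproduces full causal attention.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"rng = np.random.default_rng(0)\n",
"T, d = 5, 4  # sequence length and head size (arbitrary toy values)\n",
"q = rng.normal(size=(T, d))\n",
"k = rng.normal(size=(T, d))\n",
"v = rng.normal(size=(T, d))\n",
"\n",
"\n",
"def softmax(x):\n",
"    e = np.exp(x - x.max(axis=-1, keepdims=True))\n",
"    return e / e.sum(axis=-1, keepdims=True)\n",
"\n",
"\n",
"# Full causal attention over all T positions at once.\n",
"scores = q @ k.T / np.sqrt(d)\n",
"scores[np.triu(np.ones((T, T), dtype=bool), k=1)] = -np.inf  # hide future positions\n",
"full = softmax(scores) @ v\n",
"\n",
"# Incremental decoding: keys and values of earlier steps come from the cache,\n",
"# only the newest query, key and value are used at each step.\n",
"k_cache = np.empty((0, d))  # empty \"past\" on the first step\n",
"v_cache = np.empty((0, d))\n",
"steps = []\n",
"for t in range(T):\n",
"    k_cache = np.vstack([k_cache, k[t : t + 1]])\n",
"    v_cache = np.vstack([v_cache, v[t : t + 1]])\n",
"    att = softmax(q[t : t + 1] @ k_cache.T / np.sqrt(d))\n",
"    steps.append(att @ v_cache)\n",
"\n",
"print(np.allclose(full, np.vstack(steps)))  # True\n",
"```\n",
"\n",
"Starting from a zero-length cache mirrors the empty tensors passed on the first step, which is what lets a single exported model serve both the first and the following steps."
]
},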
{
"cell_type": "code",
"execution_count": 6,
"id": "9b33ed74-725a-45ce-a48d-42cf807d70f1",
"metadata": {},
"outputs": [],
"source": [
"coarse_use_small = True\n",
"\n",
"coarse_model = load_model(\n",
" model_type=\"coarse\",\n",
" use_gpu=False,\n",
" use_small=coarse_use_small,\n",
" force_reload=False,\n",
")\n",
"\n",
"coarse_model_suffix = \"_small\" if coarse_use_small else \"\"\n",
"coarse_model_dir = models_dir / f\"coarse{coarse_model_suffix}\"\n",
"coarse_model_dir.mkdir(exist_ok=True)\n",
"coarse_encoder_path = coarse_model_dir / \"bark_coarse_encoder.xml\""
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "d6aef483-3f7e-45b6-b98a-3cff802a27e0",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/tmp/ipykernel_1046533/1432960018.py:47: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect. We can't record the data flow of Python values, so this value will be treated as a constant in the future. This means that the trace might not generalize to other inputs!\n",
" if past_len > 0:\n"
]
}
],
"source": [
"import types\n",
"\n",
"\n",
"class CoarseEncoderModel(torch.nn.Module):\n",
" def __init__(self, encoder):\n",
" super().__init__()\n",
" self.encoder = encoder\n",
"\n",
" def forward(self, idx, past_kv=None):\n",
" return self.encoder(idx, past_kv=past_kv, use_cache=True)\n",
"\n",
"\n",
"def casual_self_attention_forward(self, x, past_kv=None, use_cache=False):\n",
" B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)\n",
"\n",
" # calculate query, key, values for all heads in batch and move head forward to be the batch dim\n",
" q, k, v = self.c_attn(x).split(self.n_embd, dim=2)\n",
" k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)\n",
" q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)\n",
" v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)\n",
" past_len = 0\n",
"\n",
" if past_kv is not None:\n",
" past_key = past_kv[0]\n",
" past_value = past_kv[1]\n",
" k = torch.cat((past_key, k), dim=-2)\n",
" v = torch.cat((past_value, v), dim=-2)\n",
" past_len = past_key.shape[-2]\n",
"\n",
" FULL_T = k.shape[-2]\n",
"\n",
" if use_cache is True:\n",
" present = (k, v)\n",
" else:\n",
" present = None\n",
"\n",
" # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)\n",
" if self.flash:\n",
" # efficient attention using Flash Attention\n",
" full_attention_mask = torch.ones(\n",
" B,\n",
" T,\n",
" T,\n",
" device=x.device,\n",
" dtype=torch.float,\n",
" ) * float(\"-inf\")\n",
" full_attention_mask.triu_(diagonal=1)\n",
" if past_len > 0:\n",
" full_attention_mask = torch.cat(\n",
" (\n",
" torch.zeros(B, T, past_len, device=x.device),\n",
" full_attention_mask,\n",
" ),\n",
" dim=-1,\n",
" )\n",
"\n",
" y = torch.nn.functional.scaled_dot_product_attention(q, k, v, dropout_p=self.dropout, attn_mask=full_attention_mask)\n",
" else:\n",
" # manual implementation of attention\n",
" att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))\n",
" att = att.masked_fill(self.bias[:, :, FULL_T - T : FULL_T, :FULL_T] == 0, float(\"-inf\"))\n",
" att = F.softmax(att, dim=-1)\n",
" att = self.attn_dropout(att)\n",
" y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)\n",
" y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side\n",
"\n",
" # output projection\n",
" y = self.resid_dropout(self.c_proj(y))\n",
" return (y, present)\n",
"\n",
"\n",
"if not coarse_encoder_path.exists():\n",
" coarse_encoder_exportable = CoarseEncoderModel(coarse_model)\n",
" for block in coarse_encoder_exportable.encoder.transformer.h:\n",
" block.attn.forward = types.MethodType(casual_self_attention_forward, block.attn)\n",
" logits, kv_cache = coarse_encoder_exportable(torch.ones((1, 886), dtype=torch.int64))\n",
" ov_model = ov.convert_model(\n",
" coarse_encoder_exportable,\n",
" example_input=(torch.ones((1, 1), dtype=torch.int64), kv_cache),\n",
" )\n",
" ov.save_model(ov_model, coarse_encoder_path)\n",
" del ov_model\n",
" del coarse_encoder_exportable\n",
"del coarse_model"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "eae7826a-e2e2-4267-a33b-84410b5578f3",
"metadata": {},
"outputs": [],
"source": [
"fine_use_small = False\n",
"\n",
"fine_model = load_model(model_type=\"fine\", use_gpu=False, use_small=fine_use_small, force_reload=False)\n",
"\n",
"fine_model_suffix = \"_small\" if fine_use_small else \"\"\n",
"fine_model_dir = models_dir / f\"fine_model{fine_model_suffix}\"\n",
"fine_model_dir.mkdir(exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "ee932e24-6057-4c9e-8305-74057d4892dc",
"metadata": {},
"outputs": [],
"source": [
"class FineModel(torch.nn.Module):\n",
" def __init__(self, model):\n",
" super().__init__()\n",
" self.model = model\n",
"\n",
" def forward(self, pred_idx, idx):\n",
" b, t, codes = idx.size()\n",
" pos = torch.arange(0, t, dtype=torch.long).unsqueeze(0) # shape (1, t)\n",
"\n",
" # forward the GPT model itself\n",
" tok_embs = [wte(idx[:, :, i]).unsqueeze(-1) for i, wte in enumerate(self.model.transformer.wtes)] # token embeddings of shape (b, t, n_embd)\n",
" tok_emb = torch.cat(tok_embs, dim=-1)\n",
" pos_emb = self.model.transformer.wpe(pos) # position embeddings of shape (1, t, n_embd)\n",
" x = tok_emb[:, :, :, : pred_idx + 1].sum(dim=-1)\n",
" x = self.model.transformer.drop(x + pos_emb)\n",
" for block in self.model.transformer.h:\n",
" x = block(x)\n",
" x = self.model.transformer.ln_f(x)\n",
" return x\n",
"\n",
"\n",
"fine_feature_extractor_path = fine_model_dir / \"bark_fine_feature_extractor.xml\""
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "558ce652",
"metadata": {},
"source": [
"### Fine encoder\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"Fine encoder is time a non-causal autoencoder transformer, which iteratively predicts the last codebooks based on the sum of the previous codebooks embeddings obtained using Coarse encoder."
]
},
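{
"attachments": {},
"cell_type": "markdown",
"id": "d4a6b8c0",
"metadata": {},
"source": [
"The \"sum of the previous codebook embeddings\" corresponds to the slice-and-sum step in `FineModel.forward` above. The sketch below reproduces only that step with NumPy. It is an illustration under assumptions: the embedding tables are random stand-ins for trained weights, `summed_embedding` is a hypothetical helper, and all sizes are toy values.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"rng = np.random.default_rng(0)\n",
"n_codebooks, codebook_size, n_embd, t = 8, 1024, 16, 6  # toy sizes\n",
"# One embedding table per codebook (random stand-ins for the trained weights).\n",
"tables = rng.normal(size=(n_codebooks, codebook_size, n_embd))\n",
"codes = rng.integers(0, codebook_size, size=(t, n_codebooks))\n",
"\n",
"\n",
"def summed_embedding(codes, pred_idx):\n",
"    # Sum the embeddings of codebooks 0..pred_idx for every time step.\n",
"    embs = [tables[i][codes[:, i]] for i in range(pred_idx + 1)]\n",
"    return np.sum(embs, axis=0)  # shape (t, n_embd)\n",
"\n",
"\n",
"x = summed_embedding(codes, pred_idx=2)\n",
"print(x.shape)  # (6, 16)\n",
"```\n",
"\n",
"Codebooks after `pred_idx` do not contribute to the sum, so the same feature extractor can be reused for each codebook that is predicted in turn."
]
},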
{
"cell_type": "code",
"execution_count": 13,
"id": "a46ddd21-e6e0-4705-988e-8aea2c3c3235",
"metadata": {},
"outputs": [],
"source": [
"if not fine_feature_extractor_path.exists():\n",
" lm_heads = fine_model.lm_heads\n",
" fine_feature_extractor = FineModel(fine_model)\n",
" feature_extractor_out = fine_feature_extractor(3, torch.zeros((1, 1024, 8), dtype=torch.int32))\n",
" ov_model = ov.convert_model(\n",
" fine_feature_extractor,\n",
" example_input=(\n",
" torch.ones(1, dtype=torch.long),\n",
" torch.zeros((1, 1024, 8), dtype=torch.long),\n",
" ),\n",
" )\n",
" ov.save_model(ov_model, fine_feature_extractor_path)\n",
" for i, lm_head in enumerate(lm_heads):\n",
" ov.save_model(\n",
" ov.convert_model(lm_head, example_input=feature_extractor_out),\n",
" fine_model_dir / f\"bark_fine_lm_{i}.xml\",\n",
" )"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "916cdd93",
"metadata": {},
"source": [
"### Prepare Inference pipeline\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"For better usability, classes for working with models provided below."
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "a7ae999d-e327-44ca-a76a-eafd9fa4aca8",
"metadata": {},
"outputs": [],
"source": [
"class OVBarkTextEncoder:\n",
" def __init__(self, core, device, model_path1, model_path2):\n",
" self.compiled_model1 = core.compile_model(model_path1, device)\n",
" self.compiled_model2 = core.compile_model(model_path2, device)\n",
"\n",
" def __call__(self, input_ids, past_kv=None):\n",
" if past_kv is None:\n",
" outputs = self.compiled_model1(input_ids, share_outputs=True)\n",
" else:\n",
" outputs = self.compiled_model2([input_ids, *past_kv], share_outputs=True)\n",
" logits, kv_cache = self.postprocess_outputs(outputs, past_kv is None)\n",
" return logits, kv_cache\n",
"\n",
" def postprocess_outputs(self, outs, is_first_stage):\n",
" net_outs = self.compiled_model1.outputs if is_first_stage else self.compiled_model2.outputs\n",
" logits = outs[net_outs[0]]\n",
" kv_cache = []\n",
" for out_tensor in net_outs[1:]:\n",
" kv_cache.append(outs[out_tensor])\n",
" return logits, kv_cache\n",
"\n",
"\n",
"class OVBarkEncoder:\n",
" def __init__(self, core, device, model_path):\n",
" self.compiled_model = core.compile_model(model_path, device)\n",
"\n",
" def __call__(self, idx, past_kv=None):\n",
" if past_kv is None:\n",
" past_kv = self._init_past_kv()\n",
" outs = self.compiled_model([idx, *past_kv], share_outputs=True)\n",
" return self.postprocess_outputs(outs)\n",
"\n",
" def postprocess_outputs(self, outs):\n",
" net_outs = self.compiled_model.outputs\n",
" logits = outs[net_outs[0]]\n",
" kv_cache = []\n",
" for out_tensor in net_outs[1:]:\n",
" kv_cache.append(outs[out_tensor])\n",
" return logits, kv_cache\n",
"\n",
" def _init_past_kv(self):\n",
" inputs = []\n",
" for input_t in self.compiled_model.inputs[1:]:\n",
" input_shape = input_t.partial_shape\n",
" input_shape[0] = 1\n",
" input_shape[2] = 0\n",
" inputs.append(ov.Tensor(ov.Type.f32, input_shape.get_shape()))\n",
" return inputs\n",
"\n",
"\n",
"class OVBarkFineEncoder:\n",
" def __init__(self, core, device, model_dir, num_lm_heads=7):\n",
" self.feats_compiled_model = core.compile_model(model_dir / \"bark_fine_feature_extractor.xml\", device)\n",
" self.feats_out = self.feats_compiled_model.output(0)\n",
" lm_heads = []\n",
" for i in range(num_lm_heads):\n",
" lm_heads.append(core.compile_model(model_dir / f\"bark_fine_lm_{i}.xml\", device))\n",
" self.lm_heads = lm_heads\n",
"\n",
" def __call__(self, pred_idx, idx):\n",
" feats = self.feats_compiled_model([ov.Tensor(pred_idx), ov.Tensor(idx)])[self.feats_out]\n",
" lm_id = pred_idx - 1\n",
" logits = self.lm_heads[int(lm_id)](feats)[0]\n",
" return logits"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "14ffa78d-6e42-4af1-bcc4-3accbc5f003a",
"metadata": {},
"source": [
"`generate_audio` function is the main function for starting audio generation process. It accepts input text and optionally history prompt, provided by user and run inference pipeline.\n",
"The inference pipeline consists from several steps, illustrated on the diagram below:\n",
"\n",
"![bark_pipeline](https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/3a272a34-50bc-4d4a-bb1f-8a649cbf1d6d)\n",
"\n",
"1. Generation semantic tokens from input text using Text Encoder\n",
"2. Generation coarse acoustic codebooks from semantic tokens using Coarse Encoder\n",
"3. Generation fine acoustic codebooks from coarse codebooks using Fine Encoder\n",
"4. Decode codebooks to audio waveform"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "32ee753d-6aac-4fa9-a6cb-0860b259aac6",
"metadata": {},
"outputs": [],
"source": [
"from typing import Optional, Union, Dict\n",
"import tqdm\n",
"import numpy as np\n",
"\n",
"\n",
"def generate_audio(\n",
" text: str,\n",
" history_prompt: Optional[Union[Dict, str]] = None,\n",
" text_temp: float = 0.7,\n",
" waveform_temp: float = 0.7,\n",
" silent: bool = False,\n",
"):\n",
" \"\"\"Generate audio array from input text.\n",
"\n",
" Args:\n",
" text: text to be turned into audio\n",
" history_prompt: history choice for audio cloning\n",
" text_temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" waveform_temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" silent: disable progress bar\n",
"\n",
" Returns:\n",
" numpy audio array at sample frequency 24khz\n",
" \"\"\"\n",
" semantic_tokens = text_to_semantic(\n",
" text,\n",
" history_prompt=history_prompt,\n",
" temp=text_temp,\n",
" silent=silent,\n",
" )\n",
" out = semantic_to_waveform(\n",
" semantic_tokens,\n",
" history_prompt=history_prompt,\n",
" temp=waveform_temp,\n",
" silent=silent,\n",
" )\n",
" return out"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "b78cb950-143b-477f-9813-36a356b40e63",
"metadata": {},
"outputs": [],
"source": [
"def text_to_semantic(\n",
" text: str,\n",
" history_prompt: Optional[Union[Dict, str]] = None,\n",
" temp: float = 0.7,\n",
" silent: bool = False,\n",
"):\n",
" \"\"\"Generate semantic array from text.\n",
"\n",
" Args:\n",
" text: text to be turned into audio\n",
" history_prompt: history choice for audio cloning\n",
" temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" silent: disable progress bar\n",
"\n",
" Returns:\n",
" numpy semantic array to be fed into `semantic_to_waveform`\n",
" \"\"\"\n",
" x_semantic = generate_text_semantic(\n",
" text,\n",
" history_prompt=history_prompt,\n",
" temp=temp,\n",
" silent=silent,\n",
" )\n",
" return x_semantic"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "7a04f767-f2a6-4d9d-8396-e217d44299df",
"metadata": {},
"outputs": [],
"source": [
"from bark.generation import (\n",
" _load_history_prompt,\n",
" _tokenize,\n",
" _normalize_whitespace,\n",
" TEXT_PAD_TOKEN,\n",
" TEXT_ENCODING_OFFSET,\n",
" SEMANTIC_VOCAB_SIZE,\n",
" SEMANTIC_PAD_TOKEN,\n",
" SEMANTIC_INFER_TOKEN,\n",
" COARSE_RATE_HZ,\n",
" SEMANTIC_RATE_HZ,\n",
" N_COARSE_CODEBOOKS,\n",
" COARSE_INFER_TOKEN,\n",
" CODEBOOK_SIZE,\n",
" N_FINE_CODEBOOKS,\n",
" COARSE_SEMANTIC_PAD_TOKEN,\n",
")\n",
"import torch.nn.functional as F\n",
"from typing import List, Optional, Union, Dict\n",
"\n",
"\n",
"def generate_text_semantic(\n",
" text: str,\n",
" history_prompt: List[str] = None,\n",
" temp: float = 0.7,\n",
" top_k: int = None,\n",
" top_p: float = None,\n",
" silent: bool = False,\n",
" min_eos_p: float = 0.2,\n",
" max_gen_duration_s: int = None,\n",
" allow_early_stop: bool = True,\n",
"):\n",
" \"\"\"\n",
" Generate semantic tokens from text.\n",
" Args:\n",
" text: text to be turned into audio\n",
" history_prompt: history choice for audio cloning\n",
" temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" top_k: top k number of probabilities for considering during generation\n",
" top_p: top probabilities higher than p for considering during generation\n",
" silent: disable progress bar\n",
" min_eos_p: minimum probability to select end of string token\n",
" max_gen_duration_s: maximum duration for generation in seconds\n",
" allow_early_stop: allow to stop generation if maximum duration is not reached\n",
" Returns:\n",
" numpy semantic array to be fed into `semantic_to_waveform`\n",
"\n",
" \"\"\"\n",
" text = _normalize_whitespace(text)\n",
" if history_prompt is not None:\n",
" history_prompt = _load_history_prompt(history_prompt)\n",
" semantic_history = history_prompt[\"semantic_prompt\"]\n",
" else:\n",
" semantic_history = None\n",
" encoded_text = np.ascontiguousarray(_tokenize(tokenizer, text)) + TEXT_ENCODING_OFFSET\n",
" if len(encoded_text) > 256:\n",
" p = round((len(encoded_text) - 256) / len(encoded_text) * 100, 1)\n",
" logger.warning(f\"warning, text too long, lopping of last {p}%\")\n",
" encoded_text = encoded_text[:256]\n",
" encoded_text = np.pad(\n",
" encoded_text,\n",
" (0, 256 - len(encoded_text)),\n",
" constant_values=TEXT_PAD_TOKEN,\n",
" mode=\"constant\",\n",
" )\n",
" if semantic_history is not None:\n",
" semantic_history = semantic_history.astype(np.int64)\n",
" # lop off if history is too long, pad if needed\n",
" semantic_history = semantic_history[-256:]\n",
" semantic_history = np.pad(\n",
" semantic_history,\n",
" (0, 256 - len(semantic_history)),\n",
" constant_values=SEMANTIC_PAD_TOKEN,\n",
" mode=\"constant\",\n",
" )\n",
" else:\n",
" semantic_history = np.array([SEMANTIC_PAD_TOKEN] * 256)\n",
" x = np.hstack([encoded_text, semantic_history, np.array([SEMANTIC_INFER_TOKEN])]).astype(np.int64)[None]\n",
" assert x.shape[1] == 256 + 256 + 1\n",
" n_tot_steps = 768\n",
" # custom tqdm updates since we don't know when eos will occur\n",
" pbar = tqdm.tqdm(disable=silent, total=100)\n",
" pbar_state = 0\n",
" tot_generated_duration_s = 0\n",
" kv_cache = None\n",
" for n in range(n_tot_steps):\n",
" if kv_cache is not None:\n",
" x_input = x[:, [-1]]\n",
" else:\n",
" x_input = x\n",
" logits, kv_cache = ov_text_model(ov.Tensor(x_input), kv_cache)\n",
" relevant_logits = logits[0, 0, :SEMANTIC_VOCAB_SIZE]\n",
" if allow_early_stop:\n",
" relevant_logits = np.hstack((relevant_logits, logits[0, 0, [SEMANTIC_PAD_TOKEN]])) # eos\n",
" if top_p is not None:\n",
" sorted_indices = np.argsort(relevant_logits)[::-1]\n",
" sorted_logits = relevant_logits[sorted_indices]\n",
" cumulative_probs = np.cumsum(F.softmax(sorted_logits))\n",
" sorted_indices_to_remove = cumulative_probs > top_p\n",
" sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()\n",
" sorted_indices_to_remove[0] = False\n",
" relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf\n",
" relevant_logits = torch.from_numpy(relevant_logits)\n",
" if top_k is not None:\n",
" relevant_logits = torch.from_numpy(relevant_logits)\n",
" v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))\n",
" relevant_logits[relevant_logits < v[-1]] = -float(\"Inf\")\n",
" probs = F.softmax(torch.from_numpy(relevant_logits) / temp, dim=-1)\n",
" item_next = torch.multinomial(probs, num_samples=1)\n",
" if allow_early_stop and (item_next == SEMANTIC_VOCAB_SIZE or (min_eos_p is not None and probs[-1] >= min_eos_p)):\n",
" # eos found, so break\n",
" pbar.update(100 - pbar_state)\n",
" break\n",
" x = torch.cat((torch.from_numpy(x), item_next[None]), dim=1).numpy()\n",
" tot_generated_duration_s += 1 / SEMANTIC_RATE_HZ\n",
" if max_gen_duration_s is not None and tot_generated_duration_s > max_gen_duration_s:\n",
" pbar.update(100 - pbar_state)\n",
" break\n",
" if n == n_tot_steps - 1:\n",
" pbar.update(100 - pbar_state)\n",
" break\n",
" del logits, relevant_logits, probs, item_next\n",
" req_pbar_state = np.min([100, int(round(100 * n / n_tot_steps))])\n",
" if req_pbar_state > pbar_state:\n",
" pbar.update(req_pbar_state - pbar_state)\n",
" pbar_state = req_pbar_state\n",
" pbar.close()\n",
" out = x.squeeze()[256 + 256 + 1 :]\n",
" return out"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "05f59ece-9ba0-496b-ae31-b822eb21597c",
"metadata": {},
"outputs": [],
"source": [
"def semantic_to_waveform(\n",
" semantic_tokens: np.ndarray,\n",
" history_prompt: Optional[Union[Dict, str]] = None,\n",
" temp: float = 0.7,\n",
" silent: bool = False,\n",
"):\n",
" \"\"\"Generate audio array from semantic input.\n",
"\n",
" Args:\n",
" semantic_tokens: semantic token output from `text_to_semantic`\n",
" history_prompt: history choice for audio cloning\n",
" temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" silent: disable progress bar\n",
"\n",
" Returns:\n",
" numpy audio array at sample frequency 24khz\n",
" \"\"\"\n",
" coarse_tokens = generate_coarse(\n",
" semantic_tokens,\n",
" history_prompt=history_prompt,\n",
" temp=temp,\n",
" silent=silent,\n",
" )\n",
" fine_tokens = generate_fine(\n",
" coarse_tokens,\n",
" history_prompt=history_prompt,\n",
" temp=0.5,\n",
" )\n",
" audio_arr = codec_decode(fine_tokens)\n",
" return audio_arr"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "cf36e3e1-1672-4ef1-8bd7-eb2f28423e2b",
"metadata": {},
"outputs": [],
"source": [
"def generate_coarse(\n",
" x_semantic: np.ndarray,\n",
" history_prompt: Optional[Union[Dict, str]] = None,\n",
" temp: float = 0.7,\n",
" top_k: int = None,\n",
" top_p: float = None,\n",
" silent: bool = False,\n",
" max_coarse_history: int = 630, # min 60 (faster), max 630 (more context)\n",
" sliding_window_len: int = 60,\n",
"):\n",
" \"\"\"\n",
" Generate coarse audio codes from semantic tokens.\n",
" Args:\n",
" x_semantic: semantic token output from `text_to_semantic`\n",
" history_prompt: history prompt, will be prepened to generated if provided\n",
" temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" top_k: top k number of probabilities for considering during generation\n",
" top_p: top probabilities higher than p for considering during generation\n",
" silent: disable progress bar\n",
" max_coarse_history: threshold for cutting coarse history (minimum 60 for faster generation, maximum 630 for more context)\n",
" sliding_window_len: size of sliding window for generation cycle\n",
" Returns:\n",
" numpy audio array with coarse audio codes\n",
"\n",
" \"\"\"\n",
" semantic_to_coarse_ratio = COARSE_RATE_HZ / SEMANTIC_RATE_HZ * N_COARSE_CODEBOOKS\n",
" max_semantic_history = int(np.floor(max_coarse_history / semantic_to_coarse_ratio))\n",
" if history_prompt is not None:\n",
" history_prompt = _load_history_prompt(history_prompt)\n",
" x_semantic_history = history_prompt[\"semantic_prompt\"]\n",
" x_coarse_history = history_prompt[\"coarse_prompt\"]\n",
" x_coarse_history = _flatten_codebooks(x_coarse_history) + SEMANTIC_VOCAB_SIZE\n",
" # trim histories correctly\n",
" n_semantic_hist_provided = np.min(\n",
" [\n",
" max_semantic_history,\n",
" len(x_semantic_history) - len(x_semantic_history) % 2,\n",
" int(np.floor(len(x_coarse_history) / semantic_to_coarse_ratio)),\n",
" ]\n",
" )\n",
" n_coarse_hist_provided = int(round(n_semantic_hist_provided * semantic_to_coarse_ratio))\n",
" x_semantic_history = x_semantic_history[-n_semantic_hist_provided:].astype(np.int32)\n",
" x_coarse_history = x_coarse_history[-n_coarse_hist_provided:].astype(np.int32)\n",
" x_coarse_history = x_coarse_history[:-2]\n",
" else:\n",
" x_semantic_history = np.array([], dtype=np.int32)\n",
" x_coarse_history = np.array([], dtype=np.int32)\n",
" # start loop\n",
" n_steps = int(round(np.floor(len(x_semantic) * semantic_to_coarse_ratio / N_COARSE_CODEBOOKS) * N_COARSE_CODEBOOKS))\n",
" x_semantic = np.hstack([x_semantic_history, x_semantic]).astype(np.int32)\n",
" x_coarse = x_coarse_history.astype(np.int32)\n",
" base_semantic_idx = len(x_semantic_history)\n",
" x_semantic_in = x_semantic[None]\n",
" x_coarse_in = x_coarse[None]\n",
" n_window_steps = int(np.ceil(n_steps / sliding_window_len))\n",
" n_step = 0\n",
" for _ in tqdm.tqdm(range(n_window_steps), total=n_window_steps, disable=silent):\n",
" semantic_idx = base_semantic_idx + int(round(n_step / semantic_to_coarse_ratio))\n",
" # pad from right side\n",
" x_in = x_semantic_in[:, np.max([0, semantic_idx - max_semantic_history]) :]\n",
" x_in = x_in[:, :256]\n",
" x_in = F.pad(\n",
" torch.from_numpy(x_in),\n",
" (0, 256 - x_in.shape[-1]),\n",
" \"constant\",\n",
" COARSE_SEMANTIC_PAD_TOKEN,\n",
" )\n",
" x_in = torch.hstack(\n",
" [\n",
" x_in,\n",
" torch.tensor([COARSE_INFER_TOKEN])[None],\n",
" torch.from_numpy(x_coarse_in[:, -max_coarse_history:]),\n",
" ]\n",
" ).numpy()\n",
" kv_cache = None\n",
" for _ in range(sliding_window_len):\n",
" if n_step >= n_steps:\n",
" continue\n",
" is_major_step = n_step % N_COARSE_CODEBOOKS == 0\n",
"\n",
" if kv_cache is not None:\n",
" x_input = x_in[:, [-1]]\n",
" else:\n",
" x_input = x_in\n",
"\n",
" logits, kv_cache = ov_coarse_model(x_input, past_kv=kv_cache)\n",
" logit_start_idx = SEMANTIC_VOCAB_SIZE + (1 - int(is_major_step)) * CODEBOOK_SIZE\n",
" logit_end_idx = SEMANTIC_VOCAB_SIZE + (2 - int(is_major_step)) * CODEBOOK_SIZE\n",
" relevant_logits = logits[0, 0, logit_start_idx:logit_end_idx]\n",
" if top_p is not None:\n",
" sorted_indices = np.argsort(relevant_logits)[::-1]\n",
" sorted_logits = relevant_logits[sorted_indices]\n",
" cumulative_probs = np.cumsum(F.softmax(sorted_logits))\n",
" sorted_indices_to_remove = cumulative_probs > top_p\n",
" sorted_indices_to_remove[1:] = sorted_indices_to_remove[:-1].copy()\n",
" sorted_indices_to_remove[0] = False\n",
" relevant_logits[sorted_indices[sorted_indices_to_remove]] = -np.inf\n",
" relevant_logits = torch.from_numpy(relevant_logits)\n",
" if top_k is not None:\n",
" relevant_logits = torch.from_numpy(relevant_logits)\n",
" v, _ = torch.topk(relevant_logits, min(top_k, relevant_logits.size(-1)))\n",
" relevant_logits[relevant_logits < v[-1]] = -float(\"Inf\")\n",
" probs = F.softmax(torch.from_numpy(relevant_logits) / temp, dim=-1)\n",
" item_next = torch.multinomial(probs, num_samples=1)\n",
" item_next = item_next\n",
" item_next += logit_start_idx\n",
" x_coarse_in = torch.cat((torch.from_numpy(x_coarse_in), item_next[None]), dim=1).numpy()\n",
" x_in = torch.cat((torch.from_numpy(x_in), item_next[None]), dim=1).numpy()\n",
" del logits, relevant_logits, probs, item_next\n",
" n_step += 1\n",
" del x_in\n",
" del x_semantic_in\n",
" gen_coarse_arr = x_coarse_in.squeeze()[len(x_coarse_history) :]\n",
" del x_coarse_in\n",
" gen_coarse_audio_arr = gen_coarse_arr.reshape(-1, N_COARSE_CODEBOOKS).T - SEMANTIC_VOCAB_SIZE\n",
" for n in range(1, N_COARSE_CODEBOOKS):\n",
" gen_coarse_audio_arr[n, :] -= n * CODEBOOK_SIZE\n",
" return gen_coarse_audio_arr\n",
"\n",
"\n",
"def generate_fine(\n",
" x_coarse_gen: np.ndarray,\n",
" history_prompt: Optional[Union[Dict, str]] = None,\n",
" temp: float = 0.5,\n",
" silent: bool = True,\n",
"):\n",
" \"\"\"\n",
" Generate full audio codes from coarse audio codes.\n",
" Args:\n",
" x_coarse_gen: generated coarse codebooks from `generate_coarse`\n",
" history_prompt: history prompt, will be prepended to generated\n",
" temp: generation temperature (1.0 more diverse, 0.0 more conservative)\n",
" silent: disable progress bar\n",
" Returns:\n",
" numpy audio array with coarse audio codes\n",
"\n",
" \"\"\"\n",
" if history_prompt is not None:\n",
" history_prompt = _load_history_prompt(history_prompt)\n",
" x_fine_history = history_prompt[\"fine_prompt\"]\n",
" else:\n",
" x_fine_history = None\n",
" n_coarse = x_coarse_gen.shape[0]\n",
" # make input arr\n",
" in_arr = np.vstack(\n",
" [\n",
" x_coarse_gen,\n",
" np.zeros((N_FINE_CODEBOOKS - n_coarse, x_coarse_gen.shape[1])) + CODEBOOK_SIZE,\n",
" ]\n",
" ).astype(\n",
" np.int32\n",
" ) # padding\n",
" # prepend history if available (max 512)\n",
" if x_fine_history is not None:\n",
" x_fine_history = x_fine_history.astype(np.int32)\n",
" in_arr = np.hstack([x_fine_history[:, -512:].astype(np.int32), in_arr])\n",
" n_history = x_fine_history[:, -512:].shape[1]\n",
" else:\n",
" n_history = 0\n",
" n_remove_from_end = 0\n",
" # need to pad if too short (since non-causal model)\n",
" if in_arr.shape[1] < 1024:\n",
" n_remove_from_end = 1024 - in_arr.shape[1]\n",
" in_arr = np.hstack(\n",
" [\n",
" in_arr,\n",
" np.zeros((N_FINE_CODEBOOKS, n_remove_from_end), dtype=np.int32) + CODEBOOK_SIZE,\n",
" ]\n",
" )\n",
" n_loops = np.max([0, int(np.ceil((x_coarse_gen.shape[1] - (1024 - n_history)) / 512))]) + 1\n",
" in_arr = in_arr.T\n",
" for n in tqdm.tqdm(range(n_loops), disable=silent):\n",
" start_idx = np.min([n * 512, in_arr.shape[0] - 1024])\n",
" start_fill_idx = np.min([n_history + n * 512, in_arr.shape[0] - 512])\n",
" rel_start_fill_idx = start_fill_idx - start_idx\n",
" in_buffer = in_arr[start_idx : start_idx + 1024, :][None]\n",
" for nn in range(n_coarse, N_FINE_CODEBOOKS):\n",
" logits = ov_fine_model(np.array([nn]).astype(np.int64), in_buffer.astype(np.int64))\n",
" if temp is None:\n",
" relevant_logits = logits[0, rel_start_fill_idx:, :CODEBOOK_SIZE]\n",
" codebook_preds = torch.argmax(relevant_logits, -1)\n",
" else:\n",
" relevant_logits = logits[0, :, :CODEBOOK_SIZE] / temp\n",
" probs = F.softmax(torch.from_numpy(relevant_logits), dim=-1)\n",
" codebook_preds = torch.hstack([torch.multinomial(probs[nnn], num_samples=1) for nnn in range(rel_start_fill_idx, 1024)])\n",
" in_buffer[0, rel_start_fill_idx:, nn] = codebook_preds.numpy()\n",
" del logits, codebook_preds\n",
" for nn in range(n_coarse, N_FINE_CODEBOOKS):\n",
" in_arr[start_fill_idx : start_fill_idx + (1024 - rel_start_fill_idx), nn] = in_buffer[0, rel_start_fill_idx:, nn]\n",
" del in_buffer\n",
" gen_fine_arr = in_arr.squeeze().T\n",
" del in_arr\n",
" gen_fine_arr = gen_fine_arr[:, n_history:]\n",
" if n_remove_from_end > 0:\n",
" gen_fine_arr = gen_fine_arr[:, :-n_remove_from_end]\n",
" return gen_fine_arr"
]
},
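{
"attachments": {},
"cell_type": "markdown",
"id": "b2f0c7a4-topk-topp-sketch",
"metadata": {},
"source": [
"The `top_k` and `top_p` arguments of `generate_coarse` restrict which codebook entries can be sampled at each step. The block below is a minimal, standalone sketch of that filtering in plain NumPy. The helper names `filter_logits` and `sample_token` are hypothetical (they are not part of Bark or OpenVINO), and the logic is a simplified illustration of the steps in the function above, not a drop-in replacement for them.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"\n",
"def filter_logits(logits, top_k=None, top_p=None):\n",
"    '''Return a copy of 1-D logits with entries outside top-k / top-p set to -inf.'''\n",
"    logits = np.asarray(logits, dtype=np.float64).copy()\n",
"    if top_p is not None:\n",
"        order = np.argsort(logits)[::-1]  # indices from most to least likely\n",
"        shifted = np.exp(logits[order] - logits[order][0])\n",
"        cumulative = np.cumsum(shifted / shifted.sum())\n",
"        remove = cumulative > top_p\n",
"        # shift right so the first entry that crosses the threshold is still kept\n",
"        remove[1:] = remove[:-1].copy()\n",
"        remove[0] = False\n",
"        logits[order[remove]] = -np.inf\n",
"    if top_k is not None:\n",
"        k = min(top_k, logits.size)\n",
"        kth_largest = np.sort(logits)[-k]\n",
"        logits[logits < kth_largest] = -np.inf\n",
"    return logits\n",
"\n",
"\n",
"def sample_token(logits, temp=0.7, rng=None):\n",
"    '''Sample one index from the softmax of logits / temp.'''\n",
"    rng = np.random.default_rng() if rng is None else rng\n",
"    scaled = np.asarray(logits, dtype=np.float64) / temp\n",
"    probs = np.exp(scaled - scaled.max())\n",
"    probs /= probs.sum()\n",
"    return int(rng.choice(len(probs), p=probs))\n",
"\n",
"\n",
"example_logits = np.array([2.0, 1.0, 0.5, -1.0])\n",
"filtered = filter_logits(example_logits, top_k=2)\n",
"token = sample_token(filtered, temp=0.7, rng=np.random.default_rng(0))\n",
"```\n",
"\n",
"With `top_k=2`, only the two largest logits stay finite, so `token` can only be index 0 or 1. A lower `temp` sharpens the distribution towards the largest remaining logit."
]
},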
{
"attachments": {},
"cell_type": "markdown",
"id": "1e16e90d",
"metadata": {},
"source": [
"## Run model inference\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"Now is time to see model in action. We need only wrap our models to classes and run `generate_audio` function."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "ea60e6bf-d0a8-4ae6-a5f5-5f4807f81291",
"metadata": {},
"source": [
"### Select Inference device\n",
"[back to top β¬οΈ](#Table-of-contents:)\n",
"\n",
"select device from dropdown list for running inference using OpenVINO"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "ed04042e-66cb-449c-b2ae-c00d49c31a8b",
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "5b8d33c4dc974c4aac2bb8067e3be37a",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import ipywidgets as widgets\n",
"import openvino as ov\n",
"\n",
"core = ov.Core()\n",
"\n",
"device = widgets.Dropdown(\n",
" options=core.available_devices + [\"AUTO\"],\n",
" value=\"AUTO\",\n",
" description=\"Device:\",\n",
" disabled=False,\n",
")\n",
"\n",
"device"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "194f7451-f741-4788-9c5d-bdbc4422e559",
"metadata": {},
"outputs": [],
"source": [
"core = ov.Core()\n",
"\n",
"ov_text_model = OVBarkTextEncoder(core, device.value, text_encoder_path0, text_encoder_path1)\n",
"ov_coarse_model = OVBarkEncoder(core, device.value, coarse_encoder_path)\n",
"ov_fine_model = OVBarkFineEncoder(core, device.value, fine_model_dir)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "dee04ecb-f95a-44ca-a3f6-d38a1a94edf9",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ| 100/100 [00:10<00:00, 9.63it/s]\n",
"100%|βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ| 32/32 [00:37<00:00, 1.17s/it]\n",
"/home/ea/work/my_optimum_intel/optimum_env/lib/python3.8/site-packages/torch/nn/utils/weight_norm.py:30: UserWarning: torch.nn.utils.weight_norm is deprecated in favor of torch.nn.utils.parametrizations.weight_norm.\n",
" warnings.warn(\"torch.nn.utils.weight_norm is deprecated in favor of torch.nn.utils.parametrizations.weight_norm.\")\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"took 53s to generate 13s of audio\n"
]
}
],
"source": [
"import time\n",
"from bark import SAMPLE_RATE\n",
"\n",
"torch.manual_seed(42)\n",
"t0 = time.time()\n",
"text = \"Hello, my name is Suno. And, uh β and I like banana and apples. [laughs] But I also have other interests such as playing tic tac toe.\"\n",
"audio_array = generate_audio(text)\n",
"generation_duration_s = time.time() - t0\n",
"audio_duration_s = audio_array.shape[0] / SAMPLE_RATE\n",
"\n",
"print(f\"took {generation_duration_s:.0f}s to generate {audio_duration_s:.0f}s of audio\")"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "5745a9cf-2a31-4500-a2d3-000afdba9d76",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" \n",
" "
],
"text/plain": [
""
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from IPython.display import Audio\n",
"from bark import SAMPLE_RATE\n",
"\n",
"Audio(audio_array, rate=SAMPLE_RATE)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "e0902a0f",
"metadata": {},
"source": [
"## Interactive demo\n",
"[back to top β¬οΈ](#Table-of-contents:)\n"
]
},
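{
"attachments": {},
"cell_type": "markdown",
"id": "c91d3e57-int16-pcm-sketch",
"metadata": {},
"source": [
"The `gen_tts` callback in the next cell scales Bark's floating-point waveform by 32767 and casts it to `int16` before handing it to the Gradio audio component. Samples that fall slightly outside [-1, 1] are not represented reliably by that cast, so a cautious variant clips first. The block below is a minimal sketch of this idea; `float_to_int16` is a hypothetical helper (not part of Bark or Gradio), and it assumes the waveform is nominally in the range [-1, 1].\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"\n",
"def float_to_int16(audio):\n",
"    '''Convert a float waveform, assumed to lie nominally in [-1, 1], to 16-bit PCM.'''\n",
"    clipped = np.clip(np.asarray(audio, dtype=np.float32), -1.0, 1.0)\n",
"    return (clipped * 32767).astype(np.int16)\n",
"\n",
"\n",
"pcm = float_to_int16(np.array([0.0, 0.5, 1.2, -1.5]))\n",
"```\n",
"\n",
"Out-of-range inputs such as `1.2` and `-1.5` saturate at the ends of the 16-bit range instead of producing arbitrary values."
]
},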
{
"cell_type": "code",
"execution_count": null,
"id": "590b9db5",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import gradio as gr\n",
"from bark import SAMPLE_RATE\n",
"from bark.generation import SUPPORTED_LANGS\n",
"\n",
"AVAILABLE_PROMPTS = [\"Unconditional\", \"Announcer\"]\n",
"PROMPT_LOOKUP = {}\n",
"for _, lang in SUPPORTED_LANGS:\n",
" for n in range(10):\n",
" label = f\"Speaker {n} ({lang})\"\n",
" AVAILABLE_PROMPTS.append(label)\n",
" PROMPT_LOOKUP[label] = f\"{lang}_speaker_{n}\"\n",
"PROMPT_LOOKUP[\"Unconditional\"] = None\n",
"PROMPT_LOOKUP[\"Announcer\"] = \"announcer\"\n",
"\n",
"default_text = \"Hello, my name is Suno. And, uh β and I like pizza. [laughs]\\nBut I also have other interests such as playing tic tac toe.\"\n",
"\n",
"title = \"# πΆ Bark: Text-to-Speech using OpenVINO\"\n",
"\n",
"description = \"\"\"\n",
"Bark is a universal text-to-audio model created by [Suno](http://suno.ai). \\\n",
"Bark can generate highly realistic, multilingual speech as well as other audio - including music, background noise and simple sound effects. \\\n",
"The model output is not censored and the authors do not endorse the opinions in the generated content. \\\n",
"Use at your own risk.\n",
"\"\"\"\n",
"\n",
"article = \"\"\"\n",
"\n",
"## π Foreign Language\n",
"\n",
"Bark supports various languages out-of-the-box and automatically determines language from input text. \\\n",
"When prompted with code-switched text, Bark will even attempt to employ the native accent for the respective languages in the same voice.\n",
"\n",
"Try the prompt:\n",
"\n",
"```\n",
"Buenos dΓas Miguel. Tu colega piensa que tu alemΓ‘n es extremadamente malo. But I suppose your english isn't terrible.\n",
"```\n",
"\n",
"## π€ Non-Speech Sounds\n",
"\n",
"Below is a list of some known non-speech sounds, but we are finding more every day. \\\n",
"Please let us know if you find patterns that work particularly well on Discord!\n",
"\n",
"* [laughter]\n",
"* [laughs]\n",
"* [sighs]\n",
"* [music]\n",
"* [gasps]\n",
"* [clears throat]\n",
"* β or ... for hesitations\n",
"* βͺ for song lyrics\n",
"* capitalization for emphasis of a word\n",
"* MAN/WOMAN: for bias towards speaker\n",
"\n",
"Try the prompt:\n",
"\n",
"```\n",
"\" [clears throat] Hello, my name is Suno. And, uh β and I like pizza. [laughs] But I also have other interests such as... βͺ singing βͺ.\"\n",
"```\n",
"\n",
"## πΆ Music\n",
"Bark can generate all types of audio, and, in principle, doesn't see a difference between speech and music. \\\n",
"Sometimes Bark chooses to generate text as music, but you can help it out by adding music notes around your lyrics.\n",
"\n",
"Try the prompt:\n",
"\n",
"```\n",
"βͺ In the jungle, the mighty jungle, the lion barks tonight βͺ\n",
"```\n",
"\n",
"## 𧬠Voice Cloning\n",
"\n",
"Bark has the capability to fully clone voices - including tone, pitch, emotion and prosody. \\\n",
"The model also attempts to preserve music, ambient noise, etc. from input audio. \\\n",
"However, to mitigate misuse of this technology, we limit the audio history prompts to a limited set of Suno-provided, fully synthetic options to choose from.\n",
"\n",
"## π₯ Speaker Prompts\n",
"\n",
"You can provide certain speaker prompts such as NARRATOR, MAN, WOMAN, etc. \\\n",
"Please note that these are not always respected, especially if a conflicting audio history prompt is given.\n",
"\n",
"Try the prompt:\n",
"\n",
"```\n",
"WOMAN: I would like an oatmilk latte please.\n",
"MAN: Wow, that's expensive!\n",
"```\n",
"\n",
"\"\"\"\n",
"\n",
"examples = [\n",
" [\n",
" \"Please surprise me and speak in whatever voice you enjoy. Vielen Dank und Gesundheit!\",\n",
" \"Unconditional\",\n",
" ],\n",
" [\n",
" \"Hello, my name is Suno. And, uh β and I like pizza. [laughs] But I also have other interests such as playing tic tac toe.\",\n",
" \"Speaker 1 (en)\",\n",
" ],\n",
" [\n",
" \"Buenos dΓas Miguel. Tu colega piensa que tu alemΓ‘n es extremadamente malo. But I suppose your english isn't terrible.\",\n",
" \"Speaker 0 (es)\",\n",
" ],\n",
"]\n",
"\n",
"\n",
"def gen_tts(text, history_prompt):\n",
" history_prompt = PROMPT_LOOKUP[history_prompt]\n",
" audio_arr = generate_audio(text, history_prompt=history_prompt)\n",
" audio_arr = (audio_arr * 32767).astype(np.int16)\n",
" return (SAMPLE_RATE, audio_arr)\n",
"\n",
"\n",
"with gr.Blocks() as block:\n",
" gr.Markdown(title)\n",
" gr.Markdown(description)\n",
" with gr.Row():\n",
" with gr.Column():\n",
" input_text = gr.Textbox(label=\"Input Text\", lines=2, value=default_text)\n",
" options = gr.Dropdown(AVAILABLE_PROMPTS, value=\"Speaker 1 (en)\", label=\"Acoustic Prompt\")\n",
" run_button = gr.Button()\n",
" with gr.Column():\n",
" audio_out = gr.Audio(label=\"Generated Audio\", type=\"numpy\")\n",
" inputs = [input_text, options]\n",
" outputs = [audio_out]\n",
" gr.Examples(examples=examples, fn=gen_tts, inputs=inputs, outputs=outputs)\n",
" gr.Markdown(article)\n",
" run_button.click(fn=gen_tts, inputs=inputs, outputs=outputs, queue=True)\n",
"try:\n",
" block.launch(debug=True)\n",
"except Exception:\n",
" block.launch(share=True, debug=True)\n",
"# if you are launching remotely, specify server_name and server_port\n",
"# demo.launch(server_name='your server name', server_port='server port in int')\n",
"# Read more in the docs: https://gradio.app/docs/"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
},
"openvino_notebooks": {
"imageUrl": "https://github.com/openvinotoolkit/openvino_notebooks/blob/latest/notebooks/bark-text-to-audio/bark-text-to-audio.png?raw=true",
"tags": {
"categories": [
"Model Demos",
"AI Trends"
],
"libraries": [],
"other": [],
"tasks": [
"Audio Generation",
"Text-to-Audio"
]
}
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {
"5b8d33c4dc974c4aac2bb8067e3be37a": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "DropdownModel",
"state": {
"_options_labels": [
"CPU",
"AUTO"
],
"description": "Device:",
"index": 1,
"layout": "IPY_MODEL_6589837bbe4643738327175e78a7c883",
"style": "IPY_MODEL_de348751214540028c71ec03f298ba80"
}
},
"6589837bbe4643738327175e78a7c883": {
"model_module": "@jupyter-widgets/base",
"model_module_version": "2.0.0",
"model_name": "LayoutModel",
"state": {}
},
"de348751214540028c71ec03f298ba80": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "DescriptionStyleModel",
"state": {
"description_width": ""
}
}
},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}