import collections import io import json import librosa import numpy as np import soundfile as sf import time import torch from scipy.io.wavfile import read from .text import SOS_TOK, EOS_TOK def get_mask_from_lengths(lengths): max_len = torch.max(lengths).item() ids = torch.arange(0, max_len, out=torch.cuda.LongTensor(max_len)) mask = (ids < lengths.unsqueeze(1)) return mask def load_wav_to_torch(full_path, sr=None): data, sr = librosa.load(full_path, sr=sr) data = np.clip(data, -1, 1) # potentially out of [-1, 1] due to resampling data = data * 32768.0 # match values loaded by scipy return torch.FloatTensor(data.astype(np.float32)), sr def read_binary_audio(bin_data, tar_sr=None): """ read binary audio (`bytes` or `uint8` `numpy.ndarray`) to `float32` `numpy.ndarray` RETURNS: data (np.ndarray) : audio of shape (n,) or (2, n) tar_sr (int) : sample rate """ data, ori_sr = sf.read(io.BytesIO(bin_data), dtype='float32') data = data.T if (tar_sr is not None) and (ori_sr != tar_sr): data = librosa.resample(data, ori_sr, tar_sr) else: tar_sr = ori_sr data = np.clip(data, -1, 1) data = data * 32768.0 return torch.FloatTensor(data.astype(np.float32)), tar_sr def load_filepaths_and_text(filename): with open(filename, encoding='utf-8') as f: data = [json.loads(line.rstrip()) for line in f] return data def to_gpu(x): x = x.contiguous() if torch.cuda.is_available(): x = x.cuda(non_blocking=True) return torch.autograd.Variable(x) def load_code_dict(path, add_sos=False, add_eos=False): if not path: return {} with open(path, 'r') as f: codes = ['_'] + [line.rstrip() for line in f] # '_' for pad code_dict = {c: i for i, c in enumerate(codes)} if add_sos: code_dict[SOS_TOK] = len(code_dict) if add_eos: code_dict[EOS_TOK] = len(code_dict) assert(set(code_dict.values()) == set(range(len(code_dict)))) return code_dict def load_obs_label_dict(path): if not path: return {} with open(path, 'r') as f: obs_labels = [line.rstrip() for line in f] return {c: i for i, c in enumerate(obs_labels)} # A simple timer class inspired from `tnt.TimeMeter` class CudaTimer: def __init__(self, keys): self.keys = keys self.reset() def start(self, key): s = torch.cuda.Event(enable_timing=True) s.record() self.start_events[key].append(s) return self def stop(self, key): e = torch.cuda.Event(enable_timing=True) e.record() self.end_events[key].append(e) return self def reset(self): self.start_events = collections.defaultdict(list) self.end_events = collections.defaultdict(list) self.running_times = collections.defaultdict(float) self.n = collections.defaultdict(int) return self def value(self): self._synchronize() return {k: self.running_times[k] / self.n[k] for k in self.keys} def _synchronize(self): torch.cuda.synchronize() for k in self.keys: starts = self.start_events[k] ends = self.end_events[k] if len(starts) == 0: raise ValueError("Trying to divide by zero in TimeMeter") if len(ends) != len(starts): raise ValueError("Call stop before checking value!") time = 0 for start, end in zip(starts, ends): time += start.elapsed_time(end) self.running_times[k] += time * 1e-3 self.n[k] += len(starts) self.start_events = collections.defaultdict(list) self.end_events = collections.defaultdict(list) # Used to measure the time taken for multiple events class Timer: def __init__(self, keys): self.keys = keys self.n = {} self.running_time = {} self.total_time = {} self.reset() def start(self, key): self.running_time[key] = time.time() return self def stop(self, key): self.total_time[key] = time.time() - self.running_time[key] self.n[key] += 1 self.running_time[key] = None return self def reset(self): for k in self.keys: self.total_time[k] = 0 self.running_time[k] = None self.n[k] = 0 return self def value(self): vals = {} for k in self.keys: if self.n[k] == 0: raise ValueError("Trying to divide by zero in TimeMeter") else: vals[k] = self.total_time[k] / self.n[k] return vals