Spaces:
Sleeping
Sleeping
File size: 6,997 Bytes
9da994b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
"""Utils module"""
from datetime import datetime
import logging
import pyaudio
import wave
import logging
import os
import functools
import yaml
from types import SimpleNamespace
import pygame
import time
import pyaudio
import math
import struct
import wave
import time
import os
from threading import Thread
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Lookup table from the sample-format string stored in the recording config
# to the corresponding PyAudio format constant (used by record_audio).
format_mapping = {"pyaudio.paInt16": pyaudio.paInt16}
def yaml_file_decorator(yaml_file_path):
    """Build a decorator that injects a YAML config as the first argument.

    The YAML file is re-read on every call of the decorated function.

    Args:
        yaml_file_path (str): path to the YAML config file.

    Returns:
        Callable: a decorator. The function it wraps is invoked as
        ``func(config, *args, **kwargs)``, where ``config`` is whatever
        ``yaml.safe_load`` returns for the file.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Explicit encoding so parsing does not depend on the platform's
            # default locale encoding.
            with open(yaml_file_path, "r", encoding="utf-8") as file:
                config = yaml.safe_load(file)
            # The file is already closed here; pass the loaded data through.
            return func(config, *args, **kwargs)

        return wrapper

    return decorator
@yaml_file_decorator("conf/recording.yaml")
def record_audio(config) -> str:
    """Record audio from the input device and save it as a WAV file.

    The configuration is read from conf/recording.yaml and injected by
    ``yaml_file_decorator``, so callers invoke ``record_audio()`` without
    arguments.

    Args:
        config (dict): recording parameters. Keys read here:
            ``sample_format``, ``fs``, ``chunk``, ``seconds``, ``channels``
            and ``recording_folder``.

    Returns:
        recording_path (str): destination path of recorded audio
    """
    config = SimpleNamespace(**config)
    # Format strings missing from format_mapping fall back to the raw value 8
    # (presumably pyaudio.paInt16 -- confirm against the PyAudio constants).
    sample_format = format_mapping.get(config.sample_format, 8)

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PortAudio interface is still alive.
        sample_width = p.get_sample_size(sample_format)
        # NOTE(review): the stream is opened mono at ``fs * 2`` while the WAV
        # header below uses ``config.channels`` and ``config.fs``, and the
        # chunk count is derived from ``fs``. Kept as-is; confirm intent.
        stream = p.open(
            format=sample_format,
            channels=1,  # config.channels,
            rate=config.fs * 2,
            frames_per_buffer=config.chunk,
            input=True,
        )
        try:
            # Read the number of chunks derived from fs, chunk and seconds.
            n_chunks = int(config.fs / config.chunk * config.seconds)
            frames = [stream.read(config.chunk) for _ in range(n_chunks)]
        finally:
            # Stop and close the stream even when a read fails.
            stream.stop_stream()
            stream.close()
    finally:
        # Terminate the PortAudio interface
        p.terminate()
    logging.info("Finished recording")

    # Make sure the destination exists so a finished take is not lost.
    if config.recording_folder:
        os.makedirs(config.recording_folder, exist_ok=True)

    # Save the recorded data as a WAV file
    recording_path = os.path.join(config.recording_folder, get_current_time() + ".wav")
    with wave.open(recording_path, "wb") as wf:
        wf.setnchannels(config.channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(config.fs)
        wf.writeframes(b"".join(frames))
    return recording_path
def play_mp3(mp3_file_path):
    """Play an audio file with pygame and block until playback has finished.

    Exceptions raised while initialising the mixer, loading or playing are
    caught and printed instead of being propagated.

    Args:
        mp3_file_path (str): path handed to ``pygame.mixer.music.load``.
    """
    try:
        pygame.mixer.init()
        pygame.mixer.music.load(mp3_file_path)
        pygame.mixer.music.play()
        # One clock for the whole wait loop instead of a new object per
        # iteration; tick(10) caps the polling at about 10 checks per second.
        clock = pygame.time.Clock()
        # Wait for the music to finish playing
        while pygame.mixer.music.get_busy():
            clock.tick(10)
    except Exception as e:
        print(f"Error: {e}")
def get_current_time():
    """Return the current local date and time as ``DD-MM-YYYY_HH:MM:SS``."""
    return datetime.now().strftime("%d-%m-%Y_%H:%M:%S")
def read_text(file_path: str) -> str:
    """Read a text file and return its content.

    Args:
        file_path (str): path of the file to read; decoded as UTF-8.

    Returns:
        str: the file content, or an empty string when the file does not
        exist (the error is logged instead of raised).
    """
    try:
        # The context manager closes the file; no explicit close() is needed.
        with open(file_path, "r", encoding="utf-8") as file_in:
            text = file_in.read()
    except FileNotFoundError as e:
        logging.error("Error: File %s not found - %s", file_path, e)
        return ""
    logging.info("Success: file %s read correctly", file_path)
    return text
def get_pyaudio_format(subtype):
    """Translate a PCM subtype name into a PyAudio format constant.

    Args:
        subtype (str): subtype name; "PCM_16", "PCM_8" and "PCM_32" are
            recognised.

    Returns:
        The PyAudio format constant paired with ``subtype`` below, or
        ``pyaudio.paInt16`` for any other value.
    """
    known_formats = (
        ("PCM_16", pyaudio.paInt16),
        ("PCM_8", pyaudio.paInt8),
        ("PCM_32", pyaudio.paInt32),
    )
    for name, pa_format in known_formats:
        if subtype == name:
            return pa_format
    # Unrecognised subtypes use the 16-bit format.
    return pyaudio.paInt16
class Recorder:
    """Continuously listen to the input stream and save audio when noise is detected.

    Once a chunk louder than ``Threshold`` is read, recording continues until
    the input has stayed below the threshold for ``TIMEOUT_LENGTH`` seconds.
    The clip is written to a WAV file and ``function_to_call`` is run with
    the created file path as parameter.

    Args:
        function_to_call: (function) func to be called with the generated file path as parameter
        recordings_dir: (str, optional) folder the WAV files are written to.
            Defaults to ``DEFAULT_RECORDINGS_DIR``.
    """

    # Folder used when no ``recordings_dir`` is given.
    DEFAULT_RECORDINGS_DIR = r"/Users/marcellopoliti/Documents/Coding/pischool/lux-voice-processing/data/audio_recordings"

    def __init__(self, function_to_call, recordings_dir=None):
        self.Threshold = 10  # rms() value above which a chunk counts as noise
        self.SHORT_NORMALIZE = 1.0 / 32768.0  # scales a signed 16-bit sample to [-1, 1)
        self.chunk = 1024  # frames requested per stream.read call
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 1
        self.RATE = 16_000
        self.swidth = 2  # bytes per sample, must match the "h" unpack in rms()
        self.TIMEOUT_LENGTH = 2  # seconds of quiet that end a recording
        self.f_name_directory = (
            recordings_dir if recordings_dir is not None else self.DEFAULT_RECORDINGS_DIR
        )
        self.function_to_call = function_to_call
        self.p = pyaudio.PyAudio()
        # NOTE(review): the stream is opened with output=True although nothing
        # in this class writes to it -- confirm whether that is needed.
        self.stream = self.p.open(
            format=self.FORMAT,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            output=True,
            frames_per_buffer=self.chunk,
        )

    def rms(self, frame):
        """Return the root-mean-square level of a chunk, scaled by 1000.

        Args:
            frame (bytes): raw signed 16-bit samples in native byte order.

        Returns:
            float: RMS of the normalised samples times 1000; ``0.0`` when the
            chunk holds no complete sample.
        """
        count = len(frame) // self.swidth
        if count == 0:
            # Nothing to measure; avoids dividing by zero below.
            return 0.0
        # Ignore a trailing partial sample so unpack gets an exact size.
        shorts = struct.unpack("%dh" % count, frame[: count * self.swidth])
        sum_squares = 0.0
        for sample in shorts:
            n = sample * self.SHORT_NORMALIZE
            sum_squares += n * n
        return math.sqrt(sum_squares / count) * 1000

    def record(self):
        """Record until the input has been quiet for ``TIMEOUT_LENGTH`` seconds.

        Returns:
            str: path of the WAV file the recording was written to.
        """
        print("Noise detected, recording beginning")
        rec = []
        # Monotonic clock: the timeout must not jump with wall-clock changes.
        current = time.monotonic()
        end = current + self.TIMEOUT_LENGTH
        while current <= end:
            # Same overflow policy as listen(): a dropped buffer must not
            # abort a recording in progress.
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) >= self.Threshold:
                # Still noisy: push the deadline forward.
                end = time.monotonic() + self.TIMEOUT_LENGTH
            current = time.monotonic()
            rec.append(data)
        return self.write(b"".join(rec))

    def _next_filename(self):
        """Return the first unused ``<index>.wav`` path in the recordings folder.

        The search starts at the number of entries in the folder and moves
        upward, so an existing file is never overwritten.
        """
        index = len(os.listdir(self.f_name_directory))
        while True:
            candidate = os.path.join(self.f_name_directory, "{}.wav".format(index))
            if not os.path.exists(candidate):
                return candidate
            index += 1

    def write(self, recording):
        """Write raw audio bytes to a new WAV file in the recordings folder.

        Args:
            recording (bytes): concatenated chunks read from the stream.

        Returns:
            str: path of the file that was written.
        """
        os.makedirs(self.f_name_directory, exist_ok=True)
        filename = self._next_filename()
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(self.p.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(recording)
        logging.info("Written to file: {}".format(filename))
        return filename

    def listen(self):
        """Read the stream forever; record and run the callback on every noisy chunk.

        This method does not return.
        """
        print("Listening beginning")
        while True:
            data = self.stream.read(self.chunk, exception_on_overflow=False)
            if self.rms(data) > self.Threshold:
                filename = self.record()
                self.function_to_call(filename)

    def close(self):
        """Stop the audio stream and terminate the PyAudio instance."""
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()
def check_for_new_recordings(function_to_call, directory="audio_recordings", poll_interval=2):
    """Poll a directory forever and run a callback for every file that appears.

    The directory content at call time is taken as the baseline; only files
    that show up afterwards are reported. This function does not return.

    Args:
        function_to_call (Callable): called with the name of each new file,
            as returned by ``os.listdir`` (a bare name, not a full path).
        directory (str): folder to watch. Defaults to "audio_recordings".
        poll_interval (float): seconds to sleep between two scans.
            Defaults to 2.
    """
    previous_files = set(os.listdir(directory))
    while True:
        current_files = set(os.listdir(directory))
        # sorted() gives a stable processing order when several files
        # appear within the same polling interval.
        for new_file in sorted(current_files - previous_files):
            print(f"New recording detected: {new_file}")
            function_to_call(new_file)
            print("Returning to listening")
        previous_files = current_files
        time.sleep(poll_interval)  # Wait before checking for new recordings again
if __name__ == "__main__":
    # Script entry point: capture a single recording and keep its file path.
    recordings_path = record_audio()
|