File size: 4,323 Bytes
6af1e98 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import gradio as gr
import time
import numpy as np
import os
import requests
import io
from pydub import AudioSegment
def translate_audio(audio, SARVAM_API_KEY):
    """Translate an audio clip to text with Sarvam's speech-to-text-translate API.

    The clip is exported to an in-memory WAV file and uploaded as the
    multipart form field ``file``.

    Args:
        audio: Object exposing ``export(file_obj, format="wav")``; the caller
            in this file passes a ``pydub.AudioSegment``.
        SARVAM_API_KEY: Value sent in the ``api-subscription-key`` header.

    Returns:
        The ``transcript`` field of the JSON response, or ``""`` when the
        request fails, raises, times out, or carries no transcript.
    """
    # API endpoint for speech-to-text translation
    api_url = "https://api.sarvam.ai/speech-to-text-translate"
    headers = {"api-subscription-key": SARVAM_API_KEY}
    model_data = {
        "model": "saaras:v2",  # model used for the translation
        "with_diarization": False,  # set to True for speaker diarization
    }
    # Without a timeout a stalled connection would block this call forever.
    request_timeout_s = 30

    # Default result: every failure path below must still return a string,
    # otherwise `return transcript` raises UnboundLocalError.
    transcript = ""

    # The context manager closes the buffer even if the export itself fails.
    with io.BytesIO() as chunk_buffer:
        audio.export(chunk_buffer, format="wav")
        chunk_buffer.seek(0)  # rewind so the upload starts at the first byte
        files = {"file": ("audiofile.wav", chunk_buffer, "audio/wav")}
        try:
            response = requests.post(
                api_url,
                headers=headers,
                files=files,
                data=model_data,
                timeout=request_timeout_s,
            )
            if response.status_code in (200, 201):
                # `or ""` also covers a JSON null transcript.
                transcript = response.json().get("transcript") or ""
            else:
                print(f"failed with status code: {response.status_code}")
                print("Response:", response.text)
        except Exception as e:
            # Best effort: report the problem and fall back to an empty result.
            print(f"Error processing chunk {e}")
    return transcript
def stream_transcribe(history, new_chunk, SARVAM_API_KEY):
    """Translate one streamed audio chunk and append it to the transcript.

    Args:
        history: Transcript accumulated so far; ``None`` is treated as ``""``.
        new_chunk: ``(sample_rate, samples)`` tuple where ``samples`` is a
            NumPy array, or ``None``, which is treated as "no new audio".
        SARVAM_API_KEY: API key forwarded to ``translate_audio``.

    Returns:
        A 3-tuple ``(history, display_text, latency_text)``. On success
        ``display_text`` equals the updated history and ``latency_text`` is
        the elapsed seconds formatted with two decimals. On failure
        ``display_text`` is the exception message and ``latency_text`` is
        ``"Error"``; the history is returned unchanged.
    """
    start_time = time.time()
    if history is None:
        history = ""

    # Nothing to transcribe: keep the transcript visible instead of letting
    # the tuple unpacking below fail and replace it with an error message.
    if new_chunk is None:
        return history, history, f"{time.time() - start_time:.2f}"

    try:
        sr, y = new_chunk
        # Convert to mono if stereo
        if y.ndim > 1:
            y = y.mean(axis=1)
        # AudioSegment below is built with sample_width=2, i.e. 16-bit PCM.
        # NOTE(review): assumes samples are already in int16 range - confirm
        # for float input.
        y_int16 = y.astype(np.int16)
        audio_segment = AudioSegment(
            data=y_int16.tobytes(),
            sample_width=2,
            frame_rate=sr,
            channels=1,
        )
        transcription = translate_audio(audio_segment, SARVAM_API_KEY)
        latency = time.time() - start_time
        # Skip empty results and avoid a leading newline on the first entry.
        if transcription:
            history = f"{history}\n{transcription}" if history else transcription
        return history, history, f"{latency:.2f}"
    except Exception as e:
        print(f"Error during Transcription: {e}")
        return history, str(e), "Error"
def clear():
    """Return the empty string used to blank the transcription textbox."""
    blank_output = ""
    return blank_output
def clear_state():
    """Return ``None``, the value used to reset the transcript state."""
    reset_value = None
    return reset_value
def clear_api_key():
    """Return the empty string used to wipe the API-key textbox."""
    blank_key = ""
    return blank_key
# Load the custom stylesheet; it is handed to gr.Blocks below so that it is
# actually applied (previously it was read but never used).
with open("gradio.css", "r", encoding="utf-8") as f:
    custom_css = f.read()

# Markdown shown above the API-key field.
API_KEY_INSTRUCTIONS = """
### 🔐 Sarvam AI API Key Required
To use this app, you need a free API key from [Sarvam AI](https://sarvam.ai).
👉 **Step 1:** Visit [https://sarvam.ai](https://sarvam.ai)
👉 **Step 2:** Sign up or log in
👉 **Step 3:** Generate your API key and paste it below
Your key stays on your device and is not stored.
"""

with gr.Blocks(theme=gr.themes.Glass(), css=custom_css) as microphone:
    # NOTE(review): the original indentation was lost, so the nesting of the
    # rows inside this column is reconstructed - confirm the intended layout.
    with gr.Column():
        gr.Markdown(API_KEY_INSTRUCTIONS)
        api_key_box = gr.Textbox(label="Enter SARVAM AI API Key", type="password")

        with gr.Row():
            input_audio_microphone = gr.Audio(streaming=True)
            output = gr.Textbox(label="Transcription", value="")
            latency_textbox = gr.Textbox(
                label="Latency (seconds)", value="0.0", scale=0
            )

        with gr.Row():
            clear_button = gr.Button("Clear Output")
            clear_api_key_button = gr.Button("Clear API Key")

        # Holds the transcript accumulated across streamed chunks.
        state = gr.State(value="")

        def wrapped_stream_transcribe(history, new_chunk, api_key):
            """Forward one streamed chunk and the API key to stream_transcribe."""
            return stream_transcribe(history, new_chunk, api_key)

        input_audio_microphone.stream(
            wrapped_stream_transcribe,
            [state, input_audio_microphone, api_key_box],
            [state, output, latency_textbox],
            time_limit=30,
            stream_every=5,
            concurrency_limit=None,
        )

        # Reset the stored transcript first, then blank the visible textbox.
        clear_button.click(clear_state, outputs=[state]).then(clear, outputs=[output])
        clear_api_key_button.click(clear_api_key, outputs=[api_key_box])

demo = microphone
demo.launch()