mingyang91 committed on
Commit
af79bf4
·
verified ·
1 Parent(s): 44e95cf
Files changed (4) hide show
  1. src/lesson.rs +178 -48
  2. src/main.rs +83 -8
  3. static/client.js +20 -5
  4. static/index.html +2 -0
src/lesson.rs CHANGED
@@ -1,11 +1,17 @@
1
- use serde::Deserialize;
2
  use std::sync::{Arc, Weak};
3
  use tokio::sync::RwLock;
4
  use std::collections::BTreeMap;
 
5
  use aws_config::SdkConfig;
 
 
 
 
 
6
  use tokio::select;
 
7
 
8
- #[derive(Clone)]
9
  pub struct LessonsManager {
10
  translate_client: aws_sdk_translate::Client,
11
  polly_client: aws_sdk_polly::Client,
@@ -28,9 +34,9 @@ impl LessonsManager {
28
 
29
  pub(crate) async fn create_lesson(&self,
30
  id: u32,
31
- speaker_lang: String) -> Lesson {
32
  let mut map = self.lessons.write().await;
33
- let lesson: Lesson = InnerLesson::new(id, speaker_lang).into();
34
  map.insert(id, lesson.clone());
35
  lesson
36
  }
@@ -48,22 +54,34 @@ pub(crate) struct Lesson {
48
 
49
  impl Lesson {
50
  pub(crate) async fn get_or_init(&self, lang: String) -> LangLesson {
51
- let map = self.inner.lang_lessons.read().await;
52
- if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
53
- return lang_lesson.into();
 
 
54
  }
55
- let mut map = self.inner.lang_lessons.write().await;
56
- if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
57
- lang_lesson.into()
58
- } else {
59
- let lang_lesson = LangLesson::new(
60
- self.clone(),
61
- lang.clone(),
62
- );
63
- map.insert(lang.clone(), Arc::downgrade(&lang_lesson.inner));
64
- lang_lesson
 
 
65
  }
66
  }
 
 
 
 
 
 
 
 
67
  }
68
 
69
  impl From<InnerLesson> for Lesson {
@@ -76,8 +94,9 @@ impl From<InnerLesson> for Lesson {
76
 
77
  #[derive(Debug)]
78
  struct InnerLesson {
 
79
  id: u32,
80
- speaker_lang: String,
81
  speaker_voice_channel: tokio::sync::mpsc::Sender<Vec<u8>>,
82
  speaker_transcript: tokio::sync::broadcast::Sender<String>,
83
  lang_lessons: RwLock<BTreeMap<String, Weak<InnerLangLesson>>>,
@@ -86,30 +105,59 @@ struct InnerLesson {
86
 
87
  impl InnerLesson {
88
  fn new(
 
89
  id: u32,
90
- speaker_lang: String
91
  ) -> InnerLesson {
92
  let (speaker_transcript, _) = tokio::sync::broadcast::channel::<String>(128);
 
93
  let (speaker_voice_channel, mut speaker_voice_rx) = tokio::sync::mpsc::channel(128);
94
- let (drop_handler, mut drop_rx) = tokio::sync::oneshot::channel::<Signal>();
 
 
95
 
96
  tokio::spawn(async move {
97
  let fut = async {
98
- loop {
99
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
100
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  };
102
  select! {
103
- _ = fut => {}
 
 
 
 
104
  _ = drop_rx => {}
105
  }
106
  });
107
 
108
  InnerLesson {
 
109
  id,
110
  speaker_lang,
111
  speaker_voice_channel,
112
- speaker_transcript,
113
  lang_lessons: RwLock::new(BTreeMap::new()),
114
  drop_handler: Some(drop_handler),
115
  }
@@ -128,8 +176,9 @@ impl Drop for InnerLesson {
128
  struct InnerLangLesson {
129
  parent: Lesson,
130
  lang: String,
131
- translated: tokio::sync::broadcast::Sender<String>,
132
- voice_lessons: RwLock<BTreeMap<String, Weak<InnerVoiceLesson>>>
 
133
  }
134
 
135
  #[derive(Clone)]
@@ -137,6 +186,12 @@ pub(crate) struct LangLesson {
137
  inner: Arc<InnerLangLesson>
138
  }
139
 
 
 
 
 
 
 
140
  impl From<InnerLangLesson> for LangLesson {
141
  fn from(inner: InnerLangLesson) -> Self {
142
  LangLesson {
@@ -158,39 +213,91 @@ impl LangLesson {
158
  parent: Lesson,
159
  lang: String,
160
  ) -> Self {
161
- let (translated, _) = tokio::sync::broadcast::channel::<String>(128);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
162
  InnerLangLesson {
163
  parent,
164
  lang,
165
- translated,
166
  voice_lessons: RwLock::new(BTreeMap::new()),
 
167
  }.into()
168
  }
169
 
170
- async fn get_or_init(&mut self, voice: String) -> VoiceLesson {
171
- let map = self.inner.voice_lessons.read().await;
172
- if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
173
- return voice_lesson.into();
 
 
174
  }
175
- let mut map = self.inner.voice_lessons.write().await;
176
- if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
177
- voice_lesson.into()
178
- } else {
179
- let voice_lesson = Arc::new(InnerVoiceLesson::new(
180
- self.clone(),
181
- voice.clone(),
182
- ));
183
- map.insert(voice.clone(), Arc::downgrade(&voice_lesson));
184
- voice_lesson.into()
 
 
 
185
  }
186
  }
187
  }
188
 
189
  #[derive(Clone)]
190
- struct VoiceLesson {
191
  inner: Arc<InnerVoiceLesson>
192
  }
193
 
 
 
 
 
 
 
194
  impl From<InnerVoiceLesson> for VoiceLesson {
195
  fn from(inner: InnerVoiceLesson) -> Self {
196
  VoiceLesson {
@@ -209,7 +316,7 @@ impl From<Arc<InnerVoiceLesson>> for VoiceLesson {
209
 
210
  struct InnerVoiceLesson {
211
  parent: LangLesson,
212
- voice: String,
213
  voice_lesson: tokio::sync::broadcast::Sender<Vec<u8>>,
214
  drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
215
  }
@@ -222,15 +329,38 @@ enum Signal {
222
  impl InnerVoiceLesson {
223
  fn new(
224
  parent: LangLesson,
225
- voice: String,
226
  ) -> InnerVoiceLesson {
 
227
  let (tx, rx) = tokio::sync::oneshot::channel::<Signal>();
 
228
  let (voice_lesson, _) = tokio::sync::broadcast::channel::<Vec<u8>>(128);
 
 
 
229
  tokio::spawn(async move {
230
  let fut = async {
231
- loop {
232
- tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
  }
 
234
  };
235
  select! {
236
  _ = fut => {}
 
 
1
  use std::sync::{Arc, Weak};
2
  use tokio::sync::RwLock;
3
  use std::collections::BTreeMap;
4
+ use async_stream::stream;
5
  use aws_config::SdkConfig;
6
+ use aws_sdk_polly::types::VoiceId;
7
+ use aws_sdk_transcribestreaming::primitives::Blob;
8
+ use aws_sdk_transcribestreaming::types::{AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream};
9
+ use futures_util::{StreamExt, TryStreamExt};
10
+
11
  use tokio::select;
12
+ use crate::to_stream;
13
 
14
+ #[derive(Clone, Debug)]
15
  pub struct LessonsManager {
16
  translate_client: aws_sdk_translate::Client,
17
  polly_client: aws_sdk_polly::Client,
 
34
 
35
  pub(crate) async fn create_lesson(&self,
36
  id: u32,
37
+ speaker_lang: LanguageCode) -> Lesson {
38
  let mut map = self.lessons.write().await;
39
+ let lesson: Lesson = InnerLesson::new(self.clone(), id, speaker_lang).into();
40
  map.insert(id, lesson.clone());
41
  lesson
42
  }
 
54
 
55
  impl Lesson {
56
  pub(crate) async fn get_or_init(&self, lang: String) -> LangLesson {
57
+ {
58
+ let map = self.inner.lang_lessons.read().await;
59
+ if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
60
+ return lang_lesson.into();
61
+ }
62
  }
63
+ {
64
+ let mut map = self.inner.lang_lessons.write().await;
65
+ if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
66
+ lang_lesson.into()
67
+ } else {
68
+ let lang_lesson = LangLesson::new(
69
+ self.clone(),
70
+ lang.clone(),
71
+ );
72
+ map.insert(lang.clone(), Arc::downgrade(&lang_lesson.inner));
73
+ lang_lesson
74
+ }
75
  }
76
  }
77
+
78
+ pub(crate) fn voice_channel(&self) -> tokio::sync::mpsc::Sender<Vec<u8>> {
79
+ self.inner.speaker_voice_channel.clone()
80
+ }
81
+
82
+ pub(crate) fn transcript_channel(&self) -> tokio::sync::broadcast::Receiver<String> {
83
+ self.inner.speaker_transcript.subscribe()
84
+ }
85
  }
86
 
87
  impl From<InnerLesson> for Lesson {
 
94
 
95
  #[derive(Debug)]
96
  struct InnerLesson {
97
+ parent: LessonsManager,
98
  id: u32,
99
+ speaker_lang: LanguageCode,
100
  speaker_voice_channel: tokio::sync::mpsc::Sender<Vec<u8>>,
101
  speaker_transcript: tokio::sync::broadcast::Sender<String>,
102
  lang_lessons: RwLock<BTreeMap<String, Weak<InnerLangLesson>>>,
 
105
 
106
  impl InnerLesson {
107
  fn new(
108
+ parent: LessonsManager,
109
  id: u32,
110
+ speaker_lang: LanguageCode
111
  ) -> InnerLesson {
112
  let (speaker_transcript, _) = tokio::sync::broadcast::channel::<String>(128);
113
+ let shared_speaker_transcript = speaker_transcript.clone();
114
  let (speaker_voice_channel, mut speaker_voice_rx) = tokio::sync::mpsc::channel(128);
115
+ let (drop_handler, drop_rx) = tokio::sync::oneshot::channel::<Signal>();
116
+ let transcript_client = parent.transcript_client.clone();
117
+ let shared_speak_lang = speaker_lang.clone();
118
 
119
  tokio::spawn(async move {
120
  let fut = async {
121
+ let input_stream = stream! {
122
+ while let Some(raw) = speaker_voice_rx.recv().await {
123
+ yield Ok(AudioStream::AudioEvent(AudioEvent::builder().audio_chunk(Blob::new(raw)).build()));
124
+ }
125
+ };
126
+ let output = transcript_client
127
+ .start_stream_transcription()
128
+ .language_code(shared_speak_lang)//LanguageCode::EnGb
129
+ .media_sample_rate_hertz(16000)
130
+ .media_encoding(MediaEncoding::Pcm)
131
+ .audio_stream(input_stream.into())
132
+ .send()
133
+ .await
134
+ .map_err(|e| crate::StreamTranscriptionError::EstablishStreamError(Box::new(e)))?;
135
+
136
+ let mut output_stream = to_stream(output);
137
+ output_stream
138
+ .try_for_each(|text| async {
139
+ let _ = shared_speaker_transcript.send(text);
140
+ Ok(())
141
+ })
142
+ .await?;
143
+ Ok(()) as Result<(), crate::StreamTranscriptionError>
144
  };
145
  select! {
146
+ res = fut => {
147
+ if let Err(e) = res {
148
+ println!("Error: {:?}", e);
149
+ }
150
+ }
151
  _ = drop_rx => {}
152
  }
153
  });
154
 
155
  InnerLesson {
156
+ parent,
157
  id,
158
  speaker_lang,
159
  speaker_voice_channel,
160
+ speaker_transcript: speaker_transcript,
161
  lang_lessons: RwLock::new(BTreeMap::new()),
162
  drop_handler: Some(drop_handler),
163
  }
 
176
  struct InnerLangLesson {
177
  parent: Lesson,
178
  lang: String,
179
+ translated_tx: tokio::sync::broadcast::Sender<String>,
180
+ voice_lessons: RwLock<BTreeMap<VoiceId, Weak<InnerVoiceLesson>>>,
181
+ drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
182
  }
183
 
184
  #[derive(Clone)]
 
186
  inner: Arc<InnerLangLesson>
187
  }
188
 
189
+ impl LangLesson {
190
+ pub(crate) fn translated_channel(&self) -> tokio::sync::broadcast::Receiver<String> {
191
+ self.inner.translated_tx.subscribe()
192
+ }
193
+ }
194
+
195
  impl From<InnerLangLesson> for LangLesson {
196
  fn from(inner: InnerLangLesson) -> Self {
197
  LangLesson {
 
213
  parent: Lesson,
214
  lang: String,
215
  ) -> Self {
216
+ let shared_lang = lang.clone();;
217
+ let shared_speaker_lang = parent.inner.speaker_lang.clone();
218
+ let (translated_tx, _) = tokio::sync::broadcast::channel::<String>(128);
219
+ let shared_translated_tx = translated_tx.clone();
220
+ let mut transcript_rx = parent.inner.speaker_transcript.subscribe();
221
+ let translate_client = parent.inner.parent.translate_client.clone();
222
+ let (drop_handler, drop_rx) = tokio::sync::oneshot::channel::<Signal>();
223
+ tokio::spawn(async move {
224
+ let fut = async {
225
+ while let Ok(text) = transcript_rx.recv().await {
226
+ let output = translate_client
227
+ .translate_text()
228
+ .text(text)
229
+ .source_language_code(shared_speaker_lang.as_str())
230
+ .target_language_code(shared_lang.clone())
231
+ .send()
232
+ .await;
233
+ match output {
234
+ Ok(res) => {
235
+ if let Some(translated) = res.translated_text {
236
+ let _ = shared_translated_tx.send(translated);
237
+ }
238
+ },
239
+ Err(e) => {
240
+ return Err(e);
241
+ }
242
+ _ => {}
243
+ }
244
+ }
245
+ Ok(())
246
+ };
247
+
248
+ select! {
249
+ res = fut => {
250
+ if let Err(e) = res {
251
+ println!("Error: {:?}", e);
252
+ }
253
+ }
254
+ _ = drop_rx => {}
255
+ }
256
+ });
257
  InnerLangLesson {
258
  parent,
259
  lang,
260
+ translated_tx,
261
  voice_lessons: RwLock::new(BTreeMap::new()),
262
+ drop_handler: Some(drop_handler),
263
  }.into()
264
  }
265
 
266
+ pub(crate) async fn get_or_init(&mut self, voice: VoiceId) -> VoiceLesson {
267
+ {
268
+ let map = self.inner.voice_lessons.read().await;
269
+ if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
270
+ return voice_lesson.into();
271
+ }
272
  }
273
+
274
+ {
275
+ let mut map = self.inner.voice_lessons.write().await;
276
+ if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
277
+ voice_lesson.into()
278
+ } else {
279
+ let voice_lesson = Arc::new(InnerVoiceLesson::new(
280
+ self.clone(),
281
+ voice.clone(),
282
+ ));
283
+ map.insert(voice, Arc::downgrade(&voice_lesson));
284
+ voice_lesson.into()
285
+ }
286
  }
287
  }
288
  }
289
 
290
  #[derive(Clone)]
291
+ pub(crate) struct VoiceLesson {
292
  inner: Arc<InnerVoiceLesson>
293
  }
294
 
295
+ impl VoiceLesson {
296
+ pub(crate) fn voice_channel(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
297
+ self.inner.voice_lesson.subscribe()
298
+ }
299
+ }
300
+
301
  impl From<InnerVoiceLesson> for VoiceLesson {
302
  fn from(inner: InnerVoiceLesson) -> Self {
303
  VoiceLesson {
 
316
 
317
  struct InnerVoiceLesson {
318
  parent: LangLesson,
319
+ voice: VoiceId,
320
  voice_lesson: tokio::sync::broadcast::Sender<Vec<u8>>,
321
  drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
322
  }
 
329
  impl InnerVoiceLesson {
330
  fn new(
331
  parent: LangLesson,
332
+ voice: VoiceId,
333
  ) -> InnerVoiceLesson {
334
+ let shared_voice_id: VoiceId = voice.clone();
335
  let (tx, rx) = tokio::sync::oneshot::channel::<Signal>();
336
+ let mut translate_rx = parent.inner.translated_tx.subscribe();
337
  let (voice_lesson, _) = tokio::sync::broadcast::channel::<Vec<u8>>(128);
338
+ let shared_voice_lesson = voice_lesson.clone();
339
+ let client = parent.inner.parent.inner.parent.polly_client.clone();
340
+ // let lang: LanguageCode = parent.inner.lang.clone().parse().expect("Invalid language code");
341
  tokio::spawn(async move {
342
  let fut = async {
343
+ while let Ok(translated) = translate_rx.recv().await {
344
+ let res = client.synthesize_speech()
345
+ .set_text(Some(translated))
346
+ .voice_id(shared_voice_id.clone())
347
+ .output_format("pcm".into())
348
+ // .language_code(lang)
349
+ // .language_code("cmn-CN".into())
350
+ .send()
351
+ .await;
352
+ match res {
353
+ Ok(mut synthesized) => {
354
+ while let Some(Ok(bytes)) = synthesized.audio_stream.next().await {
355
+ let _ = &shared_voice_lesson.send(bytes.to_vec());
356
+ }
357
+ },
358
+ Err(e) => {
359
+ return Err(e);
360
+ }
361
+ }
362
  }
363
+ Ok(())
364
  };
365
  select! {
366
  _ = fut => {}
src/main.rs CHANGED
@@ -20,7 +20,7 @@ use clap::Parser;
20
 
21
  use poem::{Endpoint, EndpointExt, get, handler, IntoResponse, listener::TcpListener, Route, Server};
22
  use futures_util::{Sink, SinkExt, TryFutureExt, TryStreamExt};
23
- use poem::endpoint::StaticFilesEndpoint;
24
  use poem::web::websocket::{Message, WebSocket};
25
  use futures_util::stream::StreamExt;
26
  use poem::web::{Data, Query};
@@ -171,8 +171,10 @@ async fn main() -> Result<(), std::io::Error> {
171
  .index_file("index.html"),
172
  )
173
  .at("/translate", get(stream_translate))
174
- .at("/lesson-speaker", get(stream_speaker))
175
- .at("/lesson-listener", get(stream_listener))
 
 
176
  .data(ctx);
177
  let listener = TcpListener::bind("[::]:8080");
178
  let server = Server::new(listener);
@@ -189,9 +191,40 @@ pub struct LessonSpeakerQuery {
189
 
190
  #[handler]
191
  async fn stream_speaker(ctx: Data<&Context>, query: Query<LessonSpeakerQuery>, ws: WebSocket) -> impl IntoResponse {
192
- let lesson = ctx.lessons_manager.create_lesson(query.id, query.lang.clone()).await;
193
- println!("{:?}", lesson);
194
- println!("{:?}", query);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
  }
196
 
197
 
@@ -204,9 +237,51 @@ pub struct LessonListenerQuery {
204
 
205
  #[handler]
206
  async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>, ws: WebSocket) -> impl IntoResponse {
207
- let lesson = ctx.lessons_manager.get_lesson(query.id).await;
208
- println!("{:?}", lesson);
209
  println!("{:?}", query);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
  }
211
 
212
  #[handler]
 
20
 
21
  use poem::{Endpoint, EndpointExt, get, handler, IntoResponse, listener::TcpListener, Route, Server};
22
  use futures_util::{Sink, SinkExt, TryFutureExt, TryStreamExt};
23
+ use poem::endpoint::{StaticFileEndpoint, StaticFilesEndpoint};
24
  use poem::web::websocket::{Message, WebSocket};
25
  use futures_util::stream::StreamExt;
26
  use poem::web::{Data, Query};
 
171
  .index_file("index.html"),
172
  )
173
  .at("/translate", get(stream_translate))
174
+ .at("/ws/lesson-speaker", get(stream_speaker))
175
+ .at("/ws/lesson-listener", get(stream_listener))
176
+ .at("lesson-speaker", StaticFileEndpoint::new("./static/index.html"))
177
+ .at("lesson-listener", StaticFileEndpoint::new("./static/index.html"))
178
  .data(ctx);
179
  let listener = TcpListener::bind("[::]:8080");
180
  let server = Server::new(listener);
 
191
 
192
  #[handler]
193
  async fn stream_speaker(ctx: Data<&Context>, query: Query<LessonSpeakerQuery>, ws: WebSocket) -> impl IntoResponse {
194
+ let lesson = ctx.lessons_manager.create_lesson(query.id, query.lang.clone().parse().expect("Not supported lang")).await;
195
+
196
+ ws.on_upgrade(|mut socket| async move {
197
+ let origin_tx = lesson.voice_channel();
198
+ let mut transcribe_rx = lesson.transcript_channel();
199
+ loop {
200
+ select! {
201
+ msg = socket.next() => {
202
+ match msg.as_ref() {
203
+ Some(Ok(Message::Binary(bin))) => {
204
+ origin_tx.send(bin.to_vec()).await.expect("failed to send");
205
+ },
206
+ Some(Ok(_)) => {
207
+ println!("Other: {:?}", msg);
208
+ },
209
+ Some(Err(e)) => {
210
+ println!("Error: {:?}", e);
211
+ },
212
+ None => {
213
+ socket.close().await.expect("failed to close");
214
+ println!("Other: {:?}", msg);
215
+ break;
216
+ }
217
+ }
218
+ },
219
+ output = transcribe_rx.recv() => {
220
+ if let Ok(transcript) = output {
221
+ println!("Transcribed: {}", transcript);
222
+ socket.send(Message::Text(transcript)).await.expect("failed to send");
223
+ }
224
+ },
225
+ }
226
+ }
227
+ })
228
  }
229
 
230
 
 
237
 
238
  #[handler]
239
  async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>, ws: WebSocket) -> impl IntoResponse {
240
+ let lesson_opt = ctx.lessons_manager.get_lesson(query.id).await;
 
241
  println!("{:?}", query);
242
+ let voice_id = query.voice.parse().expect("Not supported voice");
243
+
244
+ ws.on_upgrade(|mut socket| async move {
245
+ let Some(lesson) = lesson_opt else {
246
+ let _ = socket.send(Message::Text("lesson not found".to_string())).await;
247
+ return
248
+ };
249
+
250
+ println!("lesson found");
251
+ let mut transcript_rx = lesson.transcript_channel();
252
+ println!("transcribe start");
253
+
254
+ let mut lang_lesson = lesson.get_or_init(query.lang.clone()).await;
255
+ let mut translate_rx = lang_lesson.translated_channel();
256
+ println!("translate start");
257
+
258
+ let mut voice_lesson = lang_lesson.get_or_init(voice_id).await;
259
+ let mut voice_rx = voice_lesson.voice_channel();
260
+ println!("synthesize start");
261
+
262
+ loop {
263
+ select! {
264
+ transcript = transcript_rx.recv() => {
265
+ if let Ok(transcript) = transcript {
266
+ println!("Transcribed: {}", transcript);
267
+ let _ = socket.send(Message::Text(transcript)).await;
268
+ }
269
+ },
270
+ translated = translate_rx.recv() => {
271
+ if let Ok(translated) = translated {
272
+ println!("Translated: {}", translated);
273
+ let _ = socket.send(Message::Text(translated)).await;
274
+ }
275
+ },
276
+ voice = voice_rx.recv() => {
277
+ if let Ok(voice) = voice {
278
+ println!("Synthesized: {:?}", voice.len());
279
+ let _ = socket.send(Message::Binary(voice)).await;
280
+ }
281
+ },
282
+ }
283
+ }
284
+ })
285
  }
286
 
287
  #[handler]
static/client.js CHANGED
@@ -13,7 +13,8 @@ if (location.protocol === "https:") {
13
  websocket_uri = "ws:"
14
  }
15
  websocket_uri += "//" + location.host
16
- websocket_uri += location.pathname + "translate"
 
17
  const socket = new WebSocket(websocket_uri);
18
 
19
  //================= CONFIG =================
@@ -122,10 +123,15 @@ function stopRecording() {
122
  // videoElement.srcObject = null;
123
  }
124
 
 
 
 
 
 
125
  //================= SOCKET IO =================
126
  socket.onmessage = function (msg) {
127
  if (msg.data instanceof Blob) {
128
- playAudio(msg.data)
129
  } else {
130
  // text
131
  onSpeechData(msg.data)
@@ -258,8 +264,11 @@ function capitalize(s) {
258
  return s.charAt(0).toUpperCase() + s.slice(1);
259
  }
260
 
 
 
 
 
261
  async function playAudio(chunk) {
262
- const audioContext = new (window.AudioContext || window.webkitAudioContext)();
263
  const totalLength = chunk.size;
264
 
265
  // Create an AudioBuffer of enough size
@@ -267,14 +276,20 @@ async function playAudio(chunk) {
267
  const output = audioBuffer.getChannelData(0);
268
 
269
  // Copy the PCM samples into the AudioBuffer
270
- const int16Array = new Int16Array(await chunk.arrayBuffer())
 
271
  for(let i = 0; i < int16Array.length; i++) {
272
  output[i] = int16Array[i] / 32768.0; // Convert to [-1, 1] float32 range
273
  }
274
 
275
  // 3. Play the audio using Web Audio API
 
276
  const source = audioContext.createBufferSource();
277
  source.buffer = audioBuffer;
278
  source.connect(audioContext.destination);
279
- source.start();
 
 
 
 
280
  }
 
13
  websocket_uri = "ws:"
14
  }
15
  websocket_uri += "//" + location.host
16
+ websocket_uri += "/ws" + location.pathname
17
+ websocket_uri += location.search
18
  const socket = new WebSocket(websocket_uri);
19
 
20
  //================= CONFIG =================
 
123
  // videoElement.srcObject = null;
124
  }
125
 
126
+
127
+ const audioQueue = new rxjs.Subject();
128
+ audioQueue
129
+ .pipe(rxjs.concatMap(playAudio))
130
+ .subscribe(_ => console.log('played audio'));
131
  //================= SOCKET IO =================
132
  socket.onmessage = function (msg) {
133
  if (msg.data instanceof Blob) {
134
+ audioQueue.next(msg.data)
135
  } else {
136
  // text
137
  onSpeechData(msg.data)
 
264
  return s.charAt(0).toUpperCase() + s.slice(1);
265
  }
266
 
267
+ const audioContext = new (window.AudioContext || window.webkitAudioContext)();
268
+
269
+ let nextStartTime = audioContext.currentTime;
270
+
271
  async function playAudio(chunk) {
 
272
  const totalLength = chunk.size;
273
 
274
  // Create an AudioBuffer of enough size
 
276
  const output = audioBuffer.getChannelData(0);
277
 
278
  // Copy the PCM samples into the AudioBuffer
279
+ const arrayBuf = await chunk.arrayBuffer();
280
+ const int16Array = new Int16Array(arrayBuf, 0, Math.floor(arrayBuf.byteLength / 2))
281
  for(let i = 0; i < int16Array.length; i++) {
282
  output[i] = int16Array[i] / 32768.0; // Convert to [-1, 1] float32 range
283
  }
284
 
285
  // 3. Play the audio using Web Audio API
286
+
287
  const source = audioContext.createBufferSource();
288
  source.buffer = audioBuffer;
289
  source.connect(audioContext.destination);
290
+ source.start(nextStartTime);
291
+ nextStartTime = Math.max(nextStartTime, audioContext.currentTime) + audioBuffer.duration;
292
+ source.onended = () => {
293
+ console.log('audio slice ended');
294
+ }
295
  }
static/index.html CHANGED
@@ -18,6 +18,7 @@
18
  <audio></audio>
19
 
20
  <br>
 
21
  <button id="startRecButton" type="button"> Start recording</button>
22
  <button id="stopRecButton" type="button"> Stop recording</button>
23
  <div id="recordingStatus">&nbsp;</div>
@@ -40,6 +41,7 @@
40
  <!-- Socket -->
41
  <!--<script src="assets/js/socket.io.js"></script>-->
42
 
 
43
  <!-- Client -->
44
  <script src="client.js"></script>
45
  </body>
 
18
  <audio></audio>
19
 
20
  <br>
21
+ <button id="startButton" type="button"> Start listening</button>
22
  <button id="startRecButton" type="button"> Start recording</button>
23
  <button id="stopRecButton" type="button"> Stop recording</button>
24
  <div id="recordingStatus">&nbsp;</div>
 
41
  <!-- Socket -->
42
  <!--<script src="assets/js/socket.io.js"></script>-->
43
 
44
+ <script src="https://unpkg.com/rxjs@%5E7/dist/bundles/rxjs.umd.min.js"></script>
45
  <!-- Client -->
46
  <script src="client.js"></script>
47
  </body>