mingyang91 committed on
Commit
bb6818c
·
verified ·
1 Parent(s): a2ebe0b

add lip-sync

Browse files
Files changed (6) hide show
  1. Cargo.lock +1 -0
  2. Cargo.toml +1 -0
  3. src/lesson.rs +67 -13
  4. src/main.rs +32 -7
  5. static/client.js +11 -3
  6. static/index.html +7 -1
Cargo.lock CHANGED
@@ -1333,6 +1333,7 @@ dependencies = [
1333
  "hound",
1334
  "poem",
1335
  "serde",
 
1336
  "symphonia",
1337
  "tokio",
1338
  "tokio-stream",
 
1333
  "hound",
1334
  "poem",
1335
  "serde",
1336
+ "serde_json",
1337
  "symphonia",
1338
  "tokio",
1339
  "tokio-stream",
Cargo.toml CHANGED
@@ -22,6 +22,7 @@ futures-util = "0.3.28"
22
  #symphonia-format-mkv = "0.5.3"
23
  symphonia = { version = "0.5.3", features = ["mkv", "pcm"] }
24
  serde = { version = "1.0.189", features = ["derive"] }
 
25
 
26
  [dependencies.poem]
27
  version = "1.3.58"
 
22
  #symphonia-format-mkv = "0.5.3"
23
  symphonia = { version = "0.5.3", features = ["mkv", "pcm"] }
24
  serde = { version = "1.0.189", features = ["derive"] }
25
+ serde_json = { version = "1.0.107", features = [] }
26
 
27
  [dependencies.poem]
28
  version = "1.3.58"
src/lesson.rs CHANGED
@@ -1,15 +1,20 @@
1
  use std::sync::{Arc, Weak};
2
  use tokio::sync::RwLock;
3
  use std::collections::BTreeMap;
 
4
  use async_stream::stream;
5
  use aws_config::SdkConfig;
6
- use aws_sdk_polly::types::VoiceId;
 
7
  use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput;
8
  use aws_sdk_transcribestreaming::primitives::Blob;
9
  use aws_sdk_transcribestreaming::types::{AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream};
 
10
  use futures_util::{Stream, StreamExt, TryStreamExt};
 
 
11
 
12
- use tokio::select;
13
  use crate::StreamTranscriptionError;
14
 
15
  #[derive(Clone, Debug)]
@@ -297,6 +302,10 @@ impl VoiceLesson {
297
  pub(crate) fn voice_channel(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
298
  self.inner.voice_lesson.subscribe()
299
  }
 
 
 
 
300
  }
301
 
302
  impl From<InnerVoiceLesson> for VoiceLesson {
@@ -318,6 +327,7 @@ impl From<Arc<InnerVoiceLesson>> for VoiceLesson {
318
  struct InnerVoiceLesson {
319
  parent: LangLesson,
320
  voice: VoiceId,
 
321
  voice_lesson: tokio::sync::broadcast::Sender<Vec<u8>>,
322
  drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
323
  }
@@ -337,22 +347,18 @@ impl InnerVoiceLesson {
337
  let mut translate_rx = parent.inner.translated_tx.subscribe();
338
  let (voice_lesson, _) = tokio::sync::broadcast::channel::<Vec<u8>>(128);
339
  let shared_voice_lesson = voice_lesson.clone();
 
 
340
  let client = parent.inner.parent.inner.parent.polly_client.clone();
341
  // let lang: LanguageCode = parent.inner.lang.clone().parse().expect("Invalid language code");
342
  tokio::spawn(async move {
343
  let fut = async {
344
  while let Ok(translated) = translate_rx.recv().await {
345
- let res = client.synthesize_speech()
346
- .set_text(Some(translated))
347
- .voice_id(shared_voice_id.clone())
348
- .output_format("pcm".into())
349
- // .language_code(lang)
350
- // .language_code("cmn-CN".into())
351
- .send()
352
- .await;
353
  match res {
354
- Ok(mut synthesized) => {
355
- while let Some(Ok(bytes)) = synthesized.audio_stream.next().await {
 
356
  let _ = &shared_voice_lesson.send(bytes.to_vec());
357
  }
358
  },
@@ -364,7 +370,12 @@ impl InnerVoiceLesson {
364
  Ok(())
365
  };
366
  select! {
367
- _ = fut => {}
 
 
 
 
 
368
  _ = rx => {}
369
  }
370
  });
@@ -372,6 +383,7 @@ impl InnerVoiceLesson {
372
  InnerVoiceLesson {
373
  parent,
374
  voice,
 
375
  voice_lesson,
376
  drop_handler: Some(tx),
377
  }
@@ -411,3 +423,45 @@ fn to_stream(mut output: StartStreamTranscriptionOutput) -> impl Stream<Item=Res
411
  }
412
  }
413
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  use std::sync::{Arc, Weak};
2
  use tokio::sync::RwLock;
3
  use std::collections::BTreeMap;
4
+ use std::io::BufRead;
5
  use async_stream::stream;
6
  use aws_config::SdkConfig;
7
+ use aws_sdk_polly::primitives::ByteStream;
8
+ use aws_sdk_polly::types::{Engine, OutputFormat, SpeechMarkType, VoiceId};
9
  use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput;
10
  use aws_sdk_transcribestreaming::primitives::Blob;
11
  use aws_sdk_transcribestreaming::types::{AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream};
12
+ use clap::builder::TypedValueParser;
13
  use futures_util::{Stream, StreamExt, TryStreamExt};
14
+ use futures_util::future::try_join;
15
+ use serde::{Deserialize, Serialize};
16
 
17
+ use tokio::{select, try_join};
18
  use crate::StreamTranscriptionError;
19
 
20
  #[derive(Clone, Debug)]
 
302
  pub(crate) fn voice_channel(&self) -> tokio::sync::broadcast::Receiver<Vec<u8>> {
303
  self.inner.voice_lesson.subscribe()
304
  }
305
+
306
+ pub(crate) fn lip_sync_channel(&self) -> tokio::sync::broadcast::Receiver<Vec<Viseme>> {
307
+ self.inner.lip_sync_tx.subscribe()
308
+ }
309
  }
310
 
311
  impl From<InnerVoiceLesson> for VoiceLesson {
 
327
  struct InnerVoiceLesson {
328
  parent: LangLesson,
329
  voice: VoiceId,
330
+ lip_sync_tx: tokio::sync::broadcast::Sender<Vec<Viseme>>,
331
  voice_lesson: tokio::sync::broadcast::Sender<Vec<u8>>,
332
  drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
333
  }
 
347
  let mut translate_rx = parent.inner.translated_tx.subscribe();
348
  let (voice_lesson, _) = tokio::sync::broadcast::channel::<Vec<u8>>(128);
349
  let shared_voice_lesson = voice_lesson.clone();
350
+ let (lip_sync_tx, _) = tokio::sync::broadcast::channel::<Vec<Viseme>>(128);
351
+ let shared_lip_sync_tx = lip_sync_tx.clone();
352
  let client = parent.inner.parent.inner.parent.polly_client.clone();
353
  // let lang: LanguageCode = parent.inner.lang.clone().parse().expect("Invalid language code");
354
  tokio::spawn(async move {
355
  let fut = async {
356
  while let Ok(translated) = translate_rx.recv().await {
357
+ let res = synthesize_speech(&client, translated, shared_voice_id.clone()).await;
 
 
 
 
 
 
 
358
  match res {
359
+ Ok((vec, mut audio_stream)) => {
360
+ let _ = shared_lip_sync_tx.send(vec);
361
+ while let Some(Ok(bytes)) = audio_stream.next().await {
362
  let _ = &shared_voice_lesson.send(bytes.to_vec());
363
  }
364
  },
 
370
  Ok(())
371
  };
372
  select! {
373
+ res = fut => match res {
374
+ Ok(_) => {}
375
+ Err(e) => {
376
+ println!("Error: {:?}", e);
377
+ }
378
+ },
379
  _ = rx => {}
380
  }
381
  });
 
383
  InnerVoiceLesson {
384
  parent,
385
  voice,
386
+ lip_sync_tx,
387
  voice_lesson,
388
  drop_handler: Some(tx),
389
  }
 
423
  }
424
  }
425
 
426
+ // {"time":180,"type":"viseme","value":"r"}
427
+ #[derive(Debug, Deserialize, Clone, Serialize)]
428
+ pub(crate) struct Viseme {
429
+ time: u32,
430
+ value: String,
431
+ }
432
+
433
+ #[derive(Debug)]
434
+ enum SynthesizeError {
435
+ Polly(aws_sdk_polly::Error),
436
+ Transmitting(aws_sdk_polly::error::BoxError),
437
+ }
438
+
439
+ async fn synthesize_speech(client: &aws_sdk_polly::Client,
440
+ text: String,
441
+ voice_id: VoiceId) -> Result<(Vec<Viseme>, ByteStream), SynthesizeError> {
442
+ let audio_fut = client.synthesize_speech()
443
+ .engine(Engine::Neural)
444
+ .set_text(Some(text.clone()))
445
+ .voice_id(voice_id.clone())
446
+ .output_format(OutputFormat::Pcm)
447
+ .send();
448
+ let visemes_fut = client.synthesize_speech()
449
+ .engine(Engine::Neural)
450
+ .set_text(Some(text))
451
+ .voice_id(voice_id)
452
+ .speech_mark_types(SpeechMarkType::Viseme)
453
+ .output_format(OutputFormat::Json)
454
+ .send();
455
+ let (audio, visemes) = try_join(audio_fut, visemes_fut)
456
+ .await
457
+ .map_err(|e| SynthesizeError::Polly(e.into()))?;
458
+ let visemes = visemes.audio_stream.collect().await
459
+ .map_err(|e| SynthesizeError::Transmitting(e.into()))?.to_vec();
460
+ let parsed: Vec<Viseme> = visemes
461
+ .lines()
462
+ .filter_map(|line| line.ok())
463
+ .filter_map(|line| serde_json::from_str::<Viseme>(&line).ok())
464
+ .collect();
465
+ Ok((parsed, audio.audio_stream))
466
+ }
467
+
src/main.rs CHANGED
@@ -28,8 +28,9 @@ use poem::web::{Data, Query};
28
  use tokio::select;
29
  use tokio::sync::mpsc::{Receiver, Sender};
30
  use tokio_stream::Stream;
31
- use serde::Deserialize;
32
  use lesson::{LessonsManager};
 
33
 
34
  mod lesson;
35
 
@@ -149,7 +150,10 @@ async fn stream_speaker(ctx: Data<&Context>, query: Query<LessonSpeakerQuery>, w
149
  msg = socket.next() => {
150
  match msg.as_ref() {
151
  Some(Ok(Message::Binary(bin))) => {
152
- origin_tx.send(bin.to_vec()).await.expect("failed to send");
 
 
 
153
  },
154
  Some(Ok(_)) => {
155
  println!("Other: {:?}", msg);
@@ -158,7 +162,7 @@ async fn stream_speaker(ctx: Data<&Context>, query: Query<LessonSpeakerQuery>, w
158
  println!("Error: {:?}", e);
159
  },
160
  None => {
161
- socket.close().await.expect("failed to close");
162
  println!("Other: {:?}", msg);
163
  break;
164
  }
@@ -183,6 +187,14 @@ pub struct LessonListenerQuery {
183
  voice: String,
184
  }
185
 
 
 
 
 
 
 
 
 
186
  #[handler]
187
  async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>, ws: WebSocket) -> impl IntoResponse {
188
  let lesson_opt = ctx.lessons_manager.get_lesson(query.id).await;
@@ -199,19 +211,24 @@ async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>,
199
  let mut translate_rx = lang_lesson.translated_channel();
200
  let mut voice_lesson = lang_lesson.get_or_init(voice_id).await;
201
  let mut voice_rx = voice_lesson.voice_channel();
 
202
 
203
  loop {
204
  select! {
205
  transcript = transcript_rx.recv() => {
206
  if let Ok(transcript) = transcript {
207
- println!("Transcribed: {}", transcript);
208
- let _ = socket.send(Message::Text(transcript)).await;
 
 
209
  }
210
  },
211
  translated = translate_rx.recv() => {
212
  if let Ok(translated) = translated {
213
- println!("Translated: {}", translated);
214
- let _ = socket.send(Message::Text(translated)).await;
 
 
215
  }
216
  },
217
  voice = voice_rx.recv() => {
@@ -220,6 +237,14 @@ async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>,
220
  let _ = socket.send(Message::Binary(voice)).await;
221
  }
222
  },
 
 
 
 
 
 
 
 
223
  }
224
  }
225
  })
 
28
  use tokio::select;
29
  use tokio::sync::mpsc::{Receiver, Sender};
30
  use tokio_stream::Stream;
31
+ use serde::{Deserialize, Serialize};
32
  use lesson::{LessonsManager};
33
+ use crate::lesson::Viseme;
34
 
35
  mod lesson;
36
 
 
150
  msg = socket.next() => {
151
  match msg.as_ref() {
152
  Some(Ok(Message::Binary(bin))) => {
153
+ if origin_tx.send(bin.to_vec()).await.is_err() {
154
+ println!("tx closed");
155
+ break;
156
+ }
157
  },
158
  Some(Ok(_)) => {
159
  println!("Other: {:?}", msg);
 
162
  println!("Error: {:?}", e);
163
  },
164
  None => {
165
+ let _ = socket.close().await;
166
  println!("Other: {:?}", msg);
167
  break;
168
  }
 
187
  voice: String,
188
  }
189
 
190
+ #[derive(Serialize)]
191
+ #[serde(tag = "type")]
192
+ enum LiveLessonTextEvent {
193
+ Transcription { text: String },
194
+ Translation { text: String },
195
+ LipSync{ visemes: Vec<Viseme> },
196
+ }
197
+
198
  #[handler]
199
  async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>, ws: WebSocket) -> impl IntoResponse {
200
  let lesson_opt = ctx.lessons_manager.get_lesson(query.id).await;
 
211
  let mut translate_rx = lang_lesson.translated_channel();
212
  let mut voice_lesson = lang_lesson.get_or_init(voice_id).await;
213
  let mut voice_rx = voice_lesson.voice_channel();
214
+ let mut lip_sync_rx = voice_lesson.lip_sync_channel();
215
 
216
  loop {
217
  select! {
218
  transcript = transcript_rx.recv() => {
219
  if let Ok(transcript) = transcript {
220
+ let evt = LiveLessonTextEvent::Transcription { text: transcript };
221
+ let json = serde_json::to_string(&evt).expect("failed to serialize");
222
+ println!("Transcribed: {}", json);
223
+ let _ = socket.send(Message::Text(json)).await;
224
  }
225
  },
226
  translated = translate_rx.recv() => {
227
  if let Ok(translated) = translated {
228
+ let evt = LiveLessonTextEvent::Translation { text: translated };
229
+ let json = serde_json::to_string(&evt).expect("failed to serialize");
230
+ println!("Translated: {}", json);
231
+ let _ = socket.send(Message::Text(json)).await;
232
  }
233
  },
234
  voice = voice_rx.recv() => {
 
237
  let _ = socket.send(Message::Binary(voice)).await;
238
  }
239
  },
240
+ visemes = lip_sync_rx.recv() => {
241
+ if let Ok(visemes) = visemes {
242
+ let evt = LiveLessonTextEvent::LipSync { visemes };
243
+ let json = serde_json::to_string(&evt).expect("failed to serialize");
244
+ println!("Visemes: {:?}", json);
245
+ let _ = socket.send(Message::Text(json)).await;
246
+ }
247
+ },
248
  }
249
  }
250
  })
static/client.js CHANGED
@@ -29,7 +29,8 @@ let bufferSize = 2048,
29
  //vars
30
  let audioElement = document.querySelector('audio'),
31
  finalWord = false,
32
- resultText = document.getElementById('ResultText'),
 
33
  removeLastSentence = true,
34
  streamStreaming = false;
35
 
@@ -134,14 +135,21 @@ socket.onmessage = function (msg) {
134
  audioQueue.next(msg.data)
135
  } else {
136
  // text
137
- onSpeechData(msg.data)
 
 
 
 
 
 
 
138
  }
139
  }
140
  socket.onclose = function () {
141
  processor.stop()
142
  }
143
 
144
- function onSpeechData(data) {
145
  var dataFinal = false;
146
 
147
  if (dataFinal === false) {
 
29
  //vars
30
  let audioElement = document.querySelector('audio'),
31
  finalWord = false,
32
+ translationText = document.getElementById('Translation'),
33
+ transcriptionText = document.getElementById('Transcription'),
34
  removeLastSentence = true,
35
  streamStreaming = false;
36
 
 
135
  audioQueue.next(msg.data)
136
  } else {
137
  // text
138
+ const evt = JSON.parse(msg.data)
139
+ if (evt.type === 'Translation') {
140
+ onSpeechData(transcriptionText, evt.text)
141
+ } else if (evt.type === 'Transcription') {
142
+ onSpeechData(translationText, evt.text)
143
+ } else {
144
+ console.log(evt.visemes)
145
+ }
146
  }
147
  }
148
  socket.onclose = function () {
149
  processor.stop()
150
  }
151
 
152
+ function onSpeechData(resultText, data) {
153
  var dataFinal = false;
154
 
155
  if (dataFinal === false) {
static/index.html CHANGED
@@ -25,7 +25,13 @@
25
  <br>
26
 
27
  <div>
28
- <p id="ResultText">
 
 
 
 
 
 
29
  <span class="greyText">No Speech to Text yet
30
  <span>
31
  </p>
 
25
  <br>
26
 
27
  <div>
28
+ <h1>Translation</h1>
29
+ <p id="Translation">
30
+ <span class="greyText">No Speech to Text yet
31
+ <span>
32
+ </p>
33
+ <h1>Transcription</h1>
34
+ <p id="Transcription">
35
  <span class="greyText">No Speech to Text yet
36
  <span>
37
  </p>