mingyang91 committed on
Commit
03e88fd
·
verified ·
1 Parent(s): 254fd5f

Send audio in a single WebSocket message

Browse files
Files changed (2) hide show
  1. src/lesson.rs +12 -8
  2. src/main.rs +4 -8
src/lesson.rs CHANGED
@@ -395,11 +395,9 @@ impl InnerVoiceLesson {
395
  while let Ok(translated) = translate_rx.recv().await {
396
  let res = synthesize_speech(&client, translated, shared_voice_id.clone()).await;
397
  match res {
398
- Ok((vec, mut audio_stream)) => {
399
  let _ = shared_lip_sync_tx.send(vec);
400
- while let Some(Ok(bytes)) = audio_stream.next().await {
401
- let _ = &shared_voice_lesson.send(bytes.to_vec());
402
- }
403
  }
404
  Err(e) => {
405
  return Err(e);
@@ -453,7 +451,7 @@ async fn synthesize_speech(
453
  client: &aws_sdk_polly::Client,
454
  text: String,
455
  voice_id: VoiceId,
456
- ) -> Result<(Vec<Viseme>, ByteStream), SynthesizeError> {
457
  let audio_fut = client
458
  .synthesize_speech()
459
  .engine(Engine::Neural)
@@ -469,10 +467,16 @@ async fn synthesize_speech(
469
  .speech_mark_types(SpeechMarkType::Viseme)
470
  .output_format(OutputFormat::Json)
471
  .send();
472
- let (audio, visemes) = try_join(audio_fut, visemes_fut)
473
  .await
474
  .map_err(|e| SynthesizeError::Polly(e.into()))?;
475
- let visemes = visemes
 
 
 
 
 
 
476
  .audio_stream
477
  .collect()
478
  .await
@@ -482,5 +486,5 @@ async fn synthesize_speech(
482
  .lines()
483
  .flat_map(|line| Ok::<Viseme, anyhow::Error>(serde_json::from_str::<Viseme>(&line?)?))
484
  .collect();
485
- Ok((parsed, audio.audio_stream))
486
  }
 
395
  while let Ok(translated) = translate_rx.recv().await {
396
  let res = synthesize_speech(&client, translated, shared_voice_id.clone()).await;
397
  match res {
398
+ Ok((vec, audio)) => {
399
  let _ = shared_lip_sync_tx.send(vec);
400
+ let _ = &shared_voice_lesson.send(audio);
 
 
401
  }
402
  Err(e) => {
403
  return Err(e);
 
451
  client: &aws_sdk_polly::Client,
452
  text: String,
453
  voice_id: VoiceId,
454
+ ) -> Result<(Vec<Viseme>, Vec<u8>), SynthesizeError> {
455
  let audio_fut = client
456
  .synthesize_speech()
457
  .engine(Engine::Neural)
 
467
  .speech_mark_types(SpeechMarkType::Viseme)
468
  .output_format(OutputFormat::Json)
469
  .send();
470
+ let (audio_out, visemes_out) = try_join(audio_fut, visemes_fut)
471
  .await
472
  .map_err(|e| SynthesizeError::Polly(e.into()))?;
473
+ let audio = audio_out
474
+ .audio_stream
475
+ .collect()
476
+ .await
477
+ .map_err(|e| SynthesizeError::Transmitting(e.into()))?
478
+ .to_vec();
479
+ let visemes = visemes_out
480
  .audio_stream
481
  .collect()
482
  .await
 
486
  .lines()
487
  .flat_map(|line| Ok::<Viseme, anyhow::Error>(serde_json::from_str::<Viseme>(&line?)?))
488
  .collect();
489
+ Ok((parsed, audio))
490
  }
src/main.rs CHANGED
@@ -407,7 +407,6 @@ fn u8_to_i16(input: &[u8]) -> Vec<i16> {
407
 
408
  #[cfg(test)]
409
  mod test {
410
- use std::cell::{Cell};
411
  use std::time::Duration;
412
  use async_stream::stream;
413
  use poem::listener::{Acceptor, Listener};
@@ -490,7 +489,6 @@ mod test {
490
  };
491
  pin!(audio_stream);
492
 
493
- let voice_flag = Cell::new(false);
494
  let recv_fut = async {
495
  while let Some(voice_slice) = audio_stream.next().await {
496
  client_stream.send(Message::Binary(voice_slice)).await?;
@@ -503,8 +501,7 @@ mod test {
503
  let Message::Text(json_str) = msg else { continue };
504
  let Ok(evt) = serde_json::from_str::<SingleEvent>(&json_str) else { continue };
505
  if let SingleEvent::Voice { .. } = evt {
506
- voice_flag.replace(true);
507
- break
508
  }
509
  }
510
 
@@ -515,15 +512,14 @@ mod test {
515
  res = recv_fut => {
516
  if let Err(e) = res {
517
  error!("Error: {:?}", e);
 
518
  }
519
  }
520
  _ = sleep(Duration::from_secs(10)) => {
521
- error!("timeout")
522
  }
523
- }
524
 
525
  handle.abort();
526
-
527
- assert!(voice_flag.get(), "voice not received");
528
  }
529
  }
 
407
 
408
  #[cfg(test)]
409
  mod test {
 
410
  use std::time::Duration;
411
  use async_stream::stream;
412
  use poem::listener::{Acceptor, Listener};
 
489
  };
490
  pin!(audio_stream);
491
 
 
492
  let recv_fut = async {
493
  while let Some(voice_slice) = audio_stream.next().await {
494
  client_stream.send(Message::Binary(voice_slice)).await?;
 
501
  let Message::Text(json_str) = msg else { continue };
502
  let Ok(evt) = serde_json::from_str::<SingleEvent>(&json_str) else { continue };
503
  if let SingleEvent::Voice { .. } = evt {
504
+ return Ok(())
 
505
  }
506
  }
507
 
 
512
  res = recv_fut => {
513
  if let Err(e) = res {
514
  error!("Error: {:?}", e);
515
+ assert!(false, "Error: {}", e);
516
  }
517
  }
518
  _ = sleep(Duration::from_secs(10)) => {
519
+ assert!(false, "timeout");
520
  }
521
+ };
522
 
523
  handle.abort();
 
 
524
  }
525
  }