Spaces:
Sleeping
Sleeping
File size: 3,150 Bytes
2382376 b008c2b 2382376 b008c2b 2382376 b008c2b 2382376 b008c2b 2382376 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
use aws_config::BehaviorVersion;
use std::time::Duration;
use async_stream::stream;
use poem::{
get,
listener::{Listener, Acceptor, TcpListener},
EndpointExt, Route, Server,
};
use tokio::{pin, select};
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tokio_tungstenite::{
connect_async,
tungstenite::Message,
};
use futures_util::sink::SinkExt;
use tracing::{info, error, debug};
use polyhedron::{
stream_single,
SingleEvent,
asr::slice_i16_to_u8_le,
Context
};
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tracing_test::traced_test]
async fn test_single() {
let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let ctx = Context::new(&shared_config);
let acceptor = TcpListener::bind("[::]:0")
.into_acceptor()
.await
.unwrap();
let addr = acceptor
.local_addr()
.remove(0)
.as_socket_addr()
.cloned()
.unwrap();
let server = Server::new_with_acceptor(acceptor);
let handle = tokio::spawn(async move {
let _ = server.run(
Route::new()
.at("/ws/voice", get(stream_single))
.data(ctx)
).await;
});
let url = format!(
"ws://{}/ws/voice?id=123abc&from=zh-CN&to=en-US&voice=Amy", addr
);
// let url = "ws://localhost:8080/ws/voice?id=123abc&from=zh-CN&to=en-US&voice=Amy".to_string();
let (mut client_stream, _) = connect_async(url)
.await
.unwrap();
client_stream
.send(Message::Binary(Vec::new()))
.await
.unwrap();
let wav = hound::WavReader::open("whisper/samples/samples_jfk.wav")
.expect("failed to open wav");
let spec = wav.spec();
info!("{:?}", spec);
let samples = wav
.into_samples::<i16>()
.map(|s| s.unwrap())
.collect::<Vec<i16>>();
let chunks = samples.chunks(1600)
.map(|chunk| chunk.to_vec())
.into_iter();
let audio_stream = stream! {
for chunk in chunks {
yield slice_i16_to_u8_le(&chunk);
sleep(Duration::from_millis(100)).await;
}
};
pin!(audio_stream);
let recv_fut = async {
while let Some(voice_slice) = audio_stream.next().await {
client_stream.send(Message::Binary(voice_slice)).await?;
}
info!("sent all voice chunks");
while let Some(Ok(msg)) = client_stream.next().await {
debug!("recv: {:?}", msg);
let Message::Text(json_str) = msg else { continue };
let Ok(evt) = serde_json::from_str::<SingleEvent>(&json_str) else { continue };
if let SingleEvent::Voice { .. } = evt {
return Ok(())
}
}
Ok(()) as anyhow::Result<()>
};
select! {
res = recv_fut => {
if let Err(e) = res {
error!("Error: {:?}", e);
assert!(false, "Error: {}", e);
}
}
_ = sleep(Duration::from_secs(15)) => {
assert!(false, "timeout");
}
};
handle.abort();
} |