File size: 3,150 Bytes
2382376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b008c2b
2382376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b008c2b
2382376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b008c2b
 
2382376
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b008c2b
2382376
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

use aws_config::BehaviorVersion;
use std::time::Duration;
use async_stream::stream;
use poem::{
    get,
    listener::{Listener, Acceptor, TcpListener},
    EndpointExt, Route, Server,
};
use tokio::{pin, select};
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tokio_tungstenite::{
    connect_async,
    tungstenite::Message,
};
use futures_util::sink::SinkExt;
use tracing::{info, error, debug};
use polyhedron::{
    stream_single,
    SingleEvent,
    asr::slice_i16_to_u8_le,
    Context
};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tracing_test::traced_test]
async fn test_single() {
    let shared_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let ctx = Context::new(&shared_config);

    let acceptor = TcpListener::bind("[::]:0")
        .into_acceptor()
        .await
        .unwrap();
    let addr = acceptor
        .local_addr()
        .remove(0)
        .as_socket_addr()
        .cloned()
        .unwrap();
    let server = Server::new_with_acceptor(acceptor);
    let handle = tokio::spawn(async move {
        let _ = server.run(
            Route::new()
                .at("/ws/voice", get(stream_single))
                .data(ctx)
        ).await;
    });

    let url = format!(
        "ws://{}/ws/voice?id=123abc&from=zh-CN&to=en-US&voice=Amy", addr
    );
    // let url = "ws://localhost:8080/ws/voice?id=123abc&from=zh-CN&to=en-US&voice=Amy".to_string();
    let (mut client_stream, _) = connect_async(url)
        .await
        .unwrap();

    client_stream
        .send(Message::Binary(Vec::new()))
        .await
        .unwrap();


    let wav = hound::WavReader::open("whisper/samples/samples_jfk.wav")
        .expect("failed to open wav");
    let spec = wav.spec();
    info!("{:?}", spec);
    let samples = wav
        .into_samples::<i16>()
        .map(|s| s.unwrap())
        .collect::<Vec<i16>>();
    let chunks = samples.chunks(1600)
        .map(|chunk| chunk.to_vec())
        .into_iter();

    let audio_stream = stream! {
        for chunk in chunks {
            yield slice_i16_to_u8_le(&chunk);
            sleep(Duration::from_millis(100)).await;
        }
    };
    pin!(audio_stream);

    let recv_fut = async {
        while let Some(voice_slice) = audio_stream.next().await {
            client_stream.send(Message::Binary(voice_slice)).await?;
        }
        info!("sent all voice chunks");

        while let Some(Ok(msg)) = client_stream.next().await {
            debug!("recv: {:?}", msg);
            let Message::Text(json_str) = msg else { continue };
            let Ok(evt) = serde_json::from_str::<SingleEvent>(&json_str) else { continue };
            if let SingleEvent::Voice { .. } = evt {
                return Ok(())
            }
        }

        Ok(()) as anyhow::Result<()>
    };

    select! {
        res = recv_fut => {
            if let Err(e) = res {
                error!("Error: {:?}", e);
                assert!(false, "Error: {}", e);
            }
        }
        _ = sleep(Duration::from_secs(15)) => {
            assert!(false, "timeout");
        }
    };

    handle.abort();
}