mingyang91 commited on
Commit
44e95cf
·
verified ·
1 Parent(s): 4840c8f
Files changed (4) hide show
  1. Cargo.lock +1 -0
  2. Cargo.toml +1 -0
  3. src/lesson.rs +257 -0
  4. src/main.rs +45 -16
Cargo.lock CHANGED
@@ -1332,6 +1332,7 @@ dependencies = [
1332
  "futures-util",
1333
  "hound",
1334
  "poem",
 
1335
  "symphonia",
1336
  "tokio",
1337
  "tokio-stream",
 
1332
  "futures-util",
1333
  "hound",
1334
  "poem",
1335
+ "serde",
1336
  "symphonia",
1337
  "tokio",
1338
  "tokio-stream",
Cargo.toml CHANGED
@@ -21,6 +21,7 @@ futures-util = "0.3.28"
21
  #symphonia-codec-pcm = "0.5.3"
22
  #symphonia-format-mkv = "0.5.3"
23
  symphonia = { version = "0.5.3", features = ["mkv", "pcm"] }
 
24
 
25
  [dependencies.poem]
26
  version = "1.3.58"
 
21
  #symphonia-codec-pcm = "0.5.3"
22
  #symphonia-format-mkv = "0.5.3"
23
  symphonia = { version = "0.5.3", features = ["mkv", "pcm"] }
24
+ serde = { version = "1.0.189", features = ["derive"] }
25
 
26
  [dependencies.poem]
27
  version = "1.3.58"
src/lesson.rs ADDED
@@ -0,0 +1,257 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ use serde::Deserialize;
2
+ use std::sync::{Arc, Weak};
3
+ use tokio::sync::RwLock;
4
+ use std::collections::BTreeMap;
5
+ use aws_config::SdkConfig;
6
+ use tokio::select;
7
+
8
+ #[derive(Clone)]
9
+ pub struct LessonsManager {
10
+ translate_client: aws_sdk_translate::Client,
11
+ polly_client: aws_sdk_polly::Client,
12
+ transcript_client: aws_sdk_transcribestreaming::Client,
13
+ lessons: Arc<RwLock<BTreeMap<u32, Lesson>>>
14
+ }
15
+
16
+ impl LessonsManager {
17
+ pub(crate) fn new(sdk_config: &SdkConfig) -> Self {
18
+ let transcript_client = aws_sdk_transcribestreaming::Client::new(&sdk_config);
19
+ let translate_client = aws_sdk_translate::Client::new(&sdk_config);
20
+ let polly_client = aws_sdk_polly::Client::new(&sdk_config);
21
+ LessonsManager {
22
+ translate_client,
23
+ polly_client,
24
+ transcript_client,
25
+ lessons: Arc::new(RwLock::new(BTreeMap::new()))
26
+ }
27
+ }
28
+
29
+ pub(crate) async fn create_lesson(&self,
30
+ id: u32,
31
+ speaker_lang: String) -> Lesson {
32
+ let mut map = self.lessons.write().await;
33
+ let lesson: Lesson = InnerLesson::new(id, speaker_lang).into();
34
+ map.insert(id, lesson.clone());
35
+ lesson
36
+ }
37
+
38
+ pub(crate) async fn get_lesson(&self, id: u32) -> Option<Lesson> {
39
+ let map = self.lessons.read().await;
40
+ map.get(&id).cloned()
41
+ }
42
+ }
43
+
44
+ #[derive(Clone, Debug)]
45
+ pub(crate) struct Lesson {
46
+ inner: Arc<InnerLesson>
47
+ }
48
+
49
+ impl Lesson {
50
+ pub(crate) async fn get_or_init(&self, lang: String) -> LangLesson {
51
+ let map = self.inner.lang_lessons.read().await;
52
+ if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
53
+ return lang_lesson.into();
54
+ }
55
+ let mut map = self.inner.lang_lessons.write().await;
56
+ if let Some(lang_lesson) = map.get(&lang).and_then(|weak| weak.upgrade()) {
57
+ lang_lesson.into()
58
+ } else {
59
+ let lang_lesson = LangLesson::new(
60
+ self.clone(),
61
+ lang.clone(),
62
+ );
63
+ map.insert(lang.clone(), Arc::downgrade(&lang_lesson.inner));
64
+ lang_lesson
65
+ }
66
+ }
67
+ }
68
+
69
+ impl From<InnerLesson> for Lesson {
70
+ fn from(inner: InnerLesson) -> Self {
71
+ Lesson {
72
+ inner: Arc::new(inner)
73
+ }
74
+ }
75
+ }
76
+
77
+ #[derive(Debug)]
78
+ struct InnerLesson {
79
+ id: u32,
80
+ speaker_lang: String,
81
+ speaker_voice_channel: tokio::sync::mpsc::Sender<Vec<u8>>,
82
+ speaker_transcript: tokio::sync::broadcast::Sender<String>,
83
+ lang_lessons: RwLock<BTreeMap<String, Weak<InnerLangLesson>>>,
84
+ drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
85
+ }
86
+
87
+ impl InnerLesson {
88
+ fn new(
89
+ id: u32,
90
+ speaker_lang: String
91
+ ) -> InnerLesson {
92
+ let (speaker_transcript, _) = tokio::sync::broadcast::channel::<String>(128);
93
+ let (speaker_voice_channel, mut speaker_voice_rx) = tokio::sync::mpsc::channel(128);
94
+ let (drop_handler, mut drop_rx) = tokio::sync::oneshot::channel::<Signal>();
95
+
96
+ tokio::spawn(async move {
97
+ let fut = async {
98
+ loop {
99
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
100
+ }
101
+ };
102
+ select! {
103
+ _ = fut => {}
104
+ _ = drop_rx => {}
105
+ }
106
+ });
107
+
108
+ InnerLesson {
109
+ id,
110
+ speaker_lang,
111
+ speaker_voice_channel,
112
+ speaker_transcript,
113
+ lang_lessons: RwLock::new(BTreeMap::new()),
114
+ drop_handler: Some(drop_handler),
115
+ }
116
+ }
117
+ }
118
+
119
+ impl Drop for InnerLesson {
120
+ fn drop(&mut self) {
121
+ if let Some(tx) = self.drop_handler.take() {
122
+ let _ = tx.send(Signal::Stop);
123
+ }
124
+ }
125
+ }
126
+
127
+
128
+ struct InnerLangLesson {
129
+ parent: Lesson,
130
+ lang: String,
131
+ translated: tokio::sync::broadcast::Sender<String>,
132
+ voice_lessons: RwLock<BTreeMap<String, Weak<InnerVoiceLesson>>>
133
+ }
134
+
135
+ #[derive(Clone)]
136
+ pub(crate) struct LangLesson {
137
+ inner: Arc<InnerLangLesson>
138
+ }
139
+
140
+ impl From<InnerLangLesson> for LangLesson {
141
+ fn from(inner: InnerLangLesson) -> Self {
142
+ LangLesson {
143
+ inner: Arc::new(inner)
144
+ }
145
+ }
146
+ }
147
+
148
+ impl From<Arc<InnerLangLesson>> for LangLesson {
149
+ fn from(inner: Arc<InnerLangLesson>) -> Self {
150
+ LangLesson {
151
+ inner
152
+ }
153
+ }
154
+ }
155
+
156
+ impl LangLesson {
157
+ fn new(
158
+ parent: Lesson,
159
+ lang: String,
160
+ ) -> Self {
161
+ let (translated, _) = tokio::sync::broadcast::channel::<String>(128);
162
+ InnerLangLesson {
163
+ parent,
164
+ lang,
165
+ translated,
166
+ voice_lessons: RwLock::new(BTreeMap::new()),
167
+ }.into()
168
+ }
169
+
170
+ async fn get_or_init(&mut self, voice: String) -> VoiceLesson {
171
+ let map = self.inner.voice_lessons.read().await;
172
+ if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
173
+ return voice_lesson.into();
174
+ }
175
+ let mut map = self.inner.voice_lessons.write().await;
176
+ if let Some(voice_lesson) = map.get(&voice).and_then(|weak| weak.upgrade()) {
177
+ voice_lesson.into()
178
+ } else {
179
+ let voice_lesson = Arc::new(InnerVoiceLesson::new(
180
+ self.clone(),
181
+ voice.clone(),
182
+ ));
183
+ map.insert(voice.clone(), Arc::downgrade(&voice_lesson));
184
+ voice_lesson.into()
185
+ }
186
+ }
187
+ }
188
+
189
+ #[derive(Clone)]
190
+ struct VoiceLesson {
191
+ inner: Arc<InnerVoiceLesson>
192
+ }
193
+
194
+ impl From<InnerVoiceLesson> for VoiceLesson {
195
+ fn from(inner: InnerVoiceLesson) -> Self {
196
+ VoiceLesson {
197
+ inner: Arc::new(inner)
198
+ }
199
+ }
200
+ }
201
+
202
+ impl From<Arc<InnerVoiceLesson>> for VoiceLesson {
203
+ fn from(inner: Arc<InnerVoiceLesson>) -> Self {
204
+ VoiceLesson {
205
+ inner
206
+ }
207
+ }
208
+ }
209
+
210
+ struct InnerVoiceLesson {
211
+ parent: LangLesson,
212
+ voice: String,
213
+ voice_lesson: tokio::sync::broadcast::Sender<Vec<u8>>,
214
+ drop_handler: Option<tokio::sync::oneshot::Sender<Signal>>,
215
+ }
216
+
217
+ #[derive(Debug)]
218
+ enum Signal {
219
+ Stop,
220
+ }
221
+
222
+ impl InnerVoiceLesson {
223
+ fn new(
224
+ parent: LangLesson,
225
+ voice: String,
226
+ ) -> InnerVoiceLesson {
227
+ let (tx, rx) = tokio::sync::oneshot::channel::<Signal>();
228
+ let (voice_lesson, _) = tokio::sync::broadcast::channel::<Vec<u8>>(128);
229
+ tokio::spawn(async move {
230
+ let fut = async {
231
+ loop {
232
+ tokio::time::sleep(std::time::Duration::from_secs(1)).await;
233
+ }
234
+ };
235
+ select! {
236
+ _ = fut => {}
237
+ _ = rx => {}
238
+ }
239
+ });
240
+
241
+ InnerVoiceLesson {
242
+ parent,
243
+ voice,
244
+ voice_lesson,
245
+ drop_handler: Some(tx),
246
+ }
247
+ }
248
+ }
249
+
250
+ impl Drop for InnerVoiceLesson {
251
+ fn drop(&mut self) {
252
+ if let Some(tx) = self.drop_handler.take() {
253
+ let _ = tx.send(Signal::Stop);
254
+ }
255
+ }
256
+ }
257
+
src/main.rs CHANGED
@@ -5,7 +5,6 @@
5
 
6
  #![allow(clippy::result_large_err)]
7
 
8
- use std::collections::BTreeMap;
9
  use std::default::Default;
10
  use std::error::Error;
11
  use std::fmt::{Debug, Display, Formatter};
@@ -14,21 +13,25 @@ use tokio::sync::mpsc::channel;
14
  use async_stream::stream;
15
  use aws_config::meta::region::RegionProviderChain;
16
  use aws_sdk_transcribestreaming::primitives::Blob;
17
- use aws_sdk_transcribestreaming::types::{AudioStream, AudioEvent, LanguageCode, MediaEncoding, TranscriptResultStream};
18
  use aws_sdk_transcribestreaming::{config::Region, meta::PKG_VERSION};
19
  use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput;
20
  use clap::Parser;
21
 
22
- use poem::{handler, listener::TcpListener, Server, get, Route, IntoResponse, Endpoint, EndpointExt};
23
  use futures_util::{Sink, SinkExt, TryFutureExt, TryStreamExt};
24
  use poem::endpoint::StaticFilesEndpoint;
25
  use poem::web::websocket::{Message, WebSocket};
26
  use futures_util::stream::StreamExt;
27
- use poem::web::Data;
28
 
29
- use tokio::{select};
30
  use tokio::sync::mpsc::{Receiver, Sender};
31
  use tokio_stream::Stream;
 
 
 
 
32
 
33
 
34
  #[derive(Debug, Parser)]
@@ -46,6 +49,7 @@ struct Opt {
46
  verbose: bool,
47
  }
48
 
 
49
  enum ReplyEvent {
50
  Transcribed(String),
51
  Translated(String),
@@ -121,14 +125,7 @@ struct Context {
121
  translate_client: aws_sdk_translate::Client,
122
  polly_client: aws_sdk_polly::Client,
123
  transcript_client: aws_sdk_transcribestreaming::Client,
124
- }
125
-
126
- struct Lessons {
127
- lessons: BTreeMap<u32, Lesson>
128
- }
129
-
130
- struct Lesson {
131
-
132
  }
133
 
134
  #[tokio::main]
@@ -152,7 +149,6 @@ async fn main() -> Result<(), std::io::Error> {
152
  "Region: {}",
153
  region_provider.region().await.unwrap().as_ref()
154
  );
155
- // println!("Audio filename: {}", &audio_file);
156
  println!();
157
  }
158
 
@@ -164,14 +160,19 @@ async fn main() -> Result<(), std::io::Error> {
164
  translate_client,
165
  polly_client,
166
  transcript_client,
 
167
  };
168
 
169
- let app = Route::new().nest(
 
170
  "/",
171
  StaticFilesEndpoint::new("./static")
172
  .show_files_listing()
173
  .index_file("index.html"),
174
- ).at("/translate", get(stream_translate))
 
 
 
175
  .data(ctx);
176
  let listener = TcpListener::bind("[::]:8080");
177
  let server = Server::new(listener);
@@ -180,6 +181,34 @@ async fn main() -> Result<(), std::io::Error> {
180
  }
181
 
182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
  #[handler]
184
  async fn stream_translate(ctx: Data<&Context>, ws: WebSocket) -> impl IntoResponse {
185
  let translate_client = ctx.translate_client.clone();
 
5
 
6
  #![allow(clippy::result_large_err)]
7
 
 
8
  use std::default::Default;
9
  use std::error::Error;
10
  use std::fmt::{Debug, Display, Formatter};
 
13
  use async_stream::stream;
14
  use aws_config::meta::region::RegionProviderChain;
15
  use aws_sdk_transcribestreaming::primitives::Blob;
16
+ use aws_sdk_transcribestreaming::types::{AudioEvent, AudioStream, LanguageCode, MediaEncoding, TranscriptResultStream};
17
  use aws_sdk_transcribestreaming::{config::Region, meta::PKG_VERSION};
18
  use aws_sdk_transcribestreaming::operation::start_stream_transcription::StartStreamTranscriptionOutput;
19
  use clap::Parser;
20
 
21
+ use poem::{Endpoint, EndpointExt, get, handler, IntoResponse, listener::TcpListener, Route, Server};
22
  use futures_util::{Sink, SinkExt, TryFutureExt, TryStreamExt};
23
  use poem::endpoint::StaticFilesEndpoint;
24
  use poem::web::websocket::{Message, WebSocket};
25
  use futures_util::stream::StreamExt;
26
+ use poem::web::{Data, Query};
27
 
28
+ use tokio::select;
29
  use tokio::sync::mpsc::{Receiver, Sender};
30
  use tokio_stream::Stream;
31
+ use serde::Deserialize;
32
+ use lesson::{LessonsManager};
33
+
34
+ mod lesson;
35
 
36
 
37
  #[derive(Debug, Parser)]
 
49
  verbose: bool,
50
  }
51
 
52
+ #[derive(Clone)]
53
  enum ReplyEvent {
54
  Transcribed(String),
55
  Translated(String),
 
125
  translate_client: aws_sdk_translate::Client,
126
  polly_client: aws_sdk_polly::Client,
127
  transcript_client: aws_sdk_transcribestreaming::Client,
128
+ lessons_manager: LessonsManager,
 
 
 
 
 
 
 
129
  }
130
 
131
  #[tokio::main]
 
149
  "Region: {}",
150
  region_provider.region().await.unwrap().as_ref()
151
  );
 
152
  println!();
153
  }
154
 
 
160
  translate_client,
161
  polly_client,
162
  transcript_client,
163
+ lessons_manager: LessonsManager::new(&shared_config),
164
  };
165
 
166
+ let app = Route::new()
167
+ .nest(
168
  "/",
169
  StaticFilesEndpoint::new("./static")
170
  .show_files_listing()
171
  .index_file("index.html"),
172
+ )
173
+ .at("/translate", get(stream_translate))
174
+ .at("/lesson-speaker", get(stream_speaker))
175
+ .at("/lesson-listener", get(stream_listener))
176
  .data(ctx);
177
  let listener = TcpListener::bind("[::]:8080");
178
  let server = Server::new(listener);
 
181
  }
182
 
183
 
184
+ #[derive(Deserialize, Debug)]
185
+ pub struct LessonSpeakerQuery {
186
+ id: u32,
187
+ lang: String,
188
+ }
189
+
190
+ #[handler]
191
+ async fn stream_speaker(ctx: Data<&Context>, query: Query<LessonSpeakerQuery>, ws: WebSocket) -> impl IntoResponse {
192
+ let lesson = ctx.lessons_manager.create_lesson(query.id, query.lang.clone()).await;
193
+ println!("{:?}", lesson);
194
+ println!("{:?}", query);
195
+ }
196
+
197
+
198
+ #[derive(Deserialize, Debug)]
199
+ pub struct LessonListenerQuery {
200
+ id: u32,
201
+ lang: String,
202
+ voice: String,
203
+ }
204
+
205
+ #[handler]
206
+ async fn stream_listener(ctx: Data<&Context>, query: Query<LessonListenerQuery>, ws: WebSocket) -> impl IntoResponse {
207
+ let lesson = ctx.lessons_manager.get_lesson(query.id).await;
208
+ println!("{:?}", lesson);
209
+ println!("{:?}", query);
210
+ }
211
+
212
  #[handler]
213
  async fn stream_translate(ctx: Data<&Context>, ws: WebSocket) -> impl IntoResponse {
214
  let translate_client = ctx.translate_client.clone();