Spaces:
Sleeping
Sleeping
File size: 1,890 Bytes
5cb1ef7 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 5cb1ef7 a4dee07 5cb1ef7 a4dee07 2bb7b57 a4dee07 3569cbd a4dee07 5cb1ef7 a4dee07 5cb1ef7 a4dee07 5cb1ef7 2bb7b57 5cb1ef7 a4dee07 2bb7b57 a4dee07 2bb7b57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
use std::collections::VecDeque;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::sleep;
/// Consumer-side handle of a size-or-time grouping stage.
///
/// The grouping itself runs in a background task started by
/// `GroupedWithin::new`; this value only keeps the receiving end of the
/// channel on which that task publishes finished groups.
pub struct GroupedWithin<I: 'static + Send> {
    /// Channel end from which completed groups are read.
    outlet: Receiver<Vec<I>>,
}
impl<I> GroupedWithin<I>
where
    I: 'static + Send,
{
    /// Spawns a background task that regroups the chunks arriving on `inlet`.
    ///
    /// Items are buffered in arrival order and published as a `Vec<I>` as soon
    /// as `group_size` items are available, or when `window_time` elapses,
    /// whichever happens first. The timer restarts after every published group
    /// and after every expired window; an expired window with nothing buffered
    /// publishes nothing.
    ///
    /// # Parameters
    /// * `group_size` - maximum number of items per group. `0` means "no size
    ///   limit": groups are then cut only by the timer or by end of input.
    /// * `window_time` - longest time buffered items wait before being flushed.
    /// * `inlet` - source of item chunks; chunks may be of any length.
    /// * `buffer` - capacity of the output channel holding finished groups.
    ///
    /// # Termination
    /// The task stops when `inlet` is closed (after flushing what is still
    /// buffered) or when the output channel can no longer accept groups.
    ///
    /// # Panics
    /// Per the tokio documentation, `tokio::spawn` panics when called outside
    /// a Tokio runtime and `channel` panics when `buffer` is `0`.
    pub fn new(
        group_size: usize,
        window_time: Duration,
        mut inlet: Receiver<Vec<I>>,
        buffer: usize,
    ) -> Self {
        let (tx, outlet) = channel::<Vec<I>>(buffer);
        // A limit of zero could never be filled by real items; treating it as
        // "unbounded" keeps the timer as the only trigger instead of spinning
        // on empty groups.
        let max_len = if group_size == 0 { usize::MAX } else { group_size };
        tokio::spawn(async move {
            let mut window: VecDeque<I> = VecDeque::with_capacity(group_size);
            loop {
                // Resolves to `Some(group)` once a full group is buffered, or
                // to `None` when the inlet is closed. The buffer is checked
                // BEFORE receiving because one oversized chunk can leave
                // several full groups behind.
                let grouped_fut = async {
                    while window.len() < max_len {
                        match inlet.recv().await {
                            Some(chunk) => window.extend(chunk),
                            None => return None,
                        }
                    }
                    Some(window.drain(..max_len).collect::<Vec<I>>())
                };
                let grouped: Vec<I> = select! {
                    _ = sleep(window_time) => {
                        // Window expired: flush what is buffered, but never
                        // more than one group's worth at a time.
                        let take = window.len().min(max_len);
                        window.drain(..take).collect()
                    },
                    grouped_opt = grouped_fut => {
                        match grouped_opt {
                            Some(group) => group,
                            None => {
                                // Inlet closed: deliver the remaining items
                                // instead of dropping them, then stop.
                                while !window.is_empty() {
                                    let take = window.len().min(max_len);
                                    let rest: Vec<I> = window.drain(..take).collect();
                                    if let Err(e) = tx.send(rest).await {
                                        tracing::error!("{}", e);
                                        break;
                                    }
                                }
                                break;
                            }
                        }
                    }
                };
                if grouped.is_empty() {
                    continue;
                }
                if let Err(e) = tx.send(grouped).await {
                    tracing::error!("{}", e);
                    // The receiving side is gone; nothing can consume further
                    // groups, so stop instead of draining the inlet for nobody.
                    break;
                }
            }
        });
        Self { outlet }
    }

    /// Returns the next finished group without waiting.
    ///
    /// This is a direct `try_recv` on the output channel, so it never blocks.
    ///
    /// # Errors
    /// Propagates the channel's `TryRecvError`: per the tokio documentation,
    /// `Empty` when no group is ready yet and `Disconnected` once the
    /// background task has ended and all of its groups have been taken.
    pub fn next(&mut self) -> Result<Vec<I>, TryRecvError> {
        self.outlet.try_recv()
    }
}
|