Spaces:
Sleeping
Sleeping
File size: 1,898 Bytes
5cb1ef7 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 5cb1ef7 a4dee07 5cb1ef7 a4dee07 2bb7b57 a4dee07 2bb7b57 a4dee07 5cb1ef7 a4dee07 5cb1ef7 a4dee07 5cb1ef7 2bb7b57 5cb1ef7 a4dee07 2bb7b57 a4dee07 2bb7b57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
use std::collections::VecDeque;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::sleep;
/// Handle to a stream of batched items.
///
/// Holds the receiving half of a channel carrying `Vec<I>` batches; batches
/// are pulled out through the methods defined on this type.
pub struct GroupedWithin<I: Send + 'static> {
    /// Receiving end of the channel that delivers the grouped batches.
    outlet: Receiver<Vec<I>>,
}
impl<I> GroupedWithin<I>
where
    I: 'static + Send,
{
    /// Starts a background task that regroups the chunks arriving on `inlet`.
    ///
    /// Incoming chunks are appended to an internal buffer. A batch is emitted
    /// on the outlet channel when either
    ///
    /// * the buffer holds at least `group_size` items — exactly `group_size`
    ///   items are emitted, the rest stay buffered for the next batch; or
    /// * `window_time` elapses since the previous batch was handled — whatever
    ///   is buffered is emitted (nothing is sent if the buffer is empty).
    ///
    /// When `inlet` is closed, any items still buffered are emitted as a final
    /// batch and the task ends. The task also ends as soon as the outlet
    /// receiver (this handle) has been dropped.
    ///
    /// A `group_size` of `0` disables size-based batching: items are then only
    /// emitted on timeout or when `inlet` closes.
    ///
    /// `buffer` is the capacity of the outlet channel.
    ///
    /// # Panics
    ///
    /// Panics if `buffer` is `0` (a tokio bounded channel requires a non-zero
    /// capacity) or if called outside of a tokio runtime (`tokio::spawn`).
    pub fn new(
        group_size: usize,
        window_time: Duration,
        mut inlet: Receiver<Vec<I>>,
        buffer: usize,
    ) -> Self {
        let (tx, outlet) = channel::<Vec<I>>(buffer);
        tokio::spawn(async move {
            let mut window: VecDeque<I> = VecDeque::with_capacity(group_size);
            loop {
                // Resolves to `Some(group)` once a full group is buffered, or
                // `None` once the inlet has been closed. The buffer is checked
                // *before* waiting for more input so that a surplus left over
                // from an oversized chunk is emitted right away instead of
                // lingering until the next chunk or the timeout.
                let grouped_fut = async {
                    loop {
                        if group_size > 0 && window.len() >= group_size {
                            let will_send: Vec<I> = window.drain(..group_size).collect();
                            return Some(will_send);
                        }
                        match inlet.recv().await {
                            Some(chunk) => window.extend(chunk),
                            None => return None,
                        }
                    }
                };
                // `recv` is cancel-safe, so dropping `grouped_fut` when the
                // timer wins loses nothing: received items already sit in
                // `window`.
                let grouped: Vec<I> = select! {
                    _ = sleep(window_time) => {
                        window.drain(..).collect()
                    },
                    grouped_opt = grouped_fut => {
                        match grouped_opt {
                            Some(group) => group,
                            None => {
                                // Inlet closed: flush what is left so that no
                                // item is silently dropped, then stop.
                                let rest: Vec<I> = window.drain(..).collect();
                                if !rest.is_empty() {
                                    if let Err(e) = tx.send(rest).await {
                                        tracing::error!("{}", e);
                                    }
                                }
                                break;
                            }
                        }
                    }
                };
                if grouped.is_empty() {
                    continue;
                }
                if let Err(e) = tx.send(grouped).await {
                    // The receiver is gone, so nobody can ever observe another
                    // batch; stop instead of looping and logging forever.
                    tracing::error!("{}", e);
                    break;
                }
            }
        });
        Self { outlet }
    }

    /// Returns the next ready batch without waiting.
    ///
    /// # Errors
    ///
    /// Returns [`TryRecvError::Empty`] when no batch is currently available
    /// and [`TryRecvError::Disconnected`] when the background task has ended
    /// and every batch it produced has already been taken.
    pub fn next(&mut self) -> Result<Vec<I>, TryRecvError> {
        self.outlet.try_recv()
    }
}
|