File size: 1,890 Bytes
5cb1ef7
a4dee07
2bb7b57
a4dee07
2bb7b57
 
a4dee07
 
2bb7b57
 
 
 
a4dee07
 
 
2bb7b57
 
 
 
 
 
 
 
 
a4dee07
 
5cb1ef7
a4dee07
 
 
 
 
5cb1ef7
a4dee07
2bb7b57
a4dee07
 
3569cbd
a4dee07
 
5cb1ef7
a4dee07
 
 
 
 
 
5cb1ef7
a4dee07
 
 
 
5cb1ef7
2bb7b57
5cb1ef7
 
a4dee07
 
 
 
 
2bb7b57
a4dee07
 
 
 
 
2bb7b57
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use std::collections::VecDeque;
use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::sleep;

/// Receiving handle for groups of `I` assembled by a background task.
///
/// The only state kept here is the receiving half of the channel on which
/// finished groups (`Vec<I>`) arrive.
pub struct GroupedWithin<I: Send + 'static> {
    /// Channel end that yields completed groups.
    outlet: Receiver<Vec<I>>,
}

impl<I> GroupedWithin<I>
where
    I: 'static + Send,
{
    /// Spawns a background task that regroups the batches arriving on `inlet`.
    ///
    /// Incoming items are buffered in arrival order and re-emitted as groups:
    ///
    /// * as soon as `group_size` items are buffered, exactly `group_size` of
    ///   them are emitted as one group (several groups in a row if a single
    ///   inbound batch was large enough);
    /// * if `window_time` elapses before a group fills up, whatever is buffered
    ///   is emitted as a smaller group (nothing is emitted if the buffer is
    ///   empty). The timer is restarted after every emitted group or timeout;
    /// * when `inlet` is closed, the remaining buffered items are emitted as a
    ///   final group and the task ends.
    ///
    /// The task also ends once the outlet side has been dropped, since no group
    /// could be delivered any more.
    ///
    /// # Parameters
    ///
    /// * `group_size` - number of items per full group. `0` disables the size
    ///   trigger, so groups are emitted by the timer (or on close) only.
    /// * `window_time` - maximum time to wait for a group to fill up.
    /// * `inlet` - source of item batches.
    /// * `buffer` - capacity of the internal channel holding finished groups.
    ///
    /// # Panics
    ///
    /// Per the tokio documentation, `channel` panics when `buffer` is 0 and
    /// `tokio::spawn` panics when called outside of a tokio runtime.
    pub fn new(
        group_size: usize,
        window_time: Duration,
        mut inlet: Receiver<Vec<I>>,
        buffer: usize,
    ) -> Self {
        let (tx, outlet) = channel::<Vec<I>>(buffer);
        tokio::spawn(async move {
            let mut window: VecDeque<I> = VecDeque::with_capacity(group_size);

            loop {
                let mut inlet_closed = false;

                let grouped: Vec<I> = if group_size > 0 && window.len() >= group_size {
                    // A previous batch left at least one full group behind:
                    // emit it right away instead of waiting for more input.
                    window.drain(..group_size).collect::<Vec<I>>()
                } else {
                    // Resolves to `true` once a full group is buffered and to
                    // `false` when the inlet has been closed.
                    let fill_fut = async {
                        while let Some(batch) = inlet.recv().await {
                            window.extend(batch);
                            if group_size > 0 && window.len() >= group_size {
                                return true;
                            }
                        }
                        false
                    };

                    // Dropping `fill_fut` when the timer wins loses no data:
                    // received items are already stored in `window`.
                    select! {
                        _ = sleep(window_time) => window.drain(..).collect::<Vec<I>>(),
                        filled = fill_fut => {
                            if filled {
                                window.drain(..group_size).collect::<Vec<I>>()
                            } else {
                                // Inlet closed: flush the remainder instead of
                                // dropping it.
                                inlet_closed = true;
                                window.drain(..).collect::<Vec<I>>()
                            }
                        }
                    }
                };

                if !grouped.is_empty() {
                    if let Err(e) = tx.send(grouped).await {
                        // The outlet is gone; nothing can be delivered any more.
                        tracing::error!("{}", e);
                        break;
                    }
                }

                if inlet_closed {
                    break;
                }
            }
        });
        Self { outlet }
    }

    /// Returns the next finished group without waiting.
    ///
    /// # Errors
    ///
    /// Forwards the error of `Receiver::try_recv`: `TryRecvError::Empty` when no
    /// group is ready yet, `TryRecvError::Disconnected` when the background task
    /// has ended and every group has been consumed.
    pub fn next(&mut self) -> Result<Vec<I>, TryRecvError> {
        self.outlet.try_recv()
    }
}