use std::collections::VecDeque; use std::time::Duration; use tokio::select; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::{channel, Receiver}; use tokio::time::sleep; pub struct GroupedWithin where I: 'static + Send, { outlet: Receiver>, } impl GroupedWithin where I: 'static + Send, { pub fn new( group_size: usize, window_time: Duration, mut inlet: Receiver>, buffer: usize, ) -> Self { let (tx, outlet) = channel::>(buffer); tokio::spawn(async move { let mut window = VecDeque::with_capacity(group_size); loop { let grouped_fut = async { while let Some(c) = inlet.recv().await { window.extend(c); if window.len() >= group_size { let will_send: Vec = window.drain(0..group_size).collect(); return Some(will_send); } } None }; let grouped: Vec = select! { _ = sleep(window_time) => { window.drain(..).collect() }, grouped_opt = grouped_fut => { match grouped_opt { None => break, Some(group) => group } } }; if grouped.is_empty() { continue; } if let Err(e) = tx.send(grouped).await { tracing::error!("{}", e); } } }); Self { outlet } } pub fn next(&mut self) -> Result, TryRecvError> { self.outlet.try_recv() } }