use std::rc::Rc;
use std::cell::RefCell;
use crate::dataflow::channels::Bundle;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<C>,
}
pub struct ConsumedGuard<T: Ord + Clone + 'static> {
consumed: Rc<RefCell<ChangeBatch<T>>>,
time: Option<T>,
len: usize,
}
impl<T:Ord+Clone+'static> ConsumedGuard<T> {
pub(crate) fn time(&self) -> &T {
&self.time.as_ref().unwrap()
}
}
impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
fn drop(&mut self) {
let time = self.time.take().unwrap();
self.consumed.borrow_mut().update(time, self.len as i64);
}
}
impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
#[inline]
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}
#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
time: Some(message.time.clone()),
len: message.data.len(),
};
Some((guard, message))
}
else { None }
}
}
impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
pub fn new(pullable: P) -> Self {
Counter {
phantom: ::std::marker::PhantomData,
pullable,
consumed: Rc::new(RefCell::new(ChangeBatch::new())),
}
}
pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
&self.consumed
}
}