timely/dataflow/channels/pushers/
counter.rs
1use std::marker::PhantomData;
4use std::rc::Rc;
5use std::cell::RefCell;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::dataflow::channels::Message;
9use crate::communication::Push;
10use crate::Container;
11
12#[derive(Debug)]
14pub struct Counter<T, C, P: Push<Message<T, C>>> {
15 pushee: P,
16 produced: Rc<RefCell<ChangeBatch<T>>>,
17 phantom: PhantomData<C>,
18}
19
20impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
21 #[inline]
22 fn push(&mut self, message: &mut Option<Message<T, C>>) {
23 if let Some(message) = message {
24 self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
25 }
26
27 if message.is_some() || !self.produced.borrow_mut().is_empty() {
29 self.pushee.push(message);
30 }
31 }
32}
33
34impl<T, C, P: Push<Message<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
35 pub fn new(pushee: P) -> Counter<T, C, P> {
37 Counter {
38 pushee,
39 produced: Rc::new(RefCell::new(ChangeBatch::new())),
40 phantom: PhantomData,
41 }
42 }
43 #[inline]
45 pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
46 &self.produced
47 }
48}