timely/dataflow/channels/pushers/
counter.rs

1//! A wrapper which counts the number of records pushed past and updates a shared count map.
2
3use std::marker::PhantomData;
4use std::rc::Rc;
5use std::cell::RefCell;
6
7use crate::progress::{ChangeBatch, Timestamp};
8use crate::dataflow::channels::Message;
9use crate::communication::Push;
10use crate::Container;
11
12/// A wrapper which updates shared `produced` based on the number of records pushed.
13#[derive(Debug)]
14pub struct Counter<T, C, P: Push<Message<T, C>>> {
15    pushee: P,
16    produced: Rc<RefCell<ChangeBatch<T>>>,
17    phantom: PhantomData<C>,
18}
19
20impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
21    #[inline]
22    fn push(&mut self, message: &mut Option<Message<T, C>>) {
23        if let Some(message) = message {
24            self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
25        }
26
27        // only propagate `None` if dirty (indicates flush)
28        if message.is_some() || !self.produced.borrow_mut().is_empty() {
29            self.pushee.push(message);
30        }
31    }
32}
33
34impl<T, C, P: Push<Message<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
35    /// Allocates a new `Counter` from a pushee and shared counts.
36    pub fn new(pushee: P) -> Counter<T, C, P> {
37        Counter {
38            pushee,
39            produced: Rc::new(RefCell::new(ChangeBatch::new())),
40            phantom: PhantomData,
41        }
42    }
43    /// A references to shared changes in counts, for cloning or draining.
44    #[inline]
45    pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
46        &self.produced
47    }
48}