timely/dataflow/channels/pushers/
counter.rs

1//! A wrapper which counts the number of records pushed past and updates a shared count map.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::ChangeBatch;
7use crate::dataflow::channels::Message;
8use crate::communication::Push;
9use crate::Accountable;
10
11/// A wrapper which updates shared `produced` based on the number of records pushed.
12#[derive(Debug)]
13pub struct Counter<T, P> {
14    pushee: P,
15    produced: Rc<RefCell<ChangeBatch<T>>>,
16}
17
18impl<T: Clone+Ord, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
19    #[inline]
20    fn push(&mut self, message: &mut Option<Message<T, C>>) {
21        if let Some(message) = message {
22            self.produced.borrow_mut().update(message.time.clone(), message.data.record_count());
23        }
24
25        // only propagate `None` if dirty (indicates flush)
26        if message.is_some() || !self.produced.borrow_mut().is_empty() {
27            self.pushee.push(message);
28        }
29    }
30}
31
32impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
33    /// Allocates a new `Counter` from a pushee and shared counts.
34    pub fn new(pushee: P) -> Counter<T, P> {
35        Counter {
36            pushee,
37            produced: Rc::new(RefCell::new(ChangeBatch::new())),
38        }
39    }
40    /// A references to shared changes in counts, for cloning or draining.
41    #[inline]
42    pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
43        &self.produced
44    }
45    /// Ships a time and a container.
46    ///
47    /// This is not a validated capability, and this method should not be used without great care.
48    /// Ideally, users would not have direct access to a `Counter`, and preventing this is the way
49    /// to uphold invariants.
50    #[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
51        if !container.is_empty() { Message::push_at(container, time, self); }
52    }
53}