// timely/dataflow/channels/pushers/counter.rs

use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::ChangeBatch;
use crate::dataflow::channels::Message;
use crate::communication::Push;
use crate::Accountable;

11#[derive(Debug)]
13pub struct Counter<T, P> {
14 pushee: P,
15 produced: Rc<RefCell<ChangeBatch<T>>>,
16}
17
18impl<T: Clone+Ord, C: Accountable, P> Push<Message<T, C>> for Counter<T, P> where P: Push<Message<T, C>> {
19 #[inline]
20 fn push(&mut self, message: &mut Option<Message<T, C>>) {
21 if let Some(message) = message {
22 self.produced.borrow_mut().update(message.time.clone(), message.data.record_count());
23 }
24
25 if message.is_some() || !self.produced.borrow_mut().is_empty() {
27 self.pushee.push(message);
28 }
29 }
30}
31
32impl<T, P> Counter<T, P> where T : Ord+Clone+'static {
33 pub fn new(pushee: P) -> Counter<T, P> {
35 Counter {
36 pushee,
37 produced: Rc::new(RefCell::new(ChangeBatch::new())),
38 }
39 }
40 #[inline]
42 pub fn produced(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
43 &self.produced
44 }
45 #[inline] pub fn give<C: crate::Container>(&mut self, time: T, container: &mut C) where P: Push<Message<T, C>> {
51 if !container.is_empty() { Message::push_at(container, time, self); }
52 }
53}