timely/dataflow/channels/pullers/
counter.rs

1//! A wrapper which accounts records pulled past in a shared count map.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::dataflow::channels::Message;
7use crate::progress::ChangeBatch;
8use crate::communication::Pull;
9use crate::Container;
10
11/// A wrapper which accounts records pulled past in a shared count map.
12pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
13    pullable: P,
14    consumed: Rc<RefCell<ChangeBatch<T>>>,
15    phantom: ::std::marker::PhantomData<C>,
16}
17
18/// A guard type that updates the change batch counts on drop
19pub struct ConsumedGuard<T: Ord + Clone + 'static> {
20    consumed: Rc<RefCell<ChangeBatch<T>>>,
21    time: Option<T>,
22    len: usize,
23}
24
25impl<T:Ord+Clone+'static> ConsumedGuard<T> {
26    pub(crate) fn time(&self) -> &T {
27        self.time.as_ref().unwrap()
28    }
29}
30
31impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
32    fn drop(&mut self) {
33        // SAFETY: we're in a Drop impl, so this runs at most once
34        let time = self.time.take().unwrap();
35        self.consumed.borrow_mut().update(time, self.len as i64);
36    }
37}
38
39impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
40    /// Retrieves the next timestamp and batch of data.
41    #[inline]
42    pub fn next(&mut self) -> Option<&mut Message<T, C>> {
43        self.next_guarded().map(|(_guard, bundle)| bundle)
44    }
45
46    #[inline]
47    pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
48        if let Some(message) = self.pullable.pull() {
49            let guard = ConsumedGuard {
50                consumed: Rc::clone(&self.consumed),
51                time: Some(message.time.clone()),
52                len: message.data.len(),
53            };
54            Some((guard, message))
55        }
56        else { None }
57    }
58}
59
60impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
61    /// Allocates a new `Counter` from a boxed puller.
62    pub fn new(pullable: P) -> Self {
63        Counter {
64            phantom: ::std::marker::PhantomData,
65            pullable,
66            consumed: Rc::new(RefCell::new(ChangeBatch::new())),
67        }
68    }
69    /// A references to shared changes in counts, for cloning or draining.
70    pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
71        &self.consumed
72    }
73}