Skip to main content

timely/dataflow/channels/pullers/
counter.rs

1//! A wrapper which accounts records pulled past in a shared count map.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::dataflow::channels::Message;
7use crate::progress::ChangeBatch;
8use crate::communication::Pull;
9use crate::Accountable;
10
11/// A wrapper which accounts records pulled past in a shared count map.
12pub struct Counter<T, C, P> {
13    pullable: P,
14    consumed: Rc<RefCell<ChangeBatch<T>>>,
15    phantom: ::std::marker::PhantomData<C>,
16}
17
18/// A guard type that updates the change batch counts on drop
19pub struct ConsumedGuard<T: Ord + Clone + 'static> {
20    consumed: Rc<RefCell<ChangeBatch<T>>>,
21    time: Option<T>,
22    record_count: i64,
23}
24
25impl<T:Ord+Clone+'static> ConsumedGuard<T> {
26    #[inline]
27    pub(crate) fn time(&self) -> &T {
28        self.time.as_ref().unwrap()
29    }
30}
31
32impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
33    fn drop(&mut self) {
34        // SAFETY: we're in a Drop impl, so this runs at most once
35        let time = self.time.take().unwrap();
36        self.consumed.borrow_mut().update(time, self.record_count);
37    }
38}
39
40impl<T:Ord+Clone+'static, C: Accountable, P: Pull<Message<T, C>>> Counter<T, C, P> {
41    /// Retrieves the next timestamp and batch of data.
42    #[inline]
43    pub fn next(&mut self) -> Option<&mut Message<T, C>> {
44        self.next_guarded().map(|(_guard, bundle)| bundle)
45    }
46
47    #[inline]
48    pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
49        if let Some(message) = self.pullable.pull() {
50            let guard = ConsumedGuard {
51                consumed: Rc::clone(&self.consumed),
52                time: Some(message.time.clone()),
53                record_count: message.data.record_count(),
54            };
55            Some((guard, message))
56        }
57        else { None }
58    }
59}
60
61impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
62    /// Allocates a new `Counter` from a boxed puller.
63    pub fn new(pullable: P) -> Self {
64        Counter {
65            phantom: ::std::marker::PhantomData,
66            pullable,
67            consumed: Rc::new(RefCell::new(ChangeBatch::new())),
68        }
69    }
70    /// A references to shared changes in counts, for cloning or draining.
71    pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
72        &self.consumed
73    }
74}