timely/dataflow/channels/pullers/
counter.rs1use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::dataflow::channels::Message;
7use crate::progress::ChangeBatch;
8use crate::communication::Pull;
9use crate::Accountable;
10
11pub struct Counter<T, C, P> {
13 pullable: P,
14 consumed: Rc<RefCell<ChangeBatch<T>>>,
15 phantom: ::std::marker::PhantomData<C>,
16}
17
18pub struct ConsumedGuard<T: Ord + Clone + 'static> {
20 consumed: Rc<RefCell<ChangeBatch<T>>>,
21 time: Option<T>,
22 record_count: i64,
23}
24
25impl<T:Ord+Clone+'static> ConsumedGuard<T> {
26 #[inline]
27 pub(crate) fn time(&self) -> &T {
28 self.time.as_ref().unwrap()
29 }
30}
31
32impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
33 fn drop(&mut self) {
34 let time = self.time.take().unwrap();
36 self.consumed.borrow_mut().update(time, self.record_count);
37 }
38}
39
40impl<T:Ord+Clone+'static, C: Accountable, P: Pull<Message<T, C>>> Counter<T, C, P> {
41 #[inline]
43 pub fn next(&mut self) -> Option<&mut Message<T, C>> {
44 self.next_guarded().map(|(_guard, bundle)| bundle)
45 }
46
47 #[inline]
48 pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
49 if let Some(message) = self.pullable.pull() {
50 let guard = ConsumedGuard {
51 consumed: Rc::clone(&self.consumed),
52 time: Some(message.time.clone()),
53 record_count: message.data.record_count(),
54 };
55 Some((guard, message))
56 }
57 else { None }
58 }
59}
60
61impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
62 pub fn new(pullable: P) -> Self {
64 Counter {
65 phantom: ::std::marker::PhantomData,
66 pullable,
67 consumed: Rc::new(RefCell::new(ChangeBatch::new())),
68 }
69 }
70 pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
72 &self.consumed
73 }
74}