timely/dataflow/channels/pullers/
counter.rs
1use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::dataflow::channels::Message;
7use crate::progress::ChangeBatch;
8use crate::communication::Pull;
9use crate::Container;
10
11pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
13 pullable: P,
14 consumed: Rc<RefCell<ChangeBatch<T>>>,
15 phantom: ::std::marker::PhantomData<C>,
16}
17
18pub struct ConsumedGuard<T: Ord + Clone + 'static> {
20 consumed: Rc<RefCell<ChangeBatch<T>>>,
21 time: Option<T>,
22 len: usize,
23}
24
25impl<T:Ord+Clone+'static> ConsumedGuard<T> {
26 pub(crate) fn time(&self) -> &T {
27 self.time.as_ref().unwrap()
28 }
29}
30
31impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
32 fn drop(&mut self) {
33 let time = self.time.take().unwrap();
35 self.consumed.borrow_mut().update(time, self.len as i64);
36 }
37}
38
39impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
40 #[inline]
42 pub fn next(&mut self) -> Option<&mut Message<T, C>> {
43 self.next_guarded().map(|(_guard, bundle)| bundle)
44 }
45
46 #[inline]
47 pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
48 if let Some(message) = self.pullable.pull() {
49 let guard = ConsumedGuard {
50 consumed: Rc::clone(&self.consumed),
51 time: Some(message.time.clone()),
52 len: message.data.len(),
53 };
54 Some((guard, message))
55 }
56 else { None }
57 }
58}
59
60impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
61 pub fn new(pullable: P) -> Self {
63 Counter {
64 phantom: ::std::marker::PhantomData,
65 pullable,
66 consumed: Rc::new(RefCell::new(ChangeBatch::new())),
67 }
68 }
69 pub fn consumed(&self) -> &Rc<RefCell<ChangeBatch<T>>> {
71 &self.consumed
72 }
73}