timely/dataflow/channels/pushers/
progress.rs1use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::{ChangeBatch, Timestamp};
7use crate::dataflow::channels::Message;
8use crate::dataflow::operators::CapabilityTrait;
9use crate::communication::Push;
10use crate::Container;
11
12#[derive(Debug)]
14pub struct Progress<T, P> {
15 pushee: P,
16 internal: Rc<RefCell<ChangeBatch<T>>>,
17 port: usize,
18}
19
20impl<T: Timestamp, P> Progress<T, P> {
21 #[inline] pub fn give<C: Container, CT: CapabilityTrait<T>>(&mut self, capability: &CT, container: &mut C) where P: Push<Message<T, C>> {
25 debug_assert!(self.valid(capability), "Attempted to open output session with invalid capability");
26 if !container.is_empty() { Message::push_at(container, capability.time().clone(), &mut self.pushee); }
27 }
28 pub fn activate<'a, C>(&'a mut self) -> ProgressSession<'a, T, C, P> where P: Push<Message<T, C>> {
30 ProgressSession {
31 borrow: self,
32 marker: std::marker::PhantomData,
33 }
34 }
35 pub fn valid<CT: CapabilityTrait<T>>(&self, capability: &CT) -> bool {
37 capability.valid_for_output(&self.internal, self.port)
38 }
39}
40
41impl<T, P> Progress<T, P> where T : Ord+Clone+'static {
42 pub fn new(pushee: P, internal: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
44 Self { pushee, internal, port }
45 }
46}
47
48pub struct ProgressSession<'a, T: Timestamp, C, P: Push<Message<T, C>>> {
52 borrow: &'a mut Progress<T, P>,
53 marker: std::marker::PhantomData<C>,
54}
55
56impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::Deref for ProgressSession<'a, T, C, P> {
57 type Target = Progress<T, P>;
58 fn deref(&self) -> &Self::Target { self.borrow }
59}
60
61impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::DerefMut for ProgressSession<'a, T, C, P> {
62 fn deref_mut(&mut self) -> &mut Self::Target { self.borrow }
63}
64
65impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> Drop for ProgressSession<'a, T, C, P> {
66 fn drop(&mut self) { self.borrow.pushee.done(); }
67}