timely/dataflow/channels/pushers/
progress.rs

1//! A wrapper that allows containers to be sent by validating capabilities.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::{ChangeBatch, Timestamp};
7use crate::dataflow::channels::Message;
8use crate::dataflow::operators::CapabilityTrait;
9use crate::communication::Push;
10use crate::Container;
11
12/// A wrapper that allows containers to be sent by validating capabilities.
13#[derive(Debug)]
14pub struct Progress<T, P> {
15    pushee: P,
16    internal: Rc<RefCell<ChangeBatch<T>>>,
17    port: usize,
18}
19
20impl<T: Timestamp, P> Progress<T, P> {
21    /// Ships a container using a provided capability.
22    ///
23    /// On return, the container may hold undefined contents and should be cleared before it is reused.
24    #[inline] pub fn give<C: Container, CT: CapabilityTrait<T>>(&mut self, capability: &CT, container: &mut C) where P: Push<Message<T, C>> {
25        debug_assert!(self.valid(capability), "Attempted to open output session with invalid capability");
26        if !container.is_empty() { Message::push_at(container, capability.time().clone(), &mut self.pushee); }
27    }
28    /// Activates a `Progress` into a `ProgressSession` which will flush when dropped.
29    pub fn activate<'a, C>(&'a mut self) -> ProgressSession<'a, T, C, P> where P: Push<Message<T, C>> {
30        ProgressSession {
31            borrow: self,
32            marker: std::marker::PhantomData,
33        }
34    }
35    /// Determines if the capability is valid for this output.
36    pub fn valid<CT: CapabilityTrait<T>>(&self, capability: &CT) -> bool {
37        capability.valid_for_output(&self.internal, self.port)
38    }
39}
40
41impl<T, P> Progress<T, P> where T : Ord+Clone+'static {
42    /// Allocates a new `Progress` from a pushee and capability validation information.
43    pub fn new(pushee: P, internal: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
44        Self { pushee, internal, port }
45    }
46}
47
48/// A session that provides access to a `Progress` but will flush when dropped.
49///
50/// The type of the container `C` must be known, as long as the flushing action requires a specific `Push` implementation.
51pub struct ProgressSession<'a, T: Timestamp, C, P: Push<Message<T, C>>> {
52    borrow: &'a mut Progress<T, P>,
53    marker: std::marker::PhantomData<C>,
54}
55
56impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::Deref for ProgressSession<'a, T, C, P> {
57    type Target = Progress<T, P>;
58    fn deref(&self) -> &Self::Target { self.borrow }
59}
60
61impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> std::ops::DerefMut for ProgressSession<'a, T, C, P> {
62    fn deref_mut(&mut self) -> &mut Self::Target { self.borrow }
63}
64
65impl<'a, T: Timestamp, C, P: Push<Message<T, C>>> Drop for ProgressSession<'a, T, C, P> {
66    fn drop(&mut self) { self.borrow.pushee.done(); }
67}