timely/dataflow/operators/generic/
handles.rs

1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Timestamp;
10use crate::progress::ChangeBatch;
11use crate::progress::frontier::MutableAntichain;
12use crate::progress::operate::PortConnectivity;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::Counter as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
16use crate::dataflow::channels::Message;
17use crate::communication::{Push, Pull};
18use crate::{Container, Data};
19use crate::container::{ContainerBuilder, CapacityContainerBuilder};
20use crate::logging::TimelyLogger as Logger;
21
22use crate::dataflow::operators::InputCapability;
23use crate::dataflow::operators::capability::CapabilityTrait;
24
/// Handle to an operator's input stream.
///
/// Input is consumed through `next` or `for_each`, each of which pairs a received container
/// with an `InputCapability` for the timestamp of that container.
pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
    /// Puller (wrapped in a `Counter`) from which `next` draws input messages.
    pull_counter: PullCounter<T, C, P>,
    /// Shared list of change batches; a handle to it is given to every `InputCapability`
    /// created by `next`.
    // NOTE(review): presumably one change batch per operator output — confirm against the builder.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// Timestamp summaries from this input to each output.
    ///
    /// Each timestamp received through this input may only produce output timestamps
    /// greater or equal to the input timestamp subjected to at least one of these summaries.
    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
    /// Optional logger; when present, `for_each` logs a `GuardedMessageEvent` immediately
    /// before and after each invocation of the user logic.
    logging: Option<Logger>,
}
36
/// Handle to an operator's input stream, specialized to vectors.
///
/// This is `InputHandleCore` with the container type fixed to `Vec<D>`.
pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
39
/// Handle to an operator's input stream and frontier.
///
/// Bundles a mutable borrow of an `InputHandleCore` with a shared borrow of a frontier, both
/// valid for the lifetime `'a`.
pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull<Message<T, C>>+'a> {
    /// The underlying input handle.
    pub handle: &'a mut InputHandleCore<T, C, P>,
    /// The frontier as reported by timely progress tracking.
    pub frontier: &'a MutableAntichain<T>,
}
47
/// Handle to an operator's input stream and frontier, specialized to vectors.
///
/// This is `FrontieredInputHandleCore` with the container type fixed to `Vec<D>`.
pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
50
51impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
52
53    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
54    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
55    /// Returns `None` when there's no more data available.
56    #[inline]
57    pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
58        let internal = &self.internal;
59        let summaries = &self.summaries;
60        self.pull_counter.next_guarded().map(|(guard, bundle)| {
61            (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
62        })
63    }
64
65    /// Repeatedly calls `logic` till exhaustion of the available input data.
66    /// `logic` receives a capability and an input buffer.
67    ///
68    /// # Examples
69    /// ```
70    /// use timely::dataflow::operators::ToStream;
71    /// use timely::dataflow::operators::generic::Operator;
72    /// use timely::dataflow::channels::pact::Pipeline;
73    ///
74    /// timely::example(|scope| {
75    ///     (0..10).to_stream(scope)
76    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
77    ///                input.for_each(|cap, data| {
78    ///                    output.session(&cap).give_container(data);
79    ///                });
80    ///            });
81    /// });
82    /// ```
83    #[inline]
84    pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
85        let mut logging = self.logging.take();
86        while let Some((cap, data)) = self.next() {
87            logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
88            logic(cap, data);
89            logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
90        }
91        self.logging = logging;
92    }
93
94}
95
96impl<'a, T: Timestamp, C: Container, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
97    /// Allocate a new frontiered input handle.
98    pub fn new(handle: &'a mut InputHandleCore<T, C, P>, frontier: &'a MutableAntichain<T>) -> Self {
99        FrontieredInputHandleCore {
100            handle,
101            frontier,
102        }
103    }
104
105    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
106    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
107    /// Returns `None` when there's no more data available.
108    #[inline]
109    pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
110        self.handle.next()
111    }
112
113    /// Repeatedly calls `logic` till exhaustion of the available input data.
114    /// `logic` receives a capability and an input buffer.
115    ///
116    /// # Examples
117    /// ```
118    /// use timely::dataflow::operators::ToStream;
119    /// use timely::dataflow::operators::generic::Operator;
120    /// use timely::dataflow::channels::pact::Pipeline;
121    ///
122    /// timely::example(|scope| {
123    ///     (0..10).to_stream(scope)
124    ///            .unary(Pipeline, "example", |_cap,_info| |input, output| {
125    ///                input.for_each(|cap, data| {
126    ///                    output.session(&cap).give_container(data);
127    ///                });
128    ///            });
129    /// });
130    /// ```
131    #[inline]
132    pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, logic: F) {
133        self.handle.for_each(logic)
134    }
135
136    /// Inspect the frontier associated with this input.
137    #[inline]
138    pub fn frontier(&self) -> &'a MutableAntichain<T> {
139        self.frontier
140    }
141}
142
/// Provides mutable access to the pull counter underlying `input`.
///
/// NOTE(review): the leading underscore suggests this is intended for internal plumbing
/// rather than for end users — confirm before relying on it.
pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
    &mut input.pull_counter
}
146
147/// Constructs an input handle.
148/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
149pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150    pull_counter: PullCounter<T, C, P>, 
151    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>, 
152    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, 
153    logging: Option<Logger>
154) -> InputHandleCore<T, C, P> {
155    InputHandleCore {
156        pull_counter,
157        internal,
158        summaries,
159        logging,
160    }
161}
162
/// An owned instance of an output buffer which ensures certain API use.
///
/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
/// pusher is flushed (via the `cease` method) once it is no longer used.
#[derive(Debug)]
pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
    /// Buffer through which records are sent; only reachable via `activate`.
    push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
    /// Shared change batch lent to each `OutputHandleCore`, which passes it to
    /// `valid_for_output` when checking a capability before opening a session.
    internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
}
173
174impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
175    /// Creates a new output wrapper from a push buffer.
176    pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
177        OutputWrapper {
178            push_buffer,
179            internal_buffer,
180        }
181    }
182    /// Borrows the push buffer into a handle, which can be used to send records.
183    ///
184    /// This method ensures that the only access to the push buffer is through the `OutputHandle`
185    /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
186    pub fn activate(&mut self) -> OutputHandleCore<T, CB, P> {
187        OutputHandleCore {
188            push_buffer: &mut self.push_buffer,
189            internal_buffer: &self.internal_buffer,
190        }
191    }
192}
193
/// Handle to an operator's output stream.
///
/// Obtained from `OutputWrapper::activate`. Dropping the handle calls `cease` on the
/// underlying push buffer.
pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
    /// Borrowed push buffer through which sessions send data.
    push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
    /// Borrowed change batch passed to `valid_for_output` when a session is opened.
    internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
}
199
/// Handle specialized to `Vec`-based container.
///
/// This is `OutputHandleCore` with the builder fixed to `CapacityContainerBuilder<Vec<D>>`.
pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder<Vec<D>>, P>;
202
203impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputHandleCore<'a, T, CB, P> {
204    /// Obtains a session that can send data at the timestamp associated with capability `cap`.
205    ///
206    /// In order to send data at a future timestamp, obtain a capability for the new timestamp
207    /// first, as show in the example.
208    ///
209    /// # Examples
210    /// ```
211    /// use timely::dataflow::operators::ToStream;
212    /// use timely::dataflow::operators::generic::Operator;
213    /// use timely::dataflow::channels::pact::Pipeline;
214    /// use timely::container::CapacityContainerBuilder;
215    ///
216    /// timely::example(|scope| {
217    ///     (0..10).to_stream(scope)
218    ///            .unary::<CapacityContainerBuilder<_>, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| {
219    ///                input.for_each(|cap, data| {
220    ///                    let time = cap.time().clone() + 1;
221    ///                    output.session_with_builder(&cap.delayed(&time))
222    ///                          .give_container(data);
223    ///                });
224    ///            });
225    /// });
226    /// ```
227    pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
228        assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability");
229        self.push_buffer.session_with_builder(cap.time())
230    }
231
232    /// Flushes all pending data and indicate that no more data immediately follows.
233    pub fn cease(&mut self) {
234        self.push_buffer.cease();
235    }
236}
237
238impl<'a, T: Timestamp, C: Container + Data, P: Push<Message<T, C>>> OutputHandleCore<'a, T, CapacityContainerBuilder<C>, P> {
239    /// Obtains a session that can send data at the timestamp associated with capability `cap`.
240    ///
241    /// In order to send data at a future timestamp, obtain a capability for the new timestamp
242    /// first, as show in the example.
243    ///
244    /// # Examples
245    /// ```
246    /// use timely::dataflow::operators::ToStream;
247    /// use timely::dataflow::operators::generic::Operator;
248    /// use timely::dataflow::channels::pact::Pipeline;
249    ///
250    /// timely::example(|scope| {
251    ///     (0..10).to_stream(scope)
252    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
253    ///                input.for_each(|cap, data| {
254    ///                    let time = cap.time().clone() + 1;
255    ///                    output.session(&cap.delayed(&time))
256    ///                          .give_container(data);
257    ///                });
258    ///            });
259    /// });
260    /// ```
261    #[inline]
262    pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
263        self.session_with_builder(cap)
264    }
265}
266
267impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Drop for OutputHandleCore<'_, T, CB, P> {
268    fn drop(&mut self) {
269        self.push_buffer.cease();
270    }
271}