timely/dataflow/operators/generic/
handles.rs

1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Timestamp;
10use crate::progress::ChangeBatch;
11use crate::progress::frontier::MutableAntichain;
12use crate::progress::operate::PortConnectivity;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::Counter as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
16use crate::dataflow::channels::Message;
17use crate::communication::{Push, Pull};
18use crate::{Container, Data};
19use crate::container::{ContainerBuilder, CapacityContainerBuilder};
20
21use crate::dataflow::operators::InputCapability;
22use crate::dataflow::operators::capability::CapabilityTrait;
23
24/// Handle to an operator's input stream.
25pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
26    pull_counter: PullCounter<T, C, P>,
27    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
28    /// Timestamp summaries from this input to each output.
29    ///
30    /// Each timestamp received through this input may only produce output timestamps
31    /// greater or equal to the input timestamp subjected to at least one of these summaries.
32    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
33}
34
35/// Handle to an operator's input stream, specialized to vectors.
36pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
37
38/// Handle to an operator's input stream and frontier.
39pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull<Message<T, C>>+'a> {
40    /// The underlying input handle.
41    pub handle: &'a mut InputHandleCore<T, C, P>,
42    /// The frontier as reported by timely progress tracking.
43    pub frontier: &'a MutableAntichain<T>,
44}
45
46/// Handle to an operator's input stream and frontier, specialized to vectors.
47pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
48
49impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
50
51    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
52    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
53    /// Returns `None` when there's no more data available.
54    #[inline]
55    pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
56        let internal = &self.internal;
57        let summaries = &self.summaries;
58        self.pull_counter.next_guarded().map(|(guard, bundle)| {
59            (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
60        })
61    }
62
63    /// Repeatedly calls `logic` till exhaustion of the available input data.
64    /// `logic` receives a capability and an input buffer.
65    ///
66    /// # Examples
67    /// ```
68    /// use timely::dataflow::operators::ToStream;
69    /// use timely::dataflow::operators::generic::Operator;
70    /// use timely::dataflow::channels::pact::Pipeline;
71    ///
72    /// timely::example(|scope| {
73    ///     (0..10).to_stream(scope)
74    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
75    ///                input.for_each(|cap, data| {
76    ///                    output.session(&cap).give_container(data);
77    ///                });
78    ///            });
79    /// });
80    /// ```
81    #[inline]
82    pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
83        while let Some((cap, data)) = self.next() {
84            logic(cap, data);
85        }
86    }
87
88}
89
90impl<'a, T: Timestamp, C: Container, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
91    /// Allocate a new frontiered input handle.
92    pub fn new(handle: &'a mut InputHandleCore<T, C, P>, frontier: &'a MutableAntichain<T>) -> Self {
93        FrontieredInputHandleCore {
94            handle,
95            frontier,
96        }
97    }
98
99    /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
100    /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
101    /// Returns `None` when there's no more data available.
102    #[inline]
103    pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
104        self.handle.next()
105    }
106
107    /// Repeatedly calls `logic` till exhaustion of the available input data.
108    /// `logic` receives a capability and an input buffer.
109    ///
110    /// # Examples
111    /// ```
112    /// use timely::dataflow::operators::ToStream;
113    /// use timely::dataflow::operators::generic::Operator;
114    /// use timely::dataflow::channels::pact::Pipeline;
115    ///
116    /// timely::example(|scope| {
117    ///     (0..10).to_stream(scope)
118    ///            .unary(Pipeline, "example", |_cap,_info| |input, output| {
119    ///                input.for_each(|cap, data| {
120    ///                    output.session(&cap).give_container(data);
121    ///                });
122    ///            });
123    /// });
124    /// ```
125    #[inline]
126    pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, logic: F) {
127        self.handle.for_each(logic)
128    }
129
130    /// Inspect the frontier associated with this input.
131    #[inline]
132    pub fn frontier(&self) -> &'a MutableAntichain<T> {
133        self.frontier
134    }
135}
136
137pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
138    &mut input.pull_counter
139}
140
141/// Constructs an input handle.
142/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
143pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
144    pull_counter: PullCounter<T, C, P>,
145    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
146    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
147) -> InputHandleCore<T, C, P> {
148    InputHandleCore {
149        pull_counter,
150        internal,
151        summaries,
152    }
153}
154
155/// An owned instance of an output buffer which ensures certain API use.
156///
157/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
158/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
159/// pusher is flushed (via the `cease` method) once it is no longer used.
160#[derive(Debug)]
161pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
162    push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
163    internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
164    port: usize,
165}
166
167impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
168    /// Creates a new output wrapper from a push buffer.
169    pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
170        OutputWrapper {
171            push_buffer,
172            internal_buffer,
173            port,
174        }
175    }
176    /// Borrows the push buffer into a handle, which can be used to send records.
177    ///
178    /// This method ensures that the only access to the push buffer is through the `OutputHandle`
179    /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
180    pub fn activate(&mut self) -> OutputHandleCore<'_, T, CB, P> {
181        OutputHandleCore {
182            push_buffer: &mut self.push_buffer,
183            internal_buffer: &self.internal_buffer,
184            port: self.port,
185        }
186    }
187}
188
189/// Handle to an operator's output stream.
190pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
191    push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
192    internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
193    port: usize,
194}
195
196/// Handle specialized to `Vec`-based container.
197pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder<Vec<D>>, P>;
198
199impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputHandleCore<'a, T, CB, P> {
200    /// Obtains a session that can send data at the timestamp associated with capability `cap`.
201    ///
202    /// In order to send data at a future timestamp, obtain a capability for the new timestamp
203    /// first, as show in the example.
204    ///
205    /// # Examples
206    /// ```
207    /// use timely::dataflow::operators::ToStream;
208    /// use timely::dataflow::operators::generic::Operator;
209    /// use timely::dataflow::channels::pact::Pipeline;
210    /// use timely::container::CapacityContainerBuilder;
211    ///
212    /// timely::example(|scope| {
213    ///     (0..10).to_stream(scope)
214    ///            .unary::<CapacityContainerBuilder<_>, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| {
215    ///                input.for_each(|cap, data| {
216    ///                    let time = cap.time().clone() + 1;
217    ///                    output.session_with_builder(&cap.delayed(&time))
218    ///                          .give_container(data);
219    ///                });
220    ///            });
221    /// });
222    /// ```
223    pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
224        debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
225        self.push_buffer.session_with_builder(cap.time())
226    }
227
228    /// Flushes all pending data and indicate that no more data immediately follows.
229    pub fn cease(&mut self) {
230        self.push_buffer.cease();
231    }
232}
233
234impl<'a, T: Timestamp, C: Container + Data, P: Push<Message<T, C>>> OutputHandleCore<'a, T, CapacityContainerBuilder<C>, P> {
235    /// Obtains a session that can send data at the timestamp associated with capability `cap`.
236    ///
237    /// In order to send data at a future timestamp, obtain a capability for the new timestamp
238    /// first, as show in the example.
239    ///
240    /// # Examples
241    /// ```
242    /// use timely::dataflow::operators::ToStream;
243    /// use timely::dataflow::operators::generic::Operator;
244    /// use timely::dataflow::channels::pact::Pipeline;
245    ///
246    /// timely::example(|scope| {
247    ///     (0..10).to_stream(scope)
248    ///            .unary(Pipeline, "example", |_cap, _info| |input, output| {
249    ///                input.for_each(|cap, data| {
250    ///                    let time = cap.time().clone() + 1;
251    ///                    output.session(&cap.delayed(&time))
252    ///                          .give_container(data);
253    ///                });
254    ///            });
255    /// });
256    /// ```
257    #[inline]
258    pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
259        self.session_with_builder(cap)
260    }
261}
262
263impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Drop for OutputHandleCore<'_, T, CB, P> {
264    fn drop(&mut self) {
265        self.push_buffer.cease();
266    }
267}