timely/dataflow/operators/generic/handles.rs
1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Timestamp;
10use crate::progress::ChangeBatch;
11use crate::progress::frontier::MutableAntichain;
12use crate::progress::operate::PortConnectivity;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::Counter as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
16use crate::dataflow::channels::Message;
17use crate::communication::{Push, Pull};
18use crate::{Container, Data};
19use crate::container::{ContainerBuilder, CapacityContainerBuilder};
20
21use crate::dataflow::operators::InputCapability;
22use crate::dataflow::operators::capability::CapabilityTrait;
23
24/// Handle to an operator's input stream.
25pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
26 pull_counter: PullCounter<T, C, P>,
27 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
28 /// Timestamp summaries from this input to each output.
29 ///
30 /// Each timestamp received through this input may only produce output timestamps
31 /// greater or equal to the input timestamp subjected to at least one of these summaries.
32 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
33}
34
35/// Handle to an operator's input stream, specialized to vectors.
36pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
37
38/// Handle to an operator's input stream and frontier.
39pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull<Message<T, C>>+'a> {
40 /// The underlying input handle.
41 pub handle: &'a mut InputHandleCore<T, C, P>,
42 /// The frontier as reported by timely progress tracking.
43 pub frontier: &'a MutableAntichain<T>,
44}
45
46/// Handle to an operator's input stream and frontier, specialized to vectors.
47pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
48
49impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
50
51 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
52 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
53 /// Returns `None` when there's no more data available.
54 #[inline]
55 pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
56 let internal = &self.internal;
57 let summaries = &self.summaries;
58 self.pull_counter.next_guarded().map(|(guard, bundle)| {
59 (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
60 })
61 }
62
63 /// Repeatedly calls `logic` till exhaustion of the available input data.
64 /// `logic` receives a capability and an input buffer.
65 ///
66 /// # Examples
67 /// ```
68 /// use timely::dataflow::operators::ToStream;
69 /// use timely::dataflow::operators::generic::Operator;
70 /// use timely::dataflow::channels::pact::Pipeline;
71 ///
72 /// timely::example(|scope| {
73 /// (0..10).to_stream(scope)
74 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
75 /// input.for_each(|cap, data| {
76 /// output.session(&cap).give_container(data);
77 /// });
78 /// });
79 /// });
80 /// ```
81 #[inline]
82 pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
83 while let Some((cap, data)) = self.next() {
84 logic(cap, data);
85 }
86 }
87
88}
89
90impl<'a, T: Timestamp, C: Container, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
91 /// Allocate a new frontiered input handle.
92 pub fn new(handle: &'a mut InputHandleCore<T, C, P>, frontier: &'a MutableAntichain<T>) -> Self {
93 FrontieredInputHandleCore {
94 handle,
95 frontier,
96 }
97 }
98
99 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
100 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
101 /// Returns `None` when there's no more data available.
102 #[inline]
103 pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
104 self.handle.next()
105 }
106
107 /// Repeatedly calls `logic` till exhaustion of the available input data.
108 /// `logic` receives a capability and an input buffer.
109 ///
110 /// # Examples
111 /// ```
112 /// use timely::dataflow::operators::ToStream;
113 /// use timely::dataflow::operators::generic::Operator;
114 /// use timely::dataflow::channels::pact::Pipeline;
115 ///
116 /// timely::example(|scope| {
117 /// (0..10).to_stream(scope)
118 /// .unary(Pipeline, "example", |_cap,_info| |input, output| {
119 /// input.for_each(|cap, data| {
120 /// output.session(&cap).give_container(data);
121 /// });
122 /// });
123 /// });
124 /// ```
125 #[inline]
126 pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, logic: F) {
127 self.handle.for_each(logic)
128 }
129
130 /// Inspect the frontier associated with this input.
131 #[inline]
132 pub fn frontier(&self) -> &'a MutableAntichain<T> {
133 self.frontier
134 }
135}
136
137pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
138 &mut input.pull_counter
139}
140
141/// Constructs an input handle.
142/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
143pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
144 pull_counter: PullCounter<T, C, P>,
145 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
146 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
147) -> InputHandleCore<T, C, P> {
148 InputHandleCore {
149 pull_counter,
150 internal,
151 summaries,
152 }
153}
154
155/// An owned instance of an output buffer which ensures certain API use.
156///
157/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
158/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
159/// pusher is flushed (via the `cease` method) once it is no longer used.
160#[derive(Debug)]
161pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
162 push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
163 internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
164 port: usize,
165}
166
167impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
168 /// Creates a new output wrapper from a push buffer.
169 pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>, port: usize) -> Self {
170 OutputWrapper {
171 push_buffer,
172 internal_buffer,
173 port,
174 }
175 }
176 /// Borrows the push buffer into a handle, which can be used to send records.
177 ///
178 /// This method ensures that the only access to the push buffer is through the `OutputHandle`
179 /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
180 pub fn activate(&mut self) -> OutputHandleCore<'_, T, CB, P> {
181 OutputHandleCore {
182 push_buffer: &mut self.push_buffer,
183 internal_buffer: &self.internal_buffer,
184 port: self.port,
185 }
186 }
187}
188
189/// Handle to an operator's output stream.
190pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
191 push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
192 internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
193 port: usize,
194}
195
196/// Handle specialized to `Vec`-based container.
197pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder<Vec<D>>, P>;
198
199impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputHandleCore<'a, T, CB, P> {
200 /// Obtains a session that can send data at the timestamp associated with capability `cap`.
201 ///
202 /// In order to send data at a future timestamp, obtain a capability for the new timestamp
203 /// first, as show in the example.
204 ///
205 /// # Examples
206 /// ```
207 /// use timely::dataflow::operators::ToStream;
208 /// use timely::dataflow::operators::generic::Operator;
209 /// use timely::dataflow::channels::pact::Pipeline;
210 /// use timely::container::CapacityContainerBuilder;
211 ///
212 /// timely::example(|scope| {
213 /// (0..10).to_stream(scope)
214 /// .unary::<CapacityContainerBuilder<_>, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| {
215 /// input.for_each(|cap, data| {
216 /// let time = cap.time().clone() + 1;
217 /// output.session_with_builder(&cap.delayed(&time))
218 /// .give_container(data);
219 /// });
220 /// });
221 /// });
222 /// ```
223 pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
224 debug_assert!(cap.valid_for_output(self.internal_buffer, self.port), "Attempted to open output session with invalid capability");
225 self.push_buffer.session_with_builder(cap.time())
226 }
227
228 /// Flushes all pending data and indicate that no more data immediately follows.
229 pub fn cease(&mut self) {
230 self.push_buffer.cease();
231 }
232}
233
234impl<'a, T: Timestamp, C: Container + Data, P: Push<Message<T, C>>> OutputHandleCore<'a, T, CapacityContainerBuilder<C>, P> {
235 /// Obtains a session that can send data at the timestamp associated with capability `cap`.
236 ///
237 /// In order to send data at a future timestamp, obtain a capability for the new timestamp
238 /// first, as show in the example.
239 ///
240 /// # Examples
241 /// ```
242 /// use timely::dataflow::operators::ToStream;
243 /// use timely::dataflow::operators::generic::Operator;
244 /// use timely::dataflow::channels::pact::Pipeline;
245 ///
246 /// timely::example(|scope| {
247 /// (0..10).to_stream(scope)
248 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
249 /// input.for_each(|cap, data| {
250 /// let time = cap.time().clone() + 1;
251 /// output.session(&cap.delayed(&time))
252 /// .give_container(data);
253 /// });
254 /// });
255 /// });
256 /// ```
257 #[inline]
258 pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
259 self.session_with_builder(cap)
260 }
261}
262
263impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Drop for OutputHandleCore<'_, T, CB, P> {
264 fn drop(&mut self) {
265 self.push_buffer.cease();
266 }
267}