timely/dataflow/operators/generic/handles.rs
1//! Handles to an operator's input and output streams.
2//!
3//! These handles are used by the generic operator interfaces to allow user closures to interact as
4//! the operator would with its input and output streams.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::progress::Timestamp;
10use crate::progress::ChangeBatch;
11use crate::progress::frontier::MutableAntichain;
12use crate::progress::operate::PortConnectivity;
13use crate::dataflow::channels::pullers::Counter as PullCounter;
14use crate::dataflow::channels::pushers::Counter as PushCounter;
15use crate::dataflow::channels::pushers::buffer::{Buffer, Session};
16use crate::dataflow::channels::Message;
17use crate::communication::{Push, Pull};
18use crate::{Container, Data};
19use crate::container::{ContainerBuilder, CapacityContainerBuilder};
20use crate::logging::TimelyLogger as Logger;
21
22use crate::dataflow::operators::InputCapability;
23use crate::dataflow::operators::capability::CapabilityTrait;
24
25/// Handle to an operator's input stream.
26pub struct InputHandleCore<T: Timestamp, C: Container, P: Pull<Message<T, C>>> {
27 pull_counter: PullCounter<T, C, P>,
28 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
29 /// Timestamp summaries from this input to each output.
30 ///
31 /// Each timestamp received through this input may only produce output timestamps
32 /// greater or equal to the input timestamp subjected to at least one of these summaries.
33 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
34 logging: Option<Logger>,
35}
36
37/// Handle to an operator's input stream, specialized to vectors.
38pub type InputHandle<T, D, P> = InputHandleCore<T, Vec<D>, P>;
39
40/// Handle to an operator's input stream and frontier.
41pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull<Message<T, C>>+'a> {
42 /// The underlying input handle.
43 pub handle: &'a mut InputHandleCore<T, C, P>,
44 /// The frontier as reported by timely progress tracking.
45 pub frontier: &'a MutableAntichain<T>,
46}
47
48/// Handle to an operator's input stream and frontier, specialized to vectors.
49pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec<D>, P>;
50
51impl<T: Timestamp, C: Container, P: Pull<Message<T, C>>> InputHandleCore<T, C, P> {
52
53 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
54 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
55 /// Returns `None` when there's no more data available.
56 #[inline]
57 pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
58 let internal = &self.internal;
59 let summaries = &self.summaries;
60 self.pull_counter.next_guarded().map(|(guard, bundle)| {
61 (InputCapability::new(Rc::clone(internal), Rc::clone(summaries), guard), &mut bundle.data)
62 })
63 }
64
65 /// Repeatedly calls `logic` till exhaustion of the available input data.
66 /// `logic` receives a capability and an input buffer.
67 ///
68 /// # Examples
69 /// ```
70 /// use timely::dataflow::operators::ToStream;
71 /// use timely::dataflow::operators::generic::Operator;
72 /// use timely::dataflow::channels::pact::Pipeline;
73 ///
74 /// timely::example(|scope| {
75 /// (0..10).to_stream(scope)
76 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
77 /// input.for_each(|cap, data| {
78 /// output.session(&cap).give_container(data);
79 /// });
80 /// });
81 /// });
82 /// ```
83 #[inline]
84 pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, mut logic: F) {
85 let mut logging = self.logging.take();
86 while let Some((cap, data)) = self.next() {
87 logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
88 logic(cap, data);
89 logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false }));
90 }
91 self.logging = logging;
92 }
93
94}
95
96impl<'a, T: Timestamp, C: Container, P: Pull<Message<T, C>>+'a> FrontieredInputHandleCore<'a, T, C, P> {
97 /// Allocate a new frontiered input handle.
98 pub fn new(handle: &'a mut InputHandleCore<T, C, P>, frontier: &'a MutableAntichain<T>) -> Self {
99 FrontieredInputHandleCore {
100 handle,
101 frontier,
102 }
103 }
104
105 /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`.
106 /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
107 /// Returns `None` when there's no more data available.
108 #[inline]
109 pub fn next(&mut self) -> Option<(InputCapability<T>, &mut C)> {
110 self.handle.next()
111 }
112
113 /// Repeatedly calls `logic` till exhaustion of the available input data.
114 /// `logic` receives a capability and an input buffer.
115 ///
116 /// # Examples
117 /// ```
118 /// use timely::dataflow::operators::ToStream;
119 /// use timely::dataflow::operators::generic::Operator;
120 /// use timely::dataflow::channels::pact::Pipeline;
121 ///
122 /// timely::example(|scope| {
123 /// (0..10).to_stream(scope)
124 /// .unary(Pipeline, "example", |_cap,_info| |input, output| {
125 /// input.for_each(|cap, data| {
126 /// output.session(&cap).give_container(data);
127 /// });
128 /// });
129 /// });
130 /// ```
131 #[inline]
132 pub fn for_each<F: FnMut(InputCapability<T>, &mut C)>(&mut self, logic: F) {
133 self.handle.for_each(logic)
134 }
135
136 /// Inspect the frontier associated with this input.
137 #[inline]
138 pub fn frontier(&self) -> &'a MutableAntichain<T> {
139 self.frontier
140 }
141}
142
143pub fn _access_pull_counter<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(input: &mut InputHandleCore<T, C, P>) -> &mut PullCounter<T, C, P> {
144 &mut input.pull_counter
145}
146
147/// Constructs an input handle.
148/// Declared separately so that it can be kept private when `InputHandle` is re-exported.
149pub fn new_input_handle<T: Timestamp, C: Container, P: Pull<Message<T, C>>>(
150 pull_counter: PullCounter<T, C, P>,
151 internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
152 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
153 logging: Option<Logger>
154) -> InputHandleCore<T, C, P> {
155 InputHandleCore {
156 pull_counter,
157 internal,
158 summaries,
159 logging,
160 }
161}
162
163/// An owned instance of an output buffer which ensures certain API use.
164///
165/// An `OutputWrapper` exists to prevent anyone from using the wrapped buffer in any way other
166/// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the
167/// pusher is flushed (via the `cease` method) once it is no longer used.
168#[derive(Debug)]
169pub struct OutputWrapper<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> {
170 push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>,
171 internal_buffer: Rc<RefCell<ChangeBatch<T>>>,
172}
173
174impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputWrapper<T, CB, P> {
175 /// Creates a new output wrapper from a push buffer.
176 pub fn new(push_buffer: Buffer<T, CB, PushCounter<T, CB::Container, P>>, internal_buffer: Rc<RefCell<ChangeBatch<T>>>) -> Self {
177 OutputWrapper {
178 push_buffer,
179 internal_buffer,
180 }
181 }
182 /// Borrows the push buffer into a handle, which can be used to send records.
183 ///
184 /// This method ensures that the only access to the push buffer is through the `OutputHandle`
185 /// type which ensures the use of capabilities, and which calls `cease` when it is dropped.
186 pub fn activate(&mut self) -> OutputHandleCore<T, CB, P> {
187 OutputHandleCore {
188 push_buffer: &mut self.push_buffer,
189 internal_buffer: &self.internal_buffer,
190 }
191 }
192}
193
194/// Handle to an operator's output stream.
195pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push<Message<T, CB::Container>>+'a> {
196 push_buffer: &'a mut Buffer<T, CB, PushCounter<T, CB::Container, P>>,
197 internal_buffer: &'a Rc<RefCell<ChangeBatch<T>>>,
198}
199
200/// Handle specialized to `Vec`-based container.
201pub type OutputHandle<'a, T, D, P> = OutputHandleCore<'a, T, CapacityContainerBuilder<Vec<D>>, P>;
202
203impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> OutputHandleCore<'a, T, CB, P> {
204 /// Obtains a session that can send data at the timestamp associated with capability `cap`.
205 ///
206 /// In order to send data at a future timestamp, obtain a capability for the new timestamp
207 /// first, as show in the example.
208 ///
209 /// # Examples
210 /// ```
211 /// use timely::dataflow::operators::ToStream;
212 /// use timely::dataflow::operators::generic::Operator;
213 /// use timely::dataflow::channels::pact::Pipeline;
214 /// use timely::container::CapacityContainerBuilder;
215 ///
216 /// timely::example(|scope| {
217 /// (0..10).to_stream(scope)
218 /// .unary::<CapacityContainerBuilder<_>, _, _, _>(Pipeline, "example", |_cap, _info| |input, output| {
219 /// input.for_each(|cap, data| {
220 /// let time = cap.time().clone() + 1;
221 /// output.session_with_builder(&cap.delayed(&time))
222 /// .give_container(data);
223 /// });
224 /// });
225 /// });
226 /// ```
227 pub fn session_with_builder<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CB, PushCounter<T, CB::Container, P>> where 'a: 'b {
228 assert!(cap.valid_for_output(self.internal_buffer), "Attempted to open output session with invalid capability");
229 self.push_buffer.session_with_builder(cap.time())
230 }
231
232 /// Flushes all pending data and indicate that no more data immediately follows.
233 pub fn cease(&mut self) {
234 self.push_buffer.cease();
235 }
236}
237
238impl<'a, T: Timestamp, C: Container + Data, P: Push<Message<T, C>>> OutputHandleCore<'a, T, CapacityContainerBuilder<C>, P> {
239 /// Obtains a session that can send data at the timestamp associated with capability `cap`.
240 ///
241 /// In order to send data at a future timestamp, obtain a capability for the new timestamp
242 /// first, as show in the example.
243 ///
244 /// # Examples
245 /// ```
246 /// use timely::dataflow::operators::ToStream;
247 /// use timely::dataflow::operators::generic::Operator;
248 /// use timely::dataflow::channels::pact::Pipeline;
249 ///
250 /// timely::example(|scope| {
251 /// (0..10).to_stream(scope)
252 /// .unary(Pipeline, "example", |_cap, _info| |input, output| {
253 /// input.for_each(|cap, data| {
254 /// let time = cap.time().clone() + 1;
255 /// output.session(&cap.delayed(&time))
256 /// .give_container(data);
257 /// });
258 /// });
259 /// });
260 /// ```
261 #[inline]
262 pub fn session<'b, CT: CapabilityTrait<T>>(&'b mut self, cap: &'b CT) -> Session<'b, T, CapacityContainerBuilder<C>, PushCounter<T, C, P>> where 'a: 'b {
263 self.session_with_builder(cap)
264 }
265}
266
267impl<T: Timestamp, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Drop for OutputHandleCore<'_, T, CB, P> {
268 fn drop(&mut self) {
269 self.push_buffer.cease();
270 }
271}