timely/dataflow/operators/core/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
7
8use crate::scheduling::{Schedule, Activator};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
11use crate::progress::Source;
12use crate::progress::operate::Connectivity;
13use crate::{Container, Data};
14use crate::communication::Push;
15use crate::dataflow::{Scope, ScopeParent, StreamCore};
16use crate::dataflow::channels::pushers::{Tee, Counter};
17use crate::dataflow::channels::Message;
18
19
20// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
21// TODO : more like a harness, with direct access to its inputs.
22
23// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
24// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
25// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
26
27/// Create a new `Stream` and `Handle` through which to supply input.
28pub trait Input : Scope {
29 /// Create a new [StreamCore] and [Handle] through which to supply input.
30 ///
31 /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
32 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
33 /// data into the timely dataflow computation.
34 ///
35 /// The `Handle` also provides a means to indicate
36 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
37 /// to issue progress notifications.
38 ///
39 /// # Examples
40 /// ```
41 /// use timely::*;
42 /// use timely::dataflow::operators::core::{Input, Inspect};
43 ///
44 /// // construct and execute a timely dataflow
45 /// timely::execute(Config::thread(), |worker| {
46 ///
47 /// // add an input and base computation off of it
48 /// let mut input = worker.dataflow(|scope| {
49 /// let (input, stream) = scope.new_input::<Vec<_>>();
50 /// stream.inspect(|x| println!("hello {:?}", x));
51 /// input
52 /// });
53 ///
54 /// // introduce input, advance computation
55 /// for round in 0..10 {
56 /// input.send(round);
57 /// input.advance_to(round + 1);
58 /// worker.step();
59 /// }
60 /// });
61 /// ```
62 fn new_input<C: Container + Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<Self, C>);
63
64 /// Create a new [StreamCore] and [Handle] through which to supply input.
65 ///
66 /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
67 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
68 /// data into the timely dataflow computation.
69 ///
70 /// The `Handle` also provides a means to indicate
71 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
72 /// to issue progress notifications.
73 ///
74 /// # Examples
75 /// ```
76 /// use std::rc::Rc;
77 /// use timely::*;
78 /// use timely::dataflow::operators::core::{Input, Inspect};
79 /// use timely::container::CapacityContainerBuilder;
80 ///
81 /// // construct and execute a timely dataflow
82 /// timely::execute(Config::thread(), |worker| {
83 ///
84 /// // add an input and base computation off of it
85 /// let mut input = worker.dataflow(|scope| {
86 /// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
87 /// stream.inspect(|x| println!("hello {:?}", x));
88 /// input
89 /// });
90 ///
91 /// // introduce input, advance computation
92 /// for round in 0..10 {
93 /// input.send_batch(&mut Rc::new(vec![round]));
94 /// input.advance_to(round + 1);
95 /// worker.step();
96 /// }
97 /// });
98 /// ```
99 fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, StreamCore<Self, CB::Container>);
100
101 /// Create a new stream from a supplied interactive handle.
102 ///
103 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
104 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
105 /// if it as attached to more than one stream.
106 ///
107 /// # Examples
108 /// ```
109 /// use timely::*;
110 /// use timely::dataflow::operators::core::{Input, Inspect};
111 /// use timely::dataflow::operators::core::input::Handle;
112 ///
113 /// // construct and execute a timely dataflow
114 /// timely::execute(Config::thread(), |worker| {
115 ///
116 /// // add an input and base computation off of it
117 /// let mut input = Handle::new();
118 /// worker.dataflow(|scope| {
119 /// scope.input_from(&mut input)
120 /// .container::<Vec<_>>()
121 /// .inspect(|x| println!("hello {:?}", x));
122 /// });
123 ///
124 /// // introduce input, advance computation
125 /// for round in 0..10 {
126 /// input.send(round);
127 /// input.advance_to(round + 1);
128 /// worker.step();
129 /// }
130 /// });
131 /// ```
132 fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> StreamCore<Self, CB::Container>;
133}
134
135use crate::order::TotalOrder;
136impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
137 fn new_input<C: Container + Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<G, C>) {
138 let mut handle = Handle::new();
139 let stream = self.input_from(&mut handle);
140 (handle, stream)
141 }
142
143 fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, StreamCore<G, CB::Container>) {
144 let mut handle = Handle::new_with_builder();
145 let stream = self.input_from(&mut handle);
146 (handle, stream)
147 }
148
149 fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> StreamCore<G, CB::Container> {
150 let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, CB::Container>::new();
151 let counter = Counter::new(output);
152 let produced = Rc::clone(counter.produced());
153
154 let index = self.allocate_operator_index();
155 let address = self.addr_for_child(index);
156
157 handle.activate.push(self.activator_for(Rc::clone(&address)));
158
159 let progress = Rc::new(RefCell::new(ChangeBatch::new()));
160
161 handle.register(counter, Rc::clone(&progress));
162
163 let copies = self.peers();
164
165 self.add_operator_with_index(Box::new(Operator {
166 name: "Input".to_owned(),
167 address,
168 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
169 progress,
170 messages: produced,
171 copies,
172 }), index);
173
174 StreamCore::new(Source::new(index, 0), registrar, self.clone())
175 }
176}
177
/// The dataflow operator created for each stream attached to an input [`Handle`].
///
/// It carries no data itself. The handle writes capability changes into `progress`, and the
/// output `Counter` records produced messages into `messages`. When scheduled, the operator moves
/// both into `shared_progress`, which is handed to the enclosing scope by `get_internal_summary`.
#[derive(Debug)]
struct Operator<T:Timestamp> {
    /// Name reported through `Schedule::name` (always "Input" when built by `input_from`).
    name: String,
    /// Path of this operator within its scope, reported through `Schedule::path`.
    address: Rc<[usize]>,
    /// Progress state (one output port, no inputs) shared with the enclosing scope.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    /// Capability changes written by the handle; drained into `shared_progress.internals[0]`.
    progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
    /// Counts from the output `Counter`; drained into `shared_progress.produceds[0]`.
    messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
    /// Peer count captured at construction; used as the initial capability count at `T::minimum()`.
    copies: usize,
}
187
188impl<T:Timestamp> Schedule for Operator<T> {
189
190 fn name(&self) -> &str { &self.name }
191
192 fn path(&self) -> &[usize] { &self.address[..] }
193
194 fn schedule(&mut self) -> bool {
195 let shared_progress = &mut *self.shared_progress.borrow_mut();
196 self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
197 self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
198 false
199 }
200}
201
202impl<T:Timestamp> Operate<T> for Operator<T> {
203
204 fn inputs(&self) -> usize { 0 }
205 fn outputs(&self) -> usize { 1 }
206
207 fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
208 self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
209 (Vec::new(), Rc::clone(&self.shared_progress))
210 }
211
212 fn notify_me(&self) -> bool { false }
213}
214
215
/// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation.
///
/// A handle may be attached to any number of streams (including none). Each attachment adds
/// one entry to each of `activate`, `progress`, and `pushers`.
#[derive(Debug)]
pub struct Handle<T: Timestamp, CB: ContainerBuilder> {
    /// Activators for the attached input operators; triggered whenever an epoch is closed.
    activate: Vec<Activator>,
    /// Per-attachment capability change batches, read by the corresponding input operator.
    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// Per-attachment output pushers; data is cloned for all but the last of them.
    pushers: Vec<Counter<T, CB::Container, Tee<T, CB::Container>>>,
    /// Builder that assembles records passed to `send`/`push_into` into containers.
    builder: CB,
    /// Scratch container reused when cloning data for every pusher but the last.
    buffer: CB::Container,
    /// The current epoch; all data sent through this handle is stamped with this time.
    now_at: T,
}
226
227impl<T: Timestamp, C: Container + Data> Handle<T, CapacityContainerBuilder<C>> {
228 /// Allocates a new input handle, from which one can create timely streams.
229 ///
230 /// # Examples
231 /// ```
232 /// use timely::*;
233 /// use timely::dataflow::operators::core::{Input, Inspect};
234 /// use timely::dataflow::operators::core::input::Handle;
235 ///
236 /// // construct and execute a timely dataflow
237 /// timely::execute(Config::thread(), |worker| {
238 ///
239 /// // add an input and base computation off of it
240 /// let mut input = Handle::new();
241 /// worker.dataflow(|scope| {
242 /// scope.input_from(&mut input)
243 /// .container::<Vec<_>>()
244 /// .inspect(|x| println!("hello {:?}", x));
245 /// });
246 ///
247 /// // introduce input, advance computation
248 /// for round in 0..10 {
249 /// input.send(round);
250 /// input.advance_to(round + 1);
251 /// worker.step();
252 /// }
253 /// });
254 /// ```
255 pub fn new() -> Self {
256 Self {
257 activate: Vec::new(),
258 progress: Vec::new(),
259 pushers: Vec::new(),
260 builder: CapacityContainerBuilder::default(),
261 buffer: Default::default(),
262 now_at: T::minimum(),
263 }
264 }
265}
266
267impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
268 /// Allocates a new input handle, from which one can create timely streams.
269 ///
270 /// # Examples
271 /// ```
272 /// use timely::*;
273 /// use timely::dataflow::operators::core::{Input, Inspect};
274 /// use timely::dataflow::operators::core::input::Handle;
275 /// use timely_container::CapacityContainerBuilder;
276 ///
277 /// // construct and execute a timely dataflow
278 /// timely::execute(Config::thread(), |worker| {
279 ///
280 /// // add an input and base computation off of it
281 /// let mut input = Handle::<_, CapacityContainerBuilder<_>>::new_with_builder();
282 /// worker.dataflow(|scope| {
283 /// scope.input_from(&mut input)
284 /// .container::<Vec<_>>()
285 /// .inspect(|x| println!("hello {:?}", x));
286 /// });
287 ///
288 /// // introduce input, advance computation
289 /// for round in 0..10 {
290 /// input.send(round);
291 /// input.advance_to(round + 1);
292 /// worker.step();
293 /// }
294 /// });
295 /// ```
296 pub fn new_with_builder() -> Self {
297 Self {
298 activate: Vec::new(),
299 progress: Vec::new(),
300 pushers: Vec::new(),
301 builder: CB::default(),
302 buffer: Default::default(),
303 now_at: T::minimum(),
304 }
305 }
306
307 /// Creates an input stream from the handle in the supplied scope.
308 ///
309 /// # Examples
310 /// ```
311 /// use timely::*;
312 /// use timely::dataflow::operators::core::{Input, Inspect};
313 /// use timely::dataflow::operators::core::input::Handle;
314 ///
315 /// // construct and execute a timely dataflow
316 /// timely::execute(Config::thread(), |worker| {
317 ///
318 /// // add an input and base computation off of it
319 /// let mut input = Handle::new();
320 /// worker.dataflow(|scope| {
321 /// input.to_stream(scope)
322 /// .container::<Vec<_>>()
323 /// .inspect(|x| println!("hello {:?}", x));
324 /// });
325 ///
326 /// // introduce input, advance computation
327 /// for round in 0..10 {
328 /// input.send(round);
329 /// input.advance_to(round + 1);
330 /// worker.step();
331 /// }
332 /// });
333 /// ```
334 pub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, CB::Container>
335 where
336 T: TotalOrder,
337 G: Scope<Timestamp=T>,
338 {
339 scope.input_from(self)
340 }
341
342 fn register(
343 &mut self,
344 pusher: Counter<T, CB::Container, Tee<T, CB::Container>>,
345 progress: Rc<RefCell<ChangeBatch<T>>>,
346 ) {
347 // flush current contents, so new registrant does not see existing data.
348 self.flush();
349
350 // we need to produce an appropriate update to the capabilities for `progress`, in case a
351 // user has decided to drive the handle around a bit before registering it.
352 progress.borrow_mut().update(T::minimum(), -1);
353 progress.borrow_mut().update(self.now_at.clone(), 1);
354
355 self.progress.push(progress);
356 self.pushers.push(pusher);
357 }
358
359 /// Extract all ready contents from the builder and distribute to downstream operators.
360 #[inline]
361 fn extract_and_send(&mut self) {
362 while let Some(container) = self.builder.extract() {
363 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
364 }
365 }
366
367 /// Flush all contents and distribute to downstream operators.
368 #[inline]
369 fn flush(&mut self) {
370 while let Some(container) = self.builder.finish() {
371 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
372 }
373 }
374
375 /// Sends a container at each of the destinations. There can be more than one; clone if needed.
376 /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
377 /// Clears the container.
378 // TODO: Find a better name for this function.
379 #[inline]
380 fn send_container(
381 container: &mut CB::Container,
382 buffer: &mut CB::Container,
383 pushers: &mut [Counter<T, CB::Container, Tee<T, CB::Container>>],
384 now_at: &T
385 ) {
386 for index in 0 .. pushers.len() {
387 if index < pushers.len() - 1 {
388 buffer.clone_from(container);
389 Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
390 }
391 else {
392 Message::push_at(container, now_at.clone(), &mut pushers[index]);
393 }
394 }
395 container.clear();
396 }
397
398 /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
399 // TODO: Find a better name for this function.
400 fn close_epoch(&mut self) {
401 self.flush();
402 for pusher in self.pushers.iter_mut() {
403 pusher.done();
404 }
405 for progress in self.progress.iter() {
406 progress.borrow_mut().update(self.now_at.clone(), -1);
407 }
408 // Alert worker of each active input operator.
409 for activate in self.activate.iter() {
410 activate.activate();
411 }
412 }
413
414 /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch.
415 ///
416 /// This method flushes single elements previously sent with `send`, to keep the insertion order.
417 ///
418 /// # Examples
419 /// ```
420 /// use timely::*;
421 /// use timely::dataflow::operators::core::{Input, InspectCore};
422 /// use timely::dataflow::operators::core::input::Handle;
423 ///
424 /// // construct and execute a timely dataflow
425 /// timely::execute(Config::thread(), |worker| {
426 ///
427 /// // add an input and base computation off of it
428 /// let mut input = Handle::new();
429 /// worker.dataflow(|scope| {
430 /// scope.input_from(&mut input)
431 /// .inspect_container(|x| println!("hello {:?}", x));
432 /// });
433 ///
434 /// // introduce input, advance computation
435 /// for round in 0..10 {
436 /// input.send_batch(&mut vec![format!("{}", round)]);
437 /// input.advance_to(round + 1);
438 /// worker.step();
439 /// }
440 /// });
441 /// ```
442 pub fn send_batch(&mut self, buffer: &mut CB::Container) {
443 if !buffer.is_empty() {
444 // flush buffered elements to ensure local fifo.
445 self.flush();
446 Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
447 }
448 }
449
450 /// Advances the current epoch to `next`.
451 ///
452 /// This method allows timely dataflow to issue progress notifications as it can now determine
453 /// that this input can no longer produce data at earlier timestamps.
454 pub fn advance_to(&mut self, next: T) {
455 // Assert that we do not rewind time.
456 assert!(self.now_at.less_equal(&next));
457 // Flush buffers if time has actually changed.
458 if !self.now_at.eq(&next) {
459 self.close_epoch();
460 self.now_at = next;
461 for progress in self.progress.iter() {
462 progress.borrow_mut().update(self.now_at.clone(), 1);
463 }
464 }
465 }
466
467 /// Closes the input.
468 ///
469 /// This method allows timely dataflow to issue all progress notifications blocked by this input
470 /// and to begin to shut down operators, as this input can no longer produce data.
471 pub fn close(self) { }
472
473 /// Reports the current epoch.
474 pub fn epoch(&self) -> &T {
475 &self.now_at
476 }
477
478 /// Reports the current timestamp.
479 pub fn time(&self) -> &T {
480 &self.now_at
481 }
482}
483
484impl<T, CB, D> PushInto<D> for Handle<T, CB>
485where
486 T: Timestamp,
487 CB: ContainerBuilder + PushInto<D>,
488{
489 #[inline]
490 fn push_into(&mut self, item: D) {
491 self.builder.push_into(item);
492 self.extract_and_send();
493 }
494}
495
496impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
497 /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
498 ///
499 /// # Examples
500 /// ```
501 /// use timely::*;
502 /// use timely::dataflow::operators::core::{Input, Inspect};
503 /// use timely::dataflow::operators::core::input::Handle;
504 ///
505 /// // construct and execute a timely dataflow
506 /// timely::execute(Config::thread(), |worker| {
507 ///
508 /// // add an input and base computation off of it
509 /// let mut input = Handle::new();
510 /// worker.dataflow(|scope| {
511 /// scope.input_from(&mut input)
512 /// .container::<Vec<_>>()
513 /// .inspect(|x| println!("hello {:?}", x));
514 /// });
515 ///
516 /// // introduce input, advance computation
517 /// for round in 0..10 {
518 /// input.send(round);
519 /// input.advance_to(round + 1);
520 /// worker.step();
521 /// }
522 /// });
523 /// ```
524 #[inline]
525 pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
526 self.push_into(data)
527 }
528}
529
530impl<T: Timestamp, CB: ContainerBuilder> Default for Handle<T, CB> {
531 fn default() -> Self {
532 Self::new_with_builder()
533 }
534}
535
536impl<T:Timestamp, CB: ContainerBuilder> Drop for Handle<T, CB> {
537 fn drop(&mut self) {
538 self.close_epoch();
539 }
540}