timely/dataflow/operators/core/
input.rs

1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, ContainerBuilder, PushInto};
7
8use crate::scheduling::{Schedule, Activator};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
11use crate::progress::Source;
12use crate::progress::operate::Connectivity;
13use crate::{Container, Data};
14use crate::communication::Push;
15use crate::dataflow::{Scope, ScopeParent, StreamCore};
16use crate::dataflow::channels::pushers::{Tee, Counter};
17use crate::dataflow::channels::Message;
18
19
20// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
21// TODO : more like a harness, with direct access to its inputs.
22
// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
// NOTE(review): the notes above look stale — the `Input` methods below now take `&mut self`.
26
27/// Create a new `Stream` and `Handle` through which to supply input.
28pub trait Input : Scope {
29    /// Create a new [StreamCore] and [Handle] through which to supply input.
30    ///
31    /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
32    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
33    /// data into the timely dataflow computation.
34    ///
35    /// The `Handle` also provides a means to indicate
36    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
37    /// to issue progress notifications.
38    ///
39    /// # Examples
40    /// ```
41    /// use timely::*;
42    /// use timely::dataflow::operators::core::{Input, Inspect};
43    ///
44    /// // construct and execute a timely dataflow
45    /// timely::execute(Config::thread(), |worker| {
46    ///
47    ///     // add an input and base computation off of it
48    ///     let mut input = worker.dataflow(|scope| {
49    ///         let (input, stream) = scope.new_input::<Vec<_>>();
50    ///         stream.inspect(|x| println!("hello {:?}", x));
51    ///         input
52    ///     });
53    ///
54    ///     // introduce input, advance computation
55    ///     for round in 0..10 {
56    ///         input.send(round);
57    ///         input.advance_to(round + 1);
58    ///         worker.step();
59    ///     }
60    /// });
61    /// ```
62    fn new_input<C: Container + Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<Self, C>);
63
64    /// Create a new [StreamCore] and [Handle] through which to supply input.
65    ///
66    /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
67    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
68    /// data into the timely dataflow computation.
69    ///
70    /// The `Handle` also provides a means to indicate
71    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
72    /// to issue progress notifications.
73    ///
74    /// # Examples
75    /// ```
76    /// use std::rc::Rc;
77    /// use timely::*;
78    /// use timely::dataflow::operators::core::{Input, Inspect};
79    /// use timely::container::CapacityContainerBuilder;
80    ///
81    /// // construct and execute a timely dataflow
82    /// timely::execute(Config::thread(), |worker| {
83    ///
84    ///     // add an input and base computation off of it
85    ///     let mut input = worker.dataflow(|scope| {
86    ///         let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
87    ///         stream.inspect(|x| println!("hello {:?}", x));
88    ///         input
89    ///     });
90    ///
91    ///     // introduce input, advance computation
92    ///     for round in 0..10 {
93    ///         input.send_batch(&mut Rc::new(vec![round]));
94    ///         input.advance_to(round + 1);
95    ///         worker.step();
96    ///     }
97    /// });
98    /// ```
99    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, StreamCore<Self, CB::Container>);
100
101    /// Create a new stream from a supplied interactive handle.
102    ///
103    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
104    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
105    /// if it as attached to more than one stream.
106    ///
107    /// # Examples
108    /// ```
109    /// use timely::*;
110    /// use timely::dataflow::operators::core::{Input, Inspect};
111    /// use timely::dataflow::operators::core::input::Handle;
112    ///
113    /// // construct and execute a timely dataflow
114    /// timely::execute(Config::thread(), |worker| {
115    ///
116    ///     // add an input and base computation off of it
117    ///     let mut input = Handle::new();
118    ///     worker.dataflow(|scope| {
119    ///         scope.input_from(&mut input)
120    ///              .container::<Vec<_>>()
121    ///              .inspect(|x| println!("hello {:?}", x));
122    ///     });
123    ///
124    ///     // introduce input, advance computation
125    ///     for round in 0..10 {
126    ///         input.send(round);
127    ///         input.advance_to(round + 1);
128    ///         worker.step();
129    ///     }
130    /// });
131    /// ```
132    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> StreamCore<Self, CB::Container>;
133}
134
135use crate::order::TotalOrder;
136impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
137    fn new_input<C: Container + Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<G, C>) {
138        let mut handle = Handle::new();
139        let stream = self.input_from(&mut handle);
140        (handle, stream)
141    }
142
143    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, StreamCore<G, CB::Container>) {
144        let mut handle = Handle::new_with_builder();
145        let stream = self.input_from(&mut handle);
146        (handle, stream)
147    }
148
149    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> StreamCore<G, CB::Container> {
150        let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, CB::Container>::new();
151        let counter = Counter::new(output);
152        let produced = Rc::clone(counter.produced());
153
154        let index = self.allocate_operator_index();
155        let address = self.addr_for_child(index);
156
157        handle.activate.push(self.activator_for(Rc::clone(&address)));
158
159        let progress = Rc::new(RefCell::new(ChangeBatch::new()));
160
161        handle.register(counter, Rc::clone(&progress));
162
163        let copies = self.peers();
164
165        self.add_operator_with_index(Box::new(Operator {
166            name: "Input".to_owned(),
167            address,
168            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
169            progress,
170            messages: produced,
171            copies,
172        }), index);
173
174        StreamCore::new(Source::new(index, 0), registrar, self.clone())
175    }
176}
177
178#[derive(Debug)]
179struct Operator<T:Timestamp> {
180    name: String,
181    address: Rc<[usize]>,
182    shared_progress: Rc<RefCell<SharedProgress<T>>>,
183    progress:   Rc<RefCell<ChangeBatch<T>>>,           // times closed since last asked
184    messages:   Rc<RefCell<ChangeBatch<T>>>,           // messages sent since last asked
185    copies:     usize,
186}
187
188impl<T:Timestamp> Schedule for Operator<T> {
189
190    fn name(&self) -> &str { &self.name }
191
192    fn path(&self) -> &[usize] { &self.address[..] }
193
194    fn schedule(&mut self) -> bool {
195        let shared_progress = &mut *self.shared_progress.borrow_mut();
196        self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
197        self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
198        false
199    }
200}
201
202impl<T:Timestamp> Operate<T> for Operator<T> {
203
204    fn inputs(&self) -> usize { 0 }
205    fn outputs(&self) -> usize { 1 }
206
207    fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
208        self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
209        (Vec::new(), Rc::clone(&self.shared_progress))
210    }
211
212    fn notify_me(&self) -> bool { false }
213}
214
215
216/// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation.
217#[derive(Debug)]
218pub struct Handle<T: Timestamp, CB: ContainerBuilder> {
219    activate: Vec<Activator>,
220    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
221    pushers: Vec<Counter<T, CB::Container, Tee<T, CB::Container>>>,
222    builder: CB,
223    buffer: CB::Container,
224    now_at: T,
225}
226
227impl<T: Timestamp, C: Container + Data> Handle<T, CapacityContainerBuilder<C>> {
228    /// Allocates a new input handle, from which one can create timely streams.
229    ///
230    /// # Examples
231    /// ```
232    /// use timely::*;
233    /// use timely::dataflow::operators::core::{Input, Inspect};
234    /// use timely::dataflow::operators::core::input::Handle;
235    ///
236    /// // construct and execute a timely dataflow
237    /// timely::execute(Config::thread(), |worker| {
238    ///
239    ///     // add an input and base computation off of it
240    ///     let mut input = Handle::new();
241    ///     worker.dataflow(|scope| {
242    ///         scope.input_from(&mut input)
243    ///              .container::<Vec<_>>()
244    ///              .inspect(|x| println!("hello {:?}", x));
245    ///     });
246    ///
247    ///     // introduce input, advance computation
248    ///     for round in 0..10 {
249    ///         input.send(round);
250    ///         input.advance_to(round + 1);
251    ///         worker.step();
252    ///     }
253    /// });
254    /// ```
255    pub fn new() -> Self {
256        Self {
257            activate: Vec::new(),
258            progress: Vec::new(),
259            pushers: Vec::new(),
260            builder: CapacityContainerBuilder::default(),
261            buffer: Default::default(),
262            now_at: T::minimum(),
263        }
264    }
265}
266
267impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
268    /// Allocates a new input handle, from which one can create timely streams.
269    ///
270    /// # Examples
271    /// ```
272    /// use timely::*;
273    /// use timely::dataflow::operators::core::{Input, Inspect};
274    /// use timely::dataflow::operators::core::input::Handle;
275    /// use timely_container::CapacityContainerBuilder;
276    ///
277    /// // construct and execute a timely dataflow
278    /// timely::execute(Config::thread(), |worker| {
279    ///
280    ///     // add an input and base computation off of it
281    ///     let mut input = Handle::<_, CapacityContainerBuilder<_>>::new_with_builder();
282    ///     worker.dataflow(|scope| {
283    ///         scope.input_from(&mut input)
284    ///              .container::<Vec<_>>()
285    ///              .inspect(|x| println!("hello {:?}", x));
286    ///     });
287    ///
288    ///     // introduce input, advance computation
289    ///     for round in 0..10 {
290    ///         input.send(round);
291    ///         input.advance_to(round + 1);
292    ///         worker.step();
293    ///     }
294    /// });
295    /// ```
296    pub fn new_with_builder() -> Self {
297        Self {
298            activate: Vec::new(),
299            progress: Vec::new(),
300            pushers: Vec::new(),
301            builder: CB::default(),
302            buffer: Default::default(),
303            now_at: T::minimum(),
304        }
305    }
306
307    /// Creates an input stream from the handle in the supplied scope.
308    ///
309    /// # Examples
310    /// ```
311    /// use timely::*;
312    /// use timely::dataflow::operators::core::{Input, Inspect};
313    /// use timely::dataflow::operators::core::input::Handle;
314    ///
315    /// // construct and execute a timely dataflow
316    /// timely::execute(Config::thread(), |worker| {
317    ///
318    ///     // add an input and base computation off of it
319    ///     let mut input = Handle::new();
320    ///     worker.dataflow(|scope| {
321    ///         input.to_stream(scope)
322    ///              .container::<Vec<_>>()
323    ///              .inspect(|x| println!("hello {:?}", x));
324    ///     });
325    ///
326    ///     // introduce input, advance computation
327    ///     for round in 0..10 {
328    ///         input.send(round);
329    ///         input.advance_to(round + 1);
330    ///         worker.step();
331    ///     }
332    /// });
333    /// ```
334    pub fn to_stream<G>(&mut self, scope: &mut G) -> StreamCore<G, CB::Container>
335    where
336        T: TotalOrder,
337        G: Scope<Timestamp=T>,
338    {
339        scope.input_from(self)
340    }
341
342    fn register(
343        &mut self,
344        pusher: Counter<T, CB::Container, Tee<T, CB::Container>>,
345        progress: Rc<RefCell<ChangeBatch<T>>>,
346    ) {
347        // flush current contents, so new registrant does not see existing data.
348        self.flush();
349
350        // we need to produce an appropriate update to the capabilities for `progress`, in case a
351        // user has decided to drive the handle around a bit before registering it.
352        progress.borrow_mut().update(T::minimum(), -1);
353        progress.borrow_mut().update(self.now_at.clone(), 1);
354
355        self.progress.push(progress);
356        self.pushers.push(pusher);
357    }
358
359    /// Extract all ready contents from the builder and distribute to downstream operators.
360    #[inline]
361    fn extract_and_send(&mut self) {
362        while let Some(container) = self.builder.extract() {
363            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
364        }
365    }
366
367    /// Flush all contents and distribute to downstream operators.
368    #[inline]
369    fn flush(&mut self) {
370        while let Some(container) = self.builder.finish() {
371            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
372        }
373    }
374
375    /// Sends a container at each of the destinations. There can be more than one; clone if needed.
376    /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
377    /// Clears the container.
378    // TODO: Find a better name for this function.
379    #[inline]
380    fn send_container(
381        container: &mut CB::Container,
382        buffer: &mut CB::Container,
383        pushers: &mut [Counter<T, CB::Container, Tee<T, CB::Container>>],
384        now_at: &T
385    ) {
386        for index in 0 .. pushers.len() {
387            if index < pushers.len() - 1 {
388                buffer.clone_from(container);
389                Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
390            }
391            else {
392                Message::push_at(container, now_at.clone(), &mut pushers[index]);
393            }
394        }
395        container.clear();
396    }
397
398    /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
399    // TODO: Find a better name for this function.
400    fn close_epoch(&mut self) {
401        self.flush();
402        for pusher in self.pushers.iter_mut() {
403            pusher.done();
404        }
405        for progress in self.progress.iter() {
406            progress.borrow_mut().update(self.now_at.clone(), -1);
407        }
408        // Alert worker of each active input operator.
409        for activate in self.activate.iter() {
410            activate.activate();
411        }
412    }
413
414    /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch.
415    ///
416    /// This method flushes single elements previously sent with `send`, to keep the insertion order.
417    ///
418    /// # Examples
419    /// ```
420    /// use timely::*;
421    /// use timely::dataflow::operators::core::{Input, InspectCore};
422    /// use timely::dataflow::operators::core::input::Handle;
423    ///
424    /// // construct and execute a timely dataflow
425    /// timely::execute(Config::thread(), |worker| {
426    ///
427    ///     // add an input and base computation off of it
428    ///     let mut input = Handle::new();
429    ///     worker.dataflow(|scope| {
430    ///         scope.input_from(&mut input)
431    ///              .inspect_container(|x| println!("hello {:?}", x));
432    ///     });
433    ///
434    ///     // introduce input, advance computation
435    ///     for round in 0..10 {
436    ///         input.send_batch(&mut vec![format!("{}", round)]);
437    ///         input.advance_to(round + 1);
438    ///         worker.step();
439    ///     }
440    /// });
441    /// ```
442    pub fn send_batch(&mut self, buffer: &mut CB::Container) {
443        if !buffer.is_empty() {
444            // flush buffered elements to ensure local fifo.
445            self.flush();
446            Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
447        }
448    }
449
450    /// Advances the current epoch to `next`.
451    ///
452    /// This method allows timely dataflow to issue progress notifications as it can now determine
453    /// that this input can no longer produce data at earlier timestamps.
454    pub fn advance_to(&mut self, next: T) {
455        // Assert that we do not rewind time.
456        assert!(self.now_at.less_equal(&next));
457        // Flush buffers if time has actually changed.
458        if !self.now_at.eq(&next) {
459            self.close_epoch();
460            self.now_at = next;
461            for progress in self.progress.iter() {
462                progress.borrow_mut().update(self.now_at.clone(), 1);
463            }
464        }
465    }
466
467    /// Closes the input.
468    ///
469    /// This method allows timely dataflow to issue all progress notifications blocked by this input
470    /// and to begin to shut down operators, as this input can no longer produce data.
471    pub fn close(self) { }
472
473    /// Reports the current epoch.
474    pub fn epoch(&self) -> &T {
475        &self.now_at
476    }
477
478    /// Reports the current timestamp.
479    pub fn time(&self) -> &T {
480        &self.now_at
481    }
482}
483
484impl<T, CB, D> PushInto<D> for Handle<T, CB>
485where
486    T: Timestamp,
487    CB: ContainerBuilder + PushInto<D>,
488{
489    #[inline]
490    fn push_into(&mut self, item: D) {
491        self.builder.push_into(item);
492        self.extract_and_send();
493    }
494}
495
496impl<T: Timestamp, CB: ContainerBuilder> Handle<T, CB> {
497    /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
498    ///
499    /// # Examples
500    /// ```
501    /// use timely::*;
502    /// use timely::dataflow::operators::core::{Input, Inspect};
503    /// use timely::dataflow::operators::core::input::Handle;
504    ///
505    /// // construct and execute a timely dataflow
506    /// timely::execute(Config::thread(), |worker| {
507    ///
508    ///     // add an input and base computation off of it
509    ///     let mut input = Handle::new();
510    ///     worker.dataflow(|scope| {
511    ///         scope.input_from(&mut input)
512    ///              .container::<Vec<_>>()
513    ///              .inspect(|x| println!("hello {:?}", x));
514    ///     });
515    ///
516    ///     // introduce input, advance computation
517    ///     for round in 0..10 {
518    ///         input.send(round);
519    ///         input.advance_to(round + 1);
520    ///         worker.step();
521    ///     }
522    /// });
523    /// ```
524    #[inline]
525    pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
526        self.push_into(data)
527    }
528}
529
530impl<T: Timestamp, CB: ContainerBuilder> Default for Handle<T, CB> {
531    fn default() -> Self {
532        Self::new_with_builder()
533    }
534}
535
536impl<T:Timestamp, CB: ContainerBuilder> Drop for Handle<T, CB> {
537    fn drop(&mut self) {
538        self.close_epoch();
539    }
540}