Skip to main content

timely/dataflow/operators/core/
input.rs

1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, PushInto};
7
8use crate::scheduling::{Schedule, Activator};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
11use crate::progress::Source;
12use crate::progress::operate::Connectivity;
13use crate::{Accountable, Container, ContainerBuilder};
14use crate::communication::Push;
15use crate::dataflow::{Scope, ScopeParent, Stream};
16use crate::dataflow::channels::pushers::{Tee, Counter};
17use crate::dataflow::channels::Message;
18
19
20// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
21// TODO : more like a harness, with direct access to its inputs.
22
// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
// NOTE(review): the `Input` methods below take `&mut self`, so the notes above appear stale.
26
27/// Create a new `Stream` and `Handle` through which to supply input.
28pub trait Input : Scope {
29    /// Create a new [Stream] and [Handle] through which to supply input.
30    ///
31    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
32    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
33    /// data into the timely dataflow computation.
34    ///
35    /// The `Handle` also provides a means to indicate
36    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
37    /// to issue progress notifications.
38    ///
39    /// # Examples
40    /// ```
41    /// use timely::*;
42    /// use timely::dataflow::operators::{Input, Inspect};
43    ///
44    /// // construct and execute a timely dataflow
45    /// timely::execute(Config::thread(), |worker| {
46    ///
47    ///     // add an input and base computation off of it
48    ///     let mut input = worker.dataflow(|scope| {
49    ///         let (input, stream) = scope.new_input::<Vec<_>>();
50    ///         stream.inspect(|x| println!("hello {:?}", x));
51    ///         input
52    ///     });
53    ///
54    ///     // introduce input, advance computation
55    ///     for round in 0..10 {
56    ///         input.send(round);
57    ///         input.advance_to(round + 1);
58    ///         worker.step();
59    ///     }
60    /// });
61    /// ```
62    fn new_input<C: Container+Clone>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<Self, C>);
63
64    /// Create a new [Stream] and [Handle] through which to supply input.
65    ///
66    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
67    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
68    /// data into the timely dataflow computation.
69    ///
70    /// The `Handle` also provides a means to indicate
71    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
72    /// to issue progress notifications.
73    ///
74    /// # Examples
75    /// ```
76    /// use std::rc::Rc;
77    /// use timely::*;
78    /// use timely::dataflow::operators::{Input, InspectCore};
79    /// use timely::container::CapacityContainerBuilder;
80    ///
81    /// // construct and execute a timely dataflow
82    /// timely::execute(Config::thread(), |worker| {
83    ///
84    ///     // add an input and base computation off of it
85    ///     let mut input = worker.dataflow(|scope| {
86    ///         let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
87    ///         stream.inspect_container(|x| println!("hello {:?}", x));
88    ///         input
89    ///     });
90    ///
91    ///     // introduce input, advance computation
92    ///     for round in 0..10 {
93    ///         input.send_batch(&mut Rc::new(vec![round]));
94    ///         input.advance_to(round + 1);
95    ///         worker.step();
96    ///     }
97    /// });
98    /// ```
99    fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, Stream<Self, CB::Container>);
100
101    /// Create a new stream from a supplied interactive handle.
102    ///
103    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
104    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
105    /// if it as attached to more than one stream.
106    ///
107    /// # Examples
108    /// ```
109    /// use timely::*;
110    /// use timely::dataflow::InputHandle;
111    /// use timely::dataflow::operators::{Input, Inspect};
112    ///
113    /// // construct and execute a timely dataflow
114    /// timely::execute(Config::thread(), |worker| {
115    ///
116    ///     // add an input and base computation off of it
117    ///     let mut input = InputHandle::new();
118    ///     worker.dataflow(|scope| {
119    ///         scope.input_from(&mut input)
120    ///              .container::<Vec<_>>()
121    ///              .inspect(|x| println!("hello {:?}", x));
122    ///     });
123    ///
124    ///     // introduce input, advance computation
125    ///     for round in 0..10 {
126    ///         input.send(round);
127    ///         input.advance_to(round + 1);
128    ///         worker.step();
129    ///     }
130    /// });
131    /// ```
132    fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> Stream<Self, CB::Container>;
133}
134
135use crate::order::TotalOrder;
136impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
137    fn new_input<C: Container+Clone>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<G, C>) {
138        let mut handle = Handle::new();
139        let stream = self.input_from(&mut handle);
140        (handle, stream)
141    }
142
143    fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, Stream<G, CB::Container>) {
144        let mut handle = Handle::new_with_builder();
145        let stream = self.input_from(&mut handle);
146        (handle, stream)
147    }
148
149    fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> Stream<G, CB::Container> {
150        let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, CB::Container>::new();
151        let counter = Counter::new(output);
152        let produced = Rc::clone(counter.produced());
153
154        let index = self.allocate_operator_index();
155        let address = self.addr_for_child(index);
156
157        handle.activate.push(self.activator_for(Rc::clone(&address)));
158
159        let progress = Rc::new(RefCell::new(ChangeBatch::new()));
160
161        handle.register(counter, Rc::clone(&progress));
162
163        let copies = self.peers();
164
165        self.add_operator_with_index(Box::new(Operator {
166            name: "Input".to_owned(),
167            address,
168            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
169            progress,
170            messages: produced,
171            copies,
172        }), index);
173
174        Stream::new(Source::new(index, 0), registrar, self.clone())
175    }
176}
177
178#[derive(Debug)]
179struct Operator<T:Timestamp> {
180    name: String,
181    address: Rc<[usize]>,
182    shared_progress: Rc<RefCell<SharedProgress<T>>>,
183    progress:   Rc<RefCell<ChangeBatch<T>>>,           // times closed since last asked
184    messages:   Rc<RefCell<ChangeBatch<T>>>,           // messages sent since last asked
185    copies:     usize,
186}
187
188impl<T:Timestamp> Schedule for Operator<T> {
189
190    fn name(&self) -> &str { &self.name }
191
192    fn path(&self) -> &[usize] { &self.address[..] }
193
194    fn schedule(&mut self) -> bool {
195        let shared_progress = &mut *self.shared_progress.borrow_mut();
196        self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
197        self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
198        false
199    }
200}
201
202use crate::progress::operate::FrontierInterest;
203impl<T:Timestamp> Operate<T> for Operator<T> {
204
205    fn inputs(&self) -> usize { 0 }
206    fn outputs(&self) -> usize { 1 }
207
208    fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
209        self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210        (Vec::new(), Rc::clone(&self.shared_progress), self)
211    }
212
213    fn notify_me(&self) -> &[FrontierInterest] { &[] }
214}
215
216
217/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
218#[derive(Debug)]
219pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> {
220    activate: Vec<Activator>,
221    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
222    pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
223    builder: CB,
224    buffer: CB::Container,
225    now_at: T,
226}
227
228impl<T: Timestamp, C: Container+Clone> Handle<T, CapacityContainerBuilder<C>> {
229    /// Allocates a new input handle, from which one can create timely streams.
230    ///
231    /// # Examples
232    /// ```
233    /// use timely::*;
234    /// use timely::dataflow::InputHandle;
235    /// use timely::dataflow::operators::{Input, Inspect};
236    ///
237    /// // construct and execute a timely dataflow
238    /// timely::execute(Config::thread(), |worker| {
239    ///
240    ///     // add an input and base computation off of it
241    ///     let mut input = InputHandle::new();
242    ///     worker.dataflow(|scope| {
243    ///         scope.input_from(&mut input)
244    ///              .container::<Vec<_>>()
245    ///              .inspect(|x| println!("hello {:?}", x));
246    ///     });
247    ///
248    ///     // introduce input, advance computation
249    ///     for round in 0..10 {
250    ///         input.send(round);
251    ///         input.advance_to(round + 1);
252    ///         worker.step();
253    ///     }
254    /// });
255    /// ```
256    pub fn new() -> Self {
257        Self {
258            activate: Vec::new(),
259            progress: Vec::new(),
260            pushers: Vec::new(),
261            builder: CapacityContainerBuilder::default(),
262            buffer: Default::default(),
263            now_at: T::minimum(),
264        }
265    }
266}
267
268impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
269    /// Allocates a new input handle, from which one can create timely streams.
270    ///
271    /// # Examples
272    /// ```
273    /// use timely::*;
274    /// use timely::dataflow::InputHandle;
275    /// use timely::dataflow::operators::{Input, Inspect};
276    /// use timely_container::CapacityContainerBuilder;
277    ///
278    /// // construct and execute a timely dataflow
279    /// timely::execute(Config::thread(), |worker| {
280    ///
281    ///     // add an input and base computation off of it
282    ///     let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
283    ///     worker.dataflow(|scope| {
284    ///         scope.input_from(&mut input)
285    ///              .container::<Vec<_>>()
286    ///              .inspect(|x| println!("hello {:?}", x));
287    ///     });
288    ///
289    ///     // introduce input, advance computation
290    ///     for round in 0..10 {
291    ///         input.send(round);
292    ///         input.advance_to(round + 1);
293    ///         worker.step();
294    ///     }
295    /// });
296    /// ```
297    pub fn new_with_builder() -> Self {
298        Self {
299            activate: Vec::new(),
300            progress: Vec::new(),
301            pushers: Vec::new(),
302            builder: CB::default(),
303            buffer: Default::default(),
304            now_at: T::minimum(),
305        }
306    }
307
308    /// Creates an input stream from the handle in the supplied scope.
309    ///
310    /// # Examples
311    /// ```
312    /// use timely::*;
313    /// use timely::dataflow::InputHandle;
314    /// use timely::dataflow::operators::{Input, Inspect};
315    ///
316    /// // construct and execute a timely dataflow
317    /// timely::execute(Config::thread(), |worker| {
318    ///
319    ///     // add an input and base computation off of it
320    ///     let mut input = InputHandle::new();
321    ///     worker.dataflow(|scope| {
322    ///         input.to_stream(scope)
323    ///              .container::<Vec<_>>()
324    ///              .inspect(|x| println!("hello {:?}", x));
325    ///     });
326    ///
327    ///     // introduce input, advance computation
328    ///     for round in 0..10 {
329    ///         input.send(round);
330    ///         input.advance_to(round + 1);
331    ///         worker.step();
332    ///     }
333    /// });
334    /// ```
335    pub fn to_stream<G>(&mut self, scope: &mut G) -> Stream<G, CB::Container>
336    where
337        T: TotalOrder,
338        G: Scope<Timestamp=T>,
339    {
340        scope.input_from(self)
341    }
342
343    fn register(
344        &mut self,
345        pusher: Counter<T, Tee<T, CB::Container>>,
346        progress: Rc<RefCell<ChangeBatch<T>>>,
347    ) {
348        // flush current contents, so new registrant does not see existing data.
349        self.flush();
350
351        // we need to produce an appropriate update to the capabilities for `progress`, in case a
352        // user has decided to drive the handle around a bit before registering it.
353        progress.borrow_mut().update(T::minimum(), -1);
354        progress.borrow_mut().update(self.now_at.clone(), 1);
355
356        self.progress.push(progress);
357        self.pushers.push(pusher);
358    }
359
360    /// Extract all ready contents from the builder and distribute to downstream operators.
361    #[inline]
362    fn extract_and_send(&mut self) {
363        while let Some(container) = self.builder.extract() {
364            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
365        }
366    }
367
368    /// Flush all contents and distribute to downstream operators.
369    #[inline]
370    pub fn flush(&mut self) {
371        while let Some(container) = self.builder.finish() {
372            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
373        }
374    }
375
376    /// Sends a container at each of the destinations. There can be more than one; clone if needed.
377    /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
378    /// Leaves the container in an undefined state.
379    // TODO: Find a better name for this function.
380    #[inline]
381    fn send_container(
382        container: &mut CB::Container,
383        buffer: &mut CB::Container,
384        pushers: &mut [Counter<T, Tee<T, CB::Container>>],
385        now_at: &T
386    ) {
387        for index in 0 .. pushers.len() {
388            if index < pushers.len() - 1 {
389                buffer.clone_from(container);
390                Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
391            }
392            else {
393                Message::push_at(container, now_at.clone(), &mut pushers[index]);
394            }
395        }
396    }
397
398    /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
399    // TODO: Find a better name for this function.
400    fn close_epoch(&mut self) {
401        self.flush();
402        for pusher in self.pushers.iter_mut() {
403            pusher.done();
404        }
405        for progress in self.progress.iter() {
406            progress.borrow_mut().update(self.now_at.clone(), -1);
407        }
408        // Alert worker of each active input operator.
409        for activate in self.activate.iter() {
410            activate.activate();
411        }
412    }
413
414    /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch.
415    ///
416    /// This method flushes single elements previously sent with `send`, to keep the insertion order.
417    ///
418    /// # Examples
419    /// ```
420    /// use timely::*;
421    /// use timely::dataflow::InputHandle;
422    /// use timely::dataflow::operators::{Input, InspectCore};
423    ///
424    /// // construct and execute a timely dataflow
425    /// timely::execute(Config::thread(), |worker| {
426    ///
427    ///     // add an input and base computation off of it
428    ///     let mut input = InputHandle::new();
429    ///     worker.dataflow(|scope| {
430    ///         scope.input_from(&mut input)
431    ///              .inspect_container(|x| println!("hello {:?}", x));
432    ///     });
433    ///
434    ///     // introduce input, advance computation
435    ///     for round in 0..10 {
436    ///         input.send_batch(&mut vec![format!("{}", round)]);
437    ///         input.advance_to(round + 1);
438    ///         worker.step();
439    ///     }
440    /// });
441    /// ```
442    pub fn send_batch(&mut self, buffer: &mut CB::Container) {
443        if !buffer.is_empty() {
444            // flush buffered elements to ensure local fifo.
445            self.flush();
446            Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
447        }
448    }
449
450    /// Advances the current epoch to `next`.
451    ///
452    /// This method allows timely dataflow to issue progress notifications as it can now determine
453    /// that this input can no longer produce data at earlier timestamps.
454    pub fn advance_to(&mut self, next: T) {
455        // Assert that we do not rewind time.
456        assert!(self.now_at.less_equal(&next));
457        // Flush buffers if time has actually changed.
458        if !self.now_at.eq(&next) {
459            self.close_epoch();
460            self.now_at = next;
461            for progress in self.progress.iter() {
462                progress.borrow_mut().update(self.now_at.clone(), 1);
463            }
464        }
465    }
466
467    /// Closes the input.
468    ///
469    /// This method allows timely dataflow to issue all progress notifications blocked by this input
470    /// and to begin to shut down operators, as this input can no longer produce data.
471    pub fn close(self) { }
472
473    /// Reports the current timestamp.
474    pub fn time(&self) -> &T {
475        &self.now_at
476    }
477}
478
479impl<T, CB, D> PushInto<D> for Handle<T, CB>
480where
481    T: Timestamp,
482    CB: ContainerBuilder<Container: Clone> + PushInto<D>,
483{
484    #[inline]
485    fn push_into(&mut self, item: D) {
486        self.builder.push_into(item);
487        self.extract_and_send();
488    }
489}
490
491impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
492    /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
493    ///
494    /// # Examples
495    /// ```
496    /// use timely::*;
497    /// use timely::dataflow::InputHandle;
498    /// use timely::dataflow::operators::{Input, Inspect};
499    ///
500    /// // construct and execute a timely dataflow
501    /// timely::execute(Config::thread(), |worker| {
502    ///
503    ///     // add an input and base computation off of it
504    ///     let mut input = InputHandle::new();
505    ///     worker.dataflow(|scope| {
506    ///         scope.input_from(&mut input)
507    ///              .container::<Vec<_>>()
508    ///              .inspect(|x| println!("hello {:?}", x));
509    ///     });
510    ///
511    ///     // introduce input, advance computation
512    ///     for round in 0..10 {
513    ///         input.send(round);
514    ///         input.advance_to(round + 1);
515    ///         worker.step();
516    ///     }
517    /// });
518    /// ```
519    #[inline]
520    pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
521        self.push_into(data)
522    }
523}
524
525impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Default for Handle<T, CB> {
526    fn default() -> Self {
527        Self::new_with_builder()
528    }
529}
530
531impl<T:Timestamp, CB: ContainerBuilder<Container: Clone>> Drop for Handle<T, CB> {
532    fn drop(&mut self) {
533        self.close_epoch();
534    }
535}