// Source: timely/dataflow/operators/core/input.rs

1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, PushInto};
7use crate::scheduling::{Schedule, Activator};
8
9use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
10use crate::progress::Source;
11use crate::progress::operate::Connectivity;
12use crate::{Accountable, Container, ContainerBuilder};
13use crate::communication::Push;
14use crate::dataflow::{Scope, Stream};
15use crate::dataflow::channels::pushers::{Tee, Counter};
16use crate::dataflow::channels::Message;
17
18
19// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
20// TODO : more like a harness, with direct access to its inputs.
21
22// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
23// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
24// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
25
26/// Create a new `Stream` and `Handle` through which to supply input.
27pub trait Input<'scope> {
28    /// The timestamp at which this input scope operates.
29    type Timestamp: Timestamp;
30
31    /// Create a new [Stream] and [Handle] through which to supply input.
32    ///
33    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
34    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
35    /// data into the timely dataflow computation.
36    ///
37    /// The `Handle` also provides a means to indicate
38    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
39    /// to issue progress notifications.
40    ///
41    /// # Examples
42    /// ```
43    /// use timely::*;
44    /// use timely::dataflow::operators::{Input, Inspect};
45    ///
46    /// // construct and execute a timely dataflow
47    /// timely::execute(Config::thread(), |worker| {
48    ///
49    ///     // add an input and base computation off of it
50    ///     let mut input = worker.dataflow(|scope| {
51    ///         let (input, stream) = scope.new_input::<Vec<_>>();
52    ///         stream.inspect(|x| println!("hello {:?}", x));
53    ///         input
54    ///     });
55    ///
56    ///     // introduce input, advance computation
57    ///     for round in 0..10 {
58    ///         input.send(round);
59    ///         input.advance_to(round + 1);
60    ///         worker.step();
61    ///     }
62    /// });
63    /// ```
64    fn new_input<C: Container+Clone>(&self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<'scope, Self::Timestamp, C>);
65
66    /// Create a new [Stream] and [Handle] through which to supply input.
67    ///
68    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
69    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
70    /// data into the timely dataflow computation.
71    ///
72    /// The `Handle` also provides a means to indicate
73    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
74    /// to issue progress notifications.
75    ///
76    /// # Examples
77    /// ```
78    /// use std::rc::Rc;
79    /// use timely::*;
80    /// use timely::dataflow::operators::{Input, InspectCore};
81    /// use timely::container::CapacityContainerBuilder;
82    ///
83    /// // construct and execute a timely dataflow
84    /// timely::execute(Config::thread(), |worker| {
85    ///
86    ///     // add an input and base computation off of it
87    ///     let mut input = worker.dataflow(|scope| {
88    ///         let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
89    ///         stream.inspect_container(|x| println!("hello {:?}", x));
90    ///         input
91    ///     });
92    ///
93    ///     // introduce input, advance computation
94    ///     for round in 0..10 {
95    ///         input.send_batch(&mut Rc::new(vec![round]));
96    ///         input.advance_to(round + 1);
97    ///         worker.step();
98    ///     }
99    /// });
100    /// ```
101    fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<Self::Timestamp, CB>, Stream<'scope, Self::Timestamp, CB::Container>);
102
103    /// Create a new stream from a supplied interactive handle.
104    ///
105    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
106    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
107    /// if it as attached to more than one stream.
108    ///
109    /// # Examples
110    /// ```
111    /// use timely::*;
112    /// use timely::dataflow::InputHandle;
113    /// use timely::dataflow::operators::{Input, Inspect};
114    ///
115    /// // construct and execute a timely dataflow
116    /// timely::execute(Config::thread(), |worker| {
117    ///
118    ///     // add an input and base computation off of it
119    ///     let mut input = InputHandle::new();
120    ///     worker.dataflow(|scope| {
121    ///         scope.input_from(&mut input)
122    ///              .container::<Vec<_>>()
123    ///              .inspect(|x| println!("hello {:?}", x));
124    ///     });
125    ///
126    ///     // introduce input, advance computation
127    ///     for round in 0..10 {
128    ///         input.send(round);
129    ///         input.advance_to(round + 1);
130    ///         worker.step();
131    ///     }
132    /// });
133    /// ```
134    fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<'scope, Self::Timestamp, CB::Container>;
135}
136
137use crate::order::TotalOrder;
138impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
139    type Timestamp = T;
140    fn new_input<C: Container+Clone>(&self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<'scope, T, C>) {
141        let mut handle = Handle::new();
142        let stream = self.input_from(&mut handle);
143        (handle, stream)
144    }
145
146    fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<T, CB>, Stream<'scope, T, CB::Container>) {
147        let mut handle = Handle::new_with_builder();
148        let stream = self.input_from(&mut handle);
149        (handle, stream)
150    }
151
152    fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<T, CB>) -> Stream<'scope, T, CB::Container> {
153        let (output, registrar) = Tee::<T, CB::Container>::new();
154        let counter = Counter::new(output);
155        let produced = Rc::clone(counter.produced());
156
157        let slot = self.reserve_operator();
158        let index = slot.index();
159        let address = slot.addr();
160
161        handle.activate.push(self.activator_for(Rc::clone(&address)));
162
163        let progress = Rc::new(RefCell::new(ChangeBatch::new()));
164
165        handle.register(counter, Rc::clone(&progress));
166
167        let copies = self.peers();
168
169        slot.install(Box::new(Operator {
170            name: "Input".to_owned(),
171            address,
172            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
173            progress,
174            messages: produced,
175            copies,
176        }));
177
178        Stream::new(Source::new(index, 0), registrar, *self)
179    }
180}
181
182#[derive(Debug)]
183struct Operator<T:Timestamp> {
184    name: String,
185    address: Rc<[usize]>,
186    shared_progress: Rc<RefCell<SharedProgress<T>>>,
187    progress:   Rc<RefCell<ChangeBatch<T>>>,           // times closed since last asked
188    messages:   Rc<RefCell<ChangeBatch<T>>>,           // messages sent since last asked
189    copies:     usize,
190}
191
192impl<T:Timestamp> Schedule for Operator<T> {
193
194    fn name(&self) -> &str { &self.name }
195
196    fn path(&self) -> &[usize] { &self.address[..] }
197
198    fn schedule(&mut self) -> bool {
199        let shared_progress = &mut *self.shared_progress.borrow_mut();
200        self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
201        self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
202        false
203    }
204}
205
206use crate::progress::operate::FrontierInterest;
207impl<T:Timestamp> Operate<T> for Operator<T> {
208
209    fn inputs(&self) -> usize { 0 }
210    fn outputs(&self) -> usize { 1 }
211
212    fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
213        self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
214        (Vec::new(), Rc::clone(&self.shared_progress), self)
215    }
216
217    fn notify_me(&self) -> &[FrontierInterest] { &[] }
218}
219
220
221/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
222#[derive(Debug)]
223pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> {
224    activate: Vec<Activator>,
225    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
226    pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
227    builder: CB,
228    buffer: CB::Container,
229    now_at: T,
230}
231
232impl<T: Timestamp, C: Container+Clone> Handle<T, CapacityContainerBuilder<C>> {
233    /// Allocates a new input handle, from which one can create timely streams.
234    ///
235    /// # Examples
236    /// ```
237    /// use timely::*;
238    /// use timely::dataflow::InputHandle;
239    /// use timely::dataflow::operators::{Input, Inspect};
240    ///
241    /// // construct and execute a timely dataflow
242    /// timely::execute(Config::thread(), |worker| {
243    ///
244    ///     // add an input and base computation off of it
245    ///     let mut input = InputHandle::new();
246    ///     worker.dataflow(|scope| {
247    ///         scope.input_from(&mut input)
248    ///              .container::<Vec<_>>()
249    ///              .inspect(|x| println!("hello {:?}", x));
250    ///     });
251    ///
252    ///     // introduce input, advance computation
253    ///     for round in 0..10 {
254    ///         input.send(round);
255    ///         input.advance_to(round + 1);
256    ///         worker.step();
257    ///     }
258    /// });
259    /// ```
260    pub fn new() -> Self {
261        Self {
262            activate: Vec::new(),
263            progress: Vec::new(),
264            pushers: Vec::new(),
265            builder: CapacityContainerBuilder::default(),
266            buffer: Default::default(),
267            now_at: T::minimum(),
268        }
269    }
270}
271
272impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
273    /// Allocates a new input handle, from which one can create timely streams.
274    ///
275    /// # Examples
276    /// ```
277    /// use timely::*;
278    /// use timely::dataflow::InputHandle;
279    /// use timely::dataflow::operators::{Input, Inspect};
280    /// use timely_container::CapacityContainerBuilder;
281    ///
282    /// // construct and execute a timely dataflow
283    /// timely::execute(Config::thread(), |worker| {
284    ///
285    ///     // add an input and base computation off of it
286    ///     let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
287    ///     worker.dataflow(|scope| {
288    ///         scope.input_from(&mut input)
289    ///              .container::<Vec<_>>()
290    ///              .inspect(|x| println!("hello {:?}", x));
291    ///     });
292    ///
293    ///     // introduce input, advance computation
294    ///     for round in 0..10 {
295    ///         input.send(round);
296    ///         input.advance_to(round + 1);
297    ///         worker.step();
298    ///     }
299    /// });
300    /// ```
301    pub fn new_with_builder() -> Self {
302        Self {
303            activate: Vec::new(),
304            progress: Vec::new(),
305            pushers: Vec::new(),
306            builder: CB::default(),
307            buffer: Default::default(),
308            now_at: T::minimum(),
309        }
310    }
311
312    /// Creates an input stream from the handle in the supplied scope.
313    ///
314    /// # Examples
315    /// ```
316    /// use timely::*;
317    /// use timely::dataflow::InputHandle;
318    /// use timely::dataflow::operators::{Input, Inspect};
319    ///
320    /// // construct and execute a timely dataflow
321    /// timely::execute(Config::thread(), |worker| {
322    ///
323    ///     // add an input and base computation off of it
324    ///     let mut input = InputHandle::new();
325    ///     worker.dataflow(|scope| {
326    ///         input.to_stream(scope)
327    ///              .container::<Vec<_>>()
328    ///              .inspect(|x| println!("hello {:?}", x));
329    ///     });
330    ///
331    ///     // introduce input, advance computation
332    ///     for round in 0..10 {
333    ///         input.send(round);
334    ///         input.advance_to(round + 1);
335    ///         worker.step();
336    ///     }
337    /// });
338    /// ```
339    pub fn to_stream<'scope>(&mut self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
340    where
341        T: TotalOrder,
342    {
343        scope.input_from(self)
344    }
345
346    fn register(
347        &mut self,
348        pusher: Counter<T, Tee<T, CB::Container>>,
349        progress: Rc<RefCell<ChangeBatch<T>>>,
350    ) {
351        // flush current contents, so new registrant does not see existing data.
352        self.flush();
353
354        // we need to produce an appropriate update to the capabilities for `progress`, in case a
355        // user has decided to drive the handle around a bit before registering it.
356        progress.borrow_mut().update(T::minimum(), -1);
357        progress.borrow_mut().update(self.now_at.clone(), 1);
358
359        self.progress.push(progress);
360        self.pushers.push(pusher);
361    }
362
363    /// Extract all ready contents from the builder and distribute to downstream operators.
364    #[inline]
365    fn extract_and_send(&mut self) {
366        while let Some(container) = self.builder.extract() {
367            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
368        }
369    }
370
371    /// Flush all contents and distribute to downstream operators.
372    #[inline]
373    pub fn flush(&mut self) {
374        while let Some(container) = self.builder.finish() {
375            Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
376        }
377    }
378
379    /// Sends a container at each of the destinations. There can be more than one; clone if needed.
380    /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
381    /// Leaves the container in an undefined state.
382    // TODO: Find a better name for this function.
383    #[inline]
384    fn send_container(
385        container: &mut CB::Container,
386        buffer: &mut CB::Container,
387        pushers: &mut [Counter<T, Tee<T, CB::Container>>],
388        now_at: &T
389    ) {
390        for index in 0 .. pushers.len() {
391            if index < pushers.len() - 1 {
392                buffer.clone_from(container);
393                Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
394            }
395            else {
396                Message::push_at(container, now_at.clone(), &mut pushers[index]);
397            }
398        }
399    }
400
401    /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
402    // TODO: Find a better name for this function.
403    fn close_epoch(&mut self) {
404        self.flush();
405        for pusher in self.pushers.iter_mut() {
406            pusher.done();
407        }
408        for progress in self.progress.iter() {
409            progress.borrow_mut().update(self.now_at.clone(), -1);
410        }
411        // Alert worker of each active input operator.
412        for activate in self.activate.iter() {
413            activate.activate();
414        }
415    }
416
417    /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch.
418    ///
419    /// This method flushes single elements previously sent with `send`, to keep the insertion order.
420    ///
421    /// # Examples
422    /// ```
423    /// use timely::*;
424    /// use timely::dataflow::InputHandle;
425    /// use timely::dataflow::operators::{Input, InspectCore};
426    ///
427    /// // construct and execute a timely dataflow
428    /// timely::execute(Config::thread(), |worker| {
429    ///
430    ///     // add an input and base computation off of it
431    ///     let mut input = InputHandle::new();
432    ///     worker.dataflow(|scope| {
433    ///         scope.input_from(&mut input)
434    ///              .inspect_container(|x| println!("hello {:?}", x));
435    ///     });
436    ///
437    ///     // introduce input, advance computation
438    ///     for round in 0..10 {
439    ///         input.send_batch(&mut vec![format!("{}", round)]);
440    ///         input.advance_to(round + 1);
441    ///         worker.step();
442    ///     }
443    /// });
444    /// ```
445    pub fn send_batch(&mut self, buffer: &mut CB::Container) {
446        if !buffer.is_empty() {
447            // flush buffered elements to ensure local fifo.
448            self.flush();
449            Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
450        }
451    }
452
453    /// Advances the current epoch to `next`.
454    ///
455    /// This method allows timely dataflow to issue progress notifications as it can now determine
456    /// that this input can no longer produce data at earlier timestamps.
457    pub fn advance_to(&mut self, next: T) {
458        // Assert that we do not rewind time.
459        assert!(self.now_at.less_equal(&next));
460        // Flush buffers if time has actually changed.
461        if !self.now_at.eq(&next) {
462            self.close_epoch();
463            self.now_at = next;
464            for progress in self.progress.iter() {
465                progress.borrow_mut().update(self.now_at.clone(), 1);
466            }
467        }
468    }
469
470    /// Closes the input.
471    ///
472    /// This method allows timely dataflow to issue all progress notifications blocked by this input
473    /// and to begin to shut down operators, as this input can no longer produce data.
474    pub fn close(self) { }
475
476    /// Reports the current timestamp.
477    pub fn time(&self) -> &T {
478        &self.now_at
479    }
480}
481
482impl<T, CB, D> PushInto<D> for Handle<T, CB>
483where
484    T: Timestamp,
485    CB: ContainerBuilder<Container: Clone> + PushInto<D>,
486{
487    #[inline]
488    fn push_into(&mut self, item: D) {
489        self.builder.push_into(item);
490        self.extract_and_send();
491    }
492}
493
494impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
495    /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
496    ///
497    /// # Examples
498    /// ```
499    /// use timely::*;
500    /// use timely::dataflow::InputHandle;
501    /// use timely::dataflow::operators::{Input, Inspect};
502    ///
503    /// // construct and execute a timely dataflow
504    /// timely::execute(Config::thread(), |worker| {
505    ///
506    ///     // add an input and base computation off of it
507    ///     let mut input = InputHandle::new();
508    ///     worker.dataflow(|scope| {
509    ///         scope.input_from(&mut input)
510    ///              .container::<Vec<_>>()
511    ///              .inspect(|x| println!("hello {:?}", x));
512    ///     });
513    ///
514    ///     // introduce input, advance computation
515    ///     for round in 0..10 {
516    ///         input.send(round);
517    ///         input.advance_to(round + 1);
518    ///         worker.step();
519    ///     }
520    /// });
521    /// ```
522    #[inline]
523    pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
524        self.push_into(data)
525    }
526}
527
528impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Default for Handle<T, CB> {
529    fn default() -> Self {
530        Self::new_with_builder()
531    }
532}
533
534impl<T:Timestamp, CB: ContainerBuilder<Container: Clone>> Drop for Handle<T, CB> {
535    fn drop(&mut self) {
536        self.close_epoch();
537    }
538}