timely/dataflow/operators/core/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, PushInto};
7
8use crate::scheduling::{Schedule, Activator};
9
10use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
11use crate::progress::Source;
12use crate::progress::operate::Connectivity;
13use crate::{Accountable, Container, ContainerBuilder};
14use crate::communication::Push;
15use crate::dataflow::{Scope, ScopeParent, Stream};
16use crate::dataflow::channels::pushers::{Tee, Counter};
17use crate::dataflow::channels::Message;
18
19
20// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
21// TODO : more like a harness, with direct access to its inputs.
22
23// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
24// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
25// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
26
27/// Create a new `Stream` and `Handle` through which to supply input.
28pub trait Input : Scope {
29 /// Create a new [Stream] and [Handle] through which to supply input.
30 ///
31 /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
32 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
33 /// data into the timely dataflow computation.
34 ///
35 /// The `Handle` also provides a means to indicate
36 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
37 /// to issue progress notifications.
38 ///
39 /// # Examples
40 /// ```
41 /// use timely::*;
42 /// use timely::dataflow::operators::{Input, Inspect};
43 ///
44 /// // construct and execute a timely dataflow
45 /// timely::execute(Config::thread(), |worker| {
46 ///
47 /// // add an input and base computation off of it
48 /// let mut input = worker.dataflow(|scope| {
49 /// let (input, stream) = scope.new_input::<Vec<_>>();
50 /// stream.inspect(|x| println!("hello {:?}", x));
51 /// input
52 /// });
53 ///
54 /// // introduce input, advance computation
55 /// for round in 0..10 {
56 /// input.send(round);
57 /// input.advance_to(round + 1);
58 /// worker.step();
59 /// }
60 /// });
61 /// ```
62 fn new_input<C: Container+Clone>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<Self, C>);
63
64 /// Create a new [Stream] and [Handle] through which to supply input.
65 ///
66 /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
67 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
68 /// data into the timely dataflow computation.
69 ///
70 /// The `Handle` also provides a means to indicate
71 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
72 /// to issue progress notifications.
73 ///
74 /// # Examples
75 /// ```
76 /// use std::rc::Rc;
77 /// use timely::*;
78 /// use timely::dataflow::operators::{Input, InspectCore};
79 /// use timely::container::CapacityContainerBuilder;
80 ///
81 /// // construct and execute a timely dataflow
82 /// timely::execute(Config::thread(), |worker| {
83 ///
84 /// // add an input and base computation off of it
85 /// let mut input = worker.dataflow(|scope| {
86 /// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
87 /// stream.inspect_container(|x| println!("hello {:?}", x));
88 /// input
89 /// });
90 ///
91 /// // introduce input, advance computation
92 /// for round in 0..10 {
93 /// input.send_batch(&mut Rc::new(vec![round]));
94 /// input.advance_to(round + 1);
95 /// worker.step();
96 /// }
97 /// });
98 /// ```
99 fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, Stream<Self, CB::Container>);
100
101 /// Create a new stream from a supplied interactive handle.
102 ///
103 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
104 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
105 /// if it as attached to more than one stream.
106 ///
107 /// # Examples
108 /// ```
109 /// use timely::*;
110 /// use timely::dataflow::InputHandle;
111 /// use timely::dataflow::operators::{Input, Inspect};
112 ///
113 /// // construct and execute a timely dataflow
114 /// timely::execute(Config::thread(), |worker| {
115 ///
116 /// // add an input and base computation off of it
117 /// let mut input = InputHandle::new();
118 /// worker.dataflow(|scope| {
119 /// scope.input_from(&mut input)
120 /// .container::<Vec<_>>()
121 /// .inspect(|x| println!("hello {:?}", x));
122 /// });
123 ///
124 /// // introduce input, advance computation
125 /// for round in 0..10 {
126 /// input.send(round);
127 /// input.advance_to(round + 1);
128 /// worker.step();
129 /// }
130 /// });
131 /// ```
132 fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> Stream<Self, CB::Container>;
133}
134
135use crate::order::TotalOrder;
136impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
137 fn new_input<C: Container+Clone>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<G, C>) {
138 let mut handle = Handle::new();
139 let stream = self.input_from(&mut handle);
140 (handle, stream)
141 }
142
143 fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, Stream<G, CB::Container>) {
144 let mut handle = Handle::new_with_builder();
145 let stream = self.input_from(&mut handle);
146 (handle, stream)
147 }
148
149 fn input_from<CB: ContainerBuilder<Container: Clone>>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> Stream<G, CB::Container> {
150 let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, CB::Container>::new();
151 let counter = Counter::new(output);
152 let produced = Rc::clone(counter.produced());
153
154 let index = self.allocate_operator_index();
155 let address = self.addr_for_child(index);
156
157 handle.activate.push(self.activator_for(Rc::clone(&address)));
158
159 let progress = Rc::new(RefCell::new(ChangeBatch::new()));
160
161 handle.register(counter, Rc::clone(&progress));
162
163 let copies = self.peers();
164
165 self.add_operator_with_index(Box::new(Operator {
166 name: "Input".to_owned(),
167 address,
168 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
169 progress,
170 messages: produced,
171 copies,
172 }), index);
173
174 Stream::new(Source::new(index, 0), registrar, self.clone())
175 }
176}
177
/// The operator behind an input stream.
///
/// It owns no data itself: its `Handle` writes capability changes into `progress` and the
/// output `Counter` writes produced-message counts into `messages`; `schedule` moves both
/// into `shared_progress`.
#[derive(Debug)]
struct Operator<T:Timestamp> {
    /// Name reported through `Schedule::name`.
    name: String,
    /// Path of this operator, reported through `Schedule::path`.
    address: Rc<[usize]>,
    /// Progress record handed to the scheduler by `initialize` and refreshed by `schedule`.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
    messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
    /// Value of `scope.peers()` at construction; used as the initial capability count at
    /// `T::minimum()` in `initialize`.
    copies: usize,
}
187
188impl<T:Timestamp> Schedule for Operator<T> {
189
190 fn name(&self) -> &str { &self.name }
191
192 fn path(&self) -> &[usize] { &self.address[..] }
193
194 fn schedule(&mut self) -> bool {
195 let shared_progress = &mut *self.shared_progress.borrow_mut();
196 self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
197 self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
198 false
199 }
200}
201
202use crate::progress::operate::FrontierInterest;
203impl<T:Timestamp> Operate<T> for Operator<T> {
204
205 fn inputs(&self) -> usize { 0 }
206 fn outputs(&self) -> usize { 1 }
207
208 fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
209 self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
210 (Vec::new(), Rc::clone(&self.shared_progress), self)
211 }
212
213 fn notify_me(&self) -> &[FrontierInterest] { &[] }
214}
215
216
/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
///
/// Records are staged in the container builder `CB` and pushed, at the handle's current time,
/// to every stream the handle has been attached to. Dropping the handle closes the current
/// epoch.
#[derive(Debug)]
pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> {
    /// One activator per attached input operator; all are activated when an epoch closes.
    activate: Vec<Activator>,
    /// One capability-change batch per attached input operator.
    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// One output pusher per attached stream.
    pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
    /// Stages individually sent records into containers.
    builder: CB,
    /// Scratch container that receives clones when there is more than one pusher.
    buffer: CB::Container,
    /// The current epoch; records are sent at this time.
    now_at: T,
}
227
228impl<T: Timestamp, C: Container+Clone> Handle<T, CapacityContainerBuilder<C>> {
229 /// Allocates a new input handle, from which one can create timely streams.
230 ///
231 /// # Examples
232 /// ```
233 /// use timely::*;
234 /// use timely::dataflow::InputHandle;
235 /// use timely::dataflow::operators::{Input, Inspect};
236 ///
237 /// // construct and execute a timely dataflow
238 /// timely::execute(Config::thread(), |worker| {
239 ///
240 /// // add an input and base computation off of it
241 /// let mut input = InputHandle::new();
242 /// worker.dataflow(|scope| {
243 /// scope.input_from(&mut input)
244 /// .container::<Vec<_>>()
245 /// .inspect(|x| println!("hello {:?}", x));
246 /// });
247 ///
248 /// // introduce input, advance computation
249 /// for round in 0..10 {
250 /// input.send(round);
251 /// input.advance_to(round + 1);
252 /// worker.step();
253 /// }
254 /// });
255 /// ```
256 pub fn new() -> Self {
257 Self {
258 activate: Vec::new(),
259 progress: Vec::new(),
260 pushers: Vec::new(),
261 builder: CapacityContainerBuilder::default(),
262 buffer: Default::default(),
263 now_at: T::minimum(),
264 }
265 }
266}
267
268impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
269 /// Allocates a new input handle, from which one can create timely streams.
270 ///
271 /// # Examples
272 /// ```
273 /// use timely::*;
274 /// use timely::dataflow::InputHandle;
275 /// use timely::dataflow::operators::{Input, Inspect};
276 /// use timely_container::CapacityContainerBuilder;
277 ///
278 /// // construct and execute a timely dataflow
279 /// timely::execute(Config::thread(), |worker| {
280 ///
281 /// // add an input and base computation off of it
282 /// let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
283 /// worker.dataflow(|scope| {
284 /// scope.input_from(&mut input)
285 /// .container::<Vec<_>>()
286 /// .inspect(|x| println!("hello {:?}", x));
287 /// });
288 ///
289 /// // introduce input, advance computation
290 /// for round in 0..10 {
291 /// input.send(round);
292 /// input.advance_to(round + 1);
293 /// worker.step();
294 /// }
295 /// });
296 /// ```
297 pub fn new_with_builder() -> Self {
298 Self {
299 activate: Vec::new(),
300 progress: Vec::new(),
301 pushers: Vec::new(),
302 builder: CB::default(),
303 buffer: Default::default(),
304 now_at: T::minimum(),
305 }
306 }
307
308 /// Creates an input stream from the handle in the supplied scope.
309 ///
310 /// # Examples
311 /// ```
312 /// use timely::*;
313 /// use timely::dataflow::InputHandle;
314 /// use timely::dataflow::operators::{Input, Inspect};
315 ///
316 /// // construct and execute a timely dataflow
317 /// timely::execute(Config::thread(), |worker| {
318 ///
319 /// // add an input and base computation off of it
320 /// let mut input = InputHandle::new();
321 /// worker.dataflow(|scope| {
322 /// input.to_stream(scope)
323 /// .container::<Vec<_>>()
324 /// .inspect(|x| println!("hello {:?}", x));
325 /// });
326 ///
327 /// // introduce input, advance computation
328 /// for round in 0..10 {
329 /// input.send(round);
330 /// input.advance_to(round + 1);
331 /// worker.step();
332 /// }
333 /// });
334 /// ```
335 pub fn to_stream<G>(&mut self, scope: &mut G) -> Stream<G, CB::Container>
336 where
337 T: TotalOrder,
338 G: Scope<Timestamp=T>,
339 {
340 scope.input_from(self)
341 }
342
343 fn register(
344 &mut self,
345 pusher: Counter<T, Tee<T, CB::Container>>,
346 progress: Rc<RefCell<ChangeBatch<T>>>,
347 ) {
348 // flush current contents, so new registrant does not see existing data.
349 self.flush();
350
351 // we need to produce an appropriate update to the capabilities for `progress`, in case a
352 // user has decided to drive the handle around a bit before registering it.
353 progress.borrow_mut().update(T::minimum(), -1);
354 progress.borrow_mut().update(self.now_at.clone(), 1);
355
356 self.progress.push(progress);
357 self.pushers.push(pusher);
358 }
359
360 /// Extract all ready contents from the builder and distribute to downstream operators.
361 #[inline]
362 fn extract_and_send(&mut self) {
363 while let Some(container) = self.builder.extract() {
364 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
365 }
366 }
367
368 /// Flush all contents and distribute to downstream operators.
369 #[inline]
370 pub fn flush(&mut self) {
371 while let Some(container) = self.builder.finish() {
372 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
373 }
374 }
375
376 /// Sends a container at each of the destinations. There can be more than one; clone if needed.
377 /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
378 /// Leaves the container in an undefined state.
379 // TODO: Find a better name for this function.
380 #[inline]
381 fn send_container(
382 container: &mut CB::Container,
383 buffer: &mut CB::Container,
384 pushers: &mut [Counter<T, Tee<T, CB::Container>>],
385 now_at: &T
386 ) {
387 for index in 0 .. pushers.len() {
388 if index < pushers.len() - 1 {
389 buffer.clone_from(container);
390 Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
391 }
392 else {
393 Message::push_at(container, now_at.clone(), &mut pushers[index]);
394 }
395 }
396 }
397
398 /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
399 // TODO: Find a better name for this function.
400 fn close_epoch(&mut self) {
401 self.flush();
402 for pusher in self.pushers.iter_mut() {
403 pusher.done();
404 }
405 for progress in self.progress.iter() {
406 progress.borrow_mut().update(self.now_at.clone(), -1);
407 }
408 // Alert worker of each active input operator.
409 for activate in self.activate.iter() {
410 activate.activate();
411 }
412 }
413
414 /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch.
415 ///
416 /// This method flushes single elements previously sent with `send`, to keep the insertion order.
417 ///
418 /// # Examples
419 /// ```
420 /// use timely::*;
421 /// use timely::dataflow::InputHandle;
422 /// use timely::dataflow::operators::{Input, InspectCore};
423 ///
424 /// // construct and execute a timely dataflow
425 /// timely::execute(Config::thread(), |worker| {
426 ///
427 /// // add an input and base computation off of it
428 /// let mut input = InputHandle::new();
429 /// worker.dataflow(|scope| {
430 /// scope.input_from(&mut input)
431 /// .inspect_container(|x| println!("hello {:?}", x));
432 /// });
433 ///
434 /// // introduce input, advance computation
435 /// for round in 0..10 {
436 /// input.send_batch(&mut vec![format!("{}", round)]);
437 /// input.advance_to(round + 1);
438 /// worker.step();
439 /// }
440 /// });
441 /// ```
442 pub fn send_batch(&mut self, buffer: &mut CB::Container) {
443 if !buffer.is_empty() {
444 // flush buffered elements to ensure local fifo.
445 self.flush();
446 Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
447 }
448 }
449
450 /// Advances the current epoch to `next`.
451 ///
452 /// This method allows timely dataflow to issue progress notifications as it can now determine
453 /// that this input can no longer produce data at earlier timestamps.
454 pub fn advance_to(&mut self, next: T) {
455 // Assert that we do not rewind time.
456 assert!(self.now_at.less_equal(&next));
457 // Flush buffers if time has actually changed.
458 if !self.now_at.eq(&next) {
459 self.close_epoch();
460 self.now_at = next;
461 for progress in self.progress.iter() {
462 progress.borrow_mut().update(self.now_at.clone(), 1);
463 }
464 }
465 }
466
467 /// Closes the input.
468 ///
469 /// This method allows timely dataflow to issue all progress notifications blocked by this input
470 /// and to begin to shut down operators, as this input can no longer produce data.
471 pub fn close(self) { }
472
473 /// Reports the current timestamp.
474 pub fn time(&self) -> &T {
475 &self.now_at
476 }
477}
478
479impl<T, CB, D> PushInto<D> for Handle<T, CB>
480where
481 T: Timestamp,
482 CB: ContainerBuilder<Container: Clone> + PushInto<D>,
483{
484 #[inline]
485 fn push_into(&mut self, item: D) {
486 self.builder.push_into(item);
487 self.extract_and_send();
488 }
489}
490
491impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
492 /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
493 ///
494 /// # Examples
495 /// ```
496 /// use timely::*;
497 /// use timely::dataflow::InputHandle;
498 /// use timely::dataflow::operators::{Input, Inspect};
499 ///
500 /// // construct and execute a timely dataflow
501 /// timely::execute(Config::thread(), |worker| {
502 ///
503 /// // add an input and base computation off of it
504 /// let mut input = InputHandle::new();
505 /// worker.dataflow(|scope| {
506 /// scope.input_from(&mut input)
507 /// .container::<Vec<_>>()
508 /// .inspect(|x| println!("hello {:?}", x));
509 /// });
510 ///
511 /// // introduce input, advance computation
512 /// for round in 0..10 {
513 /// input.send(round);
514 /// input.advance_to(round + 1);
515 /// worker.step();
516 /// }
517 /// });
518 /// ```
519 #[inline]
520 pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
521 self.push_into(data)
522 }
523}
524
525impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Default for Handle<T, CB> {
526 fn default() -> Self {
527 Self::new_with_builder()
528 }
529}
530
531impl<T:Timestamp, CB: ContainerBuilder<Container: Clone>> Drop for Handle<T, CB> {
532 fn drop(&mut self) {
533 self.close_epoch();
534 }
535}