timely/dataflow/operators/core/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::container::{CapacityContainerBuilder, PushInto};
7use crate::scheduling::{Schedule, Activator};
8
9use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
10use crate::progress::Source;
11use crate::progress::operate::Connectivity;
12use crate::{Accountable, Container, ContainerBuilder};
13use crate::communication::Push;
14use crate::dataflow::{Scope, Stream};
15use crate::dataflow::channels::pushers::{Tee, Counter};
16use crate::dataflow::channels::Message;
17
18
19// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
20// TODO : more like a harness, with direct access to its inputs.
21
22// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
23// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
24// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
25
26/// Create a new `Stream` and `Handle` through which to supply input.
27pub trait Input<'scope> {
28 /// The timestamp at which this input scope operates.
29 type Timestamp: Timestamp;
30
31 /// Create a new [Stream] and [Handle] through which to supply input.
32 ///
33 /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
34 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
35 /// data into the timely dataflow computation.
36 ///
37 /// The `Handle` also provides a means to indicate
38 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
39 /// to issue progress notifications.
40 ///
41 /// # Examples
42 /// ```
43 /// use timely::*;
44 /// use timely::dataflow::operators::{Input, Inspect};
45 ///
46 /// // construct and execute a timely dataflow
47 /// timely::execute(Config::thread(), |worker| {
48 ///
49 /// // add an input and base computation off of it
50 /// let mut input = worker.dataflow(|scope| {
51 /// let (input, stream) = scope.new_input::<Vec<_>>();
52 /// stream.inspect(|x| println!("hello {:?}", x));
53 /// input
54 /// });
55 ///
56 /// // introduce input, advance computation
57 /// for round in 0..10 {
58 /// input.send(round);
59 /// input.advance_to(round + 1);
60 /// worker.step();
61 /// }
62 /// });
63 /// ```
64 fn new_input<C: Container+Clone>(&self) -> (Handle<Self::Timestamp, CapacityContainerBuilder<C>>, Stream<'scope, Self::Timestamp, C>);
65
66 /// Create a new [Stream] and [Handle] through which to supply input.
67 ///
68 /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
69 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
70 /// data into the timely dataflow computation.
71 ///
72 /// The `Handle` also provides a means to indicate
73 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
74 /// to issue progress notifications.
75 ///
76 /// # Examples
77 /// ```
78 /// use std::rc::Rc;
79 /// use timely::*;
80 /// use timely::dataflow::operators::{Input, InspectCore};
81 /// use timely::container::CapacityContainerBuilder;
82 ///
83 /// // construct and execute a timely dataflow
84 /// timely::execute(Config::thread(), |worker| {
85 ///
86 /// // add an input and base computation off of it
87 /// let mut input = worker.dataflow(|scope| {
88 /// let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
89 /// stream.inspect_container(|x| println!("hello {:?}", x));
90 /// input
91 /// });
92 ///
93 /// // introduce input, advance computation
94 /// for round in 0..10 {
95 /// input.send_batch(&mut Rc::new(vec![round]));
96 /// input.advance_to(round + 1);
97 /// worker.step();
98 /// }
99 /// });
100 /// ```
101 fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<Self::Timestamp, CB>, Stream<'scope, Self::Timestamp, CB::Container>);
102
103 /// Create a new stream from a supplied interactive handle.
104 ///
105 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
106 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
107 /// if it as attached to more than one stream.
108 ///
109 /// # Examples
110 /// ```
111 /// use timely::*;
112 /// use timely::dataflow::InputHandle;
113 /// use timely::dataflow::operators::{Input, Inspect};
114 ///
115 /// // construct and execute a timely dataflow
116 /// timely::execute(Config::thread(), |worker| {
117 ///
118 /// // add an input and base computation off of it
119 /// let mut input = InputHandle::new();
120 /// worker.dataflow(|scope| {
121 /// scope.input_from(&mut input)
122 /// .container::<Vec<_>>()
123 /// .inspect(|x| println!("hello {:?}", x));
124 /// });
125 ///
126 /// // introduce input, advance computation
127 /// for round in 0..10 {
128 /// input.send(round);
129 /// input.advance_to(round + 1);
130 /// worker.step();
131 /// }
132 /// });
133 /// ```
134 fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<Self::Timestamp, CB>) -> Stream<'scope, Self::Timestamp, CB::Container>;
135}
136
137use crate::order::TotalOrder;
138impl<'scope, T: Timestamp + TotalOrder> Input<'scope> for Scope<'scope, T> {
139 type Timestamp = T;
140 fn new_input<C: Container+Clone>(&self) -> (Handle<T, CapacityContainerBuilder<C>>, Stream<'scope, T, C>) {
141 let mut handle = Handle::new();
142 let stream = self.input_from(&mut handle);
143 (handle, stream)
144 }
145
146 fn new_input_with_builder<CB: ContainerBuilder<Container: Clone>>(&self) -> (Handle<T, CB>, Stream<'scope, T, CB::Container>) {
147 let mut handle = Handle::new_with_builder();
148 let stream = self.input_from(&mut handle);
149 (handle, stream)
150 }
151
152 fn input_from<CB: ContainerBuilder<Container: Clone>>(&self, handle: &mut Handle<T, CB>) -> Stream<'scope, T, CB::Container> {
153 let (output, registrar) = Tee::<T, CB::Container>::new();
154 let counter = Counter::new(output);
155 let produced = Rc::clone(counter.produced());
156
157 let slot = self.reserve_operator();
158 let index = slot.index();
159 let address = slot.addr();
160
161 handle.activate.push(self.activator_for(Rc::clone(&address)));
162
163 let progress = Rc::new(RefCell::new(ChangeBatch::new()));
164
165 handle.register(counter, Rc::clone(&progress));
166
167 let copies = self.peers();
168
169 slot.install(Box::new(Operator {
170 name: "Input".to_owned(),
171 address,
172 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
173 progress,
174 messages: produced,
175 copies,
176 }));
177
178 Stream::new(Source::new(index, 0), registrar, *self)
179 }
180}
181
182#[derive(Debug)]
183struct Operator<T:Timestamp> {
184 name: String,
185 address: Rc<[usize]>,
186 shared_progress: Rc<RefCell<SharedProgress<T>>>,
187 progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
188 messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
189 copies: usize,
190}
191
192impl<T:Timestamp> Schedule for Operator<T> {
193
194 fn name(&self) -> &str { &self.name }
195
196 fn path(&self) -> &[usize] { &self.address[..] }
197
198 fn schedule(&mut self) -> bool {
199 let shared_progress = &mut *self.shared_progress.borrow_mut();
200 self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
201 self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
202 false
203 }
204}
205
206use crate::progress::operate::FrontierInterest;
207impl<T:Timestamp> Operate<T> for Operator<T> {
208
209 fn inputs(&self) -> usize { 0 }
210 fn outputs(&self) -> usize { 1 }
211
212 fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
213 self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
214 (Vec::new(), Rc::clone(&self.shared_progress), self)
215 }
216
217 fn notify_me(&self) -> &[FrontierInterest] { &[] }
218}
219
220
221/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
222#[derive(Debug)]
223pub struct Handle<T: Timestamp, CB: ContainerBuilder<Container: Clone>> {
224 activate: Vec<Activator>,
225 progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
226 pushers: Vec<Counter<T, Tee<T, CB::Container>>>,
227 builder: CB,
228 buffer: CB::Container,
229 now_at: T,
230}
231
232impl<T: Timestamp, C: Container+Clone> Handle<T, CapacityContainerBuilder<C>> {
233 /// Allocates a new input handle, from which one can create timely streams.
234 ///
235 /// # Examples
236 /// ```
237 /// use timely::*;
238 /// use timely::dataflow::InputHandle;
239 /// use timely::dataflow::operators::{Input, Inspect};
240 ///
241 /// // construct and execute a timely dataflow
242 /// timely::execute(Config::thread(), |worker| {
243 ///
244 /// // add an input and base computation off of it
245 /// let mut input = InputHandle::new();
246 /// worker.dataflow(|scope| {
247 /// scope.input_from(&mut input)
248 /// .container::<Vec<_>>()
249 /// .inspect(|x| println!("hello {:?}", x));
250 /// });
251 ///
252 /// // introduce input, advance computation
253 /// for round in 0..10 {
254 /// input.send(round);
255 /// input.advance_to(round + 1);
256 /// worker.step();
257 /// }
258 /// });
259 /// ```
260 pub fn new() -> Self {
261 Self {
262 activate: Vec::new(),
263 progress: Vec::new(),
264 pushers: Vec::new(),
265 builder: CapacityContainerBuilder::default(),
266 buffer: Default::default(),
267 now_at: T::minimum(),
268 }
269 }
270}
271
272impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
273 /// Allocates a new input handle, from which one can create timely streams.
274 ///
275 /// # Examples
276 /// ```
277 /// use timely::*;
278 /// use timely::dataflow::InputHandle;
279 /// use timely::dataflow::operators::{Input, Inspect};
280 /// use timely_container::CapacityContainerBuilder;
281 ///
282 /// // construct and execute a timely dataflow
283 /// timely::execute(Config::thread(), |worker| {
284 ///
285 /// // add an input and base computation off of it
286 /// let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder();
287 /// worker.dataflow(|scope| {
288 /// scope.input_from(&mut input)
289 /// .container::<Vec<_>>()
290 /// .inspect(|x| println!("hello {:?}", x));
291 /// });
292 ///
293 /// // introduce input, advance computation
294 /// for round in 0..10 {
295 /// input.send(round);
296 /// input.advance_to(round + 1);
297 /// worker.step();
298 /// }
299 /// });
300 /// ```
301 pub fn new_with_builder() -> Self {
302 Self {
303 activate: Vec::new(),
304 progress: Vec::new(),
305 pushers: Vec::new(),
306 builder: CB::default(),
307 buffer: Default::default(),
308 now_at: T::minimum(),
309 }
310 }
311
312 /// Creates an input stream from the handle in the supplied scope.
313 ///
314 /// # Examples
315 /// ```
316 /// use timely::*;
317 /// use timely::dataflow::InputHandle;
318 /// use timely::dataflow::operators::{Input, Inspect};
319 ///
320 /// // construct and execute a timely dataflow
321 /// timely::execute(Config::thread(), |worker| {
322 ///
323 /// // add an input and base computation off of it
324 /// let mut input = InputHandle::new();
325 /// worker.dataflow(|scope| {
326 /// input.to_stream(scope)
327 /// .container::<Vec<_>>()
328 /// .inspect(|x| println!("hello {:?}", x));
329 /// });
330 ///
331 /// // introduce input, advance computation
332 /// for round in 0..10 {
333 /// input.send(round);
334 /// input.advance_to(round + 1);
335 /// worker.step();
336 /// }
337 /// });
338 /// ```
339 pub fn to_stream<'scope>(&mut self, scope: Scope<'scope, T>) -> Stream<'scope, T, CB::Container>
340 where
341 T: TotalOrder,
342 {
343 scope.input_from(self)
344 }
345
346 fn register(
347 &mut self,
348 pusher: Counter<T, Tee<T, CB::Container>>,
349 progress: Rc<RefCell<ChangeBatch<T>>>,
350 ) {
351 // flush current contents, so new registrant does not see existing data.
352 self.flush();
353
354 // we need to produce an appropriate update to the capabilities for `progress`, in case a
355 // user has decided to drive the handle around a bit before registering it.
356 progress.borrow_mut().update(T::minimum(), -1);
357 progress.borrow_mut().update(self.now_at.clone(), 1);
358
359 self.progress.push(progress);
360 self.pushers.push(pusher);
361 }
362
363 /// Extract all ready contents from the builder and distribute to downstream operators.
364 #[inline]
365 fn extract_and_send(&mut self) {
366 while let Some(container) = self.builder.extract() {
367 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
368 }
369 }
370
371 /// Flush all contents and distribute to downstream operators.
372 #[inline]
373 pub fn flush(&mut self) {
374 while let Some(container) = self.builder.finish() {
375 Self::send_container(container, &mut self.buffer, &mut self.pushers, &self.now_at);
376 }
377 }
378
379 /// Sends a container at each of the destinations. There can be more than one; clone if needed.
380 /// Does not take `self` because `flush` and `extract` borrow `self` mutably.
381 /// Leaves the container in an undefined state.
382 // TODO: Find a better name for this function.
383 #[inline]
384 fn send_container(
385 container: &mut CB::Container,
386 buffer: &mut CB::Container,
387 pushers: &mut [Counter<T, Tee<T, CB::Container>>],
388 now_at: &T
389 ) {
390 for index in 0 .. pushers.len() {
391 if index < pushers.len() - 1 {
392 buffer.clone_from(container);
393 Message::push_at(buffer, now_at.clone(), &mut pushers[index]);
394 }
395 else {
396 Message::push_at(container, now_at.clone(), &mut pushers[index]);
397 }
398 }
399 }
400
401 /// Closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
402 // TODO: Find a better name for this function.
403 fn close_epoch(&mut self) {
404 self.flush();
405 for pusher in self.pushers.iter_mut() {
406 pusher.done();
407 }
408 for progress in self.progress.iter() {
409 progress.borrow_mut().update(self.now_at.clone(), -1);
410 }
411 // Alert worker of each active input operator.
412 for activate in self.activate.iter() {
413 activate.activate();
414 }
415 }
416
417 /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch.
418 ///
419 /// This method flushes single elements previously sent with `send`, to keep the insertion order.
420 ///
421 /// # Examples
422 /// ```
423 /// use timely::*;
424 /// use timely::dataflow::InputHandle;
425 /// use timely::dataflow::operators::{Input, InspectCore};
426 ///
427 /// // construct and execute a timely dataflow
428 /// timely::execute(Config::thread(), |worker| {
429 ///
430 /// // add an input and base computation off of it
431 /// let mut input = InputHandle::new();
432 /// worker.dataflow(|scope| {
433 /// scope.input_from(&mut input)
434 /// .inspect_container(|x| println!("hello {:?}", x));
435 /// });
436 ///
437 /// // introduce input, advance computation
438 /// for round in 0..10 {
439 /// input.send_batch(&mut vec![format!("{}", round)]);
440 /// input.advance_to(round + 1);
441 /// worker.step();
442 /// }
443 /// });
444 /// ```
445 pub fn send_batch(&mut self, buffer: &mut CB::Container) {
446 if !buffer.is_empty() {
447 // flush buffered elements to ensure local fifo.
448 self.flush();
449 Self::send_container(buffer, &mut self.buffer, &mut self.pushers, &self.now_at);
450 }
451 }
452
453 /// Advances the current epoch to `next`.
454 ///
455 /// This method allows timely dataflow to issue progress notifications as it can now determine
456 /// that this input can no longer produce data at earlier timestamps.
457 pub fn advance_to(&mut self, next: T) {
458 // Assert that we do not rewind time.
459 assert!(self.now_at.less_equal(&next));
460 // Flush buffers if time has actually changed.
461 if !self.now_at.eq(&next) {
462 self.close_epoch();
463 self.now_at = next;
464 for progress in self.progress.iter() {
465 progress.borrow_mut().update(self.now_at.clone(), 1);
466 }
467 }
468 }
469
470 /// Closes the input.
471 ///
472 /// This method allows timely dataflow to issue all progress notifications blocked by this input
473 /// and to begin to shut down operators, as this input can no longer produce data.
474 pub fn close(self) { }
475
476 /// Reports the current timestamp.
477 pub fn time(&self) -> &T {
478 &self.now_at
479 }
480}
481
482impl<T, CB, D> PushInto<D> for Handle<T, CB>
483where
484 T: Timestamp,
485 CB: ContainerBuilder<Container: Clone> + PushInto<D>,
486{
487 #[inline]
488 fn push_into(&mut self, item: D) {
489 self.builder.push_into(item);
490 self.extract_and_send();
491 }
492}
493
494impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
495 /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
496 ///
497 /// # Examples
498 /// ```
499 /// use timely::*;
500 /// use timely::dataflow::InputHandle;
501 /// use timely::dataflow::operators::{Input, Inspect};
502 ///
503 /// // construct and execute a timely dataflow
504 /// timely::execute(Config::thread(), |worker| {
505 ///
506 /// // add an input and base computation off of it
507 /// let mut input = InputHandle::new();
508 /// worker.dataflow(|scope| {
509 /// scope.input_from(&mut input)
510 /// .container::<Vec<_>>()
511 /// .inspect(|x| println!("hello {:?}", x));
512 /// });
513 ///
514 /// // introduce input, advance computation
515 /// for round in 0..10 {
516 /// input.send(round);
517 /// input.advance_to(round + 1);
518 /// worker.step();
519 /// }
520 /// });
521 /// ```
522 #[inline]
523 pub fn send<D>(&mut self, data: D) where CB: PushInto<D> {
524 self.push_into(data)
525 }
526}
527
528impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Default for Handle<T, CB> {
529 fn default() -> Self {
530 Self::new_with_builder()
531 }
532}
533
534impl<T:Timestamp, CB: ContainerBuilder<Container: Clone>> Drop for Handle<T, CB> {
535 fn drop(&mut self) {
536 self.close_epoch();
537 }
538}