Skip to main content

timely/dataflow/operators/vec/
flow_controlled.rs

1//! Methods to construct flow-controlled sources.
2
3use crate::order::{PartialOrder, TotalOrder};
4use crate::progress::timestamp::Timestamp;
5use crate::dataflow::operators::generic::operator::source;
6use crate::dataflow::operators::probe::Handle;
7use crate::dataflow::{StreamVec, Scope};
8
/// Output of the input reading function for [`iterator_source`].
///
/// Type parameters:
/// * `T`: the timestamp type (used for both `lower_bound` and `target`);
/// * `D`: the type of individual data records;
/// * `DI`: a collection of records that share one timestamp;
/// * `I`: the collection of `(time, records)` batches returned by one call.
pub struct IteratorSourceInput<T: Clone, D: 'static, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
    /// Lower bound on timestamps that can be emitted by this input in the future.
    ///
    /// After emitting `data`, [`iterator_source`] advances its capability to this time
    /// if the capability is still earlier than it.
    pub lower_bound: T,
    /// Any `I: IntoIterator` of new input data in the form `(time, data)`: time must be
    /// monotonically increasing.
    pub data: I,
    /// A timestamp that represents the frontier that the probe should have
    /// reached before the function is invoked again to ingest additional input.
    pub target: T,
}
20
21/// Construct a source that repeatedly calls the provided function to ingest input.
22///
23/// The function can return `None` to signal the end of the input.
24/// Otherwise, it should return a [`IteratorSourceInput`], where:
25/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future,
26///   `Default::default()` can be used if this isn't needed (the source will assume that
27///   the timestamps in `data` are monotonically increasing and will release capabilities
28///   accordingly);
29/// * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be
30///   monotonically increasing;
31/// * `target` is a timestamp that represents the frontier that the probe should have
32///   reached before the function is invoked again to ingest additional input.
33///   The function will receive the current lower bound of timestamps that can be inserted,
34///   `lower_bound`.
35///
36/// # Example
37/// ```rust
38/// use timely::dataflow::operators::vec::flow_controlled::{iterator_source, IteratorSourceInput};
39/// use timely::dataflow::operators::{probe, Probe, Inspect};
40///
41/// timely::execute_from_args(std::env::args(), |worker| {
42///     let mut input = (0u64..100000).peekable();
43///     worker.dataflow(|scope| {
44///         let mut probe_handle = probe::Handle::new();
45///         let probe_handle_2 = probe_handle.clone();
46///
47///         let mut next_t: u64 = 0;
48///         iterator_source(
49///             scope,
50///             "Source",
51///             move |prev_t| {
52///                 if let Some(first_x) = input.peek().cloned() {
53///                     next_t = first_x / 100 * 100;
54///                     Some(IteratorSourceInput {
55///                         lower_bound: Default::default(),
56///                         data: vec![
57///                             (next_t,
58///                              input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
59///                         target: *prev_t,
60///                     })
61///                 } else {
62///                     None
63///                 }
64///             },
65///             probe_handle_2)
66///         .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
67///         .probe_with(&mut probe_handle);
68///     });
69/// }).unwrap();
70/// ```
71pub fn iterator_source<
72    G: Scope,
73    D: 'static,
74    DI: IntoIterator<Item=D>,
75    I: IntoIterator<Item=(G::Timestamp, DI)>,
76    F: FnMut(&G::Timestamp)->Option<IteratorSourceInput<G::Timestamp, D, DI, I>>+'static>(
77        scope: &G,
78        name: &str,
79        mut input_f: F,
80        probe: Handle<G::Timestamp>,
81        ) -> StreamVec<G, D> where G::Timestamp: TotalOrder {
82
83    let mut target = G::Timestamp::minimum();
84    source(scope, name, |cap, info| {
85        let mut cap = Some(cap);
86        let activator = scope.activator_for(info.address);
87        move |output| {
88            cap = cap.take().and_then(|mut cap| {
89                loop {
90                    if !probe.less_than(&target) {
91                        if let Some(IteratorSourceInput {
92                             lower_bound,
93                             data,
94                             target: new_target,
95                         }) = input_f(cap.time()) {
96                            target = new_target;
97                            let mut has_data = false;
98                            for (t, ds) in data.into_iter() {
99                                cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
100                                let mut session = output.session(&cap);
101                                session.give_iterator(ds.into_iter());
102                                has_data = true;
103                            }
104
105                            cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
106                            if !has_data {
107                                break Some(cap);
108                            }
109                        } else {
110                            break None;
111                        }
112                    } else {
113                        break Some(cap);
114                    }
115                }
116            });
117
118            if cap.is_some() {
119                activator.activate();
120            }
121        }
122    })
123}