timely/dataflow/operators/flow_controlled.rs
1//! Methods to construct flow-controlled sources.
2
3use crate::Data;
4use crate::order::{PartialOrder, TotalOrder};
5use crate::progress::timestamp::Timestamp;
6use crate::dataflow::operators::generic::operator::source;
7use crate::dataflow::operators::probe::Handle;
8use crate::dataflow::{Stream, Scope};
9
10/// Output of the input reading function for iterator_source.
11pub struct IteratorSourceInput<T: Clone, D: Data, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
12 /// Lower bound on timestamps that can be emitted by this input in the future.
13 pub lower_bound: T,
14 /// Any `T: IntoIterator` of new input data in the form (time, data): time must be
15 /// monotonically increasing.
16 pub data: I,
17 /// A timestamp that represents the frontier that the probe should have
18 /// reached before the function is invoked again to ingest additional input.
19 pub target: T,
20}
21
22/// Construct a source that repeatedly calls the provided function to ingest input.
23///
24/// The function can return `None` to signal the end of the input.
25/// Otherwise, it should return a [`IteratorSourceInput`], where:
26/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future,
27/// `Default::default()` can be used if this isn't needed (the source will assume that
28/// the timestamps in `data` are monotonically increasing and will release capabilities
29/// accordingly);
30/// * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be
31/// monotonically increasing;
32/// * `target` is a timestamp that represents the frontier that the probe should have
33/// reached before the function is invoked again to ingest additional input.
34/// The function will receive the current lower bound of timestamps that can be inserted,
35/// `lower_bound`.
36///
37/// # Example
38/// ```rust
39/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
40/// use timely::dataflow::operators::{probe, Probe, Inspect};
41///
42/// timely::execute_from_args(std::env::args(), |worker| {
43/// let mut input = (0u64..100000).peekable();
44/// worker.dataflow(|scope| {
45/// let mut probe_handle = probe::Handle::new();
46/// let probe_handle_2 = probe_handle.clone();
47///
48/// let mut next_t: u64 = 0;
49/// iterator_source(
50/// scope,
51/// "Source",
52/// move |prev_t| {
53/// if let Some(first_x) = input.peek().cloned() {
54/// next_t = first_x / 100 * 100;
55/// Some(IteratorSourceInput {
56/// lower_bound: Default::default(),
57/// data: vec![
58/// (next_t,
59/// input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
60/// target: *prev_t,
61/// })
62/// } else {
63/// None
64/// }
65/// },
66/// probe_handle_2)
67/// .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
68/// .probe_with(&mut probe_handle);
69/// });
70/// }).unwrap();
71/// ```
72pub fn iterator_source<
73 G: Scope,
74 D: Data,
75 DI: IntoIterator<Item=D>,
76 I: IntoIterator<Item=(G::Timestamp, DI)>,
77 F: FnMut(&G::Timestamp)->Option<IteratorSourceInput<G::Timestamp, D, DI, I>>+'static>(
78 scope: &G,
79 name: &str,
80 mut input_f: F,
81 probe: Handle<G::Timestamp>,
82 ) -> Stream<G, D> where G::Timestamp: TotalOrder {
83
84 let mut target = G::Timestamp::minimum();
85 source(scope, name, |cap, info| {
86 let mut cap = Some(cap);
87 let activator = scope.activator_for(info.address);
88 move |output| {
89 cap = cap.take().and_then(|mut cap| {
90 loop {
91 if !probe.less_than(&target) {
92 if let Some(IteratorSourceInput {
93 lower_bound,
94 data,
95 target: new_target,
96 }) = input_f(cap.time()) {
97 target = new_target;
98 let mut has_data = false;
99 for (t, ds) in data.into_iter() {
100 cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
101 let mut session = output.session(&cap);
102 session.give_iterator(ds.into_iter());
103 has_data = true;
104 }
105
106 cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
107 if !has_data {
108 break Some(cap);
109 }
110 } else {
111 break None;
112 }
113 } else {
114 break Some(cap);
115 }
116 }
117 });
118
119 if cap.is_some() {
120 activator.activate();
121 }
122 }
123 })
124}