1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
//! Methods to construct flow-controlled sources.
use crate::Data;
use crate::order::{PartialOrder, TotalOrder};
use crate::progress::timestamp::Timestamp;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::probe::Handle;
use crate::dataflow::{Stream, Scope};
/// One batch of input handed back to [`iterator_source`] by its input-reading function.
///
/// Type parameters: `T` is the timestamp type, `D` the record type, `DI` a collection of
/// records sharing one timestamp, and `I` a collection of `(timestamp, records)` pairs.
pub struct IteratorSourceInput<T, D, DI, I>
where
    T: Clone,
    D: Data,
    DI: IntoIterator<Item = D>,
    I: IntoIterator<Item = (T, DI)>,
{
    /// Lower bound on the timestamps this input may still emit in the future.
    pub lower_bound: T,
    /// The new input, as any `IntoIterator` of `(time, data)` pairs; the `time` components
    /// must be monotonically increasing.
    pub data: I,
    /// The timestamp the probe's frontier should have reached before the input function
    /// is invoked again to ingest more input.
    pub target: T,
}
/// Builds a flow-controlled source that pulls its input from the supplied function.
///
/// Whenever the operator runs, and for as long as `probe.less_than(&target)` is false for
/// the most recently requested `target`, `input_f` is called with the time of the
/// capability currently held by the source (the lower bound of timestamps that can still
/// be inserted).
///
/// `input_f` returns:
/// - `None` to signal the end of the input: the capability is dropped and the operator
///   stops re-activating itself;
/// - `Some(`[`IteratorSourceInput`]`)` otherwise, where
///   * `lower_bound` is a lower bound on timestamps that can be emitted in the future;
///     the held capability is advanced to it when it lies ahead of the capability's
///     time. `Default::default()` can be used if this isn't needed, in which case the
///     capability only advances along with the timestamps found in `data`;
///   * `data` is any `IntoIterator` of `(time, data)` pairs whose times must be
///     monotonically increasing; each pair is emitted at its `time`;
///   * `target` is the timestamp the probe's frontier should have reached before
///     `input_f` is invoked again.
///
/// A batch whose `data` is empty ends the current scheduling; the capability is kept
/// and the operator asks to be activated again.
///
/// # Example
/// ```rust
/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
/// use timely::dataflow::operators::{probe, Probe, Inspect};
///
/// fn main() {
///     timely::execute_from_args(std::env::args(), |worker| {
///         let mut input = (0u64..100000).peekable();
///         worker.dataflow(|scope| {
///             let mut probe_handle = probe::Handle::new();
///             let probe_handle_2 = probe_handle.clone();
///
///             let mut next_t: u64 = 0;
///             iterator_source(
///                 scope,
///                 "Source",
///                 move |prev_t| {
///                     if let Some(first_x) = input.peek().cloned() {
///                         next_t = first_x / 100 * 100;
///                         Some(IteratorSourceInput {
///                             lower_bound: Default::default(),
///                             data: vec![
///                                 (next_t,
///                                  input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
///                             target: *prev_t,
///                         })
///                     } else {
///                         None
///                     }
///                 },
///                 probe_handle_2)
///             .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
///             .probe_with(&mut probe_handle);
///         });
///     }).unwrap();
/// }
/// ```
pub fn iterator_source<G, D, DI, I, F>(
    scope: &G,
    name: &str,
    mut input_f: F,
    probe: Handle<G::Timestamp>,
) -> Stream<G, D>
where
    G: Scope,
    D: Data,
    DI: IntoIterator<Item = D>,
    I: IntoIterator<Item = (G::Timestamp, DI)>,
    F: FnMut(&G::Timestamp) -> Option<IteratorSourceInput<G::Timestamp, D, DI, I>> + 'static,
    G::Timestamp: TotalOrder,
{
    // Starting at the minimum timestamp means the very first probe check cannot hold
    // the source back.
    let mut target = G::Timestamp::minimum();

    source(scope, name, |initial_cap, info| {
        // `None` once the input function has reported the end of the input.
        let mut held_cap = Some(initial_cap);
        let activator = scope.activator_for(info.address);

        move |output| {
            held_cap = match held_cap.take() {
                None => None,
                Some(mut cap) => loop {
                    // The probe has not reached the requested target yet: keep the
                    // capability and try again on the next activation.
                    if probe.less_than(&target) {
                        break Some(cap);
                    }

                    // Ask for the next batch; `None` ends the input, and leaving the
                    // loop without the capability lets it drop.
                    let batch = match input_f(cap.time()) {
                        Some(batch) => batch,
                        None => break None,
                    };
                    let IteratorSourceInput {
                        lower_bound,
                        data,
                        target: next_target,
                    } = batch;
                    target = next_target;

                    let mut emitted = false;
                    for (time, records) in data {
                        // Move the capability to the batch's time before sending at it.
                        if cap.time() != &time {
                            cap = cap.delayed(&time);
                        }
                        let mut session = output.session(&cap);
                        session.give_iterator(records.into_iter());
                        emitted = true;
                    }

                    // Advance the capability to the announced lower bound if it is ahead.
                    if cap.time().less_than(&lower_bound) {
                        cap = cap.delayed(&lower_bound);
                    }

                    // An empty batch ends this scheduling; otherwise loop to re-check
                    // the probe against the new target.
                    if !emitted {
                        break Some(cap);
                    }
                },
            };

            // While a capability is still held there may be more input to ingest.
            if held_cap.is_some() {
                activator.activate();
            }
        }
    })
}