Function timely::dataflow::operators::flow_controlled::iterator_source
source · pub fn iterator_source<G: Scope, D: Data, DI: IntoIterator<Item = D>, I: IntoIterator<Item = (G::Timestamp, DI)>, F: FnMut(&G::Timestamp) -> Option<IteratorSourceInput<G::Timestamp, D, DI, I>> + 'static>(
scope: &G,
name: &str,
input_f: F,
probe: Handle<G::Timestamp>,
) -> Stream<G, D> where
G::Timestamp: TotalOrder,
Expand description
Construct a source that repeatedly calls the provided function to ingest input.
The function can return `None` to signal the end of the input.
Otherwise, it should return an `IteratorSourceInput`, where:
- `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future; `Default::default()` can be used if this isn’t needed (the source will assume that the timestamps in `data` are monotonically increasing and will release capabilities accordingly);
- `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be monotonically increasing;
- `target` is a timestamp that represents the frontier that the probe should have reached before the function is invoked again to ingest additional input.

The function will receive the current lower bound of timestamps that can be inserted, `lower_bound`.
§Example
use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
use timely::dataflow::operators::{probe, Probe, Inspect};
/// Example driver: feeds the integers `0..100000` into a dataflow through
/// `iterator_source`, at most ten values per invocation of the input closure,
/// and prints every batch that flows past `inspect_time`.
fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // Peekable so the closure can look at the next value without consuming it.
        let mut numbers = (0u64..100000).peekable();

        worker.dataflow(|scope| {
            // One handle is attached to the end of the stream; its clone is
            // handed to the source.
            let mut probe_handle = probe::Handle::new();
            let source_probe = probe_handle.clone();

            // Timestamp used for the most recently produced batch.
            let mut batch_time: u64 = 0;

            iterator_source(
                scope,
                "Source",
                move |prev_t| {
                    // `None` (input exhausted) propagates out and ends the source.
                    numbers.peek().copied().map(|first_x| {
                        // Round the next value down to a multiple of 100.
                        batch_time = first_x - first_x % 100;

                        // Up to ten records; the first component of each pair
                        // plays the role of a "timestamp" inside the record.
                        let batch: Vec<_> = numbers.by_ref().take(10).map(|x| (x, x)).collect();

                        IteratorSourceInput {
                            lower_bound: Default::default(),
                            data: vec![(batch_time, batch)],
                            // The probe must reach the bound we were handed
                            // before this closure is invoked again.
                            target: *prev_t,
                        }
                    })
                },
                source_probe,
            )
            .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
            .probe_with(&mut probe_handle);
        });
    })
    .unwrap();
}