Function timely::dataflow::operators::flow_controlled::iterator_source

source ·
pub fn iterator_source<G: Scope, D: Data, DI: IntoIterator<Item = D>, I: IntoIterator<Item = (G::Timestamp, DI)>, F: FnMut(&G::Timestamp) -> Option<IteratorSourceInput<G::Timestamp, D, DI, I>> + 'static>(
    scope: &G,
    name: &str,
    input_f: F,
    probe: Handle<G::Timestamp>,
) -> Stream<G, D>
Expand description

Construct a source that repeatedly calls the provided function to ingest input.

  • The function can return None to signal the end of the input;
  • otherwise, it should return an IteratorSourceInput, where:
    • lower_bound is a lower bound on timestamps that can be emitted by this input in the future; Default::default() can be used if this isn’t needed (the source will assume that the timestamps in data are monotonically increasing and will release capabilities accordingly);
    • data is any T: IntoIterator of new input data in the form (time, data): time must be monotonically increasing;
    • target is a timestamp that represents the frontier that the probe should have reached before the function is invoked again to ingest additional input. The function will receive the current lower bound of timestamps that can be inserted, lower_bound.

§Example

use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
use timely::dataflow::operators::{probe, Probe, Inspect};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        // Raw records to ingest. Peekable so the source closure can look at
        // the next record without consuming it.
        let mut records = (0u64..100000).peekable();

        worker.dataflow(|scope| {
            // One handle is attached to the end of the dataflow below; its
            // clone is handed to the source.
            let mut probe_handle = probe::Handle::new();
            let source_probe = probe_handle.clone();

            let stream = iterator_source(
                scope,
                "Source",
                // Called repeatedly with the current lower bound `prev_t`;
                // yields `None` once `records` is exhausted, which ends the input.
                move |prev_t| {
                    records.peek().copied().map(|first_x| {
                        // Batch time: the first pending record rounded down
                        // to a multiple of 100.
                        let batch_time = first_x / 100 * 100;

                        // Take at most ten records; each datum is the pair
                        // ("timestamp", value), both equal to the record.
                        let batch: Vec<_> = records.by_ref().take(10).map(|x| (x, x)).collect();

                        IteratorSourceInput {
                            lower_bound: Default::default(),
                            data: vec![(batch_time, batch)],
                            target: *prev_t,
                        }
                    })
                },
                source_probe,
            );

            stream
                .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
                .probe_with(&mut probe_handle);
        });
    })
    .unwrap();
}