pub fn iterator_source<G: Scope, D: Data, DI: IntoIterator<Item = D>, I: IntoIterator<Item = (G::Timestamp, DI)>, F: FnMut(&G::Timestamp) -> Option<IteratorSourceInput<G::Timestamp, D, DI, I>> + 'static>(
    scope: &G,
    name: &str,
    input_f: F,
    probe: Handle<G::Timestamp>,
) -> Stream<G, D>where
    G::Timestamp: TotalOrder,Expand description
Construct a source that repeatedly calls the provided function to ingest input.
The function can return None to signal the end of the input.
Otherwise, it should return a IteratorSourceInput, where:
- lower_boundis a lower bound on timestamps that can be emitted by this input in the future,- Default::default()can be used if this isn’t needed (the source will assume that the timestamps in- dataare monotonically increasing and will release capabilities accordingly);
- datais any- T: IntoIteratorof new input data in the form (time, data): time must be monotonically increasing;
- targetis a timestamp that represents the frontier that the probe should have reached before the function is invoked again to ingest additional input. The function will receive the current lower bound of timestamps that can be inserted,- lower_bound.
§Example
use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
use timely::dataflow::operators::{probe, Probe, Inspect};
timely::execute_from_args(std::env::args(), |worker| {
    let mut input = (0u64..100000).peekable();
    worker.dataflow(|scope| {
        let mut probe_handle = probe::Handle::new();
        let probe_handle_2 = probe_handle.clone();
        let mut next_t: u64 = 0;
        iterator_source(
            scope,
            "Source",
            move |prev_t| {
                if let Some(first_x) = input.peek().cloned() {
                    next_t = first_x / 100 * 100;
                    Some(IteratorSourceInput {
                        lower_bound: Default::default(),
                        data: vec![
                            (next_t,
                             input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
                        target: *prev_t,
                    })
                } else {
                    None
                }
            },
            probe_handle_2)
        .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
        .probe_with(&mut probe_handle);
    });
}).unwrap();