Function timely::dataflow::operators::generic::operator::source

source ·
pub fn source<G: Scope, C, B, L>(
    scope: &G,
    name: &str,
    constructor: B
) -> StreamCore<G, C>
where C: Container, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut OutputHandleCore<'_, G::Timestamp, C, Tee<G::Timestamp, C>>) + 'static,
Expand description

Creates a new data stream source for a scope.

The source is defined by a name, and a constructor which takes a default capability to a method that can be repeatedly called on a output handle. The method is then repeatedly invoked, and is expected to eventually send data and downgrade and release capabilities.

§Examples

use timely::scheduling::Scheduler;
use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::generic::operator::source;
use timely::dataflow::Scope;

timely::example(|scope| {

    source(scope, "Source", |capability, info| {

        let activator = scope.activator_for(&info.address[..]);

        let mut cap = Some(capability);
        move |output| {

            let mut done = false;
            if let Some(cap) = cap.as_mut() {
                // get some data and send it.
                let time = cap.time().clone();
                output.session(&cap)
                      .give(*cap.time());

                // downgrade capability.
                cap.downgrade(&(time + 1));
                done = time > 20;
            }

            if done { cap = None; }
            else    { activator.activate(); }
        }
    })
    .container::<Vec<_>>()
    .inspect(|x| println!("number: {:?}", x));
});