pub fn source<G: Scope, CB, B, L>(
scope: &G,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container> where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
Expand description
Creates a new data stream source for a scope.
The source is defined by a name and a constructor. The constructor takes a default capability (together with the operator's info) and returns a method that can be repeatedly called on an output handle. That method is then repeatedly invoked, and is expected to eventually send data, and to downgrade and release capabilities.
§Examples
use timely::scheduling::Scheduler;
use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::generic::operator::source;
use timely::dataflow::Scope;
timely::example(|scope| {
    source(scope, "Source", |capability, info| {
        // Obtained up front so the returned closure can request another
        // invocation of this operator after each round.
        let activator = scope.activator_for(info.address);

        // `Some` while the source is still producing; reset to `None` once it
        // has finished, which drops the capability.
        let mut held = Some(capability);

        move |output| {
            let finished = match held.as_mut() {
                Some(cap) => {
                    // Send the capability's current time as this round's datum.
                    let time = cap.time().clone();
                    output.session(&cap).give(*cap.time());

                    // Move the capability forward to the next time.
                    cap.downgrade(&(time + 1));

                    // Stop once the time just sent exceeded 20.
                    time > 20
                }
                // No capability held: nothing to send, and not "finished"
                // for the purposes of the branch below.
                None => false,
            };

            if finished {
                held = None;
            } else {
                activator.activate();
            }
        }
    })
    .container::<Vec<_>>()
    .inspect(|x| println!("number: {:?}", x));
});