pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>, String>
Executes a timely dataflow from a configuration and per-communicator logic.
The execute method takes a Config and spins up some number of worker threads, each of which executes the supplied closure to construct and run a timely dataflow computation.
The closure may return a T: Send+'static. The execute method returns immediately after initializing the timely computation. Its result contains a WorkerGuards<T> (or error information), which can be joined to recover the T values returned by the local workers.
Note: if the caller drops the result of execute, the drop code will block awaiting the completion of the timely computation. If the result of the method is not captured it will be dropped, which gives the experience of execute blocking; to regain control after execute, be sure to capture the results and drop them only when the calling thread has no other work to perform.
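The blocking-on-drop behaviour described in the note can be modelled with the standard library alone. The sketch below is a simplified stand-in, not timely's implementation: `Guards`, `spawn_workers`, and `finished_after_drop` are hypothetical names, and the sleeping threads merely simulate workers that take some time to finish.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `WorkerGuards`: owns worker threads and joins them on drop.
struct Guards {
    handles: Vec<JoinHandle<()>>,
}

impl Drop for Guards {
    fn drop(&mut self) {
        // Dropping the guards blocks until every worker thread has finished.
        for handle in self.handles.drain(..) {
            handle.join().expect("worker panicked");
        }
    }
}

/// Hypothetical stand-in for `execute`: spawns workers and returns immediately.
fn spawn_workers(count: usize, finished: Arc<AtomicUsize>) -> Guards {
    let handles = (0..count)
        .map(|_| {
            let finished = Arc::clone(&finished);
            thread::spawn(move || {
                // Simulate a computation that takes a while to complete.
                thread::sleep(Duration::from_millis(50));
                finished.fetch_add(1, Ordering::SeqCst);
            })
        })
        .collect();
    Guards { handles }
}

/// Captures the guards, drops them explicitly, and reports how many workers had finished.
fn finished_after_drop(count: usize) -> usize {
    let finished = Arc::new(AtomicUsize::new(0));
    let guards = spawn_workers(count, Arc::clone(&finished));
    // The calling thread keeps control here and could do other work.
    drop(guards); // Blocks until all workers are done.
    finished.load(Ordering::SeqCst)
}

fn main() {
    println!("workers finished after drop: {}", finished_after_drop(3));
}
```

If the value returned by `spawn_workers` were not bound to a variable, it would be dropped at the end of the statement and the call would appear to block, which mirrors the behaviour the note describes for execute.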
Examples
use timely::dataflow::operators::{ToStream, Inspect};

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), |worker| {
    worker.dataflow::<(),_,_>(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    })
}).unwrap();
The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.
use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), move |worker| {
    let send = send.lock().unwrap().clone();
    worker.dataflow::<(),_,_>(move |scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x))
               .capture_into(send);
    });
}).unwrap();

// the extracted data should have data (0..10) thrice at timestamp 0.
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());