Function timely::execute::execute

pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>, String>
where
    T: Send + 'static,
    F: Fn(&mut Worker<Allocator>) -> T + Send + Sync + 'static,

Executes a timely dataflow from a configuration and per-communicator logic.

The execute method takes a Config and spins up some number of worker threads, each of which executes the supplied closure to construct and run a timely dataflow computation.

The closure may return a value of type T: Send + 'static. The execute method returns immediately after initializing the timely computation, with a result containing either a WorkerGuards<T> or error information. The WorkerGuards<T> can be joined to recover the T values returned by the local workers.

Note: if the caller drops the result of execute, the drop code will block until the timely computation completes. If the result is not bound to a variable it is dropped immediately, which makes execute appear to block. To regain control after execute, capture the result and drop it only when the calling thread has no other work to perform.
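The join-or-block-on-drop behavior described above can be illustrated without timely itself. The following is a minimal sketch using only the standard library; `Guards` and `spawn_workers` are hypothetical stand-ins invented for this illustration and are not timely's actual `WorkerGuards` or `execute` implementation. It assumes each worker is a plain thread that receives only its index.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for `WorkerGuards<T>`: owns one handle per worker thread.
struct Guards<T: Send + 'static> {
    handles: Vec<Option<JoinHandle<T>>>,
}

impl<T: Send + 'static> Guards<T> {
    // Waits for every worker and returns the results in worker order.
    fn join(mut self) -> Vec<thread::Result<T>> {
        self.handles
            .iter_mut()
            .filter_map(|slot| slot.take())
            .map(|handle| handle.join())
            .collect()
    }
}

impl<T: Send + 'static> Drop for Guards<T> {
    // Dropping the guards blocks until all workers not yet joined have finished.
    fn drop(&mut self) {
        for slot in self.handles.iter_mut() {
            if let Some(handle) = slot.take() {
                let _ = handle.join();
            }
        }
    }
}

// Hypothetical stand-in for `execute`: spawns `workers` threads and returns at once.
fn spawn_workers<T, F>(workers: usize, func: F) -> Guards<T>
where
    T: Send + 'static,
    F: Fn(usize) -> T + Send + Sync + 'static,
{
    let func = Arc::new(func);
    let handles = (0..workers)
        .map(|index| {
            let func = Arc::clone(&func);
            Some(thread::spawn(move || func(index)))
        })
        .collect();
    Guards { handles }
}

fn main() {
    // Capture the guards so the calling thread keeps control.
    let guards = spawn_workers(3, |index| index * 10);

    // ... the calling thread could do other work here ...

    // Join explicitly to recover one result per worker.
    let results: Vec<usize> = guards.join().into_iter().map(|r| r.unwrap()).collect();
    println!("{:?}", results); // prints "[0, 10, 20]"
}
```

Writing `spawn_workers(3, ...);` as a bare statement would drop the returned guards at the end of that statement, so the call would appear to block until every worker finished. This is the effect the note above warns about.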

§Examples

use timely::dataflow::operators::{ToStream, Inspect};

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), |worker| {
    worker.dataflow::<(),_,_>(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    })
}).unwrap();

The following example demonstrates how one can extract data from a multi-worker execution. In a multi-process setting, each process will only receive those records present at workers in the process.

use std::sync::{Arc, Mutex};
use timely::dataflow::operators::{ToStream, Inspect, Capture};
use timely::dataflow::operators::capture::Extract;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

// execute a timely dataflow using three worker threads.
timely::execute(timely::Config::process(3), move |worker| {
    let send = send.lock().unwrap().clone();
    worker.dataflow::<(),_,_>(move |scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x))
               .capture_into(send);
    });
}).unwrap();

// the extracted data should contain (0..10) thrice, all at the single timestamp `()`.
assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());