Function timely_communication::initialize::initialize_from

source ·
pub fn initialize_from<A, T, F>(
    builders: Vec<A>,
    others: Box<dyn Any + Send>,
    func: F
) -> Result<WorkerGuards<T>, String>
where A: AllocateBuilder + 'static, T: Send + 'static, F: Fn(<A as AllocateBuilder>::Allocator) -> T + Send + Sync + 'static,
Expand description

Initializes computation and runs a distributed computation.

This version of initialize allows you to explicitly specify the allocators that you want to use, by providing an explicit list of allocator builders. Additionally, you provide others, a Box<Any> which will be held by the resulting worker guard and dropped when it is dropped, which allows you to join communication threads.

§Examples

use timely_communication::Allocate;

// configure for two threads, just one process.
let builders = timely_communication::allocator::process::Process::new_vector(2);

// initializes communication, spawns workers
let guards = timely_communication::initialize_from(builders, Box::new(()), |mut allocator| {
    println!("worker {} started", allocator.index());

    // allocates a pair of senders list and one receiver.
    let (mut senders, mut receiver) = allocator.allocate(0);

    // send typed data along each channel
    use timely_communication::Message;
    senders[0].send(Message::from_typed(format!("hello, {}", 0)));
    senders[1].send(Message::from_typed(format!("hello, {}", 1)));

    // no support for termination notification,
    // we have to count down ourselves.
    let mut expecting = 2;
    while expecting > 0 {
        allocator.receive();
        if let Some(message) = receiver.recv() {
            use std::ops::Deref;
            println!("worker {}: received: <{}>", allocator.index(), message.deref());
            expecting -= 1;
        }
        allocator.release();
    }

    // optionally, return something
    allocator.index()
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
    for guard in guards.join() {
        println!("result: {:?}", guard);
    }
}
else { println!("error in computation"); }