use std::sync::Arc;
use crate::allocator::process::ProcessBuilder;
use crate::networking::create_sockets;
use super::tcp::{send_loop, recv_loop};
use super::allocator::{TcpBuilder, new_vector};
use super::stream::Stream;
pub struct CommsGuard {
send_guards: Vec<::std::thread::JoinHandle<()>>,
recv_guards: Vec<::std::thread::JoinHandle<()>>,
}
impl Drop for CommsGuard {
fn drop(&mut self) {
for handle in self.send_guards.drain(..) {
handle.join().expect("Send thread panic");
}
for handle in self.recv_guards.drain(..) {
handle.join().expect("Recv thread panic");
}
}
}
use crate::logging::{CommunicationSetup, CommunicationEvent};
use timely_logging::Logger;
pub fn initialize_networking(
addresses: Vec<String>,
my_index: usize,
threads: usize,
noisy: bool,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
let sockets = create_sockets(addresses, my_index, noisy)?;
initialize_networking_from_sockets(sockets, my_index, threads, log_sender)
}
pub fn initialize_networking_from_sockets<S: Stream + 'static>(
mut sockets: Vec<Option<S>>,
my_index: usize,
threads: usize,
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEvent>>+Send+Sync>,
)
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
{
for socket in sockets.iter_mut().flatten() {
socket.set_nonblocking(false).expect("failed to set socket to blocking");
}
let processes = sockets.len();
let process_allocators = crate::allocator::process::Process::new_vector(threads);
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
let mut promises_iter = promises.into_iter();
let mut futures_iter = futures.into_iter();
let mut send_guards = Vec::with_capacity(sockets.len());
let mut recv_guards = Vec::with_capacity(sockets.len());
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
let remote_recv = promises_iter.next().unwrap();
{
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:send-{}", index))
.spawn(move || {
let logger = log_sender(CommunicationSetup {
process: my_index,
sender: true,
remote: Some(index),
});
send_loop(stream, remote_recv, my_index, index, logger);
})?;
send_guards.push(join_guard);
}
let remote_send = futures_iter.next().unwrap();
{
let log_sender = log_sender.clone();
let stream = stream.try_clone()?;
let join_guard =
::std::thread::Builder::new()
.name(format!("timely:recv-{}", index))
.spawn(move || {
let logger = log_sender(CommunicationSetup {
process: my_index,
sender: false,
remote: Some(index),
});
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
})?;
recv_guards.push(join_guard);
}
}
Ok((builders, CommsGuard { send_guards, recv_guards }))
}