timely_communication/allocator/zero_copy/
initialize.rs

1//! Network initialization.
2
3use std::sync::Arc;
4use timely_logging::Logger;
5use crate::allocator::PeerBuilder;
6use crate::allocator::zero_copy::bytes_slab::BytesRefill;
7use crate::logging::CommunicationEventBuilder;
8use crate::networking::create_sockets;
9use super::tcp::{send_loop, recv_loop};
10use super::allocator::{TcpBuilder, new_vector};
11use super::stream::Stream;
12
/// Keeps the join handles of the networking send and receive threads.
///
/// When the guard is dropped it joins every thread it holds, so that the threads
/// finish cleanly and all necessary data has been sent before the guard goes away.
pub struct CommsGuard {
    /// Join handles of the send threads.
    send_guards: Vec<std::thread::JoinHandle<()>>,
    /// Join handles of the receive threads.
    recv_guards: Vec<std::thread::JoinHandle<()>>,
}
21
22impl Drop for CommsGuard {
23    fn drop(&mut self) {
24        for handle in self.send_guards.drain(..) {
25            handle.join().expect("Send thread panic");
26        }
27        // println!("SEND THREADS JOINED");
28        for handle in self.recv_guards.drain(..) {
29            handle.join().expect("Recv thread panic");
30        }
31        // println!("RECV THREADS JOINED");
32    }
33}
34
35use crate::logging::CommunicationSetup;
36
37/// Initializes network connections
38pub fn initialize_networking<P: PeerBuilder>(
39    addresses: Vec<String>,
40    my_index: usize,
41    threads: usize,
42    noisy: bool,
43    refill: BytesRefill,
44    log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
45)
46-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
47{
48    let sockets = create_sockets(addresses, my_index, noisy)?;
49    initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, refill, log_sender)
50}
51
52/// Initialize send and recv threads from sockets.
53///
54/// This method is available for users who have already connected sockets and simply wish to construct
55/// a vector of process-local allocators connected to instantiated send and recv threads.
56///
57/// It is important that the `sockets` argument contain sockets for each remote process, in order, and
58/// with position `my_index` set to `None`.
59pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
60    mut sockets: Vec<Option<S>>,
61    my_index: usize,
62    threads: usize,
63    refill: BytesRefill,
64    log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
65)
66-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
67{
68    // Sockets are expected to be blocking,
69    for socket in sockets.iter_mut().flatten() {
70        socket.set_nonblocking(false).expect("failed to set socket to blocking");
71    }
72
73    let processes = sockets.len();
74
75    let process_allocators = P::new_vector(threads, refill.clone());
76    let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());
77
78    let mut promises_iter = promises.into_iter();
79    let mut futures_iter = futures.into_iter();
80
81    let mut send_guards = Vec::with_capacity(sockets.len());
82    let mut recv_guards = Vec::with_capacity(sockets.len());
83    let refill = refill.clone();
84
85    // for each process, if a stream exists (i.e. not local) ...
86    for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
87        let remote_recv = promises_iter.next().unwrap();
88
89        {
90            let log_sender = Arc::clone(&log_sender);
91            let stream = stream.try_clone()?;
92            let join_guard =
93            ::std::thread::Builder::new()
94                .name(format!("timely:send-{}", index))
95                .spawn(move || {
96
97                    let logger = log_sender(CommunicationSetup {
98                        process: my_index,
99                        sender: true,
100                        remote: Some(index),
101                    });
102
103                    send_loop(stream, remote_recv, my_index, index, logger);
104                })?;
105
106            send_guards.push(join_guard);
107        }
108
109        let remote_send = futures_iter.next().unwrap();
110
111        {
112            // let remote_sends = remote_sends.clone();
113            let log_sender = Arc::clone(&log_sender);
114            let stream = stream.try_clone()?;
115            let refill = refill.clone();
116            let join_guard =
117            ::std::thread::Builder::new()
118                .name(format!("timely:recv-{}", index))
119                .spawn(move || {
120                    let logger = log_sender(CommunicationSetup {
121                        process: my_index,
122                        sender: false,
123                        remote: Some(index),
124                    });
125                    recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
126                })?;
127
128            recv_guards.push(join_guard);
129        }
130    }
131
132    Ok((builders, CommsGuard { send_guards, recv_guards }))
133}