Skip to main content

timely_communication/allocator/zero_copy/
initialize.rs

1//! Network initialization.
2
3use std::sync::Arc;
4use crate::networking::create_sockets;
5use super::tcp::{send_loop, recv_loop};
6use crate::allocator::ProcessBuilder;
7use super::allocator::{TcpBuilder, new_vector};
8use super::stream::Stream;
9
10/// Join handles for send and receive threads.
11///
12/// On drop, the guard joins with each of the threads to ensure that they complete
13/// cleanly and send all necessary data.
14pub struct CommsGuard {
15    send_guards: Vec<::std::thread::JoinHandle<()>>,
16    recv_guards: Vec<::std::thread::JoinHandle<()>>,
17}
18
19impl Drop for CommsGuard {
20    fn drop(&mut self) {
21        for handle in self.send_guards.drain(..) {
22            handle.join().expect("Send thread panic");
23        }
24        // println!("SEND THREADS JOINED");
25        for handle in self.recv_guards.drain(..) {
26            handle.join().expect("Recv thread panic");
27        }
28        // println!("RECV THREADS JOINED");
29    }
30}
31
32use crate::logging::CommunicationSetup;
33
34/// Initializes network connections.
35///
36/// `process_allocators` is a vector of pre-built intra-process allocator builders, one per local
37/// worker thread, that will be wrapped by the TCP layer for cross-process communication. Callers
38/// (typically `Config::try_build`) construct this vector by calling
39/// `Process::new_vector(...).into_iter().map(ProcessBuilder::Process).collect()` (or the
40/// `ProcessBinary` variant for the zero-copy flavor).
41pub fn initialize_networking(
42    process_allocators: Vec<ProcessBuilder>,
43    addresses: Vec<String>,
44    my_index: usize,
45    threads: usize,
46    noisy: bool,
47    hooks: crate::Hooks,
48)
49-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
50{
51    let sockets = create_sockets(addresses, my_index, noisy)?;
52    initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, hooks)
53}
54
55/// Initialize send and recv threads from sockets.
56///
57/// This method is available for users who have already connected sockets and simply wish to construct
58/// a vector of process-local allocators connected to instantiated send and recv threads.
59///
60/// It is important that the `sockets` argument contain sockets for each remote process, in order, and
61/// with position `my_index` set to `None`.
62pub fn initialize_networking_from_sockets<S: Stream + 'static>(
63    process_allocators: Vec<ProcessBuilder>,
64    mut sockets: Vec<Option<S>>,
65    my_index: usize,
66    threads: usize,
67    hooks: crate::Hooks,
68)
69-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
70{
71    // Sockets are expected to be blocking,
72    for socket in sockets.iter_mut().flatten() {
73        socket.set_nonblocking(false).expect("failed to set socket to blocking");
74    }
75
76    let processes = sockets.len();
77
78    let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, hooks.refill.clone(), hooks.spill.clone());
79
80    let mut promises_iter = promises.into_iter();
81    let mut futures_iter = futures.into_iter();
82
83    let mut send_guards = Vec::with_capacity(sockets.len());
84    let mut recv_guards = Vec::with_capacity(sockets.len());
85
86    // for each process, if a stream exists (i.e. not local) ...
87    for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
88        let remote_recv = promises_iter.next().unwrap();
89
90        {
91            let log_sender = Arc::clone(&hooks.log_fn);
92            let stream = stream.try_clone()?;
93            let spill = hooks.spill.clone();
94            let join_guard =
95            ::std::thread::Builder::new()
96                .name(format!("timely:send-{}", index))
97                .spawn(move || {
98
99                    let logger = log_sender(CommunicationSetup {
100                        process: my_index,
101                        sender: true,
102                        remote: Some(index),
103                    });
104
105                    send_loop(stream, remote_recv, my_index, index, spill, logger);
106                })?;
107
108            send_guards.push(join_guard);
109        }
110
111        let remote_send = futures_iter.next().unwrap();
112
113        {
114            // let remote_sends = remote_sends.clone();
115            let log_sender = Arc::clone(&hooks.log_fn);
116            let stream = stream.try_clone()?;
117            let refill = hooks.refill.clone();
118            let join_guard =
119            ::std::thread::Builder::new()
120                .name(format!("timely:recv-{}", index))
121                .spawn(move || {
122                    let logger = log_sender(CommunicationSetup {
123                        process: my_index,
124                        sender: false,
125                        remote: Some(index),
126                    });
127                    recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
128                })?;
129
130            recv_guards.push(join_guard);
131        }
132    }
133
134    Ok((builders, CommsGuard { send_guards, recv_guards }))
135}