Skip to main content

timely_communication/allocator/zero_copy/
initialize.rs

1//! Network initialization.
2
3use std::sync::Arc;
4use timely_logging::Logger;
5use crate::allocator::zero_copy::bytes_slab::BytesRefill;
6use crate::logging::CommunicationEventBuilder;
7use crate::networking::create_sockets;
8use super::tcp::{send_loop, recv_loop};
9use crate::allocator::ProcessBuilder;
10use super::allocator::{TcpBuilder, new_vector};
11use super::stream::Stream;
12
/// Owner of the join handles for the networking send and receive threads.
///
/// Dropping the guard blocks until every held thread has finished, which ensures the
/// threads complete cleanly and send all necessary data before the guard goes away.
pub struct CommsGuard {
    /// Handles of the spawned send threads.
    send_guards: Vec<::std::thread::JoinHandle<()>>,
    /// Handles of the spawned receive threads.
    recv_guards: Vec<::std::thread::JoinHandle<()>>,
}

impl Drop for CommsGuard {
    /// Joins every send thread, then every receive thread, in stored order.
    ///
    /// # Panics
    ///
    /// Panics with `"Send thread panic"` or `"Recv thread panic"` when the
    /// corresponding joined thread itself terminated with a panic.
    fn drop(&mut self) {
        // Senders are joined before receivers.
        self.send_guards
            .drain(..)
            .for_each(|sender| sender.join().expect("Send thread panic"));
        self.recv_guards
            .drain(..)
            .for_each(|receiver| receiver.join().expect("Recv thread panic"));
    }
}
34
35use crate::logging::CommunicationSetup;
36
37/// Initializes network connections.
38///
39/// `process_allocators` is a vector of pre-built intra-process allocator builders, one per local
40/// worker thread, that will be wrapped by the TCP layer for cross-process communication. Callers
41/// (typically `Config::try_build`) construct this vector by calling
42/// `Process::new_vector(...).into_iter().map(ProcessBuilder::Process).collect()` (or the
43/// `ProcessBinary` variant for the zero-copy flavor).
44pub fn initialize_networking(
45    process_allocators: Vec<ProcessBuilder>,
46    addresses: Vec<String>,
47    my_index: usize,
48    threads: usize,
49    noisy: bool,
50    refill: BytesRefill,
51    log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
52)
53-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
54{
55    let sockets = create_sockets(addresses, my_index, noisy)?;
56    initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, refill, log_sender)
57}
58
59/// Initialize send and recv threads from sockets.
60///
61/// This method is available for users who have already connected sockets and simply wish to construct
62/// a vector of process-local allocators connected to instantiated send and recv threads.
63///
64/// It is important that the `sockets` argument contain sockets for each remote process, in order, and
65/// with position `my_index` set to `None`.
66pub fn initialize_networking_from_sockets<S: Stream + 'static>(
67    process_allocators: Vec<ProcessBuilder>,
68    mut sockets: Vec<Option<S>>,
69    my_index: usize,
70    threads: usize,
71    refill: BytesRefill,
72    log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
73)
74-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
75{
76    // Sockets are expected to be blocking,
77    for socket in sockets.iter_mut().flatten() {
78        socket.set_nonblocking(false).expect("failed to set socket to blocking");
79    }
80
81    let processes = sockets.len();
82
83    let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());
84
85    let mut promises_iter = promises.into_iter();
86    let mut futures_iter = futures.into_iter();
87
88    let mut send_guards = Vec::with_capacity(sockets.len());
89    let mut recv_guards = Vec::with_capacity(sockets.len());
90    let refill = refill.clone();
91
92    // for each process, if a stream exists (i.e. not local) ...
93    for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
94        let remote_recv = promises_iter.next().unwrap();
95
96        {
97            let log_sender = Arc::clone(&log_sender);
98            let stream = stream.try_clone()?;
99            let join_guard =
100            ::std::thread::Builder::new()
101                .name(format!("timely:send-{}", index))
102                .spawn(move || {
103
104                    let logger = log_sender(CommunicationSetup {
105                        process: my_index,
106                        sender: true,
107                        remote: Some(index),
108                    });
109
110                    send_loop(stream, remote_recv, my_index, index, logger);
111                })?;
112
113            send_guards.push(join_guard);
114        }
115
116        let remote_send = futures_iter.next().unwrap();
117
118        {
119            // let remote_sends = remote_sends.clone();
120            let log_sender = Arc::clone(&log_sender);
121            let stream = stream.try_clone()?;
122            let refill = refill.clone();
123            let join_guard =
124            ::std::thread::Builder::new()
125                .name(format!("timely:recv-{}", index))
126                .spawn(move || {
127                    let logger = log_sender(CommunicationSetup {
128                        process: my_index,
129                        sender: false,
130                        remote: Some(index),
131                    });
132                    recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
133                })?;
134
135            recv_guards.push(join_guard);
136        }
137    }
138
139    Ok((builders, CommsGuard { send_guards, recv_guards }))
140}