timely_communication/allocator/zero_copy/
initialize.rs1use std::sync::Arc;
4use timely_logging::Logger;
5use crate::allocator::zero_copy::bytes_slab::BytesRefill;
6use crate::logging::CommunicationEventBuilder;
7use crate::networking::create_sockets;
8use super::tcp::{send_loop, recv_loop};
9use crate::allocator::ProcessBuilder;
10use super::allocator::{TcpBuilder, new_vector};
11use super::stream::Stream;
12
13pub struct CommsGuard {
18 send_guards: Vec<::std::thread::JoinHandle<()>>,
19 recv_guards: Vec<::std::thread::JoinHandle<()>>,
20}
21
22impl Drop for CommsGuard {
23 fn drop(&mut self) {
24 for handle in self.send_guards.drain(..) {
25 handle.join().expect("Send thread panic");
26 }
27 for handle in self.recv_guards.drain(..) {
29 handle.join().expect("Recv thread panic");
30 }
31 }
33}
34
35use crate::logging::CommunicationSetup;
36
37pub fn initialize_networking(
45 process_allocators: Vec<ProcessBuilder>,
46 addresses: Vec<String>,
47 my_index: usize,
48 threads: usize,
49 noisy: bool,
50 refill: BytesRefill,
51 log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
52)
53-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
54{
55 let sockets = create_sockets(addresses, my_index, noisy)?;
56 initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, refill, log_sender)
57}
58
59pub fn initialize_networking_from_sockets<S: Stream + 'static>(
67 process_allocators: Vec<ProcessBuilder>,
68 mut sockets: Vec<Option<S>>,
69 my_index: usize,
70 threads: usize,
71 refill: BytesRefill,
72 log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
73)
74-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
75{
76 for socket in sockets.iter_mut().flatten() {
78 socket.set_nonblocking(false).expect("failed to set socket to blocking");
79 }
80
81 let processes = sockets.len();
82
83 let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());
84
85 let mut promises_iter = promises.into_iter();
86 let mut futures_iter = futures.into_iter();
87
88 let mut send_guards = Vec::with_capacity(sockets.len());
89 let mut recv_guards = Vec::with_capacity(sockets.len());
90 let refill = refill.clone();
91
92 for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
94 let remote_recv = promises_iter.next().unwrap();
95
96 {
97 let log_sender = Arc::clone(&log_sender);
98 let stream = stream.try_clone()?;
99 let join_guard =
100 ::std::thread::Builder::new()
101 .name(format!("timely:send-{}", index))
102 .spawn(move || {
103
104 let logger = log_sender(CommunicationSetup {
105 process: my_index,
106 sender: true,
107 remote: Some(index),
108 });
109
110 send_loop(stream, remote_recv, my_index, index, logger);
111 })?;
112
113 send_guards.push(join_guard);
114 }
115
116 let remote_send = futures_iter.next().unwrap();
117
118 {
119 let log_sender = Arc::clone(&log_sender);
121 let stream = stream.try_clone()?;
122 let refill = refill.clone();
123 let join_guard =
124 ::std::thread::Builder::new()
125 .name(format!("timely:recv-{}", index))
126 .spawn(move || {
127 let logger = log_sender(CommunicationSetup {
128 process: my_index,
129 sender: false,
130 remote: Some(index),
131 });
132 recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
133 })?;
134
135 recv_guards.push(join_guard);
136 }
137 }
138
139 Ok((builders, CommsGuard { send_guards, recv_guards }))
140}