timely_communication/allocator/zero_copy/
initialize.rs1use std::sync::Arc;
4use timely_logging::Logger;
5use crate::allocator::PeerBuilder;
6use crate::allocator::zero_copy::bytes_slab::BytesRefill;
7use crate::logging::CommunicationEventBuilder;
8use crate::networking::create_sockets;
9use super::tcp::{send_loop, recv_loop};
10use super::allocator::{TcpBuilder, new_vector};
11use super::stream::Stream;
12
13pub struct CommsGuard {
18 send_guards: Vec<::std::thread::JoinHandle<()>>,
19 recv_guards: Vec<::std::thread::JoinHandle<()>>,
20}
21
22impl Drop for CommsGuard {
23 fn drop(&mut self) {
24 for handle in self.send_guards.drain(..) {
25 handle.join().expect("Send thread panic");
26 }
27 for handle in self.recv_guards.drain(..) {
29 handle.join().expect("Recv thread panic");
30 }
31 }
33}
34
35use crate::logging::CommunicationSetup;
36
37pub fn initialize_networking<P: PeerBuilder>(
39 addresses: Vec<String>,
40 my_index: usize,
41 threads: usize,
42 noisy: bool,
43 refill: BytesRefill,
44 log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
45)
46-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
47{
48 let sockets = create_sockets(addresses, my_index, noisy)?;
49 initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, refill, log_sender)
50}
51
52pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
60 mut sockets: Vec<Option<S>>,
61 my_index: usize,
62 threads: usize,
63 refill: BytesRefill,
64 log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
65)
66-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
67{
68 for socket in sockets.iter_mut().flatten() {
70 socket.set_nonblocking(false).expect("failed to set socket to blocking");
71 }
72
73 let processes = sockets.len();
74
75 let process_allocators = P::new_vector(threads, refill.clone());
76 let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());
77
78 let mut promises_iter = promises.into_iter();
79 let mut futures_iter = futures.into_iter();
80
81 let mut send_guards = Vec::with_capacity(sockets.len());
82 let mut recv_guards = Vec::with_capacity(sockets.len());
83 let refill = refill.clone();
84
85 for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
87 let remote_recv = promises_iter.next().unwrap();
88
89 {
90 let log_sender = Arc::clone(&log_sender);
91 let stream = stream.try_clone()?;
92 let join_guard =
93 ::std::thread::Builder::new()
94 .name(format!("timely:send-{}", index))
95 .spawn(move || {
96
97 let logger = log_sender(CommunicationSetup {
98 process: my_index,
99 sender: true,
100 remote: Some(index),
101 });
102
103 send_loop(stream, remote_recv, my_index, index, logger);
104 })?;
105
106 send_guards.push(join_guard);
107 }
108
109 let remote_send = futures_iter.next().unwrap();
110
111 {
112 let log_sender = Arc::clone(&log_sender);
114 let stream = stream.try_clone()?;
115 let refill = refill.clone();
116 let join_guard =
117 ::std::thread::Builder::new()
118 .name(format!("timely:recv-{}", index))
119 .spawn(move || {
120 let logger = log_sender(CommunicationSetup {
121 process: my_index,
122 sender: false,
123 remote: Some(index),
124 });
125 recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
126 })?;
127
128 recv_guards.push(join_guard);
129 }
130 }
131
132 Ok((builders, CommsGuard { send_guards, recv_guards }))
133}