timely_communication/allocator/zero_copy/
initialize.rs1use std::sync::Arc;
4use crate::networking::create_sockets;
5use super::tcp::{send_loop, recv_loop};
6use crate::allocator::ProcessBuilder;
7use super::allocator::{TcpBuilder, new_vector};
8use super::stream::Stream;
9
/// Owns the join handles of the networking threads.
///
/// Handles are kept in two groups, one for sending threads and one for
/// receiving threads. The `Drop` implementation joins every handle, so the
/// guard should be kept alive for as long as the threads are needed.
pub struct CommsGuard {
    /// Handles of the threads spawned to run the send side of each connection.
    send_guards: Vec<::std::thread::JoinHandle<()>>,
    /// Handles of the threads spawned to run the receive side of each connection.
    recv_guards: Vec<::std::thread::JoinHandle<()>>,
}
18
19impl Drop for CommsGuard {
20 fn drop(&mut self) {
21 for handle in self.send_guards.drain(..) {
22 handle.join().expect("Send thread panic");
23 }
24 for handle in self.recv_guards.drain(..) {
26 handle.join().expect("Recv thread panic");
27 }
28 }
30}
31
32use crate::logging::CommunicationSetup;
33
34pub fn initialize_networking(
42 process_allocators: Vec<ProcessBuilder>,
43 addresses: Vec<String>,
44 my_index: usize,
45 threads: usize,
46 noisy: bool,
47 hooks: crate::Hooks,
48)
49-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
50{
51 let sockets = create_sockets(addresses, my_index, noisy)?;
52 initialize_networking_from_sockets(process_allocators, sockets, my_index, threads, hooks)
53}
54
55pub fn initialize_networking_from_sockets<S: Stream + 'static>(
63 process_allocators: Vec<ProcessBuilder>,
64 mut sockets: Vec<Option<S>>,
65 my_index: usize,
66 threads: usize,
67 hooks: crate::Hooks,
68)
69-> ::std::io::Result<(Vec<TcpBuilder>, CommsGuard)>
70{
71 for socket in sockets.iter_mut().flatten() {
73 socket.set_nonblocking(false).expect("failed to set socket to blocking");
74 }
75
76 let processes = sockets.len();
77
78 let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, hooks.refill.clone(), hooks.spill.clone());
79
80 let mut promises_iter = promises.into_iter();
81 let mut futures_iter = futures.into_iter();
82
83 let mut send_guards = Vec::with_capacity(sockets.len());
84 let mut recv_guards = Vec::with_capacity(sockets.len());
85
86 for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
88 let remote_recv = promises_iter.next().unwrap();
89
90 {
91 let log_sender = Arc::clone(&hooks.log_fn);
92 let stream = stream.try_clone()?;
93 let spill = hooks.spill.clone();
94 let join_guard =
95 ::std::thread::Builder::new()
96 .name(format!("timely:send-{}", index))
97 .spawn(move || {
98
99 let logger = log_sender(CommunicationSetup {
100 process: my_index,
101 sender: true,
102 remote: Some(index),
103 });
104
105 send_loop(stream, remote_recv, my_index, index, spill, logger);
106 })?;
107
108 send_guards.push(join_guard);
109 }
110
111 let remote_send = futures_iter.next().unwrap();
112
113 {
114 let log_sender = Arc::clone(&hooks.log_fn);
116 let stream = stream.try_clone()?;
117 let refill = hooks.refill.clone();
118 let join_guard =
119 ::std::thread::Builder::new()
120 .name(format!("timely:recv-{}", index))
121 .spawn(move || {
122 let logger = log_sender(CommunicationSetup {
123 process: my_index,
124 sender: false,
125 remote: Some(index),
126 });
127 recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
128 })?;
129
130 recv_guards.push(join_guard);
131 }
132 }
133
134 Ok((builders, CommsGuard { send_guards, recv_guards }))
135}