1use std::any::Any;
43use std::cmp::Ordering;
44use std::fmt::Display;
45use std::sync::Arc;
46use std::time::Duration;
47
48use anyhow::{Context, bail};
49use futures::stream::FuturesUnordered;
50use futures::{FutureExt, StreamExt};
51use mz_cluster_client::client::ClusterStartupEpoch;
52use mz_ore::cast::CastFrom;
53use mz_ore::netio::{Listener, Stream};
54use timely::communication::allocator::zero_copy::allocator::TcpBuilder;
55use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
56use timely::communication::allocator::zero_copy::initialize::initialize_networking_from_sockets;
57use timely::communication::allocator::{GenericBuilder, PeerBuilder};
58use tracing::{debug, info, warn};
59
60use crate::communication_v2::create_sockets as create_sockets_v2;
61
62pub async fn initialize_networking<P>(
64 workers: usize,
65 process: usize,
66 addresses: Vec<String>,
67 epoch: ClusterStartupEpoch,
68 refill: BytesRefill,
69 builder_fn: impl Fn(TcpBuilder<P::Peer>) -> GenericBuilder,
70 use_create_sockets_v2: bool,
71) -> Result<(Vec<GenericBuilder>, Box<dyn Any + Send>), anyhow::Error>
72where
73 P: PeerBuilder,
74{
75 let sockets = if use_create_sockets_v2 {
76 info!(
77 process,
78 ?addresses,
79 "initializing network for timely instance",
80 );
81 loop {
82 match create_sockets_v2(process, &addresses).await {
83 Ok(sockets) => break sockets,
84 Err(error) if error.is_fatal() => bail!("failed to set up Timely sockets: {error}"),
85 Err(error) => info!("creating sockets failed: {error}; retrying"),
86 }
87 }
88 } else {
89 info!(
90 process, ?addresses, %epoch,
91 "initializing network for timely instance (legacy protocol)",
92 );
93 create_sockets(addresses, u64::cast_from(process), epoch)
94 .await
95 .context("failed to set up timely sockets")?
96 };
97
98 if sockets
99 .iter()
100 .filter_map(|s| s.as_ref())
101 .all(|s| s.is_tcp())
102 {
103 let sockets = sockets
104 .into_iter()
105 .map(|s| s.map(|s| s.unwrap_tcp().into_std()).transpose())
106 .collect::<Result<Vec<_>, _>>()
107 .map_err(anyhow::Error::from)
108 .context("failed to get standard sockets from tokio sockets")?;
109 initialize_networking_inner::<_, P, _>(sockets, process, workers, refill, builder_fn)
110 } else if sockets
111 .iter()
112 .filter_map(|s| s.as_ref())
113 .all(|s| s.is_unix())
114 {
115 let sockets = sockets
116 .into_iter()
117 .map(|s| s.map(|s| s.unwrap_unix().into_std()).transpose())
118 .collect::<Result<Vec<_>, _>>()
119 .map_err(anyhow::Error::from)
120 .context("failed to get standard sockets from tokio sockets")?;
121 initialize_networking_inner::<_, P, _>(sockets, process, workers, refill, builder_fn)
122 } else {
123 anyhow::bail!("cannot mix TCP and Unix streams");
124 }
125}
126
127fn initialize_networking_inner<S, P, PF>(
128 sockets: Vec<Option<S>>,
129 process: usize,
130 workers: usize,
131 refill: BytesRefill,
132 builder_fn: PF,
133) -> Result<(Vec<GenericBuilder>, Box<dyn Any + Send>), anyhow::Error>
134where
135 S: timely::communication::allocator::zero_copy::stream::Stream + 'static,
136 P: PeerBuilder,
137 PF: Fn(TcpBuilder<P::Peer>) -> GenericBuilder,
138{
139 for s in &sockets {
140 if let Some(s) = s {
141 s.set_nonblocking(false)
142 .context("failed to set socket to non-blocking")?;
143 }
144 }
145
146 match initialize_networking_from_sockets::<_, P>(
147 sockets,
148 process,
149 workers,
150 refill,
151 Arc::new(|_| None),
152 ) {
153 Ok((stuff, guard)) => {
154 info!(process = process, "successfully initialized network");
155 Ok((stuff.into_iter().map(builder_fn).collect(), Box::new(guard)))
156 }
157 Err(err) => {
158 warn!(process, "failed to initialize network: {err}");
159 Err(anyhow::Error::from(err).context("failed to initialize networking from sockets"))
160 }
161 }
162}
163
164async fn create_sockets(
169 addresses: Vec<String>,
170 my_index: u64,
171 my_epoch: ClusterStartupEpoch,
172) -> Result<Vec<Option<Stream>>, anyhow::Error> {
173 let my_index_uz = usize::cast_from(my_index);
174 assert!(my_index_uz < addresses.len());
175 let n_peers = addresses.len() - 1;
176 let mut results: Vec<_> = (0..addresses.len()).map(|_| None).collect();
177
178 let my_address = &addresses[my_index_uz];
179
180 let bind_address = match regex::Regex::new(r":(\d{1,5})$")
186 .unwrap()
187 .captures(my_address)
188 {
189 Some(cap) => {
190 let p: u16 = cap[1].parse().context("Port out of range")?;
191 format!("0.0.0.0:{p}")
192 }
193 None => {
194 my_address.to_string()
197 }
198 };
199 let listener = loop {
200 let mut tries = 0;
201 match Listener::bind(&bind_address).await {
202 Ok(ok) => break ok,
203 Err(e) => {
204 warn!("failed to listen on address {bind_address}: {e}");
205 tries += 1;
206 if tries == 10 {
207 return Err(e.into());
208 }
209 tokio::time::sleep(Duration::from_secs(1)).await;
210 }
211 }
212 };
213
214 let mut futs = FuturesUnordered::new();
215 for i in 0..my_index {
216 let address = addresses[usize::cast_from(i)].clone();
217 futs.push(
218 start_connection(address, my_index, my_epoch)
219 .map(move |res| (res.map(move |stream| (stream, i)))),
220 );
221 }
222
223 let mut listener_fut = std::pin::pin!(await_connection(&listener, my_index, my_epoch));
224
225 while results.iter().filter(|maybe| maybe.is_some()).count() != n_peers {
226 let (stream, peer_index) = tokio::select! {
227 Some(res) = futs.next() => res,
228 res = listener_fut.as_mut() => {
229 listener_fut.set(await_connection(&listener, my_index, my_epoch));
230 res
231 }
232 }?;
233 assert_ne!(my_index, peer_index, "someone claimed to be us");
234
235 let old = std::mem::replace(&mut results[usize::cast_from(peer_index)], Some(stream));
236 if old.is_some() {
237 panic!("connected to peer {peer_index} multiple times");
238 }
239 }
240
241 Ok(results)
242}
243
/// Error reported when a peer presents a different [`ClusterStartupEpoch`] than ours during
/// the legacy connection handshake.
///
/// The handshake functions in this file produce it when our own epoch compares as less than
/// the epoch received from the peer.
#[derive(Debug)]
pub struct EpochMismatch {
    /// The epoch of this process.
    mine: ClusterStartupEpoch,
    /// The epoch reported by the peer.
    peer: ClusterStartupEpoch,
}
253
254impl Display for EpochMismatch {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 let EpochMismatch { mine, peer } = self;
257 write!(f, "epoch mismatch: ours was {mine}; the peer's was {peer}")
258 }
259}
260
// Empty impl that relies on the trait's default methods; it makes `EpochMismatch` a standard
// error type, which is what lets the handshake functions turn it into an `anyhow::Error`
// with `.into()`.
impl std::error::Error for EpochMismatch {}
262
263async fn start_connection(
264 address: String,
265 my_index: u64,
266 my_epoch: ClusterStartupEpoch,
267) -> Result<Stream, anyhow::Error> {
268 loop {
269 info!(
270 process = my_index,
271 "Attempting to connect to process at {address}"
272 );
273
274 match Stream::connect(&address).await {
275 Ok(mut s) => {
276 if let Stream::Tcp(tcp) = &s {
277 tcp.set_nodelay(true)?;
278 }
279 use tokio::io::AsyncWriteExt;
280
281 s.write_all(&my_index.to_be_bytes()).await?;
282
283 s.write_all(&my_epoch.to_bytes()).await?;
284
285 let mut buffer = [0u8; 16];
286 use tokio::io::AsyncReadExt;
287 s.read_exact(&mut buffer).await?;
288 let peer_epoch = ClusterStartupEpoch::from_bytes(buffer);
289 debug!("start: received peer epoch {peer_epoch}");
290
291 match my_epoch.cmp(&peer_epoch) {
292 Ordering::Less => {
293 return Err(EpochMismatch {
294 mine: my_epoch,
295 peer: peer_epoch,
296 }
297 .into());
298 }
299 Ordering::Greater => {
300 warn!(
301 process = my_index,
302 "peer at address {address} gave older epoch ({peer_epoch}) than ours ({my_epoch})"
303 );
304 }
305 Ordering::Equal => return Ok(s),
306 }
307 }
308 Err(err) => {
309 info!(
310 process = my_index,
311 "error connecting to process at {address}: {err}; will retry"
312 );
313 }
314 }
315 tokio::time::sleep(Duration::from_secs(1)).await;
316 }
317}
318
319async fn await_connection(
320 listener: &Listener,
321 my_index: u64, my_epoch: ClusterStartupEpoch,
323) -> Result<(Stream, u64), anyhow::Error> {
324 loop {
325 info!(process = my_index, "awaiting connection from peer");
326 let mut s = listener.accept().await?.0;
327 info!(process = my_index, "accepted connection from peer");
328 if let Stream::Tcp(tcp) = &s {
329 tcp.set_nodelay(true)?;
330 }
331
332 let mut buffer = [0u8; 16];
333 use tokio::io::AsyncReadExt;
334
335 s.read_exact(&mut buffer[0..8]).await?;
336 let peer_index = u64::from_be_bytes((&buffer[0..8]).try_into().unwrap());
337 debug!("await: received peer index {peer_index}");
338
339 s.read_exact(&mut buffer).await?;
340 let peer_epoch = ClusterStartupEpoch::from_bytes(buffer);
341 debug!("await: received peer epoch {peer_epoch}");
342
343 use tokio::io::AsyncWriteExt;
344 s.write_all(&my_epoch.to_bytes()[..]).await?;
345
346 match my_epoch.cmp(&peer_epoch) {
347 Ordering::Less => {
348 return Err(EpochMismatch {
349 mine: my_epoch,
350 peer: peer_epoch,
351 }
352 .into());
353 }
354 Ordering::Greater => {
355 warn!(
356 process = my_index,
357 "peer {peer_index} gave older epoch ({peer_epoch}) than ours ({my_epoch})"
358 );
359 }
360 Ordering::Equal => return Ok((s, peer_index)),
361 }
362 }
363}