mz_cluster/
communication.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Note: This module contains a fallback version of the socket connection protocol. The new
11//! version lives in `communication_v2` and is used by default. The old version can still be
12//! selected through the `enable_create_sockets_v2` dyncfg.
13//!
14//! ----------------------------------------------------------------------------
15//!
16//! Code to spin up communication mesh for a cluster replica.
17//!
18//! The startup protocol is as follows:
19//! The controller in `environmentd`, after having connected to all the
20//! `clusterd` processes in a replica, sends each of them a `CreateTimely` command
21//! containing an epoch value (which is the same across all copies of the command).
//! The meaning of this value is irrelevant,
//! as long as it is totally ordered and
//! increases monotonically (including across `environmentd` restarts).
25//!
26//! In the past, we've seen issues caused by `environmentd`'s replica connections
27//! flapping repeatedly and causing several instances of the startup code to spin up
28//! in short succession (or even simultaneously) in response to different `CreateTimely`
29//! commands, causing mass confusion among the processes
30//! and possible crash loops. To avoid this, we do not allow processes to connect to each
31//! other unless they are responding to a `CreateTimely` command with the same epoch value.
//! If a process discovers the existence of a peer with a lower epoch value, it ignores it,
//! and if it discovers one with a higher epoch value, it aborts the whole socket-setup
//! attempt with an error.
34//! Such a process is guaranteed to eventually hear about the higher epoch value
35//! (and, thus, successfully connect to its peers), since
36//! `environmentd` sends `CreateTimely` commands to all processes in a replica.
37//!
38//! Concretely, each process awaits connections from its peers with higher indices,
39//! and initiates connections to those with lower indices. Having established
40//! a TCP connection, they exchange epochs, to enable the logic described above.
41
42use std::any::Any;
43use std::cmp::Ordering;
44use std::fmt::Display;
45use std::sync::Arc;
46use std::time::Duration;
47
48use anyhow::{Context, bail};
49use futures::stream::FuturesUnordered;
50use futures::{FutureExt, StreamExt};
51use mz_cluster_client::client::ClusterStartupEpoch;
52use mz_ore::cast::CastFrom;
53use mz_ore::netio::{Listener, Stream};
54use timely::communication::allocator::zero_copy::allocator::TcpBuilder;
55use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
56use timely::communication::allocator::zero_copy::initialize::initialize_networking_from_sockets;
57use timely::communication::allocator::{GenericBuilder, PeerBuilder};
58use tracing::{debug, info, warn};
59
60use crate::communication_v2::create_sockets as create_sockets_v2;
61
62/// Creates communication mesh from cluster config
63pub async fn initialize_networking<P>(
64    workers: usize,
65    process: usize,
66    addresses: Vec<String>,
67    epoch: ClusterStartupEpoch,
68    refill: BytesRefill,
69    builder_fn: impl Fn(TcpBuilder<P::Peer>) -> GenericBuilder,
70    use_create_sockets_v2: bool,
71) -> Result<(Vec<GenericBuilder>, Box<dyn Any + Send>), anyhow::Error>
72where
73    P: PeerBuilder,
74{
75    let sockets = if use_create_sockets_v2 {
76        info!(
77            process,
78            ?addresses,
79            "initializing network for timely instance",
80        );
81        loop {
82            match create_sockets_v2(process, &addresses).await {
83                Ok(sockets) => break sockets,
84                Err(error) if error.is_fatal() => bail!("failed to set up Timely sockets: {error}"),
85                Err(error) => info!("creating sockets failed: {error}; retrying"),
86            }
87        }
88    } else {
89        info!(
90            process, ?addresses, %epoch,
91            "initializing network for timely instance (legacy protocol)",
92        );
93        create_sockets(addresses, u64::cast_from(process), epoch)
94            .await
95            .context("failed to set up timely sockets")?
96    };
97
98    if sockets
99        .iter()
100        .filter_map(|s| s.as_ref())
101        .all(|s| s.is_tcp())
102    {
103        let sockets = sockets
104            .into_iter()
105            .map(|s| s.map(|s| s.unwrap_tcp().into_std()).transpose())
106            .collect::<Result<Vec<_>, _>>()
107            .map_err(anyhow::Error::from)
108            .context("failed to get standard sockets from tokio sockets")?;
109        initialize_networking_inner::<_, P, _>(sockets, process, workers, refill, builder_fn)
110    } else if sockets
111        .iter()
112        .filter_map(|s| s.as_ref())
113        .all(|s| s.is_unix())
114    {
115        let sockets = sockets
116            .into_iter()
117            .map(|s| s.map(|s| s.unwrap_unix().into_std()).transpose())
118            .collect::<Result<Vec<_>, _>>()
119            .map_err(anyhow::Error::from)
120            .context("failed to get standard sockets from tokio sockets")?;
121        initialize_networking_inner::<_, P, _>(sockets, process, workers, refill, builder_fn)
122    } else {
123        anyhow::bail!("cannot mix TCP and Unix streams");
124    }
125}
126
127fn initialize_networking_inner<S, P, PF>(
128    sockets: Vec<Option<S>>,
129    process: usize,
130    workers: usize,
131    refill: BytesRefill,
132    builder_fn: PF,
133) -> Result<(Vec<GenericBuilder>, Box<dyn Any + Send>), anyhow::Error>
134where
135    S: timely::communication::allocator::zero_copy::stream::Stream + 'static,
136    P: PeerBuilder,
137    PF: Fn(TcpBuilder<P::Peer>) -> GenericBuilder,
138{
139    for s in &sockets {
140        if let Some(s) = s {
141            s.set_nonblocking(false)
142                .context("failed to set socket to non-blocking")?;
143        }
144    }
145
146    match initialize_networking_from_sockets::<_, P>(
147        sockets,
148        process,
149        workers,
150        refill,
151        Arc::new(|_| None),
152    ) {
153        Ok((stuff, guard)) => {
154            info!(process = process, "successfully initialized network");
155            Ok((stuff.into_iter().map(builder_fn).collect(), Box::new(guard)))
156        }
157        Err(err) => {
158            warn!(process, "failed to initialize network: {err}");
159            Err(anyhow::Error::from(err).context("failed to initialize networking from sockets"))
160        }
161    }
162}
163
164/// Creates socket connections from a list of host addresses.
165///
166/// The item at index i in the resulting vec, is a Some(TcpSocket) to process i, except
167/// for item `my_index` which is None (no socket to self).
168async fn create_sockets(
169    addresses: Vec<String>,
170    my_index: u64,
171    my_epoch: ClusterStartupEpoch,
172) -> Result<Vec<Option<Stream>>, anyhow::Error> {
173    let my_index_uz = usize::cast_from(my_index);
174    assert!(my_index_uz < addresses.len());
175    let n_peers = addresses.len() - 1;
176    let mut results: Vec<_> = (0..addresses.len()).map(|_| None).collect();
177
178    let my_address = &addresses[my_index_uz];
179
180    // [btv] Binding to the address (which is of the form
181    // `hostname:port`) unnecessarily involves a DNS query. We should
182    // get the port from here, but otherwise just bind to `0.0.0.0`.
183    // Previously we bound to `my_address`, which caused
184    // https://github.com/MaterializeInc/cloud/issues/5070 .
185    let bind_address = match regex::Regex::new(r":(\d{1,5})$")
186        .unwrap()
187        .captures(my_address)
188    {
189        Some(cap) => {
190            let p: u16 = cap[1].parse().context("Port out of range")?;
191            format!("0.0.0.0:{p}")
192        }
193        None => {
194            // Address is not of the form `hostname:port`; it's
195            // probably a path to a Unix-domain socket.
196            my_address.to_string()
197        }
198    };
199    let listener = loop {
200        let mut tries = 0;
201        match Listener::bind(&bind_address).await {
202            Ok(ok) => break ok,
203            Err(e) => {
204                warn!("failed to listen on address {bind_address}: {e}");
205                tries += 1;
206                if tries == 10 {
207                    return Err(e.into());
208                }
209                tokio::time::sleep(Duration::from_secs(1)).await;
210            }
211        }
212    };
213
214    let mut futs = FuturesUnordered::new();
215    for i in 0..my_index {
216        let address = addresses[usize::cast_from(i)].clone();
217        futs.push(
218            start_connection(address, my_index, my_epoch)
219                .map(move |res| (res.map(move |stream| (stream, i)))),
220        );
221    }
222
223    let mut listener_fut = std::pin::pin!(await_connection(&listener, my_index, my_epoch));
224
225    while results.iter().filter(|maybe| maybe.is_some()).count() != n_peers {
226        let (stream, peer_index) = tokio::select! {
227            Some(res) = futs.next() => res,
228            res = listener_fut.as_mut() => {
229                listener_fut.set(await_connection(&listener, my_index, my_epoch));
230                res
231            }
232        }?;
233        assert_ne!(my_index, peer_index, "someone claimed to be us");
234
235        let old = std::mem::replace(&mut results[usize::cast_from(peer_index)], Some(stream));
236        if old.is_some() {
237            panic!("connected to peer {peer_index} multiple times");
238        }
239    }
240
241    Ok(results)
242}
243
244#[derive(Debug)]
245/// This task can never successfully boot, since
246/// a peer has seen a higher epoch from `environmentd`.
247pub struct EpochMismatch {
248    /// The epoch we know about
249    mine: ClusterStartupEpoch,
250    /// The higher epoch from our peer
251    peer: ClusterStartupEpoch,
252}
253
254impl Display for EpochMismatch {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        let EpochMismatch { mine, peer } = self;
257        write!(f, "epoch mismatch: ours was {mine}; the peer's was {peer}")
258    }
259}
260
261impl std::error::Error for EpochMismatch {}
262
263async fn start_connection(
264    address: String,
265    my_index: u64,
266    my_epoch: ClusterStartupEpoch,
267) -> Result<Stream, anyhow::Error> {
268    loop {
269        info!(
270            process = my_index,
271            "Attempting to connect to process at {address}"
272        );
273
274        match Stream::connect(&address).await {
275            Ok(mut s) => {
276                if let Stream::Tcp(tcp) = &s {
277                    tcp.set_nodelay(true)?;
278                }
279                use tokio::io::AsyncWriteExt;
280
281                s.write_all(&my_index.to_be_bytes()).await?;
282
283                s.write_all(&my_epoch.to_bytes()).await?;
284
285                let mut buffer = [0u8; 16];
286                use tokio::io::AsyncReadExt;
287                s.read_exact(&mut buffer).await?;
288                let peer_epoch = ClusterStartupEpoch::from_bytes(buffer);
289                debug!("start: received peer epoch {peer_epoch}");
290
291                match my_epoch.cmp(&peer_epoch) {
292                    Ordering::Less => {
293                        return Err(EpochMismatch {
294                            mine: my_epoch,
295                            peer: peer_epoch,
296                        }
297                        .into());
298                    }
299                    Ordering::Greater => {
300                        warn!(
301                            process = my_index,
302                            "peer at address {address} gave older epoch ({peer_epoch}) than ours ({my_epoch})"
303                        );
304                    }
305                    Ordering::Equal => return Ok(s),
306                }
307            }
308            Err(err) => {
309                info!(
310                    process = my_index,
311                    "error connecting to process at {address}: {err}; will retry"
312                );
313            }
314        }
315        tokio::time::sleep(Duration::from_secs(1)).await;
316    }
317}
318
319async fn await_connection(
320    listener: &Listener,
321    my_index: u64, // only for logging
322    my_epoch: ClusterStartupEpoch,
323) -> Result<(Stream, u64), anyhow::Error> {
324    loop {
325        info!(process = my_index, "awaiting connection from peer");
326        let mut s = listener.accept().await?.0;
327        info!(process = my_index, "accepted connection from peer");
328        if let Stream::Tcp(tcp) = &s {
329            tcp.set_nodelay(true)?;
330        }
331
332        let mut buffer = [0u8; 16];
333        use tokio::io::AsyncReadExt;
334
335        s.read_exact(&mut buffer[0..8]).await?;
336        let peer_index = u64::from_be_bytes((&buffer[0..8]).try_into().unwrap());
337        debug!("await: received peer index {peer_index}");
338
339        s.read_exact(&mut buffer).await?;
340        let peer_epoch = ClusterStartupEpoch::from_bytes(buffer);
341        debug!("await: received peer epoch {peer_epoch}");
342
343        use tokio::io::AsyncWriteExt;
344        s.write_all(&my_epoch.to_bytes()[..]).await?;
345
346        match my_epoch.cmp(&peer_epoch) {
347            Ordering::Less => {
348                return Err(EpochMismatch {
349                    mine: my_epoch,
350                    peer: peer_epoch,
351                }
352                .into());
353            }
354            Ordering::Greater => {
355                warn!(
356                    process = my_index,
357                    "peer {peer_index} gave older epoch ({peer_epoch}) than ours ({my_epoch})"
358                );
359            }
360            Ordering::Equal => return Ok((s, peer_index)),
361        }
362    }
363}