// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code to spin up the communication mesh for a Timely cluster.
//!
//! The heart of this code is `create_sockets`, which establishes connections with the other
//! processes ("peers") in the Timely cluster. This process needs to be fault-tolerant: If one or
//! multiple processes restart while connections are being established, this must not leave the
//! cluster in a stalled state where the processes cannot finish setting up connections for
//! whatever reason.
//!
//! A Timely cluster assumes reliable networking among all processes in the cluster and forces its
//! processes to crash if this condition is not met. It is therefore impossible to set up a working
//! Timely cluster in the presence of persistent process or network failures. However, we assume
//! that any period of instability eventually resolves. `create_sockets` is written to ensure
//! that once things are stable again, processes can correctly establish connections among each
//! other.
//!
//! If a process returns from `create_sockets` with one or more sockets that are connected to
//! processes that have crashed, that process will also crash upon initializing its side of the
//! Timely cluster. We can say that processes connected to crashed processes are "doomed".
//! Additionally, all processes connected to doomed processes are doomed themselves, as their
//! doomed peer will eventually crash, forcing them to crash too. We need to avoid a circle of doom
//! where new processes perpetually connect to doomed processes, become doomed themselves, doom
//! other processes that connect to them, and then crash.
//!
//! `create_sockets` avoids the circle of doom by ensuring that a new generation of processes
//! does not connect to the previous generation. We pessimistically assume that the entire previous
//! generation is doomed and that, to connect successfully, we need to spin up an entire new
//! generation. This approach can cause us to force restarts of non-doomed processes and therefore
//! leaves some efficiency on the floor, but we are more concerned about our ability to reason
//! about the system than about cluster startup time.
//!
//! To differentiate between generations, we rely on an epoch, i.e., a number that increases
//! between process restarts. Unfortunately, we don't have a way to get a perfect epoch here, so we
//! use the system time instead. Note that the system time is not guaranteed to always increase,
//! but as long as it increases eventually, we will eventually succeed in establishing connections
//! between processes.
//!
//! Each process performs the following protocol:
//!
//!  * Let `my_index` be the index of the process in the Timely cluster.
//!  * If `my_index` == 0, mint a new `my_epoch`. Otherwise leave `my_epoch` uninitialized.
//!  * For each `index` < `my_index`:
//!    * Connect to the peer at `index` and send `my_index`.
//!    * Receive `peer_epoch`.
//!    * If `my_epoch` is uninitialized, set `my_epoch` to `peer_epoch`.
//!    * Send `my_epoch`.
//!    * Compare epochs:
//!      * `my_epoch` < `peer_epoch`: fail the protocol
//!      * `my_epoch` > `peer_epoch`: drop the connection and retry
//!      * `my_epoch` == `peer_epoch`: connection successfully established
//!  * Until connections have been established with all peers at `index` > `my_index`:
//!    * Accept a connection and receive the connecting peer's `index`.
//!    * If a connection to the peer at `index` was already established, fail the protocol.
//!    * Send `my_epoch`.
//!    * Receive `peer_epoch`.
//!    * Compare epochs and react as above.
//!
//! Process 0 acts as the leader of a generation. When a process connects to process 0 and learns
//! its epoch, it becomes part of that generation and cannot connect to processes of other
//! generations anymore. When a process that was previously part of a generation crashes, it dooms
//! that generation. When it restarts, it can't connect to the same generation anymore because
//! process 0 won't accept the connection. What's more, in attempting to connect to the processes
//! of the doomed generation, the new process forces them to fail the protocol and rejoin as part
//! of the new generation, ensuring we don't get stuck with multiple processes on different
//! generations indefinitely.
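
// Illustrative sketch (not used by the code below): the epoch comparison rule of the protocol
// above, reduced to a pure function. Epochs are modeled as `(time, nonce)` tuples. These compare
// lexicographically, just like the derived `Ord` on `Epoch`. The names `sketch_epoch_rule`,
// `Decision`, and `decide` are hypothetical and exist only for illustration. For example, a
// process holding epoch `(5, 1)` that meets a peer holding `(6, 0)` must fail the protocol,
// because the peer belongs to a newer generation.
#[allow(dead_code)]
mod sketch_epoch_rule {
    use std::cmp::Ordering;

    /// Outcome of comparing our epoch with a peer's epoch (hypothetical type).
    #[derive(Debug, PartialEq, Eq)]
    pub enum Decision {
        /// The peer belongs to a newer generation: fail and restart the protocol.
        Fail,
        /// The peer belongs to an older generation: drop the connection and retry.
        Retry,
        /// Both processes belong to the same generation: keep the connection.
        Connected,
    }

    pub fn decide(my_epoch: (u64, u64), peer_epoch: (u64, u64)) -> Decision {
        match my_epoch.cmp(&peer_epoch) {
            Ordering::Less => Decision::Fail,
            Ordering::Greater => Decision::Retry,
            Ordering::Equal => Decision::Connected,
        }
    }
}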

use std::any::Any;
use std::cmp::Ordering;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use anyhow::{Context, bail};
use futures::TryFutureExt;
use mz_ore::cast::CastFrom;
use mz_ore::netio::{Listener, Stream};
use mz_ore::retry::Retry;
use regex::Regex;
use timely::communication::Hooks;
use timely::communication::allocator::ProcessBuilder;
use timely::communication::allocator::generic::AllocatorBuilder;
use timely::communication::allocator::zero_copy::bytes_slab::BytesRefill;
use timely::communication::allocator::zero_copy::initialize::initialize_networking_from_sockets;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{info, warn};

/// Creates communication mesh from cluster config.
///
/// `enable_zero_copy_binary` selects between the zero-copy serialized
/// (`ProcessBuilder::new_bytes_vector`) and regular mpsc-based
/// (`ProcessBuilder::new_typed_vector`) intra-process allocator flavors.
pub async fn initialize_networking(
    workers: usize,
    process: usize,
    addresses: Vec<String>,
    refill: BytesRefill,
    enable_zero_copy_binary: bool,
) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any + Send>), anyhow::Error> {
    info!(
        process,
        ?addresses,
        "initializing network for timely instance",
    );
    let sockets = loop {
        match create_sockets(process, &addresses).await {
            Ok(sockets) => break sockets,
            Err(error) if error.is_fatal() => bail!("failed to set up Timely sockets: {error}"),
            Err(error) => info!("creating sockets failed: {error}; retrying"),
        }
    };

    if sockets
        .iter()
        .filter_map(|s| s.as_ref())
        .all(|s| s.is_tcp())
    {
        let sockets = sockets
            .into_iter()
            .map(|s| s.map(|s| s.unwrap_tcp().into_std()).transpose())
            .collect::<Result<Vec<_>, _>>()
            .map_err(anyhow::Error::from)
            .context("failed to get standard sockets from tokio sockets")?;
        initialize_networking_inner(sockets, process, workers, refill, enable_zero_copy_binary)
    } else if sockets
        .iter()
        .filter_map(|s| s.as_ref())
        .all(|s| s.is_unix())
    {
        let sockets = sockets
            .into_iter()
            .map(|s| s.map(|s| s.unwrap_unix().into_std()).transpose())
            .collect::<Result<Vec<_>, _>>()
            .map_err(anyhow::Error::from)
            .context("failed to get standard sockets from tokio sockets")?;
        initialize_networking_inner(sockets, process, workers, refill, enable_zero_copy_binary)
    } else {
        anyhow::bail!("cannot mix TCP and Unix streams");
    }
}
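
// Illustrative sketch (not used by the code above): the rule `initialize_networking` applies when
// choosing between the TCP and the Unix code path. All peer sockets must be of the same kind; the
// `None` entry for this process is skipped. Because `Iterator::all` returns `true` for an empty
// iterator, a single-process cluster takes the TCP path. `SocketKind` and `choose_transport` are
// hypothetical names.
#[allow(dead_code)]
mod sketch_transport_choice {
    /// Stand-in for the kind of a connected `Stream` (hypothetical type).
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum SocketKind {
        Tcp,
        Unix,
    }

    pub fn choose_transport(sockets: &[Option<SocketKind>]) -> Result<SocketKind, &'static str> {
        // Iterate over the peer sockets only, skipping this process's own `None` slot.
        let peers = || sockets.iter().flatten();
        if peers().all(|kind| *kind == SocketKind::Tcp) {
            Ok(SocketKind::Tcp)
        } else if peers().all(|kind| *kind == SocketKind::Unix) {
            Ok(SocketKind::Unix)
        } else {
            Err("cannot mix TCP and Unix streams")
        }
    }
}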

fn initialize_networking_inner<S>(
    sockets: Vec<Option<S>>,
    process: usize,
    workers: usize,
    refill: BytesRefill,
    enable_zero_copy_binary: bool,
) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any + Send>), anyhow::Error>
where
    S: timely::communication::allocator::zero_copy::stream::Stream + 'static,
{
    for s in &sockets {
        if let Some(s) = s {
            s.set_nonblocking(false)
                .context("failed to set socket to blocking mode")?;
        }
    }

    // Choose the intra-process allocator flavor up front.
    // TODO(CLU-99): wire our zero-copy pager into the timely spill policy
    // instead of passing `None` here.
    let process_allocators = if enable_zero_copy_binary {
        ProcessBuilder::new_bytes_vector(workers, refill.clone(), None)
    } else {
        ProcessBuilder::new_typed_vector(workers, refill.clone(), None)
    };

    // Timely bundles the refill, spill policy, and per-thread logger hook into
    // `Hooks`. We don't install a comm logger, and the spill policy is left
    // unset pending CLU-99.
    let hooks = Hooks {
        log_fn: Arc::new(|_| None),
        refill,
        spill: None,
    };

    match initialize_networking_from_sockets(process_allocators, sockets, process, workers, hooks) {
        Ok((tcp_builders, guard)) => {
            info!(process = process, "successfully initialized network");
            let builders = tcp_builders
                .into_iter()
                .map(AllocatorBuilder::Tcp)
                .collect();
            Ok((builders, Box::new(guard)))
        }
        Err(err) => {
            warn!(process, "failed to initialize network: {err}");
            Err(anyhow::Error::from(err).context("failed to initialize networking from sockets"))
        }
    }
}

/// Errors returned by `create_sockets`.
#[derive(Debug)]
pub(crate) enum CreateSocketsError {
    Bind {
        address: String,
        error: std::io::Error,
    },
    EpochMismatch {
        peer_index: usize,
        peer_epoch: Epoch,
        my_epoch: Epoch,
    },
    Reconnect {
        peer_index: usize,
    },
}

impl CreateSocketsError {
    /// Whether the error isn't expected to resolve on a retry.
    pub fn is_fatal(&self) -> bool {
        matches!(self, Self::Bind { .. })
    }
}

impl fmt::Display for CreateSocketsError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Bind { address, error } => write!(f, "failed to bind at {address}: {error}"),
            Self::EpochMismatch {
                peer_index,
                peer_epoch,
                my_epoch,
            } => write!(
                f,
                "peer {peer_index} has greater epoch: {peer_epoch} > {my_epoch}"
            ),
            Self::Reconnect { peer_index } => {
                write!(f, "observed second instance of peer {peer_index}")
            }
        }
    }
}

impl std::error::Error for CreateSocketsError {}
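
// Illustrative sketch (not used by the code above): the retry policy that `initialize_networking`
// applies to `create_sockets`. An error for which `is_fatal` returns `true` (currently only
// `Bind`) aborts. Any other error, such as an epoch mismatch, restarts the attempt. The generic
// helper `retry_unless_fatal` is hypothetical and synchronous for brevity.
#[allow(dead_code)]
mod sketch_retry_policy {
    pub fn retry_unless_fatal<T, E>(
        mut attempt: impl FnMut() -> Result<T, E>,
        is_fatal: impl Fn(&E) -> bool,
    ) -> Result<T, E> {
        loop {
            match attempt() {
                Ok(value) => return Ok(value),
                // Fatal errors are not expected to resolve on a retry.
                Err(error) if is_fatal(&error) => return Err(error),
                // Every other error restarts the attempt from scratch.
                Err(_) => continue,
            }
        }
    }
}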

/// Epoch type used in the `create_sockets` protocol.
///
/// Epochs are derived from the system time and are therefore not guaranteed to be strictly
/// increasing. For `create_sockets`, it is sufficient that they eventually increase.
///
/// Epoch values also include a random 64-bit nonce. This makes it vanishingly unlikely that two
/// values produced by different calls to `Epoch::mint` compare as equal.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Epoch {
    time: u64,
    nonce: u64,
}

impl Epoch {
    fn mint() -> Self {
        let time = SystemTime::UNIX_EPOCH
            .elapsed()
            .expect("current time is after 1970")
            .as_millis()
            .try_into()
            .expect("fits");
        let nonce = rand::random();
        Self { time, nonce }
    }

    async fn read(s: &mut Stream) -> std::io::Result<Self> {
        let time = s.read_u64().await?;
        let nonce = s.read_u64().await?;
        Ok(Self { time, nonce })
    }

    async fn write(&self, s: &mut Stream) -> std::io::Result<()> {
        s.write_u64(self.time).await?;
        s.write_u64(self.nonce).await?;
        Ok(())
    }
}

impl fmt::Display for Epoch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "({}, {})", self.time, self.nonce)
    }
}
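
// Illustrative sketch (not used by the code above): the 16-byte wire encoding produced by
// `Epoch::write` and consumed by `Epoch::read`. tokio's `write_u64` and `read_u64` use big-endian
// byte order, so an epoch travels as `time` followed by `nonce`, 8 bytes each. Big-endian
// encoding also preserves the derived `Ord` on `Epoch` (by `time`, then `nonce`) when the bytes
// are compared lexicographically. The module and function names are hypothetical, and a
// `(time, nonce)` pair stands in for `Epoch`.
#[allow(dead_code)]
mod sketch_epoch_wire {
    /// Encodes an epoch as `time` then `nonce`, both big-endian.
    pub fn encode(time: u64, nonce: u64) -> [u8; 16] {
        let mut buf = [0u8; 16];
        buf[..8].copy_from_slice(&time.to_be_bytes());
        buf[8..].copy_from_slice(&nonce.to_be_bytes());
        buf
    }

    /// Decodes the output of `encode` back into a `(time, nonce)` pair.
    pub fn decode(buf: &[u8; 16]) -> (u64, u64) {
        let mut time = [0u8; 8];
        let mut nonce = [0u8; 8];
        time.copy_from_slice(&buf[..8]);
        nonce.copy_from_slice(&buf[8..]);
        (u64::from_be_bytes(time), u64::from_be_bytes(nonce))
    }
}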

/// Creates socket connections from a list of host addresses.
///
/// The item at index `i` in the resulting vec is a connection to process `i`, except for item
/// `my_index`, which is `None` (no socket to self).
pub(crate) async fn create_sockets(
    my_index: usize,
    addresses: &[String],
) -> Result<Vec<Option<Stream>>, CreateSocketsError> {
    let my_address = &addresses[my_index];

    // Binding to a TCP address of the form `hostname:port` unnecessarily involves a DNS query, so
    // we only take the port (and an optional protocol prefix) from the address and otherwise bind
    // to `0.0.0.0`.
    let port_re = Regex::new(r"(?<proto>\w+:)?(?<host>.*):(?<port>\d{1,5})$").unwrap();
    let listen_address = match port_re.captures(my_address) {
        Some(cap) => match cap.name("proto") {
            Some(proto) => format!("{}0.0.0.0:{}", proto.as_str(), &cap["port"]),
            None => format!("0.0.0.0:{}", &cap["port"]),
        },
        None => my_address.to_string(),
    };

    let listener = Retry::default()
        .initial_backoff(Duration::from_secs(1))
        .clamp_backoff(Duration::from_secs(1))
        .max_tries(10)
        .retry_async(|_| {
            Listener::bind(&listen_address)
                .inspect_err(|error| warn!(%listen_address, "failed to listen: {error}"))
        })
        .await
        .map_err(|error| CreateSocketsError::Bind {
            address: listen_address,
            error,
        })?;

    let (my_epoch, sockets_lower) = match my_index {
        0 => {
            let epoch = Epoch::mint();
            info!(my_index, "minted epoch: {epoch}");
            (epoch, Vec::new())
        }
        _ => connect_lower(my_index, addresses).await?,
    };

    let n_peers = addresses.len();
    let sockets_higher = accept_higher(my_index, my_epoch, n_peers, &listener).await?;

    let connections_lower = sockets_lower.into_iter().map(Some);
    let connections_higher = sockets_higher.into_iter().map(Some);
    let connections = connections_lower
        .chain([None])
        .chain(connections_higher)
        .collect();

    Ok(connections)
}
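
// Illustrative sketch (not used by the code above): a std-only approximation of how
// `create_sockets` derives its listen address from `port_re`. A trailing `:<port>` of one to five
// digits is kept, as is an optional leading `<word>:` protocol prefix such as `turmoil:`. The host
// is replaced by `0.0.0.0`, and anything without a port is returned unchanged. The regex crate's
// `\d` and `\w` are Unicode-aware. This version checks for ASCII digits only and approximates word
// characters as alphanumerics or `_`. The function name `listen_address` is hypothetical.
#[allow(dead_code)]
mod sketch_listen_address {
    pub fn listen_address(address: &str) -> String {
        // The port is whatever follows the last ':'; it must be 1 to 5 digits.
        let Some((head, port)) = address.rsplit_once(':') else {
            return address.to_string();
        };
        let port_ok = (1..=5).contains(&port.len()) && port.bytes().all(|b| b.is_ascii_digit());
        if !port_ok {
            return address.to_string();
        }
        // An optional protocol prefix is a leading run of word characters followed by ':'.
        let word_len = head
            .find(|c: char| !(c.is_alphanumeric() || c == '_'))
            .unwrap_or(head.len());
        let proto = if word_len > 0 && head[word_len..].starts_with(':') {
            &head[..=word_len]
        } else {
            ""
        };
        format!("{proto}0.0.0.0:{port}")
    }
}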

/// Connect to peers with indexes less than `my_index`.
///
/// Returns a list of connections ordered by peer index, as well as the epoch of the current
/// generation on success, or an error if the protocol has failed and must be restarted.
async fn connect_lower(
    my_index: usize,
    addresses: &[String],
) -> Result<(Epoch, Vec<Stream>), CreateSocketsError> {
    assert!(my_index > 0);
    assert!(my_index <= addresses.len());

    async fn handshake(
        my_index: usize,
        my_epoch: Option<Epoch>,
        address: &str,
    ) -> anyhow::Result<(Epoch, Stream)> {
        let mut s = Stream::connect(address).await?;

        // Make sure all network calls have timeouts, so we don't get stuck when the peer goes
        // away. Writes are buffered, so they probably don't need timeouts, but we're adding them
        // anyway just to be safe.
        timeout(s.write_u64(u64::cast_from(my_index))).await?;
        let peer_epoch = timeout(Epoch::read(&mut s)).await?;
        let my_epoch = my_epoch.unwrap_or(peer_epoch);
        timeout(my_epoch.write(&mut s)).await?;

        Ok((peer_epoch, s))
    }

    let mut my_epoch = None;
    let mut sockets = Vec::new();

    while sockets.len() < my_index {
        let index = sockets.len();
        let address = &addresses[index];

        info!(my_index, "connecting to peer {index} at address: {address}");

        let (peer_epoch, sock) = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(1))
            .retry_async(|_| {
                handshake(my_index, my_epoch, address).inspect_err(|error| {
                    info!(my_index, "error connecting to peer {index}: {error}")
                })
            })
            .await
            .expect("retries forever");

        if let Some(epoch) = my_epoch {
            match peer_epoch.cmp(&epoch) {
                Ordering::Less => {
                    info!(
                        my_index,
                        "refusing connection to peer {index} with smaller epoch: \
                         {peer_epoch} < {epoch}",
                    );
                    continue;
                }
                Ordering::Greater => {
                    return Err(CreateSocketsError::EpochMismatch {
                        peer_index: index,
                        peer_epoch,
                        my_epoch: epoch,
                    });
                }
                Ordering::Equal => info!(my_index, "connected to peer {index}"),
            }
        } else {
            info!(my_index, "received epoch from peer {index}: {peer_epoch}");
            my_epoch = Some(peer_epoch);
        }

        sockets.push(sock);
    }

    let my_epoch = my_epoch.expect("must exist");
    Ok((my_epoch, sockets))
}

/// Accept connections from peers with indexes greater than `my_index`.
///
/// Returns a list of connections ordered by peer index, starting with `my_index` + 1,
/// or an error if the protocol has failed and must be restarted.
async fn accept_higher(
    my_index: usize,
    my_epoch: Epoch,
    n_peers: usize,
    listener: &Listener,
) -> Result<Vec<Stream>, CreateSocketsError> {
    assert!(my_index < n_peers);

    async fn accept(listener: &Listener) -> anyhow::Result<(usize, Stream)> {
        let (mut s, _) = listener.accept().await?;

        // Make sure all network calls have timeouts, so we don't get stuck when the peer goes
        // away.
        let peer_index = timeout(s.read_u64()).await?;
        let peer_index = usize::cast_from(peer_index);
        Ok((peer_index, s))
    }

    async fn exchange_epochs(my_epoch: Epoch, s: &mut Stream) -> anyhow::Result<Epoch> {
        // Make sure all network calls have timeouts, so we don't get stuck when the peer goes
        // away. Writes are buffered, so they probably don't need timeouts, but we're adding them
        // anyway just to be safe.
        timeout(my_epoch.write(s)).await?;
        let peer_epoch = timeout(Epoch::read(s)).await?;
        Ok(peer_epoch)
    }

    let offset = my_index + 1;
    let mut sockets: Vec<_> = (offset..n_peers).map(|_| None).collect();

    while sockets.iter().any(|s| s.is_none()) {
        info!(my_index, "accepting connection from peer");

        let (index, mut sock) = match accept(listener).await {
            Ok(result) => result,
            Err(error) => {
                info!(my_index, "error accepting connection: {error}");
                continue;
            }
        };

        if sockets[index - offset].is_some() {
            return Err(CreateSocketsError::Reconnect { peer_index: index });
        }

        let peer_epoch = match exchange_epochs(my_epoch, &mut sock).await {
            Ok(result) => result,
            Err(error) => {
                info!(my_index, "error exchanging epochs: {error}");
                continue;
            }
        };

        match peer_epoch.cmp(&my_epoch) {
            Ordering::Less => {
                info!(
                    my_index,
                    "refusing connection from peer {index} with smaller epoch: \
                     {peer_epoch} < {my_epoch}",
                );
                continue;
            }
            Ordering::Greater => {
                return Err(CreateSocketsError::EpochMismatch {
                    peer_index: index,
                    peer_epoch,
                    my_epoch,
                });
            }
            Ordering::Equal => info!(my_index, "connected to peer {index}"),
        }

        sockets[index - offset] = Some(sock);
    }

    Ok(sockets.into_iter().map(|s| s.unwrap()).collect())
}
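
// Illustrative sketch (not used by the code above): the slot bookkeeping in `accept_higher`.
// Peers with indexes above `my_index` may connect in any order. Each connection is stored at
// `index - (my_index + 1)`, and a second connection from an already-connected peer fails the
// protocol, mirroring `CreateSocketsError::Reconnect`. Epoch checks and I/O are omitted. The
// range check on the peer-supplied index is a defensive addition of this sketch; `accept_higher`
// indexes `sockets[index - offset]` directly. All names here are hypothetical.
#[allow(dead_code)]
mod sketch_accept_slots {
    #[derive(Debug, PartialEq, Eq)]
    pub enum SlotError {
        OutOfRange(usize),
        Reconnect(usize),
    }

    /// Places arriving peer indexes into their slots. Returns the filled slots once every higher
    /// peer has connected, or `None` if `arrivals` runs out first.
    pub fn fill_slots(
        my_index: usize,
        n_peers: usize,
        arrivals: &[usize],
    ) -> Result<Option<Vec<usize>>, SlotError> {
        let offset = my_index + 1;
        let mut slots: Vec<Option<usize>> = vec![None; n_peers.saturating_sub(offset)];
        for &index in arrivals {
            // Stop accepting once every slot is filled, like the `while` loop above.
            if slots.iter().all(Option::is_some) {
                break;
            }
            if !(offset..n_peers).contains(&index) {
                return Err(SlotError::OutOfRange(index));
            }
            let slot = &mut slots[index - offset];
            if slot.is_some() {
                return Err(SlotError::Reconnect(index));
            }
            *slot = Some(index);
        }
        // `None` if any slot is still empty.
        Ok(slots.into_iter().collect())
    }
}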

/// Helper for performing network I/O under a timeout.
///
/// This is meant to be used on network calls performed after a connection to a peer has been
/// successfully established, to avoid the `create_sockets` protocol becoming stuck because a peer
/// goes away without resetting the connection. We assume fast same-datacenter connections, so we
/// can choose a relatively small timeout.
async fn timeout<F, R>(fut: F) -> anyhow::Result<R>
where
    F: Future<Output = std::io::Result<R>>,
{
    let timeout = Duration::from_secs(1);
    let result = mz_ore::future::timeout(timeout, fut).await?;
    Ok(result)
}
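
// Illustrative sketch (not used by the code above): the same "bounded wait on a silent peer" idea
// as `timeout`, expressed with blocking std sockets instead of `mz_ore::future::timeout`. A read
// timeout turns a peer that never answers into an error instead of a read that blocks forever. The
// error is `WouldBlock` on Unix-like systems and may be `TimedOut` on Windows. The module and
// function names are hypothetical.
#[allow(dead_code)]
mod sketch_blocking_timeout {
    use std::io::{self, Read};
    use std::net::{TcpListener, TcpStream};
    use std::time::Duration;

    /// Reads a big-endian `u64` from `stream`, giving up after `limit`.
    pub fn read_u64_with_timeout(stream: &mut TcpStream, limit: Duration) -> io::Result<u64> {
        stream.set_read_timeout(Some(limit))?;
        let mut buf = [0u8; 8];
        stream.read_exact(&mut buf)?;
        Ok(u64::from_be_bytes(buf))
    }

    /// Connects to a local listener whose accepted side never writes, and returns the kind of
    /// error that the timed-out read produces.
    pub fn silent_peer_error_kind() -> io::Result<io::ErrorKind> {
        let listener = TcpListener::bind("127.0.0.1:0")?;
        let mut client = TcpStream::connect(listener.local_addr()?)?;
        // Keep the accepted socket alive, but never write to it.
        let (_server, _) = listener.accept()?;
        match read_u64_with_timeout(&mut client, Duration::from_millis(50)) {
            Ok(_) => Err(io::Error::other("peer unexpectedly sent data")),
            Err(error) => Ok(error.kind()),
        }
    }
}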

#[cfg(test)]
mod turmoil_tests {
    use rand::rngs::SmallRng;
    use rand::{Rng, SeedableRng};
    use tokio::sync::{mpsc, watch};
    use tokio::time::timeout;

    use super::*;

    /// Turmoil test for [`create_sockets`].
    ///
    /// This test works by spawning a number of processes, and making them start to connect to each
    /// other using [`create_sockets`]. At the same time, chaos is introduced by randomly
    /// restarting a number of the processes. The test then enters a stable phase and expects that
    /// all processes now manage to successfully connect to one another.
    #[test] // allow(test-attribute)
    #[cfg_attr(miri, ignore)] // too slow
    fn test_create_sockets() {
        const NUM_PROCESSES: usize = 10;
        const NUM_CRASHES: usize = 3;

        configure_tracing_for_turmoil();

        let seed = std::env::var("SEED")
            .ok()
            .and_then(|x| x.parse().ok())
            .unwrap_or_else(rand::random);

        info!("initializing rng with seed {seed}");
        let mut rng = SmallRng::seed_from_u64(seed);

        let mut sim = turmoil::Builder::new()
            .enable_random_order()
            .rng_seed(rng.random())
            .build();

        let processes: Vec<_> = (0..NUM_PROCESSES).map(|i| format!("process-{i}")).collect();
        let addresses: Vec<_> = processes
            .iter()
            .map(|n| format!("turmoil:{n}:7777"))
            .collect();

        // Channel for processes to report successful connection.
        let (ready_tx, mut ready_rx) = mpsc::unbounded_channel();

        // A watch for informing processes about the beginning of the stable phase.
        // This is used to delay the processes' final connectivity checks until after we know that
        // processes won't randomly crash anymore.
        let (stable_tx, stable_rx) = watch::channel(false);

        for (index, name) in processes.iter().enumerate() {
            let addresses = addresses.clone();
            let ready_tx = ready_tx.clone();
            let stable_rx = stable_rx.clone();
            sim.host(&name[..], move || {
                let addresses = addresses.clone();
                let ready_tx = ready_tx.clone();
                let mut stable_rx = stable_rx.clone();
                async move {
                    'protocol: loop {
                        let mut sockets = match create_sockets(index, &addresses).await {
                            Ok(sockets) => sockets,
                            Err(error) if error.is_fatal() => Err(error)?,
                            Err(error) => {
                                info!("creating sockets failed: {error}; retrying protocol");
                                continue 'protocol;
                            }
                        };

                        // We have a connection to each peer, but some of them might be broken, in
                        // which case we should restart the `create_sockets` protocol. In the real
                        // world we would notice the broken connections eventually after writing to
                        // them enough, but in the test we want something more deterministic, so we
                        // let processes ping each other instead.
                        //
                        // We need to wait until we've entered the stable phase. Otherwise it would
                        // be possible for all processes to connect and send their ping before one
                        // of them gets killed and gets stuck trying to perform the protocol when
                        // everyone else has already finished the test.
                        let _ = stable_rx.wait_for(|stable| *stable).await;

                        info!("sockets created; checking connections");
                        for sock in sockets.iter_mut().filter_map(|s| s.as_mut()) {
                            if let Err(error) = sock.write_u8(111).await {
                                info!("error pinging socket: {error}; retrying protocol");
                                continue 'protocol;
                            }
                        }
                        for sock in sockets.iter_mut().filter_map(|s| s.as_mut()) {
                            info!("waiting for ping from {sock:?}");
                            match timeout(Duration::from_secs(2), sock.read_u8()).await {
                                Ok(Ok(ping)) => assert_eq!(ping, 111),
                                Ok(Err(error)) => {
                                    info!("error waiting for ping: {error}; retrying protocol");
                                    continue 'protocol;
                                }
                                Err(_) => {
                                    info!("timed out waiting for ping; retrying protocol");
                                    continue 'protocol;
                                }
                            }
                        }

                        let _ = ready_tx.send(index);

                        std::mem::forget(sockets);
                        return Ok(());
                    }
                }
            });
        }

        // Let random processes crash at random times.
        for _ in 0..NUM_CRASHES {
            let steps = rng.random_range(1..100);
            for _ in 0..steps {
                sim.step().unwrap();
            }

            let i = rng.random_range(0..NUM_PROCESSES);
            info!("bouncing process {i}");
            sim.bounce(format!("process-{i}"));
        }

        stable_tx.send(true).unwrap();

        // Processes should now be able to connect.
        let mut num_ready = 0;
        loop {
            while let Ok(index) = ready_rx.try_recv() {
                info!("process {index} is ready");
                num_ready += 1;
            }
            if num_ready == NUM_PROCESSES {
                break;
            }

            sim.step().unwrap();
            if sim.elapsed() > Duration::from_secs(60) {
                panic!("simulation not finished after 60s");
            }
        }
    }

    /// Fuzz test [`create_sockets`] using turmoil.
    #[test] // allow(test-attribute)
    #[ignore = "runs forever"]
    fn fuzz_create_sockets() {
        loop {
            test_create_sockets();
        }
    }

    /// Configure tracing for turmoil tests.
    ///
    /// Log events are written to stdout and include the logical time of the simulation.
    fn configure_tracing_for_turmoil() {
        use std::sync::Once;
        use tracing::level_filters::LevelFilter;
        use tracing_subscriber::fmt::time::FormatTime;

        #[derive(Clone)]
        struct SimElapsedTime;

        impl FormatTime for SimElapsedTime {
            fn format_time(
                &self,
                w: &mut tracing_subscriber::fmt::format::Writer<'_>,
            ) -> std::fmt::Result {
                tracing_subscriber::fmt::time().format_time(w)?;
                if let Some(sim_elapsed) = turmoil::sim_elapsed() {
                    write!(w, " [{:?}]", sim_elapsed)?;
                }
                Ok(())
            }
        }

        static INIT_TRACING: Once = Once::new();
        INIT_TRACING.call_once(|| {
            let env_filter = tracing_subscriber::EnvFilter::builder()
                .with_default_directive(LevelFilter::INFO.into())
                .from_env_lossy();
            let subscriber = tracing_subscriber::fmt()
                .with_test_writer()
                .with_env_filter(env_filter)
                .with_timer(SimElapsedTime)
                .finish();

            tracing::subscriber::set_global_default(subscriber).unwrap();
        });
    }
}