turmoil/
sim.rs

1use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15    for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A handle for interacting with the simulation.
///
/// Holds the simulated [`World`], one runtime per registered host, and the
/// logical clock that [`Sim::step`] advances.
pub struct Sim<'a> {
    /// Simulation configuration
    config: Config,

    /// Tracks the simulated world state
    ///
    /// This is what is stored in the thread-local
    world: RefCell<World>,

    /// Per simulated host runtimes, keyed by the host's IP address
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Simulation duration since unix epoch. Set when the simulation is
    /// created.
    since_epoch: Duration,

    /// Simulation elapsed time
    elapsed: Duration,

    /// Step counter. Starts at 1 and is bumped after each step, so it is the
    /// 1-based index of the next step to execute.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42    pub(crate) fn new(config: Config, world: World) -> Self {
43        let since_epoch = config
44            .epoch
45            .duration_since(UNIX_EPOCH)
46            .expect("now must be >= UNIX_EPOCH");
47
48        Self {
49            config,
50            world: RefCell::new(world),
51            rts: IndexMap::new(),
52            since_epoch,
53            elapsed: Duration::ZERO,
54            steps: 1, // bumped after each step
55        }
56    }
57
58    /// How much logical time has elapsed since the simulation started.
59    pub fn elapsed(&self) -> Duration {
60        self.elapsed
61    }
62
63    /// The logical duration from [`UNIX_EPOCH`] until now.
64    ///
65    /// On creation the simulation picks a `SystemTime` and calculates the
66    /// duration since the epoch. Each `run()` invocation moves logical time
67    /// forward the configured tick duration.
68    pub fn since_epoch(&self) -> Duration {
69        self.since_epoch + self.elapsed
70    }
71
72    /// Register a client with the simulation.
73    pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74    where
75        F: Future<Output = Result> + 'static,
76    {
77        let addr = self.lookup(addr);
78        let nodename: Arc<str> = self
79            .world
80            .borrow_mut()
81            .dns
82            .reverse(addr)
83            .map(str::to_string)
84            .unwrap_or_else(|| addr.to_string())
85            .into();
86
87        {
88            let world = RefCell::get_mut(&mut self.world);
89
90            // Register host state with the world
91            world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
92        }
93
94        let seed = self.world.borrow_mut().rng.gen();
95        let rng = SmallRng::from_seed(seed);
96        let config = rt::Config {
97            enable_io: self.config.enable_tokio_io,
98            rng: Some(rng),
99        };
100
101        let rt = World::enter(&self.world, || Rt::client(nodename, client, config));
102
103        self.rts.insert(addr, rt);
104    }
105
106    /// Register a host with the simulation.
107    ///
108    /// This method takes a `Fn` that builds a future, as opposed to
109    /// [`Sim::client`] which just takes a future. The reason for this is we
110    /// might restart the host, and so need to be able to call the future
111    /// multiple times.
112    pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
113    where
114        F: Fn() -> Fut + 'a,
115        Fut: Future<Output = Result> + 'static,
116    {
117        let addr = self.lookup(addr);
118        let nodename: Arc<str> = self
119            .world
120            .borrow_mut()
121            .dns
122            .reverse(addr)
123            .map(str::to_string)
124            .unwrap_or_else(|| addr.to_string())
125            .into();
126
127        {
128            let world = RefCell::get_mut(&mut self.world);
129
130            // Register host state with the world
131            world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
132        }
133
134        let seed = self.world.borrow_mut().rng.gen();
135        let rng = SmallRng::from_seed(seed);
136        let config = rt::Config {
137            enable_io: self.config.enable_tokio_io,
138            rng: Some(rng),
139        };
140
141        let rt = World::enter(&self.world, || Rt::host(nodename, host, config));
142
143        self.rts.insert(addr, rt);
144    }
145
146    /// Crashes the resolved hosts. Nothing will be running on the matched hosts
147    /// after this method. You can use [`Sim::bounce`] to start the hosts up
148    /// again.
149    pub fn crash(&mut self, addrs: impl ToIpAddrs) {
150        self.run_with_hosts(addrs, |addr, rt| {
151            rt.crash();
152
153            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Crash");
154        });
155    }
156
157    /// Bounces the resolved hosts. The software is restarted.
158    pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
159        self.run_with_hosts(addrs, |addr, rt| {
160            rt.bounce();
161
162            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
163        });
164    }
165
166    /// Run `f` with the resolved hosts at `addrs` set on the world.
167    fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
168        let hosts = self.world.borrow_mut().lookup_many(addrs);
169        for h in hosts {
170            let rt = self.rts.get_mut(&h).expect("missing host");
171
172            self.world.borrow_mut().current = Some(h);
173
174            World::enter(&self.world, || f(h, rt));
175        }
176
177        self.world.borrow_mut().current = None;
178    }
179
180    /// Check whether a host has software running.
181    pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
182        let host = self.world.borrow_mut().lookup(addr);
183
184        self.rts
185            .get(&host)
186            .expect("missing host")
187            .is_software_running()
188    }
189
190    /// Lookup an IP address by host name.
191    pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
192        self.world.borrow_mut().lookup(addr)
193    }
194
195    /// Perform a reverse DNS lookup, returning the hostname if the entry
196    /// exists.
197    pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
198        self.world
199            .borrow()
200            .reverse_lookup(addr)
201            .map(|h| h.to_owned())
202    }
203
204    /// Hold messages between two hosts, or sets of hosts, until [`release`](crate::release) is
205    /// called.
206    pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
207        let mut world = self.world.borrow_mut();
208        world.hold_many(a, b);
209    }
210
211    /// Repair the connection between two hosts, or sets of hosts, resulting in
212    /// messages to be delivered.
213    pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
214        let mut world = self.world.borrow_mut();
215        world.repair_many(a, b);
216    }
217
218    /// Repair the connection between two hosts, or sets of hosts, removing
219    /// the effect of a previous [`Self::partition_oneway`].
220    ///
221    /// Combining this feature with [`Self::hold`] is presently not supported.
222    pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
223        let mut world = self.world.borrow_mut();
224        world.repair_oneway_many(from, to);
225    }
226
227    /// The opposite of [`hold`](crate::hold). All held messages are immediately delivered.
228    pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
229        let mut world = self.world.borrow_mut();
230        world.release_many(a, b);
231    }
232
233    /// Partition two hosts, or sets of hosts, resulting in all messages sent
234    /// between them to be dropped.
235    pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
236        let mut world = self.world.borrow_mut();
237        world.partition_many(a, b);
238    }
239
240    /// Partition two hosts, or sets of hosts, such that messages can not be sent
241    /// from 'from' to 'to', while not affecting the ability for them to be delivered
242    /// in the other direction.
243    ///
244    /// Partitioning first from->to, then to->from, will stop all messages, similarly to
245    /// a single call to [`Self::partition`].
246    ///
247    /// Combining this feature with [`Self::hold`] is presently not supported.
248    pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
249        let mut world = self.world.borrow_mut();
250        world.partition_oneway_many(from, to);
251    }
252
253    /// Resolve host names for an [`IpAddr`] pair.
254    ///
255    /// Useful when interacting with network [links](#method.links).
256    pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
257        let world = self.world.borrow();
258
259        (
260            world
261                .dns
262                .reverse(pair.0)
263                .expect("no hostname found for ip address")
264                .to_owned(),
265            world
266                .dns
267                .reverse(pair.1)
268                .expect("no hostname found for ip address")
269                .to_owned(),
270        )
271    }
272
273    /// Lookup IP addresses for resolved hosts.
274    pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
275        self.world.borrow_mut().lookup_many(addr)
276    }
277
278    /// Set the max message latency for all links.
279    pub fn set_max_message_latency(&self, value: Duration) {
280        self.world
281            .borrow_mut()
282            .topology
283            .set_max_message_latency(value);
284    }
285
286    /// Set the message latency for any links matching `a` and `b`.
287    ///
288    /// This sets the min and max to the same value eliminating any variance in
289    /// latency.
290    pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
291        let mut world = self.world.borrow_mut();
292        let a = world.lookup_many(a);
293        let b = world.lookup_many(b);
294
295        for_pairs(&a, &b, |a, b| {
296            world.topology.set_link_message_latency(a, b, value);
297        });
298    }
299
300    /// Set the max message latency for any links matching `a` and `b`.
301    pub fn set_link_max_message_latency(
302        &self,
303        a: impl ToIpAddrs,
304        b: impl ToIpAddrs,
305        value: Duration,
306    ) {
307        let mut world = self.world.borrow_mut();
308        let a = world.lookup_many(a);
309        let b = world.lookup_many(b);
310
311        for_pairs(&a, &b, |a, b| {
312            world.topology.set_link_max_message_latency(a, b, value);
313        });
314    }
315
316    /// Set the message latency distribution curve for all links.
317    ///
318    /// Message latency follows an exponential distribution curve. The `value`
319    /// is the lambda argument to the probability function.
320    pub fn set_message_latency_curve(&self, value: f64) {
321        self.world
322            .borrow_mut()
323            .topology
324            .set_message_latency_curve(value);
325    }
326
327    pub fn set_fail_rate(&mut self, value: f64) {
328        self.world.borrow_mut().topology.set_fail_rate(value);
329    }
330
331    pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
332        let mut world = self.world.borrow_mut();
333        let a = world.lookup_many(a);
334        let b = world.lookup_many(b);
335
336        for_pairs(&a, &b, |a, b| {
337            world.topology.set_link_fail_rate(a, b, value);
338        });
339    }
340
341    /// Access a [`LinksIter`] to introspect inflight messages between hosts.
342    pub fn links(&self, f: impl FnOnce(LinksIter)) {
343        let top = &mut self.world.borrow_mut().topology;
344
345        f(top.iter_mut())
346    }
347
348    /// Run the simulation until all client hosts have completed.
349    ///
350    /// Executes a simple event loop that calls [step](#method.step) each
351    /// iteration, returning early if any host software errors.
352    pub fn run(&mut self) -> Result {
353        // check if we have any clients
354        if !self
355            .rts
356            .iter()
357            .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
358        {
359            tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
360            return Ok(());
361        }
362
363        loop {
364            let is_finished = self.step()?;
365
366            if is_finished {
367                return Ok(());
368            }
369        }
370    }
371
372    /// Step the simulation.
373    ///
374    /// Runs each host in the simulation a fixed duration configured by
375    /// `tick_duration` in the builder.
376    ///
377    /// The simulated network also steps, processing in flight messages, and
378    /// delivering them to their destination if appropriate.
379    ///
380    /// Returns whether or not all clients have completed.
381    pub fn step(&mut self) -> Result<bool> {
382        tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);
383
384        let tick = self.config.tick;
385        let mut is_finished = true;
386
387        // Tick the networking, processing messages. This is done before
388        // ticking any other runtime, as they might be waiting on network
389        // IO. (It also might be waiting on something else, such as time.)
390        self.world.borrow_mut().topology.tick_by(tick);
391
392        // Tick each host runtimes with running software. If the software
393        // completes, extract the result and return early if an error is
394        // encountered.
395
396        let (mut running, stopped): (Vec<_>, Vec<_>) = self
397            .rts
398            .iter_mut()
399            .partition(|(_, rt)| rt.is_software_running());
400        if self.config.random_node_order {
401            running.shuffle(&mut self.world.borrow_mut().rng);
402        }
403
404        for (&addr, rt) in running {
405            let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
406            {
407                let mut world = self.world.borrow_mut();
408                // We need to move deliverable messages off the network and
409                // into the dst host. This requires two mutable borrows.
410                let World {
411                    rng,
412                    topology,
413                    hosts,
414                    ..
415                } = world.deref_mut();
416                topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));
417
418                // Set the current host (see method docs)
419                world.current = Some(addr);
420
421                world.current_host_mut().timer.now(rt.now());
422            }
423
424            let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;
425
426            if rt.is_client() {
427                is_finished = is_finished && is_software_finished;
428            }
429
430            // Unset the current host
431            let mut world = self.world.borrow_mut();
432            world.current = None;
433
434            world.tick(addr, tick);
435        }
436
437        // Tick the nodes that are not actively running (i.e., crashed) to ensure their clock keeps up
438        // with the rest of the simulation when they are restarted (bounced).
439        for (&addr, _rt) in stopped {
440            let mut world = self.world.borrow_mut();
441            world.tick(addr, tick);
442        }
443
444        self.elapsed += tick;
445        self.steps += 1;
446
447        if self.elapsed > self.config.duration && !is_finished {
448            return Err(format!(
449                "Ran for duration: {:?} steps: {} without completing",
450                self.config.duration, self.steps,
451            ))?;
452        }
453
454        Ok(is_finished)
455    }
456}
457
458#[cfg(test)]
459mod test {
460    use rand::Rng;
461    use std::future;
462    use std::{
463        net::{IpAddr, Ipv4Addr},
464        rc::Rc,
465        sync::{
466            atomic::{AtomicU64, Ordering},
467            Arc, Mutex,
468        },
469        time::Duration,
470    };
471
472    use tokio::{
473        io::{AsyncReadExt, AsyncWriteExt},
474        sync::Semaphore,
475        time::Instant,
476    };
477
478    use crate::net::UdpSocket;
479    use crate::{
480        elapsed, hold,
481        net::{TcpListener, TcpStream},
482        sim_elapsed, Builder, Result, Sim, World,
483    };
484
485    #[test]
486    fn client_error() {
487        let mut sim = Builder::new().build();
488
489        sim.client("doomed", async { Err("An Error")? });
490
491        assert!(sim.run().is_err());
492    }
493
494    #[test]
495    fn timeout() {
496        let mut sim = Builder::new()
497            .simulation_duration(Duration::from_millis(500))
498            .build();
499
500        sim.client("timeout", async {
501            tokio::time::sleep(Duration::from_secs(1)).await;
502
503            Ok(())
504        });
505
506        assert!(sim.run().is_err());
507    }
508
509    #[test]
510    fn multiple_clients_all_finish() -> Result {
511        let how_many = 3;
512        let tick_ms = 10;
513
514        // N = how_many runs, each with a different client finishing immediately
515        for run in 0..how_many {
516            let mut sim = Builder::new()
517                .tick_duration(Duration::from_millis(tick_ms))
518                .build();
519
520            let ct = Rc::new(Semaphore::new(how_many));
521
522            for client in 0..how_many {
523                let ct = ct.clone();
524
525                sim.client(format!("client-{client}"), async move {
526                    let ms = if run == client { 0 } else { 2 * tick_ms };
527                    tokio::time::sleep(Duration::from_millis(ms)).await;
528
529                    let p = ct.acquire().await?;
530                    p.forget();
531
532                    Ok(())
533                });
534            }
535
536            sim.run()?;
537            assert_eq!(0, ct.available_permits());
538        }
539
540        Ok(())
541    }
542
543    /// This is a regression test that ensures host software completes when the
544    /// host crashes. Before this fix we simply dropped the LocalSet, which did
545    /// not ensure resources owned by spawned tasks were dropped. Now we drop
546    /// and replace both the tokio Runtime and the LocalSet.
547    #[test]
548    fn crash_blocks_until_complete() -> Result {
549        let ct = Arc::new(());
550
551        let mut sim = Builder::new().build();
552
553        sim.host("host", || {
554            let ct = ct.clone();
555
556            async move {
557                tokio::spawn(async move {
558                    let _into_task = ct;
559                    future::pending::<()>().await;
560                });
561
562                future::pending().await
563            }
564        });
565
566        sim.run()?;
567        assert_eq!(2, Arc::strong_count(&ct));
568
569        sim.crash("host");
570        assert_eq!(1, Arc::strong_count(&ct));
571
572        Ok(())
573    }
574
575    #[test]
576    fn elapsed_time() -> Result {
577        let tick = Duration::from_millis(5);
578        let mut sim = Builder::new().tick_duration(tick).build();
579
580        let duration = Duration::from_millis(500);
581
582        sim.client("c1", async move {
583            tokio::time::sleep(duration).await;
584            assert_eq!(duration, elapsed());
585            // For hosts/clients started before the first `sim.run()` call, the
586            // `elapsed` and `sim_elapsed` time will be identical.
587            assert_eq!(duration, sim_elapsed().unwrap());
588
589            Ok(())
590        });
591
592        sim.client("c2", async move {
593            tokio::time::sleep(duration).await;
594            assert_eq!(duration, elapsed());
595            assert_eq!(duration, sim_elapsed().unwrap());
596
597            Ok(())
598        });
599
600        sim.run()?;
601
602        // sleep duration plus one tick to complete
603        assert_eq!(duration + tick, sim.elapsed());
604
605        let start = sim.elapsed();
606        sim.client("c3", async move {
607            assert_eq!(Duration::ZERO, elapsed());
608            // Note that sim_elapsed is total simulation time while elapsed is
609            // still zero for this newly created host.
610            assert_eq!(duration + tick, sim_elapsed().unwrap());
611            tokio::time::sleep(duration).await;
612            assert_eq!(duration, elapsed());
613            assert_eq!(duration + tick + duration, sim_elapsed().unwrap());
614
615            Ok(())
616        });
617
618        sim.run()?;
619
620        // Client "c3" takes one sleep duration plus one tick to complete
621        assert_eq!(duration + tick, sim.elapsed() - start);
622
623        Ok(())
624    }
625
626    /// This is a regression test to ensure it is safe to call sim_elapsed
627    /// if current world of host is not set.
628    #[test]
629    fn sim_elapsed_time() -> Result {
630        // Safe to call outside of simution while there
631        // is no current world set
632        assert!(sim_elapsed().is_none());
633
634        let sim = Builder::new().build();
635        // Safe to call while there is no current host set
636        World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
637
638        Ok(())
639    }
640
641    #[test]
642    fn hold_release_peers() -> Result {
643        let global = Duration::from_millis(2);
644
645        let mut sim = Builder::new()
646            .min_message_latency(global)
647            .max_message_latency(global)
648            .build();
649
650        sim.host("server", || async {
651            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
652
653            while let Ok((mut s, _)) = listener.accept().await {
654                assert!(s.write_u8(42).await.is_ok());
655            }
656
657            Ok(())
658        });
659
660        sim.client("client", async move {
661            let mut s = TcpStream::connect("server:1234").await?;
662
663            s.read_u8().await?;
664
665            Ok(())
666        });
667
668        sim.hold("server", "client");
669
670        // Verify that msg is not delivered.
671        sim.step()?;
672
673        sim.links(|l| {
674            assert!(l.count() == 1);
675        });
676
677        // Verify that msg is still not delivered.
678        sim.step()?;
679
680        sim.release("server", "client");
681
682        sim.run()?;
683
684        Ok(())
685    }
686
687    #[test]
688    fn partition_peers() -> Result {
689        let global = Duration::from_millis(2);
690
691        let mut sim = Builder::new()
692            .min_message_latency(global)
693            .max_message_latency(global)
694            .build();
695
696        sim.host("server", || async {
697            let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
698
699            Ok(())
700        });
701
702        sim.client("client", async move {
703            // Peers are partitioned. TCP setup should fail.
704            let _ = TcpStream::connect("server:1234").await.unwrap_err();
705
706            Ok(())
707        });
708
709        sim.partition("server", "client");
710
711        sim.run()?;
712
713        Ok(())
714    }
715
    /// Expected outcome of a partitioning scenario, compared by
    /// `run_with_partitioning` against what each peer actually observed.
    struct Expectation {
        /// Whether host A is expected to receive host B's packet.
        expect_a_receive: bool,
        /// Whether host B is expected to receive host A's packet.
        expect_b_receive: bool,
    }
720
    /// A single network manipulation applied between hosts A and B by the
    /// one-way partitioning tests.
    #[derive(Debug)]
    enum Action {
        /// Partition A and B in both directions.
        Partition,
        /// Partition the A -> B direction only.
        PartitionOnewayAB,
        /// Partition the B -> A direction only.
        PartitionOnewayBA,
        /// Repair the A -> B direction only.
        RepairOnewayAB,
        /// Repair the B -> A direction only.
        RepairOnewayBA,
        /// Repair the link between A and B in both directions.
        Repair,
    }
730
731    fn run_with_partitioning(
732        host_a: &'static str,
733        host_b: &'static str,
734        mut partitioning: impl FnMut(&mut Sim) -> Expectation,
735    ) -> Result {
736        let global = Duration::from_millis(1);
737
738        let mut sim = Builder::new()
739            .min_message_latency(global)
740            .max_message_latency(global)
741            .build();
742
743        let a_did_receive = Arc::new(Mutex::new(None));
744        let b_did_receive = Arc::new(Mutex::new(None));
745
746        let make_a = |sim: &mut Sim| {
747            sim.client(host_a, {
748                let a_did_receive = Arc::clone(&a_did_receive);
749                async move {
750                    let udp_socket =
751                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
752                    udp_socket
753                        .send_to(&[42], format!("{host_b}:1234"))
754                        .await
755                        .expect("sending packet should appear to work, even if partitioned");
756
757                    *a_did_receive.lock().unwrap() = Some(matches!(
758                        tokio::time::timeout(
759                            Duration::from_secs(1),
760                            udp_socket.recv_from(&mut [0])
761                        )
762                        .await,
763                        Ok(Ok(_))
764                    ));
765
766                    Ok(())
767                }
768            })
769        };
770
771        let make_b = |sim: &mut Sim| {
772            sim.client(host_b, {
773                let b_did_receive = Arc::clone(&b_did_receive);
774                async move {
775                    let udp_socket =
776                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
777                    udp_socket
778                        .send_to(&[42], format!("{host_a}:1234"))
779                        .await
780                        .expect("sending packet should work");
781
782                    *b_did_receive.lock().unwrap() = Some(matches!(
783                        tokio::time::timeout(
784                            Duration::from_secs(1),
785                            udp_socket.recv_from(&mut [0])
786                        )
787                        .await,
788                        Ok(Ok(_))
789                    ));
790
791                    Ok(())
792                }
793            })
794        };
795
796        let construct_a_first = sim.world.borrow_mut().rng.gen_bool(0.5);
797        if construct_a_first {
798            make_a(&mut sim);
799            make_b(&mut sim);
800        } else {
801            make_b(&mut sim);
802            make_a(&mut sim);
803        }
804
805        let Expectation {
806            expect_a_receive,
807            expect_b_receive,
808        } = partitioning(&mut sim);
809        sim.run()?;
810
811        assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
812        assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));
813
814        Ok(())
815    }
816
817    #[test]
818    fn partition_peers_oneway() -> Result {
819        run_with_partitioning("a", "b", |sim: &mut Sim| {
820            sim.partition_oneway("a", "b");
821            Expectation {
822                expect_a_receive: true,
823                expect_b_receive: false,
824            }
825        })
826    }
827
828    #[test]
829    fn partition_peers_oneway_many_cases() -> Result {
830        const HOST_A: &str = "a";
831        const HOST_B: &str = "b";
832
833        // Test all permutations of the above 6 actions.
834
835        fn run_with_actions(actions: &[Action]) -> Result {
836            run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
837                let mut expect_a_receive = true;
838                let mut expect_b_receive = true;
839                for action in actions {
840                    match action {
841                        Action::Partition => {
842                            sim.partition(HOST_A, HOST_B);
843                            expect_a_receive = false;
844                            expect_b_receive = false;
845                        }
846                        Action::PartitionOnewayAB => {
847                            sim.partition_oneway(HOST_A, HOST_B);
848                            expect_b_receive = false;
849                        }
850                        Action::PartitionOnewayBA => {
851                            sim.partition_oneway(HOST_B, HOST_A);
852                            expect_a_receive = false;
853                        }
854                        Action::RepairOnewayAB => {
855                            sim.repair_oneway(HOST_A, HOST_B);
856                            expect_b_receive = true;
857                        }
858                        Action::RepairOnewayBA => {
859                            sim.repair_oneway(HOST_B, HOST_A);
860                            expect_a_receive = true;
861                        }
862                        Action::Repair => {
863                            sim.repair(HOST_A, HOST_B);
864                            expect_a_receive = true;
865                            expect_b_receive = true;
866                        }
867                    }
868                }
869                Expectation {
870                    expect_a_receive,
871                    expect_b_receive,
872                }
873            })?;
874            Ok(())
875        }
876
877        run_with_actions(&[Action::PartitionOnewayAB])?;
878        run_with_actions(&[Action::PartitionOnewayBA])?;
879        run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
880        run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
881        run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
882        run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
883        run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
884        run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
885        run_with_actions(&[
886            Action::Partition,
887            Action::RepairOnewayAB,
888            Action::RepairOnewayBA,
889        ])?;
890
891        Ok(())
892    }
893
894    #[test]
895    fn elapsed_time_across_restarts() -> Result {
896        let tick_ms = 5;
897        let mut sim = Builder::new()
898            .tick_duration(Duration::from_millis(tick_ms))
899            .build();
900
901        let clock = Arc::new(AtomicU64::new(0));
902        let actual = clock.clone();
903
904        sim.host("host", move || {
905            let clock = clock.clone();
906
907            async move {
908                loop {
909                    tokio::time::sleep(Duration::from_millis(1)).await;
910                    clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
911                }
912            }
913        });
914
915        sim.step()?;
916        assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));
917
918        sim.bounce("host");
919        sim.step()?;
920        assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));
921
922        Ok(())
923    }
924
925    #[test]
926    fn elapsed_time_across_crashes() -> Result {
927        let tick_ms = 5;
928        let mut sim = Builder::new()
929            .tick_duration(Duration::from_millis(tick_ms))
930            .build();
931
932        let clock_1 = Arc::new(AtomicU64::new(0));
933        let clock_1_moved = clock_1.clone();
934
935        sim.host("host1", move || {
936            let clock = clock_1_moved.clone();
937            async move {
938                loop {
939                    tokio::time::sleep(Duration::from_millis(1)).await;
940                    clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
941                }
942            }
943        });
944
945        // Crashing host 1
946        sim.crash("host1");
947        sim.step()?;
948        // After bouncing host 1, host's clock must be synced.
949        sim.bounce("host1");
950        sim.step()?;
951        assert_eq!(
952            2 * tick_ms - 1,
953            clock_1.load(Ordering::SeqCst),
954            "Host 1 should have caught up"
955        );
956
957        Ok(())
958    }
959
    /// A host whose software resolves to an error must make `step()` return
    /// an error.
    #[test]
    fn host_finishes_with_error() {
        let mut sim = Builder::new().build();

        // The host's future resolves to `Err` as soon as it is polled.
        sim.host("host", || async {
            Err("Host software finished unexpectedly")?
        });

        assert!(sim.step().is_err());
    }
970
971    #[test]
972    fn manual_message_delivery() -> Result {
973        let mut sim = Builder::new().build();
974
975        sim.host("a", || async {
976            let l = TcpListener::bind("0.0.0.0:1234").await?;
977
978            _ = l.accept().await?;
979
980            Ok(())
981        });
982
983        sim.client("b", async {
984            hold("a", "b");
985
986            _ = TcpStream::connect("a:1234").await?;
987
988            Ok(())
989        });
990
991        assert!(!sim.step()?);
992
993        sim.links(|mut l| {
994            let a_to_b = l.next().unwrap();
995            a_to_b.deliver_all();
996        });
997
998        assert!(sim.step()?);
999
1000        Ok(())
1001    }
1002
    /// Regression test: crashing a host must not propagate
    /// `JoinError::Cancelled` to the caller of `run()`, which previously
    /// caused spurious test failures.
    #[test]
    fn run_after_host_crashes() -> Result {
        let mut sim = Builder::new().build();

        // The host software awaits `future::pending()`.
        sim.host("h", || async { future::pending().await });

        sim.crash("h");

        // `run()` must return `Ok` even though the only host was crashed.
        sim.run()
    }
1016
1017    #[test]
1018    fn restart_host_after_crash() -> Result {
1019        let mut sim = Builder::new().build();
1020
1021        let data = Arc::new(AtomicU64::new(0));
1022        let data_cloned = data.clone();
1023
1024        sim.host("h", move || {
1025            let data_cloned = data_cloned.clone();
1026            async move {
1027                data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1028                Ok(())
1029            }
1030        });
1031
1032        // crash and step to execute the err handling logic
1033        sim.crash("h");
1034        sim.step()?;
1035
1036        // restart and step to ensure the host software runs
1037        sim.bounce("h");
1038        sim.step()?;
1039        // check that software actually runs
1040        assert_eq!(1, data.load(Ordering::SeqCst));
1041
1042        Ok(())
1043    }
1044
1045    #[test]
1046    fn override_link_latency() -> Result {
1047        let global = Duration::from_millis(2);
1048
1049        let mut sim = Builder::new()
1050            .min_message_latency(global)
1051            .max_message_latency(global)
1052            .build();
1053
1054        sim.host("server", || async {
1055            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1056
1057            while let Ok((mut s, _)) = listener.accept().await {
1058                assert!(s.write_u8(9).await.is_ok());
1059            }
1060
1061            Ok(())
1062        });
1063
1064        sim.client("client", async move {
1065            let mut s = TcpStream::connect("server:1234").await?;
1066
1067            let start = Instant::now();
1068            s.read_u8().await?;
1069            assert_eq!(global, start.elapsed());
1070
1071            Ok(())
1072        });
1073
1074        sim.run()?;
1075
1076        let degraded = Duration::from_millis(10);
1077
1078        sim.client("client2", async move {
1079            let mut s = TcpStream::connect("server:1234").await?;
1080
1081            let start = Instant::now();
1082            s.read_u8().await?;
1083            assert_eq!(degraded, start.elapsed());
1084
1085            Ok(())
1086        });
1087
1088        sim.set_link_latency("client2", "server", degraded);
1089
1090        sim.run()
1091    }
1092
    /// Checks that `is_host_running` returns `true` for a registered client
    /// and host after a step, and `false` for the host right after it has
    /// been crashed.
    #[test]
    fn is_host_running() -> Result {
        let mut sim = Builder::new().build();

        // Both pieces of software await `future::pending()`.
        sim.client("client", async { future::pending().await });
        sim.host("host", || async { future::pending().await });

        // NOTE(review): `step()` presumably returns `false` while the client
        // has not finished — confirm against `Sim::step`.
        assert!(!sim.step()?);

        assert!(sim.is_host_running("client"));
        assert!(sim.is_host_running("host"));

        // No step is taken between the crash and the check.
        sim.crash("host");
        assert!(!sim.is_host_running("host"));

        Ok(())
    }
1110
1111    #[test]
1112    #[cfg(feature = "regex")]
1113    fn host_scan() -> Result {
1114        let mut sim = Builder::new().build();
1115
1116        let how_many = 3;
1117        for i in 0..how_many {
1118            sim.host(format!("host-{i}"), || async { future::pending().await })
1119        }
1120
1121        let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1122        ips.sort();
1123
1124        assert_eq!(how_many, ips.len());
1125
1126        for (i, ip) in ips.iter().enumerate() {
1127            assert_eq!(
1128                format!("host-{i}"),
1129                sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1130            );
1131        }
1132
1133        Ok(())
1134    }
1135
1136    #[test]
1137    #[cfg(feature = "regex")]
1138    fn bounce_multiple_hosts_with_regex() -> Result {
1139        let mut sim = Builder::new().build();
1140
1141        let count = Arc::new(AtomicU64::new(0));
1142        for i in 1..=3 {
1143            let count = count.clone();
1144            sim.host(format!("host-{}", i), move || {
1145                let count = count.clone();
1146                async move {
1147                    count.fetch_add(1, Ordering::SeqCst);
1148                    future::pending().await
1149                }
1150            });
1151        }
1152
1153        sim.step()?;
1154        assert_eq!(count.load(Ordering::SeqCst), 3);
1155        sim.bounce(regex::Regex::new("host-[12]")?);
1156        sim.step()?;
1157        assert_eq!(count.load(Ordering::SeqCst), 5);
1158
1159        Ok(())
1160    }
1161
1162    #[test]
1163    #[cfg(feature = "regex")]
1164    fn hold_all() -> Result {
1165        let mut sim = Builder::new().build();
1166
1167        sim.host("host", || async {
1168            let l = TcpListener::bind("0.0.0.0:1234").await?;
1169
1170            loop {
1171                _ = l.accept().await?;
1172            }
1173        });
1174
1175        sim.client("test", async {
1176            hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1177
1178            assert!(tokio::time::timeout(
1179                Duration::from_millis(100),
1180                TcpStream::connect("host:1234")
1181            )
1182            .await
1183            .is_err());
1184
1185            crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1186
1187            assert!(TcpStream::connect("host:1234").await.is_ok());
1188
1189            Ok(())
1190        });
1191
1192        sim.run()
1193    }
1194}