Skip to main content

turmoil/
sim.rs

1use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15    for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A handle for interacting with the simulation.
///
/// Owns the simulated [`World`] together with one runtime per registered
/// host or client, and advances them in fixed-size ticks (see `Sim::step`).
pub struct Sim<'a> {
    /// Simulation configuration
    config: Config,

    /// Tracks the simulated world state
    ///
    /// This is what is stored in the thread-local
    world: RefCell<World>,

    /// Per simulated host runtimes, keyed by the host's resolved IP address.
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Simulation duration since unix epoch. Set when the simulation is
    /// created.
    since_epoch: Duration,

    /// Simulation elapsed time
    elapsed: Duration,

    /// Number of the step about to run. Starts at 1 and is incremented at the
    /// end of every `Sim::step`; used for trace logging and the timeout error.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42    pub(crate) fn new(config: Config, world: World) -> Self {
43        let since_epoch = config
44            .epoch
45            .duration_since(UNIX_EPOCH)
46            .expect("now must be >= UNIX_EPOCH");
47
48        Self {
49            config,
50            world: RefCell::new(world),
51            rts: IndexMap::new(),
52            since_epoch,
53            elapsed: Duration::ZERO,
54            steps: 1, // bumped after each step
55        }
56    }
57
58    /// How much logical time has elapsed since the simulation started.
59    pub fn elapsed(&self) -> Duration {
60        self.elapsed
61    }
62
63    /// The logical duration from [`UNIX_EPOCH`] until now.
64    ///
65    /// On creation the simulation picks a `SystemTime` and calculates the
66    /// duration since the epoch. Each `run()` invocation moves logical time
67    /// forward the configured tick duration.
68    pub fn since_epoch(&self) -> Duration {
69        self.since_epoch + self.elapsed
70    }
71
72    /// Register a client with the simulation.
73    pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74    where
75        F: Future<Output = Result> + 'static,
76    {
77        let addr = self.lookup(addr);
78        let nodename: Arc<str> = self
79            .world
80            .borrow_mut()
81            .dns
82            .reverse(addr)
83            .map(str::to_string)
84            .unwrap_or_else(|| addr.to_string())
85            .into();
86
87        {
88            let world = RefCell::get_mut(&mut self.world);
89
90            // Register host state with the world
91            world.register(
92                addr,
93                &nodename,
94                HostTimer::new(self.elapsed, self.since_epoch),
95                &self.config,
96            );
97        }
98
99        let seed = self.world.borrow_mut().rng.random();
100        let rng = SmallRng::from_seed(seed);
101        let config = rt::Config {
102            enable_io: self.config.enable_tokio_io,
103            rng: Some(rng),
104        };
105
106        let rt = World::enter(&self.world, || Rt::client(nodename, client, config));
107
108        self.rts.insert(addr, rt);
109    }
110
111    /// Register a host with the simulation.
112    ///
113    /// This method takes a `Fn` that builds a future, as opposed to
114    /// [`Sim::client`] which just takes a future. The reason for this is we
115    /// might restart the host, and so need to be able to call the future
116    /// multiple times.
117    pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
118    where
119        F: Fn() -> Fut + 'a,
120        Fut: Future<Output = Result> + 'static,
121    {
122        let addr = self.lookup(addr);
123        let nodename: Arc<str> = self
124            .world
125            .borrow_mut()
126            .dns
127            .reverse(addr)
128            .map(str::to_string)
129            .unwrap_or_else(|| addr.to_string())
130            .into();
131
132        {
133            let world = RefCell::get_mut(&mut self.world);
134
135            // Register host state with the world
136            world.register(
137                addr,
138                &nodename,
139                HostTimer::new(self.elapsed, self.since_epoch),
140                &self.config,
141            );
142        }
143
144        let seed = self.world.borrow_mut().rng.random();
145        let rng = SmallRng::from_seed(seed);
146        let config = rt::Config {
147            enable_io: self.config.enable_tokio_io,
148            rng: Some(rng),
149        };
150
151        let rt = World::enter(&self.world, || Rt::host(nodename, host, config));
152
153        self.rts.insert(addr, rt);
154    }
155
156    /// Crashes the resolved hosts. Nothing will be running on the matched hosts
157    /// after this method. You can use [`Sim::bounce`] to start the hosts up
158    /// again.
159    ///
160    /// For filesystem simulation (requires `unstable-fs` feature), this
161    /// discards any pending (unsynced) writes. Data that was synced via
162    /// `sync_all()` survives the crash.
163    pub fn crash(&mut self, addrs: impl ToIpAddrs) {
164        let hosts = self.world.borrow_mut().lookup_many(addrs);
165        for h in hosts {
166            let rt = self.rts.get_mut(&h).expect("missing host");
167
168            self.world.borrow_mut().current = Some(h);
169
170            World::enter(&self.world, || {
171                rt.crash();
172
173                // Discard pending filesystem operations
174                #[cfg(feature = "unstable-fs")]
175                World::current(|world| {
176                    let addr = world.current.expect("current host missing");
177                    let World { hosts, rng, .. } = world;
178                    let host = hosts.get_mut(&addr).unwrap();
179                    host.fs.lock().unwrap().discard_pending(&mut **rng);
180                });
181
182                tracing::trace!(target: TRACING_TARGET, addr = ?h, "Crash");
183            });
184        }
185
186        self.world.borrow_mut().current = None;
187    }
188
189    /// Bounces the resolved hosts. The software is restarted.
190    pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
191        self.run_with_hosts(addrs, |addr, rt| {
192            rt.bounce();
193
194            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
195        });
196    }
197
198    /// Run `f` with the resolved hosts at `addrs` set on the world.
199    fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
200        let hosts = self.world.borrow_mut().lookup_many(addrs);
201        for h in hosts {
202            let rt = self.rts.get_mut(&h).expect("missing host");
203
204            self.world.borrow_mut().current = Some(h);
205
206            World::enter(&self.world, || f(h, rt));
207        }
208
209        self.world.borrow_mut().current = None;
210    }
211
212    /// Check whether a host has software running.
213    pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
214        let host = self.world.borrow_mut().lookup(addr);
215
216        self.rts
217            .get(&host)
218            .expect("missing host")
219            .is_software_running()
220    }
221
222    /// Lookup an IP address by host name.
223    pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
224        self.world.borrow_mut().lookup(addr)
225    }
226
227    /// Perform a reverse DNS lookup, returning the hostname if the entry
228    /// exists.
229    pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
230        self.world
231            .borrow()
232            .reverse_lookup(addr)
233            .map(|h| h.to_owned())
234    }
235
236    /// Hold messages between two hosts, or sets of hosts, until [`release`](crate::release) is
237    /// called.
238    pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
239        let mut world = self.world.borrow_mut();
240        world.hold_many(a, b);
241    }
242
243    /// Repair the connection between two hosts, or sets of hosts, resulting in
244    /// messages to be delivered.
245    pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
246        let mut world = self.world.borrow_mut();
247        world.repair_many(a, b);
248    }
249
250    /// Repair the connection between two hosts, or sets of hosts, removing
251    /// the effect of a previous [`Self::partition_oneway`].
252    ///
253    /// Combining this feature with [`Self::hold`] is presently not supported.
254    pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
255        let mut world = self.world.borrow_mut();
256        world.repair_oneway_many(from, to);
257    }
258
259    /// The opposite of [`hold`](crate::hold). All held messages are immediately delivered.
260    pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
261        let mut world = self.world.borrow_mut();
262        world.release_many(a, b);
263    }
264
265    /// Partition two hosts, or sets of hosts, resulting in all messages sent
266    /// between them to be dropped.
267    pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
268        let mut world = self.world.borrow_mut();
269        world.partition_many(a, b);
270    }
271
272    /// Partition two hosts, or sets of hosts, such that messages can not be sent
273    /// from 'from' to 'to', while not affecting the ability for them to be delivered
274    /// in the other direction.
275    ///
276    /// Partitioning first from->to, then to->from, will stop all messages, similarly to
277    /// a single call to [`Self::partition`].
278    ///
279    /// Combining this feature with [`Self::hold`] is presently not supported.
280    pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
281        let mut world = self.world.borrow_mut();
282        world.partition_oneway_many(from, to);
283    }
284
285    /// Resolve host names for an [`IpAddr`] pair.
286    ///
287    /// Useful when interacting with network [links](#method.links).
288    pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
289        let world = self.world.borrow();
290
291        (
292            world
293                .dns
294                .reverse(pair.0)
295                .expect("no hostname found for ip address")
296                .to_owned(),
297            world
298                .dns
299                .reverse(pair.1)
300                .expect("no hostname found for ip address")
301                .to_owned(),
302        )
303    }
304
305    /// Lookup IP addresses for resolved hosts.
306    pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
307        self.world.borrow_mut().lookup_many(addr)
308    }
309
310    /// Set the max message latency for all links.
311    pub fn set_max_message_latency(&self, value: Duration) {
312        self.world
313            .borrow_mut()
314            .topology
315            .set_max_message_latency(value);
316    }
317
318    /// Set the message latency for any links matching `a` and `b`.
319    ///
320    /// This sets the min and max to the same value eliminating any variance in
321    /// latency.
322    pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
323        let mut world = self.world.borrow_mut();
324        let a = world.lookup_many(a);
325        let b = world.lookup_many(b);
326
327        for_pairs(&a, &b, |a, b| {
328            world.topology.set_link_message_latency(a, b, value);
329        });
330    }
331
332    /// Set the max message latency for any links matching `a` and `b`.
333    pub fn set_link_max_message_latency(
334        &self,
335        a: impl ToIpAddrs,
336        b: impl ToIpAddrs,
337        value: Duration,
338    ) {
339        let mut world = self.world.borrow_mut();
340        let a = world.lookup_many(a);
341        let b = world.lookup_many(b);
342
343        for_pairs(&a, &b, |a, b| {
344            world.topology.set_link_max_message_latency(a, b, value);
345        });
346    }
347
348    /// Set the message latency distribution curve for all links.
349    ///
350    /// Message latency follows an exponential distribution curve. The `value`
351    /// is the lambda argument to the probability function.
352    pub fn set_message_latency_curve(&self, value: f64) {
353        self.world
354            .borrow_mut()
355            .topology
356            .set_message_latency_curve(value);
357    }
358
359    pub fn set_fail_rate(&mut self, value: f64) {
360        self.world.borrow_mut().topology.set_fail_rate(value);
361    }
362
363    pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
364        let mut world = self.world.borrow_mut();
365        let a = world.lookup_many(a);
366        let b = world.lookup_many(b);
367
368        for_pairs(&a, &b, |a, b| {
369            world.topology.set_link_fail_rate(a, b, value);
370        });
371    }
372
373    /// Access a [`LinksIter`] to introspect inflight messages between hosts.
374    pub fn links(&self, f: impl FnOnce(LinksIter)) {
375        let top = &mut self.world.borrow_mut().topology;
376
377        f(top.iter_mut())
378    }
379
380    /// Run the simulation until all client hosts have completed.
381    ///
382    /// Executes a simple event loop that calls [step](#method.step) each
383    /// iteration, returning early if any host software errors.
384    pub fn run(&mut self) -> Result {
385        // check if we have any clients
386        if !self
387            .rts
388            .iter()
389            .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
390        {
391            tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
392            return Ok(());
393        }
394
395        loop {
396            let is_finished = self.step()?;
397
398            if is_finished {
399                return Ok(());
400            }
401        }
402    }
403
404    /// Step the simulation.
405    ///
406    /// Runs each host in the simulation a fixed duration configured by
407    /// `tick_duration` in the builder.
408    ///
409    /// The simulated network also steps, processing in flight messages, and
410    /// delivering them to their destination if appropriate.
411    ///
412    /// Returns whether or not all clients have completed.
413    pub fn step(&mut self) -> Result<bool> {
414        tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);
415
416        let tick = self.config.tick;
417        let mut is_finished = true;
418
419        // Tick the networking, processing messages. This is done before
420        // ticking any other runtime, as they might be waiting on network
421        // IO. (It also might be waiting on something else, such as time.)
422        self.world.borrow_mut().topology.tick_by(tick);
423
424        // Tick each host runtimes with running software. If the software
425        // completes, extract the result and return early if an error is
426        // encountered.
427
428        let (mut running, stopped): (Vec<_>, Vec<_>) = self
429            .rts
430            .iter_mut()
431            .partition(|(_, rt)| rt.is_software_running());
432        if self.config.random_node_order {
433            running.shuffle(&mut self.world.borrow_mut().rng);
434        }
435
436        for (&addr, rt) in running {
437            let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
438            {
439                let mut world = self.world.borrow_mut();
440                // We need to move deliverable messages off the network and
441                // into the dst host. This requires two mutable borrows.
442                let World {
443                    rng,
444                    topology,
445                    hosts,
446                    ..
447                } = world.deref_mut();
448                topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));
449
450                // Set the current host (see method docs)
451                world.current = Some(addr);
452
453                world.current_host_mut().timer.now(rt.now());
454            }
455
456            let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;
457
458            if rt.is_client() {
459                is_finished = is_finished && is_software_finished;
460            }
461
462            // Unset the current host
463            let mut world = self.world.borrow_mut();
464            world.current = None;
465
466            world.tick(addr, tick);
467        }
468
469        // Tick the nodes that are not actively running (i.e., crashed) to ensure their clock keeps up
470        // with the rest of the simulation when they are restarted (bounced).
471        for (&addr, _rt) in stopped {
472            let mut world = self.world.borrow_mut();
473            world.tick(addr, tick);
474        }
475
476        self.elapsed += tick;
477        self.steps += 1;
478
479        if self.elapsed > self.config.duration && !is_finished {
480            return Err(format!(
481                "Ran for duration: {:?} steps: {} without completing",
482                self.config.duration, self.steps,
483            ))?;
484        }
485
486        Ok(is_finished)
487    }
488}
489
490#[cfg(test)]
491mod test {
492    use rand::Rng;
493    use std::future;
494    use std::{
495        net::{IpAddr, Ipv4Addr},
496        rc::Rc,
497        sync::{
498            atomic::{AtomicU64, Ordering},
499            Arc, Mutex,
500        },
501        time::Duration,
502    };
503
504    use tokio::{
505        io::{AsyncReadExt, AsyncWriteExt},
506        sync::Semaphore,
507        time::Instant,
508    };
509
510    use crate::net::UdpSocket;
511    use crate::{
512        elapsed, hold,
513        net::{TcpListener, TcpStream},
514        sim_elapsed, since_epoch, Builder, Result, Sim, World,
515    };
516
517    #[test]
518    fn client_error() {
519        let mut sim = Builder::new().build();
520
521        sim.client("doomed", async { Err("An Error")? });
522
523        assert!(sim.run().is_err());
524    }
525
526    #[test]
527    fn timeout() {
528        let mut sim = Builder::new()
529            .simulation_duration(Duration::from_millis(500))
530            .build();
531
532        sim.client("timeout", async {
533            tokio::time::sleep(Duration::from_secs(1)).await;
534
535            Ok(())
536        });
537
538        assert!(sim.run().is_err());
539    }
540
541    #[test]
542    fn multiple_clients_all_finish() -> Result {
543        let how_many = 3;
544        let tick_ms = 10;
545
546        // N = how_many runs, each with a different client finishing immediately
547        for run in 0..how_many {
548            let mut sim = Builder::new()
549                .tick_duration(Duration::from_millis(tick_ms))
550                .build();
551
552            let ct = Rc::new(Semaphore::new(how_many));
553
554            for client in 0..how_many {
555                let ct = ct.clone();
556
557                sim.client(format!("client-{client}"), async move {
558                    let ms = if run == client { 0 } else { 2 * tick_ms };
559                    tokio::time::sleep(Duration::from_millis(ms)).await;
560
561                    let p = ct.acquire().await?;
562                    p.forget();
563
564                    Ok(())
565                });
566            }
567
568            sim.run()?;
569            assert_eq!(0, ct.available_permits());
570        }
571
572        Ok(())
573    }
574
575    /// This is a regression test that ensures host software completes when the
576    /// host crashes. Before this fix we simply dropped the LocalSet, which did
577    /// not ensure resources owned by spawned tasks were dropped. Now we drop
578    /// and replace both the tokio Runtime and the LocalSet.
579    #[test]
580    fn crash_blocks_until_complete() -> Result {
581        let ct = Arc::new(());
582
583        let mut sim = Builder::new().build();
584
585        sim.host("host", || {
586            let ct = ct.clone();
587
588            async move {
589                tokio::spawn(async move {
590                    let _into_task = ct;
591                    future::pending::<()>().await;
592                });
593
594                future::pending().await
595            }
596        });
597
598        sim.run()?;
599        assert_eq!(2, Arc::strong_count(&ct));
600
601        sim.crash("host");
602        assert_eq!(1, Arc::strong_count(&ct));
603
604        Ok(())
605    }
606
607    #[test]
608    fn elapsed_time() -> Result {
609        let tick = Duration::from_millis(5);
610        let mut sim = Builder::new().tick_duration(tick).build();
611        let start_epoch = sim.since_epoch();
612
613        let duration = Duration::from_millis(500);
614
615        sim.client("c1", async move {
616            tokio::time::sleep(duration).await;
617            assert_eq!(duration, elapsed());
618            // For hosts/clients started before the first `sim.run()` call, the
619            // `elapsed` and `sim_elapsed` time will be identical.
620            assert_eq!(duration, sim_elapsed().unwrap());
621
622            // Logical epoch time should move forward at the same rate the sim
623            assert_eq!(start_epoch + duration, since_epoch().unwrap());
624
625            Ok(())
626        });
627
628        sim.client("c2", async move {
629            tokio::time::sleep(duration).await;
630            assert_eq!(duration, elapsed());
631            assert_eq!(duration, sim_elapsed().unwrap());
632            assert_eq!(start_epoch + duration, since_epoch().unwrap());
633
634            Ok(())
635        });
636
637        sim.run()?;
638
639        // sleep duration plus one tick to complete
640        assert_eq!(duration + tick, sim.elapsed());
641
642        let start = sim.elapsed();
643        sim.client("c3", async move {
644            assert_eq!(Duration::ZERO, elapsed());
645            // Note that sim_elapsed is total simulation time while elapsed is
646            // still zero for this newly created host.
647            assert_eq!(duration + tick, sim_elapsed().unwrap());
648            assert_eq!(start_epoch + duration + tick, since_epoch().unwrap());
649            tokio::time::sleep(duration).await;
650            assert_eq!(duration, elapsed());
651            assert_eq!(duration + tick + duration, sim_elapsed().unwrap());
652            assert_eq!(
653                start_epoch + duration + tick + duration,
654                since_epoch().unwrap()
655            );
656
657            Ok(())
658        });
659
660        sim.run()?;
661
662        // Client "c3" takes one sleep duration plus one tick to complete
663        assert_eq!(duration + tick, sim.elapsed() - start);
664
665        Ok(())
666    }
667
668    /// This is a regression test to ensure it is safe to call sim_elapsed
669    /// if current world of host is not set.
670    #[test]
671    fn sim_elapsed_time() -> Result {
672        // Safe to call outside of simution while there
673        // is no current world set
674        assert!(sim_elapsed().is_none());
675
676        let sim = Builder::new().build();
677        // Safe to call while there is no current host set
678        World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
679
680        Ok(())
681    }
682
683    /// This is a regression test to ensure it is safe to call since_epoch
684    /// if current world of host is not set.
685    #[test]
686    fn since_epoch_regression() -> Result {
687        // Safe to call outside of simution while there
688        // is no current world set
689        assert!(since_epoch().is_none());
690
691        let sim = Builder::new().build();
692        // Safe to call while there is no current host set
693        World::enter(&sim.world, || assert!(since_epoch().is_none()));
694
695        Ok(())
696    }
697
698    #[test]
699    fn hold_release_peers() -> Result {
700        let global = Duration::from_millis(2);
701
702        let mut sim = Builder::new()
703            .min_message_latency(global)
704            .max_message_latency(global)
705            .build();
706
707        sim.host("server", || async {
708            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
709
710            while let Ok((mut s, _)) = listener.accept().await {
711                assert!(s.write_u8(42).await.is_ok());
712            }
713
714            Ok(())
715        });
716
717        sim.client("client", async move {
718            let mut s = TcpStream::connect("server:1234").await?;
719
720            s.read_u8().await?;
721
722            Ok(())
723        });
724
725        sim.hold("server", "client");
726
727        // Verify that msg is not delivered.
728        sim.step()?;
729
730        sim.links(|l| {
731            assert!(l.count() == 1);
732        });
733
734        // Verify that msg is still not delivered.
735        sim.step()?;
736
737        sim.release("server", "client");
738
739        sim.run()?;
740
741        Ok(())
742    }
743
744    #[test]
745    fn partition_peers() -> Result {
746        let global = Duration::from_millis(2);
747
748        let mut sim = Builder::new()
749            .min_message_latency(global)
750            .max_message_latency(global)
751            .build();
752
753        sim.host("server", || async {
754            let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
755
756            Ok(())
757        });
758
759        sim.client("client", async move {
760            // Peers are partitioned. TCP setup should fail.
761            let _ = TcpStream::connect("server:1234").await.unwrap_err();
762
763            Ok(())
764        });
765
766        sim.partition("server", "client");
767
768        sim.run()?;
769
770        Ok(())
771    }
772
773    struct Expectation {
774        expect_a_receive: bool,
775        expect_b_receive: bool,
776    }
777
778    #[derive(Debug)]
779    enum Action {
780        Partition,
781        PartitionOnewayAB,
782        PartitionOnewayBA,
783        RepairOnewayAB,
784        RepairOnewayBA,
785        Repair,
786    }
787
788    fn run_with_partitioning(
789        host_a: &'static str,
790        host_b: &'static str,
791        mut partitioning: impl FnMut(&mut Sim) -> Expectation,
792    ) -> Result {
793        let global = Duration::from_millis(1);
794
795        let mut sim = Builder::new()
796            .min_message_latency(global)
797            .max_message_latency(global)
798            .build();
799
800        let a_did_receive = Arc::new(Mutex::new(None));
801        let b_did_receive = Arc::new(Mutex::new(None));
802
803        let make_a = |sim: &mut Sim| {
804            sim.client(host_a, {
805                let a_did_receive = Arc::clone(&a_did_receive);
806                async move {
807                    let udp_socket =
808                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
809                    udp_socket
810                        .send_to(&[42], format!("{host_b}:1234"))
811                        .await
812                        .expect("sending packet should appear to work, even if partitioned");
813
814                    *a_did_receive.lock().unwrap() = Some(matches!(
815                        tokio::time::timeout(
816                            Duration::from_secs(1),
817                            udp_socket.recv_from(&mut [0])
818                        )
819                        .await,
820                        Ok(Ok(_))
821                    ));
822
823                    Ok(())
824                }
825            })
826        };
827
828        let make_b = |sim: &mut Sim| {
829            sim.client(host_b, {
830                let b_did_receive = Arc::clone(&b_did_receive);
831                async move {
832                    let udp_socket =
833                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
834                    udp_socket
835                        .send_to(&[42], format!("{host_a}:1234"))
836                        .await
837                        .expect("sending packet should work");
838
839                    *b_did_receive.lock().unwrap() = Some(matches!(
840                        tokio::time::timeout(
841                            Duration::from_secs(1),
842                            udp_socket.recv_from(&mut [0])
843                        )
844                        .await,
845                        Ok(Ok(_))
846                    ));
847
848                    Ok(())
849                }
850            })
851        };
852
853        let construct_a_first = sim.world.borrow_mut().rng.random_bool(0.5);
854        if construct_a_first {
855            make_a(&mut sim);
856            make_b(&mut sim);
857        } else {
858            make_b(&mut sim);
859            make_a(&mut sim);
860        }
861
862        let Expectation {
863            expect_a_receive,
864            expect_b_receive,
865        } = partitioning(&mut sim);
866        sim.run()?;
867
868        assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
869        assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));
870
871        Ok(())
872    }
873
874    #[test]
875    fn partition_peers_oneway() -> Result {
876        run_with_partitioning("a", "b", |sim: &mut Sim| {
877            sim.partition_oneway("a", "b");
878            Expectation {
879                expect_a_receive: true,
880                expect_b_receive: false,
881            }
882        })
883    }
884
885    #[test]
886    fn partition_peers_oneway_many_cases() -> Result {
887        const HOST_A: &str = "a";
888        const HOST_B: &str = "b";
889
890        // Test all permutations of the above 6 actions.
891
892        fn run_with_actions(actions: &[Action]) -> Result {
893            run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
894                let mut expect_a_receive = true;
895                let mut expect_b_receive = true;
896                for action in actions {
897                    match action {
898                        Action::Partition => {
899                            sim.partition(HOST_A, HOST_B);
900                            expect_a_receive = false;
901                            expect_b_receive = false;
902                        }
903                        Action::PartitionOnewayAB => {
904                            sim.partition_oneway(HOST_A, HOST_B);
905                            expect_b_receive = false;
906                        }
907                        Action::PartitionOnewayBA => {
908                            sim.partition_oneway(HOST_B, HOST_A);
909                            expect_a_receive = false;
910                        }
911                        Action::RepairOnewayAB => {
912                            sim.repair_oneway(HOST_A, HOST_B);
913                            expect_b_receive = true;
914                        }
915                        Action::RepairOnewayBA => {
916                            sim.repair_oneway(HOST_B, HOST_A);
917                            expect_a_receive = true;
918                        }
919                        Action::Repair => {
920                            sim.repair(HOST_A, HOST_B);
921                            expect_a_receive = true;
922                            expect_b_receive = true;
923                        }
924                    }
925                }
926                Expectation {
927                    expect_a_receive,
928                    expect_b_receive,
929                }
930            })?;
931            Ok(())
932        }
933
934        run_with_actions(&[Action::PartitionOnewayAB])?;
935        run_with_actions(&[Action::PartitionOnewayBA])?;
936        run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
937        run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
938        run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
939        run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
940        run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
941        run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
942        run_with_actions(&[
943            Action::Partition,
944            Action::RepairOnewayAB,
945            Action::RepairOnewayBA,
946        ])?;
947
948        Ok(())
949    }
950
951    #[test]
952    fn elapsed_time_across_restarts() -> Result {
953        let tick_ms = 5;
954        let mut sim = Builder::new()
955            .tick_duration(Duration::from_millis(tick_ms))
956            .build();
957
958        let clock = Arc::new(AtomicU64::new(0));
959        let actual = clock.clone();
960
961        sim.host("host", move || {
962            let clock = clock.clone();
963
964            async move {
965                loop {
966                    tokio::time::sleep(Duration::from_millis(1)).await;
967                    clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
968                }
969            }
970        });
971
972        sim.step()?;
973        assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));
974
975        sim.bounce("host");
976        sim.step()?;
977        assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));
978
979        Ok(())
980    }
981
982    #[test]
983    fn elapsed_time_across_crashes() -> Result {
984        let tick_ms = 5;
985        let mut sim = Builder::new()
986            .tick_duration(Duration::from_millis(tick_ms))
987            .build();
988
989        let clock_1 = Arc::new(AtomicU64::new(0));
990        let clock_1_moved = clock_1.clone();
991
992        sim.host("host1", move || {
993            let clock = clock_1_moved.clone();
994            async move {
995                loop {
996                    tokio::time::sleep(Duration::from_millis(1)).await;
997                    clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
998                }
999            }
1000        });
1001
1002        // Crashing host 1
1003        sim.crash("host1");
1004        sim.step()?;
1005        // After bouncing host 1, host's clock must be synced.
1006        sim.bounce("host1");
1007        sim.step()?;
1008        assert_eq!(
1009            2 * tick_ms - 1,
1010            clock_1.load(Ordering::SeqCst),
1011            "Host 1 should have caught up"
1012        );
1013
1014        Ok(())
1015    }
1016
1017    #[test]
1018    fn host_finishes_with_error() {
1019        let mut sim = Builder::new().build();
1020
1021        sim.host("host", || async {
1022            Err("Host software finished unexpectedly")?
1023        });
1024
1025        assert!(sim.step().is_err());
1026    }
1027
1028    #[test]
1029    fn manual_message_delivery() -> Result {
1030        let mut sim = Builder::new().build();
1031
1032        sim.host("a", || async {
1033            let l = TcpListener::bind("0.0.0.0:1234").await?;
1034
1035            _ = l.accept().await?;
1036
1037            Ok(())
1038        });
1039
1040        sim.client("b", async {
1041            hold("a", "b");
1042
1043            _ = TcpStream::connect("a:1234").await?;
1044
1045            Ok(())
1046        });
1047
1048        assert!(!sim.step()?);
1049
1050        sim.links(|mut l| {
1051            let a_to_b = l.next().unwrap();
1052            a_to_b.deliver_all();
1053        });
1054
1055        assert!(sim.step()?);
1056
1057        Ok(())
1058    }
1059
1060    /// This is a regression test that ensures JoinError::Cancelled is not
1061    /// propagated to the test when the host crashes, which was causing
1062    /// incorrect test failure.
1063    #[test]
1064    fn run_after_host_crashes() -> Result {
1065        let mut sim = Builder::new().build();
1066
1067        sim.host("h", || async { future::pending().await });
1068
1069        sim.crash("h");
1070
1071        sim.run()
1072    }
1073
1074    #[test]
1075    fn restart_host_after_crash() -> Result {
1076        let mut sim = Builder::new().build();
1077
1078        let data = Arc::new(AtomicU64::new(0));
1079        let data_cloned = data.clone();
1080
1081        sim.host("h", move || {
1082            let data_cloned = data_cloned.clone();
1083            async move {
1084                data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1085                Ok(())
1086            }
1087        });
1088
1089        // crash and step to execute the err handling logic
1090        sim.crash("h");
1091        sim.step()?;
1092
1093        // restart and step to ensure the host software runs
1094        sim.bounce("h");
1095        sim.step()?;
1096        // check that software actually runs
1097        assert_eq!(1, data.load(Ordering::SeqCst));
1098
1099        Ok(())
1100    }
1101
1102    #[test]
1103    fn override_link_latency() -> Result {
1104        let global = Duration::from_millis(2);
1105
1106        let mut sim = Builder::new()
1107            .min_message_latency(global)
1108            .max_message_latency(global)
1109            .build();
1110
1111        sim.host("server", || async {
1112            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1113
1114            while let Ok((mut s, _)) = listener.accept().await {
1115                assert!(s.write_u8(9).await.is_ok());
1116            }
1117
1118            Ok(())
1119        });
1120
1121        sim.client("client", async move {
1122            let mut s = TcpStream::connect("server:1234").await?;
1123
1124            let start = Instant::now();
1125            s.read_u8().await?;
1126            assert_eq!(global, start.elapsed());
1127
1128            Ok(())
1129        });
1130
1131        sim.run()?;
1132
1133        let degraded = Duration::from_millis(10);
1134
1135        sim.client("client2", async move {
1136            let mut s = TcpStream::connect("server:1234").await?;
1137
1138            let start = Instant::now();
1139            s.read_u8().await?;
1140            assert_eq!(degraded, start.elapsed());
1141
1142            Ok(())
1143        });
1144
1145        sim.set_link_latency("client2", "server", degraded);
1146
1147        sim.run()
1148    }
1149
1150    #[test]
1151    fn is_host_running() -> Result {
1152        let mut sim = Builder::new().build();
1153
1154        sim.client("client", async { future::pending().await });
1155        sim.host("host", || async { future::pending().await });
1156
1157        assert!(!sim.step()?);
1158
1159        assert!(sim.is_host_running("client"));
1160        assert!(sim.is_host_running("host"));
1161
1162        sim.crash("host");
1163        assert!(!sim.is_host_running("host"));
1164
1165        Ok(())
1166    }
1167
1168    #[test]
1169    #[cfg(feature = "regex")]
1170    fn host_scan() -> Result {
1171        let mut sim = Builder::new().build();
1172
1173        let how_many = 3;
1174        for i in 0..how_many {
1175            sim.host(format!("host-{i}"), || async { future::pending().await })
1176        }
1177
1178        let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1179        ips.sort();
1180
1181        assert_eq!(how_many, ips.len());
1182
1183        for (i, ip) in ips.iter().enumerate() {
1184            assert_eq!(
1185                format!("host-{i}"),
1186                sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1187            );
1188        }
1189
1190        Ok(())
1191    }
1192
1193    #[test]
1194    #[cfg(feature = "regex")]
1195    fn bounce_multiple_hosts_with_regex() -> Result {
1196        let mut sim = Builder::new().build();
1197
1198        let count = Arc::new(AtomicU64::new(0));
1199        for i in 1..=3 {
1200            let count = count.clone();
1201            sim.host(format!("host-{}", i), move || {
1202                let count = count.clone();
1203                async move {
1204                    count.fetch_add(1, Ordering::SeqCst);
1205                    future::pending().await
1206                }
1207            });
1208        }
1209
1210        sim.step()?;
1211        assert_eq!(count.load(Ordering::SeqCst), 3);
1212        sim.bounce(regex::Regex::new("host-[12]")?);
1213        sim.step()?;
1214        assert_eq!(count.load(Ordering::SeqCst), 5);
1215
1216        Ok(())
1217    }
1218
1219    #[test]
1220    #[cfg(feature = "regex")]
1221    fn hold_all() -> Result {
1222        let mut sim = Builder::new().build();
1223
1224        sim.host("host", || async {
1225            let l = TcpListener::bind("0.0.0.0:1234").await?;
1226
1227            loop {
1228                _ = l.accept().await?;
1229            }
1230        });
1231
1232        sim.client("test", async {
1233            hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1234
1235            assert!(tokio::time::timeout(
1236                Duration::from_millis(100),
1237                TcpStream::connect("host:1234")
1238            )
1239            .await
1240            .is_err());
1241
1242            crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1243
1244            assert!(TcpStream::connect("host:1234").await.is_ok());
1245
1246            Ok(())
1247        });
1248
1249        sim.run()
1250    }
1251}