turmoil/
sim.rs

1use rand::seq::SliceRandom;
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15    for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A handle for interacting with the simulation.
///
/// Owns the simulated [`World`], one runtime per registered host and the
/// logical clock that [`Sim::step`] advances.
pub struct Sim<'a> {
    /// Simulation configuration
    config: Config,

    /// Tracks the simulated world state
    ///
    /// This is what is stored in the thread-local
    world: RefCell<World>,

    /// Per simulated host runtimes
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Simulation duration since unix epoch. Set when the simulation is
    /// created.
    since_epoch: Duration,

    /// Simulation elapsed time
    elapsed: Duration,

    /// Step counter. Starts at 1 and is incremented at the end of every
    /// [`Sim::step`], so while a step executes it holds that step's
    /// one-based index.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42    pub(crate) fn new(config: Config, world: World) -> Self {
43        let since_epoch = config
44            .epoch
45            .duration_since(UNIX_EPOCH)
46            .expect("now must be >= UNIX_EPOCH");
47
48        Self {
49            config,
50            world: RefCell::new(world),
51            rts: IndexMap::new(),
52            since_epoch,
53            elapsed: Duration::ZERO,
54            steps: 1, // bumped after each step
55        }
56    }
57
    /// How much logical time has elapsed since the simulation started.
    ///
    /// Starts at zero and grows by the configured tick on every
    /// [`Sim::step`].
    pub fn elapsed(&self) -> Duration {
        self.elapsed
    }
62
    /// The logical duration from [`UNIX_EPOCH`] until now.
    ///
    /// On creation the simulation takes the configured epoch `SystemTime`
    /// and calculates its duration since the unix epoch. Each `step()`
    /// invocation moves logical time forward by the configured tick
    /// duration, so the returned value is that initial offset plus the
    /// elapsed simulation time.
    pub fn since_epoch(&self) -> Duration {
        self.since_epoch + self.elapsed
    }
71
72    /// Register a client with the simulation.
73    pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74    where
75        F: Future<Output = Result> + 'static,
76    {
77        let addr = self.lookup(addr);
78        let nodename: Arc<str> = self
79            .world
80            .borrow_mut()
81            .dns
82            .reverse(addr)
83            .map(str::to_string)
84            .unwrap_or_else(|| addr.to_string())
85            .into();
86
87        {
88            let world = RefCell::get_mut(&mut self.world);
89
90            // Register host state with the world
91            world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
92        }
93
94        let rt = World::enter(&self.world, || {
95            Rt::client(nodename, client, self.config.enable_tokio_io)
96        });
97
98        self.rts.insert(addr, rt);
99    }
100
101    /// Register a host with the simulation.
102    ///
103    /// This method takes a `Fn` that builds a future, as opposed to
104    /// [`Sim::client`] which just takes a future. The reason for this is we
105    /// might restart the host, and so need to be able to call the future
106    /// multiple times.
107    pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
108    where
109        F: Fn() -> Fut + 'a,
110        Fut: Future<Output = Result> + 'static,
111    {
112        let addr = self.lookup(addr);
113        let nodename: Arc<str> = self
114            .world
115            .borrow_mut()
116            .dns
117            .reverse(addr)
118            .map(str::to_string)
119            .unwrap_or_else(|| addr.to_string())
120            .into();
121
122        {
123            let world = RefCell::get_mut(&mut self.world);
124
125            // Register host state with the world
126            world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
127        }
128
129        let rt = World::enter(&self.world, || {
130            Rt::host(nodename, host, self.config.enable_tokio_io)
131        });
132
133        self.rts.insert(addr, rt);
134    }
135
136    /// Crashes the resolved hosts. Nothing will be running on the matched hosts
137    /// after this method. You can use [`Sim::bounce`] to start the hosts up
138    /// again.
139    pub fn crash(&mut self, addrs: impl ToIpAddrs) {
140        self.run_with_hosts(addrs, |addr, rt| {
141            rt.crash();
142
143            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Crash");
144        });
145    }
146
147    /// Bounces the resolved hosts. The software is restarted.
148    pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
149        self.run_with_hosts(addrs, |addr, rt| {
150            rt.bounce();
151
152            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
153        });
154    }
155
156    /// Run `f` with the resolved hosts at `addrs` set on the world.
157    fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
158        let hosts = self.world.borrow_mut().lookup_many(addrs);
159        for h in hosts {
160            let rt = self.rts.get_mut(&h).expect("missing host");
161
162            self.world.borrow_mut().current = Some(h);
163
164            World::enter(&self.world, || f(h, rt));
165        }
166
167        self.world.borrow_mut().current = None;
168    }
169
170    /// Check whether a host has software running.
171    pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
172        let host = self.world.borrow_mut().lookup(addr);
173
174        self.rts
175            .get(&host)
176            .expect("missing host")
177            .is_software_running()
178    }
179
    /// Lookup an IP address by host name.
    ///
    /// Delegates to the world's `lookup`, which is reached through a mutable
    /// borrow of the world.
    pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
        self.world.borrow_mut().lookup(addr)
    }
184
185    /// Perform a reverse DNS lookup, returning the hostname if the entry
186    /// exists.
187    pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
188        self.world
189            .borrow()
190            .reverse_lookup(addr)
191            .map(|h| h.to_owned())
192    }
193
194    /// Hold messages between two hosts, or sets of hosts, until [`release`](crate::release) is
195    /// called.
196    pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
197        let mut world = self.world.borrow_mut();
198        world.hold_many(a, b);
199    }
200
201    /// Repair the connection between two hosts, or sets of hosts, resulting in
202    /// messages to be delivered.
203    pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
204        let mut world = self.world.borrow_mut();
205        world.repair_many(a, b);
206    }
207
208    /// Repair the connection between two hosts, or sets of hosts, removing
209    /// the effect of a previous [`Self::partition_oneway`].
210    ///
211    /// Combining this feature with [`Self::hold`] is presently not supported.
212    pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
213        let mut world = self.world.borrow_mut();
214        world.repair_oneway_many(from, to);
215    }
216
217    /// The opposite of [`hold`](crate::hold). All held messages are immediately delivered.
218    pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
219        let mut world = self.world.borrow_mut();
220        world.release_many(a, b);
221    }
222
223    /// Partition two hosts, or sets of hosts, resulting in all messages sent
224    /// between them to be dropped.
225    pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
226        let mut world = self.world.borrow_mut();
227        world.partition_many(a, b);
228    }
229
230    /// Partition two hosts, or sets of hosts, such that messages can not be sent
231    /// from 'from' to 'to', while not affecting the ability for them to be delivered
232    /// in the other direction.
233    ///
234    /// Partitioning first from->to, then to->from, will stop all messages, similarly to
235    /// a single call to [`Self::partition`].
236    ///
237    /// Combining this feature with [`Self::hold`] is presently not supported.
238    pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
239        let mut world = self.world.borrow_mut();
240        world.partition_oneway_many(from, to);
241    }
242
243    /// Resolve host names for an [`IpAddr`] pair.
244    ///
245    /// Useful when interacting with network [links](#method.links).
246    pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
247        let world = self.world.borrow();
248
249        (
250            world
251                .dns
252                .reverse(pair.0)
253                .expect("no hostname found for ip address")
254                .to_owned(),
255            world
256                .dns
257                .reverse(pair.1)
258                .expect("no hostname found for ip address")
259                .to_owned(),
260        )
261    }
262
    /// Lookup IP addresses for resolved hosts.
    ///
    /// Returns every address that `addr` resolves to, as produced by the
    /// world's `lookup_many`.
    pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
        self.world.borrow_mut().lookup_many(addr)
    }
267
268    /// Set the max message latency for all links.
269    pub fn set_max_message_latency(&self, value: Duration) {
270        self.world
271            .borrow_mut()
272            .topology
273            .set_max_message_latency(value);
274    }
275
276    /// Set the message latency for any links matching `a` and `b`.
277    ///
278    /// This sets the min and max to the same value eliminating any variance in
279    /// latency.
280    pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
281        let mut world = self.world.borrow_mut();
282        let a = world.lookup_many(a);
283        let b = world.lookup_many(b);
284
285        for_pairs(&a, &b, |a, b| {
286            world.topology.set_link_message_latency(a, b, value);
287        });
288    }
289
290    /// Set the max message latency for any links matching `a` and `b`.
291    pub fn set_link_max_message_latency(
292        &self,
293        a: impl ToIpAddrs,
294        b: impl ToIpAddrs,
295        value: Duration,
296    ) {
297        let mut world = self.world.borrow_mut();
298        let a = world.lookup_many(a);
299        let b = world.lookup_many(b);
300
301        for_pairs(&a, &b, |a, b| {
302            world.topology.set_link_max_message_latency(a, b, value);
303        });
304    }
305
306    /// Set the message latency distribution curve for all links.
307    ///
308    /// Message latency follows an exponential distribution curve. The `value`
309    /// is the lambda argument to the probability function.
310    pub fn set_message_latency_curve(&self, value: f64) {
311        self.world
312            .borrow_mut()
313            .topology
314            .set_message_latency_curve(value);
315    }
316
317    pub fn set_fail_rate(&mut self, value: f64) {
318        self.world.borrow_mut().topology.set_fail_rate(value);
319    }
320
321    pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
322        let mut world = self.world.borrow_mut();
323        let a = world.lookup_many(a);
324        let b = world.lookup_many(b);
325
326        for_pairs(&a, &b, |a, b| {
327            world.topology.set_link_fail_rate(a, b, value);
328        });
329    }
330
331    /// Access a [`LinksIter`] to introspect inflight messages between hosts.
332    pub fn links(&self, f: impl FnOnce(LinksIter)) {
333        let top = &mut self.world.borrow_mut().topology;
334
335        f(top.iter_mut())
336    }
337
338    /// Run the simulation until all client hosts have completed.
339    ///
340    /// Executes a simple event loop that calls [step](#method.step) each
341    /// iteration, returning early if any host software errors.
342    pub fn run(&mut self) -> Result {
343        // check if we have any clients
344        if !self
345            .rts
346            .iter()
347            .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
348        {
349            tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
350            return Ok(());
351        }
352
353        loop {
354            let is_finished = self.step()?;
355
356            if is_finished {
357                return Ok(());
358            }
359        }
360    }
361
    /// Step the simulation.
    ///
    /// Runs each host in the simulation a fixed duration configured by
    /// `tick_duration` in the builder.
    ///
    /// The simulated network also steps, processing in flight messages, and
    /// delivering them to their destination if appropriate.
    ///
    /// Returns whether or not all clients have completed. Only clients whose
    /// software is running at the start of the step are taken into account;
    /// with no such client the result is `true`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by a host runtime's tick, or an error when
    /// the elapsed time exceeds the configured simulation duration while
    /// clients are still unfinished.
    ///
    /// # Panics
    ///
    /// Panics if a running runtime has no matching host in the world.
    pub fn step(&mut self) -> Result<bool> {
        tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);

        let tick = self.config.tick;
        let mut is_finished = true;

        // Tick the networking, processing messages. This is done before
        // ticking any other runtime, as they might be waiting on network
        // IO. (It also might be waiting on something else, such as time.)
        self.world.borrow_mut().topology.tick_by(tick);

        // Tick each host runtimes with running software. If the software
        // completes, extract the result and return early if an error is
        // encountered.

        // Split the runtimes by whether their software is currently running;
        // the running ones are optionally shuffled with the world's rng.
        let (mut running, stopped): (Vec<_>, Vec<_>) = self
            .rts
            .iter_mut()
            .partition(|(_, rt)| rt.is_software_running());
        if self.config.random_node_order {
            running.shuffle(&mut self.world.borrow_mut().rng);
        }

        for (&addr, rt) in running {
            let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
            {
                let mut world = self.world.borrow_mut();
                // We need to move deliverable messages off the network and
                // into the dst host. This requires two mutable borrows.
                let World {
                    rng,
                    topology,
                    hosts,
                    ..
                } = world.deref_mut();
                topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));

                // Set the current host (see method docs)
                world.current = Some(addr);

                world.current_host_mut().timer.now(rt.now());
            }

            // The world borrow above is released before entering the world
            // to tick the host runtime.
            // NOTE(review): on error the `?` returns before `current` is
            // unset below, leaving the failed host as the current host --
            // confirm this is acceptable.
            let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;

            // Only clients decide whether the simulation is finished.
            if rt.is_client() {
                is_finished = is_finished && is_software_finished;
            }

            // Unset the current host
            let mut world = self.world.borrow_mut();
            world.current = None;

            world.tick(addr, tick);
        }

        // Tick the nodes that are not actively running (i.e., crashed) to ensure their clock keeps up
        // with the rest of the simulation when they are restarted (bounced).
        for (&addr, _rt) in stopped {
            let mut world = self.world.borrow_mut();
            world.tick(addr, tick);
        }

        self.elapsed += tick;
        self.steps += 1;

        // NOTE(review): `steps` starts at 1 and has just been bumped, so the
        // count reported below is one more than the number of steps
        // executed -- confirm whether that is intended.
        if self.elapsed > self.config.duration && !is_finished {
            return Err(format!(
                "Ran for duration: {:?} steps: {} without completing",
                self.config.duration, self.steps,
            ))?;
        }

        Ok(is_finished)
    }
446}
447
448#[cfg(test)]
449mod test {
450    use rand::Rng;
451    use std::future;
452    use std::{
453        net::{IpAddr, Ipv4Addr},
454        rc::Rc,
455        sync::{
456            atomic::{AtomicU64, Ordering},
457            Arc, Mutex,
458        },
459        time::Duration,
460    };
461
462    use tokio::{
463        io::{AsyncReadExt, AsyncWriteExt},
464        sync::Semaphore,
465        time::Instant,
466    };
467
468    use crate::net::UdpSocket;
469    use crate::{
470        elapsed, hold,
471        net::{TcpListener, TcpStream},
472        sim_elapsed, Builder, Result, Sim, World,
473    };
474
475    #[test]
476    fn client_error() {
477        let mut sim = Builder::new().build();
478
479        sim.client("doomed", async { Err("An Error")? });
480
481        assert!(sim.run().is_err());
482    }
483
484    #[test]
485    fn timeout() {
486        let mut sim = Builder::new()
487            .simulation_duration(Duration::from_millis(500))
488            .build();
489
490        sim.client("timeout", async {
491            tokio::time::sleep(Duration::from_secs(1)).await;
492
493            Ok(())
494        });
495
496        assert!(sim.run().is_err());
497    }
498
499    #[test]
500    fn multiple_clients_all_finish() -> Result {
501        let how_many = 3;
502        let tick_ms = 10;
503
504        // N = how_many runs, each with a different client finishing immediately
505        for run in 0..how_many {
506            let mut sim = Builder::new()
507                .tick_duration(Duration::from_millis(tick_ms))
508                .build();
509
510            let ct = Rc::new(Semaphore::new(how_many));
511
512            for client in 0..how_many {
513                let ct = ct.clone();
514
515                sim.client(format!("client-{client}"), async move {
516                    let ms = if run == client { 0 } else { 2 * tick_ms };
517                    tokio::time::sleep(Duration::from_millis(ms)).await;
518
519                    let p = ct.acquire().await?;
520                    p.forget();
521
522                    Ok(())
523                });
524            }
525
526            sim.run()?;
527            assert_eq!(0, ct.available_permits());
528        }
529
530        Ok(())
531    }
532
533    /// This is a regression test that ensures host software completes when the
534    /// host crashes. Before this fix we simply dropped the LocalSet, which did
535    /// not ensure resources owned by spawned tasks were dropped. Now we drop
536    /// and replace both the tokio Runtime and the LocalSet.
537    #[test]
538    fn crash_blocks_until_complete() -> Result {
539        let ct = Arc::new(());
540
541        let mut sim = Builder::new().build();
542
543        sim.host("host", || {
544            let ct = ct.clone();
545
546            async move {
547                tokio::spawn(async move {
548                    let _into_task = ct;
549                    future::pending::<()>().await;
550                });
551
552                future::pending().await
553            }
554        });
555
556        sim.run()?;
557        assert_eq!(2, Arc::strong_count(&ct));
558
559        sim.crash("host");
560        assert_eq!(1, Arc::strong_count(&ct));
561
562        Ok(())
563    }
564
    /// Checks host-local `elapsed()` against simulation-wide `sim_elapsed()`
    /// and `Sim::elapsed`, both for clients registered before the first run
    /// and for a client registered after time has already advanced.
    #[test]
    fn elapsed_time() -> Result {
        let tick = Duration::from_millis(5);
        let mut sim = Builder::new().tick_duration(tick).build();

        let duration = Duration::from_millis(500);

        sim.client("c1", async move {
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            // For hosts/clients started before the first `sim.run()` call, the
            // `elapsed` and `sim_elapsed` time will be identical.
            assert_eq!(duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.client("c2", async move {
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            assert_eq!(duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.run()?;

        // sleep duration plus one tick to complete
        assert_eq!(duration + tick, sim.elapsed());

        // Remember how far the simulation got before adding a late client.
        let start = sim.elapsed();
        sim.client("c3", async move {
            assert_eq!(Duration::ZERO, elapsed());
            // Note that sim_elapsed is total simulation time while elapsed is
            // still zero for this newly created host.
            assert_eq!(duration + tick, sim_elapsed().unwrap());
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            assert_eq!(duration + tick + duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.run()?;

        // Client "c3" takes one sleep duration plus one tick to complete
        assert_eq!(duration + tick, sim.elapsed() - start);

        Ok(())
    }
615
    /// This is a regression test to ensure it is safe to call sim_elapsed
    /// if current world of host is not set.
    #[test]
    fn sim_elapsed_time() -> Result {
        // Safe to call outside of the simulation while there
        // is no current world set
        assert!(sim_elapsed().is_none());

        let sim = Builder::new().build();
        // Safe to call while there is no current host set
        World::enter(&sim.world, || assert!(sim_elapsed().is_none()));

        Ok(())
    }
630
    /// Messages between held peers stay on the link until the hold is
    /// released, after which the client is able to complete.
    #[test]
    fn hold_release_peers() -> Result {
        // Min and max latency are identical, so latency is fixed.
        let global = Duration::from_millis(2);

        let mut sim = Builder::new()
            .min_message_latency(global)
            .max_message_latency(global)
            .build();

        // Server: writes a single byte to every accepted connection.
        sim.host("server", || async {
            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;

            while let Ok((mut s, _)) = listener.accept().await {
                assert!(s.write_u8(42).await.is_ok());
            }

            Ok(())
        });

        // Client: connects and waits for that byte.
        sim.client("client", async move {
            let mut s = TcpStream::connect("server:1234").await?;

            s.read_u8().await?;

            Ok(())
        });

        sim.hold("server", "client");

        // Verify that msg is not delivered.
        sim.step()?;

        sim.links(|l| {
            assert!(l.count() == 1);
        });

        // Verify that msg is still not delivered.
        sim.step()?;

        sim.release("server", "client");

        // With the hold released the simulation can run to completion.
        sim.run()?;

        Ok(())
    }
676
677    #[test]
678    fn partition_peers() -> Result {
679        let global = Duration::from_millis(2);
680
681        let mut sim = Builder::new()
682            .min_message_latency(global)
683            .max_message_latency(global)
684            .build();
685
686        sim.host("server", || async {
687            let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
688
689            Ok(())
690        });
691
692        sim.client("client", async move {
693            // Peers are partitioned. TCP setup should fail.
694            let _ = TcpStream::connect("server:1234").await.unwrap_err();
695
696            Ok(())
697        });
698
699        sim.partition("server", "client");
700
701        sim.run()?;
702
703        Ok(())
704    }
705
    /// Expected outcome of a `run_with_partitioning` scenario: whether each
    /// side should have received the datagram sent by its peer.
    struct Expectation {
        /// Whether host A is expected to receive a datagram.
        expect_a_receive: bool,
        /// Whether host B is expected to receive a datagram.
        expect_b_receive: bool,
    }
710
    /// Network manipulations applied, in sequence, by
    /// `partition_peers_oneway_many_cases` before the simulation runs.
    #[derive(Debug)]
    enum Action {
        /// Partition A and B in both directions.
        Partition,
        /// Partition the A -> B direction only.
        PartitionOnewayAB,
        /// Partition the B -> A direction only.
        PartitionOnewayBA,
        /// Repair the A -> B direction only.
        RepairOnewayAB,
        /// Repair the B -> A direction only.
        RepairOnewayBA,
        /// Repair both directions.
        Repair,
    }
720
    /// Shared driver for the partitioning tests.
    ///
    /// Registers two UDP clients, `host_a` and `host_b`. Each binds port
    /// 1234, sends a one-byte datagram to its peer and then waits up to one
    /// second for a datagram of its own, recording whether one arrived.
    ///
    /// `partitioning` is called after both clients are registered and before
    /// the simulation runs. It manipulates the network and returns which
    /// side is expected to receive. After the run, the recorded outcomes are
    /// asserted against that expectation.
    fn run_with_partitioning(
        host_a: &'static str,
        host_b: &'static str,
        mut partitioning: impl FnMut(&mut Sim) -> Expectation,
    ) -> Result {
        let global = Duration::from_millis(1);

        let mut sim = Builder::new()
            .min_message_latency(global)
            .max_message_latency(global)
            .build();

        // `None` until the respective client has finished its receive attempt.
        let a_did_receive = Arc::new(Mutex::new(None));
        let b_did_receive = Arc::new(Mutex::new(None));

        let make_a = |sim: &mut Sim| {
            sim.client(host_a, {
                let a_did_receive = Arc::clone(&a_did_receive);
                async move {
                    let udp_socket =
                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
                    udp_socket
                        .send_to(&[42], format!("{}:1234", host_b))
                        .await
                        .expect("sending packet should appear to work, even if partitioned");

                    // `true` only if a datagram arrived before the timeout.
                    *a_did_receive.lock().unwrap() = Some(matches!(
                        tokio::time::timeout(
                            Duration::from_secs(1),
                            udp_socket.recv_from(&mut [0])
                        )
                        .await,
                        Ok(Ok(_))
                    ));

                    Ok(())
                }
            })
        };

        let make_b = |sim: &mut Sim| {
            sim.client(host_b, {
                let b_did_receive = Arc::clone(&b_did_receive);
                async move {
                    let udp_socket =
                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
                    udp_socket
                        .send_to(&[42], format!("{}:1234", host_a))
                        .await
                        .expect("sending packet should work");

                    // `true` only if a datagram arrived before the timeout.
                    *b_did_receive.lock().unwrap() = Some(matches!(
                        tokio::time::timeout(
                            Duration::from_secs(1),
                            udp_socket.recv_from(&mut [0])
                        )
                        .await,
                        Ok(Ok(_))
                    ));

                    Ok(())
                }
            })
        };

        // Pick the registration order with the world's rng so both orders
        // get exercised.
        let construct_a_first = sim.world.borrow_mut().rng.gen_bool(0.5);
        if construct_a_first {
            make_a(&mut sim);
            make_b(&mut sim);
        } else {
            make_b(&mut sim);
            make_a(&mut sim);
        }

        let Expectation {
            expect_a_receive,
            expect_b_receive,
        } = partitioning(&mut sim);
        sim.run()?;

        assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
        assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));

        Ok(())
    }
806
807    #[test]
808    fn partition_peers_oneway() -> Result {
809        run_with_partitioning("a", "b", |sim: &mut Sim| {
810            sim.partition_oneway("a", "b");
811            Expectation {
812                expect_a_receive: true,
813                expect_b_receive: false,
814            }
815        })
816    }
817
    /// Applies several sequences of partition/repair actions and checks that
    /// the final reachability in each direction matches the expectation
    /// tracked alongside the actions.
    #[test]
    fn partition_peers_oneway_many_cases() -> Result {
        const HOST_A: &str = "a";
        const HOST_B: &str = "b";

        // Exercise a selection of sequences built from the six `Action`
        // variants (not every permutation).

        // Applies `actions` in order while tracking which side should still
        // be able to receive afterwards.
        fn run_with_actions(actions: &[Action]) -> Result {
            run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
                // With no action applied both sides receive.
                let mut expect_a_receive = true;
                let mut expect_b_receive = true;
                for action in actions {
                    match action {
                        Action::Partition => {
                            sim.partition(HOST_A, HOST_B);
                            expect_a_receive = false;
                            expect_b_receive = false;
                        }
                        Action::PartitionOnewayAB => {
                            // A -> B is cut, so B no longer receives.
                            sim.partition_oneway(HOST_A, HOST_B);
                            expect_b_receive = false;
                        }
                        Action::PartitionOnewayBA => {
                            // B -> A is cut, so A no longer receives.
                            sim.partition_oneway(HOST_B, HOST_A);
                            expect_a_receive = false;
                        }
                        Action::RepairOnewayAB => {
                            sim.repair_oneway(HOST_A, HOST_B);
                            expect_b_receive = true;
                        }
                        Action::RepairOnewayBA => {
                            sim.repair_oneway(HOST_B, HOST_A);
                            expect_a_receive = true;
                        }
                        Action::Repair => {
                            sim.repair(HOST_A, HOST_B);
                            expect_a_receive = true;
                            expect_b_receive = true;
                        }
                    }
                }
                Expectation {
                    expect_a_receive,
                    expect_b_receive,
                }
            })?;
            Ok(())
        }

        run_with_actions(&[Action::PartitionOnewayAB])?;
        run_with_actions(&[Action::PartitionOnewayBA])?;
        run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
        run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
        run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
        run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
        run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
        run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
        run_with_actions(&[
            Action::Partition,
            Action::RepairOnewayAB,
            Action::RepairOnewayBA,
        ])?;

        Ok(())
    }
883
    /// The host-local `elapsed()` clock must keep counting across a bounce
    /// rather than start over.
    #[test]
    fn elapsed_time_across_restarts() -> Result {
        let tick_ms = 5;
        let mut sim = Builder::new()
            .tick_duration(Duration::from_millis(tick_ms))
            .build();

        // `clock` is written by the host software, `actual` is read here.
        let clock = Arc::new(AtomicU64::new(0));
        let actual = clock.clone();

        sim.host("host", move || {
            let clock = clock.clone();

            async move {
                // Publish the host's elapsed time after every 1ms sleep.
                loop {
                    tokio::time::sleep(Duration::from_millis(1)).await;
                    clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
                }
            }
        });

        sim.step()?;
        assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));

        // Restart the software; the second step must continue from the
        // first tick instead of from zero.
        sim.bounce("host");
        sim.step()?;
        assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));

        Ok(())
    }
914
    /// A host that was crashed during a step must, once bounced, observe a
    /// `sim_elapsed()` that includes the time it spent crashed.
    #[test]
    fn elapsed_time_across_crashes() -> Result {
        let tick_ms = 5;
        let mut sim = Builder::new()
            .tick_duration(Duration::from_millis(tick_ms))
            .build();

        // `clock_1_moved` is written by the host software, `clock_1` is read
        // by the assertion below.
        let clock_1 = Arc::new(AtomicU64::new(0));
        let clock_1_moved = clock_1.clone();

        sim.host("host1", move || {
            let clock = clock_1_moved.clone();
            async move {
                // Publish the simulation-wide elapsed time after every 1ms sleep.
                loop {
                    tokio::time::sleep(Duration::from_millis(1)).await;
                    clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
                }
            }
        });

        // Crashing host 1
        sim.crash("host1");
        sim.step()?;
        // After bouncing host 1, host's clock must be synced.
        sim.bounce("host1");
        sim.step()?;
        assert_eq!(
            2 * tick_ms - 1,
            clock_1.load(Ordering::SeqCst),
            "Host 1 should have caught up"
        );

        Ok(())
    }
949
950    #[test]
951    fn host_finishes_with_error() {
952        let mut sim = Builder::new().build();
953
954        sim.host("host", || async {
955            Err("Host software finished unexpectedly")?
956        });
957
958        assert!(sim.step().is_err());
959    }
960
961    #[test]
962    fn manual_message_delivery() -> Result {
963        let mut sim = Builder::new().build();
964
965        sim.host("a", || async {
966            let l = TcpListener::bind("0.0.0.0:1234").await?;
967
968            _ = l.accept().await?;
969
970            Ok(())
971        });
972
973        sim.client("b", async {
974            hold("a", "b");
975
976            _ = TcpStream::connect("a:1234").await?;
977
978            Ok(())
979        });
980
981        assert!(!sim.step()?);
982
983        sim.links(|mut l| {
984            let a_to_b = l.next().unwrap();
985            a_to_b.deliver_all();
986        });
987
988        assert!(sim.step()?);
989
990        Ok(())
991    }
992
993    /// This is a regression test that ensures JoinError::Cancelled is not
994    /// propagated to the test when the host crashes, which was causing
995    /// incorrect test failure.
996    #[test]
997    fn run_after_host_crashes() -> Result {
998        let mut sim = Builder::new().build();
999
1000        sim.host("h", || async { future::pending().await });
1001
1002        sim.crash("h");
1003
1004        sim.run()
1005    }
1006
1007    #[test]
1008    fn restart_host_after_crash() -> Result {
1009        let mut sim = Builder::new().build();
1010
1011        let data = Arc::new(AtomicU64::new(0));
1012        let data_cloned = data.clone();
1013
1014        sim.host("h", move || {
1015            let data_cloned = data_cloned.clone();
1016            async move {
1017                data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1018                Ok(())
1019            }
1020        });
1021
1022        // crash and step to execute the err handling logic
1023        sim.crash("h");
1024        sim.step()?;
1025
1026        // restart and step to ensure the host software runs
1027        sim.bounce("h");
1028        sim.step()?;
1029        // check that software actually runs
1030        assert_eq!(1, data.load(Ordering::SeqCst));
1031
1032        Ok(())
1033    }
1034
1035    #[test]
1036    fn override_link_latency() -> Result {
1037        let global = Duration::from_millis(2);
1038
1039        let mut sim = Builder::new()
1040            .min_message_latency(global)
1041            .max_message_latency(global)
1042            .build();
1043
1044        sim.host("server", || async {
1045            let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1046
1047            while let Ok((mut s, _)) = listener.accept().await {
1048                assert!(s.write_u8(9).await.is_ok());
1049            }
1050
1051            Ok(())
1052        });
1053
1054        sim.client("client", async move {
1055            let mut s = TcpStream::connect("server:1234").await?;
1056
1057            let start = Instant::now();
1058            s.read_u8().await?;
1059            assert_eq!(global, start.elapsed());
1060
1061            Ok(())
1062        });
1063
1064        sim.run()?;
1065
1066        let degraded = Duration::from_millis(10);
1067
1068        sim.client("client2", async move {
1069            let mut s = TcpStream::connect("server:1234").await?;
1070
1071            let start = Instant::now();
1072            s.read_u8().await?;
1073            assert_eq!(degraded, start.elapsed());
1074
1075            Ok(())
1076        });
1077
1078        sim.set_link_latency("client2", "server", degraded);
1079
1080        sim.run()
1081    }
1082
1083    #[test]
1084    fn is_host_running() -> Result {
1085        let mut sim = Builder::new().build();
1086
1087        sim.client("client", async { future::pending().await });
1088        sim.host("host", || async { future::pending().await });
1089
1090        assert!(!sim.step()?);
1091
1092        assert!(sim.is_host_running("client"));
1093        assert!(sim.is_host_running("host"));
1094
1095        sim.crash("host");
1096        assert!(!sim.is_host_running("host"));
1097
1098        Ok(())
1099    }
1100
1101    #[test]
1102    #[cfg(feature = "regex")]
1103    fn host_scan() -> Result {
1104        let mut sim = Builder::new().build();
1105
1106        let how_many = 3;
1107        for i in 0..how_many {
1108            sim.host(format!("host-{i}"), || async { future::pending().await })
1109        }
1110
1111        let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1112        ips.sort();
1113
1114        assert_eq!(how_many, ips.len());
1115
1116        for (i, ip) in ips.iter().enumerate() {
1117            assert_eq!(
1118                format!("host-{i}"),
1119                sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1120            );
1121        }
1122
1123        Ok(())
1124    }
1125
1126    #[test]
1127    #[cfg(feature = "regex")]
1128    fn bounce_multiple_hosts_with_regex() -> Result {
1129        let mut sim = Builder::new().build();
1130
1131        let count = Arc::new(AtomicU64::new(0));
1132        for i in 1..=3 {
1133            let count = count.clone();
1134            sim.host(format!("host-{}", i), move || {
1135                let count = count.clone();
1136                async move {
1137                    count.fetch_add(1, Ordering::SeqCst);
1138                    future::pending().await
1139                }
1140            });
1141        }
1142
1143        sim.step()?;
1144        assert_eq!(count.load(Ordering::SeqCst), 3);
1145        sim.bounce(regex::Regex::new("host-[12]")?);
1146        sim.step()?;
1147        assert_eq!(count.load(Ordering::SeqCst), 5);
1148
1149        Ok(())
1150    }
1151
1152    #[test]
1153    #[cfg(feature = "regex")]
1154    fn hold_all() -> Result {
1155        let mut sim = Builder::new().build();
1156
1157        sim.host("host", || async {
1158            let l = TcpListener::bind("0.0.0.0:1234").await?;
1159
1160            loop {
1161                _ = l.accept().await?;
1162            }
1163        });
1164
1165        sim.client("test", async {
1166            hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1167
1168            assert!(tokio::time::timeout(
1169                Duration::from_millis(100),
1170                TcpStream::connect("host:1234")
1171            )
1172            .await
1173            .is_err());
1174
1175            crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1176
1177            assert!(TcpStream::connect("host:1234").await.is_ok());
1178
1179            Ok(())
1180        });
1181
1182        sim.run()
1183    }
1184}