1use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15 for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A network simulation: the shared [`World`] state plus one runtime per
/// registered client or host.
pub struct Sim<'a> {
    /// Simulation configuration supplied at construction time.
    config: Config,

    /// Shared simulation state. Kept in a `RefCell` so it can be borrowed
    /// (mutably) through `&self` methods and handed to `World::enter`.
    world: RefCell<World>,

    /// Per-node runtimes, keyed by the node's IP address.
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Offset of the configured epoch from `UNIX_EPOCH`, computed once in
    /// `new`.
    since_epoch: Duration,

    /// Logical time elapsed so far; grows by one tick per `step`.
    elapsed: Duration,

    /// Step counter. Starts at 1 and is incremented at the end of each
    /// `step`.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42 pub(crate) fn new(config: Config, world: World) -> Self {
43 let since_epoch = config
44 .epoch
45 .duration_since(UNIX_EPOCH)
46 .expect("now must be >= UNIX_EPOCH");
47
48 Self {
49 config,
50 world: RefCell::new(world),
51 rts: IndexMap::new(),
52 since_epoch,
53 elapsed: Duration::ZERO,
54 steps: 1, }
56 }
57
    /// Returns the logical time that has elapsed in the simulation, i.e. the
    /// sum of the ticks applied by `step` so far.
    pub fn elapsed(&self) -> Duration {
        self.elapsed
    }
62
63 pub fn since_epoch(&self) -> Duration {
69 self.since_epoch + self.elapsed
70 }
71
72 pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74 where
75 F: Future<Output = Result> + 'static,
76 {
77 let addr = self.lookup(addr);
78 let nodename: Arc<str> = self
79 .world
80 .borrow_mut()
81 .dns
82 .reverse(addr)
83 .map(str::to_string)
84 .unwrap_or_else(|| addr.to_string())
85 .into();
86
87 {
88 let world = RefCell::get_mut(&mut self.world);
89
90 world.register(
92 addr,
93 &nodename,
94 HostTimer::new(self.elapsed, self.since_epoch),
95 &self.config,
96 );
97 }
98
99 let seed = self.world.borrow_mut().rng.random();
100 let rng = SmallRng::from_seed(seed);
101 let config = rt::Config {
102 enable_io: self.config.enable_tokio_io,
103 rng: Some(rng),
104 };
105
106 let rt = World::enter(&self.world, || Rt::client(nodename, client, config));
107
108 self.rts.insert(addr, rt);
109 }
110
111 pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
118 where
119 F: Fn() -> Fut + 'a,
120 Fut: Future<Output = Result> + 'static,
121 {
122 let addr = self.lookup(addr);
123 let nodename: Arc<str> = self
124 .world
125 .borrow_mut()
126 .dns
127 .reverse(addr)
128 .map(str::to_string)
129 .unwrap_or_else(|| addr.to_string())
130 .into();
131
132 {
133 let world = RefCell::get_mut(&mut self.world);
134
135 world.register(
137 addr,
138 &nodename,
139 HostTimer::new(self.elapsed, self.since_epoch),
140 &self.config,
141 );
142 }
143
144 let seed = self.world.borrow_mut().rng.random();
145 let rng = SmallRng::from_seed(seed);
146 let config = rt::Config {
147 enable_io: self.config.enable_tokio_io,
148 rng: Some(rng),
149 };
150
151 let rt = World::enter(&self.world, || Rt::host(nodename, host, config));
152
153 self.rts.insert(addr, rt);
154 }
155
156 pub fn crash(&mut self, addrs: impl ToIpAddrs) {
164 let hosts = self.world.borrow_mut().lookup_many(addrs);
165 for h in hosts {
166 let rt = self.rts.get_mut(&h).expect("missing host");
167
168 self.world.borrow_mut().current = Some(h);
169
170 World::enter(&self.world, || {
171 rt.crash();
172
173 #[cfg(feature = "unstable-fs")]
175 World::current(|world| {
176 let addr = world.current.expect("current host missing");
177 let World { hosts, rng, .. } = world;
178 let host = hosts.get_mut(&addr).unwrap();
179 host.fs.lock().unwrap().discard_pending(&mut **rng);
180 });
181
182 tracing::trace!(target: TRACING_TARGET, addr = ?h, "Crash");
183 });
184 }
185
186 self.world.borrow_mut().current = None;
187 }
188
189 pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
191 self.run_with_hosts(addrs, |addr, rt| {
192 rt.bounce();
193
194 tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
195 });
196 }
197
198 fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
200 let hosts = self.world.borrow_mut().lookup_many(addrs);
201 for h in hosts {
202 let rt = self.rts.get_mut(&h).expect("missing host");
203
204 self.world.borrow_mut().current = Some(h);
205
206 World::enter(&self.world, || f(h, rt));
207 }
208
209 self.world.borrow_mut().current = None;
210 }
211
212 pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
214 let host = self.world.borrow_mut().lookup(addr);
215
216 self.rts
217 .get(&host)
218 .expect("missing host")
219 .is_software_running()
220 }
221
222 pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
224 self.world.borrow_mut().lookup(addr)
225 }
226
227 pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
230 self.world
231 .borrow()
232 .reverse_lookup(addr)
233 .map(|h| h.to_owned())
234 }
235
236 pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
239 let mut world = self.world.borrow_mut();
240 world.hold_many(a, b);
241 }
242
243 pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
246 let mut world = self.world.borrow_mut();
247 world.repair_many(a, b);
248 }
249
250 pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
255 let mut world = self.world.borrow_mut();
256 world.repair_oneway_many(from, to);
257 }
258
259 pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
261 let mut world = self.world.borrow_mut();
262 world.release_many(a, b);
263 }
264
265 pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
268 let mut world = self.world.borrow_mut();
269 world.partition_many(a, b);
270 }
271
272 pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
281 let mut world = self.world.borrow_mut();
282 world.partition_oneway_many(from, to);
283 }
284
285 pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
289 let world = self.world.borrow();
290
291 (
292 world
293 .dns
294 .reverse(pair.0)
295 .expect("no hostname found for ip address")
296 .to_owned(),
297 world
298 .dns
299 .reverse(pair.1)
300 .expect("no hostname found for ip address")
301 .to_owned(),
302 )
303 }
304
305 pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
307 self.world.borrow_mut().lookup_many(addr)
308 }
309
310 pub fn set_max_message_latency(&self, value: Duration) {
312 self.world
313 .borrow_mut()
314 .topology
315 .set_max_message_latency(value);
316 }
317
318 pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
323 let mut world = self.world.borrow_mut();
324 let a = world.lookup_many(a);
325 let b = world.lookup_many(b);
326
327 for_pairs(&a, &b, |a, b| {
328 world.topology.set_link_message_latency(a, b, value);
329 });
330 }
331
332 pub fn set_link_max_message_latency(
334 &self,
335 a: impl ToIpAddrs,
336 b: impl ToIpAddrs,
337 value: Duration,
338 ) {
339 let mut world = self.world.borrow_mut();
340 let a = world.lookup_many(a);
341 let b = world.lookup_many(b);
342
343 for_pairs(&a, &b, |a, b| {
344 world.topology.set_link_max_message_latency(a, b, value);
345 });
346 }
347
348 pub fn set_message_latency_curve(&self, value: f64) {
353 self.world
354 .borrow_mut()
355 .topology
356 .set_message_latency_curve(value);
357 }
358
359 pub fn set_fail_rate(&mut self, value: f64) {
360 self.world.borrow_mut().topology.set_fail_rate(value);
361 }
362
363 pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
364 let mut world = self.world.borrow_mut();
365 let a = world.lookup_many(a);
366 let b = world.lookup_many(b);
367
368 for_pairs(&a, &b, |a, b| {
369 world.topology.set_link_fail_rate(a, b, value);
370 });
371 }
372
373 pub fn links(&self, f: impl FnOnce(LinksIter)) {
375 let top = &mut self.world.borrow_mut().topology;
376
377 f(top.iter_mut())
378 }
379
380 pub fn run(&mut self) -> Result {
385 if !self
387 .rts
388 .iter()
389 .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
390 {
391 tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
392 return Ok(());
393 }
394
395 loop {
396 let is_finished = self.step()?;
397
398 if is_finished {
399 return Ok(());
400 }
401 }
402 }
403
404 pub fn step(&mut self) -> Result<bool> {
414 tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);
415
416 let tick = self.config.tick;
417 let mut is_finished = true;
418
419 self.world.borrow_mut().topology.tick_by(tick);
423
424 let (mut running, stopped): (Vec<_>, Vec<_>) = self
429 .rts
430 .iter_mut()
431 .partition(|(_, rt)| rt.is_software_running());
432 if self.config.random_node_order {
433 running.shuffle(&mut self.world.borrow_mut().rng);
434 }
435
436 for (&addr, rt) in running {
437 let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
438 {
439 let mut world = self.world.borrow_mut();
440 let World {
443 rng,
444 topology,
445 hosts,
446 ..
447 } = world.deref_mut();
448 topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));
449
450 world.current = Some(addr);
452
453 world.current_host_mut().timer.now(rt.now());
454 }
455
456 let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;
457
458 if rt.is_client() {
459 is_finished = is_finished && is_software_finished;
460 }
461
462 let mut world = self.world.borrow_mut();
464 world.current = None;
465
466 world.tick(addr, tick);
467 }
468
469 for (&addr, _rt) in stopped {
472 let mut world = self.world.borrow_mut();
473 world.tick(addr, tick);
474 }
475
476 self.elapsed += tick;
477 self.steps += 1;
478
479 if self.elapsed > self.config.duration && !is_finished {
480 return Err(format!(
481 "Ran for duration: {:?} steps: {} without completing",
482 self.config.duration, self.steps,
483 ))?;
484 }
485
486 Ok(is_finished)
487 }
488}
489
490#[cfg(test)]
491mod test {
492 use rand::Rng;
493 use std::future;
494 use std::{
495 net::{IpAddr, Ipv4Addr},
496 rc::Rc,
497 sync::{
498 atomic::{AtomicU64, Ordering},
499 Arc, Mutex,
500 },
501 time::Duration,
502 };
503
504 use tokio::{
505 io::{AsyncReadExt, AsyncWriteExt},
506 sync::Semaphore,
507 time::Instant,
508 };
509
510 use crate::net::UdpSocket;
511 use crate::{
512 elapsed, hold,
513 net::{TcpListener, TcpStream},
514 sim_elapsed, since_epoch, Builder, Result, Sim, World,
515 };
516
517 #[test]
518 fn client_error() {
519 let mut sim = Builder::new().build();
520
521 sim.client("doomed", async { Err("An Error")? });
522
523 assert!(sim.run().is_err());
524 }
525
526 #[test]
527 fn timeout() {
528 let mut sim = Builder::new()
529 .simulation_duration(Duration::from_millis(500))
530 .build();
531
532 sim.client("timeout", async {
533 tokio::time::sleep(Duration::from_secs(1)).await;
534
535 Ok(())
536 });
537
538 assert!(sim.run().is_err());
539 }
540
541 #[test]
542 fn multiple_clients_all_finish() -> Result {
543 let how_many = 3;
544 let tick_ms = 10;
545
546 for run in 0..how_many {
548 let mut sim = Builder::new()
549 .tick_duration(Duration::from_millis(tick_ms))
550 .build();
551
552 let ct = Rc::new(Semaphore::new(how_many));
553
554 for client in 0..how_many {
555 let ct = ct.clone();
556
557 sim.client(format!("client-{client}"), async move {
558 let ms = if run == client { 0 } else { 2 * tick_ms };
559 tokio::time::sleep(Duration::from_millis(ms)).await;
560
561 let p = ct.acquire().await?;
562 p.forget();
563
564 Ok(())
565 });
566 }
567
568 sim.run()?;
569 assert_eq!(0, ct.available_permits());
570 }
571
572 Ok(())
573 }
574
575 #[test]
580 fn crash_blocks_until_complete() -> Result {
581 let ct = Arc::new(());
582
583 let mut sim = Builder::new().build();
584
585 sim.host("host", || {
586 let ct = ct.clone();
587
588 async move {
589 tokio::spawn(async move {
590 let _into_task = ct;
591 future::pending::<()>().await;
592 });
593
594 future::pending().await
595 }
596 });
597
598 sim.run()?;
599 assert_eq!(2, Arc::strong_count(&ct));
600
601 sim.crash("host");
602 assert_eq!(1, Arc::strong_count(&ct));
603
604 Ok(())
605 }
606
607 #[test]
608 fn elapsed_time() -> Result {
609 let tick = Duration::from_millis(5);
610 let mut sim = Builder::new().tick_duration(tick).build();
611 let start_epoch = sim.since_epoch();
612
613 let duration = Duration::from_millis(500);
614
615 sim.client("c1", async move {
616 tokio::time::sleep(duration).await;
617 assert_eq!(duration, elapsed());
618 assert_eq!(duration, sim_elapsed().unwrap());
621
622 assert_eq!(start_epoch + duration, since_epoch().unwrap());
624
625 Ok(())
626 });
627
628 sim.client("c2", async move {
629 tokio::time::sleep(duration).await;
630 assert_eq!(duration, elapsed());
631 assert_eq!(duration, sim_elapsed().unwrap());
632 assert_eq!(start_epoch + duration, since_epoch().unwrap());
633
634 Ok(())
635 });
636
637 sim.run()?;
638
639 assert_eq!(duration + tick, sim.elapsed());
641
642 let start = sim.elapsed();
643 sim.client("c3", async move {
644 assert_eq!(Duration::ZERO, elapsed());
645 assert_eq!(duration + tick, sim_elapsed().unwrap());
648 assert_eq!(start_epoch + duration + tick, since_epoch().unwrap());
649 tokio::time::sleep(duration).await;
650 assert_eq!(duration, elapsed());
651 assert_eq!(duration + tick + duration, sim_elapsed().unwrap());
652 assert_eq!(
653 start_epoch + duration + tick + duration,
654 since_epoch().unwrap()
655 );
656
657 Ok(())
658 });
659
660 sim.run()?;
661
662 assert_eq!(duration + tick, sim.elapsed() - start);
664
665 Ok(())
666 }
667
668 #[test]
671 fn sim_elapsed_time() -> Result {
672 assert!(sim_elapsed().is_none());
675
676 let sim = Builder::new().build();
677 World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
679
680 Ok(())
681 }
682
683 #[test]
686 fn since_epoch_regression() -> Result {
687 assert!(since_epoch().is_none());
690
691 let sim = Builder::new().build();
692 World::enter(&sim.world, || assert!(since_epoch().is_none()));
694
695 Ok(())
696 }
697
698 #[test]
699 fn hold_release_peers() -> Result {
700 let global = Duration::from_millis(2);
701
702 let mut sim = Builder::new()
703 .min_message_latency(global)
704 .max_message_latency(global)
705 .build();
706
707 sim.host("server", || async {
708 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
709
710 while let Ok((mut s, _)) = listener.accept().await {
711 assert!(s.write_u8(42).await.is_ok());
712 }
713
714 Ok(())
715 });
716
717 sim.client("client", async move {
718 let mut s = TcpStream::connect("server:1234").await?;
719
720 s.read_u8().await?;
721
722 Ok(())
723 });
724
725 sim.hold("server", "client");
726
727 sim.step()?;
729
730 sim.links(|l| {
731 assert!(l.count() == 1);
732 });
733
734 sim.step()?;
736
737 sim.release("server", "client");
738
739 sim.run()?;
740
741 Ok(())
742 }
743
744 #[test]
745 fn partition_peers() -> Result {
746 let global = Duration::from_millis(2);
747
748 let mut sim = Builder::new()
749 .min_message_latency(global)
750 .max_message_latency(global)
751 .build();
752
753 sim.host("server", || async {
754 let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
755
756 Ok(())
757 });
758
759 sim.client("client", async move {
760 let _ = TcpStream::connect("server:1234").await.unwrap_err();
762
763 Ok(())
764 });
765
766 sim.partition("server", "client");
767
768 sim.run()?;
769
770 Ok(())
771 }
772
    /// Expected outcome of a partitioning scenario, as checked by
    /// `run_with_partitioning`.
    struct Expectation {
        /// Whether host A is expected to receive a datagram within its
        /// timeout.
        expect_a_receive: bool,
        /// Whether host B is expected to receive a datagram within its
        /// timeout.
        expect_b_receive: bool,
    }
777
    /// Link manipulations that `partition_peers_oneway_many_cases` applies
    /// in sequence between hosts A and B.
    #[derive(Debug)]
    enum Action {
        /// Calls `sim.partition(A, B)`.
        Partition,
        /// Calls `sim.partition_oneway(A, B)`.
        PartitionOnewayAB,
        /// Calls `sim.partition_oneway(B, A)`.
        PartitionOnewayBA,
        /// Calls `sim.repair_oneway(A, B)`.
        RepairOnewayAB,
        /// Calls `sim.repair_oneway(B, A)`.
        RepairOnewayBA,
        /// Calls `sim.repair(A, B)`.
        Repair,
    }
787
788 fn run_with_partitioning(
789 host_a: &'static str,
790 host_b: &'static str,
791 mut partitioning: impl FnMut(&mut Sim) -> Expectation,
792 ) -> Result {
793 let global = Duration::from_millis(1);
794
795 let mut sim = Builder::new()
796 .min_message_latency(global)
797 .max_message_latency(global)
798 .build();
799
800 let a_did_receive = Arc::new(Mutex::new(None));
801 let b_did_receive = Arc::new(Mutex::new(None));
802
803 let make_a = |sim: &mut Sim| {
804 sim.client(host_a, {
805 let a_did_receive = Arc::clone(&a_did_receive);
806 async move {
807 let udp_socket =
808 UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
809 udp_socket
810 .send_to(&[42], format!("{host_b}:1234"))
811 .await
812 .expect("sending packet should appear to work, even if partitioned");
813
814 *a_did_receive.lock().unwrap() = Some(matches!(
815 tokio::time::timeout(
816 Duration::from_secs(1),
817 udp_socket.recv_from(&mut [0])
818 )
819 .await,
820 Ok(Ok(_))
821 ));
822
823 Ok(())
824 }
825 })
826 };
827
828 let make_b = |sim: &mut Sim| {
829 sim.client(host_b, {
830 let b_did_receive = Arc::clone(&b_did_receive);
831 async move {
832 let udp_socket =
833 UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
834 udp_socket
835 .send_to(&[42], format!("{host_a}:1234"))
836 .await
837 .expect("sending packet should work");
838
839 *b_did_receive.lock().unwrap() = Some(matches!(
840 tokio::time::timeout(
841 Duration::from_secs(1),
842 udp_socket.recv_from(&mut [0])
843 )
844 .await,
845 Ok(Ok(_))
846 ));
847
848 Ok(())
849 }
850 })
851 };
852
853 let construct_a_first = sim.world.borrow_mut().rng.random_bool(0.5);
854 if construct_a_first {
855 make_a(&mut sim);
856 make_b(&mut sim);
857 } else {
858 make_b(&mut sim);
859 make_a(&mut sim);
860 }
861
862 let Expectation {
863 expect_a_receive,
864 expect_b_receive,
865 } = partitioning(&mut sim);
866 sim.run()?;
867
868 assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
869 assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));
870
871 Ok(())
872 }
873
874 #[test]
875 fn partition_peers_oneway() -> Result {
876 run_with_partitioning("a", "b", |sim: &mut Sim| {
877 sim.partition_oneway("a", "b");
878 Expectation {
879 expect_a_receive: true,
880 expect_b_receive: false,
881 }
882 })
883 }
884
885 #[test]
886 fn partition_peers_oneway_many_cases() -> Result {
887 const HOST_A: &str = "a";
888 const HOST_B: &str = "b";
889
890 fn run_with_actions(actions: &[Action]) -> Result {
893 run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
894 let mut expect_a_receive = true;
895 let mut expect_b_receive = true;
896 for action in actions {
897 match action {
898 Action::Partition => {
899 sim.partition(HOST_A, HOST_B);
900 expect_a_receive = false;
901 expect_b_receive = false;
902 }
903 Action::PartitionOnewayAB => {
904 sim.partition_oneway(HOST_A, HOST_B);
905 expect_b_receive = false;
906 }
907 Action::PartitionOnewayBA => {
908 sim.partition_oneway(HOST_B, HOST_A);
909 expect_a_receive = false;
910 }
911 Action::RepairOnewayAB => {
912 sim.repair_oneway(HOST_A, HOST_B);
913 expect_b_receive = true;
914 }
915 Action::RepairOnewayBA => {
916 sim.repair_oneway(HOST_B, HOST_A);
917 expect_a_receive = true;
918 }
919 Action::Repair => {
920 sim.repair(HOST_A, HOST_B);
921 expect_a_receive = true;
922 expect_b_receive = true;
923 }
924 }
925 }
926 Expectation {
927 expect_a_receive,
928 expect_b_receive,
929 }
930 })?;
931 Ok(())
932 }
933
934 run_with_actions(&[Action::PartitionOnewayAB])?;
935 run_with_actions(&[Action::PartitionOnewayBA])?;
936 run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
937 run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
938 run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
939 run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
940 run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
941 run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
942 run_with_actions(&[
943 Action::Partition,
944 Action::RepairOnewayAB,
945 Action::RepairOnewayBA,
946 ])?;
947
948 Ok(())
949 }
950
951 #[test]
952 fn elapsed_time_across_restarts() -> Result {
953 let tick_ms = 5;
954 let mut sim = Builder::new()
955 .tick_duration(Duration::from_millis(tick_ms))
956 .build();
957
958 let clock = Arc::new(AtomicU64::new(0));
959 let actual = clock.clone();
960
961 sim.host("host", move || {
962 let clock = clock.clone();
963
964 async move {
965 loop {
966 tokio::time::sleep(Duration::from_millis(1)).await;
967 clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
968 }
969 }
970 });
971
972 sim.step()?;
973 assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));
974
975 sim.bounce("host");
976 sim.step()?;
977 assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));
978
979 Ok(())
980 }
981
982 #[test]
983 fn elapsed_time_across_crashes() -> Result {
984 let tick_ms = 5;
985 let mut sim = Builder::new()
986 .tick_duration(Duration::from_millis(tick_ms))
987 .build();
988
989 let clock_1 = Arc::new(AtomicU64::new(0));
990 let clock_1_moved = clock_1.clone();
991
992 sim.host("host1", move || {
993 let clock = clock_1_moved.clone();
994 async move {
995 loop {
996 tokio::time::sleep(Duration::from_millis(1)).await;
997 clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
998 }
999 }
1000 });
1001
1002 sim.crash("host1");
1004 sim.step()?;
1005 sim.bounce("host1");
1007 sim.step()?;
1008 assert_eq!(
1009 2 * tick_ms - 1,
1010 clock_1.load(Ordering::SeqCst),
1011 "Host 1 should have caught up"
1012 );
1013
1014 Ok(())
1015 }
1016
1017 #[test]
1018 fn host_finishes_with_error() {
1019 let mut sim = Builder::new().build();
1020
1021 sim.host("host", || async {
1022 Err("Host software finished unexpectedly")?
1023 });
1024
1025 assert!(sim.step().is_err());
1026 }
1027
1028 #[test]
1029 fn manual_message_delivery() -> Result {
1030 let mut sim = Builder::new().build();
1031
1032 sim.host("a", || async {
1033 let l = TcpListener::bind("0.0.0.0:1234").await?;
1034
1035 _ = l.accept().await?;
1036
1037 Ok(())
1038 });
1039
1040 sim.client("b", async {
1041 hold("a", "b");
1042
1043 _ = TcpStream::connect("a:1234").await?;
1044
1045 Ok(())
1046 });
1047
1048 assert!(!sim.step()?);
1049
1050 sim.links(|mut l| {
1051 let a_to_b = l.next().unwrap();
1052 a_to_b.deliver_all();
1053 });
1054
1055 assert!(sim.step()?);
1056
1057 Ok(())
1058 }
1059
1060 #[test]
1064 fn run_after_host_crashes() -> Result {
1065 let mut sim = Builder::new().build();
1066
1067 sim.host("h", || async { future::pending().await });
1068
1069 sim.crash("h");
1070
1071 sim.run()
1072 }
1073
1074 #[test]
1075 fn restart_host_after_crash() -> Result {
1076 let mut sim = Builder::new().build();
1077
1078 let data = Arc::new(AtomicU64::new(0));
1079 let data_cloned = data.clone();
1080
1081 sim.host("h", move || {
1082 let data_cloned = data_cloned.clone();
1083 async move {
1084 data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1085 Ok(())
1086 }
1087 });
1088
1089 sim.crash("h");
1091 sim.step()?;
1092
1093 sim.bounce("h");
1095 sim.step()?;
1096 assert_eq!(1, data.load(Ordering::SeqCst));
1098
1099 Ok(())
1100 }
1101
1102 #[test]
1103 fn override_link_latency() -> Result {
1104 let global = Duration::from_millis(2);
1105
1106 let mut sim = Builder::new()
1107 .min_message_latency(global)
1108 .max_message_latency(global)
1109 .build();
1110
1111 sim.host("server", || async {
1112 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1113
1114 while let Ok((mut s, _)) = listener.accept().await {
1115 assert!(s.write_u8(9).await.is_ok());
1116 }
1117
1118 Ok(())
1119 });
1120
1121 sim.client("client", async move {
1122 let mut s = TcpStream::connect("server:1234").await?;
1123
1124 let start = Instant::now();
1125 s.read_u8().await?;
1126 assert_eq!(global, start.elapsed());
1127
1128 Ok(())
1129 });
1130
1131 sim.run()?;
1132
1133 let degraded = Duration::from_millis(10);
1134
1135 sim.client("client2", async move {
1136 let mut s = TcpStream::connect("server:1234").await?;
1137
1138 let start = Instant::now();
1139 s.read_u8().await?;
1140 assert_eq!(degraded, start.elapsed());
1141
1142 Ok(())
1143 });
1144
1145 sim.set_link_latency("client2", "server", degraded);
1146
1147 sim.run()
1148 }
1149
1150 #[test]
1151 fn is_host_running() -> Result {
1152 let mut sim = Builder::new().build();
1153
1154 sim.client("client", async { future::pending().await });
1155 sim.host("host", || async { future::pending().await });
1156
1157 assert!(!sim.step()?);
1158
1159 assert!(sim.is_host_running("client"));
1160 assert!(sim.is_host_running("host"));
1161
1162 sim.crash("host");
1163 assert!(!sim.is_host_running("host"));
1164
1165 Ok(())
1166 }
1167
1168 #[test]
1169 #[cfg(feature = "regex")]
1170 fn host_scan() -> Result {
1171 let mut sim = Builder::new().build();
1172
1173 let how_many = 3;
1174 for i in 0..how_many {
1175 sim.host(format!("host-{i}"), || async { future::pending().await })
1176 }
1177
1178 let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1179 ips.sort();
1180
1181 assert_eq!(how_many, ips.len());
1182
1183 for (i, ip) in ips.iter().enumerate() {
1184 assert_eq!(
1185 format!("host-{i}"),
1186 sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1187 );
1188 }
1189
1190 Ok(())
1191 }
1192
1193 #[test]
1194 #[cfg(feature = "regex")]
1195 fn bounce_multiple_hosts_with_regex() -> Result {
1196 let mut sim = Builder::new().build();
1197
1198 let count = Arc::new(AtomicU64::new(0));
1199 for i in 1..=3 {
1200 let count = count.clone();
1201 sim.host(format!("host-{}", i), move || {
1202 let count = count.clone();
1203 async move {
1204 count.fetch_add(1, Ordering::SeqCst);
1205 future::pending().await
1206 }
1207 });
1208 }
1209
1210 sim.step()?;
1211 assert_eq!(count.load(Ordering::SeqCst), 3);
1212 sim.bounce(regex::Regex::new("host-[12]")?);
1213 sim.step()?;
1214 assert_eq!(count.load(Ordering::SeqCst), 5);
1215
1216 Ok(())
1217 }
1218
1219 #[test]
1220 #[cfg(feature = "regex")]
1221 fn hold_all() -> Result {
1222 let mut sim = Builder::new().build();
1223
1224 sim.host("host", || async {
1225 let l = TcpListener::bind("0.0.0.0:1234").await?;
1226
1227 loop {
1228 _ = l.accept().await?;
1229 }
1230 });
1231
1232 sim.client("test", async {
1233 hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1234
1235 assert!(tokio::time::timeout(
1236 Duration::from_millis(100),
1237 TcpStream::connect("host:1234")
1238 )
1239 .await
1240 .is_err());
1241
1242 crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1243
1244 assert!(TcpStream::connect("host:1234").await.is_ok());
1245
1246 Ok(())
1247 });
1248
1249 sim.run()
1250 }
1251}