1use rand::{rngs::SmallRng, seq::SliceRandom, Rng, SeedableRng};
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15 for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A simulation: the registered hosts and clients, the simulated [`World`]
/// they run in, and the logical clock that drives them.
pub struct Sim<'a> {
    /// Simulation configuration supplied at construction.
    config: Config,

    /// Simulated world state.
    ///
    /// Kept in a `RefCell` so that `&self` methods can mutate it and so that a
    /// shared reference can be handed to [`World::enter`].
    world: RefCell<World>,

    /// Runtime of every registered node, keyed by the node's IP address.
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Duration between `UNIX_EPOCH` and the configured epoch. Computed once,
    /// when the simulation is created.
    since_epoch: Duration,

    /// Logical time elapsed so far; advanced by one tick per call to `step`.
    elapsed: Duration,

    /// Step counter used in tracing and in the timeout error message. Starts
    /// at 1 and is incremented at the end of every step.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42 pub(crate) fn new(config: Config, world: World) -> Self {
43 let since_epoch = config
44 .epoch
45 .duration_since(UNIX_EPOCH)
46 .expect("now must be >= UNIX_EPOCH");
47
48 Self {
49 config,
50 world: RefCell::new(world),
51 rts: IndexMap::new(),
52 since_epoch,
53 elapsed: Duration::ZERO,
54 steps: 1, }
56 }
57
    /// How much logical time has elapsed since the simulation started.
    ///
    /// Starts at zero and grows by one tick for every completed `step`.
    pub fn elapsed(&self) -> Duration {
        self.elapsed
    }
62
    /// The logical duration from [`UNIX_EPOCH`] until now.
    ///
    /// This is the offset of the configured epoch from `UNIX_EPOCH` (fixed
    /// when the simulation was created) plus the logical time elapsed so far.
    pub fn since_epoch(&self) -> Duration {
        self.since_epoch + self.elapsed
    }
71
72 pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74 where
75 F: Future<Output = Result> + 'static,
76 {
77 let addr = self.lookup(addr);
78 let nodename: Arc<str> = self
79 .world
80 .borrow_mut()
81 .dns
82 .reverse(addr)
83 .map(str::to_string)
84 .unwrap_or_else(|| addr.to_string())
85 .into();
86
87 {
88 let world = RefCell::get_mut(&mut self.world);
89
90 world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
92 }
93
94 let seed = self.world.borrow_mut().rng.gen();
95 let rng = SmallRng::from_seed(seed);
96 let config = rt::Config {
97 enable_io: self.config.enable_tokio_io,
98 rng: Some(rng),
99 };
100
101 let rt = World::enter(&self.world, || Rt::client(nodename, client, config));
102
103 self.rts.insert(addr, rt);
104 }
105
106 pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
113 where
114 F: Fn() -> Fut + 'a,
115 Fut: Future<Output = Result> + 'static,
116 {
117 let addr = self.lookup(addr);
118 let nodename: Arc<str> = self
119 .world
120 .borrow_mut()
121 .dns
122 .reverse(addr)
123 .map(str::to_string)
124 .unwrap_or_else(|| addr.to_string())
125 .into();
126
127 {
128 let world = RefCell::get_mut(&mut self.world);
129
130 world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
132 }
133
134 let seed = self.world.borrow_mut().rng.gen();
135 let rng = SmallRng::from_seed(seed);
136 let config = rt::Config {
137 enable_io: self.config.enable_tokio_io,
138 rng: Some(rng),
139 };
140
141 let rt = World::enter(&self.world, || Rt::host(nodename, host, config));
142
143 self.rts.insert(addr, rt);
144 }
145
    /// Crash the host(s) that `addrs` resolves to.
    ///
    /// Calls `Rt::crash` on each matching runtime from inside the simulated
    /// world and emits one trace event per host.
    ///
    /// # Panics
    ///
    /// Panics with "missing host" if a resolved address has no registered
    /// runtime.
    pub fn crash(&mut self, addrs: impl ToIpAddrs) {
        self.run_with_hosts(addrs, |addr, rt| {
            rt.crash();

            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Crash");
        });
    }
156
    /// Bounce the host(s) that `addrs` resolves to.
    ///
    /// Calls `Rt::bounce` on each matching runtime from inside the simulated
    /// world and emits one trace event per host.
    ///
    /// # Panics
    ///
    /// Panics with "missing host" if a resolved address has no registered
    /// runtime.
    pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
        self.run_with_hosts(addrs, |addr, rt| {
            rt.bounce();

            tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
        });
    }
165
166 fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
168 let hosts = self.world.borrow_mut().lookup_many(addrs);
169 for h in hosts {
170 let rt = self.rts.get_mut(&h).expect("missing host");
171
172 self.world.borrow_mut().current = Some(h);
173
174 World::enter(&self.world, || f(h, rt));
175 }
176
177 self.world.borrow_mut().current = None;
178 }
179
180 pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
182 let host = self.world.borrow_mut().lookup(addr);
183
184 self.rts
185 .get(&host)
186 .expect("missing host")
187 .is_software_running()
188 }
189
    /// Resolve `addr` to an [`IpAddr`] using the simulated world.
    ///
    /// The world is borrowed mutably for the duration of the call.
    pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
        self.world.borrow_mut().lookup(addr)
    }
194
195 pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
198 self.world
199 .borrow()
200 .reverse_lookup(addr)
201 .map(|h| h.to_owned())
202 }
203
204 pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
207 let mut world = self.world.borrow_mut();
208 world.hold_many(a, b);
209 }
210
211 pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
214 let mut world = self.world.borrow_mut();
215 world.repair_many(a, b);
216 }
217
218 pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
223 let mut world = self.world.borrow_mut();
224 world.repair_oneway_many(from, to);
225 }
226
227 pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
229 let mut world = self.world.borrow_mut();
230 world.release_many(a, b);
231 }
232
233 pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
236 let mut world = self.world.borrow_mut();
237 world.partition_many(a, b);
238 }
239
240 pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
249 let mut world = self.world.borrow_mut();
250 world.partition_oneway_many(from, to);
251 }
252
253 pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
257 let world = self.world.borrow();
258
259 (
260 world
261 .dns
262 .reverse(pair.0)
263 .expect("no hostname found for ip address")
264 .to_owned(),
265 world
266 .dns
267 .reverse(pair.1)
268 .expect("no hostname found for ip address")
269 .to_owned(),
270 )
271 }
272
    /// Resolve `addr` to every [`IpAddr`] it matches, using the simulated
    /// world.
    ///
    /// The world is borrowed mutably for the duration of the call.
    pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
        self.world.borrow_mut().lookup_many(addr)
    }
277
278 pub fn set_max_message_latency(&self, value: Duration) {
280 self.world
281 .borrow_mut()
282 .topology
283 .set_max_message_latency(value);
284 }
285
286 pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
291 let mut world = self.world.borrow_mut();
292 let a = world.lookup_many(a);
293 let b = world.lookup_many(b);
294
295 for_pairs(&a, &b, |a, b| {
296 world.topology.set_link_message_latency(a, b, value);
297 });
298 }
299
300 pub fn set_link_max_message_latency(
302 &self,
303 a: impl ToIpAddrs,
304 b: impl ToIpAddrs,
305 value: Duration,
306 ) {
307 let mut world = self.world.borrow_mut();
308 let a = world.lookup_many(a);
309 let b = world.lookup_many(b);
310
311 for_pairs(&a, &b, |a, b| {
312 world.topology.set_link_max_message_latency(a, b, value);
313 });
314 }
315
316 pub fn set_message_latency_curve(&self, value: f64) {
321 self.world
322 .borrow_mut()
323 .topology
324 .set_message_latency_curve(value);
325 }
326
327 pub fn set_fail_rate(&mut self, value: f64) {
328 self.world.borrow_mut().topology.set_fail_rate(value);
329 }
330
331 pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
332 let mut world = self.world.borrow_mut();
333 let a = world.lookup_many(a);
334 let b = world.lookup_many(b);
335
336 for_pairs(&a, &b, |a, b| {
337 world.topology.set_link_fail_rate(a, b, value);
338 });
339 }
340
341 pub fn links(&self, f: impl FnOnce(LinksIter)) {
343 let top = &mut self.world.borrow_mut().topology;
344
345 f(top.iter_mut())
346 }
347
348 pub fn run(&mut self) -> Result {
353 if !self
355 .rts
356 .iter()
357 .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
358 {
359 tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
360 return Ok(());
361 }
362
363 loop {
364 let is_finished = self.step()?;
365
366 if is_finished {
367 return Ok(());
368 }
369 }
370 }
371
372 pub fn step(&mut self) -> Result<bool> {
382 tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);
383
384 let tick = self.config.tick;
385 let mut is_finished = true;
386
387 self.world.borrow_mut().topology.tick_by(tick);
391
392 let (mut running, stopped): (Vec<_>, Vec<_>) = self
397 .rts
398 .iter_mut()
399 .partition(|(_, rt)| rt.is_software_running());
400 if self.config.random_node_order {
401 running.shuffle(&mut self.world.borrow_mut().rng);
402 }
403
404 for (&addr, rt) in running {
405 let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
406 {
407 let mut world = self.world.borrow_mut();
408 let World {
411 rng,
412 topology,
413 hosts,
414 ..
415 } = world.deref_mut();
416 topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));
417
418 world.current = Some(addr);
420
421 world.current_host_mut().timer.now(rt.now());
422 }
423
424 let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;
425
426 if rt.is_client() {
427 is_finished = is_finished && is_software_finished;
428 }
429
430 let mut world = self.world.borrow_mut();
432 world.current = None;
433
434 world.tick(addr, tick);
435 }
436
437 for (&addr, _rt) in stopped {
440 let mut world = self.world.borrow_mut();
441 world.tick(addr, tick);
442 }
443
444 self.elapsed += tick;
445 self.steps += 1;
446
447 if self.elapsed > self.config.duration && !is_finished {
448 return Err(format!(
449 "Ran for duration: {:?} steps: {} without completing",
450 self.config.duration, self.steps,
451 ))?;
452 }
453
454 Ok(is_finished)
455 }
456}
457
458#[cfg(test)]
459mod test {
460 use rand::Rng;
461 use std::future;
462 use std::{
463 net::{IpAddr, Ipv4Addr},
464 rc::Rc,
465 sync::{
466 atomic::{AtomicU64, Ordering},
467 Arc, Mutex,
468 },
469 time::Duration,
470 };
471
472 use tokio::{
473 io::{AsyncReadExt, AsyncWriteExt},
474 sync::Semaphore,
475 time::Instant,
476 };
477
478 use crate::net::UdpSocket;
479 use crate::{
480 elapsed, hold,
481 net::{TcpListener, TcpStream},
482 sim_elapsed, Builder, Result, Sim, World,
483 };
484
485 #[test]
486 fn client_error() {
487 let mut sim = Builder::new().build();
488
489 sim.client("doomed", async { Err("An Error")? });
490
491 assert!(sim.run().is_err());
492 }
493
494 #[test]
495 fn timeout() {
496 let mut sim = Builder::new()
497 .simulation_duration(Duration::from_millis(500))
498 .build();
499
500 sim.client("timeout", async {
501 tokio::time::sleep(Duration::from_secs(1)).await;
502
503 Ok(())
504 });
505
506 assert!(sim.run().is_err());
507 }
508
509 #[test]
510 fn multiple_clients_all_finish() -> Result {
511 let how_many = 3;
512 let tick_ms = 10;
513
514 for run in 0..how_many {
516 let mut sim = Builder::new()
517 .tick_duration(Duration::from_millis(tick_ms))
518 .build();
519
520 let ct = Rc::new(Semaphore::new(how_many));
521
522 for client in 0..how_many {
523 let ct = ct.clone();
524
525 sim.client(format!("client-{client}"), async move {
526 let ms = if run == client { 0 } else { 2 * tick_ms };
527 tokio::time::sleep(Duration::from_millis(ms)).await;
528
529 let p = ct.acquire().await?;
530 p.forget();
531
532 Ok(())
533 });
534 }
535
536 sim.run()?;
537 assert_eq!(0, ct.available_permits());
538 }
539
540 Ok(())
541 }
542
543 #[test]
548 fn crash_blocks_until_complete() -> Result {
549 let ct = Arc::new(());
550
551 let mut sim = Builder::new().build();
552
553 sim.host("host", || {
554 let ct = ct.clone();
555
556 async move {
557 tokio::spawn(async move {
558 let _into_task = ct;
559 future::pending::<()>().await;
560 });
561
562 future::pending().await
563 }
564 });
565
566 sim.run()?;
567 assert_eq!(2, Arc::strong_count(&ct));
568
569 sim.crash("host");
570 assert_eq!(1, Arc::strong_count(&ct));
571
572 Ok(())
573 }
574
575 #[test]
576 fn elapsed_time() -> Result {
577 let tick = Duration::from_millis(5);
578 let mut sim = Builder::new().tick_duration(tick).build();
579
580 let duration = Duration::from_millis(500);
581
582 sim.client("c1", async move {
583 tokio::time::sleep(duration).await;
584 assert_eq!(duration, elapsed());
585 assert_eq!(duration, sim_elapsed().unwrap());
588
589 Ok(())
590 });
591
592 sim.client("c2", async move {
593 tokio::time::sleep(duration).await;
594 assert_eq!(duration, elapsed());
595 assert_eq!(duration, sim_elapsed().unwrap());
596
597 Ok(())
598 });
599
600 sim.run()?;
601
602 assert_eq!(duration + tick, sim.elapsed());
604
605 let start = sim.elapsed();
606 sim.client("c3", async move {
607 assert_eq!(Duration::ZERO, elapsed());
608 assert_eq!(duration + tick, sim_elapsed().unwrap());
611 tokio::time::sleep(duration).await;
612 assert_eq!(duration, elapsed());
613 assert_eq!(duration + tick + duration, sim_elapsed().unwrap());
614
615 Ok(())
616 });
617
618 sim.run()?;
619
620 assert_eq!(duration + tick, sim.elapsed() - start);
622
623 Ok(())
624 }
625
626 #[test]
629 fn sim_elapsed_time() -> Result {
630 assert!(sim_elapsed().is_none());
633
634 let sim = Builder::new().build();
635 World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
637
638 Ok(())
639 }
640
641 #[test]
642 fn hold_release_peers() -> Result {
643 let global = Duration::from_millis(2);
644
645 let mut sim = Builder::new()
646 .min_message_latency(global)
647 .max_message_latency(global)
648 .build();
649
650 sim.host("server", || async {
651 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
652
653 while let Ok((mut s, _)) = listener.accept().await {
654 assert!(s.write_u8(42).await.is_ok());
655 }
656
657 Ok(())
658 });
659
660 sim.client("client", async move {
661 let mut s = TcpStream::connect("server:1234").await?;
662
663 s.read_u8().await?;
664
665 Ok(())
666 });
667
668 sim.hold("server", "client");
669
670 sim.step()?;
672
673 sim.links(|l| {
674 assert!(l.count() == 1);
675 });
676
677 sim.step()?;
679
680 sim.release("server", "client");
681
682 sim.run()?;
683
684 Ok(())
685 }
686
687 #[test]
688 fn partition_peers() -> Result {
689 let global = Duration::from_millis(2);
690
691 let mut sim = Builder::new()
692 .min_message_latency(global)
693 .max_message_latency(global)
694 .build();
695
696 sim.host("server", || async {
697 let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
698
699 Ok(())
700 });
701
702 sim.client("client", async move {
703 let _ = TcpStream::connect("server:1234").await.unwrap_err();
705
706 Ok(())
707 });
708
709 sim.partition("server", "client");
710
711 sim.run()?;
712
713 Ok(())
714 }
715
    /// Expected outcome of a `run_with_partitioning` scenario.
    struct Expectation {
        /// Whether host A is expected to receive the datagram sent by host B.
        expect_a_receive: bool,
        /// Whether host B is expected to receive the datagram sent by host A.
        expect_b_receive: bool,
    }
720
    /// Partition / repair operations that `partition_peers_oneway_many_cases`
    /// applies between hosts A and B.
    #[derive(Debug)]
    enum Action {
        /// Partition A and B in both directions.
        Partition,
        /// Partition only the A -> B direction.
        PartitionOnewayAB,
        /// Partition only the B -> A direction.
        PartitionOnewayBA,
        /// Repair only the A -> B direction.
        RepairOnewayAB,
        /// Repair only the B -> A direction.
        RepairOnewayBA,
        /// Repair A and B in both directions.
        Repair,
    }
730
731 fn run_with_partitioning(
732 host_a: &'static str,
733 host_b: &'static str,
734 mut partitioning: impl FnMut(&mut Sim) -> Expectation,
735 ) -> Result {
736 let global = Duration::from_millis(1);
737
738 let mut sim = Builder::new()
739 .min_message_latency(global)
740 .max_message_latency(global)
741 .build();
742
743 let a_did_receive = Arc::new(Mutex::new(None));
744 let b_did_receive = Arc::new(Mutex::new(None));
745
746 let make_a = |sim: &mut Sim| {
747 sim.client(host_a, {
748 let a_did_receive = Arc::clone(&a_did_receive);
749 async move {
750 let udp_socket =
751 UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
752 udp_socket
753 .send_to(&[42], format!("{host_b}:1234"))
754 .await
755 .expect("sending packet should appear to work, even if partitioned");
756
757 *a_did_receive.lock().unwrap() = Some(matches!(
758 tokio::time::timeout(
759 Duration::from_secs(1),
760 udp_socket.recv_from(&mut [0])
761 )
762 .await,
763 Ok(Ok(_))
764 ));
765
766 Ok(())
767 }
768 })
769 };
770
771 let make_b = |sim: &mut Sim| {
772 sim.client(host_b, {
773 let b_did_receive = Arc::clone(&b_did_receive);
774 async move {
775 let udp_socket =
776 UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
777 udp_socket
778 .send_to(&[42], format!("{host_a}:1234"))
779 .await
780 .expect("sending packet should work");
781
782 *b_did_receive.lock().unwrap() = Some(matches!(
783 tokio::time::timeout(
784 Duration::from_secs(1),
785 udp_socket.recv_from(&mut [0])
786 )
787 .await,
788 Ok(Ok(_))
789 ));
790
791 Ok(())
792 }
793 })
794 };
795
796 let construct_a_first = sim.world.borrow_mut().rng.gen_bool(0.5);
797 if construct_a_first {
798 make_a(&mut sim);
799 make_b(&mut sim);
800 } else {
801 make_b(&mut sim);
802 make_a(&mut sim);
803 }
804
805 let Expectation {
806 expect_a_receive,
807 expect_b_receive,
808 } = partitioning(&mut sim);
809 sim.run()?;
810
811 assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
812 assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));
813
814 Ok(())
815 }
816
817 #[test]
818 fn partition_peers_oneway() -> Result {
819 run_with_partitioning("a", "b", |sim: &mut Sim| {
820 sim.partition_oneway("a", "b");
821 Expectation {
822 expect_a_receive: true,
823 expect_b_receive: false,
824 }
825 })
826 }
827
828 #[test]
829 fn partition_peers_oneway_many_cases() -> Result {
830 const HOST_A: &str = "a";
831 const HOST_B: &str = "b";
832
833 fn run_with_actions(actions: &[Action]) -> Result {
836 run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
837 let mut expect_a_receive = true;
838 let mut expect_b_receive = true;
839 for action in actions {
840 match action {
841 Action::Partition => {
842 sim.partition(HOST_A, HOST_B);
843 expect_a_receive = false;
844 expect_b_receive = false;
845 }
846 Action::PartitionOnewayAB => {
847 sim.partition_oneway(HOST_A, HOST_B);
848 expect_b_receive = false;
849 }
850 Action::PartitionOnewayBA => {
851 sim.partition_oneway(HOST_B, HOST_A);
852 expect_a_receive = false;
853 }
854 Action::RepairOnewayAB => {
855 sim.repair_oneway(HOST_A, HOST_B);
856 expect_b_receive = true;
857 }
858 Action::RepairOnewayBA => {
859 sim.repair_oneway(HOST_B, HOST_A);
860 expect_a_receive = true;
861 }
862 Action::Repair => {
863 sim.repair(HOST_A, HOST_B);
864 expect_a_receive = true;
865 expect_b_receive = true;
866 }
867 }
868 }
869 Expectation {
870 expect_a_receive,
871 expect_b_receive,
872 }
873 })?;
874 Ok(())
875 }
876
877 run_with_actions(&[Action::PartitionOnewayAB])?;
878 run_with_actions(&[Action::PartitionOnewayBA])?;
879 run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
880 run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
881 run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
882 run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
883 run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
884 run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
885 run_with_actions(&[
886 Action::Partition,
887 Action::RepairOnewayAB,
888 Action::RepairOnewayBA,
889 ])?;
890
891 Ok(())
892 }
893
894 #[test]
895 fn elapsed_time_across_restarts() -> Result {
896 let tick_ms = 5;
897 let mut sim = Builder::new()
898 .tick_duration(Duration::from_millis(tick_ms))
899 .build();
900
901 let clock = Arc::new(AtomicU64::new(0));
902 let actual = clock.clone();
903
904 sim.host("host", move || {
905 let clock = clock.clone();
906
907 async move {
908 loop {
909 tokio::time::sleep(Duration::from_millis(1)).await;
910 clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
911 }
912 }
913 });
914
915 sim.step()?;
916 assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));
917
918 sim.bounce("host");
919 sim.step()?;
920 assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));
921
922 Ok(())
923 }
924
925 #[test]
926 fn elapsed_time_across_crashes() -> Result {
927 let tick_ms = 5;
928 let mut sim = Builder::new()
929 .tick_duration(Duration::from_millis(tick_ms))
930 .build();
931
932 let clock_1 = Arc::new(AtomicU64::new(0));
933 let clock_1_moved = clock_1.clone();
934
935 sim.host("host1", move || {
936 let clock = clock_1_moved.clone();
937 async move {
938 loop {
939 tokio::time::sleep(Duration::from_millis(1)).await;
940 clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
941 }
942 }
943 });
944
945 sim.crash("host1");
947 sim.step()?;
948 sim.bounce("host1");
950 sim.step()?;
951 assert_eq!(
952 2 * tick_ms - 1,
953 clock_1.load(Ordering::SeqCst),
954 "Host 1 should have caught up"
955 );
956
957 Ok(())
958 }
959
960 #[test]
961 fn host_finishes_with_error() {
962 let mut sim = Builder::new().build();
963
964 sim.host("host", || async {
965 Err("Host software finished unexpectedly")?
966 });
967
968 assert!(sim.step().is_err());
969 }
970
971 #[test]
972 fn manual_message_delivery() -> Result {
973 let mut sim = Builder::new().build();
974
975 sim.host("a", || async {
976 let l = TcpListener::bind("0.0.0.0:1234").await?;
977
978 _ = l.accept().await?;
979
980 Ok(())
981 });
982
983 sim.client("b", async {
984 hold("a", "b");
985
986 _ = TcpStream::connect("a:1234").await?;
987
988 Ok(())
989 });
990
991 assert!(!sim.step()?);
992
993 sim.links(|mut l| {
994 let a_to_b = l.next().unwrap();
995 a_to_b.deliver_all();
996 });
997
998 assert!(sim.step()?);
999
1000 Ok(())
1001 }
1002
1003 #[test]
1007 fn run_after_host_crashes() -> Result {
1008 let mut sim = Builder::new().build();
1009
1010 sim.host("h", || async { future::pending().await });
1011
1012 sim.crash("h");
1013
1014 sim.run()
1015 }
1016
1017 #[test]
1018 fn restart_host_after_crash() -> Result {
1019 let mut sim = Builder::new().build();
1020
1021 let data = Arc::new(AtomicU64::new(0));
1022 let data_cloned = data.clone();
1023
1024 sim.host("h", move || {
1025 let data_cloned = data_cloned.clone();
1026 async move {
1027 data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1028 Ok(())
1029 }
1030 });
1031
1032 sim.crash("h");
1034 sim.step()?;
1035
1036 sim.bounce("h");
1038 sim.step()?;
1039 assert_eq!(1, data.load(Ordering::SeqCst));
1041
1042 Ok(())
1043 }
1044
1045 #[test]
1046 fn override_link_latency() -> Result {
1047 let global = Duration::from_millis(2);
1048
1049 let mut sim = Builder::new()
1050 .min_message_latency(global)
1051 .max_message_latency(global)
1052 .build();
1053
1054 sim.host("server", || async {
1055 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1056
1057 while let Ok((mut s, _)) = listener.accept().await {
1058 assert!(s.write_u8(9).await.is_ok());
1059 }
1060
1061 Ok(())
1062 });
1063
1064 sim.client("client", async move {
1065 let mut s = TcpStream::connect("server:1234").await?;
1066
1067 let start = Instant::now();
1068 s.read_u8().await?;
1069 assert_eq!(global, start.elapsed());
1070
1071 Ok(())
1072 });
1073
1074 sim.run()?;
1075
1076 let degraded = Duration::from_millis(10);
1077
1078 sim.client("client2", async move {
1079 let mut s = TcpStream::connect("server:1234").await?;
1080
1081 let start = Instant::now();
1082 s.read_u8().await?;
1083 assert_eq!(degraded, start.elapsed());
1084
1085 Ok(())
1086 });
1087
1088 sim.set_link_latency("client2", "server", degraded);
1089
1090 sim.run()
1091 }
1092
1093 #[test]
1094 fn is_host_running() -> Result {
1095 let mut sim = Builder::new().build();
1096
1097 sim.client("client", async { future::pending().await });
1098 sim.host("host", || async { future::pending().await });
1099
1100 assert!(!sim.step()?);
1101
1102 assert!(sim.is_host_running("client"));
1103 assert!(sim.is_host_running("host"));
1104
1105 sim.crash("host");
1106 assert!(!sim.is_host_running("host"));
1107
1108 Ok(())
1109 }
1110
1111 #[test]
1112 #[cfg(feature = "regex")]
1113 fn host_scan() -> Result {
1114 let mut sim = Builder::new().build();
1115
1116 let how_many = 3;
1117 for i in 0..how_many {
1118 sim.host(format!("host-{i}"), || async { future::pending().await })
1119 }
1120
1121 let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1122 ips.sort();
1123
1124 assert_eq!(how_many, ips.len());
1125
1126 for (i, ip) in ips.iter().enumerate() {
1127 assert_eq!(
1128 format!("host-{i}"),
1129 sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1130 );
1131 }
1132
1133 Ok(())
1134 }
1135
1136 #[test]
1137 #[cfg(feature = "regex")]
1138 fn bounce_multiple_hosts_with_regex() -> Result {
1139 let mut sim = Builder::new().build();
1140
1141 let count = Arc::new(AtomicU64::new(0));
1142 for i in 1..=3 {
1143 let count = count.clone();
1144 sim.host(format!("host-{}", i), move || {
1145 let count = count.clone();
1146 async move {
1147 count.fetch_add(1, Ordering::SeqCst);
1148 future::pending().await
1149 }
1150 });
1151 }
1152
1153 sim.step()?;
1154 assert_eq!(count.load(Ordering::SeqCst), 3);
1155 sim.bounce(regex::Regex::new("host-[12]")?);
1156 sim.step()?;
1157 assert_eq!(count.load(Ordering::SeqCst), 5);
1158
1159 Ok(())
1160 }
1161
1162 #[test]
1163 #[cfg(feature = "regex")]
1164 fn hold_all() -> Result {
1165 let mut sim = Builder::new().build();
1166
1167 sim.host("host", || async {
1168 let l = TcpListener::bind("0.0.0.0:1234").await?;
1169
1170 loop {
1171 _ = l.accept().await?;
1172 }
1173 });
1174
1175 sim.client("test", async {
1176 hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1177
1178 assert!(tokio::time::timeout(
1179 Duration::from_millis(100),
1180 TcpStream::connect("host:1234")
1181 )
1182 .await
1183 .is_err());
1184
1185 crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1186
1187 assert!(TcpStream::connect("host:1234").await.is_ok());
1188
1189 Ok(())
1190 });
1191
1192 sim.run()
1193 }
1194}