1use rand::seq::SliceRandom;
2use std::cell::RefCell;
3use std::future::Future;
4use std::net::IpAddr;
5use std::ops::DerefMut;
6use std::sync::Arc;
7use std::time::UNIX_EPOCH;
8
9use indexmap::IndexMap;
10use tokio::time::Duration;
11use tracing::Level;
12
13use crate::host::HostTimer;
14use crate::{
15 for_pairs, rt, Config, LinksIter, Result, Rt, ToIpAddr, ToIpAddrs, World, TRACING_TARGET,
16};
17
/// A simulation: a set of per-host runtimes stepped in lockstep against a
/// shared [`World`].
pub struct Sim<'a> {
    /// Configuration the simulation was built with (tick length, maximum
    /// duration, node ordering, ...).
    config: Config,

    /// Shared world state. It is handed to [`World::enter`] whenever host
    /// software or a host operation runs.
    world: RefCell<World>,

    /// One runtime per registered host or client, keyed by its address.
    /// Iteration follows insertion order unless `random_node_order` is set.
    rts: IndexMap<IpAddr, Rt<'a>>,

    /// Offset of the configured epoch from `UNIX_EPOCH`, computed once in
    /// [`Sim::new`].
    since_epoch: Duration,

    /// Simulated time that has passed; grows by one tick per [`Sim::step`].
    elapsed: Duration,

    /// Step counter used in tracing and error messages. Starts at 1 and is
    /// bumped at the end of every step.
    steps: usize,
}
40
41impl<'a> Sim<'a> {
42 pub(crate) fn new(config: Config, world: World) -> Self {
43 let since_epoch = config
44 .epoch
45 .duration_since(UNIX_EPOCH)
46 .expect("now must be >= UNIX_EPOCH");
47
48 Self {
49 config,
50 world: RefCell::new(world),
51 rts: IndexMap::new(),
52 since_epoch,
53 elapsed: Duration::ZERO,
54 steps: 1, }
56 }
57
58 pub fn elapsed(&self) -> Duration {
60 self.elapsed
61 }
62
63 pub fn since_epoch(&self) -> Duration {
69 self.since_epoch + self.elapsed
70 }
71
72 pub fn client<F>(&mut self, addr: impl ToIpAddr, client: F)
74 where
75 F: Future<Output = Result> + 'static,
76 {
77 let addr = self.lookup(addr);
78 let nodename: Arc<str> = self
79 .world
80 .borrow_mut()
81 .dns
82 .reverse(addr)
83 .map(str::to_string)
84 .unwrap_or_else(|| addr.to_string())
85 .into();
86
87 {
88 let world = RefCell::get_mut(&mut self.world);
89
90 world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
92 }
93
94 let rt = World::enter(&self.world, || {
95 Rt::client(nodename, client, self.config.enable_tokio_io)
96 });
97
98 self.rts.insert(addr, rt);
99 }
100
101 pub fn host<F, Fut>(&mut self, addr: impl ToIpAddr, host: F)
108 where
109 F: Fn() -> Fut + 'a,
110 Fut: Future<Output = Result> + 'static,
111 {
112 let addr = self.lookup(addr);
113 let nodename: Arc<str> = self
114 .world
115 .borrow_mut()
116 .dns
117 .reverse(addr)
118 .map(str::to_string)
119 .unwrap_or_else(|| addr.to_string())
120 .into();
121
122 {
123 let world = RefCell::get_mut(&mut self.world);
124
125 world.register(addr, &nodename, HostTimer::new(self.elapsed), &self.config);
127 }
128
129 let rt = World::enter(&self.world, || {
130 Rt::host(nodename, host, self.config.enable_tokio_io)
131 });
132
133 self.rts.insert(addr, rt);
134 }
135
136 pub fn crash(&mut self, addrs: impl ToIpAddrs) {
140 self.run_with_hosts(addrs, |addr, rt| {
141 rt.crash();
142
143 tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Crash");
144 });
145 }
146
147 pub fn bounce(&mut self, addrs: impl ToIpAddrs) {
149 self.run_with_hosts(addrs, |addr, rt| {
150 rt.bounce();
151
152 tracing::trace!(target: TRACING_TARGET, addr = ?addr, "Bounce");
153 });
154 }
155
156 fn run_with_hosts(&mut self, addrs: impl ToIpAddrs, mut f: impl FnMut(IpAddr, &mut Rt)) {
158 let hosts = self.world.borrow_mut().lookup_many(addrs);
159 for h in hosts {
160 let rt = self.rts.get_mut(&h).expect("missing host");
161
162 self.world.borrow_mut().current = Some(h);
163
164 World::enter(&self.world, || f(h, rt));
165 }
166
167 self.world.borrow_mut().current = None;
168 }
169
170 pub fn is_host_running(&mut self, addr: impl ToIpAddr) -> bool {
172 let host = self.world.borrow_mut().lookup(addr);
173
174 self.rts
175 .get(&host)
176 .expect("missing host")
177 .is_software_running()
178 }
179
180 pub fn lookup(&self, addr: impl ToIpAddr) -> IpAddr {
182 self.world.borrow_mut().lookup(addr)
183 }
184
185 pub fn reverse_lookup(&self, addr: IpAddr) -> Option<String> {
188 self.world
189 .borrow()
190 .reverse_lookup(addr)
191 .map(|h| h.to_owned())
192 }
193
194 pub fn hold(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
197 let mut world = self.world.borrow_mut();
198 world.hold_many(a, b);
199 }
200
201 pub fn repair(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
204 let mut world = self.world.borrow_mut();
205 world.repair_many(a, b);
206 }
207
208 pub fn repair_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
213 let mut world = self.world.borrow_mut();
214 world.repair_oneway_many(from, to);
215 }
216
217 pub fn release(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
219 let mut world = self.world.borrow_mut();
220 world.release_many(a, b);
221 }
222
223 pub fn partition(&self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
226 let mut world = self.world.borrow_mut();
227 world.partition_many(a, b);
228 }
229
230 pub fn partition_oneway(&self, from: impl ToIpAddrs, to: impl ToIpAddrs) {
239 let mut world = self.world.borrow_mut();
240 world.partition_oneway_many(from, to);
241 }
242
243 pub fn reverse_lookup_pair(&self, pair: (IpAddr, IpAddr)) -> (String, String) {
247 let world = self.world.borrow();
248
249 (
250 world
251 .dns
252 .reverse(pair.0)
253 .expect("no hostname found for ip address")
254 .to_owned(),
255 world
256 .dns
257 .reverse(pair.1)
258 .expect("no hostname found for ip address")
259 .to_owned(),
260 )
261 }
262
263 pub fn lookup_many(&self, addr: impl ToIpAddrs) -> Vec<IpAddr> {
265 self.world.borrow_mut().lookup_many(addr)
266 }
267
268 pub fn set_max_message_latency(&self, value: Duration) {
270 self.world
271 .borrow_mut()
272 .topology
273 .set_max_message_latency(value);
274 }
275
276 pub fn set_link_latency(&self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: Duration) {
281 let mut world = self.world.borrow_mut();
282 let a = world.lookup_many(a);
283 let b = world.lookup_many(b);
284
285 for_pairs(&a, &b, |a, b| {
286 world.topology.set_link_message_latency(a, b, value);
287 });
288 }
289
290 pub fn set_link_max_message_latency(
292 &self,
293 a: impl ToIpAddrs,
294 b: impl ToIpAddrs,
295 value: Duration,
296 ) {
297 let mut world = self.world.borrow_mut();
298 let a = world.lookup_many(a);
299 let b = world.lookup_many(b);
300
301 for_pairs(&a, &b, |a, b| {
302 world.topology.set_link_max_message_latency(a, b, value);
303 });
304 }
305
306 pub fn set_message_latency_curve(&self, value: f64) {
311 self.world
312 .borrow_mut()
313 .topology
314 .set_message_latency_curve(value);
315 }
316
317 pub fn set_fail_rate(&mut self, value: f64) {
318 self.world.borrow_mut().topology.set_fail_rate(value);
319 }
320
321 pub fn set_link_fail_rate(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs, value: f64) {
322 let mut world = self.world.borrow_mut();
323 let a = world.lookup_many(a);
324 let b = world.lookup_many(b);
325
326 for_pairs(&a, &b, |a, b| {
327 world.topology.set_link_fail_rate(a, b, value);
328 });
329 }
330
331 pub fn links(&self, f: impl FnOnce(LinksIter)) {
333 let top = &mut self.world.borrow_mut().topology;
334
335 f(top.iter_mut())
336 }
337
338 pub fn run(&mut self) -> Result {
343 if !self
345 .rts
346 .iter()
347 .any(|(_, rt)| matches!(rt.kind, rt::Kind::Client))
348 {
349 tracing::info!(target: TRACING_TARGET, "No client hosts registered, exiting simulation");
350 return Ok(());
351 }
352
353 loop {
354 let is_finished = self.step()?;
355
356 if is_finished {
357 return Ok(());
358 }
359 }
360 }
361
    /// Advances the simulation by one tick (`config.tick`).
    ///
    /// Returns `Ok(true)` when every client that was ticked in this step
    /// reported its software as finished (vacuously true when no client was
    /// ticked), and `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if ticking a runtime fails, or if the elapsed time
    /// exceeds `config.duration` while the simulation is still unfinished.
    ///
    /// # Panics
    ///
    /// Panics with "missing host" if a runtime's address has no entry in the
    /// world's host table.
    pub fn step(&mut self) -> Result<bool> {
        tracing::trace!(target: TRACING_TARGET, "step {}", self.steps);

        let tick = self.config.tick;
        let mut is_finished = true;

        // Advance the network first, before any host runtime is ticked.
        self.world.borrow_mut().topology.tick_by(tick);

        // Split runtimes into those whose software is running and those that
        // are stopped; only the former get their software ticked.
        let (mut running, stopped): (Vec<_>, Vec<_>) = self
            .rts
            .iter_mut()
            .partition(|(_, rt)| rt.is_software_running());
        if self.config.random_node_order {
            // The shuffle draws from the world's RNG.
            running.shuffle(&mut self.world.borrow_mut().rng);
        }

        for (&addr, rt) in running {
            let _span_guard = tracing::span!(Level::INFO, "node", name = &*rt.nodename,).entered();
            {
                let mut world = self.world.borrow_mut();
                // Destructure so `rng`, `topology` and `hosts` can be borrowed
                // mutably at the same time.
                let World {
                    rng,
                    topology,
                    hosts,
                    ..
                } = world.deref_mut();
                topology.deliver_messages(rng, hosts.get_mut(&addr).expect("missing host"));

                // Mark this host as current for the duration of its tick.
                world.current = Some(addr);

                world.current_host_mut().timer.now(rt.now());
            }
            // The borrow above is scoped so it is released before
            // `World::enter` runs the host software.

            // NOTE(review): on error, `?` returns before `world.current` is
            // reset to `None` below — confirm that is acceptable.
            let is_software_finished = World::enter(&self.world, || rt.tick(tick))?;

            // Only clients count towards completion.
            if rt.is_client() {
                is_finished = is_finished && is_software_finished;
            }

            // Unset the current host.
            let mut world = self.world.borrow_mut();
            world.current = None;

            world.tick(addr, tick);
        }

        // Stopped nodes still receive the world-level tick, without running
        // any software.
        for (&addr, _rt) in stopped {
            let mut world = self.world.borrow_mut();
            world.tick(addr, tick);
        }

        self.elapsed += tick;
        self.steps += 1;

        if self.elapsed > self.config.duration && !is_finished {
            // NOTE(review): `steps` starts at 1 and was just incremented, so
            // the value printed here looks one higher than the number of
            // steps actually executed — confirm whether that is intended.
            return Err(format!(
                "Ran for duration: {:?} steps: {} without completing",
                self.config.duration, self.steps,
            ))?;
        }

        Ok(is_finished)
    }
446}
447
448#[cfg(test)]
449mod test {
450 use rand::Rng;
451 use std::future;
452 use std::{
453 net::{IpAddr, Ipv4Addr},
454 rc::Rc,
455 sync::{
456 atomic::{AtomicU64, Ordering},
457 Arc, Mutex,
458 },
459 time::Duration,
460 };
461
462 use tokio::{
463 io::{AsyncReadExt, AsyncWriteExt},
464 sync::Semaphore,
465 time::Instant,
466 };
467
468 use crate::net::UdpSocket;
469 use crate::{
470 elapsed, hold,
471 net::{TcpListener, TcpStream},
472 sim_elapsed, Builder, Result, Sim, World,
473 };
474
475 #[test]
476 fn client_error() {
477 let mut sim = Builder::new().build();
478
479 sim.client("doomed", async { Err("An Error")? });
480
481 assert!(sim.run().is_err());
482 }
483
484 #[test]
485 fn timeout() {
486 let mut sim = Builder::new()
487 .simulation_duration(Duration::from_millis(500))
488 .build();
489
490 sim.client("timeout", async {
491 tokio::time::sleep(Duration::from_secs(1)).await;
492
493 Ok(())
494 });
495
496 assert!(sim.run().is_err());
497 }
498
499 #[test]
500 fn multiple_clients_all_finish() -> Result {
501 let how_many = 3;
502 let tick_ms = 10;
503
504 for run in 0..how_many {
506 let mut sim = Builder::new()
507 .tick_duration(Duration::from_millis(tick_ms))
508 .build();
509
510 let ct = Rc::new(Semaphore::new(how_many));
511
512 for client in 0..how_many {
513 let ct = ct.clone();
514
515 sim.client(format!("client-{client}"), async move {
516 let ms = if run == client { 0 } else { 2 * tick_ms };
517 tokio::time::sleep(Duration::from_millis(ms)).await;
518
519 let p = ct.acquire().await?;
520 p.forget();
521
522 Ok(())
523 });
524 }
525
526 sim.run()?;
527 assert_eq!(0, ct.available_permits());
528 }
529
530 Ok(())
531 }
532
533 #[test]
538 fn crash_blocks_until_complete() -> Result {
539 let ct = Arc::new(());
540
541 let mut sim = Builder::new().build();
542
543 sim.host("host", || {
544 let ct = ct.clone();
545
546 async move {
547 tokio::spawn(async move {
548 let _into_task = ct;
549 future::pending::<()>().await;
550 });
551
552 future::pending().await
553 }
554 });
555
556 sim.run()?;
557 assert_eq!(2, Arc::strong_count(&ct));
558
559 sim.crash("host");
560 assert_eq!(1, Arc::strong_count(&ct));
561
562 Ok(())
563 }
564
    /// Checks the per-host `elapsed()` clock against the simulation-wide
    /// `sim_elapsed()` / `Sim::elapsed()` clocks, including for a client that
    /// is registered after the simulation has already run.
    #[test]
    fn elapsed_time() -> Result {
        let tick = Duration::from_millis(5);
        let mut sim = Builder::new().tick_duration(tick).build();

        let duration = Duration::from_millis(500);

        // Both clients start at time zero, so host time and simulation time
        // agree after the sleep.
        sim.client("c1", async move {
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            assert_eq!(duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.client("c2", async move {
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            assert_eq!(duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.run()?;

        // The simulation is expected to have advanced one tick beyond the
        // sleep duration by the time `run` returns.
        assert_eq!(duration + tick, sim.elapsed());

        let start = sim.elapsed();
        sim.client("c3", async move {
            // A newly registered client starts with its own clock at zero,
            // while the simulation clock carries on from the first run.
            assert_eq!(Duration::ZERO, elapsed());
            assert_eq!(duration + tick, sim_elapsed().unwrap());
            tokio::time::sleep(duration).await;
            assert_eq!(duration, elapsed());
            assert_eq!(duration + tick + duration, sim_elapsed().unwrap());

            Ok(())
        });

        sim.run()?;

        // The second run again takes the sleep duration plus one tick.
        assert_eq!(duration + tick, sim.elapsed() - start);

        Ok(())
    }
615
616 #[test]
619 fn sim_elapsed_time() -> Result {
620 assert!(sim_elapsed().is_none());
623
624 let sim = Builder::new().build();
625 World::enter(&sim.world, || assert!(sim_elapsed().is_none()));
627
628 Ok(())
629 }
630
631 #[test]
632 fn hold_release_peers() -> Result {
633 let global = Duration::from_millis(2);
634
635 let mut sim = Builder::new()
636 .min_message_latency(global)
637 .max_message_latency(global)
638 .build();
639
640 sim.host("server", || async {
641 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
642
643 while let Ok((mut s, _)) = listener.accept().await {
644 assert!(s.write_u8(42).await.is_ok());
645 }
646
647 Ok(())
648 });
649
650 sim.client("client", async move {
651 let mut s = TcpStream::connect("server:1234").await?;
652
653 s.read_u8().await?;
654
655 Ok(())
656 });
657
658 sim.hold("server", "client");
659
660 sim.step()?;
662
663 sim.links(|l| {
664 assert!(l.count() == 1);
665 });
666
667 sim.step()?;
669
670 sim.release("server", "client");
671
672 sim.run()?;
673
674 Ok(())
675 }
676
677 #[test]
678 fn partition_peers() -> Result {
679 let global = Duration::from_millis(2);
680
681 let mut sim = Builder::new()
682 .min_message_latency(global)
683 .max_message_latency(global)
684 .build();
685
686 sim.host("server", || async {
687 let _listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
688
689 Ok(())
690 });
691
692 sim.client("client", async move {
693 let _ = TcpStream::connect("server:1234").await.unwrap_err();
695
696 Ok(())
697 });
698
699 sim.partition("server", "client");
700
701 sim.run()?;
702
703 Ok(())
704 }
705
    /// Expected outcome of a partitioning scenario driven by
    /// `run_with_partitioning`.
    struct Expectation {
        /// Whether host A should receive the datagram sent by host B.
        expect_a_receive: bool,
        /// Whether host B should receive the datagram sent by host A.
        expect_b_receive: bool,
    }
710
    /// A single partition or repair operation applied between hosts A and B
    /// in `partition_peers_oneway_many_cases`.
    #[derive(Debug)]
    enum Action {
        /// `sim.partition(A, B)`.
        Partition,
        /// `sim.partition_oneway(A, B)`.
        PartitionOnewayAB,
        /// `sim.partition_oneway(B, A)`.
        PartitionOnewayBA,
        /// `sim.repair_oneway(A, B)`.
        RepairOnewayAB,
        /// `sim.repair_oneway(B, A)`.
        RepairOnewayBA,
        /// `sim.repair(A, B)`.
        Repair,
    }
720
    /// Runs a two-client UDP exchange between `host_a` and `host_b` under the
    /// network conditions set up by `partitioning`.
    ///
    /// Each client binds port 1234, sends one datagram to its peer, then
    /// waits up to one second for a datagram of its own and records whether
    /// one arrived. After the simulation has run, the recorded outcomes are
    /// compared with the `Expectation` returned by `partitioning`.
    fn run_with_partitioning(
        host_a: &'static str,
        host_b: &'static str,
        mut partitioning: impl FnMut(&mut Sim) -> Expectation,
    ) -> Result {
        let global = Duration::from_millis(1);

        let mut sim = Builder::new()
            .min_message_latency(global)
            .max_message_latency(global)
            .build();

        // `None` until the corresponding client has finished its receive
        // attempt; then `Some(received)`.
        let a_did_receive = Arc::new(Mutex::new(None));
        let b_did_receive = Arc::new(Mutex::new(None));

        let make_a = |sim: &mut Sim| {
            sim.client(host_a, {
                let a_did_receive = Arc::clone(&a_did_receive);
                async move {
                    let udp_socket =
                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
                    udp_socket
                        .send_to(&[42], format!("{}:1234", host_b))
                        .await
                        .expect("sending packet should appear to work, even if partitioned");

                    // Only a receive that completes successfully within the
                    // timeout counts as "received".
                    *a_did_receive.lock().unwrap() = Some(matches!(
                        tokio::time::timeout(
                            Duration::from_secs(1),
                            udp_socket.recv_from(&mut [0])
                        )
                        .await,
                        Ok(Ok(_))
                    ));

                    Ok(())
                }
            })
        };

        let make_b = |sim: &mut Sim| {
            sim.client(host_b, {
                let b_did_receive = Arc::clone(&b_did_receive);
                async move {
                    let udp_socket =
                        UdpSocket::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
                    udp_socket
                        .send_to(&[42], format!("{}:1234", host_a))
                        .await
                        .expect("sending packet should work");

                    // Same criterion as for host A.
                    *b_did_receive.lock().unwrap() = Some(matches!(
                        tokio::time::timeout(
                            Duration::from_secs(1),
                            udp_socket.recv_from(&mut [0])
                        )
                        .await,
                        Ok(Ok(_))
                    ));

                    Ok(())
                }
            })
        };

        // Pick the registration order from the simulation's RNG so that both
        // orders are exercised.
        let construct_a_first = sim.world.borrow_mut().rng.gen_bool(0.5);
        if construct_a_first {
            make_a(&mut sim);
            make_b(&mut sim);
        } else {
            make_b(&mut sim);
            make_a(&mut sim);
        }

        // Network conditions are applied after both clients are registered
        // and before the simulation runs.
        let Expectation {
            expect_a_receive,
            expect_b_receive,
        } = partitioning(&mut sim);
        sim.run()?;

        assert_eq!(*a_did_receive.lock().unwrap(), Some(expect_a_receive));
        assert_eq!(*b_did_receive.lock().unwrap(), Some(expect_b_receive));

        Ok(())
    }
806
807 #[test]
808 fn partition_peers_oneway() -> Result {
809 run_with_partitioning("a", "b", |sim: &mut Sim| {
810 sim.partition_oneway("a", "b");
811 Expectation {
812 expect_a_receive: true,
813 expect_b_receive: false,
814 }
815 })
816 }
817
818 #[test]
819 fn partition_peers_oneway_many_cases() -> Result {
820 const HOST_A: &str = "a";
821 const HOST_B: &str = "b";
822
823 fn run_with_actions(actions: &[Action]) -> Result {
826 run_with_partitioning(HOST_A, HOST_B, |sim: &mut Sim| {
827 let mut expect_a_receive = true;
828 let mut expect_b_receive = true;
829 for action in actions {
830 match action {
831 Action::Partition => {
832 sim.partition(HOST_A, HOST_B);
833 expect_a_receive = false;
834 expect_b_receive = false;
835 }
836 Action::PartitionOnewayAB => {
837 sim.partition_oneway(HOST_A, HOST_B);
838 expect_b_receive = false;
839 }
840 Action::PartitionOnewayBA => {
841 sim.partition_oneway(HOST_B, HOST_A);
842 expect_a_receive = false;
843 }
844 Action::RepairOnewayAB => {
845 sim.repair_oneway(HOST_A, HOST_B);
846 expect_b_receive = true;
847 }
848 Action::RepairOnewayBA => {
849 sim.repair_oneway(HOST_B, HOST_A);
850 expect_a_receive = true;
851 }
852 Action::Repair => {
853 sim.repair(HOST_A, HOST_B);
854 expect_a_receive = true;
855 expect_b_receive = true;
856 }
857 }
858 }
859 Expectation {
860 expect_a_receive,
861 expect_b_receive,
862 }
863 })?;
864 Ok(())
865 }
866
867 run_with_actions(&[Action::PartitionOnewayAB])?;
868 run_with_actions(&[Action::PartitionOnewayBA])?;
869 run_with_actions(&[Action::Partition, Action::RepairOnewayAB])?;
870 run_with_actions(&[Action::Partition, Action::RepairOnewayBA])?;
871 run_with_actions(&[Action::PartitionOnewayAB, Action::Repair])?;
872 run_with_actions(&[Action::PartitionOnewayBA, Action::Repair])?;
873 run_with_actions(&[Action::PartitionOnewayBA, Action::RepairOnewayAB])?;
874 run_with_actions(&[Action::PartitionOnewayAB, Action::PartitionOnewayBA])?;
875 run_with_actions(&[
876 Action::Partition,
877 Action::RepairOnewayAB,
878 Action::RepairOnewayBA,
879 ])?;
880
881 Ok(())
882 }
883
884 #[test]
885 fn elapsed_time_across_restarts() -> Result {
886 let tick_ms = 5;
887 let mut sim = Builder::new()
888 .tick_duration(Duration::from_millis(tick_ms))
889 .build();
890
891 let clock = Arc::new(AtomicU64::new(0));
892 let actual = clock.clone();
893
894 sim.host("host", move || {
895 let clock = clock.clone();
896
897 async move {
898 loop {
899 tokio::time::sleep(Duration::from_millis(1)).await;
900 clock.store(elapsed().as_millis() as u64, Ordering::SeqCst);
901 }
902 }
903 });
904
905 sim.step()?;
906 assert_eq!(tick_ms - 1, actual.load(Ordering::SeqCst));
907
908 sim.bounce("host");
909 sim.step()?;
910 assert_eq!((tick_ms * 2) - 1, actual.load(Ordering::SeqCst));
911
912 Ok(())
913 }
914
915 #[test]
916 fn elapsed_time_across_crashes() -> Result {
917 let tick_ms = 5;
918 let mut sim = Builder::new()
919 .tick_duration(Duration::from_millis(tick_ms))
920 .build();
921
922 let clock_1 = Arc::new(AtomicU64::new(0));
923 let clock_1_moved = clock_1.clone();
924
925 sim.host("host1", move || {
926 let clock = clock_1_moved.clone();
927 async move {
928 loop {
929 tokio::time::sleep(Duration::from_millis(1)).await;
930 clock.store(sim_elapsed().unwrap().as_millis() as u64, Ordering::SeqCst);
931 }
932 }
933 });
934
935 sim.crash("host1");
937 sim.step()?;
938 sim.bounce("host1");
940 sim.step()?;
941 assert_eq!(
942 2 * tick_ms - 1,
943 clock_1.load(Ordering::SeqCst),
944 "Host 1 should have caught up"
945 );
946
947 Ok(())
948 }
949
950 #[test]
951 fn host_finishes_with_error() {
952 let mut sim = Builder::new().build();
953
954 sim.host("host", || async {
955 Err("Host software finished unexpectedly")?
956 });
957
958 assert!(sim.step().is_err());
959 }
960
961 #[test]
962 fn manual_message_delivery() -> Result {
963 let mut sim = Builder::new().build();
964
965 sim.host("a", || async {
966 let l = TcpListener::bind("0.0.0.0:1234").await?;
967
968 _ = l.accept().await?;
969
970 Ok(())
971 });
972
973 sim.client("b", async {
974 hold("a", "b");
975
976 _ = TcpStream::connect("a:1234").await?;
977
978 Ok(())
979 });
980
981 assert!(!sim.step()?);
982
983 sim.links(|mut l| {
984 let a_to_b = l.next().unwrap();
985 a_to_b.deliver_all();
986 });
987
988 assert!(sim.step()?);
989
990 Ok(())
991 }
992
993 #[test]
997 fn run_after_host_crashes() -> Result {
998 let mut sim = Builder::new().build();
999
1000 sim.host("h", || async { future::pending().await });
1001
1002 sim.crash("h");
1003
1004 sim.run()
1005 }
1006
1007 #[test]
1008 fn restart_host_after_crash() -> Result {
1009 let mut sim = Builder::new().build();
1010
1011 let data = Arc::new(AtomicU64::new(0));
1012 let data_cloned = data.clone();
1013
1014 sim.host("h", move || {
1015 let data_cloned = data_cloned.clone();
1016 async move {
1017 data_cloned.store(data_cloned.load(Ordering::SeqCst) + 1, Ordering::SeqCst);
1018 Ok(())
1019 }
1020 });
1021
1022 sim.crash("h");
1024 sim.step()?;
1025
1026 sim.bounce("h");
1028 sim.step()?;
1029 assert_eq!(1, data.load(Ordering::SeqCst));
1031
1032 Ok(())
1033 }
1034
1035 #[test]
1036 fn override_link_latency() -> Result {
1037 let global = Duration::from_millis(2);
1038
1039 let mut sim = Builder::new()
1040 .min_message_latency(global)
1041 .max_message_latency(global)
1042 .build();
1043
1044 sim.host("server", || async {
1045 let listener = TcpListener::bind((IpAddr::V4(Ipv4Addr::UNSPECIFIED), 1234)).await?;
1046
1047 while let Ok((mut s, _)) = listener.accept().await {
1048 assert!(s.write_u8(9).await.is_ok());
1049 }
1050
1051 Ok(())
1052 });
1053
1054 sim.client("client", async move {
1055 let mut s = TcpStream::connect("server:1234").await?;
1056
1057 let start = Instant::now();
1058 s.read_u8().await?;
1059 assert_eq!(global, start.elapsed());
1060
1061 Ok(())
1062 });
1063
1064 sim.run()?;
1065
1066 let degraded = Duration::from_millis(10);
1067
1068 sim.client("client2", async move {
1069 let mut s = TcpStream::connect("server:1234").await?;
1070
1071 let start = Instant::now();
1072 s.read_u8().await?;
1073 assert_eq!(degraded, start.elapsed());
1074
1075 Ok(())
1076 });
1077
1078 sim.set_link_latency("client2", "server", degraded);
1079
1080 sim.run()
1081 }
1082
1083 #[test]
1084 fn is_host_running() -> Result {
1085 let mut sim = Builder::new().build();
1086
1087 sim.client("client", async { future::pending().await });
1088 sim.host("host", || async { future::pending().await });
1089
1090 assert!(!sim.step()?);
1091
1092 assert!(sim.is_host_running("client"));
1093 assert!(sim.is_host_running("host"));
1094
1095 sim.crash("host");
1096 assert!(!sim.is_host_running("host"));
1097
1098 Ok(())
1099 }
1100
1101 #[test]
1102 #[cfg(feature = "regex")]
1103 fn host_scan() -> Result {
1104 let mut sim = Builder::new().build();
1105
1106 let how_many = 3;
1107 for i in 0..how_many {
1108 sim.host(format!("host-{i}"), || async { future::pending().await })
1109 }
1110
1111 let mut ips = sim.lookup_many(regex::Regex::new(".*")?);
1112 ips.sort();
1113
1114 assert_eq!(how_many, ips.len());
1115
1116 for (i, ip) in ips.iter().enumerate() {
1117 assert_eq!(
1118 format!("host-{i}"),
1119 sim.reverse_lookup(*ip).ok_or("Unable to resolve ip")?
1120 );
1121 }
1122
1123 Ok(())
1124 }
1125
1126 #[test]
1127 #[cfg(feature = "regex")]
1128 fn bounce_multiple_hosts_with_regex() -> Result {
1129 let mut sim = Builder::new().build();
1130
1131 let count = Arc::new(AtomicU64::new(0));
1132 for i in 1..=3 {
1133 let count = count.clone();
1134 sim.host(format!("host-{}", i), move || {
1135 let count = count.clone();
1136 async move {
1137 count.fetch_add(1, Ordering::SeqCst);
1138 future::pending().await
1139 }
1140 });
1141 }
1142
1143 sim.step()?;
1144 assert_eq!(count.load(Ordering::SeqCst), 3);
1145 sim.bounce(regex::Regex::new("host-[12]")?);
1146 sim.step()?;
1147 assert_eq!(count.load(Ordering::SeqCst), 5);
1148
1149 Ok(())
1150 }
1151
1152 #[test]
1153 #[cfg(feature = "regex")]
1154 fn hold_all() -> Result {
1155 let mut sim = Builder::new().build();
1156
1157 sim.host("host", || async {
1158 let l = TcpListener::bind("0.0.0.0:1234").await?;
1159
1160 loop {
1161 _ = l.accept().await?;
1162 }
1163 });
1164
1165 sim.client("test", async {
1166 hold(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1167
1168 assert!(tokio::time::timeout(
1169 Duration::from_millis(100),
1170 TcpStream::connect("host:1234")
1171 )
1172 .await
1173 .is_err());
1174
1175 crate::release(regex::Regex::new(r".*")?, regex::Regex::new(r".*")?);
1176
1177 assert!(TcpStream::connect("host:1234").await.is_ok());
1178
1179 Ok(())
1180 });
1181
1182 sim.run()
1183 }
1184}