1use crate::envelope::{Envelope, Protocol};
2use crate::host::Host;
3use crate::rt::Rt;
4use crate::{config, TRACING_TARGET};
5
6use indexmap::IndexMap;
7use rand::{Rng, RngCore};
8use rand_distr::{Distribution, Exp};
9use std::collections::VecDeque;
10use std::io::{Error, ErrorKind, Result};
11use std::net::{IpAddr, SocketAddr};
12use std::time::Duration;
13use tokio::time::Instant;
14
15pub(crate) struct Topology {
17 config: config::Link,
18
19 links: IndexMap<Pair, Link>,
21
22 rt: Rt<'static>,
26}
27
28#[derive(Debug, Clone, Hash, Eq, PartialEq)]
32struct Pair(IpAddr, IpAddr);
33
34impl Pair {
35 fn new(a: IpAddr, b: IpAddr) -> Pair {
36 assert_ne!(a, b);
37
38 if a < b {
39 Pair(a, b)
40 } else {
41 Pair(b, a)
42 }
43 }
44}
45
46pub struct LinksIter<'a> {
49 iter: indexmap::map::IterMut<'a, Pair, Link>,
50}
51
52pub struct LinkIter<'a> {
55 a: IpAddr,
56 b: IpAddr,
57 now: Instant,
58 iter: std::collections::vec_deque::IterMut<'a, Sent>,
59}
60
61impl LinkIter<'_> {
62 pub fn pair(&self) -> (IpAddr, IpAddr) {
65 (self.a, self.b)
66 }
67
68 pub fn deliver_all(self) {
71 for sent in self {
72 sent.deliver();
73 }
74 }
75}
76
77pub struct SentRef<'a> {
80 src: SocketAddr,
81 dst: SocketAddr,
82 now: Instant,
83 sent: &'a mut Sent,
84}
85
86impl SentRef<'_> {
87 pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89 (self.src, self.dst)
90 }
91
92 pub fn protocol(&self) -> &Protocol {
94 &self.sent.protocol
95 }
96
97 pub fn deliver(self) {
100 self.sent.deliver(self.now);
101 }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105 type Item = LinkIter<'a>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 let (pair, link) = self.iter.next()?;
109
110 Some(LinkIter {
111 a: pair.0,
112 b: pair.1,
113 now: link.now,
114 iter: link.sent.iter_mut(),
115 })
116 }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120 type Item = SentRef<'a>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let sent = self.iter.next()?;
124
125 Some(SentRef {
126 src: sent.src,
127 dst: sent.dst,
128 now: self.now,
129 sent,
130 })
131 }
132}
133
134struct Link {
136 state_a_b: State,
140 state_b_a: State,
144
145 config: config::Link,
147
148 sent: VecDeque<Sent>,
151
152 deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,
154
155 now: Instant,
157}
158
159#[derive(Clone, Copy)]
161enum State {
162 Healthy,
164
165 ExplicitPartition,
167
168 RandPartition,
170
171 Hold,
173}
174
175impl Topology {
176 pub(crate) fn new(config: config::Link) -> Topology {
177 Topology {
178 config,
179 links: IndexMap::new(),
180 rt: Rt::no_software(),
181 }
182 }
183
184 pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186 let pair = Pair::new(a, b);
187 assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188 }
189
190 pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191 self.config.latency_mut().max_message_latency = value;
192 }
193
194 pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195 let pair = Pair::new(a, b);
196 let latency = self
197 .links
198 .get_mut(&pair)
199 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
200 .latency(self.config.latency());
201 latency.min_message_latency = value;
202 latency.max_message_latency = value;
203 }
204
205 pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
206 let pair = Pair::new(a, b);
207 self.links
208 .get_mut(&pair)
209 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
210 .latency(self.config.latency())
211 .max_message_latency = value;
212 }
213
214 pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
215 self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
216 }
217
218 pub(crate) fn set_fail_rate(&mut self, value: f64) {
219 self.config.message_loss_mut().fail_rate = value;
220 }
221
222 pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
223 let pair = Pair::new(a, b);
224 self.links
225 .get_mut(&pair)
226 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
227 .message_loss(self.config.message_loss())
228 .fail_rate = value;
229 }
230
231 pub(crate) fn enqueue_message(
235 &mut self,
236 rand: &mut dyn RngCore,
237 src: SocketAddr,
238 dst: SocketAddr,
239 message: Protocol,
240 ) -> Result<()> {
241 if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
242 link.enqueue_message(&self.config, rand, src, dst, message)
243 } else {
244 Err(Error::new(
245 ErrorKind::ConnectionRefused,
246 "Connection refused",
247 ))
248 }
249 }
250
251 pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
253 for (pair, link) in &mut self.links {
254 if pair.0 == dst.addr || pair.1 == dst.addr {
255 link.deliver_messages(&self.config, rand, dst);
256 }
257 }
258 }
259
260 pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
261 self.get_link_mut(&Pair::new(a, b)).hold();
262 }
263
264 pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
265 self.get_link_mut(&Pair::new(a, b)).release();
266 }
267
268 pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
269 self.get_link_mut(&Pair::new(a, b)).explicit_partition();
270 }
271
272 pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
273 let link = &mut self.get_link_mut(&Pair::new(a, b));
274 link.partition_oneway(a, b);
275 }
276
277 pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
278 self.get_link_mut(&Pair::new(a, b)).explicit_repair();
279 }
280
281 pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
282 let link = &mut self.get_link_mut(&Pair::new(a, b));
283 link.repair_oneway(a, b);
284 }
285
286 pub(crate) fn tick_by(&mut self, duration: Duration) {
287 let _ = self.rt.tick(duration);
288 for link in self.links.values_mut() {
289 link.tick(self.rt.now());
290 }
291 }
292
293 pub(crate) fn iter_mut(&mut self) -> LinksIter<'_> {
294 LinksIter {
295 iter: self.links.iter_mut(),
296 }
297 }
298
299 #[inline]
300 fn get_link_mut(&mut self, pair: &Pair) -> &mut Link {
301 self.links
302 .get_mut(pair)
303 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
304 }
305}
306
307struct Sent {
309 src: SocketAddr,
310 dst: SocketAddr,
311 status: DeliveryStatus,
312 protocol: Protocol,
313}
314
315impl Sent {
316 fn deliver(&mut self, now: Instant) {
317 self.status = DeliveryStatus::DeliverAfter(now);
318 }
319}
320
321enum DeliveryStatus {
322 DeliverAfter(Instant),
323 Hold,
324}
325
326impl Link {
327 fn new(now: Instant) -> Link {
328 Link {
329 state_a_b: State::Healthy,
330 state_b_a: State::Healthy,
331 config: config::Link::default(),
332 sent: VecDeque::new(),
333 deliverable: IndexMap::new(),
334 now,
335 }
336 }
337
338 fn enqueue_message(
339 &mut self,
340 global_config: &config::Link,
341 rand: &mut dyn RngCore,
342 src: SocketAddr,
343 dst: SocketAddr,
344 message: Protocol,
345 ) -> Result<()> {
346 tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
347
348 self.rand_partition_or_repair(global_config, rand);
349 let result = self.enqueue(global_config, rand, src, dst, message);
350 self.process_deliverables();
351 result
352 }
353
354 fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
355 if src < dst {
362 self.state_a_b
363 } else {
364 self.state_b_a
365 }
366 }
367
368 fn enqueue(
375 &mut self,
376 global_config: &config::Link,
377 rand: &mut dyn RngCore,
378 src: SocketAddr,
379 dst: SocketAddr,
380 message: Protocol,
381 ) -> Result<()> {
382 let state = self.get_state_for_message(src.ip(), dst.ip());
383 let status = match state {
384 State::Healthy => {
385 let delay = self.delay(global_config.latency(), rand);
386 DeliveryStatus::DeliverAfter(self.now + delay)
387 }
388 State::Hold => {
390 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
391 DeliveryStatus::Hold
392 }
393 _ => {
394 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
395 return Ok(());
396 }
397 };
398
399 let sent = Sent {
400 src,
401 dst,
402 status,
403 protocol: message,
404 };
405
406 self.sent.push_back(sent);
407 Ok(())
408 }
409
410 fn tick(&mut self, now: Instant) {
411 self.now = now;
412 self.process_deliverables();
413 }
414
415 fn process_deliverables(&mut self) {
416 let mut deliverable = 0;
419 for i in 0..self.sent.len() {
420 let index = i - deliverable;
421 let sent = &self.sent[index];
422 if let DeliveryStatus::DeliverAfter(time) = sent.status {
423 if time <= self.now {
424 let sent = self.sent.remove(index).unwrap();
425 let envelope = Envelope {
426 src: sent.src,
427 dst: sent.dst,
428 message: sent.protocol,
429 };
430 self.deliverable
431 .entry(sent.dst.ip())
432 .or_default()
433 .push_back(envelope);
434 deliverable += 1;
435 }
436 }
437 }
438 }
439
440 fn deliver_messages(
444 &mut self,
445 global_config: &config::Link,
446 rand: &mut dyn RngCore,
447 host: &mut Host,
448 ) {
449 let deliverable = self
450 .deliverable
451 .entry(host.addr)
452 .or_default()
453 .drain(..)
454 .collect::<Vec<Envelope>>();
455
456 for message in deliverable {
457 let (src, dst) = (message.src, message.dst);
458 if let Err(message) = host.receive_from_network(message) {
459 let _ = self.enqueue_message(global_config, rand, dst, src, message);
460 }
461 }
462 }
463
464 fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
466 let do_rand = self.rand_partition(global_config.message_loss(), rand);
467 match (self.state_a_b, self.state_b_a) {
468 (State::Healthy, _) | (_, State::Healthy) if do_rand => {
469 self.state_a_b = State::RandPartition;
470 self.state_b_a = State::RandPartition;
471
472 self.sent.clear();
473 }
474 (State::RandPartition, _) | (_, State::RandPartition)
475 if self.rand_repair(global_config.message_loss(), rand) =>
476 {
477 self.release();
478 }
479 _ => {}
480 }
481 }
482
483 fn hold(&mut self) {
484 self.state_a_b = State::Hold;
485 self.state_b_a = State::Hold;
486
487 for sent in &mut self.sent {
488 sent.status = DeliveryStatus::Hold;
489 }
490 }
491
492 fn release(&mut self) {
494 self.state_a_b = State::Healthy;
495 self.state_b_a = State::Healthy;
496 for sent in &mut self.sent {
497 if let DeliveryStatus::Hold = sent.status {
498 sent.deliver(self.now);
499 }
500 }
501 }
502
503 fn explicit_partition(&mut self) {
504 self.state_a_b = State::ExplicitPartition;
505 self.state_b_a = State::ExplicitPartition;
506
507 self.sent.clear();
508 }
509
510 fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
511 if from < to {
512 self.state_a_b = State::ExplicitPartition;
513 } else {
514 self.state_b_a = State::ExplicitPartition;
515 }
516
517 self.sent.retain(|sent| sent.src.ip() != from);
518 }
519
520 fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
521 if from < to {
522 self.state_a_b = State::Healthy;
523 } else {
524 self.state_b_a = State::Healthy;
525 }
526 }
527
528 fn explicit_repair(&mut self) {
530 self.state_a_b = State::Healthy;
531 self.state_b_a = State::Healthy;
532 }
533
534 fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
536 let config = self.config.message_loss.as_ref().unwrap_or(global);
537 let fail_rate = config.fail_rate;
538 fail_rate > 0.0 && rand.random_bool(fail_rate)
539 }
540
541 fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
542 let config = self.config.message_loss.as_ref().unwrap_or(global);
543 let repair_rate = config.repair_rate;
544 repair_rate > 0.0 && rand.random_bool(repair_rate)
545 }
546
547 fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
548 let config = self.config.latency.as_ref().unwrap_or(global);
549
550 let mult = config.latency_distribution.sample(rand);
551 let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
552 let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
553
554 std::cmp::min(delay, config.max_message_latency)
555 }
556
557 fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
558 self.config.latency.get_or_insert_with(|| global.clone())
559 }
560
561 fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
562 self.config
563 .message_loss
564 .get_or_insert_with(|| global.clone())
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use crate::config::Link;
571
572 use super::*;
573
574 #[test]
575 #[should_panic(expected = "unable to find link between Pair(10.0.0.1, 192.168.0.1)")]
576 fn link_access_gives_useful_panic_message() {
577 let mut topo = Topology::new(Link::default());
578 topo.hold("10.0.0.1".parse().unwrap(), "192.168.0.1".parse().unwrap());
579 }
580}