1use crate::envelope::{Envelope, Protocol};
2use crate::host::Host;
3use crate::rt::Rt;
4use crate::{config, TRACING_TARGET};
5
6use indexmap::IndexMap;
7use rand::{Rng, RngCore};
8use rand_distr::{Distribution, Exp};
9use std::collections::VecDeque;
10use std::io::{Error, ErrorKind, Result};
11use std::net::{IpAddr, SocketAddr};
12use std::time::Duration;
13use tokio::time::Instant;
14
15pub(crate) struct Topology {
17 config: config::Link,
18
19 links: IndexMap<Pair, Link>,
21
22 rt: Rt<'static>,
26}
27
/// Unordered pair of endpoint addresses identifying a link.
///
/// [`Pair::new`] always stores the smaller address first, so the pair built
/// from `(a, b)` compares and hashes equal to the one built from `(b, a)`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Pair(IpAddr, IpAddr);

impl Pair {
    /// Builds the normalized pair for the link between `a` and `b`.
    ///
    /// # Panics
    ///
    /// Panics if `a == b`; a link from a host to itself is not modeled.
    fn new(a: IpAddr, b: IpAddr) -> Pair {
        assert_ne!(a, b);

        let (low, high) = if b < a { (b, a) } else { (a, b) };
        Pair(low, high)
    }
}
45
46pub struct LinksIter<'a> {
49 iter: indexmap::map::IterMut<'a, Pair, Link>,
50}
51
52pub struct LinkIter<'a> {
55 a: IpAddr,
56 b: IpAddr,
57 now: Instant,
58 iter: std::collections::vec_deque::IterMut<'a, Sent>,
59}
60
61impl LinkIter<'_> {
62 pub fn pair(&self) -> (IpAddr, IpAddr) {
65 (self.a, self.b)
66 }
67
68 pub fn deliver_all(self) {
71 for sent in self {
72 sent.deliver();
73 }
74 }
75}
76
77pub struct SentRef<'a> {
80 src: SocketAddr,
81 dst: SocketAddr,
82 now: Instant,
83 sent: &'a mut Sent,
84}
85
86impl SentRef<'_> {
87 pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89 (self.src, self.dst)
90 }
91
92 pub fn protocol(&self) -> &Protocol {
94 &self.sent.protocol
95 }
96
97 pub fn deliver(self) {
100 self.sent.deliver(self.now);
101 }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105 type Item = LinkIter<'a>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 let (pair, link) = self.iter.next()?;
109
110 Some(LinkIter {
111 a: pair.0,
112 b: pair.1,
113 now: link.now,
114 iter: link.sent.iter_mut(),
115 })
116 }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120 type Item = SentRef<'a>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let sent = self.iter.next()?;
124
125 Some(SentRef {
126 src: sent.src,
127 dst: sent.dst,
128 now: self.now,
129 sent,
130 })
131 }
132}
133
134struct Link {
136 state_a_b: State,
140 state_b_a: State,
144
145 config: config::Link,
147
148 sent: VecDeque<Sent>,
151
152 deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,
154
155 now: Instant,
157}
158
/// Traffic state of one direction of a [`Link`].
#[derive(Clone, Copy)]
enum State {
    /// Messages are delivered after a sampled latency.
    Healthy,

    /// Messages are dropped; set via `partition` / `partition_oneway`.
    ExplicitPartition,

    /// Messages are dropped because a random failure was triggered; the link
    /// may later repair itself at random.
    RandPartition,

    /// Messages are queued and kept until the link is released.
    Hold,
}
174
175impl Topology {
176 pub(crate) fn new(config: config::Link) -> Topology {
177 Topology {
178 config,
179 links: IndexMap::new(),
180 rt: Rt::no_software(),
181 }
182 }
183
184 pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186 let pair = Pair::new(a, b);
187 assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188 }
189
190 pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191 self.config.latency_mut().max_message_latency = value;
192 }
193
194 pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195 let pair = Pair::new(a, b);
196 let latency = self
197 .links
198 .get_mut(&pair)
199 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
200 .latency(self.config.latency());
201 latency.min_message_latency = value;
202 latency.max_message_latency = value;
203 }
204
205 pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
206 let pair = Pair::new(a, b);
207 self.links
208 .get_mut(&pair)
209 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
210 .latency(self.config.latency())
211 .max_message_latency = value;
212 }
213
214 pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
215 self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
216 }
217
218 pub(crate) fn set_fail_rate(&mut self, value: f64) {
219 self.config.message_loss_mut().fail_rate = value;
220 }
221
222 pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
223 let pair = Pair::new(a, b);
224 self.links
225 .get_mut(&pair)
226 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
227 .message_loss(self.config.message_loss())
228 .fail_rate = value;
229 }
230
231 pub(crate) fn enqueue_message(
235 &mut self,
236 rand: &mut dyn RngCore,
237 src: SocketAddr,
238 dst: SocketAddr,
239 message: Protocol,
240 ) -> Result<()> {
241 if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
242 link.enqueue_message(&self.config, rand, src, dst, message);
243 Ok(())
244 } else {
245 Err(Error::new(
246 ErrorKind::ConnectionRefused,
247 "Connection refused",
248 ))
249 }
250 }
251
252 pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
254 for (pair, link) in &mut self.links {
255 if pair.0 == dst.addr || pair.1 == dst.addr {
256 link.deliver_messages(&self.config, rand, dst);
257 }
258 }
259 }
260
261 pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
262 self.get_link_mut(&Pair::new(a, b)).hold();
263 }
264
265 pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
266 self.get_link_mut(&Pair::new(a, b)).release();
267 }
268
269 pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
270 self.get_link_mut(&Pair::new(a, b)).explicit_partition();
271 }
272
273 pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
274 let link = &mut self.get_link_mut(&Pair::new(a, b));
275 link.partition_oneway(a, b);
276 }
277
278 pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
279 self.get_link_mut(&Pair::new(a, b)).explicit_repair();
280 }
281
282 pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
283 let link = &mut self.get_link_mut(&Pair::new(a, b));
284 link.repair_oneway(a, b);
285 }
286
287 pub(crate) fn tick_by(&mut self, duration: Duration) {
288 let _ = self.rt.tick(duration);
289 for link in self.links.values_mut() {
290 link.tick(self.rt.now());
291 }
292 }
293
294 pub(crate) fn iter_mut(&mut self) -> LinksIter<'_> {
295 LinksIter {
296 iter: self.links.iter_mut(),
297 }
298 }
299
300 #[inline]
301 fn get_link_mut(&mut self, pair: &Pair) -> &mut Link {
302 self.links
303 .get_mut(pair)
304 .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
305 }
306}
307
308struct Sent {
310 src: SocketAddr,
311 dst: SocketAddr,
312 status: DeliveryStatus,
313 protocol: Protocol,
314}
315
316impl Sent {
317 fn deliver(&mut self, now: Instant) {
318 self.status = DeliveryStatus::DeliverAfter(now);
319 }
320}
321
322enum DeliveryStatus {
323 DeliverAfter(Instant),
324 Hold,
325}
326
327impl Link {
328 fn new(now: Instant) -> Link {
329 Link {
330 state_a_b: State::Healthy,
331 state_b_a: State::Healthy,
332 config: config::Link::default(),
333 sent: VecDeque::new(),
334 deliverable: IndexMap::new(),
335 now,
336 }
337 }
338
339 fn enqueue_message(
340 &mut self,
341 global_config: &config::Link,
342 rand: &mut dyn RngCore,
343 src: SocketAddr,
344 dst: SocketAddr,
345 message: Protocol,
346 ) {
347 tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
348
349 self.rand_partition_or_repair(global_config, rand);
350 self.enqueue(global_config, rand, src, dst, message);
351 self.process_deliverables();
352 }
353
354 fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
355 if src < dst {
362 self.state_a_b
363 } else {
364 self.state_b_a
365 }
366 }
367
368 fn enqueue(
374 &mut self,
375 global_config: &config::Link,
376 rand: &mut dyn RngCore,
377 src: SocketAddr,
378 dst: SocketAddr,
379 message: Protocol,
380 ) {
381 let state = self.get_state_for_message(src.ip(), dst.ip());
382 let status = match state {
383 State::Healthy => {
384 let delay = self.delay(global_config.latency(), rand);
385 DeliveryStatus::DeliverAfter(self.now + delay)
386 }
387 State::Hold => {
389 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
390 DeliveryStatus::Hold
391 }
392 _ => {
393 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
394 return;
395 }
396 };
397
398 let sent = Sent {
399 src,
400 dst,
401 status,
402 protocol: message,
403 };
404
405 self.sent.push_back(sent);
406 }
407
408 fn tick(&mut self, now: Instant) {
409 self.now = now;
410 self.process_deliverables();
411 }
412
413 fn process_deliverables(&mut self) {
414 let mut deliverable = 0;
417 for i in 0..self.sent.len() {
418 let index = i - deliverable;
419 let sent = &self.sent[index];
420 if let DeliveryStatus::DeliverAfter(time) = sent.status {
421 if time <= self.now {
422 let sent = self.sent.remove(index).unwrap();
423 let envelope = Envelope {
424 src: sent.src,
425 dst: sent.dst,
426 message: sent.protocol,
427 };
428 self.deliverable
429 .entry(sent.dst.ip())
430 .or_default()
431 .push_back(envelope);
432 deliverable += 1;
433 }
434 }
435 }
436 }
437
438 fn deliver_messages(
442 &mut self,
443 global_config: &config::Link,
444 rand: &mut dyn RngCore,
445 host: &mut Host,
446 ) {
447 let deliverable = self
448 .deliverable
449 .entry(host.addr)
450 .or_default()
451 .drain(..)
452 .collect::<Vec<Envelope>>();
453
454 for message in deliverable {
455 let (src, dst) = (message.src, message.dst);
456 if let Err(message) = host.receive_from_network(message) {
457 self.enqueue_message(global_config, rand, dst, src, message);
458 }
459 }
460 }
461
462 fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
464 let do_rand = self.rand_partition(global_config.message_loss(), rand);
465 match (self.state_a_b, self.state_b_a) {
466 (State::Healthy, _) | (_, State::Healthy) => {
467 if do_rand {
468 self.state_a_b = State::RandPartition;
469 self.state_b_a = State::RandPartition;
470
471 self.sent.clear();
472 }
473 }
474 (State::RandPartition, _) | (_, State::RandPartition) => {
475 if self.rand_repair(global_config.message_loss(), rand) {
476 self.release();
477 }
478 }
479 _ => {}
480 }
481 }
482
483 fn hold(&mut self) {
484 self.state_a_b = State::Hold;
485 self.state_b_a = State::Hold;
486
487 for sent in &mut self.sent {
488 sent.status = DeliveryStatus::Hold;
489 }
490 }
491
492 fn release(&mut self) {
494 self.state_a_b = State::Healthy;
495 self.state_b_a = State::Healthy;
496 for sent in &mut self.sent {
497 if let DeliveryStatus::Hold = sent.status {
498 sent.deliver(self.now);
499 }
500 }
501 }
502
503 fn explicit_partition(&mut self) {
504 self.state_a_b = State::ExplicitPartition;
505 self.state_b_a = State::ExplicitPartition;
506
507 self.sent.clear();
508 }
509
510 fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
511 if from < to {
512 self.state_a_b = State::ExplicitPartition;
513 } else {
514 self.state_b_a = State::ExplicitPartition;
515 }
516
517 self.sent.retain(|sent| sent.src.ip() != from);
518 }
519
520 fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
521 if from < to {
522 self.state_a_b = State::Healthy;
523 } else {
524 self.state_b_a = State::Healthy;
525 }
526 }
527
528 fn explicit_repair(&mut self) {
530 self.state_a_b = State::Healthy;
531 self.state_b_a = State::Healthy;
532 }
533
534 fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
536 let config = self.config.message_loss.as_ref().unwrap_or(global);
537 let fail_rate = config.fail_rate;
538 fail_rate > 0.0 && rand.random_bool(fail_rate)
539 }
540
541 fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
542 let config = self.config.message_loss.as_ref().unwrap_or(global);
543 let repair_rate = config.repair_rate;
544 repair_rate > 0.0 && rand.random_bool(repair_rate)
545 }
546
547 fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
548 let config = self.config.latency.as_ref().unwrap_or(global);
549
550 let mult = config.latency_distribution.sample(rand);
551 let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
552 let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
553
554 std::cmp::min(delay, config.max_message_latency)
555 }
556
557 fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
558 self.config.latency.get_or_insert_with(|| global.clone())
559 }
560
561 fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
562 self.config
563 .message_loss
564 .get_or_insert_with(|| global.clone())
565 }
566}
567
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Link;

    /// Accessing an unregistered link must name the (normalized) pair.
    #[test]
    #[should_panic(expected = "unable to find link between Pair(10.0.0.1, 192.168.0.1)")]
    fn link_access_gives_useful_panic_message() {
        let mut topology = Topology::new(Link::default());
        let first = IpAddr::from([10, 0, 0, 1]);
        let second = IpAddr::from([192, 168, 0, 1]);
        topology.hold(first, second);
    }
}