1use crate::envelope::{Envelope, Protocol};
2use crate::host::Host;
3use crate::rt::Rt;
4use crate::{config, TRACING_TARGET};
5
6use indexmap::IndexMap;
7use rand::{Rng, RngCore};
8use rand_distr::{Distribution, Exp};
9use std::collections::VecDeque;
10use std::io::{Error, ErrorKind, Result};
11use std::net::{IpAddr, SocketAddr};
12use std::time::Duration;
13use tokio::time::Instant;
14
15pub(crate) struct Topology {
17 config: config::Link,
18
19 links: IndexMap<Pair, Link>,
21
22 rt: Rt<'static>,
26}
27
/// An unordered pair of distinct addresses identifying a link.
///
/// The smaller address is always stored first, so `(a, b)` and `(b, a)`
/// compare and hash identically.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct Pair(IpAddr, IpAddr);

impl Pair {
    /// Builds the normalised pair for `a` and `b`.
    ///
    /// # Panics
    ///
    /// Panics if `a == b`; a link must connect two different addresses.
    fn new(a: IpAddr, b: IpAddr) -> Pair {
        assert_ne!(a, b);

        // The endpoints differ, so min/max yields a strict ordering.
        Pair(a.min(b), a.max(b))
    }
}
45
46pub struct LinksIter<'a> {
49 iter: indexmap::map::IterMut<'a, Pair, Link>,
50}
51
52pub struct LinkIter<'a> {
55 a: IpAddr,
56 b: IpAddr,
57 now: Instant,
58 iter: std::collections::vec_deque::IterMut<'a, Sent>,
59}
60
61impl LinkIter<'_> {
62 pub fn pair(&self) -> (IpAddr, IpAddr) {
65 (self.a, self.b)
66 }
67
68 pub fn deliver_all(self) {
71 for sent in self {
72 sent.deliver();
73 }
74 }
75}
76
77pub struct SentRef<'a> {
80 src: SocketAddr,
81 dst: SocketAddr,
82 now: Instant,
83 sent: &'a mut Sent,
84}
85
86impl SentRef<'_> {
87 pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89 (self.src, self.dst)
90 }
91
92 pub fn protocol(&self) -> &Protocol {
94 &self.sent.protocol
95 }
96
97 pub fn deliver(self) {
100 self.sent.deliver(self.now);
101 }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105 type Item = LinkIter<'a>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 let (pair, link) = self.iter.next()?;
109
110 Some(LinkIter {
111 a: pair.0,
112 b: pair.1,
113 now: link.now,
114 iter: link.sent.iter_mut(),
115 })
116 }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120 type Item = SentRef<'a>;
121
122 fn next(&mut self) -> Option<Self::Item> {
123 let sent = self.iter.next()?;
124
125 Some(SentRef {
126 src: sent.src,
127 dst: sent.dst,
128 now: self.now,
129 sent,
130 })
131 }
132}
133
134struct Link {
136 state_a_b: State,
140 state_b_a: State,
144
145 config: config::Link,
147
148 sent: VecDeque<Sent>,
151
152 deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,
154
155 now: Instant,
157}
158
/// Condition of one direction of a link, deciding what happens to a message
/// at the moment it is enqueued.
///
/// `Debug`, `PartialEq` and `Eq` are derived so states can be logged and
/// compared directly in assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Messages are scheduled for delivery after a latency delay.
    Healthy,

    /// The direction was partitioned on request; messages are dropped.
    ExplicitPartition,

    /// The link was partitioned at random based on the configured fail rate;
    /// messages are dropped until it is randomly repaired.
    RandPartition,

    /// Messages are queued but not delivered until the link is released.
    Hold,
}
174
175impl Topology {
176 pub(crate) fn new(config: config::Link) -> Topology {
177 Topology {
178 config,
179 links: IndexMap::new(),
180 rt: Rt::no_software(),
181 }
182 }
183
184 pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186 let pair = Pair::new(a, b);
187 assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188 }
189
190 pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191 self.config.latency_mut().max_message_latency = value;
192 }
193
194 pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195 let latency = self.links[&Pair::new(a, b)].latency(self.config.latency());
196 latency.min_message_latency = value;
197 latency.max_message_latency = value;
198 }
199
200 pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
201 self.links[&Pair::new(a, b)]
202 .latency(self.config.latency())
203 .max_message_latency = value;
204 }
205
206 pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
207 self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
208 }
209
210 pub(crate) fn set_fail_rate(&mut self, value: f64) {
211 self.config.message_loss_mut().fail_rate = value;
212 }
213
214 pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
215 self.links[&Pair::new(a, b)]
216 .message_loss(self.config.message_loss())
217 .fail_rate = value;
218 }
219
220 pub(crate) fn enqueue_message(
224 &mut self,
225 rand: &mut dyn RngCore,
226 src: SocketAddr,
227 dst: SocketAddr,
228 message: Protocol,
229 ) -> Result<()> {
230 if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
231 link.enqueue_message(&self.config, rand, src, dst, message);
232 Ok(())
233 } else {
234 Err(Error::new(
235 ErrorKind::ConnectionRefused,
236 "Connection refused",
237 ))
238 }
239 }
240
241 pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
243 for (pair, link) in &mut self.links {
244 if pair.0 == dst.addr || pair.1 == dst.addr {
245 link.deliver_messages(&self.config, rand, dst);
246 }
247 }
248 }
249
250 pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
251 self.links[&Pair::new(a, b)].hold();
252 }
253
254 pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
255 self.links[&Pair::new(a, b)].release();
256 }
257
258 pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
259 self.links[&Pair::new(a, b)].explicit_partition();
260 }
261
262 pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
263 let link = &mut self.links[&Pair::new(a, b)];
264 link.partition_oneway(a, b);
265 }
266
267 pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
268 self.links[&Pair::new(a, b)].explicit_repair();
269 }
270
271 pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
272 let link = &mut self.links[&Pair::new(a, b)];
273 link.repair_oneway(a, b);
274 }
275
276 pub(crate) fn tick_by(&mut self, duration: Duration) {
277 let _ = self.rt.tick(duration);
278 for link in self.links.values_mut() {
279 link.tick(self.rt.now());
280 }
281 }
282
283 pub(crate) fn iter_mut(&mut self) -> LinksIter {
284 LinksIter {
285 iter: self.links.iter_mut(),
286 }
287 }
288}
289
290struct Sent {
292 src: SocketAddr,
293 dst: SocketAddr,
294 status: DeliveryStatus,
295 protocol: Protocol,
296}
297
298impl Sent {
299 fn deliver(&mut self, now: Instant) {
300 self.status = DeliveryStatus::DeliverAfter(now);
301 }
302}
303
304enum DeliveryStatus {
305 DeliverAfter(Instant),
306 Hold,
307}
308
309impl Link {
310 fn new(now: Instant) -> Link {
311 Link {
312 state_a_b: State::Healthy,
313 state_b_a: State::Healthy,
314 config: config::Link::default(),
315 sent: VecDeque::new(),
316 deliverable: IndexMap::new(),
317 now,
318 }
319 }
320
321 fn enqueue_message(
322 &mut self,
323 global_config: &config::Link,
324 rand: &mut dyn RngCore,
325 src: SocketAddr,
326 dst: SocketAddr,
327 message: Protocol,
328 ) {
329 tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
330
331 self.rand_partition_or_repair(global_config, rand);
332 self.enqueue(global_config, rand, src, dst, message);
333 self.process_deliverables();
334 }
335
336 fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
337 if src < dst {
344 self.state_a_b
345 } else {
346 self.state_b_a
347 }
348 }
349
350 fn enqueue(
356 &mut self,
357 global_config: &config::Link,
358 rand: &mut dyn RngCore,
359 src: SocketAddr,
360 dst: SocketAddr,
361 message: Protocol,
362 ) {
363 let state = self.get_state_for_message(src.ip(), dst.ip());
364 let status = match state {
365 State::Healthy => {
366 let delay = self.delay(global_config.latency(), rand);
367 DeliveryStatus::DeliverAfter(self.now + delay)
368 }
369 State::Hold => {
371 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
372 DeliveryStatus::Hold
373 }
374 _ => {
375 tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
376 return;
377 }
378 };
379
380 let sent = Sent {
381 src,
382 dst,
383 status,
384 protocol: message,
385 };
386
387 self.sent.push_back(sent);
388 }
389
390 fn tick(&mut self, now: Instant) {
391 self.now = now;
392 self.process_deliverables();
393 }
394
395 fn process_deliverables(&mut self) {
396 let mut deliverable = 0;
399 for i in 0..self.sent.len() {
400 let index = i - deliverable;
401 let sent = &self.sent[index];
402 if let DeliveryStatus::DeliverAfter(time) = sent.status {
403 if time <= self.now {
404 let sent = self.sent.remove(index).unwrap();
405 let envelope = Envelope {
406 src: sent.src,
407 dst: sent.dst,
408 message: sent.protocol,
409 };
410 self.deliverable
411 .entry(sent.dst.ip())
412 .or_default()
413 .push_back(envelope);
414 deliverable += 1;
415 }
416 }
417 }
418 }
419
420 fn deliver_messages(
424 &mut self,
425 global_config: &config::Link,
426 rand: &mut dyn RngCore,
427 host: &mut Host,
428 ) {
429 let deliverable = self
430 .deliverable
431 .entry(host.addr)
432 .or_default()
433 .drain(..)
434 .collect::<Vec<Envelope>>();
435
436 for message in deliverable {
437 let (src, dst) = (message.src, message.dst);
438 if let Err(message) = host.receive_from_network(message) {
439 self.enqueue_message(global_config, rand, dst, src, message);
440 }
441 }
442 }
443
444 fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
446 let do_rand = self.rand_partition(global_config.message_loss(), rand);
447 match (self.state_a_b, self.state_b_a) {
448 (State::Healthy, _) | (_, State::Healthy) => {
449 if do_rand {
450 self.state_a_b = State::RandPartition;
451 self.state_b_a = State::RandPartition;
452 }
453 }
454 (State::RandPartition, _) | (_, State::RandPartition) => {
455 if self.rand_repair(global_config.message_loss(), rand) {
456 self.release();
457 }
458 }
459 _ => {}
460 }
461 }
462
463 fn hold(&mut self) {
464 self.state_a_b = State::Hold;
465 self.state_b_a = State::Hold;
466 }
467
468 fn release(&mut self) {
470 self.state_a_b = State::Healthy;
471 self.state_b_a = State::Healthy;
472 for sent in &mut self.sent {
473 if let DeliveryStatus::Hold = sent.status {
474 sent.deliver(self.now);
475 }
476 }
477 }
478
479 fn explicit_partition(&mut self) {
480 self.state_a_b = State::ExplicitPartition;
481 self.state_b_a = State::ExplicitPartition;
482 }
483
484 fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
485 if from < to {
486 self.state_a_b = State::ExplicitPartition;
487 } else {
488 self.state_b_a = State::ExplicitPartition;
489 }
490 }
491
492 fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
493 if from < to {
494 self.state_a_b = State::Healthy;
495 } else {
496 self.state_b_a = State::Healthy;
497 }
498 }
499
500 fn explicit_repair(&mut self) {
502 self.state_a_b = State::Healthy;
503 self.state_b_a = State::Healthy;
504 }
505
506 fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
508 let config = self.config.message_loss.as_ref().unwrap_or(global);
509 let fail_rate = config.fail_rate;
510 fail_rate > 0.0 && rand.gen_bool(fail_rate)
511 }
512
513 fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
514 let config = self.config.message_loss.as_ref().unwrap_or(global);
515 let repair_rate = config.repair_rate;
516 repair_rate > 0.0 && rand.gen_bool(repair_rate)
517 }
518
519 fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
520 let config = self.config.latency.as_ref().unwrap_or(global);
521
522 let mult = config.latency_distribution.sample(rand);
523 let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
524 let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
525
526 std::cmp::min(delay, config.max_message_latency)
527 }
528
529 fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
530 self.config.latency.get_or_insert_with(|| global.clone())
531 }
532
533 fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
534 self.config
535 .message_loss
536 .get_or_insert_with(|| global.clone())
537 }
538}