turmoil/
top.rs

1use crate::envelope::{Envelope, Protocol};
2use crate::host::Host;
3use crate::rt::Rt;
4use crate::{config, TRACING_TARGET};
5
6use indexmap::IndexMap;
7use rand::{Rng, RngCore};
8use rand_distr::{Distribution, Exp};
9use std::collections::VecDeque;
10use std::io::{Error, ErrorKind, Result};
11use std::net::{IpAddr, SocketAddr};
12use std::time::Duration;
13use tokio::time::Instant;
14
/// Describes the network topology.
///
/// Holds the network-wide default link configuration plus one `Link` per
/// registered pair of hosts.
pub(crate) struct Topology {
    /// Network-wide link configuration. Links whose latency or message-loss
    /// settings are not overridden fall back to these values.
    config: config::Link,

    /// Per-link state (in-flight messages, partition/hold state) and
    /// configuration overrides, keyed by the ordered pair of host addresses.
    links: IndexMap<Pair, Link>,

    /// We don't use a Rt for async. Right now, we just use it to tick time
    /// forward in the same way we do it elsewhere. We'd like to represent
    /// network state with async in the future.
    rt: Rt<'static>,
}
27
/// This type is used as the key in the [`Topology::links`] map. See
/// [`Pair::new`], which orders the addresses so that this type uniquely
/// identifies the link between two hosts on the network: the first field
/// always holds the smaller address.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct Pair(IpAddr, IpAddr);
33
34impl Pair {
35    fn new(a: IpAddr, b: IpAddr) -> Pair {
36        assert_ne!(a, b);
37
38        if a < b {
39            Pair(a, b)
40        } else {
41            Pair(b, a)
42        }
43    }
44}
45
/// An iterator for the network topology, providing access to all active links
/// in the simulated network.
///
/// Yields one [`LinkIter`] per registered link, in registration order.
pub struct LinksIter<'a> {
    /// Mutable iterator over the topology's map of links.
    iter: indexmap::map::IterMut<'a, Pair, Link>,
}
51
/// An iterator for the link, providing access to sent messages that have not
/// yet been delivered (both scheduled and held messages).
pub struct LinkIter<'a> {
    /// The smaller of the link's two endpoint addresses.
    a: IpAddr,
    /// The larger of the link's two endpoint addresses.
    b: IpAddr,
    /// The link's time when this iterator was created; used as the delivery
    /// time when a message is scheduled via [`SentRef::deliver`].
    now: Instant,
    /// Iterator over the link's queue of sent-but-undelivered messages.
    iter: std::collections::vec_deque::IterMut<'a, Sent>,
}
60
61impl LinkIter<'_> {
62    /// The [`IpAddr`] pair for the link. Always ordered to uniquely identify
63    /// the link.
64    pub fn pair(&self) -> (IpAddr, IpAddr) {
65        (self.a, self.b)
66    }
67
68    /// Schedule all messages on the link for delivery the next time the
69    /// simulation steps, consuming the iterator.
70    pub fn deliver_all(self) {
71        for sent in self {
72            sent.deliver();
73        }
74    }
75}
76
/// Provides a reference to a message that is currently inflight on the network
/// from one host to another.
pub struct SentRef<'a> {
    /// Address the message was sent from.
    src: SocketAddr,
    /// Address the message is destined for.
    dst: SocketAddr,
    /// Link time used as the delivery time if the message is scheduled via
    /// [`SentRef::deliver`].
    now: Instant,
    /// The in-flight message, borrowed mutably from the link's queue so its
    /// delivery status can be changed.
    sent: &'a mut Sent,
}
85
86impl SentRef<'_> {
87    /// The (src, dst) [`SocketAddr`] pair for the message.
88    pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89        (self.src, self.dst)
90    }
91
92    /// The message [`Protocol`].
93    pub fn protocol(&self) -> &Protocol {
94        &self.sent.protocol
95    }
96
97    /// Schedule the message for delivery the next time the simulation steps,
98    /// consuming the item.
99    pub fn deliver(self) {
100        self.sent.deliver(self.now);
101    }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105    type Item = LinkIter<'a>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        let (pair, link) = self.iter.next()?;
109
110        Some(LinkIter {
111            a: pair.0,
112            b: pair.1,
113            now: link.now,
114            iter: link.sent.iter_mut(),
115        })
116    }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120    type Item = SentRef<'a>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let sent = self.iter.next()?;
124
125        Some(SentRef {
126            src: sent.src,
127            dst: sent.dst,
128            now: self.now,
129            sent,
130        })
131    }
132}
133
/// A two-way link between two hosts on the network.
///
/// Each direction has its own [`State`], so a link can be partitioned or
/// repaired in one direction only.
struct Link {
    /// The state of the link from node 'a' to node 'b'.
    /// Of the two nodes attached to a link, 'a' is the
    /// one whose ip address sorts smallest.
    state_a_b: State,
    /// The state of the link from node 'b' to node 'a'.
    /// Of the two nodes attached to a link, 'b' is the
    /// one whose ip address sorts greatest.
    state_b_a: State,

    /// Optional, per-link configuration. Latency and message-loss settings
    /// left as `None` fall back to the topology's network-wide configuration.
    config: config::Link,

    /// Sent messages that are either scheduled for delivery in the future
    /// or are on hold.
    sent: VecDeque<Sent>,

    /// Messages that are ready to be delivered, keyed by the IP address of
    /// the destination host.
    deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,

    /// The current network time, moved forward with [`Link::tick`].
    now: Instant,
}
158
/// States that a link between two nodes can be in.
///
/// Each direction of a [`Link`] tracks its own state.
#[derive(Clone, Copy)]
enum State {
    /// The link is healthy. Messages are scheduled for delivery after a
    /// sampled latency.
    Healthy,

    /// The link was explicitly partitioned. Messages are dropped.
    ExplicitPartition,

    /// The link was randomly partitioned. Messages are dropped.
    RandPartition,

    /// Messages are being held indefinitely, until the link is released or
    /// they are delivered individually.
    Hold,
}
174
175impl Topology {
176    pub(crate) fn new(config: config::Link) -> Topology {
177        Topology {
178            config,
179            links: IndexMap::new(),
180            rt: Rt::no_software(),
181        }
182    }
183
184    /// Register a link between two hosts
185    pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186        let pair = Pair::new(a, b);
187        assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188    }
189
190    pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191        self.config.latency_mut().max_message_latency = value;
192    }
193
194    pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195        let latency = self.links[&Pair::new(a, b)].latency(self.config.latency());
196        latency.min_message_latency = value;
197        latency.max_message_latency = value;
198    }
199
200    pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
201        self.links[&Pair::new(a, b)]
202            .latency(self.config.latency())
203            .max_message_latency = value;
204    }
205
206    pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
207        self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
208    }
209
210    pub(crate) fn set_fail_rate(&mut self, value: f64) {
211        self.config.message_loss_mut().fail_rate = value;
212    }
213
214    pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
215        self.links[&Pair::new(a, b)]
216            .message_loss(self.config.message_loss())
217            .fail_rate = value;
218    }
219
220    // Send a `message` from `src` to `dst`. This method returns immediately,
221    // and message delivery happens at a later time (or never, if the link is
222    // broken).
223    pub(crate) fn enqueue_message(
224        &mut self,
225        rand: &mut dyn RngCore,
226        src: SocketAddr,
227        dst: SocketAddr,
228        message: Protocol,
229    ) -> Result<()> {
230        if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
231            link.enqueue_message(&self.config, rand, src, dst, message);
232            Ok(())
233        } else {
234            Err(Error::new(
235                ErrorKind::ConnectionRefused,
236                "Connection refused",
237            ))
238        }
239    }
240
241    // Move messages from any network links to the `dst` host.
242    pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
243        for (pair, link) in &mut self.links {
244            if pair.0 == dst.addr || pair.1 == dst.addr {
245                link.deliver_messages(&self.config, rand, dst);
246            }
247        }
248    }
249
250    pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
251        self.links[&Pair::new(a, b)].hold();
252    }
253
254    pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
255        self.links[&Pair::new(a, b)].release();
256    }
257
258    pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
259        self.links[&Pair::new(a, b)].explicit_partition();
260    }
261
262    pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
263        let link = &mut self.links[&Pair::new(a, b)];
264        link.partition_oneway(a, b);
265    }
266
267    pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
268        self.links[&Pair::new(a, b)].explicit_repair();
269    }
270
271    pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
272        let link = &mut self.links[&Pair::new(a, b)];
273        link.repair_oneway(a, b);
274    }
275
276    pub(crate) fn tick_by(&mut self, duration: Duration) {
277        let _ = self.rt.tick(duration);
278        for link in self.links.values_mut() {
279            link.tick(self.rt.now());
280        }
281    }
282
283    pub(crate) fn iter_mut(&mut self) -> LinksIter {
284        LinksIter {
285            iter: self.links.iter_mut(),
286        }
287    }
288}
289
/// Represents a message sent between two hosts on the network.
struct Sent {
    /// Address of the sending socket.
    src: SocketAddr,
    /// Address of the receiving socket.
    dst: SocketAddr,
    /// When (or whether) the message may be delivered.
    status: DeliveryStatus,
    /// The message payload.
    protocol: Protocol,
}
297
298impl Sent {
299    fn deliver(&mut self, now: Instant) {
300        self.status = DeliveryStatus::DeliverAfter(now);
301    }
302}
303
/// Delivery schedule of an in-flight [`Sent`] message.
enum DeliveryStatus {
    /// The message becomes deliverable once the link's time reaches this
    /// instant.
    DeliverAfter(Instant),
    /// The message stays on the link until it is delivered individually or
    /// the link is released.
    Hold,
}
308
309impl Link {
310    fn new(now: Instant) -> Link {
311        Link {
312            state_a_b: State::Healthy,
313            state_b_a: State::Healthy,
314            config: config::Link::default(),
315            sent: VecDeque::new(),
316            deliverable: IndexMap::new(),
317            now,
318        }
319    }
320
321    fn enqueue_message(
322        &mut self,
323        global_config: &config::Link,
324        rand: &mut dyn RngCore,
325        src: SocketAddr,
326        dst: SocketAddr,
327        message: Protocol,
328    ) {
329        tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
330
331        self.rand_partition_or_repair(global_config, rand);
332        self.enqueue(global_config, rand, src, dst, message);
333        self.process_deliverables();
334    }
335
336    fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
337        // Between each pair of ip-addresses there can exist a link.
338        // A link between two such pairs can be constructed as
339        // Pair::new(x,y) or Pair::new(y,x). These two expressions create
340        // the exact same link. We denote one of the ends of the link as 'a',
341        // and the other 'b'. 'a' is always the one that compares smaller, i.e,
342        // `a < b` holds.
343        if src < dst {
344            self.state_a_b
345        } else {
346            self.state_b_a
347        }
348    }
349
350    // src -> link -> dst
351    //        ^-- you are here!
352    //
353    // Messages may be dropped, sit on the link for a while (due to latency, or
354    // because the link has stalled), or be delivered immediately.
355    fn enqueue(
356        &mut self,
357        global_config: &config::Link,
358        rand: &mut dyn RngCore,
359        src: SocketAddr,
360        dst: SocketAddr,
361        message: Protocol,
362    ) {
363        let state = self.get_state_for_message(src.ip(), dst.ip());
364        let status = match state {
365            State::Healthy => {
366                let delay = self.delay(global_config.latency(), rand);
367                DeliveryStatus::DeliverAfter(self.now + delay)
368            }
369            // Only A->B is blocked, so B can send, so we can send if src is B
370            State::Hold => {
371                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
372                DeliveryStatus::Hold
373            }
374            _ => {
375                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
376                return;
377            }
378        };
379
380        let sent = Sent {
381            src,
382            dst,
383            status,
384            protocol: message,
385        };
386
387        self.sent.push_back(sent);
388    }
389
390    fn tick(&mut self, now: Instant) {
391        self.now = now;
392        self.process_deliverables();
393    }
394
395    fn process_deliverables(&mut self) {
396        // TODO: `drain_filter` is not yet stable, and so we have a low quality
397        // implementation here that avoids clones.
398        let mut deliverable = 0;
399        for i in 0..self.sent.len() {
400            let index = i - deliverable;
401            let sent = &self.sent[index];
402            if let DeliveryStatus::DeliverAfter(time) = sent.status {
403                if time <= self.now {
404                    let sent = self.sent.remove(index).unwrap();
405                    let envelope = Envelope {
406                        src: sent.src,
407                        dst: sent.dst,
408                        message: sent.protocol,
409                    };
410                    self.deliverable
411                        .entry(sent.dst.ip())
412                        .or_default()
413                        .push_back(envelope);
414                    deliverable += 1;
415                }
416            }
417        }
418    }
419
420    // FIXME: This implementation does not respect message delivery order. If
421    // host A and host B are ordered (by addr), and B sends before A, then this
422    // method will deliver A's message before B's.
423    fn deliver_messages(
424        &mut self,
425        global_config: &config::Link,
426        rand: &mut dyn RngCore,
427        host: &mut Host,
428    ) {
429        let deliverable = self
430            .deliverable
431            .entry(host.addr)
432            .or_default()
433            .drain(..)
434            .collect::<Vec<Envelope>>();
435
436        for message in deliverable {
437            let (src, dst) = (message.src, message.dst);
438            if let Err(message) = host.receive_from_network(message) {
439                self.enqueue_message(global_config, rand, dst, src, message);
440            }
441        }
442    }
443
444    // Randomly break or repair this link.
445    fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
446        let do_rand = self.rand_partition(global_config.message_loss(), rand);
447        match (self.state_a_b, self.state_b_a) {
448            (State::Healthy, _) | (_, State::Healthy) => {
449                if do_rand {
450                    self.state_a_b = State::RandPartition;
451                    self.state_b_a = State::RandPartition;
452                }
453            }
454            (State::RandPartition, _) | (_, State::RandPartition) => {
455                if self.rand_repair(global_config.message_loss(), rand) {
456                    self.release();
457                }
458            }
459            _ => {}
460        }
461    }
462
463    fn hold(&mut self) {
464        self.state_a_b = State::Hold;
465        self.state_b_a = State::Hold;
466    }
467
468    // This link becomes healthy, and any held messages are scheduled for delivery.
469    fn release(&mut self) {
470        self.state_a_b = State::Healthy;
471        self.state_b_a = State::Healthy;
472        for sent in &mut self.sent {
473            if let DeliveryStatus::Hold = sent.status {
474                sent.deliver(self.now);
475            }
476        }
477    }
478
479    fn explicit_partition(&mut self) {
480        self.state_a_b = State::ExplicitPartition;
481        self.state_b_a = State::ExplicitPartition;
482    }
483
484    fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
485        if from < to {
486            self.state_a_b = State::ExplicitPartition;
487        } else {
488            self.state_b_a = State::ExplicitPartition;
489        }
490    }
491
492    fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
493        if from < to {
494            self.state_a_b = State::Healthy;
495        } else {
496            self.state_b_a = State::Healthy;
497        }
498    }
499
500    // Repair the link, without releasing any held messages.
501    fn explicit_repair(&mut self) {
502        self.state_a_b = State::Healthy;
503        self.state_b_a = State::Healthy;
504    }
505
506    /// Should the link be randomly partitioned
507    fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
508        let config = self.config.message_loss.as_ref().unwrap_or(global);
509        let fail_rate = config.fail_rate;
510        fail_rate > 0.0 && rand.gen_bool(fail_rate)
511    }
512
513    fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
514        let config = self.config.message_loss.as_ref().unwrap_or(global);
515        let repair_rate = config.repair_rate;
516        repair_rate > 0.0 && rand.gen_bool(repair_rate)
517    }
518
519    fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
520        let config = self.config.latency.as_ref().unwrap_or(global);
521
522        let mult = config.latency_distribution.sample(rand);
523        let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
524        let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
525
526        std::cmp::min(delay, config.max_message_latency)
527    }
528
529    fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
530        self.config.latency.get_or_insert_with(|| global.clone())
531    }
532
533    fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
534        self.config
535            .message_loss
536            .get_or_insert_with(|| global.clone())
537    }
538}