Skip to main content

turmoil/
top.rs

1use crate::envelope::{Envelope, Protocol};
2use crate::host::Host;
3use crate::rt::Rt;
4use crate::{config, TRACING_TARGET};
5
6use indexmap::IndexMap;
7use rand::{Rng, RngCore};
8use rand_distr::{Distribution, Exp};
9use std::collections::VecDeque;
10use std::io::{Error, ErrorKind, Result};
11use std::net::{IpAddr, SocketAddr};
12use std::time::Duration;
13use tokio::time::Instant;
14
/// Describes the network topology: the default link configuration plus every
/// registered link between pairs of hosts.
pub(crate) struct Topology {
    /// Default link configuration. A link that has no latency or message-loss
    /// override of its own falls back to these values.
    config: config::Link,

    /// Registered links, keyed by the canonical (ordered) host pair. Each link
    /// may carry configuration overrides between those specific hosts.
    links: IndexMap<Pair, Link>,

    /// We don't use a Rt for async. Right now, we just use it to tick time
    /// forward in the same way we do it elsewhere. We'd like to represent
    /// network state with async in the future.
    rt: Rt<'static>,
}
27
/// This type is used as the key in the [`Topology::links`] map. See
/// [`Pair::new`], which orders the addrs so that this type uniquely identifies
/// the link between two hosts on the network: `.0` is always the smaller
/// address and `.1` the larger.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct Pair(IpAddr, IpAddr);
33
34impl Pair {
35    fn new(a: IpAddr, b: IpAddr) -> Pair {
36        assert_ne!(a, b);
37
38        if a < b {
39            Pair(a, b)
40        } else {
41            Pair(b, a)
42        }
43    }
44}
45
/// An iterator over the network topology that yields every registered link in
/// the simulated network, whether healthy, held, or partitioned.
///
/// Each item is a [`LinkIter`] over that link's in-flight messages.
pub struct LinksIter<'a> {
    /// Underlying mutable iterator over the topology's link map.
    iter: indexmap::map::IterMut<'a, Pair, Link>,
}
51
/// An iterator for the link, providing access to sent messages that have not
/// yet been moved to a host's deliverable queue (they are either scheduled for
/// a future time or on hold).
pub struct LinkIter<'a> {
    /// Endpoint of the link with the smaller address.
    a: IpAddr,
    /// Endpoint of the link with the larger address.
    b: IpAddr,
    /// The link's current network time. Messages released through this
    /// iterator are scheduled for this instant.
    now: Instant,
    /// Underlying iterator over the link's queued messages.
    iter: std::collections::vec_deque::IterMut<'a, Sent>,
}
60
61impl LinkIter<'_> {
62    /// The [`IpAddr`] pair for the link. Always ordered to uniquely identify
63    /// the link.
64    pub fn pair(&self) -> (IpAddr, IpAddr) {
65        (self.a, self.b)
66    }
67
68    /// Schedule all messages on the link for delivery the next time the
69    /// simulation steps, consuming the iterator.
70    pub fn deliver_all(self) {
71        for sent in self {
72            sent.deliver();
73        }
74    }
75}
76
/// Provides a reference to a message that is currently in flight on the
/// network from one host to another.
pub struct SentRef<'a> {
    /// Socket address the message was sent from.
    src: SocketAddr,
    /// Socket address the message is addressed to.
    dst: SocketAddr,
    /// The link's current network time. [`SentRef::deliver`] schedules the
    /// message for this instant.
    now: Instant,
    /// The underlying queued message.
    sent: &'a mut Sent,
}
85
86impl SentRef<'_> {
87    /// The (src, dst) [`SocketAddr`] pair for the message.
88    pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89        (self.src, self.dst)
90    }
91
92    /// The message [`Protocol`].
93    pub fn protocol(&self) -> &Protocol {
94        &self.sent.protocol
95    }
96
97    /// Schedule the message for delivery the next time the simulation steps,
98    /// consuming the item.
99    pub fn deliver(self) {
100        self.sent.deliver(self.now);
101    }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105    type Item = LinkIter<'a>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        let (pair, link) = self.iter.next()?;
109
110        Some(LinkIter {
111            a: pair.0,
112            b: pair.1,
113            now: link.now,
114            iter: link.sent.iter_mut(),
115        })
116    }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120    type Item = SentRef<'a>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let sent = self.iter.next()?;
124
125        Some(SentRef {
126            src: sent.src,
127            dst: sent.dst,
128            now: self.now,
129            sent,
130        })
131    }
132}
133
/// A two-way link between two hosts on the network. Each direction has its own
/// [`State`], so a link can be partitioned in one direction only.
struct Link {
    /// The state of the link from node 'a' to node 'b'.
    /// Of the two nodes attached to a link, 'a' is the
    /// one whose ip address sorts smallest.
    state_a_b: State,
    /// The state of the link from node 'b' to node 'a'.
    /// Of the two nodes attached to a link, 'b' is the
    /// one whose ip address sorts greatest.
    state_b_a: State,

    /// Optional, per-link configuration. Latency and message-loss settings
    /// left unset here fall back to the topology-wide defaults.
    config: config::Link,

    /// Sent messages that are either scheduled for delivery in the future
    /// or are on hold, in the order they were sent.
    sent: VecDeque<Sent>,

    /// Messages that are ready to be delivered, grouped by destination host.
    deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,

    /// The current network time, moved forward with [`Link::tick`].
    now: Instant,
}
158
/// The condition of one direction of a link between two nodes.
#[derive(Copy, Clone)]
enum State {
    /// The direction is healthy. Messages are delivered after a sampled
    /// latency.
    Healthy,

    /// The direction was explicitly partitioned, and messages sent over it are
    /// dropped.
    ExplicitPartition,

    /// The direction was randomly partitioned (per the message-loss fail
    /// rate), and messages sent over it are dropped.
    RandPartition,

    /// Messages are being held indefinitely until the link is released.
    Hold,
}
174
175impl Topology {
176    pub(crate) fn new(config: config::Link) -> Topology {
177        Topology {
178            config,
179            links: IndexMap::new(),
180            rt: Rt::no_software(),
181        }
182    }
183
184    /// Register a link between two hosts
185    pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186        let pair = Pair::new(a, b);
187        assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188    }
189
190    pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191        self.config.latency_mut().max_message_latency = value;
192    }
193
194    pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195        let pair = Pair::new(a, b);
196        let latency = self
197            .links
198            .get_mut(&pair)
199            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
200            .latency(self.config.latency());
201        latency.min_message_latency = value;
202        latency.max_message_latency = value;
203    }
204
205    pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
206        let pair = Pair::new(a, b);
207        self.links
208            .get_mut(&pair)
209            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
210            .latency(self.config.latency())
211            .max_message_latency = value;
212    }
213
214    pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
215        self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
216    }
217
218    pub(crate) fn set_fail_rate(&mut self, value: f64) {
219        self.config.message_loss_mut().fail_rate = value;
220    }
221
222    pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
223        let pair = Pair::new(a, b);
224        self.links
225            .get_mut(&pair)
226            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
227            .message_loss(self.config.message_loss())
228            .fail_rate = value;
229    }
230
231    // Send a `message` from `src` to `dst`. This method returns immediately,
232    // and message delivery happens at a later time (or never, if the link is
233    // broken).
234    pub(crate) fn enqueue_message(
235        &mut self,
236        rand: &mut dyn RngCore,
237        src: SocketAddr,
238        dst: SocketAddr,
239        message: Protocol,
240    ) -> Result<()> {
241        if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
242            link.enqueue_message(&self.config, rand, src, dst, message)
243        } else {
244            Err(Error::new(
245                ErrorKind::ConnectionRefused,
246                "Connection refused",
247            ))
248        }
249    }
250
251    // Move messages from any network links to the `dst` host.
252    pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
253        for (pair, link) in &mut self.links {
254            if pair.0 == dst.addr || pair.1 == dst.addr {
255                link.deliver_messages(&self.config, rand, dst);
256            }
257        }
258    }
259
260    pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
261        self.get_link_mut(&Pair::new(a, b)).hold();
262    }
263
264    pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
265        self.get_link_mut(&Pair::new(a, b)).release();
266    }
267
268    pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
269        self.get_link_mut(&Pair::new(a, b)).explicit_partition();
270    }
271
272    pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
273        let link = &mut self.get_link_mut(&Pair::new(a, b));
274        link.partition_oneway(a, b);
275    }
276
277    pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
278        self.get_link_mut(&Pair::new(a, b)).explicit_repair();
279    }
280
281    pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
282        let link = &mut self.get_link_mut(&Pair::new(a, b));
283        link.repair_oneway(a, b);
284    }
285
286    pub(crate) fn tick_by(&mut self, duration: Duration) {
287        let _ = self.rt.tick(duration);
288        for link in self.links.values_mut() {
289            link.tick(self.rt.now());
290        }
291    }
292
293    pub(crate) fn iter_mut(&mut self) -> LinksIter<'_> {
294        LinksIter {
295            iter: self.links.iter_mut(),
296        }
297    }
298
299    #[inline]
300    fn get_link_mut(&mut self, pair: &Pair) -> &mut Link {
301        self.links
302            .get_mut(pair)
303            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
304    }
305}
306
/// Represents a message sent between two hosts on the network that is still
/// queued on a link.
struct Sent {
    /// Socket address the message was sent from.
    src: SocketAddr,
    /// Socket address the message is addressed to.
    dst: SocketAddr,
    /// Whether the message is held, or the instant after which it may be moved
    /// to the destination's deliverable queue.
    status: DeliveryStatus,
    /// The message payload.
    protocol: Protocol,
}
314
315impl Sent {
316    fn deliver(&mut self, now: Instant) {
317        self.status = DeliveryStatus::DeliverAfter(now);
318    }
319}
320
321enum DeliveryStatus {
322    DeliverAfter(Instant),
323    Hold,
324}
325
326impl Link {
327    fn new(now: Instant) -> Link {
328        Link {
329            state_a_b: State::Healthy,
330            state_b_a: State::Healthy,
331            config: config::Link::default(),
332            sent: VecDeque::new(),
333            deliverable: IndexMap::new(),
334            now,
335        }
336    }
337
338    fn enqueue_message(
339        &mut self,
340        global_config: &config::Link,
341        rand: &mut dyn RngCore,
342        src: SocketAddr,
343        dst: SocketAddr,
344        message: Protocol,
345    ) -> Result<()> {
346        tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
347
348        self.rand_partition_or_repair(global_config, rand);
349        let result = self.enqueue(global_config, rand, src, dst, message);
350        self.process_deliverables();
351        result
352    }
353
354    fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
355        // Between each pair of ip-addresses there can exist a link.
356        // A link between two such pairs can be constructed as
357        // Pair::new(x,y) or Pair::new(y,x). These two expressions create
358        // the exact same link. We denote one of the ends of the link as 'a',
359        // and the other 'b'. 'a' is always the one that compares smaller, i.e,
360        // `a < b` holds.
361        if src < dst {
362            self.state_a_b
363        } else {
364            self.state_b_a
365        }
366    }
367
368    // src -> link -> dst
369    //        ^-- you are here!
370    //
371    // Messages may be dropped, sit on the link for a while (due to latency, or
372    // because the link has stalled), or be delivered immediately.
373    // Link will return an Err(()) if the socket buffer is full.
374    fn enqueue(
375        &mut self,
376        global_config: &config::Link,
377        rand: &mut dyn RngCore,
378        src: SocketAddr,
379        dst: SocketAddr,
380        message: Protocol,
381    ) -> Result<()> {
382        let state = self.get_state_for_message(src.ip(), dst.ip());
383        let status = match state {
384            State::Healthy => {
385                let delay = self.delay(global_config.latency(), rand);
386                DeliveryStatus::DeliverAfter(self.now + delay)
387            }
388            // Only A->B is blocked, so B can send, so we can send if src is B
389            State::Hold => {
390                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
391                DeliveryStatus::Hold
392            }
393            _ => {
394                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
395                return Ok(());
396            }
397        };
398
399        let sent = Sent {
400            src,
401            dst,
402            status,
403            protocol: message,
404        };
405
406        self.sent.push_back(sent);
407        Ok(())
408    }
409
410    fn tick(&mut self, now: Instant) {
411        self.now = now;
412        self.process_deliverables();
413    }
414
415    fn process_deliverables(&mut self) {
416        // TODO: `drain_filter` is not yet stable, and so we have a low quality
417        // implementation here that avoids clones.
418        let mut deliverable = 0;
419        for i in 0..self.sent.len() {
420            let index = i - deliverable;
421            let sent = &self.sent[index];
422            if let DeliveryStatus::DeliverAfter(time) = sent.status {
423                if time <= self.now {
424                    let sent = self.sent.remove(index).unwrap();
425                    let envelope = Envelope {
426                        src: sent.src,
427                        dst: sent.dst,
428                        message: sent.protocol,
429                    };
430                    self.deliverable
431                        .entry(sent.dst.ip())
432                        .or_default()
433                        .push_back(envelope);
434                    deliverable += 1;
435                }
436            }
437        }
438    }
439
440    // FIXME: This implementation does not respect message delivery order. If
441    // host A and host B are ordered (by addr), and B sends before A, then this
442    // method will deliver A's message before B's.
443    fn deliver_messages(
444        &mut self,
445        global_config: &config::Link,
446        rand: &mut dyn RngCore,
447        host: &mut Host,
448    ) {
449        let deliverable = self
450            .deliverable
451            .entry(host.addr)
452            .or_default()
453            .drain(..)
454            .collect::<Vec<Envelope>>();
455
456        for message in deliverable {
457            let (src, dst) = (message.src, message.dst);
458            if let Err(message) = host.receive_from_network(message) {
459                let _ = self.enqueue_message(global_config, rand, dst, src, message);
460            }
461        }
462    }
463
464    // Randomly break or repair this link.
465    fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
466        let do_rand = self.rand_partition(global_config.message_loss(), rand);
467        match (self.state_a_b, self.state_b_a) {
468            (State::Healthy, _) | (_, State::Healthy) if do_rand => {
469                self.state_a_b = State::RandPartition;
470                self.state_b_a = State::RandPartition;
471
472                self.sent.clear();
473            }
474            (State::RandPartition, _) | (_, State::RandPartition)
475                if self.rand_repair(global_config.message_loss(), rand) =>
476            {
477                self.release();
478            }
479            _ => {}
480        }
481    }
482
483    fn hold(&mut self) {
484        self.state_a_b = State::Hold;
485        self.state_b_a = State::Hold;
486
487        for sent in &mut self.sent {
488            sent.status = DeliveryStatus::Hold;
489        }
490    }
491
492    // This link becomes healthy, and any held messages are scheduled for delivery.
493    fn release(&mut self) {
494        self.state_a_b = State::Healthy;
495        self.state_b_a = State::Healthy;
496        for sent in &mut self.sent {
497            if let DeliveryStatus::Hold = sent.status {
498                sent.deliver(self.now);
499            }
500        }
501    }
502
503    fn explicit_partition(&mut self) {
504        self.state_a_b = State::ExplicitPartition;
505        self.state_b_a = State::ExplicitPartition;
506
507        self.sent.clear();
508    }
509
510    fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
511        if from < to {
512            self.state_a_b = State::ExplicitPartition;
513        } else {
514            self.state_b_a = State::ExplicitPartition;
515        }
516
517        self.sent.retain(|sent| sent.src.ip() != from);
518    }
519
520    fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
521        if from < to {
522            self.state_a_b = State::Healthy;
523        } else {
524            self.state_b_a = State::Healthy;
525        }
526    }
527
528    // Repair the link, without releasing any held messages.
529    fn explicit_repair(&mut self) {
530        self.state_a_b = State::Healthy;
531        self.state_b_a = State::Healthy;
532    }
533
534    /// Should the link be randomly partitioned
535    fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
536        let config = self.config.message_loss.as_ref().unwrap_or(global);
537        let fail_rate = config.fail_rate;
538        fail_rate > 0.0 && rand.random_bool(fail_rate)
539    }
540
541    fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
542        let config = self.config.message_loss.as_ref().unwrap_or(global);
543        let repair_rate = config.repair_rate;
544        repair_rate > 0.0 && rand.random_bool(repair_rate)
545    }
546
547    fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
548        let config = self.config.latency.as_ref().unwrap_or(global);
549
550        let mult = config.latency_distribution.sample(rand);
551        let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
552        let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
553
554        std::cmp::min(delay, config.max_message_latency)
555    }
556
557    fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
558        self.config.latency.get_or_insert_with(|| global.clone())
559    }
560
561    fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
562        self.config
563            .message_loss
564            .get_or_insert_with(|| global.clone())
565    }
566}
567
#[cfg(test)]
mod tests {
    use super::*;

    use crate::config::Link;

    /// Accessing a link that was never registered should panic with a message
    /// that names the (ordered) pair of hosts.
    #[test]
    #[should_panic(expected = "unable to find link between Pair(10.0.0.1, 192.168.0.1)")]
    fn link_access_gives_useful_panic_message() {
        let first: IpAddr = "10.0.0.1".parse().unwrap();
        let second: IpAddr = "192.168.0.1".parse().unwrap();

        let mut topology = Topology::new(Link::default());
        topology.hold(first, second);
    }
}