// turmoil/top.rs

use std::collections::VecDeque;
use std::io::{Error, ErrorKind, Result};
use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

use indexmap::IndexMap;
use rand::{Rng, RngCore};
use rand_distr::{Distribution, Exp};
use tokio::time::Instant;

use crate::envelope::{Envelope, Protocol};
use crate::host::Host;
use crate::rt::Rt;
use crate::{config, TRACING_TARGET};

15/// Describes the network topology.
16pub(crate) struct Topology {
17    config: config::Link,
18
19    /// Specific configuration overrides between specific hosts.
20    links: IndexMap<Pair, Link>,
21
22    /// We don't use a Rt for async. Right now, we just use it to tick time
23    /// forward in the same way we do it elsewhere. We'd like to represent
24    /// network state with async in the future.
25    rt: Rt<'static>,
26}
27
/// Key type for the `Topology::links` map.
///
/// [`Pair::new`] stores the two addresses in ascending order, so the same two
/// hosts always produce the same key no matter which order they are passed
/// in. A `Pair` therefore uniquely identifies the link between two hosts.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
struct Pair(IpAddr, IpAddr);

impl Pair {
    /// Builds the key for the link between `a` and `b`, with the smaller
    /// address stored first.
    ///
    /// # Panics
    ///
    /// Panics if `a == b`; a link always connects two distinct addresses.
    fn new(a: IpAddr, b: IpAddr) -> Pair {
        assert_ne!(a, b, "a link requires two distinct host addresses");

        if a < b {
            Pair(a, b)
        } else {
            Pair(b, a)
        }
    }
}

46/// An iterator for the network topology, providing access to all active links
47/// in the simulated network.
48pub struct LinksIter<'a> {
49    iter: indexmap::map::IterMut<'a, Pair, Link>,
50}
51
52/// An iterator for the link, providing access to sent messages that have not
53/// yet been delivered.
54pub struct LinkIter<'a> {
55    a: IpAddr,
56    b: IpAddr,
57    now: Instant,
58    iter: std::collections::vec_deque::IterMut<'a, Sent>,
59}
60
61impl LinkIter<'_> {
62    /// The [`IpAddr`] pair for the link. Always ordered to uniquely identify
63    /// the link.
64    pub fn pair(&self) -> (IpAddr, IpAddr) {
65        (self.a, self.b)
66    }
67
68    /// Schedule all messages on the link for delivery the next time the
69    /// simulation steps, consuming the iterator.
70    pub fn deliver_all(self) {
71        for sent in self {
72            sent.deliver();
73        }
74    }
75}
76
77/// Provides a reference to a message that is currently inflight on the network
78/// from one host to another.
79pub struct SentRef<'a> {
80    src: SocketAddr,
81    dst: SocketAddr,
82    now: Instant,
83    sent: &'a mut Sent,
84}
85
86impl SentRef<'_> {
87    /// The (src, dst) [`SocketAddr`] pair for the message.
88    pub fn pair(&self) -> (SocketAddr, SocketAddr) {
89        (self.src, self.dst)
90    }
91
92    /// The message [`Protocol`].
93    pub fn protocol(&self) -> &Protocol {
94        &self.sent.protocol
95    }
96
97    /// Schedule the message for delivery the next time the simulation steps,
98    /// consuming the item.
99    pub fn deliver(self) {
100        self.sent.deliver(self.now);
101    }
102}
103
104impl<'a> Iterator for LinksIter<'a> {
105    type Item = LinkIter<'a>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        let (pair, link) = self.iter.next()?;
109
110        Some(LinkIter {
111            a: pair.0,
112            b: pair.1,
113            now: link.now,
114            iter: link.sent.iter_mut(),
115        })
116    }
117}
118
119impl<'a> Iterator for LinkIter<'a> {
120    type Item = SentRef<'a>;
121
122    fn next(&mut self) -> Option<Self::Item> {
123        let sent = self.iter.next()?;
124
125        Some(SentRef {
126            src: sent.src,
127            dst: sent.dst,
128            now: self.now,
129            sent,
130        })
131    }
132}
133
134/// A two-way link between two hosts on the network.
135struct Link {
136    /// The state of the link from node 'a' to node 'b'.
137    /// Of the two nodes attached to a link, 'a' is the
138    /// one whose ip address sorts smallest.
139    state_a_b: State,
140    /// The state of the link from node 'b' to node 'a'.
141    /// Of the two nodes attached to a link, 'b' is the
142    /// one whose ip address sorts greatest.
143    state_b_a: State,
144
145    /// Optional, per-link configuration.
146    config: config::Link,
147
148    /// Sent messages that are either scheduled for delivery in the future
149    /// or are on hold.
150    sent: VecDeque<Sent>,
151
152    /// Messages that are ready to be delivered.
153    deliverable: IndexMap<IpAddr, VecDeque<Envelope>>,
154
155    /// The current network time, moved forward with [`Link::tick`].
156    now: Instant,
157}
158
/// Condition of one direction of a [`Link`].
#[derive(Clone, Copy)]
enum State {
    /// Messages flow normally, subject to latency.
    Healthy,

    /// Partitioned on request; messages sent this way are dropped.
    ExplicitPartition,

    /// Partitioned by the random message-loss model; messages sent this way
    /// are dropped.
    RandPartition,

    /// Messages are kept on the link until it is released.
    Hold,
}

175impl Topology {
176    pub(crate) fn new(config: config::Link) -> Topology {
177        Topology {
178            config,
179            links: IndexMap::new(),
180            rt: Rt::no_software(),
181        }
182    }
183
184    /// Register a link between two hosts
185    pub(crate) fn register(&mut self, a: IpAddr, b: IpAddr) {
186        let pair = Pair::new(a, b);
187        assert!(self.links.insert(pair, Link::new(self.rt.now())).is_none());
188    }
189
190    pub(crate) fn set_max_message_latency(&mut self, value: Duration) {
191        self.config.latency_mut().max_message_latency = value;
192    }
193
194    pub(crate) fn set_link_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
195        let pair = Pair::new(a, b);
196        let latency = self
197            .links
198            .get_mut(&pair)
199            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
200            .latency(self.config.latency());
201        latency.min_message_latency = value;
202        latency.max_message_latency = value;
203    }
204
205    pub(crate) fn set_link_max_message_latency(&mut self, a: IpAddr, b: IpAddr, value: Duration) {
206        let pair = Pair::new(a, b);
207        self.links
208            .get_mut(&pair)
209            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
210            .latency(self.config.latency())
211            .max_message_latency = value;
212    }
213
214    pub(crate) fn set_message_latency_curve(&mut self, value: f64) {
215        self.config.latency_mut().latency_distribution = Exp::new(value).unwrap();
216    }
217
218    pub(crate) fn set_fail_rate(&mut self, value: f64) {
219        self.config.message_loss_mut().fail_rate = value;
220    }
221
222    pub(crate) fn set_link_fail_rate(&mut self, a: IpAddr, b: IpAddr, value: f64) {
223        let pair = Pair::new(a, b);
224        self.links
225            .get_mut(&pair)
226            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
227            .message_loss(self.config.message_loss())
228            .fail_rate = value;
229    }
230
231    // Send a `message` from `src` to `dst`. This method returns immediately,
232    // and message delivery happens at a later time (or never, if the link is
233    // broken).
234    pub(crate) fn enqueue_message(
235        &mut self,
236        rand: &mut dyn RngCore,
237        src: SocketAddr,
238        dst: SocketAddr,
239        message: Protocol,
240    ) -> Result<()> {
241        if let Some(link) = self.links.get_mut(&Pair::new(src.ip(), dst.ip())) {
242            link.enqueue_message(&self.config, rand, src, dst, message);
243            Ok(())
244        } else {
245            Err(Error::new(
246                ErrorKind::ConnectionRefused,
247                "Connection refused",
248            ))
249        }
250    }
251
252    // Move messages from any network links to the `dst` host.
253    pub(crate) fn deliver_messages(&mut self, rand: &mut dyn RngCore, dst: &mut Host) {
254        for (pair, link) in &mut self.links {
255            if pair.0 == dst.addr || pair.1 == dst.addr {
256                link.deliver_messages(&self.config, rand, dst);
257            }
258        }
259    }
260
261    pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
262        self.get_link_mut(&Pair::new(a, b)).hold();
263    }
264
265    pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
266        self.get_link_mut(&Pair::new(a, b)).release();
267    }
268
269    pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
270        self.get_link_mut(&Pair::new(a, b)).explicit_partition();
271    }
272
273    pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
274        let link = &mut self.get_link_mut(&Pair::new(a, b));
275        link.partition_oneway(a, b);
276    }
277
278    pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
279        self.get_link_mut(&Pair::new(a, b)).explicit_repair();
280    }
281
282    pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
283        let link = &mut self.get_link_mut(&Pair::new(a, b));
284        link.repair_oneway(a, b);
285    }
286
287    pub(crate) fn tick_by(&mut self, duration: Duration) {
288        let _ = self.rt.tick(duration);
289        for link in self.links.values_mut() {
290            link.tick(self.rt.now());
291        }
292    }
293
294    pub(crate) fn iter_mut(&mut self) -> LinksIter<'_> {
295        LinksIter {
296            iter: self.links.iter_mut(),
297        }
298    }
299
300    #[inline]
301    fn get_link_mut(&mut self, pair: &Pair) -> &mut Link {
302        self.links
303            .get_mut(pair)
304            .unwrap_or_else(|| panic!("unable to find link between {pair:?}"))
305    }
306}
307
308/// Represents a message sent between two hosts on the network.
309struct Sent {
310    src: SocketAddr,
311    dst: SocketAddr,
312    status: DeliveryStatus,
313    protocol: Protocol,
314}
315
316impl Sent {
317    fn deliver(&mut self, now: Instant) {
318        self.status = DeliveryStatus::DeliverAfter(now);
319    }
320}
321
322enum DeliveryStatus {
323    DeliverAfter(Instant),
324    Hold,
325}
326
327impl Link {
328    fn new(now: Instant) -> Link {
329        Link {
330            state_a_b: State::Healthy,
331            state_b_a: State::Healthy,
332            config: config::Link::default(),
333            sent: VecDeque::new(),
334            deliverable: IndexMap::new(),
335            now,
336        }
337    }
338
339    fn enqueue_message(
340        &mut self,
341        global_config: &config::Link,
342        rand: &mut dyn RngCore,
343        src: SocketAddr,
344        dst: SocketAddr,
345        message: Protocol,
346    ) {
347        tracing::trace!(target: TRACING_TARGET, ?src, ?dst, protocol = %message, "Send");
348
349        self.rand_partition_or_repair(global_config, rand);
350        self.enqueue(global_config, rand, src, dst, message);
351        self.process_deliverables();
352    }
353
354    fn get_state_for_message(&self, src: IpAddr, dst: IpAddr) -> State {
355        // Between each pair of ip-addresses there can exist a link.
356        // A link between two such pairs can be constructed as
357        // Pair::new(x,y) or Pair::new(y,x). These two expressions create
358        // the exact same link. We denote one of the ends of the link as 'a',
359        // and the other 'b'. 'a' is always the one that compares smaller, i.e,
360        // `a < b` holds.
361        if src < dst {
362            self.state_a_b
363        } else {
364            self.state_b_a
365        }
366    }
367
368    // src -> link -> dst
369    //        ^-- you are here!
370    //
371    // Messages may be dropped, sit on the link for a while (due to latency, or
372    // because the link has stalled), or be delivered immediately.
373    fn enqueue(
374        &mut self,
375        global_config: &config::Link,
376        rand: &mut dyn RngCore,
377        src: SocketAddr,
378        dst: SocketAddr,
379        message: Protocol,
380    ) {
381        let state = self.get_state_for_message(src.ip(), dst.ip());
382        let status = match state {
383            State::Healthy => {
384                let delay = self.delay(global_config.latency(), rand);
385                DeliveryStatus::DeliverAfter(self.now + delay)
386            }
387            // Only A->B is blocked, so B can send, so we can send if src is B
388            State::Hold => {
389                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Hold");
390                DeliveryStatus::Hold
391            }
392            _ => {
393                tracing::trace!(target: TRACING_TARGET,?src, ?dst, protocol = %message, "Drop");
394                return;
395            }
396        };
397
398        let sent = Sent {
399            src,
400            dst,
401            status,
402            protocol: message,
403        };
404
405        self.sent.push_back(sent);
406    }
407
408    fn tick(&mut self, now: Instant) {
409        self.now = now;
410        self.process_deliverables();
411    }
412
413    fn process_deliverables(&mut self) {
414        // TODO: `drain_filter` is not yet stable, and so we have a low quality
415        // implementation here that avoids clones.
416        let mut deliverable = 0;
417        for i in 0..self.sent.len() {
418            let index = i - deliverable;
419            let sent = &self.sent[index];
420            if let DeliveryStatus::DeliverAfter(time) = sent.status {
421                if time <= self.now {
422                    let sent = self.sent.remove(index).unwrap();
423                    let envelope = Envelope {
424                        src: sent.src,
425                        dst: sent.dst,
426                        message: sent.protocol,
427                    };
428                    self.deliverable
429                        .entry(sent.dst.ip())
430                        .or_default()
431                        .push_back(envelope);
432                    deliverable += 1;
433                }
434            }
435        }
436    }
437
438    // FIXME: This implementation does not respect message delivery order. If
439    // host A and host B are ordered (by addr), and B sends before A, then this
440    // method will deliver A's message before B's.
441    fn deliver_messages(
442        &mut self,
443        global_config: &config::Link,
444        rand: &mut dyn RngCore,
445        host: &mut Host,
446    ) {
447        let deliverable = self
448            .deliverable
449            .entry(host.addr)
450            .or_default()
451            .drain(..)
452            .collect::<Vec<Envelope>>();
453
454        for message in deliverable {
455            let (src, dst) = (message.src, message.dst);
456            if let Err(message) = host.receive_from_network(message) {
457                self.enqueue_message(global_config, rand, dst, src, message);
458            }
459        }
460    }
461
462    // Randomly break or repair this link.
463    fn rand_partition_or_repair(&mut self, global_config: &config::Link, rand: &mut dyn RngCore) {
464        let do_rand = self.rand_partition(global_config.message_loss(), rand);
465        match (self.state_a_b, self.state_b_a) {
466            (State::Healthy, _) | (_, State::Healthy) => {
467                if do_rand {
468                    self.state_a_b = State::RandPartition;
469                    self.state_b_a = State::RandPartition;
470
471                    self.sent.clear();
472                }
473            }
474            (State::RandPartition, _) | (_, State::RandPartition) => {
475                if self.rand_repair(global_config.message_loss(), rand) {
476                    self.release();
477                }
478            }
479            _ => {}
480        }
481    }
482
483    fn hold(&mut self) {
484        self.state_a_b = State::Hold;
485        self.state_b_a = State::Hold;
486
487        for sent in &mut self.sent {
488            sent.status = DeliveryStatus::Hold;
489        }
490    }
491
492    // This link becomes healthy, and any held messages are scheduled for delivery.
493    fn release(&mut self) {
494        self.state_a_b = State::Healthy;
495        self.state_b_a = State::Healthy;
496        for sent in &mut self.sent {
497            if let DeliveryStatus::Hold = sent.status {
498                sent.deliver(self.now);
499            }
500        }
501    }
502
503    fn explicit_partition(&mut self) {
504        self.state_a_b = State::ExplicitPartition;
505        self.state_b_a = State::ExplicitPartition;
506
507        self.sent.clear();
508    }
509
510    fn partition_oneway(&mut self, from: IpAddr, to: IpAddr) {
511        if from < to {
512            self.state_a_b = State::ExplicitPartition;
513        } else {
514            self.state_b_a = State::ExplicitPartition;
515        }
516
517        self.sent.retain(|sent| sent.src.ip() != from);
518    }
519
520    fn repair_oneway(&mut self, from: IpAddr, to: IpAddr) {
521        if from < to {
522            self.state_a_b = State::Healthy;
523        } else {
524            self.state_b_a = State::Healthy;
525        }
526    }
527
528    // Repair the link, without releasing any held messages.
529    fn explicit_repair(&mut self) {
530        self.state_a_b = State::Healthy;
531        self.state_b_a = State::Healthy;
532    }
533
534    /// Should the link be randomly partitioned
535    fn rand_partition(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
536        let config = self.config.message_loss.as_ref().unwrap_or(global);
537        let fail_rate = config.fail_rate;
538        fail_rate > 0.0 && rand.random_bool(fail_rate)
539    }
540
541    fn rand_repair(&self, global: &config::MessageLoss, rand: &mut dyn RngCore) -> bool {
542        let config = self.config.message_loss.as_ref().unwrap_or(global);
543        let repair_rate = config.repair_rate;
544        repair_rate > 0.0 && rand.random_bool(repair_rate)
545    }
546
547    fn delay(&self, global: &config::Latency, rand: &mut dyn RngCore) -> Duration {
548        let config = self.config.latency.as_ref().unwrap_or(global);
549
550        let mult = config.latency_distribution.sample(rand);
551        let range = (config.max_message_latency - config.min_message_latency).as_millis() as f64;
552        let delay = config.min_message_latency + Duration::from_millis((range * mult) as _);
553
554        std::cmp::min(delay, config.max_message_latency)
555    }
556
557    fn latency(&mut self, global: &config::Latency) -> &mut config::Latency {
558        self.config.latency.get_or_insert_with(|| global.clone())
559    }
560
561    fn message_loss(&mut self, global: &config::MessageLoss) -> &mut config::MessageLoss {
562        self.config
563            .message_loss
564            .get_or_insert_with(|| global.clone())
565    }
566}
567
#[cfg(test)]
mod tests {
    use crate::config::Link;

    use super::*;

    #[test]
    #[should_panic(expected = "unable to find link between Pair(10.0.0.1, 192.168.0.1)")]
    fn link_access_gives_useful_panic_message() {
        let mut topology = Topology::new(Link::default());
        let (a, b) = (IpAddr::from([10, 0, 0, 1]), IpAddr::from([192, 168, 0, 1]));
        // No link was registered between `a` and `b`, so the lookup in `hold`
        // must panic with a message that names the pair.
        topology.hold(a, b);
    }
}