turmoil/
world.rs

1use crate::config::Config;
2use crate::envelope::Protocol;
3use crate::host::HostTimer;
4use crate::ip::IpVersionAddrIter;
5use crate::net::udp::MulticastGroups;
6use crate::{
7    config, for_pairs, Dns, Host, Result as TurmoilResult, ToIpAddr, ToIpAddrs, Topology,
8    TRACING_TARGET,
9};
10
11use indexmap::IndexMap;
12use rand::RngCore;
13use scoped_tls::scoped_thread_local;
14use std::cell::RefCell;
15use std::io::Result;
16use std::net::{IpAddr, SocketAddr};
17use std::time::Duration;
18
19/// Tracks all the state for the simulated world.
20pub(crate) struct World {
21    /// Tracks all individual hosts
22    pub(crate) hosts: IndexMap<IpAddr, Host>,
23
24    /// Tracks how each host is connected to each other.
25    pub(crate) topology: Topology,
26
27    /// Maps hostnames to ip addresses.
28    pub(crate) dns: Dns,
29
30    // Maps multicast groups to udp destination addresses.
31    pub(crate) multicast_groups: MulticastGroups,
32
33    /// If set, this is the current host being executed.
34    pub(crate) current: Option<IpAddr>,
35
36    /// Random number generator used for all decisions. To make execution
37    /// determinstic, reuse the same seed.
38    pub(crate) rng: Box<dyn RngCore>,
39
40    /// Run duration for each host on every step.
41    // TODO: Remove this once we've cleaned up the loopback implementation hacks
42    pub(crate) tick_duration: Duration,
43}
44
45scoped_thread_local!(static CURRENT: RefCell<World>);
46
47impl World {
48    /// Initialize a new world.
49    pub(crate) fn new(
50        link: config::Link,
51        rng: Box<dyn RngCore>,
52        addrs: IpVersionAddrIter,
53        tick_duration: Duration,
54    ) -> World {
55        World {
56            hosts: IndexMap::new(),
57            topology: Topology::new(link),
58            dns: Dns::new(addrs),
59            multicast_groups: MulticastGroups::default(),
60            current: None,
61            rng,
62            tick_duration,
63        }
64    }
65
66    /// Run `f` on the world.
67    pub(crate) fn current<R>(f: impl FnOnce(&mut World) -> R) -> R {
68        CURRENT.with(|current| {
69            let mut current = current.borrow_mut();
70            f(&mut current)
71        })
72    }
73
74    /// Run `f` if the world is set - otherwise no-op.
75    ///
76    /// Used in drop paths, where the simulation may be shutting
77    /// down and we don't need to do anything.
78    pub(crate) fn current_if_set(f: impl FnOnce(&mut World)) {
79        if CURRENT.is_set() {
80            Self::current(f);
81        }
82    }
83
84    pub(crate) fn try_current<R>(f: impl FnOnce(&World) -> R) -> TurmoilResult<R> {
85        if CURRENT.is_set() {
86            CURRENT.with(|current| match current.try_borrow() {
87                Ok(world) => Ok(f(&world)),
88                Err(_) => Err("World already borrowed".into()),
89            })
90        } else {
91            Err("World not set".into())
92        }
93    }
94
95    pub(crate) fn enter<R>(world: &RefCell<World>, f: impl FnOnce() -> R) -> R {
96        CURRENT.set(world, f)
97    }
98
99    pub(crate) fn current_host_mut(&mut self) -> &mut Host {
100        let addr = self.current.expect("current host missing");
101        self.hosts.get_mut(&addr).expect("host missing")
102    }
103
104    pub(crate) fn current_host(&self) -> &Host {
105        let addr = self.current.expect("current host missing");
106        self.hosts.get(&addr).expect("host missing")
107    }
108
109    pub(crate) fn try_current_host(&self) -> TurmoilResult<&Host> {
110        let addr = self.current.ok_or("current host missing")?;
111        self.hosts.get(&addr).ok_or_else(|| "host missing".into())
112    }
113
114    pub(crate) fn lookup(&mut self, host: impl ToIpAddr) -> IpAddr {
115        self.dns.lookup(host)
116    }
117
118    pub(crate) fn reverse_lookup(&self, addr: IpAddr) -> Option<&str> {
119        self.dns.reverse(addr)
120    }
121
122    pub(crate) fn lookup_many(&mut self, hosts: impl ToIpAddrs) -> Vec<IpAddr> {
123        self.dns.lookup_many(hosts)
124    }
125
126    pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
127        self.topology.hold(a, b);
128    }
129
130    pub(crate) fn hold_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
131        let a = self.lookup_many(a);
132        let b = self.lookup_many(b);
133
134        for_pairs(&a, &b, |a, b| {
135            self.hold(a, b);
136        });
137    }
138
139    pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
140        self.topology.release(a, b);
141    }
142
143    pub(crate) fn release_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
144        let a = self.lookup_many(a);
145        let b = self.lookup_many(b);
146
147        for_pairs(&a, &b, |a, b| {
148            self.release(a, b);
149        });
150    }
151
152    pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
153        self.topology.partition(a, b);
154    }
155
156    pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
157        self.topology.partition_oneway(a, b);
158    }
159
160    pub(crate) fn partition_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
161        let a = self.lookup_many(a);
162        let b = self.lookup_many(b);
163
164        for_pairs(&a, &b, |a, b| {
165            self.partition(a, b);
166        });
167    }
168    pub(crate) fn partition_oneway_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
169        let a = self.lookup_many(a);
170        let b = self.lookup_many(b);
171
172        for_pairs(&a, &b, |a, b| {
173            self.partition_oneway(a, b);
174        });
175    }
176    pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
177        self.topology.repair(a, b);
178    }
179    pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
180        self.topology.repair_oneway(a, b);
181    }
182
183    pub(crate) fn repair_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
184        let a = self.lookup_many(a);
185        let b = self.lookup_many(b);
186
187        for_pairs(&a, &b, |a, b| {
188            self.repair(a, b);
189        });
190    }
191
192    pub(crate) fn repair_oneway_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
193        let a = self.lookup_many(a);
194        let b = self.lookup_many(b);
195
196        for_pairs(&a, &b, |a, b| {
197            self.repair_oneway(a, b);
198        });
199    }
200
201    pub(crate) fn est_tcp_streams(&mut self) -> usize {
202        self.current_host().tcp.stream_count()
203    }
204
205    pub(crate) fn est_tcp_streams_on(&mut self, addr: impl ToIpAddr) -> usize {
206        self.hosts
207            .get(&self.dns.lookup(addr))
208            .unwrap()
209            .tcp
210            .stream_count()
211    }
212
213    /// Register a new host with the simulation.
214    pub(crate) fn register(
215        &mut self,
216        addr: IpAddr,
217        nodename: &str,
218        timer: HostTimer,
219        config: &Config,
220    ) {
221        assert!(
222            !self.hosts.contains_key(&addr),
223            "already registered host for the given ip address"
224        );
225
226        tracing::info!(target: TRACING_TARGET, nodename, ?addr, "New");
227
228        // Register links between the new host and all existing hosts
229        for existing in self.hosts.keys() {
230            self.topology.register(*existing, addr);
231        }
232
233        // Initialize host state
234        self.hosts.insert(
235            addr,
236            Host::new(
237                nodename,
238                addr,
239                timer,
240                config.ephemeral_ports.clone(),
241                config.tcp_capacity,
242                config.udp_capacity,
243            ),
244        );
245    }
246
247    /// Send `message` from `src` to `dst`. Delivery is asynchronous and not
248    /// guaranteed.
249    pub(crate) fn send_message(
250        &mut self,
251        src: SocketAddr,
252        dst: SocketAddr,
253        message: Protocol,
254    ) -> Result<()> {
255        self.topology
256            .enqueue_message(&mut self.rng, src, dst, message)
257    }
258
259    /// Tick the host at `addr` by `duration`.
260    pub(crate) fn tick(&mut self, addr: IpAddr, duration: Duration) {
261        self.hosts
262            .get_mut(&addr)
263            .expect("missing host")
264            .timer
265            .tick(duration);
266    }
267}