1use crate::config::Config;
2use crate::envelope::Protocol;
3use crate::host::HostTimer;
4use crate::ip::IpVersionAddrIter;
5use crate::net::udp::MulticastGroups;
6use crate::{
7 config, for_pairs, Dns, Host, Result as TurmoilResult, ToIpAddr, ToIpAddrs, Topology,
8 TRACING_TARGET,
9};
10
11use indexmap::IndexMap;
12use rand::RngCore;
13use scoped_tls::scoped_thread_local;
14use std::cell::RefCell;
15use std::io::Result;
16use std::net::{IpAddr, SocketAddr};
17use std::time::Duration;
18
19pub(crate) struct World {
21 pub(crate) hosts: IndexMap<IpAddr, Host>,
23
24 pub(crate) topology: Topology,
26
27 pub(crate) dns: Dns,
29
30 pub(crate) multicast_groups: MulticastGroups,
32
33 pub(crate) current: Option<IpAddr>,
35
36 pub(crate) rng: Box<dyn RngCore>,
39
40 pub(crate) tick_duration: Duration,
43}
44
45scoped_thread_local!(static CURRENT: RefCell<World>);
46
47impl World {
48 pub(crate) fn new(
50 link: config::Link,
51 rng: Box<dyn RngCore>,
52 addrs: IpVersionAddrIter,
53 tick_duration: Duration,
54 ) -> World {
55 World {
56 hosts: IndexMap::new(),
57 topology: Topology::new(link),
58 dns: Dns::new(addrs),
59 multicast_groups: MulticastGroups::default(),
60 current: None,
61 rng,
62 tick_duration,
63 }
64 }
65
66 pub(crate) fn current<R>(f: impl FnOnce(&mut World) -> R) -> R {
68 CURRENT.with(|current| {
69 let mut current = current.borrow_mut();
70 f(&mut current)
71 })
72 }
73
74 pub(crate) fn current_if_set(f: impl FnOnce(&mut World)) {
79 if CURRENT.is_set() {
80 Self::current(f);
81 }
82 }
83
84 pub(crate) fn try_current<R>(f: impl FnOnce(&World) -> R) -> TurmoilResult<R> {
85 if CURRENT.is_set() {
86 CURRENT.with(|current| match current.try_borrow() {
87 Ok(world) => Ok(f(&world)),
88 Err(_) => Err("World already borrowed".into()),
89 })
90 } else {
91 Err("World not set".into())
92 }
93 }
94
95 pub(crate) fn enter<R>(world: &RefCell<World>, f: impl FnOnce() -> R) -> R {
96 CURRENT.set(world, f)
97 }
98
99 pub(crate) fn current_host_mut(&mut self) -> &mut Host {
100 let addr = self.current.expect("current host missing");
101 self.hosts.get_mut(&addr).expect("host missing")
102 }
103
104 pub(crate) fn current_host(&self) -> &Host {
105 let addr = self.current.expect("current host missing");
106 self.hosts.get(&addr).expect("host missing")
107 }
108
109 pub(crate) fn try_current_host(&self) -> TurmoilResult<&Host> {
110 let addr = self.current.ok_or("current host missing")?;
111 self.hosts.get(&addr).ok_or_else(|| "host missing".into())
112 }
113
114 pub(crate) fn lookup(&mut self, host: impl ToIpAddr) -> IpAddr {
115 self.dns.lookup(host)
116 }
117
118 pub(crate) fn reverse_lookup(&self, addr: IpAddr) -> Option<&str> {
119 self.dns.reverse(addr)
120 }
121
122 pub(crate) fn lookup_many(&mut self, hosts: impl ToIpAddrs) -> Vec<IpAddr> {
123 self.dns.lookup_many(hosts)
124 }
125
126 pub(crate) fn hold(&mut self, a: IpAddr, b: IpAddr) {
127 self.topology.hold(a, b);
128 }
129
130 pub(crate) fn hold_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
131 let a = self.lookup_many(a);
132 let b = self.lookup_many(b);
133
134 for_pairs(&a, &b, |a, b| {
135 self.hold(a, b);
136 });
137 }
138
139 pub(crate) fn release(&mut self, a: IpAddr, b: IpAddr) {
140 self.topology.release(a, b);
141 }
142
143 pub(crate) fn release_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
144 let a = self.lookup_many(a);
145 let b = self.lookup_many(b);
146
147 for_pairs(&a, &b, |a, b| {
148 self.release(a, b);
149 });
150 }
151
152 pub(crate) fn partition(&mut self, a: IpAddr, b: IpAddr) {
153 self.topology.partition(a, b);
154 }
155
156 pub(crate) fn partition_oneway(&mut self, a: IpAddr, b: IpAddr) {
157 self.topology.partition_oneway(a, b);
158 }
159
160 pub(crate) fn partition_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
161 let a = self.lookup_many(a);
162 let b = self.lookup_many(b);
163
164 for_pairs(&a, &b, |a, b| {
165 self.partition(a, b);
166 });
167 }
168 pub(crate) fn partition_oneway_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
169 let a = self.lookup_many(a);
170 let b = self.lookup_many(b);
171
172 for_pairs(&a, &b, |a, b| {
173 self.partition_oneway(a, b);
174 });
175 }
176 pub(crate) fn repair(&mut self, a: IpAddr, b: IpAddr) {
177 self.topology.repair(a, b);
178 }
179 pub(crate) fn repair_oneway(&mut self, a: IpAddr, b: IpAddr) {
180 self.topology.repair_oneway(a, b);
181 }
182
183 pub(crate) fn repair_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
184 let a = self.lookup_many(a);
185 let b = self.lookup_many(b);
186
187 for_pairs(&a, &b, |a, b| {
188 self.repair(a, b);
189 });
190 }
191
192 pub(crate) fn repair_oneway_many(&mut self, a: impl ToIpAddrs, b: impl ToIpAddrs) {
193 let a = self.lookup_many(a);
194 let b = self.lookup_many(b);
195
196 for_pairs(&a, &b, |a, b| {
197 self.repair_oneway(a, b);
198 });
199 }
200
201 pub(crate) fn est_tcp_streams(&mut self) -> usize {
202 self.current_host().tcp.stream_count()
203 }
204
205 pub(crate) fn est_tcp_streams_on(&mut self, addr: impl ToIpAddr) -> usize {
206 self.hosts
207 .get(&self.dns.lookup(addr))
208 .unwrap()
209 .tcp
210 .stream_count()
211 }
212
213 pub(crate) fn register(
215 &mut self,
216 addr: IpAddr,
217 nodename: &str,
218 timer: HostTimer,
219 config: &Config,
220 ) {
221 assert!(
222 !self.hosts.contains_key(&addr),
223 "already registered host for the given ip address"
224 );
225
226 tracing::info!(target: TRACING_TARGET, nodename, ?addr, "New");
227
228 for existing in self.hosts.keys() {
230 self.topology.register(*existing, addr);
231 }
232
233 self.hosts.insert(
235 addr,
236 Host::new(
237 nodename,
238 addr,
239 timer,
240 config.ephemeral_ports.clone(),
241 config.tcp_capacity,
242 config.udp_capacity,
243 ),
244 );
245 }
246
247 pub(crate) fn send_message(
250 &mut self,
251 src: SocketAddr,
252 dst: SocketAddr,
253 message: Protocol,
254 ) -> Result<()> {
255 self.topology
256 .enqueue_message(&mut self.rng, src, dst, message)
257 }
258
259 pub(crate) fn tick(&mut self, addr: IpAddr, duration: Duration) {
261 self.hosts
262 .get_mut(&addr)
263 .expect("missing host")
264 .timer
265 .tick(duration);
266 }
267}