turmoil/
rt.rs

1use std::future::Future;
2use std::mem;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio::runtime::Runtime;
7use tokio::task::JoinHandle;
8use tokio::task::LocalSet;
9use tokio::time::{sleep, Duration, Instant};
10
11use super::Result;
12
// To support re-creation, we need to store a factory of the future that
// represents the software, rather than the future itself: every call to the
// factory yields a fresh future that can be spawned again. This is somewhat
// annoying in that it requires boxing (of both the factory and the future it
// returns) to avoid generics.
type Software<'a> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result>>> + 'a>;
17
/// Runtime kinds.
///
/// Distinguishes what, if anything, a given [`Rt`] is running.
pub enum Kind<'a> {
    /// A runtime for executing test code.
    Client,

    /// A runtime that is running simulated host software.
    ///
    /// `software` is the factory that produces the host's future. It is kept
    /// around so the software can be spawned again after the initial run.
    Host { software: Software<'a> },

    /// A runtime without any software. The network topology uses this for
    /// time tracking and message delivery.
    NoSoftware,
}
30
/// Per host simulated runtime.
///
/// The tokio runtime is paused (see
/// [`tokio::runtime::Builder::start_paused`]), which gives us control over
/// when and how to advance time. In particular, see [`Rt::tick`], which lets
/// the runtime do a bit more work.
pub struct Rt<'a> {
    /// What this runtime is running: client code, host software, or nothing.
    pub kind: Kind<'a>,

    /// The Tokio runtime driving this simulated host. Each runtime
    /// may have a different sense of "now" which simulates clock skew.
    tokio: Runtime,

    /// Local task set used for running !Send tasks.
    local: LocalSet,

    /// A user readable name to identify the node.
    pub nodename: Arc<str>,

    /// Optional handle to a host's software. When software finishes, the handle is
    /// consumed to check for error, which is propagated up to fail the simulation.
    ///
    /// `None` means there is no software currently running on this runtime.
    handle: Option<JoinHandle<Result>>,

    /// Whether io is enabled on this runtime. Remembered so that a replacement
    /// runtime can be built with the same setting.
    enable_io: bool,
}
56
57impl<'a> Rt<'a> {
58    pub(crate) fn client<F>(nodename: Arc<str>, client: F, enable_io: bool) -> Self
59    where
60        F: Future<Output = Result> + 'static,
61    {
62        let (tokio, local) = init(enable_io);
63
64        let handle = with(&tokio, &local, || tokio::task::spawn_local(client));
65
66        Self {
67            kind: Kind::Client,
68            tokio,
69            local,
70            nodename,
71            handle: Some(handle),
72            enable_io,
73        }
74    }
75
76    pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, enable_io: bool) -> Self
77    where
78        F: Fn() -> Fut + 'a,
79        Fut: Future<Output = Result> + 'static,
80    {
81        let (tokio, local) = init(enable_io);
82
83        let software: Software = Box::new(move || Box::pin(software()));
84        let handle = with(&tokio, &local, || tokio::task::spawn_local(software()));
85
86        Self {
87            kind: Kind::Host { software },
88            tokio,
89            local,
90            nodename,
91            handle: Some(handle),
92            enable_io,
93        }
94    }
95
96    pub(crate) fn no_software() -> Self {
97        let (tokio, local) = init(false);
98
99        Self {
100            kind: Kind::NoSoftware,
101            tokio,
102            local,
103            nodename: String::new().into(),
104            handle: None,
105            enable_io: false,
106        }
107    }
108
109    pub(crate) fn is_client(&self) -> bool {
110        matches!(self.kind, Kind::Client)
111    }
112
113    fn is_host(&self) -> bool {
114        matches!(self.kind, Kind::Host { .. })
115    }
116
117    pub(crate) fn is_software_running(&self) -> bool {
118        self.handle.is_some()
119    }
120
121    pub(crate) fn now(&self) -> Instant {
122        let _guard = self.tokio.enter();
123        Instant::now()
124    }
125
126    // This method is called by [`Sim::run`], which iterates through all the
127    // runtimes and ticks each one. The magic of this method is described in the
128    // documentation for [`LocalSet::run_until`], but it may not be entirely
129    // obvious how things fit together.
130    //
131    // A [`LocalSet`] tracks the tasks to run, which may in turn spawn more
132    // tasks. `run_until` drives a top level task to completion, but not its
133    // children. If you look below, you may be confused. The task we run here
134    // just sleeps and has no children! However, it's the _same `LocalSet`_ that
135    // is used to run software on the host.
136    //
137    // In this way, every time `tick` is called, the following unfolds:
138    //
139    // 1. Time advances on the runtime
140    // 2. We schedule a new task that simply sleeps
141    // 3. Other tasks on the `LocalSet` get a chance to run
142    // 4. The sleep finishes
143    // 5. The runtime pauses
144    //
145    // Returns whether the software has finished successfully or the error
146    // that caused failure. Subsequent calls do not return the error as it is
147    // expected to fail the simulation.
148    pub(crate) fn tick(&mut self, duration: Duration) -> Result<bool> {
149        self.tokio.block_on(async {
150            self.local
151                .run_until(async {
152                    sleep(duration).await;
153                })
154                .await
155        });
156
157        // pull for software completion
158        match &self.handle {
159            Some(handle) if handle.is_finished() => {
160                // Consume handle to extract task result
161                if let Some(h) = self.handle.take() {
162                    match self.tokio.block_on(h) {
163                        // If the host was crashed the JoinError is cancelled, which
164                        // needs to be handled to not fail the simulation.
165                        Err(je) if je.is_cancelled() => {}
166                        res => res??,
167                    }
168                };
169                Ok(true)
170            }
171            Some(_) => Ok(false),
172            None => Ok(true),
173        }
174    }
175
176    pub(crate) fn crash(&mut self) {
177        if !self.is_host() {
178            panic!("can only crash host's software");
179        }
180
181        if self.handle.take().is_some() {
182            self.cancel_tasks();
183        };
184    }
185
186    pub(crate) fn bounce(&mut self) {
187        if !self.is_host() {
188            panic!("can only bounce host's software");
189        }
190
191        self.cancel_tasks();
192
193        if let Kind::Host { software } = &self.kind {
194            let handle = with(&self.tokio, &self.local, || {
195                tokio::task::spawn_local(software())
196            });
197            self.handle.replace(handle);
198        };
199    }
200
201    /// Cancel all tasks within the [`Rt`] by dropping the current tokio
202    /// [`Runtime`].
203    ///
204    /// Dropping the runtime blocks the calling thread until all futures have
205    /// completed, which is desired here to ensure host software completes and
206    /// all resources are dropped.
207    ///
208    /// Both the [`Runtime`] and [`LocalSet`] are replaced with new instances.
209    fn cancel_tasks(&mut self) {
210        let (tokio, local) = init(self.enable_io);
211
212        _ = mem::replace(&mut self.tokio, tokio);
213        drop(mem::replace(&mut self.local, local));
214    }
215}
216
217fn init(enable_io: bool) -> (Runtime, LocalSet) {
218    let mut tokio_builder = tokio::runtime::Builder::new_current_thread();
219
220    #[cfg(tokio_unstable)]
221    tokio_builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
222
223    if enable_io {
224        tokio_builder.enable_io();
225    }
226
227    let tokio = tokio_builder
228        .enable_time()
229        .start_paused(true)
230        .build()
231        .unwrap();
232
233    tokio.block_on(async {
234        // Sleep to "round" `Instant::now()` to the closest `ms`
235        tokio::time::sleep(Duration::from_millis(1)).await;
236    });
237
238    (tokio, new_local())
239}
240
241fn new_local() -> LocalSet {
242    #[cfg(not(tokio_unstable))]
243    let local = LocalSet::new();
244
245    #[cfg(tokio_unstable)]
246    let mut local = LocalSet::new();
247
248    #[cfg(tokio_unstable)]
249    local.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
250
251    local
252}
253
254fn with<R>(tokio: &Runtime, local: &LocalSet, f: impl FnOnce() -> R) -> R {
255    tokio.block_on(async { local.run_until(async { f() }).await })
256}