1use std::future::Future;
2use std::mem;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use tokio::runtime::Runtime;
7use tokio::task::JoinHandle;
8use tokio::task::LocalSet;
9use tokio::time::{sleep, Duration, Instant};
10
11use super::Result;
12
13type Software<'a> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result>>> + 'a>;
17
18pub enum Kind<'a> {
20 Client,
22
23 Host { software: Software<'a> },
25
26 NoSoftware,
29}
30
31pub struct Rt<'a> {
37 pub kind: Kind<'a>,
38
39 tokio: Runtime,
42
43 local: LocalSet,
45
46 pub nodename: Arc<str>,
48
49 handle: Option<JoinHandle<Result>>,
52
53 enable_io: bool,
55}
56
57impl<'a> Rt<'a> {
58 pub(crate) fn client<F>(nodename: Arc<str>, client: F, enable_io: bool) -> Self
59 where
60 F: Future<Output = Result> + 'static,
61 {
62 let (tokio, local) = init(enable_io);
63
64 let handle = with(&tokio, &local, || tokio::task::spawn_local(client));
65
66 Self {
67 kind: Kind::Client,
68 tokio,
69 local,
70 nodename,
71 handle: Some(handle),
72 enable_io,
73 }
74 }
75
76 pub(crate) fn host<F, Fut>(nodename: Arc<str>, software: F, enable_io: bool) -> Self
77 where
78 F: Fn() -> Fut + 'a,
79 Fut: Future<Output = Result> + 'static,
80 {
81 let (tokio, local) = init(enable_io);
82
83 let software: Software = Box::new(move || Box::pin(software()));
84 let handle = with(&tokio, &local, || tokio::task::spawn_local(software()));
85
86 Self {
87 kind: Kind::Host { software },
88 tokio,
89 local,
90 nodename,
91 handle: Some(handle),
92 enable_io,
93 }
94 }
95
96 pub(crate) fn no_software() -> Self {
97 let (tokio, local) = init(false);
98
99 Self {
100 kind: Kind::NoSoftware,
101 tokio,
102 local,
103 nodename: String::new().into(),
104 handle: None,
105 enable_io: false,
106 }
107 }
108
109 pub(crate) fn is_client(&self) -> bool {
110 matches!(self.kind, Kind::Client)
111 }
112
113 fn is_host(&self) -> bool {
114 matches!(self.kind, Kind::Host { .. })
115 }
116
117 pub(crate) fn is_software_running(&self) -> bool {
118 self.handle.is_some()
119 }
120
121 pub(crate) fn now(&self) -> Instant {
122 let _guard = self.tokio.enter();
123 Instant::now()
124 }
125
126 pub(crate) fn tick(&mut self, duration: Duration) -> Result<bool> {
149 self.tokio.block_on(async {
150 self.local
151 .run_until(async {
152 sleep(duration).await;
153 })
154 .await
155 });
156
157 match &self.handle {
159 Some(handle) if handle.is_finished() => {
160 if let Some(h) = self.handle.take() {
162 match self.tokio.block_on(h) {
163 Err(je) if je.is_cancelled() => {}
166 res => res??,
167 }
168 };
169 Ok(true)
170 }
171 Some(_) => Ok(false),
172 None => Ok(true),
173 }
174 }
175
176 pub(crate) fn crash(&mut self) {
177 if !self.is_host() {
178 panic!("can only crash host's software");
179 }
180
181 if self.handle.take().is_some() {
182 self.cancel_tasks();
183 };
184 }
185
186 pub(crate) fn bounce(&mut self) {
187 if !self.is_host() {
188 panic!("can only bounce host's software");
189 }
190
191 self.cancel_tasks();
192
193 if let Kind::Host { software } = &self.kind {
194 let handle = with(&self.tokio, &self.local, || {
195 tokio::task::spawn_local(software())
196 });
197 self.handle.replace(handle);
198 };
199 }
200
201 fn cancel_tasks(&mut self) {
210 let (tokio, local) = init(self.enable_io);
211
212 _ = mem::replace(&mut self.tokio, tokio);
213 drop(mem::replace(&mut self.local, local));
214 }
215}
216
217fn init(enable_io: bool) -> (Runtime, LocalSet) {
218 let mut tokio_builder = tokio::runtime::Builder::new_current_thread();
219
220 #[cfg(tokio_unstable)]
221 tokio_builder.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
222
223 if enable_io {
224 tokio_builder.enable_io();
225 }
226
227 let tokio = tokio_builder
228 .enable_time()
229 .start_paused(true)
230 .build()
231 .unwrap();
232
233 tokio.block_on(async {
234 tokio::time::sleep(Duration::from_millis(1)).await;
236 });
237
238 (tokio, new_local())
239}
240
241fn new_local() -> LocalSet {
242 #[cfg(not(tokio_unstable))]
243 let local = LocalSet::new();
244
245 #[cfg(tokio_unstable)]
246 let mut local = LocalSet::new();
247
248 #[cfg(tokio_unstable)]
249 local.unhandled_panic(tokio::runtime::UnhandledPanic::ShutdownRuntime);
250
251 local
252}
253
254fn with<R>(tokio: &Runtime, local: &LocalSet, f: impl FnOnce() -> R) -> R {
255 tokio.block_on(async { local.run_until(async { f() }).await })
256}