quanta/upkeep.rs
1use crate::{set_recent, Clock};
2use std::{
3 fmt, io,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc,
7 },
8 thread::{self, JoinHandle},
9 time::Duration,
10};
11
// Process-wide flag that is `true` from the moment `Upkeep::start` claims it until the upkeep
// thread leaves its loop (or spawning fails). `Upkeep::start` uses it to reject a second thread.
static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);
13
/// Ultra-low-overhead access to slightly-delayed time.
///
/// In some applications, there can be a need to check the current time very often, so much so that
/// the cost of checking the time can begin to add up to measurable overhead. For some of these
/// cases, the time may need to be accessed often but does not necessarily need to be incredibly
/// accurate: one millisecond granularity could be entirely acceptable.
///
/// For these cases, we provide a slightly-delayed version of the time to callers via
/// [`Clock::recent`], which is updated by a background upkeep thread. That thread is configured
/// and spawned via [`Upkeep`].
///
/// [`Upkeep`] can construct a new clock (or be passed an existing clock to use) and, given an
/// update interval, will faithfully attempt to update the global recent time on the specified
/// interval. There is a trade-off to be struck in terms of how often the time is updated versus
/// the required accuracy. Checking the time and updating the global reference is itself not
/// zero-cost, and so care must be taken to analyze the number of readers in order to ensure that,
/// given a particular update interval, the upkeep thread is saving more CPU time than would be
/// spent otherwise by directly querying the current time.
///
/// The recent time is read and written atomically. It is global to an application, so if another
/// codepath creates the upkeep thread, the interval chosen by that instantiation will be the one
/// that all callers of [`Clock::recent`] end up using.
///
/// Multiple upkeep threads cannot exist at the same time. A new upkeep thread can be started if
/// the old one is dropped and returns.
///
/// In terms of performance, reading the recent time can be up to two to three times as fast as
/// reading the current time in the optimized case of using the Time Stamp Counter source. In
/// practice, while a caller might expect to take 12-14ns to read the TSC and scale it to reference
/// time, the recent time can be read in 4-5ns, with no reference scale conversion required.
#[derive(Debug)]
pub struct Upkeep {
    /// How long the upkeep thread sleeps between two updates of the recent time.
    interval: Duration,
    /// Clock the upkeep thread reads the current time from on every update.
    clock: Clock,
}
49
/// Handle to a running upkeep thread.
///
/// If a handle is dropped, the upkeep thread will be stopped, and the recent time will cease to
/// update. The upkeep thread can be started again to resume updating the recent time.
#[derive(Debug)]
pub struct Handle {
    /// Stop flag shared with the upkeep thread; set to `true` when the handle is dropped.
    done: Arc<AtomicBool>,
    /// Join handle of the upkeep thread. Stored as an `Option` so that `Drop` can take ownership
    /// of it in order to join the thread.
    handle: Option<JoinHandle<()>>,
}
59
/// Errors returned during the creation/spawning of the upkeep thread.
#[derive(Debug)]
pub enum Error {
    /// An upkeep thread is already running in this process.
    UpkeepRunning,
    /// An error occurred when trying to spawn the upkeep thread.
    ///
    /// Carries the underlying I/O error reported by the thread builder.
    FailedToSpawnUpkeepThread(io::Error),
}
68
69impl fmt::Display for Error {
70 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
71 match self {
72 Error::UpkeepRunning => write!(f, "upkeep thread already running"),
73 Error::FailedToSpawnUpkeepThread(e) => {
74 write!(f, "failed to spawn upkeep thread: {}", e)
75 }
76 }
77 }
78}
79
80impl std::error::Error for Error {
81 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
82 match self {
83 Self::UpkeepRunning => None,
84 Self::FailedToSpawnUpkeepThread(e) => Some(e),
85 }
86 }
87}
88
89impl Upkeep {
90 /// Creates a new [`Upkeep`].
91 ///
92 /// This creates a new internal clock for acquiring the current time. If you have an existing
93 /// [`Clock`] that is already calibrated, it is slightly faster to clone it and construct the
94 /// builder with [`new_with_clock`](Upkeep::new_with_clock) to avoid recalibrating.
95 pub fn new(interval: Duration) -> Upkeep {
96 Self::new_with_clock(interval, Clock::new())
97 }
98
99 /// Creates a new [`Upkeep`] with the specified [`Clock`] instance.
100 pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep {
101 Upkeep { interval, clock }
102 }
103
104 /// Start the upkeep thread, periodically updating the global coarse time.
105 ///
106 /// [`Handle`] represents a drop guard for the upkeep thread if it is successfully spawned.
107 /// Dropping the handle will also instruct the upkeep thread to stop and exist, so the handle
108 /// must be held while the upkeep thread should continue to run.
109 ///
110 /// # Errors
111 ///
112 /// If either an existing upkeep thread is running, or there was an issue when attempting to
113 /// spawn the upkeep thread, an error variant will be returned describing the error.
114 pub fn start(self) -> Result<Handle, Error> {
115 // If another upkeep thread is running, inform the caller.
116 let _ = GLOBAL_UPKEEP_RUNNING
117 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
118 .map_err(|_| Error::UpkeepRunning)?;
119
120 let interval = self.interval;
121 let clock = self.clock;
122
123 // Note: spawning `quanta-upkeep` thread may take a significant amount of time. Thus, it is
124 // possible for a user to read a 0 value from `Clock::recent` before `quanta-upkeep` has
125 // started. To avoid that, make sure the recent time is initialized to some measurement.
126 set_recent(clock.now());
127
128 let done = Arc::new(AtomicBool::new(false));
129 let their_done = done.clone();
130
131 let result = thread::Builder::new()
132 .name("quanta-upkeep".to_string())
133 .spawn(move || {
134 while !their_done.load(Ordering::Acquire) {
135 set_recent(clock.now());
136
137 thread::sleep(interval);
138 }
139
140 GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
141 })
142 .map_err(Error::FailedToSpawnUpkeepThread);
143
144 // Let another caller attempt to spawn the upkeep thread if we failed to do so.
145 if result.is_err() {
146 GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
147 }
148
149 let handle = result?;
150
151 Ok(Handle {
152 done,
153 handle: Some(handle),
154 })
155 }
156}
157
158impl Drop for Handle {
159 fn drop(&mut self) {
160 self.done.store(true, Ordering::Release);
161
162 if let Some(handle) = self.handle.take() {
163 let _result = handle
164 .join()
165 .map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to stop upkeep thread"));
166 }
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::Upkeep;
    use std::time::Duration;

    /// While one upkeep thread is alive, a second attempt to start one must be rejected.
    #[test]
    #[cfg_attr(target_arch = "wasm32", ignore)] // WASM is single threaded
    fn test_spawning_second_upkeep() {
        let interval = Duration::from_millis(250);

        let first = Upkeep::new(interval).start();
        let second = Upkeep::new(interval).start();

        assert!(first.is_ok());

        let second_err = second
            .map_err(|e| e.to_string())
            .expect_err("second upkeep should be error, got handle");
        assert_eq!(second_err, "upkeep thread already running");
    }
}
188}