quanta/
upkeep.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
use crate::{set_recent, Clock};
use std::{
    fmt, io,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, JoinHandle},
    time::Duration,
};

/// Process-wide flag recording whether an upkeep thread currently holds the "running" slot.
///
/// [`Upkeep::start`] flips it from `false` to `true` before spawning the thread and refuses to
/// start when it is already `true`.  It is reset to `false` when the upkeep thread leaves its
/// update loop, or immediately if spawning the thread fails.
static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);

/// Ultra-low-overhead access to slightly-delayed time.
///
/// In some applications, there can be a need to check the current time very often, so much so that
/// the cost of checking the time becomes a measurable overhead. For some of these cases, the time
/// may need to be accessed often but does not necessarily need to be incredibly accurate: one
/// millisecond granularity could be entirely acceptable.
///
/// For these cases, we provide a slightly-delayed version of the time to callers via
/// [`Clock::recent`], which is updated by a background upkeep thread.  That thread is configured
/// and spawned via [`Upkeep`].
///
/// [`Upkeep`] can construct a new clock (or be passed an existing clock to use) and, given an
/// update interval, it will faithfully attempt to update the global recent time on the
/// specified interval.  There is a trade-off to be struck in terms of how often the time is
/// updated versus the required accuracy.  Checking the time and updating the global reference is
/// itself not zero-cost, and so care must be taken to analyze the number of readers in order to
/// ensure that, given a particular update interval, the upkeep thread is saving more CPU time than
/// would be spent otherwise by directly querying the current time.
///
/// The recent time is read and written atomically.  It is global to an application, so if another
/// codepath creates the upkeep thread, the interval chosen by that instantiation will be the one
/// that all callers of [`Clock::recent`] end up using.
///
/// Multiple upkeep threads cannot exist at the same time.  A new upkeep thread can be started if
/// the old one is dropped and returns.
///
/// In terms of performance, reading the recent time can be up to two to three times as fast as
/// reading the current time in the optimized case of using the Time Stamp Counter source.  In
/// practice, while a caller might expect to take 12-14ns to read the TSC and scale it to reference
/// time, the recent time can be read in 4-5ns, with no reference scale conversion required.
#[derive(Debug)]
pub struct Upkeep {
    /// How long the upkeep thread sleeps between two updates of the recent time.
    interval: Duration,
    /// Clock that supplies the measurements published as the recent time.
    clock: Clock,
}

/// Handle to a running upkeep thread.
///
/// If a handle is dropped, the upkeep thread will be stopped, and the recent time will cease to
/// update.  The upkeep thread can be started again to resume updating the recent time.
#[derive(Debug)]
pub struct Handle {
    /// Stop flag shared with the upkeep thread; set to `true` when the handle is dropped and
    /// checked by the thread before every update.
    done: Arc<AtomicBool>,
    /// Join handle of the upkeep thread.  Stored as an `Option` so that `Drop` can take
    /// ownership of it and join the thread.
    handle: Option<JoinHandle<()>>,
}

/// Errors returned when creating/spawning the upkeep thread fails.
#[derive(Debug)]
pub enum Error {
    /// An upkeep thread is already running in this process.
    ///
    /// Returned when the process-wide "upkeep running" flag is already set.
    UpkeepRunning,
    /// An error occurred when trying to spawn the upkeep thread.
    ///
    /// Carries the underlying I/O error reported by the thread builder.
    FailedToSpawnUpkeepThread(io::Error),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Error::UpkeepRunning => write!(f, "upkeep thread already running"),
            Error::FailedToSpawnUpkeepThread(e) => {
                write!(f, "failed to spawn upkeep thread: {}", e)
            }
        }
    }
}

impl std::error::Error for Error {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::UpkeepRunning => None,
            Self::FailedToSpawnUpkeepThread(e) => Some(e),
        }
    }
}

impl Upkeep {
    /// Creates a new [`Upkeep`].
    ///
    /// This creates a new internal clock for acquiring the current time.  If you have an existing
    /// [`Clock`] that is already calibrated, it is slightly faster to clone it and construct the
    /// builder with [`new_with_clock`](Upkeep::new_with_clock) to avoid recalibrating.
    pub fn new(interval: Duration) -> Upkeep {
        Self::new_with_clock(interval, Clock::new())
    }

    /// Creates a new [`Upkeep`] with the specified [`Clock`] instance.
    pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep {
        Upkeep { interval, clock }
    }

    /// Start the upkeep thread, periodically updating the global coarse time.
    ///
    /// [`Handle`] represents a drop guard for the upkeep thread if it is successfully spawned.
    /// Dropping the handle will also instruct the upkeep thread to stop and exist, so the handle
    /// must be held while the upkeep thread should continue to run.
    ///
    /// # Errors
    ///
    /// If either an existing upkeep thread is running, or there was an issue when attempting to
    /// spawn the upkeep thread, an error variant will be returned describing the error.
    pub fn start(self) -> Result<Handle, Error> {
        // If another upkeep thread is running, inform the caller.
        let _ = GLOBAL_UPKEEP_RUNNING
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .map_err(|_| Error::UpkeepRunning)?;

        let interval = self.interval;
        let clock = self.clock;

        let done = Arc::new(AtomicBool::new(false));
        let their_done = done.clone();

        let result = thread::Builder::new()
            .name("quanta-upkeep".to_string())
            .spawn(move || {
                while !their_done.load(Ordering::Acquire) {
                    set_recent(clock.now());

                    thread::sleep(interval);
                }

                GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
            })
            .map_err(Error::FailedToSpawnUpkeepThread);

        // Let another caller attempt to spawn the upkeep thread if we failed to do so.
        if result.is_err() {
            GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
        }

        let handle = result?;

        Ok(Handle {
            done,
            handle: Some(handle),
        })
    }
}

impl Drop for Handle {
    /// Signals the upkeep thread to stop and waits for it to finish.
    ///
    /// The upkeep thread checks the stop flag only between sleeps, so dropping the handle may
    /// block for up to one update interval.
    fn drop(&mut self) {
        // `Release` pairs with the `Acquire` load of the same flag in the upkeep loop.
        self.done.store(true, Ordering::Release);

        if let Some(handle) = self.handle.take() {
            // `join` only fails when the upkeep thread panicked.  There is no way to report
            // that from `drop`, and panicking here could abort the process during unwinding,
            // so the outcome is deliberately ignored.
            let _ = handle.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::Upkeep;
    use std::time::Duration;

    /// Starting a second upkeep while the first one is still alive must be rejected with the
    /// "already running" error.
    #[test]
    #[cfg_attr(target_arch = "wasm32", ignore)] // WASM is single threaded
    fn test_spawning_second_upkeep() {
        let interval = Duration::from_millis(250);

        // `running` stays bound until the end of the test so the first upkeep thread is still
        // alive when the second start is attempted.
        let running = Upkeep::new(interval).start();
        let rejected = Upkeep::new(interval).start();

        assert!(running.is_ok());

        let message = rejected
            .map_err(|err| err.to_string())
            .expect_err("second upkeep should be error, got handle");
        assert_eq!(message, "upkeep thread already running");
    }
}