mz_persist_client/
async_runtime.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Async runtime extensions.
11
12use std::future::Future;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
16use mz_ore::task::{JoinHandle, RuntimeExt};
17use tokio::runtime::{Builder, Runtime};
18
/// Worker-thread count used for runtimes created in tests.
///
/// Large enough to reproduce nontrivial task orderings and to avoid blocking,
/// while staying small enough not to spawn an excessive number of threads.
// Originally introduced as a test-only workaround for
// https://sourceware.org/bugzilla/show_bug.cgi?id=19951, but it seems useful
// in general.
pub const TEST_THREADS: usize = 4;
24
25/// An isolated runtime for asynchronous tasks, particularly work
26/// that may be CPU intensive such as encoding/decoding and shard
27/// maintenance.
28///
29/// Using a separate runtime allows Persist to isolate its expensive
30/// workloads on its own OS threads as an insurance policy against
31/// tasks that erroneously fail to yield for a long time. By using
32/// separate OS threads, the scheduler is able to context switch
33/// out of any problematic tasks, preserving liveness for the rest
34/// of the process.
35///
36/// Note: Even though the work done by this runtime might be "blocking" or
37/// CPU bound we should not use the [`tokio::task::spawn_blocking`] API.
38/// There can be issues during shutdown if tasks are currently running on the
39/// blocking thread pool [1], and the blocking thread pool generally creates
40/// many more threads than are physically available. This can pin CPU usage
41/// to 100% starving other important threads like the Coordinator.
42///
43/// [1]: <https://github.com/MaterializeInc/materialize/pull/13955>
44#[derive(Debug)]
45pub struct IsolatedRuntime {
46    inner: Option<Runtime>,
47}
48
49impl IsolatedRuntime {
50    /// Creates a new isolated runtime.
51    pub fn new(metrics: &MetricsRegistry, worker_threads: Option<usize>) -> IsolatedRuntime {
52        let mut runtime = Builder::new_multi_thread();
53        if let Some(worker_threads) = worker_threads {
54            runtime.worker_threads(worker_threads);
55        }
56        let runtime = runtime
57            .thread_name_fn(|| {
58                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
59                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
60                // This will wrap around eventually, which is not ideal, but it's important that
61                // it stays small to fit within OS limits.
62                format!("persist:{:04x}", id % 0x10000)
63            })
64            .enable_all()
65            .build()
66            .expect("known to be valid");
67        register_runtime_metrics("persist", runtime.metrics(), metrics);
68        IsolatedRuntime {
69            inner: Some(runtime),
70        }
71    }
72
73    /// Create an isolated runtime with appropriate values for tests.
74    pub fn new_for_tests() -> Self {
75        IsolatedRuntime::new(&MetricsRegistry::new(), Some(TEST_THREADS))
76    }
77
78    #[cfg(feature = "turmoil")]
79    /// Create a no-op shim that spawns tasks on the current tokio runtime.
80    ///
81    /// This is useful for simulation tests where we don't want to spawn additional threads and/or
82    /// tokio runtimes.
83    pub fn new_disabled() -> Self {
84        IsolatedRuntime { inner: None }
85    }
86
87    /// Spawns a task onto this runtime.
88    ///
89    /// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc
90    /// comment on [`IsolatedRuntime`] for explanation.
91    pub fn spawn_named<N, S, F>(&self, name: N, fut: F) -> JoinHandle<F::Output>
92    where
93        S: AsRef<str>,
94        N: FnOnce() -> S,
95        F: Future + Send + 'static,
96        F::Output: Send + 'static,
97    {
98        if let Some(runtime) = &self.inner {
99            runtime.spawn_named(name, fut)
100        } else {
101            mz_ore::task::spawn(name, fut)
102        }
103    }
104}
105
106impl Drop for IsolatedRuntime {
107    fn drop(&mut self) {
108        // We don't need to worry about `shutdown_background` leaking
109        // blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
110        // the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
111        if let Some(runtime) = self.inner.take() {
112            runtime.shutdown_background();
113        }
114    }
115}