mz_persist_client/
async_runtime.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Async runtime extensions.

12use std::future::Future;
13use std::sync::atomic::{AtomicUsize, Ordering};
14
15use mz_ore::metrics::{MetricsRegistry, register_runtime_metrics};
16use mz_ore::task::{JoinHandle, RuntimeExt};
17use tokio::runtime::{Builder, Runtime};
18
/// An isolated runtime for asynchronous tasks, particularly work
/// that may be CPU intensive such as encoding/decoding and shard
/// maintenance.
///
/// Using a separate runtime allows Persist to isolate its expensive
/// workloads on its own OS threads as an insurance policy against
/// tasks that erroneously fail to yield for a long time. By using
/// separate OS threads, the scheduler is able to context switch
/// out of any problematic tasks, preserving liveness for the rest
/// of the process.
///
/// Note: Even though the work done by this runtime might be "blocking" or
/// CPU bound, we should not use the [`tokio::task::spawn_blocking`] API.
/// There can be issues during shutdown if tasks are currently running on the
/// blocking thread pool [1], and the blocking thread pool generally creates
/// many more threads than are physically available. This can pin CPU usage
/// at 100%, starving other important threads like the Coordinator.
///
/// [1]: <https://github.com/MaterializeInc/materialize/pull/13955>
#[derive(Debug)]
pub struct IsolatedRuntime {
    /// The underlying Tokio runtime. It is `Some` from construction onward
    /// and is only taken (leaving `None`) when the `IsolatedRuntime` is dropped.
    inner: Option<Runtime>,
}
42
43impl IsolatedRuntime {
44    /// Creates a new isolated runtime.
45    pub fn new(metrics: &MetricsRegistry, worker_threads: Option<usize>) -> IsolatedRuntime {
46        let mut runtime = Builder::new_multi_thread();
47        if let Some(worker_threads) = worker_threads {
48            runtime.worker_threads(worker_threads);
49        }
50        let runtime = runtime
51            .thread_name_fn(|| {
52                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
53                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
54                // This will wrap around eventually, which is not ideal, but it's important that
55                // it stays small to fit within OS limits.
56                format!("persist:{:04x}", id % 0x10000)
57            })
58            .enable_all()
59            .build()
60            .expect("known to be valid");
61        register_runtime_metrics("persist", runtime.metrics(), metrics);
62        IsolatedRuntime {
63            inner: Some(runtime),
64        }
65    }
66
67    /// Spawns a task onto this runtime.
68    ///
69    /// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc
70    /// comment on [`IsolatedRuntime`] for explanation.
71    pub fn spawn_named<N, S, F>(&self, name: N, fut: F) -> JoinHandle<F::Output>
72    where
73        S: AsRef<str>,
74        N: FnOnce() -> S,
75        F: Future + Send + 'static,
76        F::Output: Send + 'static,
77    {
78        self.inner
79            .as_ref()
80            .expect("exists until drop")
81            .spawn_named(name, fut)
82    }
83}
84
85impl Default for IsolatedRuntime {
86    fn default() -> Self {
87        IsolatedRuntime::new(&MetricsRegistry::new(), None)
88    }
89}
90
91impl Drop for IsolatedRuntime {
92    fn drop(&mut self) {
93        // We don't need to worry about `shutdown_background` leaking
94        // blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
95        // the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
96        self.inner
97            .take()
98            .expect("cannot drop twice")
99            .shutdown_background()
100    }
101}