// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Async runtime extensions.

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};

use mz_ore::task::{JoinHandle, RuntimeExt};
use tokio::runtime::{Builder, Runtime};

/// An isolated runtime for asynchronous tasks, particularly work
/// that may be CPU-intensive, such as encoding/decoding and shard
/// maintenance.
///
/// Using a separate runtime allows Persist to isolate its expensive
/// workloads on its own OS threads as an insurance policy against
/// tasks that erroneously fail to yield for a long time. By using
/// separate OS threads, the scheduler is able to context switch
/// out of any problematic tasks, preserving liveness for the rest
/// of the process.
///
/// Note: Even though the work done by this runtime might be "blocking" or
/// CPU-bound, we should not use the [`tokio::task::spawn_blocking`] API.
/// There can be issues during shutdown if tasks are currently running on the
/// blocking thread pool [1], and the blocking thread pool generally creates
/// many more threads than there are CPU cores available. This can pin CPU
/// usage at 100%, starving other important threads like the Coordinator.
///
/// [1]: <https://github.com/MaterializeInc/materialize/pull/13955>
#[derive(Debug)]
pub struct IsolatedRuntime {
    inner: Option<Runtime>,
}
impl IsolatedRuntime {
    /// Creates a new isolated runtime with the given number of worker threads.
    pub fn new(worker_threads: usize) -> IsolatedRuntime {
        let runtime = Builder::new_multi_thread()
            .worker_threads(worker_threads)
            .thread_name_fn(|| {
                static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
                let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
                // This will wrap around eventually, which is not ideal, but it's important that
                // it stays small to fit within OS limits.
                format!("persist:{:04x}", id % 0x10000)
            })
            .enable_all()
            .build()
            .expect("known to be valid");
        IsolatedRuntime {
            inner: Some(runtime),
        }
    }
    /// Spawns a task onto this runtime.
    ///
    /// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here; see the
    /// doc comment on [`IsolatedRuntime`] for an explanation.
    pub fn spawn_named<N, S, F>(&self, name: N, fut: F) -> JoinHandle<F::Output>
    where
        S: AsRef<str>,
        N: FnOnce() -> S,
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.inner
            .as_ref()
            .expect("exists until drop")
            .spawn_named(name, fut)
    }
}
impl Default for IsolatedRuntime {
    fn default() -> Self {
        IsolatedRuntime::new(num_cpus::get())
    }
}
impl Drop for IsolatedRuntime {
    fn drop(&mut self) {
        // We don't need to worry about `shutdown_background` leaking
        // blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
        // the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
        self.inner
            .take()
            .expect("cannot drop twice")
            .shutdown_background()
    }
}
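
// The module below is an illustrative sketch, not part of the original file.
// It mirrors the naming scheme used by the `thread_name_fn` closure in
// `IsolatedRuntime::new` through a hypothetical helper, `sketch_thread_name`
// (a name assumed only for this example), and checks two properties the
// closure's comment relies on: IDs wrap after 0x10000 names, and every name
// stays within 15 bytes, which is the thread-name limit on Linux (other
// platforms may differ).
#[cfg(test)]
mod thread_name_sketch {
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Hypothetical helper that formats names the same way as the closure.
    pub fn sketch_thread_name(counter: &AtomicUsize) -> String {
        let id = counter.fetch_add(1, Ordering::SeqCst);
        format!("persist:{:04x}", id % 0x10000)
    }

    #[test]
    fn names_wrap_and_stay_short() {
        // Start at the last ID before the wrap-around point.
        let counter = AtomicUsize::new(0xffff);
        let last = sketch_thread_name(&counter);
        let wrapped = sketch_thread_name(&counter);
        assert_eq!(last, "persist:ffff");
        assert_eq!(wrapped, "persist:0000");
        // "persist:" is 8 bytes and the hex suffix is always 4 bytes.
        assert!(last.len() <= 15);
        assert!(wrapped.len() <= 15);
    }
}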