mz_compute/
memory_limiter.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities to limit memory usage.
11
12use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
15use anyhow::Context;
16use mz_compute_types::dyncfgs::{
17    MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
18    MEMORY_LIMITER_USAGE_FACTOR,
19};
20use mz_dyncfg::ConfigSet;
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::metric;
23use mz_ore::metrics::{MetricsRegistry, UIntGauge};
24use prometheus::Histogram;
25use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
26use tracing::{debug, error, info, warn};
27
28/// A handle to the active memory limiter.
29///
30/// The limiter is initialized by a call to [`start_limiter`]. It spawns a process-global task that
31/// runs for the lifetime of the process.
32static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
33
34/// Start the process-global memory limiter.
35///
36/// # Panics
37///
38/// Panics if the limiter was already started previously.
39pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
40    let mut limiter = LIMITER.lock().expect("poisoned");
41
42    if limiter.is_some() {
43        panic!("memory limiter is already running");
44    }
45
46    let metrics = LimiterMetrics::new(metrics_registry);
47    let (config_tx, config_rx) = mpsc::unbounded_channel();
48
49    mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
50
51    *limiter = Some(Limiter {
52        memory_limit,
53        config_tx,
54    });
55}
56
57/// Apply the given configuration to the active limiter.
58pub fn apply_limiter_config(config: &ConfigSet) {
59    if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
60        limiter.apply_config(config);
61    }
62}
63
64/// A handle to a running memory limiter task.
65struct Limiter {
66    /// The process memory limit.
67    memory_limit: usize,
68    /// A sender for limiter configuration updates.
69    config_tx: UnboundedSender<LimiterConfig>,
70}
71
72impl Limiter {
73    /// Apply the given configuration to the limiter.
74    fn apply_config(&self, config: &ConfigSet) {
75        let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
76        // A zero duration means the limiter is disabled. Translate that into an ~infinite duration
77        // so the limiter doesn't have to worry about the special case.
78        if interval.is_zero() {
79            interval = Duration::MAX;
80        }
81
82        let memory_limit = f64::cast_lossy(self.memory_limit)
83            * MEMORY_LIMITER_USAGE_FACTOR.get(config)
84            * MEMORY_LIMITER_USAGE_BIAS.get(config);
85        let memory_limit = usize::cast_lossy(memory_limit);
86
87        let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
88        let burst_budget = usize::cast_lossy(burst_budget);
89
90        self.config_tx
91            .send(LimiterConfig {
92                interval,
93                memory_limit,
94                burst_budget,
95            })
96            .expect("limiter task never shuts down");
97    }
98}
99
/// Configuration for a memory limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// The interval at which memory usage is checked against the memory limit.
    ///
    /// `Duration::MAX` stands for a disabled limiter.
    interval: Duration,
    /// The memory limit.
    memory_limit: usize,
    /// Budget to allow memory usage above the memory limit, in byte-seconds.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Return a config that disables the memory limiter.
    ///
    /// The interval is effectively infinite, so no check is ever due. The zero limit and zero
    /// budget are therefore never consulted.
    fn disabled() -> Self {
        let never = Duration::MAX;
        LimiterConfig {
            interval: never,
            memory_limit: 0,
            burst_budget: 0,
        }
    }
}
121
122/// A task that enforces configured memory limits.
123///
124/// The task operates by performing limit checks at a configured interval. For each check it
125/// obtains the current memory utilization from proc stats. It then compares the utilization against
126/// the configured memory limit, and if it exceeds the limit, reduces the burst budget by the amount
127/// of memory utilization that exceeds the limit. If the burst budget is exhausted, the limiter
128/// terminates the process.
129struct LimiterTask {
130    /// The current limiter configuration.
131    config: LimiterConfig,
132    /// The amount of burst budget remaining.
133    burst_budget_remaining: usize,
134    /// The time of the last check.
135    last_check: Instant,
136    /// Metrics tracked by the limiter task.
137    metrics: LimiterMetrics,
138}
139
140impl LimiterTask {
141    async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
142        info!("running memory limiter task");
143
144        let mut task = Self {
145            config: LimiterConfig::disabled(),
146            burst_budget_remaining: 0,
147            last_check: Instant::now(),
148            metrics,
149        };
150
151        loop {
152            tokio::select! {
153                _ = task.tick() => {
154                    let start = Instant::now();
155
156                    if let Err(err) = task.check() {
157                        error!("memory limit check failed: {err}");
158                    }
159
160                    let elapsed = start.elapsed();
161                    task.metrics.duration.observe(elapsed.as_secs_f64());
162                }
163                Some(config) = config_rx.recv() => task.apply_config(config),
164            }
165        }
166    }
167
168    /// Wait until the next check time.
169    async fn tick(&self) {
170        let elapsed = self.last_check.elapsed();
171        let duration = self.config.interval.saturating_sub(elapsed);
172        tokio::time::sleep(duration).await
173    }
174
175    fn current_utilization() -> anyhow::Result<ProcStatus> {
176        match ProcStatus::from_proc() {
177            Ok(status) => Ok(status),
178            #[cfg(target_os = "linux")]
179            Err(err) => {
180                error!("failed to read /proc/self/status: {err}");
181                Err(err)
182            }
183            #[cfg(not(target_os = "linux"))]
184            Err(_err) => Ok(ProcStatus {
185                vm_rss: 0,
186                vm_swap: 0,
187            }),
188        }
189    }
190
191    /// Perform a memory usage check, terminating the process if the configured limits are exceeded.
192    fn check(&mut self) -> Result<(), anyhow::Error> {
193        debug!("checking memory limits");
194
195        let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
196
197        let memory_limit = self.config.memory_limit;
198        let burst_budget_remaining = self.burst_budget_remaining;
199
200        let memory_usage = vm_rss + vm_swap;
201
202        debug!(
203            memory_usage,
204            memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
205        );
206
207        self.metrics.vm_rss.set(u64::cast_from(vm_rss));
208        self.metrics.vm_swap.set(u64::cast_from(vm_swap));
209        self.metrics.memory_usage.set(u64::cast_from(memory_usage));
210        self.metrics
211            .burst_budget
212            .set(u64::cast_from(burst_budget_remaining));
213
214        if memory_usage > memory_limit {
215            // Calculate excess usage in byte-seconds.
216            let elapsed = self.last_check.elapsed().as_secs_f64();
217            let excess = memory_usage - memory_limit;
218            let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
219
220            if burst_budget_remaining >= excess_bs {
221                self.burst_budget_remaining -= excess_bs;
222            } else {
223                // Burst budget exhausted, terminate the process.
224                warn!(
225                    memory_usage,
226                    memory_limit, "memory utilization exceeded configured limits",
227                );
228                // We terminate with a recognizable exit code so the orchestrator knows the
229                // termination was caused by exceeding memory limits, as opposed to another,
230                // unexpected cause.
231                mz_ore::process::exit_thread_safe(167);
232            }
233        } else {
234            // Reset burst budget if under limit.
235            self.burst_budget_remaining = self.config.burst_budget;
236        }
237
238        self.last_check = Instant::now();
239        Ok(())
240    }
241
242    /// Apply a new limiter config.
243    fn apply_config(&mut self, config: LimiterConfig) {
244        if config == self.config {
245            return; // no-op config change
246        }
247
248        info!(?config, "applying memory limiter config");
249        self.config = config;
250        self.burst_budget_remaining = config.burst_budget;
251
252        self.metrics
253            .memory_limit
254            .set(u64::cast_from(config.memory_limit));
255    }
256}
257
258struct LimiterMetrics {
259    duration: Histogram,
260    memory_limit: UIntGauge,
261    memory_usage: UIntGauge,
262    vm_rss: UIntGauge,
263    vm_swap: UIntGauge,
264    burst_budget: UIntGauge,
265}
266
267impl LimiterMetrics {
268    fn new(registry: &MetricsRegistry) -> Self {
269        Self {
270            duration: registry.register(metric!(
271                name: "mz_memory_limiter_duration_seconds",
272                help: "A histogram of the time it took to run the memory limiter.",
273                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
274            )),
275            memory_limit: registry.register(metric!(
276                name: "mz_memory_limiter_memory_limit_bytes",
277                help: "The configured memory limit.",
278            )),
279            memory_usage: registry.register(metric!(
280                name: "mz_memory_limiter_memory_usage_bytes",
281                help: "The current memory usage.",
282            )),
283            vm_rss: registry.register(metric!(
284                name: "mz_memory_limiter_vm_rss_bytes",
285                help: "The current VmRSS metric.",
286            )),
287            vm_swap: registry.register(metric!(
288                name: "mz_memory_limiter_vm_swap_bytes",
289                help: "The current VmSwap metric.",
290            )),
291            burst_budget: registry.register(metric!(
292                name: "mz_memory_limiter_burst_budget_byteseconds",
293                help: "The remaining memory burst budget.",
294            )),
295        }
296    }
297}
298
/// Memory statistics of the current process, as found in `/proc/self/status` on Linux.
pub struct ProcStatus {
    /// Resident set size (`VmRSS`), in bytes.
    pub vm_rss: usize,
    /// Memory swapped out (`VmSwap`), in bytes.
    pub vm_swap: usize,
}
306
307impl ProcStatus {
308    /// Populate a new `ProcStatus` with information in /proc/self/status.
309    pub fn from_proc() -> anyhow::Result<Self> {
310        let contents = std::fs::read_to_string("/proc/self/status")?;
311        let mut vm_rss = 0;
312        let mut vm_swap = 0;
313
314        for line in contents.lines() {
315            if line.starts_with("VmRSS:") {
316                vm_rss = line
317                    .split_whitespace()
318                    .nth(1)
319                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
320                    .parse::<usize>()
321                    .context("failed to parse VmRSS")?
322                    * 1024
323            } else if line.starts_with("VmSwap:") {
324                vm_swap = line
325                    .split_whitespace()
326                    .nth(1)
327                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
328                    .parse::<usize>()
329                    .context("failed to parse VmSwap")?
330                    * 1024;
331            }
332        }
333
334        Ok(Self { vm_rss, vm_swap })
335    }
336}