Skip to main content

mz_compute/
memory_limiter.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities to limit memory usage.
11//!
//! In the context of this module, "memory" refers to the sum of physical memory and swap space.
13//! Other parts of the code usually don't include swap space when talking about "memory".
14
15use std::sync::Mutex;
16use std::time::{Duration, Instant};
17
18use anyhow::Context;
19use mz_compute_types::dyncfgs::{
20    MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
21};
22use mz_dyncfg::ConfigSet;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::metric;
25use mz_ore::metrics::{MetricsRegistry, UIntGauge};
26use prometheus::Histogram;
27use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
28use tracing::{debug, error, info, warn};
29
/// A handle to the active memory limiter.
///
/// The limiter is initialized by a call to [`start_limiter`]. It spawns a process-global task that
/// runs for the lifetime of the process.
///
/// The slot holds `None` until [`start_limiter`] has stored a handle in it. Every access goes
/// through the surrounding mutex.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
35
36/// Start the process-global memory limiter.
37///
38/// # Panics
39///
40/// Panics if the limiter was already started previously.
41pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
42    let mut limiter = LIMITER.lock().expect("poisoned");
43
44    if limiter.is_some() {
45        panic!("memory limiter is already running");
46    }
47
48    let metrics = LimiterMetrics::new(metrics_registry);
49    let (config_tx, config_rx) = mpsc::unbounded_channel();
50
51    mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
52
53    *limiter = Some(Limiter {
54        base_memory_limit: memory_limit,
55        effective_memory_limit: memory_limit,
56        config_tx,
57    });
58}
59
60/// Apply the given configuration to the active limiter.
61pub fn apply_limiter_config(config: &ConfigSet) {
62    if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
63        limiter.apply_config(config);
64    }
65}
66
67/// Get the current effective memory limit.
68pub fn get_memory_limit() -> Option<usize> {
69    let limiter = LIMITER.lock().expect("poisoned");
70    limiter.as_ref().map(|l| l.effective_memory_limit)
71}
72
73/// A handle to a running memory limiter task.
74struct Limiter {
75    /// The base process memory limit.
76    base_memory_limit: usize,
77    /// The effective memory limit, obtained by applying dyncfgs to the base limit.
78    effective_memory_limit: usize,
79    /// A sender for limiter configuration updates.
80    config_tx: UnboundedSender<LimiterConfig>,
81}
82
83impl Limiter {
84    /// Apply the given configuration to the limiter.
85    fn apply_config(&mut self, config: &ConfigSet) {
86        let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
87        // A zero duration means the limiter is disabled. Translate that into an ~infinite duration
88        // so the limiter doesn't have to worry about the special case.
89        if interval.is_zero() {
90            interval = Duration::MAX;
91        }
92
93        let memory_limit =
94            f64::cast_lossy(self.base_memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
95        let memory_limit = usize::cast_lossy(memory_limit);
96
97        let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
98        let burst_budget = usize::cast_lossy(burst_budget);
99
100        self.effective_memory_limit = memory_limit;
101
102        self.config_tx
103            .send(LimiterConfig {
104                interval,
105                memory_limit,
106                burst_budget,
107            })
108            .expect("limiter task never shuts down");
109    }
110}
111
/// Configuration for a memory limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// How often memory usage is compared against the memory limit.
    interval: Duration,
    /// The memory limit.
    memory_limit: usize,
    /// How far memory usage may run above the limit before action is taken, in byte-seconds.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Build the configuration under which the memory limiter is disabled.
    ///
    /// The check interval is set to the maximum representable duration, and both the memory
    /// limit and the burst budget are zero.
    fn disabled() -> Self {
        let never = Duration::MAX;
        Self {
            memory_limit: 0,
            burst_budget: 0,
            interval: never,
        }
    }
}
133
134/// A task that enforces configured memory limits.
135///
136/// The task operates by performing limit checks at a configured interval. For each check it
137/// obtains the current memory utilization from proc stats. It then compares the utilization against
138/// the configured memory limit, and if it exceeds the limit, reduces the burst budget by the amount
139/// of memory utilization that exceeds the limit. If the burst budget is exhausted, the limiter
140/// terminates the process.
141struct LimiterTask {
142    /// The current limiter configuration.
143    config: LimiterConfig,
144    /// The amount of burst budget remaining.
145    burst_budget_remaining: usize,
146    /// The time of the last check.
147    last_check: Instant,
148    /// Metrics tracked by the limiter task.
149    metrics: LimiterMetrics,
150}
151
152impl LimiterTask {
153    async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
154        info!("running memory limiter task");
155
156        let mut task = Self {
157            config: LimiterConfig::disabled(),
158            burst_budget_remaining: 0,
159            last_check: Instant::now(),
160            metrics,
161        };
162
163        loop {
164            tokio::select! {
165                _ = task.tick() => {
166                    let start = Instant::now();
167
168                    if let Err(err) = task.check() {
169                        error!("memory limit check failed: {err}");
170                    }
171
172                    let elapsed = start.elapsed();
173                    task.metrics.duration.observe(elapsed.as_secs_f64());
174                }
175                Some(config) = config_rx.recv() => task.apply_config(config),
176            }
177        }
178    }
179
180    /// Wait until the next check time.
181    async fn tick(&self) {
182        let elapsed = self.last_check.elapsed();
183        let duration = self.config.interval.saturating_sub(elapsed);
184        tokio::time::sleep(duration).await
185    }
186
187    fn current_utilization() -> anyhow::Result<ProcStatus> {
188        match ProcStatus::from_proc() {
189            Ok(status) => Ok(status),
190            #[cfg(target_os = "linux")]
191            Err(err) => {
192                error!("failed to read /proc/self/status: {err}");
193                Err(err)
194            }
195            #[cfg(not(target_os = "linux"))]
196            Err(_err) => Ok(ProcStatus {
197                vm_rss: 0,
198                vm_swap: 0,
199            }),
200        }
201    }
202
203    /// Perform a memory usage check, terminating the process if the configured limits are exceeded.
204    fn check(&mut self) -> Result<(), anyhow::Error> {
205        debug!("checking memory limits");
206
207        let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
208
209        let memory_limit = self.config.memory_limit;
210        let burst_budget_remaining = self.burst_budget_remaining;
211
212        let memory_usage = vm_rss + vm_swap;
213
214        debug!(
215            memory_usage,
216            memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
217        );
218
219        self.metrics.vm_rss.set(u64::cast_from(vm_rss));
220        self.metrics.vm_swap.set(u64::cast_from(vm_swap));
221        self.metrics.memory_usage.set(u64::cast_from(memory_usage));
222        self.metrics
223            .burst_budget
224            .set(u64::cast_from(burst_budget_remaining));
225
226        if memory_usage > memory_limit {
227            // Calculate excess usage in byte-seconds.
228            let elapsed = self.last_check.elapsed().as_secs_f64();
229            let excess = memory_usage - memory_limit;
230            let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
231
232            if burst_budget_remaining >= excess_bs {
233                self.burst_budget_remaining -= excess_bs;
234            } else {
235                // Burst budget exhausted, terminate the process.
236                warn!(
237                    memory_usage,
238                    memory_limit, "memory utilization exceeded configured limits",
239                );
240                // We terminate with a recognizable exit code so the orchestrator knows the
241                // termination was caused by exceeding memory limits, as opposed to another,
242                // unexpected cause.
243                mz_ore::process::exit_thread_safe(167);
244            }
245        } else {
246            // Reset burst budget if under limit.
247            self.burst_budget_remaining = self.config.burst_budget;
248        }
249
250        self.last_check = Instant::now();
251        Ok(())
252    }
253
254    /// Apply a new limiter config.
255    fn apply_config(&mut self, config: LimiterConfig) {
256        if config == self.config {
257            return; // no-op config change
258        }
259
260        info!(?config, "applying memory limiter config");
261        self.config = config;
262        self.burst_budget_remaining = config.burst_budget;
263        // Reset `last_check` so the next `check` measures the excess interval
264        // against the new config, not against a stale checkpoint left over from
265        // a period when the limiter was disabled or running with a different
266        // interval. Without this, enabling the limiter at runtime can charge an
267        // arbitrarily large excess byte-second amount against the burst budget
268        // and immediately terminate the process.
269        self.last_check = Instant::now();
270
271        self.metrics
272            .memory_limit
273            .set(u64::cast_from(config.memory_limit));
274    }
275}
276
277struct LimiterMetrics {
278    duration: Histogram,
279    memory_limit: UIntGauge,
280    memory_usage: UIntGauge,
281    vm_rss: UIntGauge,
282    vm_swap: UIntGauge,
283    burst_budget: UIntGauge,
284}
285
286impl LimiterMetrics {
287    fn new(registry: &MetricsRegistry) -> Self {
288        Self {
289            duration: registry.register(metric!(
290                name: "mz_memory_limiter_duration_seconds",
291                help: "A histogram of the time it took to run the memory limiter.",
292                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
293            )),
294            memory_limit: registry.register(metric!(
295                name: "mz_memory_limiter_memory_limit_bytes",
296                help: "The configured memory limit.",
297            )),
298            memory_usage: registry.register(metric!(
299                name: "mz_memory_limiter_memory_usage_bytes",
300                help: "The current memory usage.",
301            )),
302            vm_rss: registry.register(metric!(
303                name: "mz_memory_limiter_vm_rss_bytes",
304                help: "The current VmRSS metric.",
305            )),
306            vm_swap: registry.register(metric!(
307                name: "mz_memory_limiter_vm_swap_bytes",
308                help: "The current VmSwap metric.",
309            )),
310            burst_budget: registry.register(metric!(
311                name: "mz_memory_limiter_burst_budget_byteseconds",
312                help: "The remaining memory burst budget.",
313            )),
314        }
315    }
316}
317
/// Memory figures of the current process, as parsed from `/proc/self/status` on Linux.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ProcStatus {
    /// Resident Set Size (RSS) in bytes.
    pub vm_rss: usize,
    /// Swap memory in bytes.
    pub vm_swap: usize,
}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a limiter task with a disabled config, no burst budget, and fresh metrics.
    fn task_for_test() -> LimiterTask {
        let registry = MetricsRegistry::new();
        LimiterTask {
            metrics: LimiterMetrics::new(&registry),
            config: LimiterConfig::disabled(),
            burst_budget_remaining: 0,
            last_check: Instant::now(),
        }
    }

    /// Applying a real config must move `last_check` forward to roughly "now".
    ///
    /// If it did not, a limiter that gets enabled at runtime would multiply the over-limit
    /// memory amount by the whole time since the stale checkpoint on its first `check`,
    /// exhausting the burst budget at once and terminating the process.
    #[mz_ore::test]
    fn apply_config_resets_last_check() {
        // Pretend the limiter sat disabled for an hour without `last_check` being touched.
        let stale = Instant::now() - Duration::from_secs(3600);
        let mut task = task_for_test();
        task.last_check = stale;

        task.apply_config(LimiterConfig {
            interval: Duration::from_secs(1),
            memory_limit: 1024,
            burst_budget: 1024,
        });

        let refreshed = task.last_check;
        assert!(
            refreshed > stale,
            "apply_config must refresh last_check to avoid charging stale elapsed time \
             against the burst budget when the limiter is enabled at runtime"
        );
        assert!(
            refreshed.elapsed() < Duration::from_secs(60),
            "last_check should be ~now after apply_config"
        );
    }

    /// Re-applying the config that is already active must leave `last_check` alone, so that
    /// repeated no-op updates cannot keep pushing back the next limit check.
    #[mz_ore::test]
    fn apply_config_noop_keeps_last_check() {
        let stale = Instant::now() - Duration::from_secs(60);
        let mut task = task_for_test();
        task.last_check = stale;

        // The test task already runs with the disabled config, so this is a no-op.
        let unchanged = LimiterConfig::disabled();
        task.apply_config(unchanged);

        assert_eq!(task.last_check, stale);
    }
}
386
387impl ProcStatus {
388    /// Populate a new `ProcStatus` with information in /proc/self/status.
389    pub fn from_proc() -> anyhow::Result<Self> {
390        let contents = std::fs::read_to_string("/proc/self/status")?;
391        let mut vm_rss = 0;
392        let mut vm_swap = 0;
393
394        for line in contents.lines() {
395            if line.starts_with("VmRSS:") {
396                vm_rss = line
397                    .split_whitespace()
398                    .nth(1)
399                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
400                    .parse::<usize>()
401                    .context("failed to parse VmRSS")?
402                    * 1024
403            } else if line.starts_with("VmSwap:") {
404                vm_swap = line
405                    .split_whitespace()
406                    .nth(1)
407                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
408                    .parse::<usize>()
409                    .context("failed to parse VmSwap")?
410                    * 1024;
411            }
412        }
413
414        Ok(Self { vm_rss, vm_swap })
415    }
416}