mz_compute/
memory_limiter.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities to limit memory usage.
11//!
//! In the context of this module, "memory" refers to the sum of physical memory and swap space.
13//! Other parts of the code usually don't include swap space when talking about "memory".
14
15use std::sync::Mutex;
16use std::time::{Duration, Instant};
17
18use anyhow::Context;
19use mz_compute_types::dyncfgs::{
20    MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
21};
22use mz_dyncfg::ConfigSet;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::metric;
25use mz_ore::metrics::{MetricsRegistry, UIntGauge};
26use prometheus::Histogram;
27use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
28use tracing::{debug, error, info, warn};
29
/// A handle to the active memory limiter.
///
/// The limiter is initialized by a call to [`start_limiter`]. It spawns a process-global task that
/// runs for the lifetime of the process.
///
/// The slot holds `None` until [`start_limiter`] has been called. [`apply_limiter_config`] reads
/// it to forward configuration updates and does nothing while it is still `None`.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
35
36/// Start the process-global memory limiter.
37///
38/// # Panics
39///
40/// Panics if the limiter was already started previously.
41pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
42    let mut limiter = LIMITER.lock().expect("poisoned");
43
44    if limiter.is_some() {
45        panic!("memory limiter is already running");
46    }
47
48    let metrics = LimiterMetrics::new(metrics_registry);
49    let (config_tx, config_rx) = mpsc::unbounded_channel();
50
51    mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
52
53    *limiter = Some(Limiter {
54        memory_limit,
55        config_tx,
56    });
57}
58
59/// Apply the given configuration to the active limiter.
60pub fn apply_limiter_config(config: &ConfigSet) {
61    if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
62        limiter.apply_config(config);
63    }
64}
65
66/// A handle to a running memory limiter task.
67struct Limiter {
68    /// The process memory limit.
69    memory_limit: usize,
70    /// A sender for limiter configuration updates.
71    config_tx: UnboundedSender<LimiterConfig>,
72}
73
74impl Limiter {
75    /// Apply the given configuration to the limiter.
76    fn apply_config(&self, config: &ConfigSet) {
77        let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
78        // A zero duration means the limiter is disabled. Translate that into an ~infinite duration
79        // so the limiter doesn't have to worry about the special case.
80        if interval.is_zero() {
81            interval = Duration::MAX;
82        }
83
84        let memory_limit =
85            f64::cast_lossy(self.memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
86        let memory_limit = usize::cast_lossy(memory_limit);
87
88        let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
89        let burst_budget = usize::cast_lossy(burst_budget);
90
91        self.config_tx
92            .send(LimiterConfig {
93                interval,
94                memory_limit,
95                burst_budget,
96            })
97            .expect("limiter task never shuts down");
98    }
99}
100
/// Configuration for a memory limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// The interval at which memory usage is checked against the memory limit.
    interval: Duration,
    /// The memory limit.
    memory_limit: usize,
    /// Budget to allow memory usage above the memory limit, in byte-seconds.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Return a config that disables the memory limiter.
    ///
    /// The config uses the maximum representable interval together with a zero memory limit and
    /// a zero burst budget.
    fn disabled() -> Self {
        let interval = Duration::MAX;
        Self {
            interval,
            memory_limit: 0,
            burst_budget: 0,
        }
    }
}
122
123/// A task that enforces configured memory limits.
124///
125/// The task operates by performing limit checks at a configured interval. For each check it
126/// obtains the current memory utilization from proc stats. It then compares the utilization against
127/// the configured memory limit, and if it exceeds the limit, reduces the burst budget by the amount
128/// of memory utilization that exceeds the limit. If the burst budget is exhausted, the limiter
129/// terminates the process.
130struct LimiterTask {
131    /// The current limiter configuration.
132    config: LimiterConfig,
133    /// The amount of burst budget remaining.
134    burst_budget_remaining: usize,
135    /// The time of the last check.
136    last_check: Instant,
137    /// Metrics tracked by the limiter task.
138    metrics: LimiterMetrics,
139}
140
141impl LimiterTask {
142    async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
143        info!("running memory limiter task");
144
145        let mut task = Self {
146            config: LimiterConfig::disabled(),
147            burst_budget_remaining: 0,
148            last_check: Instant::now(),
149            metrics,
150        };
151
152        loop {
153            tokio::select! {
154                _ = task.tick() => {
155                    let start = Instant::now();
156
157                    if let Err(err) = task.check() {
158                        error!("memory limit check failed: {err}");
159                    }
160
161                    let elapsed = start.elapsed();
162                    task.metrics.duration.observe(elapsed.as_secs_f64());
163                }
164                Some(config) = config_rx.recv() => task.apply_config(config),
165            }
166        }
167    }
168
169    /// Wait until the next check time.
170    async fn tick(&self) {
171        let elapsed = self.last_check.elapsed();
172        let duration = self.config.interval.saturating_sub(elapsed);
173        tokio::time::sleep(duration).await
174    }
175
176    fn current_utilization() -> anyhow::Result<ProcStatus> {
177        match ProcStatus::from_proc() {
178            Ok(status) => Ok(status),
179            #[cfg(target_os = "linux")]
180            Err(err) => {
181                error!("failed to read /proc/self/status: {err}");
182                Err(err)
183            }
184            #[cfg(not(target_os = "linux"))]
185            Err(_err) => Ok(ProcStatus {
186                vm_rss: 0,
187                vm_swap: 0,
188            }),
189        }
190    }
191
192    /// Perform a memory usage check, terminating the process if the configured limits are exceeded.
193    fn check(&mut self) -> Result<(), anyhow::Error> {
194        debug!("checking memory limits");
195
196        let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
197
198        let memory_limit = self.config.memory_limit;
199        let burst_budget_remaining = self.burst_budget_remaining;
200
201        let memory_usage = vm_rss + vm_swap;
202
203        debug!(
204            memory_usage,
205            memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
206        );
207
208        self.metrics.vm_rss.set(u64::cast_from(vm_rss));
209        self.metrics.vm_swap.set(u64::cast_from(vm_swap));
210        self.metrics.memory_usage.set(u64::cast_from(memory_usage));
211        self.metrics
212            .burst_budget
213            .set(u64::cast_from(burst_budget_remaining));
214
215        if memory_usage > memory_limit {
216            // Calculate excess usage in byte-seconds.
217            let elapsed = self.last_check.elapsed().as_secs_f64();
218            let excess = memory_usage - memory_limit;
219            let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
220
221            if burst_budget_remaining >= excess_bs {
222                self.burst_budget_remaining -= excess_bs;
223            } else {
224                // Burst budget exhausted, terminate the process.
225                warn!(
226                    memory_usage,
227                    memory_limit, "memory utilization exceeded configured limits",
228                );
229                // We terminate with a recognizable exit code so the orchestrator knows the
230                // termination was caused by exceeding memory limits, as opposed to another,
231                // unexpected cause.
232                mz_ore::process::exit_thread_safe(167);
233            }
234        } else {
235            // Reset burst budget if under limit.
236            self.burst_budget_remaining = self.config.burst_budget;
237        }
238
239        self.last_check = Instant::now();
240        Ok(())
241    }
242
243    /// Apply a new limiter config.
244    fn apply_config(&mut self, config: LimiterConfig) {
245        if config == self.config {
246            return; // no-op config change
247        }
248
249        info!(?config, "applying memory limiter config");
250        self.config = config;
251        self.burst_budget_remaining = config.burst_budget;
252
253        self.metrics
254            .memory_limit
255            .set(u64::cast_from(config.memory_limit));
256    }
257}
258
259struct LimiterMetrics {
260    duration: Histogram,
261    memory_limit: UIntGauge,
262    memory_usage: UIntGauge,
263    vm_rss: UIntGauge,
264    vm_swap: UIntGauge,
265    burst_budget: UIntGauge,
266}
267
268impl LimiterMetrics {
269    fn new(registry: &MetricsRegistry) -> Self {
270        Self {
271            duration: registry.register(metric!(
272                name: "mz_memory_limiter_duration_seconds",
273                help: "A histogram of the time it took to run the memory limiter.",
274                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
275            )),
276            memory_limit: registry.register(metric!(
277                name: "mz_memory_limiter_memory_limit_bytes",
278                help: "The configured memory limit.",
279            )),
280            memory_usage: registry.register(metric!(
281                name: "mz_memory_limiter_memory_usage_bytes",
282                help: "The current memory usage.",
283            )),
284            vm_rss: registry.register(metric!(
285                name: "mz_memory_limiter_vm_rss_bytes",
286                help: "The current VmRSS metric.",
287            )),
288            vm_swap: registry.register(metric!(
289                name: "mz_memory_limiter_vm_swap_bytes",
290                help: "The current VmSwap metric.",
291            )),
292            burst_budget: registry.register(metric!(
293                name: "mz_memory_limiter_burst_budget_byteseconds",
294                help: "The remaining memory burst budget.",
295            )),
296        }
297    }
298}
299
300/// Helper for reading and parsing `/proc/self/status` on Linux.
301pub struct ProcStatus {
302    /// Resident Set Size (RSS) in bytes.
303    pub vm_rss: usize,
304    /// Swap memory in bytes.
305    pub vm_swap: usize,
306}
307
308impl ProcStatus {
309    /// Populate a new `ProcStatus` with information in /proc/self/status.
310    pub fn from_proc() -> anyhow::Result<Self> {
311        let contents = std::fs::read_to_string("/proc/self/status")?;
312        let mut vm_rss = 0;
313        let mut vm_swap = 0;
314
315        for line in contents.lines() {
316            if line.starts_with("VmRSS:") {
317                vm_rss = line
318                    .split_whitespace()
319                    .nth(1)
320                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
321                    .parse::<usize>()
322                    .context("failed to parse VmRSS")?
323                    * 1024
324            } else if line.starts_with("VmSwap:") {
325                vm_swap = line
326                    .split_whitespace()
327                    .nth(1)
328                    .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
329                    .parse::<usize>()
330                    .context("failed to parse VmSwap")?
331                    * 1024;
332            }
333        }
334
335        Ok(Self { vm_rss, vm_swap })
336    }
337}