mz_compute/
lgalloc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities to operate lgalloc.
11
12use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
15use mz_compute_types::dyncfgs::{
16    LGALLOC_LIMITER_BURST_FACTOR, LGALLOC_LIMITER_INTERVAL, LGALLOC_LIMITER_USAGE_BIAS,
17    LGALLOC_LIMITER_USAGE_FACTOR,
18};
19use mz_dyncfg::ConfigSet;
20use mz_ore::cast::{CastFrom, CastLossy};
21use mz_ore::metric;
22use mz_ore::metrics::{MetricsRegistry, UIntGauge};
23use prometheus::Histogram;
24use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
25use tracing::{debug, error, info, warn};
26
/// A handle to the active lgalloc limiter.
///
/// The slot holds `None` until [`start_limiter`] stores a handle in it. [`start_limiter`] also
/// spawns the limiter task and panics if the slot is already occupied.
/// [`apply_limiter_config`] does nothing while the slot is still `None`.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
32
33/// Start the process-global lgalloc limiter.
34///
35/// # Panics
36///
37/// Panics if the limiter was already started previously.
38pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
39    let mut limiter = LIMITER.lock().expect("poisoned");
40
41    if limiter.is_some() {
42        panic!("lgalloc limiter is already running");
43    }
44
45    let metrics = LimiterMetrics::new(metrics_registry);
46    let (config_tx, config_rx) = mpsc::unbounded_channel();
47
48    mz_ore::task::spawn(|| "lgalloc-limiter", LimiterTask::run(config_rx, metrics));
49
50    *limiter = Some(Limiter {
51        memory_limit,
52        config_tx,
53    });
54}
55
56/// Apply the given configuration to the active limiter.
57pub fn apply_limiter_config(config: &ConfigSet) {
58    if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
59        limiter.apply_config(config);
60    }
61}
62
63/// A handle to a running lgalloc limiter task.
64struct Limiter {
65    /// The process memory limit.
66    memory_limit: usize,
67    /// A sender for limiter configuration updates.
68    config_tx: UnboundedSender<LimiterConfig>,
69}
70
71impl Limiter {
72    /// Apply the given configuration to the limiter.
73    fn apply_config(&self, config: &ConfigSet) {
74        let mut interval = LGALLOC_LIMITER_INTERVAL.get(config);
75        // A zero duration means the limiter is disabled. Translate that into an ~infinite duration
76        // so the limiter doesn't have to worry about the special case.
77        if interval.is_zero() {
78            interval = Duration::MAX;
79        }
80
81        let disk_limit = f64::cast_lossy(self.memory_limit)
82            * LGALLOC_LIMITER_USAGE_FACTOR.get(config)
83            * LGALLOC_LIMITER_USAGE_BIAS.get(config);
84        let disk_limit = usize::cast_lossy(disk_limit);
85
86        let burst_budget = f64::cast_lossy(disk_limit) * LGALLOC_LIMITER_BURST_FACTOR.get(config);
87        let burst_budget = usize::cast_lossy(burst_budget);
88
89        self.config_tx
90            .send(LimiterConfig {
91                interval,
92                disk_limit,
93                burst_budget,
94            })
95            .expect("limiter task never shuts down");
96    }
97}
98
/// Settings that drive an lgalloc limiter task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct LimiterConfig {
    /// How often disk usage is compared against the disk limit.
    interval: Duration,
    /// The lgalloc disk limit.
    disk_limit: usize,
    /// Allowance for disk usage above the disk limit, in byte-seconds.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Build the configuration that turns the disk limiter off.
    ///
    /// The check interval is set to [`Duration::MAX`]; the disk limit and the burst budget are
    /// both zero.
    fn disabled() -> Self {
        LimiterConfig {
            disk_limit: 0,
            burst_budget: 0,
            interval: Duration::MAX,
        }
    }
}
120
121/// A task that enforces configured lgalloc disk limits.
122///
123/// The task operates by performing limit checks at a configured interval. For each check it
124/// obtains the current disk utilization from lgalloc file stats, summed across all files and size
125/// classes. It then compares the disk utilization against the configured disk limit, and if it
126/// exceeds the limit, reduces the burst budget by the amount of disk utilization that exceeds the
127/// limit. If the burst budget is exhausted, the limiter terminates the process.
128struct LimiterTask {
129    /// The current limiter configuration.
130    config: LimiterConfig,
131    /// The amount of burst budget remaining.
132    burst_budget_remaining: usize,
133    /// The time of the last check.
134    last_check: Instant,
135    /// Metrics tracked by the limiter task.
136    metrics: LimiterMetrics,
137}
138
139impl LimiterTask {
140    async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
141        info!("running lgalloc limiter task");
142
143        let mut task = Self {
144            config: LimiterConfig::disabled(),
145            burst_budget_remaining: 0,
146            last_check: Instant::now(),
147            metrics,
148        };
149
150        loop {
151            tokio::select! {
152                _ = task.tick() => {
153                    let start = Instant::now();
154
155                    if let Err(err) = task.check() {
156                        error!("lgalloc limit check failed: {err}");
157                    }
158
159                    let elapsed = start.elapsed();
160                    task.metrics.duration.observe(elapsed.as_secs_f64());
161                }
162                Some(config) = config_rx.recv() => task.apply_config(config),
163            }
164        }
165    }
166
167    /// Wait until the next check time.
168    async fn tick(&self) {
169        let elapsed = self.last_check.elapsed();
170        let duration = self.config.interval.saturating_sub(elapsed);
171        tokio::time::sleep(duration).await
172    }
173
174    /// Perform a disk usage check, terminating the process if the configured limits are exceeded.
175    fn check(&mut self) -> Result<(), anyhow::Error> {
176        debug!("checking lgalloc limits");
177
178        let mut disk_usage = 0;
179        let file_stats = lgalloc::lgalloc_stats().file;
180        for (_size_class, stat) in file_stats {
181            disk_usage += stat?.allocated_size;
182        }
183
184        let disk_limit = self.config.disk_limit;
185        let burst_budget_remaining = self.burst_budget_remaining;
186
187        debug!(disk_usage, disk_limit, burst_budget_remaining);
188
189        self.metrics.disk_usage.set(u64::cast_from(disk_usage));
190        self.metrics
191            .burst_budget
192            .set(u64::cast_from(burst_budget_remaining));
193
194        if disk_usage > disk_limit {
195            // Calculate excess usage in byte-seconds.
196            let elapsed = self.last_check.elapsed().as_secs_f64();
197            let excess = disk_usage - disk_limit;
198            let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
199
200            if burst_budget_remaining >= excess_bs {
201                self.burst_budget_remaining -= excess_bs;
202            } else {
203                // Burst budget exhausted, terminate the process.
204                warn!(
205                    disk_usage,
206                    disk_limit, "lgalloc disk utilization exceeded configured limits",
207                );
208                // We terminate with a recognizable exit code so the orchestrator knows the
209                // termination was caused by exceeding disk limits, as opposed to another,
210                // unexpected cause.
211                mz_ore::process::exit_thread_safe(167);
212            }
213        } else {
214            // Reset burst budget if under limit.
215            self.burst_budget_remaining = self.config.burst_budget;
216        }
217
218        self.last_check = Instant::now();
219        Ok(())
220    }
221
222    /// Apply a new limiter config.
223    fn apply_config(&mut self, config: LimiterConfig) {
224        if config == self.config {
225            return; // no-op config change
226        }
227
228        info!(?config, "applying lgalloc limiter config");
229        self.config = config;
230        self.burst_budget_remaining = config.burst_budget;
231
232        self.metrics
233            .disk_limit
234            .set(u64::cast_from(config.disk_limit));
235    }
236}
237
238struct LimiterMetrics {
239    duration: Histogram,
240    disk_limit: UIntGauge,
241    disk_usage: UIntGauge,
242    burst_budget: UIntGauge,
243}
244
245impl LimiterMetrics {
246    fn new(registry: &MetricsRegistry) -> Self {
247        Self {
248            duration: registry.register(metric!(
249                name: "mz_lgalloc_limiter_duration_seconds",
250                help: "A histogram of the time it took to run the lgalloc limiter.",
251                buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
252            )),
253            disk_limit: registry.register(metric!(
254                name: "mz_lgalloc_limiter_disk_limit_bytes",
255                help: "The configured lgalloc disk limit.",
256            )),
257            disk_usage: registry.register(metric!(
258                name: "mz_lgalloc_limiter_disk_usage_bytes",
259                help: "The current lgalloc disk usage.",
260            )),
261            burst_budget: registry.register(metric!(
262                name: "mz_lgalloc_limiter_burst_budget_byteseconds",
263                help: "The remaining lgalloc burst budget.",
264            )),
265        }
266    }
267}