1use std::sync::Mutex;
16use std::time::{Duration, Instant};
17
18use anyhow::Context;
19use mz_compute_types::dyncfgs::{
20 MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
21};
22use mz_dyncfg::ConfigSet;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::metric;
25use mz_ore::metrics::{MetricsRegistry, UIntGauge};
26use prometheus::Histogram;
27use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
28use tracing::{debug, error, info, warn};
29
/// Process-global handle to the memory limiter.
///
/// Holds `None` until `start_limiter` installs a `Limiter`; all access goes through the mutex.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
35
36pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
42 let mut limiter = LIMITER.lock().expect("poisoned");
43
44 if limiter.is_some() {
45 panic!("memory limiter is already running");
46 }
47
48 let metrics = LimiterMetrics::new(metrics_registry);
49 let (config_tx, config_rx) = mpsc::unbounded_channel();
50
51 mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
52
53 *limiter = Some(Limiter {
54 base_memory_limit: memory_limit,
55 effective_memory_limit: memory_limit,
56 config_tx,
57 });
58}
59
60pub fn apply_limiter_config(config: &ConfigSet) {
62 if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
63 limiter.apply_config(config);
64 }
65}
66
67pub fn get_memory_limit() -> Option<usize> {
69 let limiter = LIMITER.lock().expect("poisoned");
70 limiter.as_ref().map(|l| l.effective_memory_limit)
71}
72
73struct Limiter {
75 base_memory_limit: usize,
77 effective_memory_limit: usize,
79 config_tx: UnboundedSender<LimiterConfig>,
81}
82
83impl Limiter {
84 fn apply_config(&mut self, config: &ConfigSet) {
86 let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
87 if interval.is_zero() {
90 interval = Duration::MAX;
91 }
92
93 let memory_limit =
94 f64::cast_lossy(self.base_memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
95 let memory_limit = usize::cast_lossy(memory_limit);
96
97 let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
98 let burst_budget = usize::cast_lossy(burst_budget);
99
100 self.effective_memory_limit = memory_limit;
101
102 self.config_tx
103 .send(LimiterConfig {
104 interval,
105 memory_limit,
106 burst_budget,
107 })
108 .expect("limiter task never shuts down");
109 }
110}
111
/// Settings the limiter task operates under.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// Time between two consecutive memory checks.
    interval: Duration,
    /// Memory usage above which the burst budget starts being consumed.
    memory_limit: usize,
    /// Budget available for exceeding the memory limit before the process is terminated.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Returns the config the limiter task starts with: a maximal check interval and zeroed
    /// limit and budget.
    fn disabled() -> Self {
        let (memory_limit, burst_budget) = (0, 0);
        LimiterConfig {
            interval: Duration::MAX,
            memory_limit,
            burst_budget,
        }
    }
}
133
134struct LimiterTask {
142 config: LimiterConfig,
144 burst_budget_remaining: usize,
146 last_check: Instant,
148 metrics: LimiterMetrics,
150}
151
152impl LimiterTask {
153 async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
154 info!("running memory limiter task");
155
156 let mut task = Self {
157 config: LimiterConfig::disabled(),
158 burst_budget_remaining: 0,
159 last_check: Instant::now(),
160 metrics,
161 };
162
163 loop {
164 tokio::select! {
165 _ = task.tick() => {
166 let start = Instant::now();
167
168 if let Err(err) = task.check() {
169 error!("memory limit check failed: {err}");
170 }
171
172 let elapsed = start.elapsed();
173 task.metrics.duration.observe(elapsed.as_secs_f64());
174 }
175 Some(config) = config_rx.recv() => task.apply_config(config),
176 }
177 }
178 }
179
180 async fn tick(&self) {
182 let elapsed = self.last_check.elapsed();
183 let duration = self.config.interval.saturating_sub(elapsed);
184 tokio::time::sleep(duration).await
185 }
186
187 fn current_utilization() -> anyhow::Result<ProcStatus> {
188 match ProcStatus::from_proc() {
189 Ok(status) => Ok(status),
190 #[cfg(target_os = "linux")]
191 Err(err) => {
192 error!("failed to read /proc/self/status: {err}");
193 Err(err)
194 }
195 #[cfg(not(target_os = "linux"))]
196 Err(_err) => Ok(ProcStatus {
197 vm_rss: 0,
198 vm_swap: 0,
199 }),
200 }
201 }
202
203 fn check(&mut self) -> Result<(), anyhow::Error> {
205 debug!("checking memory limits");
206
207 let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
208
209 let memory_limit = self.config.memory_limit;
210 let burst_budget_remaining = self.burst_budget_remaining;
211
212 let memory_usage = vm_rss + vm_swap;
213
214 debug!(
215 memory_usage,
216 memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
217 );
218
219 self.metrics.vm_rss.set(u64::cast_from(vm_rss));
220 self.metrics.vm_swap.set(u64::cast_from(vm_swap));
221 self.metrics.memory_usage.set(u64::cast_from(memory_usage));
222 self.metrics
223 .burst_budget
224 .set(u64::cast_from(burst_budget_remaining));
225
226 if memory_usage > memory_limit {
227 let elapsed = self.last_check.elapsed().as_secs_f64();
229 let excess = memory_usage - memory_limit;
230 let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
231
232 if burst_budget_remaining >= excess_bs {
233 self.burst_budget_remaining -= excess_bs;
234 } else {
235 warn!(
237 memory_usage,
238 memory_limit, "memory utilization exceeded configured limits",
239 );
240 mz_ore::process::exit_thread_safe(167);
244 }
245 } else {
246 self.burst_budget_remaining = self.config.burst_budget;
248 }
249
250 self.last_check = Instant::now();
251 Ok(())
252 }
253
254 fn apply_config(&mut self, config: LimiterConfig) {
256 if config == self.config {
257 return; }
259
260 info!(?config, "applying memory limiter config");
261 self.config = config;
262 self.burst_budget_remaining = config.burst_budget;
263 self.last_check = Instant::now();
270
271 self.metrics
272 .memory_limit
273 .set(u64::cast_from(config.memory_limit));
274 }
275}
276
277struct LimiterMetrics {
278 duration: Histogram,
279 memory_limit: UIntGauge,
280 memory_usage: UIntGauge,
281 vm_rss: UIntGauge,
282 vm_swap: UIntGauge,
283 burst_budget: UIntGauge,
284}
285
286impl LimiterMetrics {
287 fn new(registry: &MetricsRegistry) -> Self {
288 Self {
289 duration: registry.register(metric!(
290 name: "mz_memory_limiter_duration_seconds",
291 help: "A histogram of the time it took to run the memory limiter.",
292 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
293 )),
294 memory_limit: registry.register(metric!(
295 name: "mz_memory_limiter_memory_limit_bytes",
296 help: "The configured memory limit.",
297 )),
298 memory_usage: registry.register(metric!(
299 name: "mz_memory_limiter_memory_usage_bytes",
300 help: "The current memory usage.",
301 )),
302 vm_rss: registry.register(metric!(
303 name: "mz_memory_limiter_vm_rss_bytes",
304 help: "The current VmRSS metric.",
305 )),
306 vm_swap: registry.register(metric!(
307 name: "mz_memory_limiter_vm_swap_bytes",
308 help: "The current VmSwap metric.",
309 )),
310 burst_budget: registry.register(metric!(
311 name: "mz_memory_limiter_burst_budget_byteseconds",
312 help: "The remaining memory burst budget.",
313 )),
314 }
315 }
316}
317
/// Memory figures for the current process, as read from `/proc/self/status`.
pub struct ProcStatus {
    /// The `VmRSS` value, converted to bytes.
    pub vm_rss: usize,
    /// The `VmSwap` value, converted to bytes.
    pub vm_swap: usize,
}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `LimiterTask` in the disabled state, backed by a fresh metrics registry.
    fn task_for_test() -> LimiterTask {
        let registry = MetricsRegistry::new();
        let metrics = LimiterMetrics::new(&registry);
        LimiterTask {
            config: LimiterConfig::disabled(),
            burst_budget_remaining: 0,
            last_check: Instant::now(),
            metrics,
        }
    }

    /// Applying a config that differs from the current one must restart the check timer.
    #[mz_ore::test]
    fn apply_config_resets_last_check() {
        let mut task = task_for_test();

        // Pretend the last check happened a long time ago.
        let stale = Instant::now() - Duration::from_secs(3600);
        task.last_check = stale;

        let new_config = LimiterConfig {
            interval: Duration::from_secs(1),
            memory_limit: 1024,
            burst_budget: 1024,
        };
        task.apply_config(new_config);

        assert!(
            task.last_check > stale,
            "apply_config must refresh last_check to avoid charging stale elapsed time \
             against the burst budget when the limiter is enabled at runtime"
        );
        assert!(
            task.last_check.elapsed() < Duration::from_secs(60),
            "last_check should be ~now after apply_config"
        );
    }

    /// Re-applying the config that is already in effect must leave the check timer untouched.
    #[mz_ore::test]
    fn apply_config_noop_keeps_last_check() {
        let mut task = task_for_test();
        let stale = Instant::now() - Duration::from_secs(60);
        task.last_check = stale;

        task.apply_config(LimiterConfig::disabled());

        assert_eq!(task.last_check, stale);
    }
}
386
387impl ProcStatus {
388 pub fn from_proc() -> anyhow::Result<Self> {
390 let contents = std::fs::read_to_string("/proc/self/status")?;
391 let mut vm_rss = 0;
392 let mut vm_swap = 0;
393
394 for line in contents.lines() {
395 if line.starts_with("VmRSS:") {
396 vm_rss = line
397 .split_whitespace()
398 .nth(1)
399 .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
400 .parse::<usize>()
401 .context("failed to parse VmRSS")?
402 * 1024
403 } else if line.starts_with("VmSwap:") {
404 vm_swap = line
405 .split_whitespace()
406 .nth(1)
407 .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
408 .parse::<usize>()
409 .context("failed to parse VmSwap")?
410 * 1024;
411 }
412 }
413
414 Ok(Self { vm_rss, vm_swap })
415 }
416}