1use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
15use anyhow::Context;
16use mz_compute_types::dyncfgs::{
17 MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
18 MEMORY_LIMITER_USAGE_FACTOR,
19};
20use mz_dyncfg::ConfigSet;
21use mz_ore::cast::{CastFrom, CastLossy};
22use mz_ore::metric;
23use mz_ore::metrics::{MetricsRegistry, UIntGauge};
24use prometheus::Histogram;
25use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
26use tracing::{debug, error, info, warn};
27
/// Handle to the process-global memory limiter.
///
/// Holds `None` until `start_limiter` stores a `Limiter` in it. `apply_limiter_config` reads it
/// to forward configuration to the limiter task and does nothing while it is still `None`.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
33
34pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
40 let mut limiter = LIMITER.lock().expect("poisoned");
41
42 if limiter.is_some() {
43 panic!("memory limiter is already running");
44 }
45
46 let metrics = LimiterMetrics::new(metrics_registry);
47 let (config_tx, config_rx) = mpsc::unbounded_channel();
48
49 mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
50
51 *limiter = Some(Limiter {
52 memory_limit,
53 config_tx,
54 });
55}
56
57pub fn apply_limiter_config(config: &ConfigSet) {
59 if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
60 limiter.apply_config(config);
61 }
62}
63
/// A handle to the running memory limiter task.
struct Limiter {
    /// The memory limit passed to `start_limiter`, in bytes. It is scaled by the configured
    /// usage factor and bias before being sent to the task.
    memory_limit: usize,
    /// Sender for configuration updates consumed by the limiter task.
    config_tx: UnboundedSender<LimiterConfig>,
}
71
72impl Limiter {
73 fn apply_config(&self, config: &ConfigSet) {
75 let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
76 if interval.is_zero() {
79 interval = Duration::MAX;
80 }
81
82 let memory_limit = f64::cast_lossy(self.memory_limit)
83 * MEMORY_LIMITER_USAGE_FACTOR.get(config)
84 * MEMORY_LIMITER_USAGE_BIAS.get(config);
85 let memory_limit = usize::cast_lossy(memory_limit);
86
87 let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
88 let burst_budget = usize::cast_lossy(burst_budget);
89
90 self.config_tx
91 .send(LimiterConfig {
92 interval,
93 memory_limit,
94 burst_budget,
95 })
96 .expect("limiter task never shuts down");
97 }
98}
99
/// Configuration of the memory limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// Time between two memory checks.
    interval: Duration,
    /// Memory usage, in bytes, above which burst budget is consumed.
    memory_limit: usize,
    /// Budget, in byte-seconds, for staying above `memory_limit`.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Return the configuration the limiter task starts with: a `Duration::MAX` check interval
    /// and zero limit and burst budget.
    fn disabled() -> Self {
        let interval = Duration::MAX;
        let (memory_limit, burst_budget) = (0, 0);

        Self {
            interval,
            memory_limit,
            burst_budget,
        }
    }
}
121
/// State of the long-running task that periodically checks memory usage and exits the process
/// when usage stays above the configured limit for longer than the burst budget allows.
struct LimiterTask {
    /// The configuration currently in effect.
    config: LimiterConfig,
    /// Burst budget still available, in byte-seconds.
    burst_budget_remaining: usize,
    /// When the last memory check finished; the next check is scheduled relative to it.
    last_check: Instant,
    /// Metrics reported by the task.
    metrics: LimiterMetrics,
}
139
140impl LimiterTask {
141 async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
142 info!("running memory limiter task");
143
144 let mut task = Self {
145 config: LimiterConfig::disabled(),
146 burst_budget_remaining: 0,
147 last_check: Instant::now(),
148 metrics,
149 };
150
151 loop {
152 tokio::select! {
153 _ = task.tick() => {
154 let start = Instant::now();
155
156 if let Err(err) = task.check() {
157 error!("memory limit check failed: {err}");
158 }
159
160 let elapsed = start.elapsed();
161 task.metrics.duration.observe(elapsed.as_secs_f64());
162 }
163 Some(config) = config_rx.recv() => task.apply_config(config),
164 }
165 }
166 }
167
168 async fn tick(&self) {
170 let elapsed = self.last_check.elapsed();
171 let duration = self.config.interval.saturating_sub(elapsed);
172 tokio::time::sleep(duration).await
173 }
174
175 fn current_utilization() -> anyhow::Result<ProcStatus> {
176 match ProcStatus::from_proc() {
177 Ok(status) => Ok(status),
178 #[cfg(target_os = "linux")]
179 Err(err) => {
180 error!("failed to read /proc/self/status: {err}");
181 Err(err)
182 }
183 #[cfg(not(target_os = "linux"))]
184 Err(_err) => Ok(ProcStatus {
185 vm_rss: 0,
186 vm_swap: 0,
187 }),
188 }
189 }
190
191 fn check(&mut self) -> Result<(), anyhow::Error> {
193 debug!("checking memory limits");
194
195 let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
196
197 let memory_limit = self.config.memory_limit;
198 let burst_budget_remaining = self.burst_budget_remaining;
199
200 let memory_usage = vm_rss + vm_swap;
201
202 debug!(
203 memory_usage,
204 memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
205 );
206
207 self.metrics.vm_rss.set(u64::cast_from(vm_rss));
208 self.metrics.vm_swap.set(u64::cast_from(vm_swap));
209 self.metrics.memory_usage.set(u64::cast_from(memory_usage));
210 self.metrics
211 .burst_budget
212 .set(u64::cast_from(burst_budget_remaining));
213
214 if memory_usage > memory_limit {
215 let elapsed = self.last_check.elapsed().as_secs_f64();
217 let excess = memory_usage - memory_limit;
218 let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
219
220 if burst_budget_remaining >= excess_bs {
221 self.burst_budget_remaining -= excess_bs;
222 } else {
223 warn!(
225 memory_usage,
226 memory_limit, "memory utilization exceeded configured limits",
227 );
228 mz_ore::process::exit_thread_safe(167);
232 }
233 } else {
234 self.burst_budget_remaining = self.config.burst_budget;
236 }
237
238 self.last_check = Instant::now();
239 Ok(())
240 }
241
242 fn apply_config(&mut self, config: LimiterConfig) {
244 if config == self.config {
245 return; }
247
248 info!(?config, "applying memory limiter config");
249 self.config = config;
250 self.burst_budget_remaining = config.burst_budget;
251
252 self.metrics
253 .memory_limit
254 .set(u64::cast_from(config.memory_limit));
255 }
256}
257
/// Metrics reported by the memory limiter task.
struct LimiterMetrics {
    /// Histogram of the time each memory check took, in seconds.
    duration: Histogram,
    /// The memory limit of the configuration currently applied to the task.
    memory_limit: UIntGauge,
    /// The most recently measured memory usage (`VmRSS` + `VmSwap`).
    memory_usage: UIntGauge,
    /// The most recently measured `VmRSS` value.
    vm_rss: UIntGauge,
    /// The most recently measured `VmSwap` value.
    vm_swap: UIntGauge,
    /// The remaining burst budget, in byte-seconds, as seen at the start of the last check.
    burst_budget: UIntGauge,
}
266
267impl LimiterMetrics {
268 fn new(registry: &MetricsRegistry) -> Self {
269 Self {
270 duration: registry.register(metric!(
271 name: "mz_memory_limiter_duration_seconds",
272 help: "A histogram of the time it took to run the memory limiter.",
273 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
274 )),
275 memory_limit: registry.register(metric!(
276 name: "mz_memory_limiter_memory_limit_bytes",
277 help: "The configured memory limit.",
278 )),
279 memory_usage: registry.register(metric!(
280 name: "mz_memory_limiter_memory_usage_bytes",
281 help: "The current memory usage.",
282 )),
283 vm_rss: registry.register(metric!(
284 name: "mz_memory_limiter_vm_rss_bytes",
285 help: "The current VmRSS metric.",
286 )),
287 vm_swap: registry.register(metric!(
288 name: "mz_memory_limiter_vm_swap_bytes",
289 help: "The current VmSwap metric.",
290 )),
291 burst_budget: registry.register(metric!(
292 name: "mz_memory_limiter_burst_budget_byteseconds",
293 help: "The remaining memory burst budget.",
294 )),
295 }
296 }
297}
298
/// Memory figures parsed from `/proc/self/status`.
pub struct ProcStatus {
    /// The `VmRSS` value multiplied by 1024 (presumably bytes, as the file reports kB).
    pub vm_rss: usize,
    /// The `VmSwap` value multiplied by 1024 (presumably bytes, as the file reports kB).
    pub vm_swap: usize,
}
306
307impl ProcStatus {
308 pub fn from_proc() -> anyhow::Result<Self> {
310 let contents = std::fs::read_to_string("/proc/self/status")?;
311 let mut vm_rss = 0;
312 let mut vm_swap = 0;
313
314 for line in contents.lines() {
315 if line.starts_with("VmRSS:") {
316 vm_rss = line
317 .split_whitespace()
318 .nth(1)
319 .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
320 .parse::<usize>()
321 .context("failed to parse VmRSS")?
322 * 1024
323 } else if line.starts_with("VmSwap:") {
324 vm_swap = line
325 .split_whitespace()
326 .nth(1)
327 .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
328 .parse::<usize>()
329 .context("failed to parse VmSwap")?
330 * 1024;
331 }
332 }
333
334 Ok(Self { vm_rss, vm_swap })
335 }
336}