1use std::sync::Mutex;
16use std::time::{Duration, Instant};
17
18use anyhow::Context;
19use mz_compute_types::dyncfgs::{
20 MEMORY_LIMITER_BURST_FACTOR, MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS,
21};
22use mz_dyncfg::ConfigSet;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::metric;
25use mz_ore::metrics::{MetricsRegistry, UIntGauge};
26use prometheus::Histogram;
27use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
28use tracing::{debug, error, info, warn};
29
/// Process-wide handle to the memory limiter.
///
/// `None` until [`start_limiter`] installs a [`Limiter`]; [`apply_limiter_config`] is a no-op
/// while this is still `None`.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
35
36pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
42 let mut limiter = LIMITER.lock().expect("poisoned");
43
44 if limiter.is_some() {
45 panic!("memory limiter is already running");
46 }
47
48 let metrics = LimiterMetrics::new(metrics_registry);
49 let (config_tx, config_rx) = mpsc::unbounded_channel();
50
51 mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
52
53 *limiter = Some(Limiter {
54 memory_limit,
55 config_tx,
56 });
57}
58
59pub fn apply_limiter_config(config: &ConfigSet) {
61 if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
62 limiter.apply_config(config);
63 }
64}
65
/// Handle to the running memory limiter, stored in [`LIMITER`].
struct Limiter {
    /// The base memory limit passed to [`start_limiter`], before `MEMORY_LIMITER_USAGE_BIAS`
    /// is applied.
    memory_limit: usize,
    /// Channel over which configuration updates are sent to the limiter task.
    config_tx: UnboundedSender<LimiterConfig>,
}
73
74impl Limiter {
75 fn apply_config(&self, config: &ConfigSet) {
77 let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
78 if interval.is_zero() {
81 interval = Duration::MAX;
82 }
83
84 let memory_limit =
85 f64::cast_lossy(self.memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
86 let memory_limit = usize::cast_lossy(memory_limit);
87
88 let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
89 let burst_budget = usize::cast_lossy(burst_budget);
90
91 self.config_tx
92 .send(LimiterConfig {
93 interval,
94 memory_limit,
95 burst_budget,
96 })
97 .expect("limiter task never shuts down");
98 }
99}
100
/// Configuration consumed by the limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// Time between checks. `Duration::MAX` is used to disable the limiter.
    interval: Duration,
    /// Limit that the sum of VmRSS and VmSwap (in bytes) is compared against; already scaled
    /// by the usage bias.
    memory_limit: usize,
    /// Byte-seconds that usage may accumulate above `memory_limit` before the process is
    /// terminated; refilled whenever a check finds usage at or below the limit.
    burst_budget: usize,
}
111
112impl LimiterConfig {
113 fn disabled() -> Self {
115 Self {
116 interval: Duration::MAX,
117 memory_limit: 0,
118 burst_budget: 0,
119 }
120 }
121}
122
/// State owned by the background task that enforces the memory limit.
struct LimiterTask {
    /// The currently applied configuration.
    config: LimiterConfig,
    /// Burst budget (byte-seconds) left before the limiter terminates the process.
    burst_budget_remaining: usize,
    /// Reference point used by `tick` to schedule the next check and by `check` to measure
    /// how long usage has been above the limit.
    last_check: Instant,
    /// Metrics reported by the task.
    metrics: LimiterMetrics,
}
140
141impl LimiterTask {
142 async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
143 info!("running memory limiter task");
144
145 let mut task = Self {
146 config: LimiterConfig::disabled(),
147 burst_budget_remaining: 0,
148 last_check: Instant::now(),
149 metrics,
150 };
151
152 loop {
153 tokio::select! {
154 _ = task.tick() => {
155 let start = Instant::now();
156
157 if let Err(err) = task.check() {
158 error!("memory limit check failed: {err}");
159 }
160
161 let elapsed = start.elapsed();
162 task.metrics.duration.observe(elapsed.as_secs_f64());
163 }
164 Some(config) = config_rx.recv() => task.apply_config(config),
165 }
166 }
167 }
168
169 async fn tick(&self) {
171 let elapsed = self.last_check.elapsed();
172 let duration = self.config.interval.saturating_sub(elapsed);
173 tokio::time::sleep(duration).await
174 }
175
176 fn current_utilization() -> anyhow::Result<ProcStatus> {
177 match ProcStatus::from_proc() {
178 Ok(status) => Ok(status),
179 #[cfg(target_os = "linux")]
180 Err(err) => {
181 error!("failed to read /proc/self/status: {err}");
182 Err(err)
183 }
184 #[cfg(not(target_os = "linux"))]
185 Err(_err) => Ok(ProcStatus {
186 vm_rss: 0,
187 vm_swap: 0,
188 }),
189 }
190 }
191
192 fn check(&mut self) -> Result<(), anyhow::Error> {
194 debug!("checking memory limits");
195
196 let ProcStatus { vm_rss, vm_swap } = Self::current_utilization()?;
197
198 let memory_limit = self.config.memory_limit;
199 let burst_budget_remaining = self.burst_budget_remaining;
200
201 let memory_usage = vm_rss + vm_swap;
202
203 debug!(
204 memory_usage,
205 memory_limit, burst_budget_remaining, vm_rss, vm_swap, "memory utilization check",
206 );
207
208 self.metrics.vm_rss.set(u64::cast_from(vm_rss));
209 self.metrics.vm_swap.set(u64::cast_from(vm_swap));
210 self.metrics.memory_usage.set(u64::cast_from(memory_usage));
211 self.metrics
212 .burst_budget
213 .set(u64::cast_from(burst_budget_remaining));
214
215 if memory_usage > memory_limit {
216 let elapsed = self.last_check.elapsed().as_secs_f64();
218 let excess = memory_usage - memory_limit;
219 let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
220
221 if burst_budget_remaining >= excess_bs {
222 self.burst_budget_remaining -= excess_bs;
223 } else {
224 warn!(
226 memory_usage,
227 memory_limit, "memory utilization exceeded configured limits",
228 );
229 mz_ore::process::exit_thread_safe(167);
233 }
234 } else {
235 self.burst_budget_remaining = self.config.burst_budget;
237 }
238
239 self.last_check = Instant::now();
240 Ok(())
241 }
242
243 fn apply_config(&mut self, config: LimiterConfig) {
245 if config == self.config {
246 return; }
248
249 info!(?config, "applying memory limiter config");
250 self.config = config;
251 self.burst_budget_remaining = config.burst_budget;
252
253 self.metrics
254 .memory_limit
255 .set(u64::cast_from(config.memory_limit));
256 }
257}
258
/// Metrics exported by the memory limiter task.
struct LimiterMetrics {
    /// Time taken by each limiter check (`mz_memory_limiter_duration_seconds`).
    duration: Histogram,
    /// The applied memory limit (`mz_memory_limiter_memory_limit_bytes`).
    memory_limit: UIntGauge,
    /// VmRSS + VmSwap at the last check (`mz_memory_limiter_memory_usage_bytes`).
    memory_usage: UIntGauge,
    /// VmRSS at the last check (`mz_memory_limiter_vm_rss_bytes`).
    vm_rss: UIntGauge,
    /// VmSwap at the last check (`mz_memory_limiter_vm_swap_bytes`).
    vm_swap: UIntGauge,
    /// Remaining burst budget as seen at the start of the last check
    /// (`mz_memory_limiter_burst_budget_byteseconds`).
    burst_budget: UIntGauge,
}
267
268impl LimiterMetrics {
269 fn new(registry: &MetricsRegistry) -> Self {
270 Self {
271 duration: registry.register(metric!(
272 name: "mz_memory_limiter_duration_seconds",
273 help: "A histogram of the time it took to run the memory limiter.",
274 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
275 )),
276 memory_limit: registry.register(metric!(
277 name: "mz_memory_limiter_memory_limit_bytes",
278 help: "The configured memory limit.",
279 )),
280 memory_usage: registry.register(metric!(
281 name: "mz_memory_limiter_memory_usage_bytes",
282 help: "The current memory usage.",
283 )),
284 vm_rss: registry.register(metric!(
285 name: "mz_memory_limiter_vm_rss_bytes",
286 help: "The current VmRSS metric.",
287 )),
288 vm_swap: registry.register(metric!(
289 name: "mz_memory_limiter_vm_swap_bytes",
290 help: "The current VmSwap metric.",
291 )),
292 burst_budget: registry.register(metric!(
293 name: "mz_memory_limiter_burst_budget_byteseconds",
294 help: "The remaining memory burst budget.",
295 )),
296 }
297 }
298}
299
/// Memory figures for the current process, as read from `/proc/self/status`.
pub struct ProcStatus {
    /// Resident set size in bytes (the kB value of the `VmRSS:` line multiplied by 1024).
    pub vm_rss: usize,
    /// Swapped-out memory in bytes (the kB value of the `VmSwap:` line multiplied by 1024).
    pub vm_swap: usize,
}
307
308impl ProcStatus {
309 pub fn from_proc() -> anyhow::Result<Self> {
311 let contents = std::fs::read_to_string("/proc/self/status")?;
312 let mut vm_rss = 0;
313 let mut vm_swap = 0;
314
315 for line in contents.lines() {
316 if line.starts_with("VmRSS:") {
317 vm_rss = line
318 .split_whitespace()
319 .nth(1)
320 .ok_or_else(|| anyhow::anyhow!("failed to parse VmRSS"))?
321 .parse::<usize>()
322 .context("failed to parse VmRSS")?
323 * 1024
324 } else if line.starts_with("VmSwap:") {
325 vm_swap = line
326 .split_whitespace()
327 .nth(1)
328 .ok_or_else(|| anyhow::anyhow!("failed to parse VmSwap"))?
329 .parse::<usize>()
330 .context("failed to parse VmSwap")?
331 * 1024;
332 }
333 }
334
335 Ok(Self { vm_rss, vm_swap })
336 }
337}