1use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
15use mz_compute_types::dyncfgs::{
16 LGALLOC_LIMITER_BURST_FACTOR, LGALLOC_LIMITER_INTERVAL, LGALLOC_LIMITER_USAGE_BIAS,
17 LGALLOC_LIMITER_USAGE_FACTOR,
18};
19use mz_dyncfg::ConfigSet;
20use mz_ore::cast::{CastFrom, CastLossy};
21use mz_ore::metric;
22use mz_ore::metrics::{MetricsRegistry, UIntGauge};
23use prometheus::Histogram;
24use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
25use tracing::{debug, error, info, warn};
26
/// Process-wide handle to the lgalloc limiter.
///
/// Holds `None` until `start_limiter` stores a `Limiter` here; `apply_limiter_config` reads it
/// to forward configuration updates.
static LIMITER: Mutex<Option<Limiter>> = Mutex::new(None);
32
33pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
39 let mut limiter = LIMITER.lock().expect("poisoned");
40
41 if limiter.is_some() {
42 panic!("lgalloc limiter is already running");
43 }
44
45 let metrics = LimiterMetrics::new(metrics_registry);
46 let (config_tx, config_rx) = mpsc::unbounded_channel();
47
48 mz_ore::task::spawn(|| "lgalloc-limiter", LimiterTask::run(config_rx, metrics));
49
50 *limiter = Some(Limiter {
51 memory_limit,
52 config_tx,
53 });
54}
55
56pub fn apply_limiter_config(config: &ConfigSet) {
58 if let Some(limiter) = LIMITER.lock().expect("poisoned").as_mut() {
59 limiter.apply_config(config);
60 }
61}
62
/// Handle through which configuration is pushed to the limiter task.
struct Limiter {
    /// The memory limit given to `start_limiter`; `apply_config` scales it into the disk limit.
    /// NOTE(review): presumably in bytes, matching the `*_bytes` metrics -- confirm with callers.
    memory_limit: usize,
    /// Sending half of the channel that delivers `LimiterConfig` values to the task.
    config_tx: UnboundedSender<LimiterConfig>,
}
70
71impl Limiter {
72 fn apply_config(&self, config: &ConfigSet) {
74 let mut interval = LGALLOC_LIMITER_INTERVAL.get(config);
75 if interval.is_zero() {
78 interval = Duration::MAX;
79 }
80
81 let disk_limit = f64::cast_lossy(self.memory_limit)
82 * LGALLOC_LIMITER_USAGE_FACTOR.get(config)
83 * LGALLOC_LIMITER_USAGE_BIAS.get(config);
84 let disk_limit = usize::cast_lossy(disk_limit);
85
86 let burst_budget = f64::cast_lossy(disk_limit) * LGALLOC_LIMITER_BURST_FACTOR.get(config);
87 let burst_budget = usize::cast_lossy(burst_budget);
88
89 self.config_tx
90 .send(LimiterConfig {
91 interval,
92 disk_limit,
93 burst_budget,
94 })
95 .expect("limiter task never shuts down");
96 }
97}
98
/// Settings that drive the limiter task.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LimiterConfig {
    /// Time between two consecutive limit checks.
    interval: Duration,
    /// Disk usage above which the burst budget starts being consumed.
    disk_limit: usize,
    /// Burst budget granted whenever usage is at or below the limit, in byte-seconds.
    burst_budget: usize,
}

impl LimiterConfig {
    /// Returns the configuration the task starts with: the longest representable interval and
    /// zeroed limits.
    fn disabled() -> Self {
        let (disk_limit, burst_budget) = (0, 0);
        Self {
            interval: Duration::MAX,
            disk_limit,
            burst_budget,
        }
    }
}
120
/// State of the background task that periodically compares lgalloc disk usage to the limit.
struct LimiterTask {
    /// The configuration currently in effect.
    config: LimiterConfig,
    /// What is left of the burst budget, in byte-seconds. It is drained while usage exceeds the
    /// disk limit and refilled to `config.burst_budget` when usage is back under the limit.
    burst_budget_remaining: usize,
    /// Time of the last check; used to schedule the next one and to convert excess usage into
    /// byte-seconds.
    last_check: Instant,
    /// Metrics reported by the task.
    metrics: LimiterMetrics,
}
138
139impl LimiterTask {
140 async fn run(mut config_rx: UnboundedReceiver<LimiterConfig>, metrics: LimiterMetrics) {
141 info!("running lgalloc limiter task");
142
143 let mut task = Self {
144 config: LimiterConfig::disabled(),
145 burst_budget_remaining: 0,
146 last_check: Instant::now(),
147 metrics,
148 };
149
150 loop {
151 tokio::select! {
152 _ = task.tick() => {
153 let start = Instant::now();
154
155 if let Err(err) = task.check() {
156 error!("lgalloc limit check failed: {err}");
157 }
158
159 let elapsed = start.elapsed();
160 task.metrics.duration.observe(elapsed.as_secs_f64());
161 }
162 Some(config) = config_rx.recv() => task.apply_config(config),
163 }
164 }
165 }
166
167 async fn tick(&self) {
169 let elapsed = self.last_check.elapsed();
170 let duration = self.config.interval.saturating_sub(elapsed);
171 tokio::time::sleep(duration).await
172 }
173
174 fn check(&mut self) -> Result<(), anyhow::Error> {
176 debug!("checking lgalloc limits");
177
178 let mut disk_usage = 0;
179 let file_stats = lgalloc::lgalloc_stats().file;
180 for (_size_class, stat) in file_stats {
181 disk_usage += stat?.allocated_size;
182 }
183
184 let disk_limit = self.config.disk_limit;
185 let burst_budget_remaining = self.burst_budget_remaining;
186
187 debug!(disk_usage, disk_limit, burst_budget_remaining);
188
189 self.metrics.disk_usage.set(u64::cast_from(disk_usage));
190 self.metrics
191 .burst_budget
192 .set(u64::cast_from(burst_budget_remaining));
193
194 if disk_usage > disk_limit {
195 let elapsed = self.last_check.elapsed().as_secs_f64();
197 let excess = disk_usage - disk_limit;
198 let excess_bs = usize::cast_lossy(f64::cast_lossy(excess) * elapsed);
199
200 if burst_budget_remaining >= excess_bs {
201 self.burst_budget_remaining -= excess_bs;
202 } else {
203 warn!(
205 disk_usage,
206 disk_limit, "lgalloc disk utilization exceeded configured limits",
207 );
208 mz_ore::process::exit_thread_safe(167);
212 }
213 } else {
214 self.burst_budget_remaining = self.config.burst_budget;
216 }
217
218 self.last_check = Instant::now();
219 Ok(())
220 }
221
222 fn apply_config(&mut self, config: LimiterConfig) {
224 if config == self.config {
225 return; }
227
228 info!(?config, "applying lgalloc limiter config");
229 self.config = config;
230 self.burst_budget_remaining = config.burst_budget;
231
232 self.metrics
233 .disk_limit
234 .set(u64::cast_from(config.disk_limit));
235 }
236}
237
/// Metrics reported by the limiter task.
struct LimiterMetrics {
    /// Time taken by each limit check, in seconds.
    duration: Histogram,
    /// The disk limit from the configuration currently in effect.
    disk_limit: UIntGauge,
    /// The lgalloc disk usage measured by the most recent check.
    disk_usage: UIntGauge,
    /// The remaining burst budget as read at the start of the most recent check.
    burst_budget: UIntGauge,
}
244
245impl LimiterMetrics {
246 fn new(registry: &MetricsRegistry) -> Self {
247 Self {
248 duration: registry.register(metric!(
249 name: "mz_lgalloc_limiter_duration_seconds",
250 help: "A histogram of the time it took to run the lgalloc limiter.",
251 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
252 )),
253 disk_limit: registry.register(metric!(
254 name: "mz_lgalloc_limiter_disk_limit_bytes",
255 help: "The configured lgalloc disk limit.",
256 )),
257 disk_usage: registry.register(metric!(
258 name: "mz_lgalloc_limiter_disk_usage_bytes",
259 help: "The current lgalloc disk usage.",
260 )),
261 burst_budget: registry.register(metric!(
262 name: "mz_lgalloc_limiter_burst_budget_byteseconds",
263 help: "The remaining lgalloc burst budget.",
264 )),
265 }
266 }
267}