mz_timely_util/column_pager/
policy.rs1use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
25
26use mz_ore::pager::Backend;
27
28use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy};
29
// `u8` tags under which a `Backend` is kept in `TieredPolicy::backend`.

/// Tag for [`Backend::Swap`]; `decode_backend` also falls back to this variant
/// for any tag it does not recognise.
const BACKEND_SWAP: u8 = 0;
/// Tag for [`Backend::File`].
const BACKEND_FILE: u8 = 1;

// `u8` tags under which an `Option<Codec>` is kept in `TieredPolicy::codec`.

/// Tag for "no codec" (`None`); `decode_codec` also falls back to this for any
/// tag it does not recognise.
const CODEC_NONE: u8 = 0;
/// Tag for `Some(Codec::Lz4)`.
const CODEC_LZ4: u8 = 1;
35
36fn encode_backend(b: Backend) -> u8 {
37 match b {
38 Backend::Swap => BACKEND_SWAP,
39 Backend::File => BACKEND_FILE,
40 }
41}
42
43fn decode_backend(v: u8) -> Backend {
44 match v {
45 BACKEND_FILE => Backend::File,
46 _ => Backend::Swap,
47 }
48}
49
50fn encode_codec(c: Option<Codec>) -> u8 {
51 match c {
52 None => CODEC_NONE,
53 Some(Codec::Lz4) => CODEC_LZ4,
54 }
55}
56
57fn decode_codec(v: u8) -> Option<Codec> {
58 match v {
59 CODEC_LZ4 => Some(Codec::Lz4),
60 _ => None,
61 }
62}
63
/// A paging policy backed by a single byte budget shared by all users of the
/// policy.
///
/// All state lives in atomics, so every method takes `&self`.
#[derive(Debug)]
pub struct TieredPolicy {
    /// Bytes currently available. Decremented by `decide` when a request fits,
    /// credited back by `record` on `PageEvent::ResidentReleased`, and shifted
    /// by `reconfigure` when the configured total changes.
    budget: AtomicUsize,
    /// The total most recently supplied to `new` or `reconfigure`; used to
    /// compute the delta applied to `budget` on the next `reconfigure`.
    configured: AtomicUsize,
    /// Backend handed out in `PageDecision::Page`, stored as a `u8` tag
    /// (see `encode_backend` / `decode_backend`).
    backend: AtomicU8,
    /// Codec handed out in `PageDecision::Page`, stored as a `u8` tag
    /// (see `encode_codec` / `decode_codec`).
    codec: AtomicU8,
}
105
106impl TieredPolicy {
107 pub fn new(total_budget: usize, backend: Backend, codec: Option<Codec>) -> Self {
111 Self {
112 budget: AtomicUsize::new(total_budget),
113 configured: AtomicUsize::new(total_budget),
114 backend: AtomicU8::new(encode_backend(backend)),
115 codec: AtomicU8::new(encode_codec(codec)),
116 }
117 }
118
119 pub fn reconfigure(&self, new_total: usize, backend: Backend, codec: Option<Codec>) {
129 let prev = self.configured.swap(new_total, Ordering::Relaxed);
130 if new_total > prev {
131 self.budget.fetch_add(new_total - prev, Ordering::Relaxed);
132 } else if prev > new_total {
133 let shrink = prev - new_total;
134 let _ = self
135 .budget
136 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
137 Some(cur.saturating_sub(shrink))
138 });
139 }
140 self.backend
141 .store(encode_backend(backend), Ordering::Relaxed);
142 self.codec.store(encode_codec(codec), Ordering::Relaxed);
143 }
144
145 pub fn budget_remaining(&self) -> usize {
148 self.budget.load(Ordering::Relaxed)
149 }
150
151 pub fn configured_total(&self) -> usize {
153 self.configured.load(Ordering::Relaxed)
154 }
155}
156
157impl PagingPolicy for TieredPolicy {
158 fn decide(&self, hint: PageHint) -> PageDecision {
159 if try_consume(&self.budget, hint.len_bytes) {
160 PageDecision::Skip
161 } else {
162 PageDecision::Page {
163 backend: decode_backend(self.backend.load(Ordering::Relaxed)),
164 codec: decode_codec(self.codec.load(Ordering::Relaxed)),
165 }
166 }
167 }
168
169 fn record(&self, event: PageEvent) {
170 let PageEvent::ResidentReleased { bytes } = event else {
171 return;
172 };
173 self.budget.fetch_add(bytes, Ordering::Relaxed);
174 }
175}
176
/// Atomically subtracts `want` from `atomic` if, and only if, the current value
/// is at least `want`.
///
/// Returns `true` when the subtraction happened and `false` (leaving the value
/// untouched) when there was not enough left. A `want` of zero always succeeds.
fn try_consume(atomic: &AtomicUsize, want: usize) -> bool {
    // `checked_sub` yields `None` exactly when `cur < want`, which makes
    // `fetch_update` stop without writing; otherwise it retries a weak
    // compare-exchange until the subtraction lands.
    atomic
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| {
            cur.checked_sub(want)
        })
        .is_ok()
}
191
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use timely::container::PushInto;

    use crate::column_pager::{ColumnPager, PagedColumn};
    use crate::columnar::Column;

    use super::*;

    /// One kibibyte; used to spell out the byte budgets below.
    const KIB: usize = 1024;

    /// Builds a column containing the integers `0..n`.
    fn sample(n: i64) -> Column<i64> {
        let mut column = Column::<i64>::default();
        (0..n).for_each(|value| column.push_into(value));
        column
    }

    /// Clones the concrete policy handle and hands it back as a trait object.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        #[allow(clippy::clone_on_ref_ptr)]
        p.clone()
    }

    /// A column that fits stays resident, reserves budget, and returns it on drop.
    #[mz_ore::test]
    fn fits_in_budget() {
        let policy = Arc::new(TieredPolicy::new(64 * KIB, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);

        drop(paged);
        assert_eq!(policy.budget_remaining(), before);
    }

    /// With no budget at all, the column is paged out.
    #[mz_ore::test]
    fn exhausted_pages_out() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
    }

    /// Dropping a resident column frees enough budget for the next one.
    #[mz_ore::test]
    fn release_refills_budget() {
        let policy = Arc::new(TieredPolicy::new(4 * KIB, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));

        // The first column takes its bytes out of the budget and stays resident.
        let mut first_col = sample(256);
        let first = pager.page(&mut first_col);
        assert!(matches!(first, PagedColumn::Resident(_, _)));

        // The second no longer fits in what is left and is paged out.
        let mut second_col = sample(256);
        let second = pager.page(&mut second_col);
        assert!(matches!(second, PagedColumn::Paged { .. }));

        // Once both handles are gone, a third column fits again.
        drop(first);
        drop(second);
        let mut third_col = sample(256);
        let third = pager.page(&mut third_col);
        assert!(matches!(third, PagedColumn::Resident(_, _)));
    }

    /// Budget reserved on one thread is credited back when dropped on another.
    #[mz_ore::test]
    fn release_crosses_threads() {
        let policy = Arc::new(TieredPolicy::new(64 * KIB, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);

        let dropper = std::thread::spawn(move || drop(paged));
        dropper.join().unwrap();
        assert_eq!(
            policy.budget_remaining(),
            before,
            "cross-thread drop must credit the same global pool",
        );
    }

    /// Growing the total adds only the delta; in-flight bytes come back on drop.
    #[mz_ore::test]
    fn reconfigure_preserves_in_flight() {
        let initial = 4 * KIB;
        let extra = 8 * KIB;
        let policy = Arc::new(TieredPolicy::new(initial, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        let consumed = initial - policy.budget_remaining();
        assert!(consumed > 0);

        let before = policy.budget_remaining();
        policy.reconfigure(initial + extra, Backend::Swap, None);
        assert_eq!(policy.budget_remaining(), before + extra);
        assert_eq!(policy.configured_total(), initial + extra);

        drop(paged);
        assert_eq!(policy.budget_remaining(), initial + extra);
    }

    /// Shrinking to zero leaves both the budget and the configured total at zero.
    #[mz_ore::test]
    fn reconfigure_shrink_saturates() {
        let policy = Arc::new(TieredPolicy::new(KIB, Backend::Swap, None));
        policy.reconfigure(0, Backend::Swap, None);
        assert_eq!(policy.budget_remaining(), 0);
        assert_eq!(policy.configured_total(), 0);
    }

    /// A reconfigured backend and codec show up in the next paging decision.
    #[mz_ore::test]
    fn reconfigure_swaps_backend_and_codec() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));

        let initial = policy.decide(PageHint { len_bytes: 1 });
        assert!(matches!(
            initial,
            PageDecision::Page {
                backend: Backend::Swap,
                codec: None
            }
        ));

        policy.reconfigure(0, Backend::File, Some(Codec::Lz4));

        let updated = policy.decide(PageHint { len_bytes: 1 });
        assert!(matches!(
            updated,
            PageDecision::Page {
                backend: Backend::File,
                codec: Some(Codec::Lz4),
            }
        ));
    }

    /// `try_consume` subtracts only when enough is left and never goes below zero.
    #[mz_ore::test]
    fn try_consume_atomicity() {
        let counter = AtomicUsize::new(10);

        assert!(try_consume(&counter, 4));
        assert_eq!(counter.load(Ordering::Relaxed), 6);

        assert!(!try_consume(&counter, 7));
        assert_eq!(counter.load(Ordering::Relaxed), 6);

        assert!(try_consume(&counter, 6));
        assert_eq!(counter.load(Ordering::Relaxed), 0);

        assert!(!try_consume(&counter, 1));
    }
}