Skip to main content

mz_timely_util/column_pager/
policy.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Concrete [`PagingPolicy`] implementations.
17//!
18//! Today: [`TieredPolicy`], a single process-wide byte budget for resident
19//! columns. Resident columns can move between Timely workers, so the
20//! accounting cannot be thread-local; budget is held in a single
21//! [`AtomicUsize`] and credited back from whichever thread happens to drop
22//! the column.
23
24use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
25
26use mz_ore::pager::Backend;
27
28use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy};
29
// Tags packed into the `backend` / `codec` `AtomicU8`s of `TieredPolicy`.
// They are produced by `encode_backend` / `encode_codec` and read back by the
// matching `decode_*` functions.

/// Tag for [`Backend::Swap`].
const BACKEND_SWAP: u8 = 0;
/// Tag for [`Backend::File`].
const BACKEND_FILE: u8 = 1;

/// Tag for "no codec" (`None`).
const CODEC_NONE: u8 = 0;
/// Tag for [`Codec::Lz4`].
const CODEC_LZ4: u8 = 1;
35
36fn encode_backend(b: Backend) -> u8 {
37    match b {
38        Backend::Swap => BACKEND_SWAP,
39        Backend::File => BACKEND_FILE,
40    }
41}
42
43fn decode_backend(v: u8) -> Backend {
44    match v {
45        BACKEND_FILE => Backend::File,
46        _ => Backend::Swap,
47    }
48}
49
50fn encode_codec(c: Option<Codec>) -> u8 {
51    match c {
52        None => CODEC_NONE,
53        Some(Codec::Lz4) => CODEC_LZ4,
54    }
55}
56
57fn decode_codec(v: u8) -> Option<Codec> {
58    match v {
59        CODEC_LZ4 => Some(Codec::Lz4),
60        _ => None,
61    }
62}
63
64/// A single-pool byte budget for resident columns.
65///
66/// Each call to [`PagingPolicy::decide`] tries to reserve `len_bytes` from a
67/// process-wide [`AtomicUsize`] pool. Success ⇒ [`PageDecision::Skip`]
68/// (column kept resident); failure ⇒ [`PageDecision::Page`] (column spills
69/// via the configured `backend` + `codec`). [`PagingPolicy::record`] credits
70/// the bytes back on [`PageEvent::ResidentReleased`].
71///
72/// ## Why a single global pool
73///
74/// Earlier iterations used `thread_local!` per-worker accounting plus a
75/// shared spill pool. That layout assumed each `PagedColumn::Resident`
76/// would drop on the thread that received the grant — which is **not**
77/// true in Materialize: columns move between Timely workers freely. A
78/// cross-thread drop would credit the wrong worker's local pool and drift
79/// both budgets silently.
80///
81/// The single-atomic design sidesteps the issue entirely: credit goes back
82/// to the same pool regardless of where the drop runs. A future
83/// thread-aware policy is welcome to revive the tiered layout — it would
84/// need either a `SendColumn` wrapper that pins the column to its
85/// originating thread, or an explicit cross-thread credit channel keyed by
86/// origin [`std::thread::ThreadId`].
87///
88/// ## Contention
89///
90/// Hot paths (`decide`, `record`) touch a single atomic via a
91/// compare-exchange loop. Per-byte CAS is fine at current column
92/// granularity; if profiles show contention we can switch to chunk-sized
93/// reservations.
94pub struct TieredPolicy {
95    /// Remaining budget, in bytes, available for resident columns. Drains
96    /// on `decide` (Skip), refills on `record(ResidentReleased)`.
97    budget: AtomicUsize,
98    /// Last-configured total. `reconfigure` adjusts `budget` by the delta
99    /// against this value so existing `ResidentTicket`s stay coherent with
100    /// the running budget after an operator-driven tune.
101    configured: AtomicUsize,
102    backend: AtomicU8,
103    codec: AtomicU8,
104}
105
106impl TieredPolicy {
107    /// Constructs a policy with `total_budget` bytes available for resident
108    /// columns. `backend` and `codec` are used for the
109    /// [`PageDecision::Page`] outcome when the pool is exhausted.
110    pub fn new(total_budget: usize, backend: Backend, codec: Option<Codec>) -> Self {
111        Self {
112            budget: AtomicUsize::new(total_budget),
113            configured: AtomicUsize::new(total_budget),
114            backend: AtomicU8::new(encode_backend(backend)),
115            codec: AtomicU8::new(encode_codec(codec)),
116        }
117    }
118
119    /// Adjust this policy in place. Budget moves by `new_total - prev_total`
120    /// so in-flight `ResidentTicket`s — which still credit this same atomic
121    /// when they drop — stay coherent with the resized pool. Backend and
122    /// codec selection take effect on the next [`PagingPolicy::decide`]
123    /// call.
124    ///
125    /// Shrinking the configured total below the in-flight resident set
126    /// saturates the available budget at zero; subsequent `decide` calls
127    /// page out until releases bring the pool back above zero.
128    pub fn reconfigure(&self, new_total: usize, backend: Backend, codec: Option<Codec>) {
129        let prev = self.configured.swap(new_total, Ordering::Relaxed);
130        if new_total > prev {
131            self.budget.fetch_add(new_total - prev, Ordering::Relaxed);
132        } else if prev > new_total {
133            let shrink = prev - new_total;
134            let _ = self
135                .budget
136                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
137                    Some(cur.saturating_sub(shrink))
138                });
139        }
140        self.backend
141            .store(encode_backend(backend), Ordering::Relaxed);
142        self.codec.store(encode_codec(codec), Ordering::Relaxed);
143    }
144
145    /// Returns the current remaining budget in bytes. Useful for metrics or
146    /// tests.
147    pub fn budget_remaining(&self) -> usize {
148        self.budget.load(Ordering::Relaxed)
149    }
150
151    /// Returns the most-recently-configured total. Useful for tests.
152    pub fn configured_total(&self) -> usize {
153        self.configured.load(Ordering::Relaxed)
154    }
155}
156
157impl PagingPolicy for TieredPolicy {
158    fn decide(&self, hint: PageHint) -> PageDecision {
159        if try_consume(&self.budget, hint.len_bytes) {
160            PageDecision::Skip
161        } else {
162            PageDecision::Page {
163                backend: decode_backend(self.backend.load(Ordering::Relaxed)),
164                codec: decode_codec(self.codec.load(Ordering::Relaxed)),
165            }
166        }
167    }
168
169    fn record(&self, event: PageEvent) {
170        let PageEvent::ResidentReleased { bytes } = event else {
171            return;
172        };
173        self.budget.fetch_add(bytes, Ordering::Relaxed);
174    }
175}
176
/// Atomically subtracts `want` from `atomic` if at least `want` is available.
/// Returns `true` on success.
///
/// The availability check and the subtraction are performed together in one
/// compare-exchange loop. Concurrent callers therefore never drive the
/// counter below zero, and a failed attempt leaves it untouched.
fn try_consume(atomic: &AtomicUsize, want: usize) -> bool {
    atomic
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |available| {
            available.checked_sub(want)
        })
        .is_ok()
}
191
192// ---------------------------------------------------------------------------
193// Tests
194// ---------------------------------------------------------------------------
195
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use timely::container::PushInto;

    use crate::column_pager::{ColumnPager, PagedColumn};
    use crate::columnar::Column;

    use super::*;

    /// Builds a column holding the values `0..n`.
    fn sample(n: i64) -> Column<i64> {
        let mut c: Column<i64> = Default::default();
        for v in 0..n {
            c.push_into(v);
        }
        c
    }

    /// Promotes a typed policy `Arc` to `Arc<dyn PagingPolicy>` without
    /// triggering `clippy::clone_on_ref_ptr` or `clippy::as_conversions`.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        #[allow(clippy::clone_on_ref_ptr)]
        p.clone()
    }

    /// Allocations within the budget stay resident; release returns budget.
    #[mz_ore::test]
    fn fits_in_budget() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);
        drop(p); // Drop fires ResidentReleased; budget returns.
        assert_eq!(policy.budget_remaining(), before);
    }

    /// An exhausted budget forces pageout.
    #[mz_ore::test]
    fn exhausted_pages_out() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Paged { .. }));
    }

    /// Holding one Resident exhausts the budget; releasing it makes room.
    #[mz_ore::test]
    fn release_refills_budget() {
        let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));

        // First allocation fits.
        let mut col = sample(256);
        let p1 = cp.page(&mut col);
        assert!(matches!(p1, PagedColumn::Resident(_, _)));

        // Second allocation overflows the budget → page out.
        let mut col2 = sample(256);
        let p2 = cp.page(&mut col2);
        assert!(matches!(p2, PagedColumn::Paged { .. }));

        // Releasing the resident refills the budget (the paged column never
        // reserved any); a third allocation now fits resident again.
        drop(p1);
        drop(p2);
        let mut col3 = sample(256);
        let p3 = cp.page(&mut col3);
        assert!(matches!(p3, PagedColumn::Resident(_, _)));
    }

    /// Releasing a Resident on a different thread must still credit the same
    /// global budget pool.
    #[mz_ore::test]
    fn release_crosses_threads() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);

        // Move ownership to another thread; drop happens there.
        std::thread::spawn(move || drop(p)).join().unwrap();
        assert_eq!(
            policy.budget_remaining(),
            before,
            "cross-thread drop must credit the same global pool",
        );
    }

    /// Reconfigure preserves the in-flight resident set: tickets minted
    /// against the old configured total still credit the same pool when
    /// they drop. Growing the configured total adds the delta to the
    /// available budget.
    #[mz_ore::test]
    fn reconfigure_preserves_in_flight() {
        let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));

        // Hold one resident, consuming some budget.
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        let consumed = 4 * 1024 - policy.budget_remaining();
        assert!(consumed > 0);

        // Grow the pool by 8 KiB. Available budget should rise by the delta;
        // configured total reflects the new size.
        let before = policy.budget_remaining();
        policy.reconfigure(4 * 1024 + 8 * 1024, Backend::Swap, None);
        assert_eq!(policy.budget_remaining(), before + 8 * 1024);
        assert_eq!(policy.configured_total(), 4 * 1024 + 8 * 1024);

        // Drop the resident; the ticket credits the same pool, even
        // though the pool was resized in between.
        drop(p);
        assert_eq!(policy.budget_remaining(), 4 * 1024 + 8 * 1024);
    }

    /// Shrinking an idle pool by exactly its available budget leaves it
    /// empty.
    #[mz_ore::test]
    fn reconfigure_shrink_to_zero() {
        let policy = Arc::new(TieredPolicy::new(1024, Backend::Swap, None));
        policy.reconfigure(0, Backend::Swap, None);
        assert_eq!(policy.budget_remaining(), 0);
        assert_eq!(policy.configured_total(), 0);
    }

    /// Shrinking the configured total by more than the available budget
    /// clamps the available budget at zero rather than wrapping, and the
    /// exhausted pool pages everything out.
    #[mz_ore::test]
    fn reconfigure_shrink_saturates() {
        let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));

        // Hold a resident so the available budget is strictly below the
        // configured total; `p` stays alive across the shrink.
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < 4 * 1024);

        // Shrinking to zero removes more than is currently available.
        policy.reconfigure(0, Backend::Swap, None);
        assert_eq!(policy.budget_remaining(), 0);
        assert_eq!(policy.configured_total(), 0);
        assert!(matches!(
            policy.decide(PageHint { len_bytes: 1 }),
            PageDecision::Page { .. }
        ));
        drop(p);
    }

    /// Backend / codec selection takes effect on the next decide.
    #[mz_ore::test]
    fn reconfigure_swaps_backend_and_codec() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));
        let initial = policy.decide(PageHint { len_bytes: 1 });
        assert!(matches!(
            initial,
            PageDecision::Page {
                backend: Backend::Swap,
                codec: None
            }
        ));
        policy.reconfigure(0, Backend::File, Some(Codec::Lz4));
        let updated = policy.decide(PageHint { len_bytes: 1 });
        assert!(matches!(
            updated,
            PageDecision::Page {
                backend: Backend::File,
                codec: Some(Codec::Lz4),
            }
        ));
    }

    /// `try_consume` is all-or-nothing: it either takes the full amount or
    /// leaves the counter untouched.
    #[mz_ore::test]
    fn try_consume_all_or_nothing() {
        let a = AtomicUsize::new(10);
        assert!(try_consume(&a, 4));
        assert_eq!(a.load(Ordering::Relaxed), 6);
        assert!(!try_consume(&a, 7));
        assert_eq!(a.load(Ordering::Relaxed), 6);
        assert!(try_consume(&a, 6));
        assert_eq!(a.load(Ordering::Relaxed), 0);
        assert!(!try_consume(&a, 1));
    }

    /// Concurrent reservations against one pool never grant more than it
    /// holds and never wrap it below zero.
    #[mz_ore::test]
    fn try_consume_concurrent_never_overdraws() {
        const THREADS: usize = 4;
        const ATTEMPTS: usize = 256;
        // Half as many units as total attempts, so the threads contend for
        // the last units and at least half of the attempts must fail.
        let pool = AtomicUsize::new(THREADS * ATTEMPTS / 2);
        let granted = AtomicUsize::new(0);
        std::thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|| {
                    for _ in 0..ATTEMPTS {
                        if try_consume(&pool, 1) {
                            granted.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        });
        assert_eq!(granted.load(Ordering::Relaxed), THREADS * ATTEMPTS / 2);
        assert_eq!(pool.load(Ordering::Relaxed), 0);
    }
}