Skip to main content

mz_timely_util/column_pager/
policy.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Concrete [`PagingPolicy`] implementations.
17//!
18//! Today: [`TieredPolicy`], a single process-wide byte budget for resident
19//! columns. Resident columns can move between Timely workers, so the
20//! accounting cannot be thread-local; budget is held in a single
21//! [`AtomicUsize`] and credited back from whichever thread happens to drop
22//! the column.
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25
26use mz_ore::pager::Backend;
27
28use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy};
29
30/// A single-pool byte budget for resident columns.
31///
32/// Each call to [`PagingPolicy::decide`] tries to reserve `len_bytes` from a
33/// process-wide [`AtomicUsize`] pool. Success ⇒ [`PageDecision::Skip`]
34/// (column kept resident); failure ⇒ [`PageDecision::Page`] (column spills
35/// via the configured `backend` + `codec`). [`PagingPolicy::record`] credits
36/// the bytes back on [`PageEvent::ResidentReleased`].
37///
38/// ## Why a single global pool
39///
40/// Earlier iterations used `thread_local!` per-worker accounting plus a
41/// shared spill pool. That layout assumed each `PagedColumn::Resident`
42/// would drop on the thread that received the grant — which is **not**
43/// true in Materialize: columns move between Timely workers freely. A
44/// cross-thread drop would credit the wrong worker's local pool and drift
45/// both budgets silently.
46///
47/// The single-atomic design sidesteps the issue entirely: credit goes back
48/// to the same pool regardless of where the drop runs. A future
49/// thread-aware policy is welcome to revive the tiered layout — it would
50/// need either a `SendColumn` wrapper that pins the column to its
51/// originating thread, or an explicit cross-thread credit channel keyed by
52/// origin [`std::thread::ThreadId`].
53///
54/// ## Contention
55///
56/// Hot paths (`decide`, `record`) touch a single atomic via a
57/// compare-exchange loop. Per-byte CAS is fine at current column
58/// granularity; if profiles show contention we can switch to chunk-sized
59/// reservations.
60pub struct TieredPolicy {
61    /// Remaining budget, in bytes, available for resident columns.
62    budget: AtomicUsize,
63    backend: Backend,
64    codec: Option<Codec>,
65}
66
67impl TieredPolicy {
68    /// Constructs a policy with `total_budget` bytes available for resident
69    /// columns. `backend` and `codec` are used for the
70    /// [`PageDecision::Page`] outcome when the pool is exhausted.
71    pub fn new(total_budget: usize, backend: Backend, codec: Option<Codec>) -> Self {
72        Self {
73            budget: AtomicUsize::new(total_budget),
74            backend,
75            codec,
76        }
77    }
78
79    /// Returns the current remaining budget in bytes. Useful for metrics or
80    /// tests.
81    pub fn budget_remaining(&self) -> usize {
82        self.budget.load(Ordering::Relaxed)
83    }
84}
85
86impl PagingPolicy for TieredPolicy {
87    fn decide(&self, hint: PageHint) -> PageDecision {
88        if try_consume(&self.budget, hint.len_bytes) {
89            PageDecision::Skip
90        } else {
91            PageDecision::Page {
92                backend: self.backend,
93                codec: self.codec,
94            }
95        }
96    }
97
98    fn record(&self, event: PageEvent) {
99        let PageEvent::ResidentReleased { bytes } = event else {
100            return;
101        };
102        self.budget.fetch_add(bytes, Ordering::Relaxed);
103    }
104}
105
/// Atomically subtracts `want` from `atomic` if at least `want` is available.
///
/// Returns `true` when the reservation was taken and `false` when the
/// current value is smaller than `want`, in which case `atomic` is left
/// unchanged. A `want` of zero always succeeds.
///
/// Implemented with [`AtomicUsize::fetch_update`], which re-runs the closure
/// until the compare-exchange succeeds or the closure returns `None`;
/// `checked_sub` yields `None` exactly when the pool cannot cover `want`.
#[must_use = "a successful reservation that is ignored is never credited back"]
fn try_consume(atomic: &AtomicUsize, want: usize) -> bool {
    atomic
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |available| {
            available.checked_sub(want)
        })
        .is_ok()
}
120
121// ---------------------------------------------------------------------------
122// Tests
123// ---------------------------------------------------------------------------
124
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use timely::container::PushInto;

    use crate::column_pager::{ColumnPager, PagedColumn};
    use crate::columnar::Column;

    use super::*;

    /// Builds a column holding the values `0..n`.
    fn sample(n: i64) -> Column<i64> {
        let mut c: Column<i64> = Default::default();
        for v in 0..n {
            c.push_into(v);
        }
        c
    }

    /// Promotes a typed policy `Arc` to `Arc<dyn PagingPolicy>`.
    ///
    /// The `clippy::clone_on_ref_ptr` allowance lives here once instead of
    /// at every call site, and no `as` conversion is needed.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        #[allow(clippy::clone_on_ref_ptr)]
        p.clone()
    }

    /// Allocations within the budget stay resident; release returns budget.
    #[mz_ore::test]
    fn fits_in_budget() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);
        drop(p); // Drop fires ResidentReleased; budget returns.
        assert_eq!(policy.budget_remaining(), before);
    }

    /// An exhausted budget forces pageout.
    #[mz_ore::test]
    fn exhausted_pages_out() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Paged { .. }));
    }

    /// Holding one Resident exhausts the budget; releasing it makes room.
    #[mz_ore::test]
    fn release_refills_budget() {
        let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));

        // First allocation fits.
        let mut col = sample(256);
        let p1 = cp.page(&mut col);
        assert!(matches!(p1, PagedColumn::Resident(_, _)));

        // Second allocation overflows the budget → page out.
        let mut col2 = sample(256);
        let p2 = cp.page(&mut col2);
        assert!(matches!(p2, PagedColumn::Paged { .. }));

        // Releasing the first refills the budget; a third allocation now
        // fits resident again.
        drop(p1);
        drop(p2);
        let mut col3 = sample(256);
        let p3 = cp.page(&mut col3);
        assert!(matches!(p3, PagedColumn::Resident(_, _)));
    }

    /// Releasing a Resident on a different thread must still credit the same
    /// global budget pool.
    #[mz_ore::test]
    fn release_crosses_threads() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let cp = ColumnPager::new(as_dyn(&policy));
        let before = policy.budget_remaining();
        let mut col = sample(256);
        let p = cp.page(&mut col);
        assert!(matches!(p, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < before);

        // Move ownership to another thread; drop happens there.
        std::thread::spawn(move || drop(p)).join().unwrap();
        assert_eq!(
            policy.budget_remaining(),
            before,
            "cross-thread drop must credit the same global pool",
        );
    }

    /// Single-threaded contract of `try_consume`: it subtracts only when the
    /// pool covers the request and leaves the pool untouched otherwise.
    #[mz_ore::test]
    fn try_consume_atomicity() {
        let a = AtomicUsize::new(10);
        assert!(try_consume(&a, 4));
        assert_eq!(a.load(Ordering::Relaxed), 6);
        assert!(!try_consume(&a, 7));
        assert_eq!(a.load(Ordering::Relaxed), 6);
        // A zero-byte request always succeeds and changes nothing.
        assert!(try_consume(&a, 0));
        assert_eq!(a.load(Ordering::Relaxed), 6);
        assert!(try_consume(&a, 6));
        assert_eq!(a.load(Ordering::Relaxed), 0);
        assert!(!try_consume(&a, 1));
    }

    /// Racing consumers never over-reserve: each thread makes enough
    /// attempts to drain the pool alone, yet exactly `BUDGET` unit
    /// reservations succeed in total and the pool ends at zero.
    #[mz_ore::test]
    fn try_consume_concurrent() {
        const BUDGET: usize = 1_000;
        const THREADS: usize = 4;

        let pool = AtomicUsize::new(BUDGET);
        let granted = AtomicUsize::new(0);
        std::thread::scope(|s| {
            for _ in 0..THREADS {
                s.spawn(|| {
                    for _ in 0..BUDGET {
                        if try_consume(&pool, 1) {
                            granted.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                });
            }
        });

        assert_eq!(granted.load(Ordering::Relaxed), BUDGET);
        assert_eq!(pool.load(Ordering::Relaxed), 0);
    }
}