mz_timely_util/column_pager/policy.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Concrete [`PagingPolicy`] implementations.
17//!
18//! Today: [`TieredPolicy`], a single process-wide byte budget for resident
19//! columns. Resident columns can move between Timely workers, so the
20//! accounting cannot be thread-local; budget is held in a single
21//! [`AtomicUsize`] and credited back from whichever thread happens to drop
22//! the column.
23
24use std::sync::atomic::{AtomicUsize, Ordering};
25
26use mz_ore::pager::Backend;
27
28use crate::column_pager::{Codec, PageDecision, PageEvent, PageHint, PagingPolicy};
29
30/// A single-pool byte budget for resident columns.
31///
32/// Each call to [`PagingPolicy::decide`] tries to reserve `len_bytes` from a
33/// process-wide [`AtomicUsize`] pool. Success ⇒ [`PageDecision::Skip`]
34/// (column kept resident); failure ⇒ [`PageDecision::Page`] (column spills
35/// via the configured `backend` + `codec`). [`PagingPolicy::record`] credits
36/// the bytes back on [`PageEvent::ResidentReleased`].
37///
38/// ## Why a single global pool
39///
40/// Earlier iterations used `thread_local!` per-worker accounting plus a
41/// shared spill pool. That layout assumed each `PagedColumn::Resident`
42/// would drop on the thread that received the grant — which is **not**
43/// true in Materialize: columns move between Timely workers freely. A
44/// cross-thread drop would credit the wrong worker's local pool and drift
45/// both budgets silently.
46///
47/// The single-atomic design sidesteps the issue entirely: credit goes back
48/// to the same pool regardless of where the drop runs. A future
49/// thread-aware policy is welcome to revive the tiered layout — it would
50/// need either a `SendColumn` wrapper that pins the column to its
51/// originating thread, or an explicit cross-thread credit channel keyed by
52/// origin [`std::thread::ThreadId`].
53///
54/// ## Contention
55///
56/// Hot paths (`decide`, `record`) touch a single atomic via a
57/// compare-exchange loop. Per-byte CAS is fine at current column
58/// granularity; if profiles show contention we can switch to chunk-sized
59/// reservations.
60pub struct TieredPolicy {
61 /// Remaining budget, in bytes, available for resident columns.
62 budget: AtomicUsize,
63 backend: Backend,
64 codec: Option<Codec>,
65}
66
67impl TieredPolicy {
68 /// Constructs a policy with `total_budget` bytes available for resident
69 /// columns. `backend` and `codec` are used for the
70 /// [`PageDecision::Page`] outcome when the pool is exhausted.
71 pub fn new(total_budget: usize, backend: Backend, codec: Option<Codec>) -> Self {
72 Self {
73 budget: AtomicUsize::new(total_budget),
74 backend,
75 codec,
76 }
77 }
78
79 /// Returns the current remaining budget in bytes. Useful for metrics or
80 /// tests.
81 pub fn budget_remaining(&self) -> usize {
82 self.budget.load(Ordering::Relaxed)
83 }
84}
85
86impl PagingPolicy for TieredPolicy {
87 fn decide(&self, hint: PageHint) -> PageDecision {
88 if try_consume(&self.budget, hint.len_bytes) {
89 PageDecision::Skip
90 } else {
91 PageDecision::Page {
92 backend: self.backend,
93 codec: self.codec,
94 }
95 }
96 }
97
98 fn record(&self, event: PageEvent) {
99 let PageEvent::ResidentReleased { bytes } = event else {
100 return;
101 };
102 self.budget.fetch_add(bytes, Ordering::Relaxed);
103 }
104}
105
/// Atomically reserves `want` from `atomic`, all or nothing.
///
/// Returns `true` and decreases the counter by `want` when it currently holds
/// at least `want`. Returns `false` and leaves the counter untouched
/// otherwise. A `want` of zero always succeeds.
///
/// Implemented with [`AtomicUsize::fetch_update`], which re-runs the closure
/// against the freshly observed value whenever another thread changes the
/// counter in between. `checked_sub` yields `None` exactly when the counter
/// is smaller than `want`, which aborts the update without writing.
fn try_consume(atomic: &AtomicUsize, want: usize) -> bool {
    atomic
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |available| {
            available.checked_sub(want)
        })
        .is_ok()
}
120
121// ---------------------------------------------------------------------------
122// Tests
123// ---------------------------------------------------------------------------
124
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use timely::container::PushInto;

    use crate::column_pager::{ColumnPager, PagedColumn};
    use crate::columnar::Column;

    use super::*;

    /// Builds a column containing the integers `0..n` in order.
    fn sample(n: i64) -> Column<i64> {
        let mut column = Column::<i64>::default();
        (0..n).for_each(|value| column.push_into(value));
        column
    }

    /// Upcasts a concretely typed policy handle to `Arc<dyn PagingPolicy>`.
    /// The method-call `clone` is used on purpose so the unsizing coercion
    /// happens at the return site without an `as` conversion.
    fn as_dyn(policy: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        #[allow(clippy::clone_on_ref_ptr)]
        policy.clone()
    }

    /// A column that fits stays resident, and dropping it restores the
    /// budget to its starting value.
    #[mz_ore::test]
    fn fits_in_budget() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));
        let initial = policy.budget_remaining();

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < initial);

        // Dropping the resident column fires ResidentReleased.
        drop(paged);
        assert_eq!(policy.budget_remaining(), initial);
    }

    /// With no budget at all, the column is paged out.
    #[mz_ore::test]
    fn exhausted_pages_out() {
        let policy = Arc::new(TieredPolicy::new(0, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
    }

    /// One resident column uses up the budget; once it is released, a later
    /// column can be resident again.
    #[mz_ore::test]
    fn release_refills_budget() {
        let policy = Arc::new(TieredPolicy::new(4 * 1024, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));

        // The first column fits within the budget.
        let mut first_input = sample(256);
        let first = pager.page(&mut first_input);
        assert!(matches!(first, PagedColumn::Resident(_, _)));

        // The second column no longer fits and is paged out.
        let mut second_input = sample(256);
        let second = pager.page(&mut second_input);
        assert!(matches!(second, PagedColumn::Paged { .. }));

        // After releasing the first column there is room for a third one.
        drop(first);
        drop(second);
        let mut third_input = sample(256);
        let third = pager.page(&mut third_input);
        assert!(matches!(third, PagedColumn::Resident(_, _)));
    }

    /// A resident column dropped on another thread still credits the same
    /// global budget pool.
    #[mz_ore::test]
    fn release_crosses_threads() {
        let policy = Arc::new(TieredPolicy::new(64 * 1024, Backend::Swap, None));
        let pager = ColumnPager::new(as_dyn(&policy));
        let initial = policy.budget_remaining();

        let mut column = sample(256);
        let paged = pager.page(&mut column);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        assert!(policy.budget_remaining() < initial);

        // Hand the column to a helper thread so the drop runs over there.
        let dropper = std::thread::spawn(move || drop(paged));
        dropper.join().unwrap();
        assert_eq!(
            policy.budget_remaining(),
            initial,
            "cross-thread drop must credit the same global pool",
        );
    }

    /// Walks `try_consume` through grant and refusal steps on one counter,
    /// checking the balance after each.
    #[mz_ore::test]
    fn try_consume_atomicity() {
        let pool = AtomicUsize::new(10);
        let steps = [(4, true, 6), (7, false, 6), (6, true, 0), (1, false, 0)];
        for (want, granted, left) in steps {
            assert_eq!(try_consume(&pool, want), granted);
            assert_eq!(pool.load(Ordering::Relaxed), left);
        }
    }
}