Skip to main content

mz_ore/
pager.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Explicit pager for cold data. See `doc/developer/design/20260504_pager.md`.
17
18use std::sync::atomic::{AtomicU8, Ordering};
19
20mod file;
21mod swap;
22
23pub use file::set_scratch_dir;
24
25use crate::pager::file::FileInner;
26use crate::pager::swap::SwapInner;
27
28/// An opaque handle to data paged out via [`pageout`]. The handle's backend variant
29/// is fixed at `pageout` time and is independent of any later `set_backend` call.
30#[derive(Debug)]
31pub struct Handle {
32    inner: HandleInner,
33}
34
35#[derive(Debug)]
36enum HandleInner {
37    Swap(SwapInner),
38    File(FileInner),
39}
40
41impl Handle {
42    /// Returns the logical length of the handle's payload in `u64`s.
43    pub fn len(&self) -> usize {
44        match &self.inner {
45            HandleInner::Swap(s) => s.total_len(),
46            HandleInner::File(f) => f.len_u64s,
47        }
48    }
49
50    /// Returns the logical length of the handle's payload in bytes (`len() * 8`).
51    pub fn len_bytes(&self) -> usize {
52        self.len() * 8
53    }
54
55    /// Returns `true` if the handle holds no data.
56    pub fn is_empty(&self) -> bool {
57        self.len() == 0
58    }
59
60    pub(crate) fn from_swap(inner: SwapInner) -> Self {
61        Self {
62            inner: HandleInner::Swap(inner),
63        }
64    }
65
66    pub(crate) fn from_file(inner: FileInner) -> Self {
67        Self {
68            inner: HandleInner::File(inner),
69        }
70    }
71
72    pub(crate) fn swap_inner(&self) -> Option<&SwapInner> {
73        match &self.inner {
74            HandleInner::Swap(s) => Some(s),
75            HandleInner::File(_) => None,
76        }
77    }
78
79    pub(crate) fn file_inner(&self) -> Option<&FileInner> {
80        match &self.inner {
81            HandleInner::File(f) => Some(f),
82            HandleInner::Swap(_) => None,
83        }
84    }
85
86    pub(crate) fn into_swap_inner(self) -> Option<SwapInner> {
87        match self.inner {
88            HandleInner::Swap(s) => Some(s),
89            HandleInner::File(_) => None,
90        }
91    }
92
93    pub(crate) fn into_file_inner(self) -> Option<FileInner> {
94        match self.inner {
95            HandleInner::File(f) => Some(f),
96            HandleInner::Swap(_) => None,
97        }
98    }
99}
100
/// The storage strategy used for paged-out data.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Backend {
    /// Allocations stay resident; the kernel is hinted via `MADV_COLD`.
    Swap,
    /// Data is written to a named scratch file; no file descriptor is retained.
    File,
}

// Raw encodings of `Backend` as stored in the `BACKEND` atomic.
const BACKEND_SWAP: u8 = 0;
const BACKEND_FILE: u8 = 1;

// Process-wide backend selection; starts out as the swap backend.
static BACKEND: AtomicU8 = AtomicU8::new(BACKEND_SWAP);

/// Reports which backend is currently selected.
///
/// Panics if the atomic holds a value that is neither `BACKEND_SWAP` nor
/// `BACKEND_FILE`; `set_backend` only ever stores those two.
pub fn backend() -> Backend {
    let raw = BACKEND.load(Ordering::Relaxed);
    if raw == BACKEND_SWAP {
        Backend::Swap
    } else if raw == BACKEND_FILE {
        Backend::File
    } else {
        unreachable!("BACKEND atomic holds invalid discriminant")
    }
}

/// Selects the backend used by subsequent `pageout` calls. Handles that
/// already exist keep the backend they were created with.
pub fn set_backend(b: Backend) {
    BACKEND.store(
        match b {
            Backend::File => BACKEND_FILE,
            Backend::Swap => BACKEND_SWAP,
        },
        Ordering::Relaxed,
    );
}
132
133/// Scatter pageout. Logical layout = chunks concatenated in order.
134/// After return, each `Vec` in `chunks` is empty.
135/// File backend preserves capacity; swap backend moves the alloc into the handle.
136/// Empty input returns a `len == 0` handle of the active backend's variant
137/// (no I/O is performed in either backend).
138///
139/// Panics on I/O failure. Use [`try_pageout`] to surface the error.
140pub fn pageout(chunks: &mut [Vec<u64>]) -> Handle {
141    pageout_with(backend(), chunks)
142}
143
144/// Same as [`pageout`], but selects the backend explicitly. Bypasses the global
145/// atomic so callers (such as the column-pager layer) can dispatch per call
146/// without racing other writers.
147///
148/// Panics on I/O failure. Use [`try_pageout_with`] to surface the error.
149pub fn pageout_with(b: Backend, chunks: &mut [Vec<u64>]) -> Handle {
150    match b {
151        Backend::Swap => swap::pageout_swap(chunks),
152        Backend::File => file::pageout_file(chunks),
153    }
154}
155
156/// Fallible counterpart to [`pageout`]. Returns the underlying I/O error
157/// instead of panicking. The swap backend cannot fail at I/O, so this is
158/// equivalent to `Ok(pageout(chunks))` when [`backend`] is [`Backend::Swap`].
159pub fn try_pageout(chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
160    try_pageout_with(backend(), chunks)
161}
162
163/// Fallible counterpart to [`pageout_with`].
164pub fn try_pageout_with(b: Backend, chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
165    match b {
166        Backend::Swap => Ok(swap::pageout_swap(chunks)),
167        Backend::File => file::try_pageout_file(chunks),
168    }
169}
170
171/// Reads multiple ranges. Output appended to `dst` in request order (concat).
172///
173/// Ranges must be pairwise non-overlapping; ordering is otherwise unconstrained.
174/// Panics if any range is out of bounds, if two ranges overlap, or on I/O
175/// failure. Use [`try_read_at_many`] for fallible reads.
176pub fn read_at_many(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
177    assert_ranges_disjoint(ranges);
178    match &handle.inner {
179        HandleInner::Swap(_) => swap::read_at_swap(handle, ranges, dst),
180        HandleInner::File(_) => file::read_at_file(handle, ranges, dst),
181    }
182}
183
184/// Fallible counterpart to [`read_at_many`]. Caller-side preconditions
185/// (out-of-bounds, overlapping ranges) still panic; I/O failures return `Err`.
186pub fn try_read_at_many(
187    handle: &Handle,
188    ranges: &[(usize, usize)],
189    dst: &mut Vec<u64>,
190) -> std::io::Result<()> {
191    assert_ranges_disjoint(ranges);
192    match &handle.inner {
193        HandleInner::Swap(_) => {
194            swap::read_at_swap(handle, ranges, dst);
195            Ok(())
196        }
197        HandleInner::File(_) => file::try_read_at_file(handle, ranges, dst),
198    }
199}
200
/// Asserts that no two `(offset, len)` ranges cover the same position.
///
/// Both backends assume disjoint ranges: the file backend coalesces adjacent
/// ranges into a single pread and writes into a contiguous slice of `dst`, and
/// the swap backend likewise concatenates into `dst` per range. Overlapping
/// ranges would duplicate data in `dst` silently, which is almost certainly
/// not what the caller meant; reject upfront so misuse fails loud.
///
/// Zero-length ranges are ignored; they cannot overlap anything by definition.
///
/// Input that already arrives in ascending, non-overlapping order (including
/// the single-range case used by `read_at`) is validated in one pass without
/// allocating. Only when that pass fails is a sorted copy built to tell
/// "merely unsorted" apart from "overlapping".
///
/// # Panics
///
/// Panics if two non-empty ranges overlap, or if `offset + len` overflows
/// `usize` for a non-empty range.
fn assert_ranges_disjoint(ranges: &[(usize, usize)]) {
    // Fast path: walk the non-empty ranges in the order given, carrying the
    // end of the previous range. `None` means "not provably fine in this
    // order" (out of order, overlapping, or overflowing); the slow path below
    // then decides and produces the panic message.
    let presorted = ranges
        .iter()
        .filter(|&&(_, len)| len > 0)
        .try_fold(0usize, |prev_end, &(off, len)| {
            if off < prev_end {
                None
            } else {
                off.checked_add(len)
            }
        })
        .is_some();
    if presorted {
        return;
    }

    // Slow path: sort a copy by offset and check neighbours.
    let mut sorted: Vec<(usize, usize)> = ranges.iter().copied().filter(|&(_, l)| l > 0).collect();
    sorted.sort_by_key(|&(off, _)| off);
    let mut prev_end: usize = 0;
    for (off, len) in sorted {
        assert!(
            off >= prev_end,
            "read_at_many: overlapping ranges (range starting at {off} overlaps a previous range ending at {prev_end})",
        );
        prev_end = off
            .checked_add(len)
            .expect("range offset+len overflow in assert_ranges_disjoint");
    }
}
222
223/// Reads a single range. Convenience wrapper around [`read_at_many`].
224pub fn read_at(handle: &Handle, offset: usize, len: usize, dst: &mut Vec<u64>) {
225    read_at_many(handle, &[(offset, len)], dst);
226}
227
228/// Fallible counterpart to [`read_at`].
229pub fn try_read_at(
230    handle: &Handle,
231    offset: usize,
232    len: usize,
233    dst: &mut Vec<u64>,
234) -> std::io::Result<()> {
235    try_read_at_many(handle, &[(offset, len)], dst)
236}
237
238/// Consumes handle, writing the entire payload into `dst` (cleared first), then reclaims storage.
239/// Swap fast path: single-chunk handle into empty `dst` swaps in place, no copy.
240///
241/// Panics on I/O failure. Use [`try_take`] to surface the error.
242pub fn take(handle: Handle, dst: &mut Vec<u64>) {
243    match &handle.inner {
244        HandleInner::Swap(_) => swap::take_swap(handle, dst),
245        HandleInner::File(_) => file::take_file(handle, dst),
246    }
247}
248
249/// Fallible counterpart to [`take`]. On I/O error the handle is consumed and
250/// `dst` may hold partial data; the scratch file is unlinked on inner drop.
251pub fn try_take(handle: Handle, dst: &mut Vec<u64>) -> std::io::Result<()> {
252    match &handle.inner {
253        HandleInner::Swap(_) => {
254            swap::take_swap(handle, dst);
255            Ok(())
256        }
257        HandleInner::File(_) => file::try_take_file(handle, dst),
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Setting a backend is observable through `backend()`; finishes on `Swap`.
    #[mz_ore::test]
    fn backend_round_trip() {
        for target in [Backend::File, Backend::Swap] {
            set_backend(target);
            assert_eq!(backend(), target);
        }
    }

    /// Inputs that must pass the disjointness check.
    #[mz_ore::test]
    fn disjoint_check_accepts_sorted_adjacent_and_unsorted_disjoint() {
        let accepted: [&[(usize, usize)]; 4] = [
            // Adjacent ranges touch but do not overlap.
            &[(0, 4), (4, 4)],
            // Ascending with gaps.
            &[(0, 2), (10, 3), (100, 1)],
            // Disjoint, but not in ascending order.
            &[(10, 3), (0, 2), (100, 1)],
            // Zero-length ranges never overlap anything.
            &[(5, 0), (5, 0), (5, 0)],
        ];
        for ranges in accepted {
            assert_ranges_disjoint(ranges);
        }
    }

    /// Overlap between ranges given in ascending order.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_overlap() {
        let overlapping = [(0, 4), (2, 4)];
        assert_ranges_disjoint(&overlapping);
    }

    /// Overlap that only becomes adjacent once the ranges are sorted.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_overlap_unsorted() {
        let overlapping = [(10, 5), (0, 2), (12, 1)];
        assert_ranges_disjoint(&overlapping);
    }

    /// The same non-empty range requested twice.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_duplicate_range() {
        let duplicated = [(3, 2), (3, 2)];
        assert_ranges_disjoint(&duplicated);
    }
}
303
#[cfg(test)]
mod dispatch_tests {
    use super::*;

    /// Pages out through the swap backend, reads a sub-range, then takes the
    /// whole payload back.
    #[mz_ore::test]
    fn end_to_end_swap() {
        // Name the backend explicitly rather than writing the process-wide
        // `BACKEND` atomic and calling `pageout`. Other tests also store to
        // that atomic, so when tests share a process a store could land
        // between the write and the read and this test would silently run
        // against the wrong backend.
        let mut chunks = [vec![1u64, 2, 3, 4]];
        let h = pageout_with(Backend::Swap, &mut chunks);
        assert!(
            h.swap_inner().is_some(),
            "explicit swap dispatch must yield a swap handle"
        );
        assert_eq!(h.len(), 4);
        assert!(chunks[0].is_empty());

        // Offset and length are expressed in `u64` elements.
        let mut dst = Vec::new();
        read_at(&h, 1, 2, &mut dst);
        assert_eq!(dst, vec![2, 3]);

        let mut dst2 = Vec::new();
        take(h, &mut dst2);
        assert_eq!(dst2, vec![1, 2, 3, 4]);
    }
}
324}