Skip to main content

mz_ore/
pager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Explicit pager for cold data. See `doc/developer/design/20260504_pager.md`.
17
18use std::sync::atomic::{AtomicU8, Ordering};
19
20mod file;
21mod swap;
22
23pub use file::set_scratch_dir;
24pub use swap::advise_pageout;
25
26use crate::pager::file::FileInner;
27use crate::pager::swap::SwapInner;
28
29/// An opaque handle to data paged out via [`pageout`]. The handle's backend variant
30/// is fixed at `pageout` time and is independent of any later `set_backend` call.
31#[derive(Debug)]
32pub struct Handle {
33    inner: HandleInner,
34}
35
36#[derive(Debug)]
37enum HandleInner {
38    Swap(SwapInner),
39    File(FileInner),
40}
41
42impl Handle {
43    /// Returns the logical length of the handle's payload in `u64`s.
44    pub fn len(&self) -> usize {
45        match &self.inner {
46            HandleInner::Swap(s) => s.total_len(),
47            HandleInner::File(f) => f.len_u64s,
48        }
49    }
50
51    /// Returns the logical length of the handle's payload in bytes (`len() * 8`).
52    pub fn len_bytes(&self) -> usize {
53        self.len() * 8
54    }
55
56    /// Returns `true` if the handle holds no data.
57    pub fn is_empty(&self) -> bool {
58        self.len() == 0
59    }
60
61    pub(crate) fn from_swap(inner: SwapInner) -> Self {
62        Self {
63            inner: HandleInner::Swap(inner),
64        }
65    }
66
67    pub(crate) fn from_file(inner: FileInner) -> Self {
68        Self {
69            inner: HandleInner::File(inner),
70        }
71    }
72
73    pub(crate) fn swap_inner(&self) -> Option<&SwapInner> {
74        match &self.inner {
75            HandleInner::Swap(s) => Some(s),
76            HandleInner::File(_) => None,
77        }
78    }
79
80    pub(crate) fn file_inner(&self) -> Option<&FileInner> {
81        match &self.inner {
82            HandleInner::File(f) => Some(f),
83            HandleInner::Swap(_) => None,
84        }
85    }
86
87    pub(crate) fn into_swap_inner(self) -> Option<SwapInner> {
88        match self.inner {
89            HandleInner::Swap(s) => Some(s),
90            HandleInner::File(_) => None,
91        }
92    }
93
94    pub(crate) fn into_file_inner(self) -> Option<FileInner> {
95        match self.inner {
96            HandleInner::File(f) => Some(f),
97            HandleInner::Swap(_) => None,
98        }
99    }
100}
101
/// Storage strategy used for paged-out data.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Backend {
    /// Keep allocations resident and advise the kernel with `MADV_COLD`.
    Swap,
    /// Write to a named scratch file; no file descriptor is kept open.
    File,
}

/// Raw encoding of `Backend::Swap` stored in `BACKEND`.
const BACKEND_SWAP: u8 = 0;
/// Raw encoding of `Backend::File` stored in `BACKEND`.
const BACKEND_FILE: u8 = 1;

/// Process-wide backend selector read by `pageout`; starts out as swap.
static BACKEND: AtomicU8 = AtomicU8::new(BACKEND_SWAP);

/// Reports which backend new `pageout` calls will use.
pub fn backend() -> Backend {
    let raw = BACKEND.load(Ordering::Relaxed);
    if raw == BACKEND_SWAP {
        Backend::Swap
    } else if raw == BACKEND_FILE {
        Backend::File
    } else {
        unreachable!("BACKEND atomic holds invalid discriminant")
    }
}

/// Chooses the backend for subsequent `pageout` calls.
///
/// Handles created earlier keep whichever backend they were created with.
pub fn set_backend(b: Backend) {
    BACKEND.store(
        match b {
            Backend::File => BACKEND_FILE,
            Backend::Swap => BACKEND_SWAP,
        },
        Ordering::Relaxed,
    );
}
133
134/// Scatter pageout. Logical layout = chunks concatenated in order.
135/// After return, each `Vec` in `chunks` is empty.
136/// File backend preserves capacity; swap backend moves the alloc into the handle.
137/// Empty input returns a `len == 0` handle of the active backend's variant
138/// (no I/O is performed in either backend).
139///
140/// Panics on I/O failure. Use [`try_pageout`] to surface the error.
141pub fn pageout(chunks: &mut [Vec<u64>]) -> Handle {
142    pageout_with(backend(), chunks)
143}
144
145/// Same as [`pageout`], but selects the backend explicitly. Bypasses the global
146/// atomic so callers (such as the column-pager layer) can dispatch per call
147/// without racing other writers.
148///
149/// Panics on I/O failure. Use [`try_pageout_with`] to surface the error.
150pub fn pageout_with(b: Backend, chunks: &mut [Vec<u64>]) -> Handle {
151    match b {
152        Backend::Swap => swap::pageout_swap(chunks),
153        Backend::File => file::pageout_file(chunks),
154    }
155}
156
157/// Fallible counterpart to [`pageout`]. Returns the underlying I/O error
158/// instead of panicking. The swap backend cannot fail at I/O, so this is
159/// equivalent to `Ok(pageout(chunks))` when [`backend`] is [`Backend::Swap`].
160pub fn try_pageout(chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
161    try_pageout_with(backend(), chunks)
162}
163
164/// Fallible counterpart to [`pageout_with`].
165pub fn try_pageout_with(b: Backend, chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
166    match b {
167        Backend::Swap => Ok(swap::pageout_swap(chunks)),
168        Backend::File => file::try_pageout_file(chunks),
169    }
170}
171
172/// Reads multiple ranges. Output appended to `dst` in request order (concat).
173///
174/// Ranges must be pairwise non-overlapping; ordering is otherwise unconstrained.
175/// Panics if any range is out of bounds, if two ranges overlap, or on I/O
176/// failure. Use [`try_read_at_many`] for fallible reads.
177pub fn read_at_many(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
178    assert_ranges_disjoint(ranges);
179    match &handle.inner {
180        HandleInner::Swap(_) => swap::read_at_swap(handle, ranges, dst),
181        HandleInner::File(_) => file::read_at_file(handle, ranges, dst),
182    }
183}
184
185/// Fallible counterpart to [`read_at_many`]. Caller-side preconditions
186/// (out-of-bounds, overlapping ranges) still panic; I/O failures return `Err`.
187pub fn try_read_at_many(
188    handle: &Handle,
189    ranges: &[(usize, usize)],
190    dst: &mut Vec<u64>,
191) -> std::io::Result<()> {
192    assert_ranges_disjoint(ranges);
193    match &handle.inner {
194        HandleInner::Swap(_) => {
195            swap::read_at_swap(handle, ranges, dst);
196            Ok(())
197        }
198        HandleInner::File(_) => file::try_read_at_file(handle, ranges, dst),
199    }
200}
201
/// Asserts that no two ranges cover a common `u64` position.
///
/// Each range is an `(offset, len)` pair measured in `u64` words of the
/// handle's logical payload. Both backends assume disjoint ranges:
/// - the file backend coalesces adjacent ranges into a single pread and writes
///   into a contiguous slice of `dst`;
/// - the swap backend likewise concatenates into `dst` per range.
///
/// Overlapping ranges would silently duplicate words in `dst`, which is almost
/// certainly not what the caller meant. Rejecting them upfront makes misuse
/// fail loudly. Zero-length ranges are ignored because they cannot overlap
/// anything.
///
/// # Panics
///
/// Panics if two non-empty ranges overlap, or if `offset + len` overflows
/// `usize` for any non-empty range.
fn assert_ranges_disjoint(ranges: &[(usize, usize)]) {
    // Exclusive end of a non-empty range; overflow is a caller bug.
    let end_of = |off: usize, len: usize| -> usize {
        off.checked_add(len)
            .expect("range offset+len overflow in assert_ranges_disjoint")
    };

    // Fast path for the common single-range call (`read_at` / `try_read_at`).
    // One range cannot overlap anything, so skip the allocation and sort, but
    // still surface the same overflow panic as the general path.
    if let &[(off, len)] = ranges {
        if len > 0 {
            let _ = end_of(off, len);
        }
        return;
    }

    let mut sorted: Vec<(usize, usize)> =
        ranges.iter().copied().filter(|&(_, len)| len > 0).collect();
    // Only offset order matters. Two non-empty ranges with equal offsets
    // overlap however the tie is broken, so an unstable (non-allocating) sort
    // is sufficient.
    sorted.sort_unstable_by_key(|&(off, _)| off);
    let mut prev_end: usize = 0;
    for (off, len) in sorted {
        assert!(
            off >= prev_end,
            "read_at_many: overlapping ranges (range starting at {off} overlaps a previous range ending at {prev_end})",
        );
        prev_end = end_of(off, len);
    }
}
223
224/// Reads a single range. Convenience wrapper around [`read_at_many`].
225pub fn read_at(handle: &Handle, offset: usize, len: usize, dst: &mut Vec<u64>) {
226    read_at_many(handle, &[(offset, len)], dst);
227}
228
229/// Fallible counterpart to [`read_at`].
230pub fn try_read_at(
231    handle: &Handle,
232    offset: usize,
233    len: usize,
234    dst: &mut Vec<u64>,
235) -> std::io::Result<()> {
236    try_read_at_many(handle, &[(offset, len)], dst)
237}
238
239/// Consumes handle, writing the entire payload into `dst` (cleared first), then reclaims storage.
240/// Swap fast path: single-chunk handle into empty `dst` swaps in place, no copy.
241///
242/// Panics on I/O failure. Use [`try_take`] to surface the error.
243pub fn take(handle: Handle, dst: &mut Vec<u64>) {
244    match &handle.inner {
245        HandleInner::Swap(_) => swap::take_swap(handle, dst),
246        HandleInner::File(_) => file::take_file(handle, dst),
247    }
248}
249
250/// Fallible counterpart to [`take`]. On I/O error the handle is consumed and
251/// `dst` may hold partial data; the scratch file is unlinked on inner drop.
252pub fn try_take(handle: Handle, dst: &mut Vec<u64>) -> std::io::Result<()> {
253    match &handle.inner {
254        HandleInner::Swap(_) => {
255            swap::take_swap(handle, dst);
256            Ok(())
257        }
258        HandleInner::File(_) => file::try_take_file(handle, dst),
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// A backend stored through `set_backend` is observed by `backend`.
    #[mz_ore::test]
    fn backend_round_trip() {
        for expected in [Backend::File, Backend::Swap] {
            set_backend(expected);
            assert_eq!(backend(), expected);
        }
    }

    /// Inputs without any shared position are accepted.
    #[mz_ore::test]
    fn disjoint_check_accepts_sorted_adjacent_and_unsorted_disjoint() {
        let cases: [&[(usize, usize)]; 4] = [
            // Touching at a boundary is not an overlap.
            &[(0, 4), (4, 4)],
            // Ascending offsets with gaps in between.
            &[(0, 2), (10, 3), (100, 1)],
            // The same kind of disjoint set, given out of order.
            &[(10, 3), (0, 2), (100, 1)],
            // Empty ranges never conflict, even at an identical offset.
            &[(5, 0), (5, 0), (5, 0)],
        ];
        for &ranges in &cases {
            assert_ranges_disjoint(ranges);
        }
    }

    /// Two sorted ranges that share positions 2..4 are rejected.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_overlap() {
        let ranges = [(0, 4), (2, 4)];
        assert_ranges_disjoint(&ranges);
    }

    /// An overlap is found even when ranges are not in offset order.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_overlap_unsorted() {
        let ranges = [(10, 5), (0, 2), (12, 1)];
        assert_ranges_disjoint(&ranges);
    }

    /// Repeating the same non-empty range counts as an overlap.
    #[mz_ore::test]
    #[should_panic(expected = "overlapping ranges")]
    fn disjoint_check_rejects_duplicate_range() {
        let range = (3, 2);
        assert_ranges_disjoint(&[range, range]);
    }
}
304
#[cfg(test)]
mod dispatch_tests {
    use super::*;

    /// Round-trips one chunk through the swap backend: pageout, a ranged read,
    /// then `take`.
    ///
    /// The backend is named explicitly via [`pageout_with`] instead of calling
    /// `set_backend(Backend::Swap)` and then [`pageout`]. The selector is a
    /// process-wide atomic, and the test harness runs tests on parallel
    /// threads. Writing the selector here would race with other tests that flip
    /// it (for example, briefly to `Backend::File`). That race could send this
    /// test through the wrong backend, or make those tests' `backend()`
    /// assertions observe this test's write.
    #[mz_ore::test]
    fn end_to_end_swap() {
        let mut chunks = [vec![1u64, 2, 3, 4]];
        let h = pageout_with(Backend::Swap, &mut chunks);
        assert!(h.swap_inner().is_some(), "expected a swap-backed handle");
        assert_eq!(h.len(), 4);
        assert_eq!(h.len_bytes(), 32);
        assert!(chunks[0].is_empty());

        let mut dst = Vec::new();
        read_at(&h, 1, 2, &mut dst);
        assert_eq!(dst, vec![2, 3]);

        let mut dst2 = Vec::new();
        take(h, &mut dst2);
        assert_eq!(dst2, vec![1, 2, 3, 4]);
    }

    /// Empty input yields an empty handle of the requested backend variant.
    #[mz_ore::test]
    fn empty_swap() {
        let mut chunks: [Vec<u64>; 0] = [];
        let h = pageout_with(Backend::Swap, &mut chunks);
        assert!(h.swap_inner().is_some(), "expected a swap-backed handle");
        assert!(h.is_empty());
        assert_eq!(h.len_bytes(), 0);
    }
}