Skip to main content

mz_ore/pager/
swap.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Swap backend for the pager. See `mz_ore::pager` for the public API.
17
18use crate::pager::Handle;
19
/// Owned storage behind a swap-backed handle: the chunks themselves plus a
/// running-length index over them.
#[derive(Debug)]
pub(crate) struct SwapInner {
    /// The stored chunks. Their concatenation, in this order, forms the
    /// logical contents.
    pub(crate) chunks: Vec<Vec<u64>>,
    /// Running element totals: `prefix[i]` is the combined length of
    /// `chunks[..i]`. As built by `new`, `prefix[0] == 0` and the final entry
    /// equals the total element count.
    pub(crate) prefix: Vec<usize>,
}

impl SwapInner {
    /// Takes ownership of `chunks` and builds the matching `prefix` table,
    /// which ends up with `chunks.len() + 1` entries.
    pub(crate) fn new(chunks: Vec<Vec<u64>>) -> Self {
        let mut prefix = Vec::with_capacity(chunks.len() + 1);
        // The leading zero is the offset of the first chunk; every following
        // entry is the running total after one more chunk.
        prefix.push(0);
        prefix.extend(chunks.iter().scan(0usize, |running, chunk| {
            *running += chunk.len();
            Some(*running)
        }));
        Self { chunks, prefix }
    }

    /// Returns the total number of elements across all chunks.
    ///
    /// # Panics
    ///
    /// Panics if `prefix` is empty, which `new` never produces because it
    /// always pushes the initial `0`.
    pub(crate) fn total_len(&self) -> usize {
        self.prefix
            .last()
            .copied()
            .expect("SwapInner::prefix invariant: at least [0]")
    }
}
50
51pub(crate) fn pageout_swap(chunks: &mut [Vec<u64>]) -> Handle {
52    let mut taken: Vec<Vec<u64>> = Vec::with_capacity(chunks.len());
53    for c in chunks.iter_mut() {
54        taken.push(std::mem::take(c));
55    }
56    for c in &taken {
57        madvise_cold(c);
58    }
59    Handle::from_swap(SwapInner::new(taken))
60}
61
62#[cfg(target_os = "linux")]
63fn madvise_cold(chunk: &[u64]) {
64    if chunk.is_empty() {
65        return;
66    }
67    let page = page_size();
68    let base_ptr = chunk.as_ptr();
69    let base_addr = base_ptr.addr();
70    // `Vec<u64>` cannot exceed `isize::MAX` bytes, so this multiplication
71    // cannot overflow on any supported target. Use `checked_mul` for
72    // defense-in-depth: a corrupted length should fail loudly, not wrap.
73    let Some(len_bytes) = chunk.len().checked_mul(std::mem::size_of::<u64>()) else {
74        return;
75    };
76    // Round the start up and the end down to page boundaries. Both additions
77    // use `checked_add` so that an allocation sitting near the top of the
78    // address space can never silently wrap into a tiny range.
79    let Some(start_unaligned) = base_addr.checked_add(page - 1) else {
80        return;
81    };
82    let Some(end_unaligned) = base_addr.checked_add(len_bytes) else {
83        return;
84    };
85    let aligned_start_addr = start_unaligned & !(page - 1);
86    let aligned_end_addr = end_unaligned & !(page - 1);
87    if aligned_end_addr <= aligned_start_addr {
88        return;
89    }
90    let aligned_len = aligned_end_addr - aligned_start_addr;
91    // SAFETY: `aligned_start_addr` lies in `[base_addr, base_addr + len_bytes]`
92    // by construction (rounding up the start cannot exceed `end_unaligned`,
93    // which equals `base_addr + len_bytes`; the early-return above guarantees
94    // `start ≤ end`). That interval is exactly the range covered by the live
95    // `&[u64]`, so `byte_add` stays in-bounds and preserves provenance.
96    let aligned_ptr = unsafe { base_ptr.byte_add(aligned_start_addr - base_addr) }
97        .cast::<libc::c_void>()
98        .cast_mut();
99    // SAFETY: pointer/length describe a fully page-aligned subrange contained
100    // within the live `&[u64]` (justified above). `MADV_COLD` is non-mutating;
101    // it only signals reclaim preference to the kernel, so concurrent reads
102    // of the slice remain sound.
103    unsafe {
104        libc::madvise(aligned_ptr, aligned_len, libc::MADV_COLD);
105    }
106}
107
108#[cfg(not(target_os = "linux"))]
109fn madvise_cold(_chunk: &[u64]) {}
110
111#[cfg(target_os = "linux")]
112fn page_size() -> usize {
113    // SAFETY: `sysconf` with a valid argument is safe.
114    let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
115    usize::try_from(raw).expect("page size is positive and fits usize")
116}
117
118pub(crate) fn read_at_swap(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
119    let inner = handle
120        .swap_inner()
121        .expect("read_at_swap called on non-swap handle");
122    let total = inner.total_len();
123    let total_out: usize = ranges.iter().map(|(_, l)| *l).sum();
124    dst.reserve(total_out);
125    for &(off, len) in ranges {
126        let end = off.checked_add(len).expect("range offset+len overflow");
127        assert!(
128            end <= total,
129            "read range out of bounds: {off}+{len} > {total}"
130        );
131        copy_range(inner, off, len, dst);
132    }
133}
134
135fn copy_range(inner: &SwapInner, off: usize, len: usize, dst: &mut Vec<u64>) {
136    if len == 0 {
137        return;
138    }
139    let mut remaining = len;
140    let mut cur = off;
141    let mut idx = match inner.prefix.binary_search(&cur) {
142        Ok(i) => i,
143        Err(i) => i.saturating_sub(1),
144    };
145    while remaining > 0 {
146        let chunk_start = inner.prefix[idx];
147        let chunk = &inner.chunks[idx];
148        let local = cur - chunk_start;
149        let take = std::cmp::min(remaining, chunk.len() - local);
150        dst.extend_from_slice(&chunk[local..local + take]);
151        cur += take;
152        remaining -= take;
153        idx += 1;
154    }
155}
156
157pub(crate) fn take_swap(handle: Handle, dst: &mut Vec<u64>) {
158    let inner = match handle.into_swap_inner() {
159        Some(s) => s,
160        None => panic!("take_swap called on non-swap handle"),
161    };
162    dst.clear();
163    let mut chunks = inner.chunks;
164    if chunks.len() == 1 && dst.capacity() == 0 {
165        let only = chunks.pop().unwrap();
166        *dst = only;
167        return;
168    }
169    let total: usize = chunks.iter().map(|c| c.len()).sum();
170    dst.reserve(total);
171    for c in chunks {
172        dst.extend_from_slice(&c);
173    }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pager::Handle;

    /// Paging out empties the caller's vectors and the handle reports the
    /// combined length.
    #[mz_ore::test]
    fn pageout_takes_chunks_and_records_lengths() {
        let mut chunks = [vec![1u64, 2, 3], vec![4u64, 5]];
        let handle: Handle = pageout_swap(&mut chunks);
        assert_eq!(handle.len(), 5);
        assert!(chunks[0].is_empty());
        assert!(chunks[1].is_empty());
    }

    /// A range that lies inside one chunk is returned verbatim.
    #[mz_ore::test]
    fn read_at_within_single_chunk() {
        let mut chunks = [vec![10u64, 11, 12, 13, 14]];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(1, 3)], &mut out);
        assert_eq!(out, [11, 12, 13]);
    }

    /// A range may start in one chunk and finish in the next.
    #[mz_ore::test]
    fn read_at_spans_chunks() {
        let mut chunks = [vec![1u64, 2, 3], vec![4, 5, 6]];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(2, 3)], &mut out);
        assert_eq!(out, [3, 4, 5]);
    }

    /// Several ranges are concatenated into the destination in request order.
    #[mz_ore::test]
    fn read_at_many_concats() {
        let mut chunks = [vec![1u64, 2, 3, 4, 5]];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(0, 2), (3, 2)], &mut out);
        assert_eq!(out, [1, 2, 4, 5]);
    }

    /// Reading past the end of the stored data panics.
    #[mz_ore::test]
    #[should_panic(expected = "out of bounds")]
    fn read_at_panics_on_oob() {
        let mut chunks = [vec![1u64, 2]];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(1, 5)], &mut out);
    }

    /// Taking a single-chunk handle into an unallocated vector reuses the
    /// original buffer.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `madvise` on OS `linux`
    fn take_single_chunk_zero_copy() {
        let source = vec![100u64; 1024];
        let original_ptr = source.as_ptr();
        let mut chunks = [source];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        take_swap(handle, &mut out);
        assert_eq!(out.len(), 1024);
        assert_eq!(
            out.as_ptr(),
            original_ptr,
            "single-chunk take should be zero-copy"
        );
    }

    /// Taking a multi-chunk handle yields the chunks joined in order.
    #[mz_ore::test]
    fn take_multi_chunk_concats() {
        let mut chunks = [vec![1u64, 2], vec![3, 4, 5]];
        let handle = pageout_swap(&mut chunks);
        let mut out = Vec::new();
        take_swap(handle, &mut out);
        assert_eq!(out, [1, 2, 3, 4, 5]);
    }
}