1use crate::pager::Handle;
19
/// Chunks held by a swap handle, together with a prefix-sum index over their
/// lengths so that a logical element offset can be mapped to a chunk.
#[derive(Debug)]
pub(crate) struct SwapInner {
    /// The stored chunks, in logical order.
    pub(crate) chunks: Vec<Vec<u64>>,
    /// Running element counts: `prefix[i]` is the number of elements that
    /// precede `chunks[i]`, and the final entry is the total element count.
    /// Always holds `chunks.len() + 1` entries, starting with `0`.
    pub(crate) prefix: Vec<usize>,
}

impl SwapInner {
    /// Takes ownership of `chunks` and builds the prefix-sum index over
    /// their lengths.
    pub(crate) fn new(chunks: Vec<Vec<u64>>) -> Self {
        let mut prefix = Vec::with_capacity(chunks.len() + 1);
        prefix.push(0);
        // Each yielded value is the cumulative length up to and including
        // the chunk just visited.
        prefix.extend(chunks.iter().scan(0usize, |running, chunk| {
            *running += chunk.len();
            Some(*running)
        }));
        Self { chunks, prefix }
    }

    /// Returns the total number of elements across all chunks, i.e. the
    /// last entry of the prefix index.
    ///
    /// # Panics
    ///
    /// Panics if `prefix` is empty, which `new` never produces.
    pub(crate) fn total_len(&self) -> usize {
        let last = self
            .prefix
            .last()
            .expect("SwapInner::prefix invariant: at least [0]");
        *last
    }
}
50
51pub(crate) fn pageout_swap(chunks: &mut [Vec<u64>]) -> Handle {
52 let mut taken: Vec<Vec<u64>> = Vec::with_capacity(chunks.len());
53 for c in chunks.iter_mut() {
54 taken.push(std::mem::take(c));
55 }
56 for c in &taken {
57 madvise_cold(c);
58 }
59 Handle::from_swap(SwapInner::new(taken))
60}
61
62#[cfg(target_os = "linux")]
63fn madvise_cold(chunk: &[u64]) {
64 if chunk.is_empty() {
65 return;
66 }
67 let page = page_size();
68 let base_ptr = chunk.as_ptr();
69 let base_addr = base_ptr.addr();
70 let Some(len_bytes) = chunk.len().checked_mul(std::mem::size_of::<u64>()) else {
74 return;
75 };
76 let Some(start_unaligned) = base_addr.checked_add(page - 1) else {
80 return;
81 };
82 let Some(end_unaligned) = base_addr.checked_add(len_bytes) else {
83 return;
84 };
85 let aligned_start_addr = start_unaligned & !(page - 1);
86 let aligned_end_addr = end_unaligned & !(page - 1);
87 if aligned_end_addr <= aligned_start_addr {
88 return;
89 }
90 let aligned_len = aligned_end_addr - aligned_start_addr;
91 let aligned_ptr = unsafe { base_ptr.byte_add(aligned_start_addr - base_addr) }
97 .cast::<libc::c_void>()
98 .cast_mut();
99 unsafe {
104 libc::madvise(aligned_ptr, aligned_len, libc::MADV_COLD);
105 }
106}
107
/// Stand-in used on every target other than Linux: accepts the chunk and
/// does nothing, so callers need no platform-specific code.
#[cfg(not(target_os = "linux"))]
fn madvise_cold(_chunk: &[u64]) {
    // Intentionally empty.
}
110
111#[cfg(target_os = "linux")]
112fn page_size() -> usize {
113 let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
115 usize::try_from(raw).expect("page size is positive and fits usize")
116}
117
118pub(crate) fn read_at_swap(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
119 let inner = handle
120 .swap_inner()
121 .expect("read_at_swap called on non-swap handle");
122 let total = inner.total_len();
123 let total_out: usize = ranges.iter().map(|(_, l)| *l).sum();
124 dst.reserve(total_out);
125 for &(off, len) in ranges {
126 let end = off.checked_add(len).expect("range offset+len overflow");
127 assert!(
128 end <= total,
129 "read range out of bounds: {off}+{len} > {total}"
130 );
131 copy_range(inner, off, len, dst);
132 }
133}
134
135fn copy_range(inner: &SwapInner, off: usize, len: usize, dst: &mut Vec<u64>) {
136 if len == 0 {
137 return;
138 }
139 let mut remaining = len;
140 let mut cur = off;
141 let mut idx = match inner.prefix.binary_search(&cur) {
142 Ok(i) => i,
143 Err(i) => i.saturating_sub(1),
144 };
145 while remaining > 0 {
146 let chunk_start = inner.prefix[idx];
147 let chunk = &inner.chunks[idx];
148 let local = cur - chunk_start;
149 let take = std::cmp::min(remaining, chunk.len() - local);
150 dst.extend_from_slice(&chunk[local..local + take]);
151 cur += take;
152 remaining -= take;
153 idx += 1;
154 }
155}
156
157pub(crate) fn take_swap(handle: Handle, dst: &mut Vec<u64>) {
158 let inner = match handle.into_swap_inner() {
159 Some(s) => s,
160 None => panic!("take_swap called on non-swap handle"),
161 };
162 dst.clear();
163 let mut chunks = inner.chunks;
164 if chunks.len() == 1 && dst.capacity() == 0 {
165 let only = chunks.pop().unwrap();
166 *dst = only;
167 return;
168 }
169 let total: usize = chunks.iter().map(|c| c.len()).sum();
170 dst.reserve(total);
171 for c in chunks {
172 dst.extend_from_slice(&c);
173 }
174}
175
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pager::Handle;

    /// Paging out empties every source chunk and the handle reports the
    /// combined element count.
    #[mz_ore::test]
    fn pageout_takes_chunks_and_records_lengths() {
        let mut sources = [vec![1u64, 2, 3], vec![4u64, 5]];
        let handle: Handle = pageout_swap(&mut sources);
        assert_eq!(handle.len(), 5);
        assert!(sources[0].is_empty());
        assert!(sources[1].is_empty());
    }

    /// A range that lies inside one chunk is returned verbatim.
    #[mz_ore::test]
    fn read_at_within_single_chunk() {
        let mut sources = [vec![10u64, 11, 12, 13, 14]];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(1, 3)], &mut out);
        assert_eq!(out, vec![11, 12, 13]);
    }

    /// A range that crosses a chunk boundary is stitched together.
    #[mz_ore::test]
    fn read_at_spans_chunks() {
        let mut sources = [vec![1u64, 2, 3], vec![4, 5, 6]];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(2, 3)], &mut out);
        assert_eq!(out, vec![3, 4, 5]);
    }

    /// Several ranges are appended to the output in request order.
    #[mz_ore::test]
    fn read_at_many_concats() {
        let mut sources = [vec![1u64, 2, 3, 4, 5]];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(0, 2), (3, 2)], &mut out);
        assert_eq!(out, vec![1, 2, 4, 5]);
    }

    /// A range that ends past the stored data must panic.
    #[mz_ore::test]
    #[should_panic(expected = "out of bounds")]
    fn read_at_panics_on_oob() {
        let mut sources = [vec![1u64, 2]];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        read_at_swap(&handle, &[(1, 5)], &mut out);
    }

    /// Taking a single-chunk handle into a fresh `Vec` hands back the very
    /// same allocation, checked by comparing data pointers.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn take_single_chunk_zero_copy() {
        let original = vec![100u64; 1024];
        let original_ptr = original.as_ptr();
        let mut sources = [original];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        take_swap(handle, &mut out);
        assert_eq!(out.len(), 1024);
        assert_eq!(
            out.as_ptr(),
            original_ptr,
            "single-chunk take should be zero-copy"
        );
    }

    /// Taking a multi-chunk handle concatenates the chunks in order.
    #[mz_ore::test]
    fn take_multi_chunk_concats() {
        let mut sources = [vec![1u64, 2], vec![3, 4, 5]];
        let handle = pageout_swap(&mut sources);
        let mut out = Vec::new();
        take_swap(handle, &mut out);
        assert_eq!(out, vec![1, 2, 3, 4, 5]);
    }
}