1use std::fs::File;
24use std::io::IoSlice;
25use std::path::{Path, PathBuf};
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Mutex, OnceLock};
28
29use crate::cast::CastFrom;
30use crate::pager::Handle;
31
/// Serializes calls to `set_scratch_dir` so the two directory cells below are initialized
/// together.
static INIT_LOCK: Mutex<()> = Mutex::new(());
/// Root directory accepted by `set_scratch_dir`; set only after the subdirectory was created.
static SCRATCH_DIR: OnceLock<PathBuf> = OnceLock::new();
/// Subdirectory of the scratch root (created by `init_subdir`) that holds the scratch files.
static SUBDIR: OnceLock<PathBuf> = OnceLock::new();
/// Source of scratch file ids; see `alloc_scratch_id`.
static SCRATCH_ID: AtomicU64 = AtomicU64::new(0);
38
39pub fn set_scratch_dir(root: PathBuf) {
46 let _guard = INIT_LOCK.lock().expect("mz_ore::pager INIT_LOCK poisoned");
47 if let Some(existing) = SCRATCH_DIR.get() {
48 if *existing != root {
49 tracing::warn!(
50 ?root,
51 ?existing,
52 "mz_ore::pager scratch dir already set; ignoring",
53 );
54 }
55 return;
56 }
57 match init_subdir(&root) {
58 Ok(()) => {
59 let _ = SCRATCH_DIR.set(root);
62 }
63 Err(err) => {
64 tracing::warn!(
65 ?root,
66 %err,
67 "mz_ore::pager: failed to initialize scratch subdir; will retry on next call",
68 );
69 }
70 }
71}
72
73fn init_subdir(root: &Path) -> std::io::Result<()> {
74 let nonce: u64 = rand::random();
75 let pid = std::process::id();
76 let subdir = root.join(format!("mz-pager-{pid}-{nonce:016x}"));
77 std::fs::create_dir_all(&subdir)?;
78 let _ = SUBDIR.set(subdir);
79 Ok(())
80}
81
82pub(crate) fn scratch_path(id: u64) -> PathBuf {
83 SUBDIR
84 .get()
85 .expect("mz_ore::pager file backend used before set_scratch_dir")
86 .join(format!("{id}.bin"))
87}
88
89pub(crate) fn alloc_scratch_id() -> u64 {
90 SCRATCH_ID.fetch_add(1, Ordering::Relaxed)
91}
92
/// Bookkeeping for one paged-out scratch file.
#[derive(Debug)]
pub(crate) struct FileInner {
    /// Scratch file id; the file lives at `scratch_path(id)`.
    pub(crate) id: u64,
    /// Length of the paged-out data, counted in `u64` elements.
    pub(crate) len_u64s: usize,
}

impl FileInner {
    /// Builds a `FileInner` for scratch file `id` holding `len_u64s` elements.
    pub(crate) fn new(id: u64, len_u64s: usize) -> Self {
        FileInner { id, len_u64s }
    }
}
108
109impl Drop for FileInner {
110 fn drop(&mut self) {
111 if self.len_u64s == 0 {
113 return;
114 }
115 let Some(subdir) = SUBDIR.get() else {
118 return;
119 };
120 let path = subdir.join(format!("{}.bin", self.id));
121 if let Err(err) = std::fs::remove_file(&path) {
122 if err.kind() != std::io::ErrorKind::NotFound {
124 tracing::warn!(?path, %err, "mz_ore::pager: failed to unlink scratch file");
125 }
126 }
127 }
128}
129
130pub(crate) fn try_pageout_file(chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
134 let total: usize = chunks.iter().map(|c| c.len()).sum();
135 if total == 0 {
136 return Ok(Handle::from_file(FileInner::new(alloc_scratch_id(), 0)));
140 }
141 let id = alloc_scratch_id();
142 let path = scratch_path(id);
143 match write_chunks(&path, chunks) {
144 Ok(()) => {
145 for c in chunks.iter_mut() {
146 c.clear();
147 }
148 Ok(Handle::from_file(FileInner::new(id, total)))
149 }
150 Err(err) => {
151 let _ = std::fs::remove_file(&path);
154 Err(err)
155 }
156 }
157}
158
159pub(crate) fn pageout_file(chunks: &mut [Vec<u64>]) -> Handle {
160 try_pageout_file(chunks)
161 .unwrap_or_else(|err| panic!("mz_ore::pager: file pageout failed: {err}"))
162}
163
164fn write_chunks(path: &Path, chunks: &[Vec<u64>]) -> std::io::Result<()> {
165 let file = File::options().write(true).create_new(true).open(path)?;
166 let mut slices: Vec<IoSlice<'_>> = chunks
167 .iter()
168 .filter(|c| !c.is_empty())
169 .map(|c| IoSlice::new(bytemuck::cast_slice(c.as_slice())))
170 .collect();
171 write_all_vectored(&file, slices.as_mut_slice())?;
172 Ok(())
173}
174
/// Writes every byte of `slices` to `file`, looping over `write_vectored` until all of the
/// data has been accepted.
///
/// `slices` is consumed as scratch space: the slice list is advanced in place as bytes are
/// written, so its contents are unspecified afterwards.
///
/// Writes that fail with `ErrorKind::Interrupted` are retried, mirroring the contract of
/// `std::io::Write::write_all`. An input consisting only of empty slices (or no slices at all)
/// succeeds without issuing a write.
///
/// # Errors
///
/// Returns `ErrorKind::WriteZero` if the file accepts zero bytes while data is still pending,
/// or the first non-`Interrupted` error reported by `write_vectored`.
fn write_all_vectored(mut file: &File, mut slices: &mut [IoSlice<'_>]) -> std::io::Result<()> {
    use std::io::Write;

    // Advancing by zero strips leading empty slices. Without this, a list made up solely of
    // empty slices would produce a zero-length write and be misreported as `WriteZero`.
    IoSlice::advance_slices(&mut slices, 0);
    while !slices.is_empty() {
        match file.write_vectored(slices) {
            Ok(0) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "write_vectored returned 0",
                ));
            }
            // Drops fully written slices (and any empty ones that follow) and trims the
            // partially written one.
            Ok(written) => IoSlice::advance_slices(&mut slices, written),
            // A signal interrupted the call before any data was written; just try again.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
189
190pub(crate) fn try_read_at_file(
194 handle: &Handle,
195 ranges: &[(usize, usize)],
196 dst: &mut Vec<u64>,
197) -> std::io::Result<()> {
198 use std::os::unix::fs::FileExt;
199
200 let inner = handle
201 .file_inner()
202 .expect("try_read_at_file called on non-file handle");
203 let total = inner.len_u64s;
204 for &(off, len) in ranges {
205 let end = off.checked_add(len).expect("range offset+len overflow");
206 assert!(
207 end <= total,
208 "read range out of bounds: {off}+{len} > {total}"
209 );
210 }
211 if total == 0 {
214 return Ok(());
215 }
216 let path = scratch_path(inner.id);
217 let file = File::open(&path).map_err(|err| {
218 std::io::Error::new(err.kind(), format!("mz_ore::pager: open {path:?}: {err}"))
219 })?;
220
221 let coalesced = coalesce(ranges);
222 for (range_idx, (off, len)) in coalesced.iter().copied().enumerate() {
223 let byte_off = u64::cast_from(off)
226 .checked_mul(8)
227 .expect("byte offset overflow");
228 let byte_len = len.checked_mul(8).expect("byte length overflow");
229 let buf_start = dst.len();
230 dst.resize(buf_start + len, 0);
231 let buf: &mut [u8] = bytemuck::cast_slice_mut(&mut dst[buf_start..buf_start + len]);
232 let mut filled = 0;
233 while filled < byte_len {
234 let pos = byte_off + u64::cast_from(filled);
235 let n = file
236 .read_at(&mut buf[filled..byte_len], pos)
237 .map_err(|err| {
238 std::io::Error::new(
239 err.kind(),
240 format!(
241 "mz_ore::pager: pread {path:?} pos={pos} \
242 (range #{range_idx}, off={off} len={len}): {err}",
243 ),
244 )
245 })?;
246 if n == 0 {
247 return Err(std::io::Error::new(
248 std::io::ErrorKind::UnexpectedEof,
249 format!(
250 "mz_ore::pager: pread short read at {path:?} pos={pos} \
251 (range #{range_idx}, off={off} len={len}): \
252 filled {filled} of {byte_len} bytes for this range",
253 ),
254 ));
255 }
256 filled += n;
257 }
258 }
259 Ok(())
260}
261
262pub(crate) fn read_at_file(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
263 try_read_at_file(handle, ranges, dst)
264 .unwrap_or_else(|err| panic!("mz_ore::pager: read_at_file failed: {err}"));
265}
266
/// Merges each `(offset, len)` range into its predecessor when it starts exactly where the
/// predecessor ends.
///
/// Input order is preserved; ranges are not sorted, and overlapping or out-of-order ranges are
/// passed through unchanged.
fn coalesce(ranges: &[(usize, usize)]) -> Vec<(usize, usize)> {
    let mut merged: Vec<(usize, usize)> = Vec::with_capacity(ranges.len());
    for &(start, count) in ranges {
        // Only the most recently emitted range is a merge candidate.
        let adjacent = merged
            .last_mut()
            .filter(|prev| prev.0 + prev.1 == start);
        match adjacent {
            Some(prev) => prev.1 += count,
            None => merged.push((start, count)),
        }
    }
    merged
}
280
281pub(crate) fn try_take_file(handle: Handle, dst: &mut Vec<u64>) -> std::io::Result<()> {
286 use std::os::unix::fs::FileExt;
287
288 let inner = handle
289 .into_file_inner()
290 .expect("try_take_file called on non-file handle");
291 dst.clear();
292 if inner.len_u64s == 0 {
293 drop(inner);
295 return Ok(());
296 }
297 let path = scratch_path(inner.id);
298 let file = File::open(&path).map_err(|err| {
299 std::io::Error::new(
300 err.kind(),
301 format!("mz_ore::pager: take open {path:?}: {err}"),
302 )
303 })?;
304 dst.resize(inner.len_u64s, 0);
305 let buf: &mut [u8] = bytemuck::cast_slice_mut(dst.as_mut_slice());
306 let buf_len = buf.len();
307 let mut filled = 0;
308 while filled < buf_len {
309 let pos = u64::cast_from(filled);
310 let n = file.read_at(&mut buf[filled..], pos).map_err(|err| {
311 std::io::Error::new(
312 err.kind(),
313 format!("mz_ore::pager: take pread {path:?} pos={pos}: {err}"),
314 )
315 })?;
316 if n == 0 {
317 return Err(std::io::Error::new(
318 std::io::ErrorKind::UnexpectedEof,
319 format!(
320 "mz_ore::pager: take short read at {path:?}: filled {filled} of {buf_len} bytes",
321 ),
322 ));
323 }
324 filled += n;
325 }
326 drop(file);
327 drop(inner);
329 Ok(())
330}
331
332pub(crate) fn take_file(handle: Handle, dst: &mut Vec<u64>) {
333 try_take_file(handle, dst)
334 .unwrap_or_else(|err| panic!("mz_ore::pager: take_file failed: {err}"));
335}
336
#[cfg(test)]
mod backend_tests {
    use super::*;

    /// Makes sure the shared scratch directory is configured before a test uses the backend.
    fn setup_dir() {
        super::tests::shared_scratch();
    }

    /// Paging out writes all elements to disk and empties the chunks while keeping their
    /// allocations.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn pageout_writes_file_and_clears_capacity() {
        setup_dir();
        let mut chunks = [vec![10u64, 20, 30], vec![40, 50]];
        let first_cap = chunks[0].capacity();
        let second_cap = chunks[1].capacity();

        let handle = pageout_file(&mut chunks);
        assert_eq!(handle.len(), 5);

        let [first, second] = &chunks;
        assert!(first.is_empty());
        assert!(second.is_empty());
        assert_eq!(first.capacity(), first_cap);
        assert_eq!(second.capacity(), second_cap);

        let id = handle.file_inner().expect("file inner").id;
        let path = scratch_path(id);
        assert!(path.exists());
        let on_disk = std::fs::read(&path).expect("read scratch");
        assert_eq!(on_disk.len(), 5 * 8);
    }

    /// A single in-bounds range returns exactly the requested elements.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn file_read_at_basic() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3, 4, 5]];
        let handle = pageout_file(&mut chunks);

        let mut out = Vec::new();
        read_at_file(&handle, &[(1, 3)], &mut out);
        assert_eq!(out, vec![2, 3, 4]);
    }

    /// Several ranges, two of them adjacent, are concatenated in request order.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn file_read_at_many_concats_and_coalesces() {
        setup_dir();
        let mut chunks = [vec![10u64, 20, 30, 40, 50, 60]];
        let handle = pageout_file(&mut chunks);

        let wanted = [(0, 2), (2, 2), (5, 1)];
        let mut out = Vec::new();
        read_at_file(&handle, &wanted, &mut out);
        assert_eq!(out, vec![10, 20, 30, 40, 60]);
    }

    /// A range reaching past the end of the handle panics.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    #[should_panic(expected = "out of bounds")]
    fn file_read_at_panics_on_oob() {
        setup_dir();
        let mut chunks = [vec![1u64, 2]];
        let handle = pageout_file(&mut chunks);

        let mut out = Vec::new();
        read_at_file(&handle, &[(0, 99)], &mut out);
    }

    /// Taking a handle yields the full payload and removes the scratch file.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn file_take_returns_data_and_unlinks() {
        setup_dir();
        let mut chunks = [vec![7u64; 100]];
        let handle = pageout_file(&mut chunks);
        let path = scratch_path(handle.file_inner().unwrap().id);
        assert!(path.exists());

        let mut out = Vec::new();
        take_file(handle, &mut out);
        assert_eq!(out, vec![7u64; 100]);
        assert!(!path.exists(), "scratch file should be unlinked after take");
    }

    /// Dropping a handle without taking it still removes the scratch file.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn file_drop_unlinks_when_not_taken() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3]];
        let handle = pageout_file(&mut chunks);
        let path = scratch_path(handle.file_inner().unwrap().id);
        assert!(path.exists());

        drop(handle);
        assert!(!path.exists(), "scratch file should be unlinked on drop");
    }

    /// An empty pageout produces a file-variant handle that reads and takes as empty.
    #[mz_ore::test]
    fn file_empty_handle_round_trips() {
        setup_dir();
        let mut chunks: [Vec<u64>; 0] = [];
        let handle = pageout_file(&mut chunks);
        assert_eq!(handle.len(), 0);
        assert!(
            handle.file_inner().is_some(),
            "empty handle should be File-variant"
        );

        let mut untouched = vec![0xdeadu64];
        read_at_file(&handle, &[], &mut untouched);
        assert_eq!(
            untouched,
            vec![0xdeadu64],
            "empty read leaves dst untouched"
        );

        let mut taken = Vec::new();
        take_file(handle, &mut taken);
        assert!(taken.is_empty());
    }

    /// A scratch file that vanished underneath the handle surfaces as `NotFound`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn try_read_at_file_surfaces_missing_file() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3]];
        let handle = pageout_file(&mut chunks);
        let path = scratch_path(handle.file_inner().unwrap().id);
        std::fs::remove_file(&path).expect("unlink scratch");

        let mut out = Vec::new();
        let err =
            try_read_at_file(&handle, &[(0, 3)], &mut out).expect_err("should surface ENOENT");
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    }
}
464
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Temporary directory shared by every test in this file; created on first use.
    static TEST_DIR: std::sync::OnceLock<tempfile::TempDir> = std::sync::OnceLock::new();

    /// Returns the shared scratch root, creating it on first use and (re-)registering it via
    /// `set_scratch_dir` on every call.
    pub(super) fn shared_scratch() -> &'static std::path::Path {
        let root = TEST_DIR
            .get_or_init(|| tempdir().expect("tempdir"))
            .path();
        set_scratch_dir(root.to_owned());
        root
    }

    /// Registering the scratch root creates an `mz-pager-*` subdirectory beneath it.
    #[mz_ore::test]
    fn set_scratch_dir_creates_subdir() {
        let root = shared_scratch();
        let subdir = SUBDIR.get().expect("subdir was initialized");
        assert!(subdir.exists());
        assert!(subdir.starts_with(root));

        let name = subdir.file_name().unwrap().to_str().unwrap();
        assert!(name.starts_with("mz-pager-"));
    }
}