Skip to main content

mz_ore/pager/
file.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! File backend for the pager. See `mz_ore::pager` for the public API.
17//!
18//! Stale-scratch cleanup is intentionally out of scope: production runs on
19//! Kubernetes with per-pod ephemeral volumes, so a crashed predecessor's
20//! files are reclaimed with the volume. Local dev tooling is responsible
21//! for sweeping leftovers from crashed processes.
22
23use std::fs::File;
24use std::io::IoSlice;
25use std::path::{Path, PathBuf};
26use std::sync::atomic::{AtomicU64, Ordering};
27use std::sync::{Mutex, OnceLock};
28
29use crate::cast::CastFrom;
30use crate::pager::Handle;
31
/// Serializes `set_scratch_dir` callers so init failures can be retried on
/// the next call. A plain `Once` would burn the only retry opportunity.
static INIT_LOCK: Mutex<()> = Mutex::new(());
/// Root directory from the first successful `set_scratch_dir` call; set only
/// after the per-process subdirectory was created.
static SCRATCH_DIR: OnceLock<PathBuf> = OnceLock::new();
/// Per-process directory (created under the scratch root) holding the
/// `<id>.bin` scratch files.
static SUBDIR: OnceLock<PathBuf> = OnceLock::new();
/// Counter behind `alloc_scratch_id`; starts at zero.
static SCRATCH_ID: AtomicU64 = AtomicU64::new(0);
38
39/// Configures the scratch directory for the file backend.
40///
41/// Idempotent across multiple calls with the same path. A different path on a
42/// subsequent call is logged and ignored. If the first call fails to initialize
43/// the subdir, later calls retry — the scratch directory is committed only
44/// after a successful init.
45pub fn set_scratch_dir(root: PathBuf) {
46    let _guard = INIT_LOCK.lock().expect("mz_ore::pager INIT_LOCK poisoned");
47    if let Some(existing) = SCRATCH_DIR.get() {
48        if *existing != root {
49            tracing::warn!(
50                ?root,
51                ?existing,
52                "mz_ore::pager scratch dir already set; ignoring",
53            );
54        }
55        return;
56    }
57    match init_subdir(&root) {
58        Ok(()) => {
59            // We hold INIT_LOCK and just verified the OnceLock is empty above,
60            // so this set always wins.
61            let _ = SCRATCH_DIR.set(root);
62        }
63        Err(err) => {
64            tracing::warn!(
65                ?root,
66                %err,
67                "mz_ore::pager: failed to initialize scratch subdir; will retry on next call",
68            );
69        }
70    }
71}
72
73fn init_subdir(root: &Path) -> std::io::Result<()> {
74    let nonce: u64 = rand::random();
75    let pid = std::process::id();
76    let subdir = root.join(format!("mz-pager-{pid}-{nonce:016x}"));
77    std::fs::create_dir_all(&subdir)?;
78    let _ = SUBDIR.set(subdir);
79    Ok(())
80}
81
82pub(crate) fn scratch_path(id: u64) -> PathBuf {
83    SUBDIR
84        .get()
85        .expect("mz_ore::pager file backend used before set_scratch_dir")
86        .join(format!("{id}.bin"))
87}
88
89pub(crate) fn alloc_scratch_id() -> u64 {
90    SCRATCH_ID.fetch_add(1, Ordering::Relaxed)
91}
92
/// Bookkeeping for a file-backed handle.
///
/// Only the scratch id and the payload length are stored; no file descriptor
/// is kept open between operations. A non-empty handle's bytes live in the
/// file at `scratch_path(id)`. An empty handle (`len_u64s == 0`) never has a
/// file, and dropping it is a no-op.
#[derive(Debug)]
pub(crate) struct FileInner {
    /// Scratch file id, obtained from `alloc_scratch_id`.
    pub(crate) id: u64,
    /// Number of `u64`s stored in the scratch file.
    pub(crate) len_u64s: usize,
}

impl FileInner {
    /// Wraps an already-allocated scratch id together with its length in
    /// `u64`s.
    pub(crate) fn new(id: u64, len_u64s: usize) -> Self {
        FileInner { id, len_u64s }
    }
}
108
109impl Drop for FileInner {
110    fn drop(&mut self) {
111        // Empty handles never created a file.
112        if self.len_u64s == 0 {
113            return;
114        }
115        // If the scratch dir was never set up (e.g. construction failed before
116        // any I/O was attempted), there is nothing on disk to clean up.
117        let Some(subdir) = SUBDIR.get() else {
118            return;
119        };
120        let path = subdir.join(format!("{}.bin", self.id));
121        if let Err(err) = std::fs::remove_file(&path) {
122            // ENOENT is fine: a successful `take` already unlinked.
123            if err.kind() != std::io::ErrorKind::NotFound {
124                tracing::warn!(?path, %err, "mz_ore::pager: failed to unlink scratch file");
125            }
126        }
127    }
128}
129
130/// Fallible counterpart to [`pageout_file`]. Returns the underlying I/O error
131/// instead of falling back to a different backend, so callers can distinguish
132/// "scratch volume is broken" from "everything is fine."
133pub(crate) fn try_pageout_file(chunks: &mut [Vec<u64>]) -> std::io::Result<Handle> {
134    let total: usize = chunks.iter().map(|c| c.len()).sum();
135    if total == 0 {
136        // Honor the backend invariant: an empty handle from the file backend
137        // is still a file-variant handle. No file is created on disk; drop
138        // short-circuits when `len_u64s == 0`.
139        return Ok(Handle::from_file(FileInner::new(alloc_scratch_id(), 0)));
140    }
141    let id = alloc_scratch_id();
142    let path = scratch_path(id);
143    match write_chunks(&path, chunks) {
144        Ok(()) => {
145            for c in chunks.iter_mut() {
146                c.clear();
147            }
148            Ok(Handle::from_file(FileInner::new(id, total)))
149        }
150        Err(err) => {
151            // Best-effort cleanup; ignore secondary errors here so we surface
152            // the primary write error to the caller.
153            let _ = std::fs::remove_file(&path);
154            Err(err)
155        }
156    }
157}
158
159pub(crate) fn pageout_file(chunks: &mut [Vec<u64>]) -> Handle {
160    try_pageout_file(chunks)
161        .unwrap_or_else(|err| panic!("mz_ore::pager: file pageout failed: {err}"))
162}
163
164fn write_chunks(path: &Path, chunks: &[Vec<u64>]) -> std::io::Result<()> {
165    let file = File::options().write(true).create_new(true).open(path)?;
166    let mut slices: Vec<IoSlice<'_>> = chunks
167        .iter()
168        .filter(|c| !c.is_empty())
169        .map(|c| IoSlice::new(bytemuck::cast_slice(c.as_slice())))
170        .collect();
171    write_all_vectored(&file, slices.as_mut_slice())?;
172    Ok(())
173}
174
/// Writes every byte of `slices` to `file`, looping over partial vectored
/// writes and advancing `slices` in place as data is accepted.
///
/// # Errors
///
/// Propagates any `write_vectored` error. Returns `WriteZero` if a write
/// reports zero bytes while `slices` is still non-empty. Note that a non-empty
/// `slices` made up only of empty buffers also hits this case, so callers
/// should drop empty buffers first (as `write_chunks` does).
fn write_all_vectored(mut file: &File, mut slices: &mut [IoSlice<'_>]) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind, Write};

    loop {
        if slices.is_empty() {
            return Ok(());
        }
        match file.write_vectored(slices)? {
            0 => {
                return Err(Error::new(
                    ErrorKind::WriteZero,
                    "write_vectored returned 0",
                ));
            }
            accepted => IoSlice::advance_slices(&mut slices, accepted),
        }
    }
}
189
190/// Fallible counterpart to [`read_at_file`]. Returns `Err` instead of
191/// panicking on I/O failure (open/pread errors). Bounds violations and
192/// non-file handles still panic — those are caller bugs, not I/O failures.
193pub(crate) fn try_read_at_file(
194    handle: &Handle,
195    ranges: &[(usize, usize)],
196    dst: &mut Vec<u64>,
197) -> std::io::Result<()> {
198    use std::os::unix::fs::FileExt;
199
200    let inner = handle
201        .file_inner()
202        .expect("try_read_at_file called on non-file handle");
203    let total = inner.len_u64s;
204    for &(off, len) in ranges {
205        let end = off.checked_add(len).expect("range offset+len overflow");
206        assert!(
207            end <= total,
208            "read range out of bounds: {off}+{len} > {total}"
209        );
210    }
211    // Empty handle: all ranges must be `(_, 0)` after the bounds check above,
212    // so there is no I/O to perform. Skip opening the (nonexistent) file.
213    if total == 0 {
214        return Ok(());
215    }
216    let path = scratch_path(inner.id);
217    let file = File::open(&path).map_err(|err| {
218        std::io::Error::new(err.kind(), format!("mz_ore::pager: open {path:?}: {err}"))
219    })?;
220
221    let coalesced = coalesce(ranges);
222    for (range_idx, (off, len)) in coalesced.iter().copied().enumerate() {
223        // Multiply in `u64` space: `off * 8` would overflow on 32-bit targets
224        // for handles holding more than 512Mi `u64`s.
225        let byte_off = u64::cast_from(off)
226            .checked_mul(8)
227            .expect("byte offset overflow");
228        let byte_len = len.checked_mul(8).expect("byte length overflow");
229        let buf_start = dst.len();
230        dst.resize(buf_start + len, 0);
231        let buf: &mut [u8] = bytemuck::cast_slice_mut(&mut dst[buf_start..buf_start + len]);
232        let mut filled = 0;
233        while filled < byte_len {
234            let pos = byte_off + u64::cast_from(filled);
235            let n = file
236                .read_at(&mut buf[filled..byte_len], pos)
237                .map_err(|err| {
238                    std::io::Error::new(
239                        err.kind(),
240                        format!(
241                            "mz_ore::pager: pread {path:?} pos={pos} \
242                         (range #{range_idx}, off={off} len={len}): {err}",
243                        ),
244                    )
245                })?;
246            if n == 0 {
247                return Err(std::io::Error::new(
248                    std::io::ErrorKind::UnexpectedEof,
249                    format!(
250                        "mz_ore::pager: pread short read at {path:?} pos={pos} \
251                         (range #{range_idx}, off={off} len={len}): \
252                         filled {filled} of {byte_len} bytes for this range",
253                    ),
254                ));
255            }
256            filled += n;
257        }
258    }
259    Ok(())
260}
261
262pub(crate) fn read_at_file(handle: &Handle, ranges: &[(usize, usize)], dst: &mut Vec<u64>) {
263    try_read_at_file(handle, ranges, dst)
264        .unwrap_or_else(|err| panic!("mz_ore::pager: read_at_file failed: {err}"));
265}
266
/// Merges back-to-back `(offset, len)` ranges into single ranges.
///
/// A range is folded into its predecessor only when it starts exactly where
/// the predecessor ends. Otherwise it is kept as a separate entry. Order is
/// preserved, so reading the output ranges in sequence yields the same data as
/// reading the input ranges in sequence.
///
/// Assumes the ranges were already bounds-checked by the caller. The
/// end-offset arithmetic here is not overflow-checked.
fn coalesce(ranges: &[(usize, usize)]) -> Vec<(usize, usize)> {
    let mut merged: Vec<(usize, usize)> = Vec::with_capacity(ranges.len());
    for &(start, count) in ranges {
        match merged.last_mut() {
            Some(prev) if prev.0 + prev.1 == start => prev.1 += count,
            _ => merged.push((start, count)),
        }
    }
    merged
}
280
281/// Fallible counterpart to [`take_file`]. Returns `Err` instead of panicking
282/// on I/O failure. The handle is consumed in either case; on `Err`, `dst` may
283/// hold partial data and the scratch file is unlinked when the inner is
284/// dropped.
285pub(crate) fn try_take_file(handle: Handle, dst: &mut Vec<u64>) -> std::io::Result<()> {
286    use std::os::unix::fs::FileExt;
287
288    let inner = handle
289        .into_file_inner()
290        .expect("try_take_file called on non-file handle");
291    dst.clear();
292    if inner.len_u64s == 0 {
293        // Empty handle: no file exists; nothing to read or unlink.
294        drop(inner);
295        return Ok(());
296    }
297    let path = scratch_path(inner.id);
298    let file = File::open(&path).map_err(|err| {
299        std::io::Error::new(
300            err.kind(),
301            format!("mz_ore::pager: take open {path:?}: {err}"),
302        )
303    })?;
304    dst.resize(inner.len_u64s, 0);
305    let buf: &mut [u8] = bytemuck::cast_slice_mut(dst.as_mut_slice());
306    let buf_len = buf.len();
307    let mut filled = 0;
308    while filled < buf_len {
309        let pos = u64::cast_from(filled);
310        let n = file.read_at(&mut buf[filled..], pos).map_err(|err| {
311            std::io::Error::new(
312                err.kind(),
313                format!("mz_ore::pager: take pread {path:?} pos={pos}: {err}"),
314            )
315        })?;
316        if n == 0 {
317            return Err(std::io::Error::new(
318                std::io::ErrorKind::UnexpectedEof,
319                format!(
320                    "mz_ore::pager: take short read at {path:?}: filled {filled} of {buf_len} bytes",
321                ),
322            ));
323        }
324        filled += n;
325    }
326    drop(file);
327    // FileInner::drop will unlink the scratch file.
328    drop(inner);
329    Ok(())
330}
331
332pub(crate) fn take_file(handle: Handle, dst: &mut Vec<u64>) {
333    try_take_file(handle, dst)
334        .unwrap_or_else(|err| panic!("mz_ore::pager: take_file failed: {err}"));
335}
336
/// Unit tests for the file backend: pageout, ranged reads, take, drop, and the
/// empty-handle path.
///
/// All tests share one process-wide scratch directory (configured through
/// `setup_dir`) and rely on `alloc_scratch_id` handing out distinct ids, so
/// their scratch files do not collide.
#[cfg(test)]
mod backend_tests {
    use super::*;

    /// Configures the shared scratch directory. Safe to call from every test:
    /// `shared_scratch` always passes the same path, and `set_scratch_dir`
    /// ignores repeat calls with the same path.
    fn setup_dir() {
        let _ = super::tests::shared_scratch();
    }

    /// Pageout writes all chunk bytes to disk and clears the chunks without
    /// releasing their allocations.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn pageout_writes_file_and_clears_capacity() {
        setup_dir();
        let mut chunks = [vec![10u64, 20, 30], vec![40, 50]];
        let cap_before_0 = chunks[0].capacity();
        let cap_before_1 = chunks[1].capacity();
        let h = pageout_file(&mut chunks);
        assert_eq!(h.len(), 5);
        assert!(chunks[0].is_empty());
        assert!(chunks[1].is_empty());
        // File backend preserves capacity:
        assert_eq!(chunks[0].capacity(), cap_before_0);
        assert_eq!(chunks[1].capacity(), cap_before_1);

        let inner = h.file_inner().expect("file inner");
        let path = scratch_path(inner.id);
        assert!(path.exists());
        // 5 `u64`s at 8 bytes each.
        let bytes = std::fs::read(&path).expect("read scratch");
        assert_eq!(bytes.len(), 5 * 8);
    }

    /// A single interior range reads back exactly the requested elements.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn file_read_at_basic() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3, 4, 5]];
        let h = pageout_file(&mut chunks);
        let mut dst = Vec::new();
        read_at_file(&h, &[(1, 3)], &mut dst);
        assert_eq!(dst, vec![2, 3, 4]);
    }

    /// Multiple ranges are appended in order, with adjacent ranges merged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn file_read_at_many_concats_and_coalesces() {
        setup_dir();
        let mut chunks = [vec![10u64, 20, 30, 40, 50, 60]];
        let h = pageout_file(&mut chunks);
        let mut dst = Vec::new();
        // (0,2) and (2,2) are adjacent => single pread internally.
        read_at_file(&h, &[(0, 2), (2, 2), (5, 1)], &mut dst);
        assert_eq!(dst, vec![10, 20, 30, 40, 60]);
    }

    /// An out-of-bounds range is a caller bug and panics rather than erroring.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    #[should_panic(expected = "out of bounds")]
    fn file_read_at_panics_on_oob() {
        setup_dir();
        let mut chunks = [vec![1u64, 2]];
        let h = pageout_file(&mut chunks);
        let mut dst = Vec::new();
        read_at_file(&h, &[(0, 99)], &mut dst);
    }

    /// `take` returns the full payload and removes the scratch file.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn file_take_returns_data_and_unlinks() {
        setup_dir();
        let mut chunks = [vec![7u64; 100]];
        let h = pageout_file(&mut chunks);
        let inner_id = h.file_inner().unwrap().id;
        let path = scratch_path(inner_id);
        assert!(path.exists());
        let mut dst = Vec::new();
        take_file(h, &mut dst);
        assert_eq!(dst, vec![7u64; 100]);
        assert!(!path.exists(), "scratch file should be unlinked after take");
    }

    /// Dropping a handle that was never taken still removes its scratch file.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn file_drop_unlinks_when_not_taken() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3]];
        let h = pageout_file(&mut chunks);
        let id = h.file_inner().unwrap().id;
        let path = scratch_path(id);
        assert!(path.exists());
        drop(h);
        assert!(!path.exists(), "scratch file should be unlinked on drop");
    }

    /// Empty input yields a file-variant handle that supports read and take
    /// without any file on disk. Not miri-ignored: the empty path skips the
    /// `writev`/`pread` calls entirely.
    #[mz_ore::test]
    fn file_empty_handle_round_trips() {
        setup_dir();
        let mut chunks: [Vec<u64>; 0] = [];
        let h = pageout_file(&mut chunks);
        assert_eq!(h.len(), 0);
        // Variant must be `File`, honoring the documented backend invariant.
        assert!(
            h.file_inner().is_some(),
            "empty handle should be File-variant"
        );

        let mut dst = vec![0xdeadu64];
        read_at_file(&h, &[], &mut dst);
        assert_eq!(dst, vec![0xdeadu64], "empty read leaves dst untouched");

        let mut dst = Vec::new();
        take_file(h, &mut dst);
        assert!(dst.is_empty());
    }

    /// A missing scratch file surfaces as an `Err` from the fallible reader
    /// instead of a panic.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn try_read_at_file_surfaces_missing_file() {
        setup_dir();
        let mut chunks = [vec![1u64, 2, 3]];
        let h = pageout_file(&mut chunks);
        // Unlink the scratch file out from under the live handle.
        let path = scratch_path(h.file_inner().unwrap().id);
        std::fs::remove_file(&path).expect("unlink scratch");
        let mut dst = Vec::new();
        let err = try_read_at_file(&h, &[(0, 3)], &mut dst).expect_err("should surface ENOENT");
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    }
}
464
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// One temp root per test process. It is kept alive for the whole run
    /// because the process-wide scratch dir can only be set once.
    static TEST_DIR: std::sync::OnceLock<tempfile::TempDir> = std::sync::OnceLock::new();

    /// Ensures the pager's scratch dir points at the shared temp root and
    /// returns that root.
    pub(super) fn shared_scratch() -> &'static std::path::Path {
        let root = TEST_DIR.get_or_init(|| tempdir().expect("tempdir")).path();
        set_scratch_dir(root.to_path_buf());
        root
    }

    /// The per-process subdirectory exists, lives under the configured root,
    /// and carries the `mz-pager-` prefix.
    #[mz_ore::test]
    fn set_scratch_dir_creates_subdir() {
        let root = shared_scratch();
        let subdir = SUBDIR.get().expect("subdir was initialized");
        assert!(subdir.exists());
        assert!(subdir.starts_with(root));
        let leaf = subdir.file_name().and_then(|name| name.to_str()).unwrap();
        assert!(leaf.starts_with("mz-pager-"));
    }
}