Skip to main content

mz_timely_util/
column_pager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Column-aware pager. Pages [`Column`] instances out via [`mz_ore::pager`],
17//! optionally compressing with lz4.
18//!
19//! The pager (`mz_ore::pager`) deals in `Vec<u64>` blobs and two backends. This
20//! module adds:
21//!
22//! 1. A [`PagingPolicy`] trait that decides _whether_ to page out, _which
23//!    backend_, and _whether to compress_. Decisions live in the policy
24//!    implementation, not in the global atomic the pager exposes.
25//! 2. A [`ColumnPager`] that drains a `Column<C>` into a [`PagedColumn`] and
26//!    rehydrates it on demand.
27//! 3. Lz4 frame-format compression as an optional codec.
28//!
29//! The serialization uses the existing [`ContainerBytes`] protocol on
30//! `Column<C>`, so we get a single byte layout that both raw and compressed
31//! paths share. See `doc/developer/design/20260504_pager.md` for background.
32
33#![deny(missing_docs)]
34
35pub mod metrics;
36pub mod policy;
37
38use std::io::{self, Read};
39use std::sync::{Arc, LazyLock, RwLock};
40
41use columnar::Columnar;
42use lz4_flex::frame::{FrameDecoder, FrameEncoder};
43use mz_ore::pager::{self, Backend, Handle};
44use timely::bytes::arc::BytesMut;
45use timely::dataflow::channels::ContainerBytes;
46
47use crate::columnar::Column;
48
/// Compression codec applied to a paged-out column.
///
/// A policy selects a codec through the `codec` field of
/// [`PageDecision::Page`]; `None` in that position means the serialized bytes
/// are stored uncompressed.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Codec {
    /// lz4 frame format (`lz4_flex::frame`). Self-delimiting, streams via
    /// `io::Read`/`io::Write`, no random access.
    Lz4,
}
56
/// Inputs to a pageout decision.
///
/// [`ColumnPager::page`] builds one of these from the column it was handed
/// and passes it to [`PagingPolicy::decide`].
#[derive(Copy, Clone, Debug)]
pub struct PageHint {
    /// Uncompressed body size in bytes (matches [`ContainerBytes::length_in_bytes`]).
    pub len_bytes: usize,
}
63
/// Outcome of a policy decision.
///
/// Returned by [`PagingPolicy::decide`] and acted on by
/// [`ColumnPager::page`].
#[derive(Copy, Clone, Debug)]
pub enum PageDecision {
    /// Keep the column resident; no I/O, no compression.
    ///
    /// [`ColumnPager::page`] answers this with a [`PagedColumn::Resident`]
    /// carrying a [`ResidentTicket`] for the column's byte size.
    Skip,
    /// Page out using the given backend and (optionally) codec.
    Page {
        /// Pager backend to use.
        backend: Backend,
        /// Compression codec, or `None` for raw bytes.
        codec: Option<Codec>,
    },
}
77
/// Notifications the column-pager sends back to the policy. Implementations
/// typically forward to metrics counters.
///
/// Events are delivered through [`PagingPolicy::record`] and are passed by
/// value.
#[derive(Debug)]
pub enum PageEvent {
    /// A successful pageout. `bytes_in` is the uncompressed body size,
    /// `bytes_out` is the on-storage payload size (after compression).
    PagedOut {
        /// Uncompressed body size handed to the pager.
        bytes_in: usize,
        /// On-storage payload size after compression and padding.
        bytes_out: usize,
        /// Backend selected by the policy.
        backend: Backend,
        /// Codec selected by the policy.
        codec: Option<Codec>,
    },
    /// A successful page-in. `bytes` is the uncompressed body size delivered to
    /// the caller.
    PagedIn {
        /// Uncompressed body size delivered to the caller.
        bytes: usize,
    },
    /// A pageout failure surfaced via the underlying pager.
    ///
    /// NOTE(review): no code visible in this file constructs this variant;
    /// presumably it is reserved for callers or policies that observe pager
    /// errors directly — confirm before relying on it.
    Failed {
        /// Backend that produced the error.
        backend: Backend,
        /// Underlying I/O error.
        err: io::Error,
    },
    /// A resident column has been dropped. Fires from [`ResidentTicket::drop`]
    /// when the [`PagedColumn::Resident`] holding the ticket is consumed by
    /// [`ColumnPager::take`] or dropped without being taken. Policies use this
    /// to return budget allocated when [`PagingPolicy::decide`] answered
    /// [`PageDecision::Skip`].
    ResidentReleased {
        /// Uncompressed body size returned to the policy.
        bytes: usize,
    },
}
117
/// Decides whether/how to page a column out, and records page events.
///
/// Implementations carry their own state (counters, atomics, configuration)
/// via interior mutability. Methods take `&self` so a single policy can be
/// shared across operator threads.
///
/// Note that [`PagingPolicy::record`] is also invoked from the `Drop`
/// implementation of [`ResidentTicket`], i.e. wherever a
/// [`PagedColumn::Resident`] happens to be dropped.
pub trait PagingPolicy: Send + Sync {
    /// Returns the action to take for a column with the given hint.
    ///
    /// [`ColumnPager::page`] calls this once per column, before it serializes
    /// or moves any data.
    fn decide(&self, hint: PageHint) -> PageDecision;
    /// Records a pageout/pagein/failure event for metrics or adaptive decisions.
    ///
    /// The event is handed over by value; the pager does not look at it again.
    fn record(&self, event: PageEvent);
}
129
/// Sizing metadata captured at pageout time. Stored alongside the payload so
/// `take` can size buffers.
///
/// [`ColumnPager::take`] also uses it to report page-in sizes and, in debug
/// builds, to assert that the rehydrated body has the recorded length.
#[derive(Clone, Debug)]
pub struct Meta {
    /// Uncompressed body size in bytes.
    pub len_bytes: usize,
}
137
/// A column whose body may be resident, paged out, or paged out and compressed.
///
/// Each variant corresponds to one of the [`PageDecision`] outcomes.
/// Values are produced by [`ColumnPager::page`] and turned back into a
/// [`Column`] by [`ColumnPager::take`].
///
/// All variants are `Send`. The [`Resident`](PagedColumn::Resident) variant's
/// drop credit goes back to the policy via [`PageEvent::ResidentReleased`];
/// concrete policies that ship with this crate (notably
/// [`policy::TieredPolicy`]) credit a single process-wide atomic pool, so
/// dropping a `Resident` on a different thread than the one that called
/// [`ColumnPager::page`] is safe. Custom policies that introduce
/// thread-local accounting must take care to either pin the column to its
/// origin thread (e.g. via a `SendColumn` wrapper) or carry the origin in
/// [`PageEvent::ResidentReleased`] so credit can be routed correctly.
pub enum PagedColumn<C: Columnar> {
    /// Body kept resident. Returned when the policy answered
    /// [`PageDecision::Skip`]. The accompanying [`ResidentTicket`] fires a
    /// [`PageEvent::ResidentReleased`] when the variant is dropped or
    /// consumed by [`ColumnPager::take`], so the policy can reclaim the
    /// budget it granted in [`PagingPolicy::decide`].
    Resident(Column<C>, ResidentTicket),
    /// Raw `ContainerBytes` payload stored via [`pager::Handle`]. The backend
    /// (Swap or File) is baked into the handle.
    ///
    /// Produced when the policy asked to page without a codec.
    Paged {
        /// Pager handle owning the raw payload.
        handle: Handle,
        /// Sizing metadata.
        meta: Meta,
    },
    /// Lz4-framed serialized form. The framed bytes themselves may live in
    /// memory or in the pager (see [`CompressedInner`]).
    ///
    /// Produced when the policy asked to page with [`Codec::Lz4`].
    Compressed {
        /// Where the framed bytes live.
        inner: CompressedInner,
        /// Sizing metadata.
        meta: Meta,
    },
}
175
/// Drop guard that returns budget to a [`PagingPolicy`] when a
/// [`PagedColumn::Resident`] is destroyed.
///
/// The ticket holds an `Arc` to the policy and the byte count it was charged
/// for at [`PagingPolicy::decide`] time. On drop it fires a
/// [`PageEvent::ResidentReleased`] event; the policy implementation decides
/// what to credit and where (local pool, shared pool, both).
///
/// Both fields are private, so tickets can only be minted inside this module
/// (by [`ColumnPager::page`]).
pub struct ResidentTicket {
    /// Uncompressed body size reported back to the policy on drop.
    bytes: usize,
    /// Policy that answered `Skip` for this column and receives the release
    /// event.
    policy: Arc<dyn PagingPolicy>,
}
187
188impl Drop for ResidentTicket {
189    fn drop(&mut self) {
190        metrics::observe_resident_released(self.bytes);
191        self.policy
192            .record(PageEvent::ResidentReleased { bytes: self.bytes });
193    }
194}
195
/// Storage location for the lz4-framed bytes inside a compressed paged column.
///
/// [`ColumnPager::page`] picks the variant from the backend the policy chose:
/// `Backend::Swap` keeps the frame in memory, `Backend::File` hands it to the
/// pager.
pub enum CompressedInner {
    /// Owned `Vec<u8>` held resident in the caller's address space.
    Memory(Vec<u8>),
    /// Framed bytes padded to a `u64` boundary and handed to the pager. The
    /// frame trailer self-delimits, so the trailing pad is ignored on read.
    Paged(Handle),
}
204
/// Pages typed [`Column`]s out and back in, driven by a [`PagingPolicy`].
///
/// Cheap to clone (it's an `Arc`). Hold one per operator if you want per-site
/// policy state, or share globally if you want one policy.
#[derive(Clone)]
pub struct ColumnPager {
    /// Policy consulted on every `page` call and notified of every page
    /// event; clones of the pager share the same policy instance.
    policy: Arc<dyn PagingPolicy>,
}
213
214impl ColumnPager {
215    /// Constructs a column pager driven by `policy`.
216    pub fn new(policy: Arc<dyn PagingPolicy>) -> Self {
217        Self { policy }
218    }
219
220    /// Constructs a pager that never pages out: every [`page`] returns a
221    /// [`PagedColumn::Resident`] whose ticket discards release events. Useful
222    /// as a default when callers want a placeholder pager before injecting a
223    /// real policy.
224    ///
225    /// [`page`]: ColumnPager::page
226    pub fn disabled() -> Self {
227        Self::new(Arc::new(AlwaysResidentPolicy))
228    }
229}
230
/// Policy that keeps every column resident and discards events. Backs
/// [`ColumnPager::disabled`].
///
/// Stateless unit struct, so sharing it behind an `Arc` costs nothing beyond
/// the reference count.
struct AlwaysResidentPolicy;

impl PagingPolicy for AlwaysResidentPolicy {
    // Unconditionally decline to page, whatever the column size.
    fn decide(&self, _hint: PageHint) -> PageDecision {
        PageDecision::Skip
    }
    // Intentionally a no-op: there is no budget to adjust and nothing to count.
    fn record(&self, _event: PageEvent) {}
}
241
242//
243// Following the pager design doc's spirit (`doc/developer/design/20260504_pager.md`):
244// "the cluster runs on swap or file, not both at once; a global atomic
245// encodes that operational reality directly. A per-pager design would
246// either duplicate the global flag at the struct level or invite confusion
247// about which configuration wins."
248//
// The lower-level `mz_ore::pager` already uses a global atomic for backend
// selection. This module's policy/budget layer mirrors that shape: one
// `ColumnPager` per process, held behind an `RwLock` and replaced wholesale
// when the controller changes the configuration. Merge batchers clone the
// `Arc` inside on use; live reinstalls take effect on the next call without
// per-thread coordination.
254
/// Process-global active pager. Defaults to [`ColumnPager::disabled`]
/// until worker init calls [`set_global_pager`].
///
/// Guarded by an `RwLock`: [`global_pager`] clones the value under the read
/// lock, [`set_global_pager`] replaces it under the write lock. The
/// `LazyLock` defers construction of the default pager to first access.
static GLOBAL_PAGER: LazyLock<RwLock<ColumnPager>> =
    LazyLock::new(|| RwLock::new(ColumnPager::disabled()));
259
260/// Install `pager` as the process-wide active pager. Subsequent
261/// [`global_pager`] calls return a clone of this value across all threads.
262///
263/// Prefer [`apply_tiered_config`] for the production path so the
264/// `TieredPolicy` budget atomic stays stable across reconfigures. Direct
265/// `set_global_pager` use is appropriate for tests, the disabled pager, or
266/// callers that intentionally want a fresh policy.
267pub fn set_global_pager(pager: ColumnPager) {
268    *GLOBAL_PAGER.write().expect("global pager poisoned") = pager;
269}
270
/// Process-wide [`policy::TieredPolicy`] singleton.
///
/// Why a singleton: every `ResidentTicket` keeps an `Arc<dyn PagingPolicy>`
/// pointing at the policy that decided to keep the column resident.
/// Replacing the global `TieredPolicy` would orphan in-flight tickets onto
/// the previous instance — they would credit a budget atomic that the new
/// policy can no longer see, draining the new pool monotonically until it
/// locks up on Page decisions. A persistent singleton with in-place
/// [`policy::TieredPolicy::reconfigure`] sidesteps the issue: all tickets,
/// past and present, share the same atomic.
///
/// Initialized eagerly with zero budget so [`metrics::register`] can read
/// it during compute startup, before any [`apply_tiered_config`] call. The
/// first config apply resizes the pool via `reconfigure`, which is the same
/// path operator-driven tunes take.
///
/// NOTE(review): the initial `Backend::Swap` / `None` arguments look like
/// placeholders that the first `reconfigure` overwrites — confirm against
/// `TieredPolicy::new`.
static TIERED_POLICY: LazyLock<Arc<policy::TieredPolicy>> =
    LazyLock::new(|| Arc::new(policy::TieredPolicy::new(0, Backend::Swap, None)));
288
/// Returns a reference to the process-wide [`policy::TieredPolicy`] singleton.
///
/// The first call (from here or from [`apply_tiered_config`]) forces the
/// lazy initialization of the singleton.
pub fn tiered_policy() -> &'static policy::TieredPolicy {
    // Deref coercion peels both the `LazyLock` and the `Arc` to reach the
    // policy itself.
    &TIERED_POLICY
}
293
294/// Apply a tiered-pager configuration. Reuses the singleton
295/// [`policy::TieredPolicy`] so in-flight `ResidentTicket`s remain coherent
296/// with the running budget after the operator tunes any of the inputs.
297///
298/// When `enabled` is true, installs a [`ColumnPager`] backed by the
299/// singleton policy. When false, installs [`ColumnPager::disabled`] —
300/// in-flight tickets still credit the singleton, which is harmless: the
301/// budget grows above the configured total until the next enable reconciles
302/// it via `reconfigure`.
303pub fn apply_tiered_config(
304    enabled: bool,
305    total_budget: usize,
306    backend: Backend,
307    codec: Option<Codec>,
308) {
309    let p: &Arc<policy::TieredPolicy> = &TIERED_POLICY;
310    p.reconfigure(total_budget, backend, codec);
311    if enabled {
312        #[allow(clippy::clone_on_ref_ptr)]
313        let dyn_policy: Arc<dyn PagingPolicy> = p.clone();
314        set_global_pager(ColumnPager::new(dyn_policy));
315    } else {
316        set_global_pager(ColumnPager::disabled());
317    }
318}
319
320/// Returns the current global pager. Cheap: clones the inner `Arc<dyn
321/// PagingPolicy>`.
322pub fn global_pager() -> ColumnPager {
323    GLOBAL_PAGER.read().expect("global pager poisoned").clone()
324}
325
326impl ColumnPager {
327    /// Drains `col` into a [`PagedColumn`]. After return `col` is left as a
328    /// fresh `Column::default()` (typed, empty), ready to be refilled by the
329    /// caller on the next loop iteration.
330    ///
331    /// Backend / codec semantics:
332    ///
333    /// * Uncompressed, [`Column::Align`]: the inner `Vec<u64>` is moved into
334    ///   the pager handle with no copies. Swap backend keeps the allocation
335    ///   resident; file backend writes it out and drops it.
336    /// * Uncompressed, other variants: the column is serialized via
337    ///   [`ContainerBytes::into_bytes`] into a `Vec<u8>`, copied into a
338    ///   u64-aligned `Vec<u64>`, then handed to the pager.
339    /// * Compressed: the column is serialized through an [`FrameEncoder`]
340    ///   directly into the output buffer. No intermediate uncompressed
341    ///   `Vec<u8>` is materialized.
342    pub fn page<C: Columnar>(&self, col: &mut Column<C>) -> PagedColumn<C> {
343        let len_bytes = col.length_in_bytes();
344        let hint = PageHint { len_bytes };
345
346        let (backend, codec) = match self.policy.decide(hint) {
347            PageDecision::Skip => {
348                metrics::observe_skip(len_bytes);
349                let ticket = ResidentTicket {
350                    bytes: len_bytes,
351                    policy: Arc::clone(&self.policy),
352                };
353                return PagedColumn::Resident(std::mem::take(col), ticket);
354            }
355            PageDecision::Page { backend, codec } => (backend, codec),
356        };
357        let meta = Meta { len_bytes };
358
359        match codec {
360            None => {
361                // Raw path: the body must end up as u64-aligned bytes for the
362                // pager. `Column::Align` already is; other variants are
363                // serialized and copied.
364                debug_assert_eq!(len_bytes % 8, 0);
365                let body: Vec<u64> = match std::mem::take(col) {
366                    // Move the aligned buffer straight into the pager: the
367                    // allocation transfers with no copy. `take` already left
368                    // `col` as a refill-ready `Typed` default.
369                    Column::Align(v) => v,
370                    mut other => {
371                        let mut buf = Vec::with_capacity(len_bytes);
372                        other.into_bytes(&mut buf);
373                        debug_assert_eq!(buf.len() % 8, 0);
374                        // `into_bytes` only borrowed `other`; clear it in place
375                        // and hand it back so the caller keeps the `Typed`
376                        // allocation instead of us dropping a reusable buffer.
377                        other.clear();
378                        *col = other;
379                        bytemuck::allocation::pod_collect_to_vec::<u8, u64>(&buf)
380                    }
381                };
382                let handle = pager::pageout_with(backend, &mut [body]);
383                let bytes_out = handle.len_bytes();
384                metrics::observe_pageout(len_bytes, bytes_out);
385                self.policy.record(PageEvent::PagedOut {
386                    bytes_in: len_bytes,
387                    bytes_out,
388                    backend,
389                    codec: None,
390                });
391                PagedColumn::Paged { handle, meta }
392            }
393            Some(Codec::Lz4) => {
394                // Stream serialized bytes straight into lz4 — no intermediate
395                // uncompressed `Vec<u8>`.
396                let mut out = Vec::with_capacity(len_bytes / 4);
397                {
398                    let mut enc = FrameEncoder::new(&mut out);
399                    col.into_bytes(&mut enc);
400                    enc.finish().expect("lz4 finish into Vec is infallible");
401                }
402                // `into_bytes` borrows `col`, so empty it explicitly now that
403                // its bytes live (compressed) in `out`. `clear` retains the
404                // `Typed` allocation so the caller can refill it, rather than
405                // dropping a buffer it may want to reuse.
406                col.clear();
407                metrics::observe_pageout(len_bytes, out.len());
408                self.policy.record(PageEvent::PagedOut {
409                    bytes_in: len_bytes,
410                    bytes_out: out.len(),
411                    backend,
412                    codec: Some(Codec::Lz4),
413                });
414                let inner = match backend {
415                    Backend::Swap => CompressedInner::Memory(out),
416                    Backend::File => {
417                        // The pager deals in `Vec<u64>`, so the framed bytes
418                        // must be widened. `out` is already compressed (~4x
419                        // smaller than the source), so this copy is over the
420                        // small form; avoiding it would mean a byte-oriented
421                        // pager entry point, not worth widening that surface.
422                        let padded = pad_u8_to_u64(out);
423                        let handle = pager::pageout_with(Backend::File, &mut [padded]);
424                        CompressedInner::Paged(handle)
425                    }
426                };
427                PagedColumn::Compressed { inner, meta }
428            }
429        }
430    }
431
432    /// Rehydrates `paged` into a [`Column<C>`]. Consumes the handle and
433    /// reclaims its storage (file backend unlinks; swap backend drops the
434    /// `Vec`).
435    pub fn take<C: Columnar>(&self, paged: PagedColumn<C>) -> Column<C> {
436        match paged {
437            // `_ticket` drops here and fires `PageEvent::ResidentReleased`.
438            PagedColumn::Resident(c, _ticket) => c,
439            PagedColumn::Paged { handle, meta } => {
440                let mut body: Vec<u64> = Vec::with_capacity(handle.len());
441                pager::take(handle, &mut body);
442                debug_assert_eq!(body.len() * 8, meta.len_bytes);
443                metrics::observe_pagein(meta.len_bytes);
444                self.policy.record(PageEvent::PagedIn {
445                    bytes: meta.len_bytes,
446                });
447                Column::Align(body)
448            }
449            PagedColumn::Compressed { inner, meta } => {
450                let mut decoded = Vec::with_capacity(meta.len_bytes);
451                match inner {
452                    CompressedInner::Memory(v) => {
453                        FrameDecoder::new(&v[..])
454                            .read_to_end(&mut decoded)
455                            .expect("lz4 decode from memory");
456                    }
457                    CompressedInner::Paged(h) => {
458                        let mut padded = Vec::with_capacity(h.len());
459                        pager::take(h, &mut padded);
460                        let src: &[u8] = bytemuck::cast_slice(&padded);
461                        FrameDecoder::new(src)
462                            .read_to_end(&mut decoded)
463                            .expect("lz4 decode from pager");
464                    }
465                }
466                debug_assert_eq!(decoded.len(), meta.len_bytes);
467                metrics::observe_pagein(decoded.len());
468                self.policy.record(PageEvent::PagedIn {
469                    bytes: decoded.len(),
470                });
471                // `BytesMut::from` wraps the `Vec<u8>` without copying; `freeze`
472                // produces the refcounted `Bytes` that `ContainerBytes` expects.
473                Column::from_bytes(BytesMut::from(decoded).freeze())
474            }
475        }
476    }
477}
478
479/// Reinterprets `bytes` as a `Vec<u64>` by trailing-zero padding to a multiple
480/// of 8 and copying. The lz4 frame trailer self-delimits so the trailing pad is
481/// invisible to [`FrameDecoder`].
482fn pad_u8_to_u64(mut bytes: Vec<u8>) -> Vec<u64> {
483    let pad = bytes.len().next_multiple_of(8) - bytes.len();
484    if pad != 0 {
485        bytes.resize(bytes.len() + pad, 0);
486    }
487    debug_assert_eq!(bytes.len() % 8, 0);
488    // `Vec<u8>` and `Vec<u64>` have different layouts (size + align), so we
489    // can't transmute the allocation. Copy into a fresh, properly aligned
490    // `Vec<u64>`. The cost is one `len_bytes/8`-word memcpy per pageout.
491    let len_u64s = bytes.len() / 8;
492    let mut out = vec![0u64; len_u64s];
493    let dst: &mut [u8] = bytemuck::cast_slice_mut(&mut out);
494    dst.copy_from_slice(&bytes);
495    out
496}
497
#[cfg(test)]
#[allow(clippy::clone_on_ref_ptr)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use columnar::Index;
    use timely::container::PushInto;

    use super::*;

    /// Points the pager at one scratch directory shared by every test in this
    /// module. `set_scratch_dir` is idempotent and only honors the first path
    /// it sees, so a per-test tempdir would race under parallel execution: a
    /// tempdir dropped at the end of one test would invalidate `SUBDIR` for
    /// peers that are still running.
    fn ensure_scratch() {
        static DIR: std::sync::OnceLock<tempfile::TempDir> = std::sync::OnceLock::new();
        let scratch = DIR.get_or_init(|| tempfile::tempdir().expect("tempdir"));
        pager::set_scratch_dir(scratch.path().to_path_buf());
    }

    /// Turns a concretely typed policy `Arc` into `Arc<dyn PagingPolicy>`.
    /// The unsize coercion happens on the cloned value at the return site, so
    /// no `as` cast is needed.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        p.clone()
    }

    /// Policy with a fixed decision that counts pageout and page-in events.
    struct TestPolicy {
        decision: PageDecision,
        out: AtomicUsize,
        r#in: AtomicUsize,
    }

    impl TestPolicy {
        fn new(decision: PageDecision) -> Arc<Self> {
            let policy = Self {
                decision,
                out: AtomicUsize::default(),
                r#in: AtomicUsize::default(),
            };
            Arc::new(policy)
        }
    }

    impl PagingPolicy for TestPolicy {
        fn decide(&self, _hint: PageHint) -> PageDecision {
            self.decision
        }
        fn record(&self, event: PageEvent) {
            // Only pageouts and page-ins are counted; everything else is
            // ignored.
            let counter = match event {
                PageEvent::PagedOut { .. } => &self.out,
                PageEvent::PagedIn { .. } => &self.r#in,
                PageEvent::ResidentReleased { .. } | PageEvent::Failed { .. } => return,
            };
            counter.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Builds a recording policy that always pages with `backend`/`codec`,
    /// together with a `ColumnPager` driven by it.
    fn paging_setup(backend: Backend, codec: Option<Codec>) -> (Arc<TestPolicy>, ColumnPager) {
        let policy = TestPolicy::new(PageDecision::Page { backend, codec });
        let column_pager = ColumnPager::new(as_dyn(&policy));
        (policy, column_pager)
    }

    /// Sample typed column holding the `i64`s `0..1024`.
    fn sample_typed() -> Column<i64> {
        let mut col: Column<i64> = Default::default();
        (0i64..1024).for_each(|v| col.push_into(v));
        col
    }

    /// The values `sample_typed` is expected to round-trip to.
    fn expected_i64() -> Vec<i64> {
        (0i64..1024).collect()
    }

    /// Reads a column back into a `Vec<i64>` through `borrow` for comparison.
    fn collect_i64(col: &Column<i64>) -> Vec<i64> {
        col.borrow().into_index_iter().copied().collect()
    }

    #[mz_ore::test]
    fn skip_policy_keeps_resident() {
        let policy = TestPolicy::new(PageDecision::Skip);
        let column_pager = ColumnPager::new(as_dyn(&policy));
        let mut source = sample_typed();
        let stored = column_pager.page(&mut source);
        assert!(matches!(stored, PagedColumn::Resident(_, _)));
        let restored = column_pager.take(stored);
        assert_eq!(collect_i64(&restored), expected_i64());
        assert_eq!(policy.out.load(Ordering::Relaxed), 0);
        assert_eq!(policy.r#in.load(Ordering::Relaxed), 0);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `madvise` on OS `linux`
    fn round_trip_swap_uncompressed() {
        let (policy, column_pager) = paging_setup(Backend::Swap, None);
        let mut source = sample_typed();
        let stored = column_pager.page(&mut source);
        assert!(matches!(stored, PagedColumn::Paged { .. }));
        let restored = column_pager.take(stored);
        assert_eq!(collect_i64(&restored), expected_i64());
        assert_eq!(policy.out.load(Ordering::Relaxed), 1);
        assert_eq!(policy.r#in.load(Ordering::Relaxed), 1);
    }

    #[mz_ore::test]
    fn round_trip_swap_lz4() {
        let (_policy, column_pager) = paging_setup(Backend::Swap, Some(Codec::Lz4));
        let mut source = sample_typed();
        let stored = column_pager.page(&mut source);
        assert!(matches!(
            stored,
            PagedColumn::Compressed {
                inner: CompressedInner::Memory(_),
                ..
            }
        ));
        let restored = column_pager.take(stored);
        assert_eq!(collect_i64(&restored), expected_i64());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_uncompressed() {
        ensure_scratch();
        let (_policy, column_pager) = paging_setup(Backend::File, None);
        let mut source = sample_typed();
        let stored = column_pager.page(&mut source);
        assert!(matches!(stored, PagedColumn::Paged { .. }));
        let restored = column_pager.take(stored);
        assert_eq!(collect_i64(&restored), expected_i64());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_lz4() {
        ensure_scratch();
        let (_policy, column_pager) = paging_setup(Backend::File, Some(Codec::Lz4));
        let mut source = sample_typed();
        let stored = column_pager.page(&mut source);
        assert!(matches!(
            stored,
            PagedColumn::Compressed {
                inner: CompressedInner::Paged(_),
                ..
            }
        ));
        let restored = column_pager.take(stored);
        assert_eq!(collect_i64(&restored), expected_i64());
    }

    #[mz_ore::test]
    fn align_variant_fast_path() {
        // Build an `Align` column by hand so the move-only raw path is the
        // one exercised.
        let (_policy, column_pager) = paging_setup(Backend::Swap, None);
        let words: Vec<u64> = (1u64..=512).collect();
        let mut source: Column<i64> = Column::Align(words.clone());
        let stored = column_pager.page(&mut source);
        assert!(matches!(stored, PagedColumn::Paged { .. }));
        // Paging an `Align` variant leaves the caller's column as the typed
        // default.
        assert!(matches!(source, Column::Typed(_)));
        let restored = column_pager.take(stored);
        // The round-tripped column must carry exactly the original words.
        match restored {
            Column::Align(v) => assert_eq!(v, words),
            other => panic!("expected Align, got {:?}", std::mem::discriminant(&other)),
        }
    }
}