Skip to main content

mz_timely_util/
column_pager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Column-aware pager. Pages [`Column`] instances out via [`mz_ore::pager`],
17//! optionally compressing with lz4.
18//!
19//! The pager (`mz_ore::pager`) deals in `Vec<u64>` blobs and two backends. This
20//! module adds:
21//!
22//! 1. A [`PagingPolicy`] trait that decides _whether_ to page out, _which
23//!    backend_, and _whether to compress_. Decisions live in the policy
24//!    implementation, not in the global atomic the pager exposes.
25//! 2. A [`ColumnPager`] that drains a `Column<C>` into a [`PagedColumn`] and
26//!    rehydrates it on demand.
27//! 3. Lz4 frame-format compression as an optional codec.
28//!
29//! The serialization uses the existing [`ContainerBytes`] protocol on
30//! `Column<C>`, so we get a single byte layout that both raw and compressed
31//! paths share. See `doc/developer/design/20260504_pager.md` for background.
32
33#![deny(missing_docs)]
34
35pub mod metrics;
36pub mod policy;
37
38use std::io::{self, Read};
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::{Arc, LazyLock, RwLock};
41
42use columnar::Columnar;
43use lz4_flex::frame::{FrameDecoder, FrameEncoder};
44use mz_ore::pager::{self, Backend, Handle};
45use timely::bytes::arc::BytesMut;
46use timely::dataflow::channels::ContainerBytes;
47
48use crate::columnar::Column;
49
/// Compression applied to a column's serialized body before it is paged out.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Codec {
    /// The lz4 frame format from `lz4_flex::frame`: self-delimiting and
    /// streamed through `io::Write` / `io::Read`, with no random access.
    Lz4,
}
57
/// Description of a column handed to [`PagingPolicy::decide`] so the policy
/// can choose how to treat it.
#[derive(Clone, Copy, Debug)]
pub struct PageHint {
    /// Size of the uncompressed body in bytes, as reported by
    /// [`ContainerBytes::length_in_bytes`].
    pub len_bytes: usize,
}
64
65/// Outcome of a policy decision.
66#[derive(Copy, Clone, Debug)]
67pub enum PageDecision {
68    /// Keep the column resident; no I/O, no compression.
69    Skip,
70    /// Page out using the given backend and (optionally) codec.
71    Page {
72        /// Pager backend to use.
73        backend: Backend,
74        /// Compression codec, or `None` for raw bytes.
75        codec: Option<Codec>,
76    },
77}
78
79/// Notifications the column-pager sends back to the policy. Implementations
80/// typically forward to metrics counters.
81#[derive(Debug)]
82pub enum PageEvent {
83    /// A successful pageout. `bytes_in` is the uncompressed body size,
84    /// `bytes_out` is the on-storage payload size (after compression).
85    PagedOut {
86        /// Uncompressed body size handed to the pager.
87        bytes_in: usize,
88        /// On-storage payload size after compression and padding.
89        bytes_out: usize,
90        /// Backend selected by the policy.
91        backend: Backend,
92        /// Codec selected by the policy.
93        codec: Option<Codec>,
94    },
95    /// A successful page-in. `bytes` is the uncompressed body size delivered to
96    /// the caller.
97    PagedIn {
98        /// Uncompressed body size delivered to the caller.
99        bytes: usize,
100    },
101    /// A pageout failure surfaced via the underlying pager.
102    Failed {
103        /// Backend that produced the error.
104        backend: Backend,
105        /// Underlying I/O error.
106        err: io::Error,
107    },
108    /// A resident column has been dropped. Fires from [`ResidentTicket::drop`]
109    /// when the [`PagedColumn::Resident`] holding the ticket is consumed by
110    /// [`ColumnPager::take`] or dropped without being taken. Policies use this
111    /// to return budget allocated when [`PagingPolicy::decide`] answered
112    /// [`PageDecision::Skip`].
113    ResidentReleased {
114        /// Uncompressed body size returned to the policy.
115        bytes: usize,
116    },
117}
118
119/// Decides whether/how to page a column out, and records page events.
120///
121/// Implementations carry their own state (counters, atomics, configuration)
122/// via interior mutability. Methods take `&self` so a single policy can be
123/// shared across operator threads.
124pub trait PagingPolicy: Send + Sync {
125    /// Returns the action to take for a column with the given hint.
126    fn decide(&self, hint: PageHint) -> PageDecision;
127    /// Records a pageout/pagein/failure event for metrics or adaptive decisions.
128    fn record(&self, event: PageEvent);
129}
130
/// Size information recorded when a column is paged out. It is kept next to
/// the payload so that `take` can pre-size its buffers.
#[derive(Debug, Clone)]
pub struct Meta {
    /// Size of the uncompressed body, in bytes.
    pub len_bytes: usize,
}
138
139/// A column whose body may be resident, paged out, or paged out and compressed.
140///
141/// Each variant corresponds to one of the [`PageDecision`] outcomes.
142///
143/// All variants are `Send`. The [`Resident`](PagedColumn::Resident) variant's
144/// drop credit goes back to the policy via [`PageEvent::ResidentReleased`];
145/// concrete policies that ship with this crate (notably
146/// [`policy::TieredPolicy`]) credit a single process-wide atomic pool, so
147/// dropping a `Resident` on a different thread than the one that called
148/// [`ColumnPager::page`] is safe. Custom policies that introduce
149/// thread-local accounting must take care to either pin the column to its
150/// origin thread (e.g. via a `SendColumn` wrapper) or carry the origin in
151/// [`PageEvent::ResidentReleased`] so credit can be routed correctly.
152pub enum PagedColumn<C: Columnar> {
153    /// Body kept resident. Returned when the policy answered
154    /// [`PageDecision::Skip`]. The accompanying [`ResidentTicket`] fires a
155    /// [`PageEvent::ResidentReleased`] when the variant is dropped or
156    /// consumed by [`ColumnPager::take`], so the policy can reclaim the
157    /// budget it granted in [`PagingPolicy::decide`].
158    Resident(Column<C>, ResidentTicket),
159    /// Raw `ContainerBytes` payload stored via [`pager::Handle`]. The backend
160    /// (Swap or File) is baked into the handle.
161    Paged {
162        /// Pager handle owning the raw payload.
163        handle: Handle,
164        /// Sizing metadata.
165        meta: Meta,
166    },
167    /// Lz4-framed serialized form. The framed bytes themselves may live in
168    /// memory or in the pager (see [`CompressedInner`]).
169    Compressed {
170        /// Where the framed bytes live.
171        inner: CompressedInner,
172        /// Sizing metadata.
173        meta: Meta,
174    },
175}
176
177/// Drop guard that returns budget to a [`PagingPolicy`] when a
178/// [`PagedColumn::Resident`] is destroyed.
179///
180/// The ticket holds an `Arc` to the policy and the byte count it was charged
181/// for at [`PagingPolicy::decide`] time. On drop it fires a
182/// [`PageEvent::ResidentReleased`] event; the policy implementation decides
183/// what to credit and where (local pool, shared pool, both).
184pub struct ResidentTicket {
185    bytes: usize,
186    policy: Arc<dyn PagingPolicy>,
187}
188
189impl Drop for ResidentTicket {
190    fn drop(&mut self) {
191        metrics::observe_resident_released(self.bytes);
192        self.policy
193            .record(PageEvent::ResidentReleased { bytes: self.bytes });
194    }
195}
196
197/// Storage location for the lz4-framed bytes inside a compressed paged column.
198pub enum CompressedInner {
199    /// Owned `Vec<u8>` held resident in the caller's address space.
200    Memory(Vec<u8>),
201    /// Framed bytes padded to a `u64` boundary and handed to the pager. The
202    /// frame trailer self-delimits, so the trailing pad is ignored on read.
203    Paged(Handle),
204}
205
206/// Pages typed [`Column`]s out and back in, driven by a [`PagingPolicy`].
207///
208/// Cheap to clone (it's an `Arc`). Hold one per operator if you want per-site
209/// policy state, or share globally if you want one policy.
210#[derive(Clone)]
211pub struct ColumnPager {
212    policy: Arc<dyn PagingPolicy>,
213}
214
215impl ColumnPager {
216    /// Constructs a column pager driven by `policy`.
217    pub fn new(policy: Arc<dyn PagingPolicy>) -> Self {
218        Self { policy }
219    }
220
221    /// Constructs a pager that never pages out: every [`page`] returns a
222    /// [`PagedColumn::Resident`] whose ticket discards release events. Useful
223    /// as a default when callers want a placeholder pager before injecting a
224    /// real policy.
225    ///
226    /// [`page`]: ColumnPager::page
227    pub fn disabled() -> Self {
228        Self::new(Arc::new(AlwaysResidentPolicy))
229    }
230}
231
232/// Policy that keeps every column resident and discards events. Backs
233/// [`ColumnPager::disabled`].
234struct AlwaysResidentPolicy;
235
236impl PagingPolicy for AlwaysResidentPolicy {
237    fn decide(&self, _hint: PageHint) -> PageDecision {
238        PageDecision::Skip
239    }
240    fn record(&self, _event: PageEvent) {}
241}
242
// Process-global pager state.
//
// Following the pager design doc's spirit (`doc/developer/design/20260504_pager.md`):
245// "the cluster runs on swap or file, not both at once; a global atomic
246// encodes that operational reality directly. A per-pager design would
247// either duplicate the global flag at the struct level or invite confusion
248// about which configuration wins."
249//
250// The lower-level `mz_ore::pager` already uses a global atomic for backend
251// selection. This module's policy/budget layer mirrors that shape: one
252// `ColumnPager` per process, swapped atomically when the controller changes
253// the configuration. Merge batchers clone the `Arc` inside on use; live
254// reinstalls take effect on the next call without per-thread coordination.
255
256/// Process-global active pager. Defaults to [`ColumnPager::disabled`]
257/// until worker init calls [`set_global_pager`].
258static GLOBAL_PAGER: LazyLock<RwLock<ColumnPager>> =
259    LazyLock::new(|| RwLock::new(ColumnPager::disabled()));
260
/// Process-wide switch for issuing `MADV_PAGEOUT` on the lz4 + swap spill path.
///
/// While the switch is set, [`ColumnPager::page`] applies `MADV_PAGEOUT` to the
/// compressed bytes it keeps resident in [`CompressedInner::Memory`]. Those
/// bytes are then evicted at spill time rather than left for the kernel to
/// reclaim lazily.
///
/// Like backend and codec selection, this is a process-wide operational
/// choice rather than a per-column one. [`apply_tiered_config`] sets it, and
/// every user of the shared pager observes the same value. It starts out
/// `false`, keeping the eager-reclaim syscall off until proven.
static SWAP_PAGEOUT: AtomicBool = AtomicBool::new(false);
271
272/// Install `pager` as the process-wide active pager. Subsequent
273/// [`global_pager`] calls return a clone of this value across all threads.
274///
275/// Prefer [`apply_tiered_config`] for the production path so the
276/// `TieredPolicy` budget atomic stays stable across reconfigures. Direct
277/// `set_global_pager` use is appropriate for tests, the disabled pager, or
278/// callers that intentionally want a fresh policy.
279pub fn set_global_pager(pager: ColumnPager) {
280    *GLOBAL_PAGER.write().expect("global pager poisoned") = pager;
281}
282
283/// Process-wide [`policy::TieredPolicy`] singleton.
284///
285/// Why a singleton: every `ResidentTicket` keeps an `Arc<dyn PagingPolicy>`
286/// pointing at the policy that decided to keep the column resident.
287/// Replacing the global `TieredPolicy` would orphan in-flight tickets onto
288/// the previous instance — they would credit a budget atomic that the new
289/// policy can no longer see, draining the new pool monotonically until it
290/// locks up on Page decisions. A persistent singleton with in-place
291/// [`policy::TieredPolicy::reconfigure`] sidesteps the issue: all tickets,
292/// past and present, share the same atomic.
293///
294/// Initialized eagerly with zero budget so [`metrics::register`] can read
295/// it during compute startup, before any [`apply_tiered_config`] call. The
296/// first config apply resizes the pool via `reconfigure`, which is the same
297/// path operator-driven tunes take.
298static TIERED_POLICY: LazyLock<Arc<policy::TieredPolicy>> =
299    LazyLock::new(|| Arc::new(policy::TieredPolicy::new(0, Backend::Swap, None)));
300
301/// Returns a reference to the process-wide [`policy::TieredPolicy`] singleton.
302pub fn tiered_policy() -> &'static policy::TieredPolicy {
303    &TIERED_POLICY
304}
305
306/// Apply a tiered-pager configuration. Reuses the singleton
307/// [`policy::TieredPolicy`] so in-flight `ResidentTicket`s remain coherent
308/// with the running budget after the operator tunes any of the inputs.
309///
310/// When `enabled` is true, installs a [`ColumnPager`] backed by the
311/// singleton policy. When false, installs [`ColumnPager::disabled`] —
312/// in-flight tickets still credit the singleton, which is harmless: the
313/// budget grows above the configured total until the next enable reconciles
314/// it via `reconfigure`.
315///
316/// `swap_pageout` toggles `MADV_PAGEOUT` on the lz4 + swap spill path (see
317/// `SWAP_PAGEOUT`); it is stored unconditionally so the next `page` call
318/// observes it regardless of `enabled`.
319pub fn apply_tiered_config(
320    enabled: bool,
321    total_budget: usize,
322    backend: Backend,
323    codec: Option<Codec>,
324    swap_pageout: bool,
325) {
326    SWAP_PAGEOUT.store(swap_pageout, Ordering::Relaxed);
327    let p: &Arc<policy::TieredPolicy> = &TIERED_POLICY;
328    p.reconfigure(total_budget, backend, codec);
329    if enabled {
330        #[allow(clippy::clone_on_ref_ptr)]
331        let dyn_policy: Arc<dyn PagingPolicy> = p.clone();
332        set_global_pager(ColumnPager::new(dyn_policy));
333    } else {
334        set_global_pager(ColumnPager::disabled());
335    }
336}
337
338/// Returns the current global pager. Cheap: clones the inner `Arc<dyn
339/// PagingPolicy>`.
340pub fn global_pager() -> ColumnPager {
341    GLOBAL_PAGER.read().expect("global pager poisoned").clone()
342}
343
344/// A pager that, when `enabled`, draws from the shared [`tiered_policy`] budget
345/// pool — the same pool `apply_tiered_config` sizes for the process-global
346/// pager — and otherwise is a disabled (always-resident) pager.
347///
348/// This lets a second consumer (e.g. the storage upsert source stash) opt into
349/// the one shared budget independently of whether `apply_tiered_config` enabled
350/// the process-global pager for its own (compute) batchers. There is still a
351/// single budget pool and a single underlying `mz_ore::pager`; only the
352/// enable decision is per-consumer.
353pub fn shared_pager(enabled: bool) -> ColumnPager {
354    if enabled {
355        #[allow(clippy::clone_on_ref_ptr)]
356        let dyn_policy: Arc<dyn PagingPolicy> = TIERED_POLICY.clone();
357        ColumnPager::new(dyn_policy)
358    } else {
359        ColumnPager::disabled()
360    }
361}
362
363impl ColumnPager {
364    /// Drains `col` into a [`PagedColumn`]. After return `col` is left as a
365    /// fresh `Column::default()` (typed, empty), ready to be refilled by the
366    /// caller on the next loop iteration.
367    ///
368    /// Backend / codec semantics:
369    ///
370    /// * Uncompressed, [`Column::Align`]: the inner `Vec<u64>` is moved into
371    ///   the pager handle with no copies. Swap backend keeps the allocation
372    ///   resident; file backend writes it out and drops it.
373    /// * Uncompressed, other variants: the column is serialized via
374    ///   [`ContainerBytes::into_bytes`] into a `Vec<u8>`, copied into a
375    ///   u64-aligned `Vec<u64>`, then handed to the pager.
376    /// * Compressed: the column is serialized through an [`FrameEncoder`]
377    ///   directly into the output buffer. No intermediate uncompressed
378    ///   `Vec<u8>` is materialized.
379    pub fn page<C: Columnar>(&self, col: &mut Column<C>) -> PagedColumn<C> {
380        let len_bytes = col.length_in_bytes();
381        let hint = PageHint { len_bytes };
382
383        let (backend, codec) = match self.policy.decide(hint) {
384            PageDecision::Skip => {
385                metrics::observe_skip(len_bytes);
386                let ticket = ResidentTicket {
387                    bytes: len_bytes,
388                    policy: Arc::clone(&self.policy),
389                };
390                // A resident chunk joins a merge chain and may live there
391                // across many merge rounds. A `Column::Typed` body arrives
392                // carrying `Column::merge_from`'s worst-case `reserve_for`
393                // capacity — sized for the unconsolidated union of both merge
394                // inputs — and parking that slack in the chain is the dominant
395                // source of merge-batcher resident memory. Serialize the body
396                // into a `Vec<u64>` sized exactly to its content and store the
397                // fitting `Column::Align` instead, clearing `col` in place (as
398                // the codec paths below do) so the high-capacity typed buffer
399                // stays with the caller for recycling. `Align` / `Bytes`
400                // bodies are already fitting (or refcounted), so move them
401                // through unchanged.
402                let resident = if matches!(col, Column::Typed(_)) {
403                    debug_assert_eq!(len_bytes % 8, 0);
404                    let mut buf = Vec::with_capacity(len_bytes);
405                    col.into_bytes(&mut buf);
406                    debug_assert_eq!(buf.len() % 8, 0);
407                    col.clear();
408                    Column::Align(bytemuck::allocation::pod_collect_to_vec::<u8, u64>(&buf))
409                } else {
410                    std::mem::take(col)
411                };
412                return PagedColumn::Resident(resident, ticket);
413            }
414            PageDecision::Page { backend, codec } => (backend, codec),
415        };
416        let meta = Meta { len_bytes };
417
418        match codec {
419            None => {
420                // Raw path: the body must end up as u64-aligned bytes for the
421                // pager. `Column::Align` already is; other variants are
422                // serialized and copied.
423                debug_assert_eq!(len_bytes % 8, 0);
424                let body: Vec<u64> = match std::mem::take(col) {
425                    // Move the aligned buffer straight into the pager: the
426                    // allocation transfers with no copy. `take` already left
427                    // `col` as a refill-ready `Typed` default.
428                    Column::Align(v) => v,
429                    mut other => {
430                        let mut buf = Vec::with_capacity(len_bytes);
431                        other.into_bytes(&mut buf);
432                        debug_assert_eq!(buf.len() % 8, 0);
433                        // `into_bytes` only borrowed `other`; clear it in place
434                        // and hand it back so the caller keeps the `Typed`
435                        // allocation instead of us dropping a reusable buffer.
436                        other.clear();
437                        *col = other;
438                        bytemuck::allocation::pod_collect_to_vec::<u8, u64>(&buf)
439                    }
440                };
441                let handle = pager::pageout_with(backend, &mut [body]);
442                let bytes_out = handle.len_bytes();
443                metrics::observe_pageout(len_bytes, bytes_out);
444                self.policy.record(PageEvent::PagedOut {
445                    bytes_in: len_bytes,
446                    bytes_out,
447                    backend,
448                    codec: None,
449                });
450                PagedColumn::Paged { handle, meta }
451            }
452            Some(Codec::Lz4) => {
453                // Stream serialized bytes straight into lz4 — no intermediate
454                // uncompressed `Vec<u8>`.
455                let mut out = Vec::with_capacity(len_bytes / 4);
456                {
457                    let mut enc = FrameEncoder::new(&mut out);
458                    col.into_bytes(&mut enc);
459                    enc.finish().expect("lz4 finish into Vec is infallible");
460                }
461                // `into_bytes` borrows `col`, so empty it explicitly now that
462                // its bytes live (compressed) in `out`. `clear` retains the
463                // `Typed` allocation so the caller can refill it, rather than
464                // dropping a buffer it may want to reuse.
465                col.clear();
466                metrics::observe_pageout(len_bytes, out.len());
467                self.policy.record(PageEvent::PagedOut {
468                    bytes_in: len_bytes,
469                    bytes_out: out.len(),
470                    backend,
471                    codec: Some(Codec::Lz4),
472                });
473                let inner = match backend {
474                    Backend::Swap => {
475                        // The compressed bytes stay resident in our own address
476                        // space (we read them back in `take` via `FrameDecoder`),
477                        // so the pager's ownership-transferring `pageout` does not
478                        // fit. When `SWAP_PAGEOUT` is set, hint `MADV_PAGEOUT`
479                        // instead: it proactively swaps the pages out now, holding
480                        // RSS at the budget rather than leaving them as unmanaged
481                        // anonymous memory the kernel only reclaims lazily at the
482                        // pressure cliff. A later read re-faults them back in —
483                        // cheap, since lz4 shrank the byte volume.
484                        if SWAP_PAGEOUT.load(Ordering::Relaxed) {
485                            pager::advise_pageout(&out);
486                        }
487                        CompressedInner::Memory(out)
488                    }
489                    Backend::File => {
490                        // The pager deals in `Vec<u64>`, so the framed bytes
491                        // must be widened. `out` is already compressed (~4x
492                        // smaller than the source), so this copy is over the
493                        // small form; avoiding it would mean a byte-oriented
494                        // pager entry point, not worth widening that surface.
495                        let padded = pad_u8_to_u64(out);
496                        let handle = pager::pageout_with(Backend::File, &mut [padded]);
497                        CompressedInner::Paged(handle)
498                    }
499                };
500                PagedColumn::Compressed { inner, meta }
501            }
502        }
503    }
504
505    /// Rehydrates `paged` into a [`Column<C>`]. Consumes the handle and
506    /// reclaims its storage (file backend unlinks; swap backend drops the
507    /// `Vec`).
508    pub fn take<C: Columnar>(&self, paged: PagedColumn<C>) -> Column<C> {
509        match paged {
510            // `_ticket` drops here and fires `PageEvent::ResidentReleased`.
511            PagedColumn::Resident(c, _ticket) => c,
512            PagedColumn::Paged { handle, meta } => {
513                let mut body: Vec<u64> = Vec::with_capacity(handle.len());
514                pager::take(handle, &mut body);
515                debug_assert_eq!(body.len() * 8, meta.len_bytes);
516                metrics::observe_pagein(meta.len_bytes);
517                self.policy.record(PageEvent::PagedIn {
518                    bytes: meta.len_bytes,
519                });
520                Column::Align(body)
521            }
522            PagedColumn::Compressed { inner, meta } => {
523                let mut decoded = Vec::with_capacity(meta.len_bytes);
524                match inner {
525                    CompressedInner::Memory(v) => {
526                        FrameDecoder::new(&v[..])
527                            .read_to_end(&mut decoded)
528                            .expect("lz4 decode from memory");
529                    }
530                    CompressedInner::Paged(h) => {
531                        let mut padded = Vec::with_capacity(h.len());
532                        pager::take(h, &mut padded);
533                        let src: &[u8] = bytemuck::cast_slice(&padded);
534                        FrameDecoder::new(src)
535                            .read_to_end(&mut decoded)
536                            .expect("lz4 decode from pager");
537                    }
538                }
539                debug_assert_eq!(decoded.len(), meta.len_bytes);
540                metrics::observe_pagein(decoded.len());
541                self.policy.record(PageEvent::PagedIn {
542                    bytes: decoded.len(),
543                });
544                // `BytesMut::from` wraps the `Vec<u8>` without copying; `freeze`
545                // produces the refcounted `Bytes` that `ContainerBytes` expects.
546                Column::from_bytes(BytesMut::from(decoded).freeze())
547            }
548        }
549    }
550}
551
/// Packs `bytes` into a `Vec<u64>` for the pager.
///
/// When `bytes.len()` is not a multiple of 8, the last word is zero-filled.
/// The lz4 frame trailer self-delimits, so that trailing pad is invisible to
/// [`FrameDecoder`].
///
/// Each word is built with `u64::from_ne_bytes`, so the words' in-memory bytes
/// are exactly `bytes` followed by the zero pad. Viewing them through
/// `bytemuck::cast_slice::<u64, u8>` recovers the original frame.
fn pad_u8_to_u64(bytes: Vec<u8>) -> Vec<u64> {
    // `Vec<u8>` and `Vec<u64>` have different layouts (size + align), so the
    // allocation cannot be reinterpreted. Instead, build a fresh, properly
    // aligned `Vec<u64>` in one pass over the input.
    let full_words = bytes.chunks_exact(8);
    let tail = full_words.remainder();
    let mut out = Vec::with_capacity(bytes.len().div_ceil(8));
    out.extend(full_words.map(|chunk| {
        let mut word = [0u8; 8];
        word.copy_from_slice(chunk);
        u64::from_ne_bytes(word)
    }));
    if !tail.is_empty() {
        let mut word = [0u8; 8];
        word[..tail.len()].copy_from_slice(tail);
        out.push(u64::from_ne_bytes(word));
    }
    out
}
570
// Unit tests for `ColumnPager`. They cover round trips through every
// backend/codec combination, plus the resident (skip) path and the
// `Column::Align` fast path.
#[cfg(test)]
// `as_dyn` below uses method-call `clone()` on an `Arc` to get the unsize
// coercion, which this lint flags; allow it module-wide.
#[allow(clippy::clone_on_ref_ptr)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use columnar::Index;
    use timely::container::PushInto;

    use super::*;

    /// Shared scratch directory for all tests in this module. `set_scratch_dir`
    /// is idempotent and only honors the first path it sees. Individual tests
    /// therefore cannot bring their own tempdir without races when run in
    /// parallel: a tempdir dropped at test end would invalidate `SUBDIR` for
    /// any peer still running.
    ///
    /// The `TempDir` lives in a `static`, so it is never dropped while the
    /// test process runs.
    fn ensure_scratch() {
        static DIR: std::sync::OnceLock<tempfile::TempDir> = std::sync::OnceLock::new();
        let dir = DIR.get_or_init(|| tempfile::tempdir().expect("tempdir"));
        pager::set_scratch_dir(dir.path().to_path_buf());
    }

    /// Promotes a typed policy `Arc` to `Arc<dyn PagingPolicy>`. Hides the
    /// unsize coercion behind a `clone()` so the trait object is constructed
    /// without the now-discouraged `as` cast.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        p.clone()
    }

    /// Recording policy: configurable decision, counts events.
    ///
    /// `out` counts `PagedOut` events and `r#in` counts `PagedIn` events.
    /// `ResidentReleased` and `Failed` are ignored.
    struct TestPolicy {
        decision: PageDecision,
        out: AtomicUsize,
        r#in: AtomicUsize,
    }

    impl TestPolicy {
        // Returned in an `Arc` so a test can pass a clone to
        // `ColumnPager::new` and still read the counters afterwards.
        fn new(decision: PageDecision) -> Arc<Self> {
            Arc::new(Self {
                decision,
                out: AtomicUsize::new(0),
                r#in: AtomicUsize::new(0),
            })
        }
    }

    impl PagingPolicy for TestPolicy {
        fn decide(&self, _hint: PageHint) -> PageDecision {
            self.decision
        }
        fn record(&self, event: PageEvent) {
            match event {
                PageEvent::PagedOut { .. } => {
                    self.out.fetch_add(1, Ordering::Relaxed);
                }
                PageEvent::PagedIn { .. } => {
                    self.r#in.fetch_add(1, Ordering::Relaxed);
                }
                PageEvent::ResidentReleased { .. } | PageEvent::Failed { .. } => {}
            }
        }
    }

    /// Builds a sample typed column holding the `i64`s `0..1024`.
    fn sample_typed() -> Column<i64> {
        let mut col: Column<i64> = Default::default();
        for v in 0i64..1024 {
            col.push_into(v);
        }
        col
    }

    /// Drains a column into a `Vec<i64>` for comparison via `borrow`.
    fn collect_i64(col: &Column<i64>) -> Vec<i64> {
        col.borrow().into_index_iter().copied().collect()
    }

    // Skip keeps the column resident. Neither `PagedOut` nor `PagedIn` fires;
    // the `ResidentReleased` emitted by `take` is not counted by `TestPolicy`.
    #[mz_ore::test]
    fn skip_policy_keeps_resident() {
        let policy = TestPolicy::new(PageDecision::Skip);
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        assert_eq!(policy.out.load(Ordering::Relaxed), 0);
        assert_eq!(policy.r#in.load(Ordering::Relaxed), 0);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `madvise` on OS `linux`
    fn round_trip_swap_uncompressed() {
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        assert_eq!(policy.out.load(Ordering::Relaxed), 1);
        assert_eq!(policy.r#in.load(Ordering::Relaxed), 1);
    }

    // Lz4 on the swap backend keeps the frame in memory.
    // NOTE(review): `SWAP_PAGEOUT` is process-global and `page` reads it
    // regardless of which policy is in use. If this test runs concurrently
    // with `round_trip_swap_lz4_pageout`, it may also take the
    // `advise_pageout` branch.
    #[mz_ore::test]
    fn round_trip_swap_lz4() {
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: Some(Codec::Lz4),
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(
            paged,
            PagedColumn::Compressed {
                inner: CompressedInner::Memory(_),
                ..
            }
        ));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
    }

    /// With the swap-pageout flag on, the lz4 + swap path issues `MADV_PAGEOUT`
    /// over the compressed bytes. The round trip must still reproduce the input,
    /// because the advice is a non-destructive reclaim hint. Drives the global
    /// pager through `apply_tiered_config`, the only path that sets the flag,
    /// with a zero budget so every column spills. Resets the globals on the way
    /// out so peer tests see the default disabled pager.
    ///
    /// NOTE(review): the reset on the last line is skipped if an assertion above
    /// panics. The global pager would then stay enabled with a zero budget and
    /// `SWAP_PAGEOUT` set; consider a drop guard for the reset.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `madvise` on OS `linux`
    fn round_trip_swap_lz4_pageout() {
        apply_tiered_config(true, 0, Backend::Swap, Some(Codec::Lz4), true);
        let cp = global_pager();
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(
            paged,
            PagedColumn::Compressed {
                inner: CompressedInner::Memory(_),
                ..
            }
        ));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        apply_tiered_config(false, 0, Backend::Swap, None, false);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_uncompressed() {
        ensure_scratch();
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::File,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
    }

    // Lz4 on the file backend pads the frame to whole words and stores it in
    // the pager.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_lz4() {
        ensure_scratch();
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::File,
            codec: Some(Codec::Lz4),
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(
            paged,
            PagedColumn::Compressed {
                inner: CompressedInner::Paged(_),
                ..
            }
        ));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
    }

    // NOTE(review): this drives the same `Backend::Swap` uncompressed
    // `pageout_with` call as `round_trip_swap_uncompressed`, which is ignored
    // under miri because of `madvise`. Confirm whether this test needs the
    // same `#[cfg_attr(miri, ignore)]`.
    #[mz_ore::test]
    fn align_variant_fast_path() {
        // Construct an Align column directly to exercise the move-only raw path.
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let body: Vec<u64> = (1u64..=512).collect();
        let mut col: Column<i64> = Column::Align(body.clone());
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        // After paging an Align variant, `col` is reset to the typed default.
        assert!(matches!(col, Column::Typed(_)));
        let rt = cp.take(paged);
        // Round-tripped column should produce identical bytes.
        match rt {
            Column::Align(v) => assert_eq!(v, body),
            other => panic!("expected Align, got {:?}", std::mem::discriminant(&other)),
        }
    }
}