Skip to main content

mz_timely_util/
column_pager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Column-aware pager. Pages [`Column`] instances out via [`mz_ore::pager`],
17//! optionally compressing with lz4.
18//!
19//! The pager (`mz_ore::pager`) deals in `Vec<u64>` blobs and two backends. This
20//! module adds:
21//!
22//! 1. A [`PagingPolicy`] trait that decides _whether_ to page out, _which
23//!    backend_, and _whether to compress_. Decisions live in the policy
24//!    implementation, not in the global atomic the pager exposes.
25//! 2. A [`ColumnPager`] that drains a `Column<C>` into a [`PagedColumn`] and
26//!    rehydrates it on demand.
27//! 3. Lz4 frame-format compression as an optional codec.
28//!
29//! The serialization uses the existing [`ContainerBytes`] protocol on
30//! `Column<C>`, so we get a single byte layout that both raw and compressed
31//! paths share. See `doc/developer/design/20260504_pager.md` for background.
32
33#![deny(missing_docs)]
34
35pub mod policy;
36
37use std::io::{self, Read};
38use std::sync::Arc;
39
40use columnar::Columnar;
41use lz4_flex::frame::{FrameDecoder, FrameEncoder};
42use mz_ore::pager::{self, Backend, Handle};
43use timely::bytes::arc::BytesMut;
44use timely::dataflow::channels::ContainerBytes;
45
46use crate::columnar::Column;
47
/// Compression codec applied to a paged-out column.
///
/// Chosen by a [`PagingPolicy`] through the `codec` field of
/// [`PageDecision::Page`]; a `None` there means the body is stored raw.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum Codec {
    /// lz4 frame format (`lz4_flex::frame`). Self-delimiting, streams via
    /// `io::Read`/`io::Write`, no random access.
    Lz4,
}
55
/// Inputs to a pageout decision.
///
/// Built by [`ColumnPager::page`] from the column's size before the policy is
/// consulted, and passed to [`PagingPolicy::decide`].
#[derive(Copy, Clone, Debug)]
pub struct PageHint {
    /// Uncompressed body size in bytes (matches [`ContainerBytes::length_in_bytes`]).
    pub len_bytes: usize,
}
62
/// Outcome of a policy decision.
///
/// Returned by [`PagingPolicy::decide`]; [`ColumnPager::page`] maps it onto a
/// [`PagedColumn`] variant.
#[derive(Copy, Clone, Debug)]
pub enum PageDecision {
    /// Keep the column resident; no I/O, no compression. Produces
    /// [`PagedColumn::Resident`] together with a [`ResidentTicket`].
    Skip,
    /// Page out using the given backend and (optionally) codec. Produces
    /// [`PagedColumn::Paged`] when `codec` is `None` and
    /// [`PagedColumn::Compressed`] otherwise.
    Page {
        /// Pager backend to use.
        backend: Backend,
        /// Compression codec, or `None` for raw bytes.
        codec: Option<Codec>,
    },
}
76
/// Notifications the column-pager sends back to the policy. Implementations
/// typically forward to metrics counters.
///
/// Every event is delivered through [`PagingPolicy::record`]. `PagedOut` is
/// emitted by [`ColumnPager::page`], `PagedIn` by [`ColumnPager::take`], and
/// `ResidentReleased` by the `Drop` impl of [`ResidentTicket`].
#[derive(Debug)]
pub enum PageEvent {
    /// A successful pageout. `bytes_in` is the uncompressed body size,
    /// `bytes_out` is the on-storage payload size (after compression).
    PagedOut {
        /// Uncompressed body size handed to the pager.
        bytes_in: usize,
        /// On-storage payload size after compression and padding.
        bytes_out: usize,
        /// Backend selected by the policy.
        backend: Backend,
        /// Codec selected by the policy.
        codec: Option<Codec>,
    },
    /// A successful page-in. `bytes` is the uncompressed body size delivered to
    /// the caller.
    PagedIn {
        /// Uncompressed body size delivered to the caller.
        bytes: usize,
    },
    /// A pageout failure surfaced via the underlying pager.
    ///
    /// NOTE(review): [`ColumnPager`] in this module never constructs this
    /// variant; confirm which caller is expected to emit it.
    Failed {
        /// Backend that produced the error.
        backend: Backend,
        /// Underlying I/O error.
        err: io::Error,
    },
    /// A resident column has been dropped. Fires from [`ResidentTicket::drop`]
    /// when the [`PagedColumn::Resident`] holding the ticket is consumed by
    /// [`ColumnPager::take`] or dropped without being taken. Policies use this
    /// to return budget allocated when [`PagingPolicy::decide`] answered
    /// [`PageDecision::Skip`].
    ResidentReleased {
        /// Uncompressed body size returned to the policy.
        bytes: usize,
    },
}
116
/// Decides whether/how to page a column out, and records page events.
///
/// Implementations carry their own state (counters, atomics, configuration)
/// via interior mutability. Methods take `&self` so a single policy can be
/// shared across operator threads.
pub trait PagingPolicy: Send + Sync {
    /// Returns the action to take for a column with the given hint.
    ///
    /// [`ColumnPager::page`] calls this exactly once per column, before any
    /// serialization or I/O happens.
    fn decide(&self, hint: PageHint) -> PageDecision;
    /// Records a pageout/pagein/failure event for metrics or adaptive decisions.
    ///
    /// Besides direct calls from [`ColumnPager`], this is invoked from the
    /// `Drop` impl of [`ResidentTicket`], i.e. on whichever thread drops a
    /// resident column. A panic here therefore happens during a drop.
    fn record(&self, event: PageEvent);
}
128
/// Sizing metadata captured at pageout time. Stored alongside the payload so
/// `take` can size buffers.
///
/// [`ColumnPager::take`] uses `len_bytes` to preallocate the decode buffer and,
/// in debug builds, asserts that the rehydrated body has exactly this size.
#[derive(Clone, Debug)]
pub struct Meta {
    /// Uncompressed body size in bytes.
    pub len_bytes: usize,
}
136
/// A column whose body may be resident, paged out, or paged out and compressed.
///
/// Each variant corresponds to one of the [`PageDecision`] outcomes. Values are
/// produced by [`ColumnPager::page`] and turned back into a [`Column`] by
/// [`ColumnPager::take`].
///
/// All variants are `Send`. The [`Resident`](PagedColumn::Resident) variant's
/// drop credit goes back to the policy via [`PageEvent::ResidentReleased`];
/// concrete policies that ship with this crate (notably
/// [`policy::TieredPolicy`]) credit a single process-wide atomic pool, so
/// dropping a `Resident` on a different thread than the one that called
/// [`ColumnPager::page`] is safe. Custom policies that introduce
/// thread-local accounting must take care to either pin the column to its
/// origin thread (e.g. via a `SendColumn` wrapper) or carry the origin in
/// [`PageEvent::ResidentReleased`] so credit can be routed correctly.
pub enum PagedColumn<C: Columnar> {
    /// Body kept resident. Returned when the policy answered
    /// [`PageDecision::Skip`]. The accompanying [`ResidentTicket`] fires a
    /// [`PageEvent::ResidentReleased`] when the variant is dropped or
    /// consumed by [`ColumnPager::take`], so the policy can reclaim the
    /// budget it granted in [`PagingPolicy::decide`].
    Resident(Column<C>, ResidentTicket),
    /// Raw `ContainerBytes` payload stored via [`pager::Handle`]. The backend
    /// (Swap or File) is baked into the handle. Produced when the policy
    /// chose no codec; [`ColumnPager::take`] rehydrates it as `Column::Align`.
    Paged {
        /// Pager handle owning the raw payload.
        handle: Handle,
        /// Sizing metadata.
        meta: Meta,
    },
    /// Lz4-framed serialized form. The framed bytes themselves may live in
    /// memory or in the pager (see [`CompressedInner`]). Produced when the
    /// policy chose [`Codec::Lz4`].
    Compressed {
        /// Where the framed bytes live.
        inner: CompressedInner,
        /// Sizing metadata.
        meta: Meta,
    },
}
174
/// Drop guard that returns budget to a [`PagingPolicy`] when a
/// [`PagedColumn::Resident`] is destroyed.
///
/// The ticket holds an `Arc` to the policy and the byte count it was charged
/// for at [`PagingPolicy::decide`] time. On drop it fires a
/// [`PageEvent::ResidentReleased`] event; the policy implementation decides
/// what to credit and where (local pool, shared pool, both).
///
/// Its fields are private, so tickets can only be minted inside this module
/// (by [`ColumnPager::page`] on the `Skip` path).
pub struct ResidentTicket {
    // Byte count charged at decide time; echoed back in `ResidentReleased`.
    bytes: usize,
    // Policy that receives the release event; keeps it alive while the ticket exists.
    policy: Arc<dyn PagingPolicy>,
}
186
187impl Drop for ResidentTicket {
188    fn drop(&mut self) {
189        self.policy
190            .record(PageEvent::ResidentReleased { bytes: self.bytes });
191    }
192}
193
/// Storage location for the lz4-framed bytes inside a compressed paged column.
///
/// [`ColumnPager::page`] picks `Memory` for [`Backend::Swap`] and `Paged` for
/// [`Backend::File`].
pub enum CompressedInner {
    /// Owned `Vec<u8>` held resident in the caller's address space.
    Memory(Vec<u8>),
    /// Framed bytes padded to a `u64` boundary and handed to the pager. The
    /// frame trailer self-delimits, so the trailing pad is ignored on read.
    Paged(Handle),
}
202
/// Pages typed [`Column`]s out and back in, driven by a [`PagingPolicy`].
///
/// Cheap to clone (it's an `Arc`). Hold one per operator if you want per-site
/// policy state, or share globally if you want one policy. Every
/// [`ResidentTicket`] it hands out holds a further clone of the same policy
/// `Arc`.
#[derive(Clone)]
pub struct ColumnPager {
    // Policy consulted on every `page` call and notified of every event.
    policy: Arc<dyn PagingPolicy>,
}
211
212impl ColumnPager {
213    /// Constructs a column pager driven by `policy`.
214    pub fn new(policy: Arc<dyn PagingPolicy>) -> Self {
215        Self { policy }
216    }
217
218    /// Drains `col` into a [`PagedColumn`]. After return `col` is left as a
219    /// fresh `Column::default()` (typed, empty), ready to be refilled by the
220    /// caller on the next loop iteration.
221    ///
222    /// Backend / codec semantics:
223    ///
224    /// * Uncompressed, [`Column::Align`]: the inner `Vec<u64>` is moved into
225    ///   the pager handle with no copies. Swap backend keeps the allocation
226    ///   resident; file backend writes it out and drops it.
227    /// * Uncompressed, other variants: the column is serialized via
228    ///   [`ContainerBytes::into_bytes`] into a `Vec<u8>`, copied into a
229    ///   u64-aligned `Vec<u64>`, then handed to the pager.
230    /// * Compressed: the column is serialized through an [`FrameEncoder`]
231    ///   directly into the output buffer. No intermediate uncompressed
232    ///   `Vec<u8>` is materialized.
233    pub fn page<C: Columnar>(&self, col: &mut Column<C>) -> PagedColumn<C> {
234        let len_bytes = col.length_in_bytes();
235        let hint = PageHint { len_bytes };
236
237        let (backend, codec) = match self.policy.decide(hint) {
238            PageDecision::Skip => {
239                let ticket = ResidentTicket {
240                    bytes: len_bytes,
241                    policy: Arc::clone(&self.policy),
242                };
243                return PagedColumn::Resident(std::mem::take(col), ticket);
244            }
245            PageDecision::Page { backend, codec } => (backend, codec),
246        };
247        let meta = Meta { len_bytes };
248
249        match codec {
250            None => {
251                // Raw path: the body must end up as u64-aligned bytes for the
252                // pager. `Column::Align` already is; other variants are
253                // serialized and copied.
254                debug_assert_eq!(len_bytes % 8, 0);
255                let body: Vec<u64> = match std::mem::take(col) {
256                    // Move the aligned buffer straight into the pager: the
257                    // allocation transfers with no copy. `take` already left
258                    // `col` as a refill-ready `Typed` default.
259                    Column::Align(v) => v,
260                    mut other => {
261                        let mut buf = Vec::with_capacity(len_bytes);
262                        other.into_bytes(&mut buf);
263                        debug_assert_eq!(buf.len() % 8, 0);
264                        // `into_bytes` only borrowed `other`; clear it in place
265                        // and hand it back so the caller keeps the `Typed`
266                        // allocation instead of us dropping a reusable buffer.
267                        other.clear();
268                        *col = other;
269                        bytemuck::allocation::pod_collect_to_vec::<u8, u64>(&buf)
270                    }
271                };
272                let handle = pager::pageout_with(backend, &mut [body]);
273                self.policy.record(PageEvent::PagedOut {
274                    bytes_in: len_bytes,
275                    bytes_out: handle.len_bytes(),
276                    backend,
277                    codec: None,
278                });
279                PagedColumn::Paged { handle, meta }
280            }
281            Some(Codec::Lz4) => {
282                // Stream serialized bytes straight into lz4 — no intermediate
283                // uncompressed `Vec<u8>`.
284                let mut out = Vec::with_capacity(len_bytes / 4);
285                {
286                    let mut enc = FrameEncoder::new(&mut out);
287                    col.into_bytes(&mut enc);
288                    enc.finish().expect("lz4 finish into Vec is infallible");
289                }
290                // `into_bytes` borrows `col`, so empty it explicitly now that
291                // its bytes live (compressed) in `out`. `clear` retains the
292                // `Typed` allocation so the caller can refill it, rather than
293                // dropping a buffer it may want to reuse.
294                col.clear();
295                self.policy.record(PageEvent::PagedOut {
296                    bytes_in: len_bytes,
297                    bytes_out: out.len(),
298                    backend,
299                    codec: Some(Codec::Lz4),
300                });
301                let inner = match backend {
302                    Backend::Swap => CompressedInner::Memory(out),
303                    Backend::File => {
304                        // The pager deals in `Vec<u64>`, so the framed bytes
305                        // must be widened. `out` is already compressed (~4x
306                        // smaller than the source), so this copy is over the
307                        // small form; avoiding it would mean a byte-oriented
308                        // pager entry point, not worth widening that surface.
309                        let padded = pad_u8_to_u64(out);
310                        let handle = pager::pageout_with(Backend::File, &mut [padded]);
311                        CompressedInner::Paged(handle)
312                    }
313                };
314                PagedColumn::Compressed { inner, meta }
315            }
316        }
317    }
318
319    /// Rehydrates `paged` into a [`Column<C>`]. Consumes the handle and
320    /// reclaims its storage (file backend unlinks; swap backend drops the
321    /// `Vec`).
322    pub fn take<C: Columnar>(&self, paged: PagedColumn<C>) -> Column<C> {
323        match paged {
324            // `_ticket` drops here and fires `PageEvent::ResidentReleased`.
325            PagedColumn::Resident(c, _ticket) => c,
326            PagedColumn::Paged { handle, meta } => {
327                let mut body: Vec<u64> = Vec::with_capacity(handle.len());
328                pager::take(handle, &mut body);
329                debug_assert_eq!(body.len() * 8, meta.len_bytes);
330                self.policy.record(PageEvent::PagedIn {
331                    bytes: meta.len_bytes,
332                });
333                Column::Align(body)
334            }
335            PagedColumn::Compressed { inner, meta } => {
336                let mut decoded = Vec::with_capacity(meta.len_bytes);
337                match inner {
338                    CompressedInner::Memory(v) => {
339                        FrameDecoder::new(&v[..])
340                            .read_to_end(&mut decoded)
341                            .expect("lz4 decode from memory");
342                    }
343                    CompressedInner::Paged(h) => {
344                        let mut padded = Vec::with_capacity(h.len());
345                        pager::take(h, &mut padded);
346                        let src: &[u8] = bytemuck::cast_slice(&padded);
347                        FrameDecoder::new(src)
348                            .read_to_end(&mut decoded)
349                            .expect("lz4 decode from pager");
350                    }
351                }
352                debug_assert_eq!(decoded.len(), meta.len_bytes);
353                self.policy.record(PageEvent::PagedIn {
354                    bytes: decoded.len(),
355                });
356                // `BytesMut::from` wraps the `Vec<u8>` without copying; `freeze`
357                // produces the refcounted `Bytes` that `ContainerBytes` expects.
358                Column::from_bytes(BytesMut::from(decoded).freeze())
359            }
360        }
361    }
362}
363
/// Widens `bytes` into a `Vec<u64>` whose in-memory byte image is `bytes`
/// followed by zero padding up to the next multiple of 8.
///
/// Each word is assembled with `u64::from_ne_bytes`, so the result is
/// byte-for-byte what a `&[u64] -> &[u8]` cast will read back. The lz4 frame
/// trailer self-delimits, so the trailing pad is invisible to
/// [`FrameDecoder`].
///
/// `Vec<u8>` and `Vec<u64>` have different layouts (size + align), so the
/// allocation cannot be reused. Building the words directly from 8-byte chunks
/// costs exactly one pass over the input. Unlike resizing `bytes` first, it
/// never reallocates (and copies) the input buffer.
fn pad_u8_to_u64(bytes: Vec<u8>) -> Vec<u64> {
    bytes
        .chunks(8)
        .map(|chunk| {
            // A zeroed word provides the padding for a short final chunk.
            let mut word = [0u8; 8];
            word[..chunk.len()].copy_from_slice(chunk);
            u64::from_ne_bytes(word)
        })
        .collect()
}
382
#[cfg(test)]
#[allow(clippy::clone_on_ref_ptr)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use columnar::Index;
    use timely::container::PushInto;

    use super::*;

    /// Shared scratch directory for all tests in this module. `set_scratch_dir`
    /// is idempotent and only honors the first path it sees, so individual
    /// tests cannot bring their own tempdir without races when run in parallel
    /// (a tempdir dropped at test end would invalidate `SUBDIR` for any peer
    /// still running).
    fn ensure_scratch() {
        static DIR: std::sync::OnceLock<tempfile::TempDir> = std::sync::OnceLock::new();
        let dir = DIR.get_or_init(|| tempfile::tempdir().expect("tempdir"));
        pager::set_scratch_dir(dir.path().to_path_buf());
    }

    /// Promotes a typed policy `Arc` to `Arc<dyn PagingPolicy>`. Hides the
    /// unsize coercion behind a `clone()` so the trait object is constructed
    /// without the now-discouraged `as` cast.
    fn as_dyn(p: &Arc<impl PagingPolicy + 'static>) -> Arc<dyn PagingPolicy> {
        p.clone()
    }

    /// Recording policy: configurable decision, counts events.
    struct TestPolicy {
        /// Decision returned from every `decide` call.
        decision: PageDecision,
        /// Number of `PagedOut` events seen.
        out: AtomicUsize,
        /// Number of `PagedIn` events seen.
        r#in: AtomicUsize,
        /// Number of `ResidentReleased` events seen.
        released: AtomicUsize,
        /// Sum of `bytes` over all `ResidentReleased` events.
        released_bytes: AtomicUsize,
    }

    impl TestPolicy {
        fn new(decision: PageDecision) -> Arc<Self> {
            Arc::new(Self {
                decision,
                out: AtomicUsize::new(0),
                r#in: AtomicUsize::new(0),
                released: AtomicUsize::new(0),
                released_bytes: AtomicUsize::new(0),
            })
        }

        /// Asserts the number of `PagedOut`, `PagedIn` and `ResidentReleased`
        /// events recorded so far.
        fn assert_counts(&self, paged_out: usize, paged_in: usize, released: usize) {
            assert_eq!(self.out.load(Ordering::Relaxed), paged_out, "PagedOut events");
            assert_eq!(self.r#in.load(Ordering::Relaxed), paged_in, "PagedIn events");
            assert_eq!(
                self.released.load(Ordering::Relaxed),
                released,
                "ResidentReleased events"
            );
        }
    }

    impl PagingPolicy for TestPolicy {
        fn decide(&self, _hint: PageHint) -> PageDecision {
            self.decision
        }
        fn record(&self, event: PageEvent) {
            match event {
                PageEvent::PagedOut { .. } => {
                    self.out.fetch_add(1, Ordering::Relaxed);
                }
                PageEvent::PagedIn { .. } => {
                    self.r#in.fetch_add(1, Ordering::Relaxed);
                }
                PageEvent::ResidentReleased { bytes } => {
                    self.released.fetch_add(1, Ordering::Relaxed);
                    self.released_bytes.fetch_add(bytes, Ordering::Relaxed);
                }
                PageEvent::Failed { .. } => {}
            }
        }
    }

    /// Builds a sample typed column of `i64`s.
    fn sample_typed() -> Column<i64> {
        let mut col: Column<i64> = Default::default();
        for v in 0i64..1024 {
            col.push_into(v);
        }
        col
    }

    /// Drains a column into a `Vec<i64>` for comparison via `borrow`.
    fn collect_i64(col: &Column<i64>) -> Vec<i64> {
        col.borrow().into_index_iter().copied().collect()
    }

    #[mz_ore::test]
    fn skip_policy_keeps_resident() {
        let policy = TestPolicy::new(PageDecision::Skip);
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let len = col.length_in_bytes();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Resident(_, _)));
        // The ticket is still alive inside `paged`: nothing released yet.
        policy.assert_counts(0, 0, 0);
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        // `take` drops the ticket, crediting back exactly the charged bytes.
        policy.assert_counts(0, 0, 1);
        assert_eq!(policy.released_bytes.load(Ordering::Relaxed), len);
    }

    #[mz_ore::test]
    fn resident_drop_releases_budget() {
        // A resident column dropped without `take` must still return its budget.
        let policy = TestPolicy::new(PageDecision::Skip);
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let len = col.length_in_bytes();
        let paged = cp.page(&mut col);
        policy.assert_counts(0, 0, 0);
        drop(paged);
        policy.assert_counts(0, 0, 1);
        assert_eq!(policy.released_bytes.load(Ordering::Relaxed), len);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `madvise` on OS `linux`
    fn round_trip_swap_uncompressed() {
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        policy.assert_counts(1, 1, 0);
    }

    #[mz_ore::test]
    fn round_trip_swap_lz4() {
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: Some(Codec::Lz4),
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(
            paged,
            PagedColumn::Compressed {
                inner: CompressedInner::Memory(_),
                ..
            }
        ));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        policy.assert_counts(1, 1, 0);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_uncompressed() {
        ensure_scratch();
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::File,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        policy.assert_counts(1, 1, 0);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `writev` on OS `linux`
    fn round_trip_file_lz4() {
        ensure_scratch();
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::File,
            codec: Some(Codec::Lz4),
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let mut col = sample_typed();
        let paged = cp.page(&mut col);
        assert!(matches!(
            paged,
            PagedColumn::Compressed {
                inner: CompressedInner::Paged(_),
                ..
            }
        ));
        let rt = cp.take(paged);
        assert_eq!(collect_i64(&rt), (0i64..1024).collect::<Vec<_>>());
        policy.assert_counts(1, 1, 0);
    }

    #[mz_ore::test]
    fn align_variant_fast_path() {
        // Construct an Align column directly to exercise the move-only raw path.
        let policy = TestPolicy::new(PageDecision::Page {
            backend: Backend::Swap,
            codec: None,
        });
        let cp = ColumnPager::new(as_dyn(&policy));
        let body: Vec<u64> = (1u64..=512).collect();
        let mut col: Column<i64> = Column::Align(body.clone());
        let paged = cp.page(&mut col);
        assert!(matches!(paged, PagedColumn::Paged { .. }));
        // After paging an Align variant, `col` is reset to the typed default.
        assert!(matches!(col, Column::Typed(_)));
        let rt = cp.take(paged);
        // Round-tripped column should produce identical bytes.
        match rt {
            Column::Align(v) => assert_eq!(v, body),
            other => panic!("expected Align, got {:?}", std::mem::discriminant(&other)),
        }
        policy.assert_counts(1, 1, 0);
    }
}
564            Column::Align(v) => assert_eq!(v, body),
565            other => panic!("expected Align, got {:?}", std::mem::discriminant(&other)),
566        }
567    }
568}