mz_persist_client/iter.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code for iterating through one or more parts, including streaming consolidation.

use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use itertools::Itertools;
use mz_ore::task::JoinHandle;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist::location::Blob;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd};
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use semver::Version;
use timely::progress::Timestamp;
use tracing::{Instrument, debug_span};

use crate::ShardId;
use crate::fetch::{EncodedPart, FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
use crate::metrics::Metrics;

/// Versions prior to this had bugs in consolidation, or used a different sort. However,
/// we can assume that consolidated parts at this version or higher were consolidated
/// according to the current definition.
pub const MINIMUM_CONSOLIDATED_VERSION: Version = Version::new(0, 67, 0);

/// The data needed to fetch a batch part, bundled up to make it easy
/// to send between threads.
#[derive(Debug, Clone)]
pub(crate) struct FetchData<T> {
    run_meta: RunMeta,
    part_desc: Description<T>,
    part: RunPart<T>,
    structured_lower: Option<ArrayBound>,
}

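/// Converts updates between their blob representation and the sort-ready
/// [StructuredUpdates] form used during consolidation.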
pub(crate) trait RowSort<T, D> {
    fn updates_from_blob(&self, updates: BlobTraceUpdates) -> StructuredUpdates;

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part;
}

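/// Stitches together a single output [Part] from rows scattered across several input parts:
/// the `(part, row)` index pairs drive arrow's `interleave` kernel for the key and value
/// columns, while the timestamp and diff columns are rebuilt from the consolidated tuples.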
fn interleave_updates<T: Codec64, D: Codec64>(
    updates: &[&Part],
    elements: impl IntoIterator<Item = (Indices, T, D)>,
) -> Part {
    let (indices, timestamps, diffs): (Vec<_>, Vec<_>, Vec<_>) = elements
        .into_iter()
        .map(|(idx, t, d)| {
            (
                idx,
                i64::from_le_bytes(T::encode(&t)),
                i64::from_le_bytes(D::encode(&d)),
            )
        })
        .multiunzip();

    let mut arrays: Vec<&dyn Array> = Vec::with_capacity(updates.len());
    let mut interleave = |get_array: fn(&Part) -> &dyn Array| {
        arrays.clear();
        for part in updates {
            arrays.push(get_array(part));
        }
        ::arrow::compute::interleave(arrays.as_slice(), &indices).expect("type-aligned input")
    };

    let key = interleave(|p| &*p.key);
    let val = interleave(|p| &*p.val);
    Part {
        key,
        val,
        time: Int64Array::from(timestamps),
        diff: Int64Array::from(diffs),
    }
}

/// An opaque update set for use by StructuredSort.
#[derive(Clone, Debug)]
pub struct StructuredUpdates {
    key_ord: ArrayOrd,
    val_ord: ArrayOrd,
    data: Part,
}

impl StructuredUpdates {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get<T: Codec64, D: Codec64>(&self, index: usize) -> Option<(SortKV<'_>, T, D)> {
        let t = self.data.time.values().get(index)?.to_le_bytes();
        let d = self.data.diff.values().get(index)?.to_le_bytes();
        Some((
            (self.key_ord.at(index), Some(self.val_ord.at(index))),
            T::decode(t),
            D::decode(d),
        ))
    }

    fn interleave_updates<'a, T: Codec64, D: Codec64>(
        updates: &[&'a StructuredUpdates],
        elements: impl IntoIterator<Item = (Indices, SortKV<'a>, T, D)>,
    ) -> StructuredUpdates {
        let updates: Vec<_> = updates.iter().map(|u| &u.data).collect();
        let interleaved = interleave_updates(
            &updates,
            elements.into_iter().map(|(idx, _, t, d)| (idx, t, d)),
        );
        let key_ord = ArrayOrd::new(interleaved.key.as_ref());
        let val_ord = ArrayOrd::new(interleaved.val.as_ref());
        StructuredUpdates {
            key_ord,
            val_ord,
            data: interleaved,
        }
    }
}

/// Sort parts ordered by the structured key and value columns.
#[derive(Debug, Clone)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
    schemas: Schemas<K, V>,
    _time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
    /// A sort for structured data with the given schema.
    pub fn new(schemas: Schemas<K, V>) -> Self {
        Self {
            schemas,
            _time_diff: Default::default(),
        }
    }
}

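/// The key (and, if known, value) of a single row, as orderable references into the
/// structured columns. The value is absent when only a key lower bound is available
/// (see `kv_lower`).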
type SortKV<'a> = (ArrayIdx<'a>, Option<ArrayIdx<'a>>);

fn kv_lower<T>(data: &FetchData<T>) -> Option<SortKV<'_>> {
    let key_idx = data.structured_lower.as_ref().map(|l| l.get())?;
    Some((key_idx, None))
}

fn kv_size((key, value): SortKV<'_>) -> usize {
    key.goodbytes() + value.map_or(0, |v| v.goodbytes())
}

impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
    fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> StructuredUpdates {
        let structured = updates
            .get_or_make_structured::<K, V>(&*self.schemas.key, &*self.schemas.val)
            .clone();
        let key_ord = ArrayOrd::new(&structured.key);
        let val_ord = ArrayOrd::new(&structured.val);
        StructuredUpdates {
            key_ord,
            val_ord,
            data: Part {
                key: structured.key,
                val: structured.val,
                time: updates.timestamps().clone(),
                diff: updates.diffs().clone(),
            },
        }
    }

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part {
        updates.data
    }
}

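/// Fetching a [RunPart] either yields an encoded part directly, or a [HollowRun] whose
/// child parts must themselves be enqueued and fetched.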
type FetchResult<T> = Result<EncodedPart<T>, HollowRun<T>>;

impl<T: Codec64 + Timestamp + Lattice> FetchData<T> {
    async fn fetch(
        self,
        cfg: &FetchConfig,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
    ) -> anyhow::Result<FetchResult<T>> {
        match self.part {
            RunPart::Single(part) => {
                let part = EncodedPart::fetch(
                    cfg,
                    &shard_id,
                    &*blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    &self.part_desc,
                    &part,
                )
                .await
                .map_err(|blob_key| anyhow!("missing unleased key {blob_key}"))?;
                Ok(Ok(part))
            }
            RunPart::Many(run_ref) => {
                let runs = run_ref
                    .get(shard_id, blob, metrics)
                    .await
                    .ok_or_else(|| anyhow!("missing run ref {}", run_ref.key))?;
                Ok(Err(runs))
            }
        }
    }
}

/// Indices into a part. For most parts, all we need is a single index to the current entry...
/// but for parts that have never been consolidated, this would return entries in the "wrong"
/// order, and it's expensive to re-sort the columnar data. Instead, we sort a list of indices
/// and then use this helper to hand them out in the correct order.
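///
/// For example, with `sorted_indices = [2, 0, 1]` and `next_index = 3`, successive
/// `index()`/`inc()` calls visit rows 2, 0, 1, and then 3, 4, ... in order.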
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Default)]
struct PartIndices {
    sorted_indices: VecDeque<usize>,
    next_index: usize,
}

impl PartIndices {
    fn index(&self) -> usize {
        self.sorted_indices
            .front()
            .copied()
            .unwrap_or(self.next_index)
    }

    fn inc(&mut self) {
        if self.sorted_indices.pop_front().is_none() {
            self.next_index += 1;
        }
    }
}

#[derive(Debug)]
enum ConsolidationPart<T, D> {
    Queued {
        data: FetchData<T>,
        task: Option<JoinHandle<anyhow::Result<FetchResult<T>>>>,
        _diff: PhantomData<D>,
    },
    Encoded {
        part: StructuredUpdates,
        cursor: PartIndices,
    },
}

impl<T: Timestamp + Codec64 + Lattice, D: Codec64> ConsolidationPart<T, D> {
    pub(crate) fn from_encoded(
        part: EncodedPart<T>,
        force_reconsolidation: bool,
        metrics: &ColumnarMetrics,
        sort: &impl RowSort<T, D>,
    ) -> Self {
        let reconsolidate = part.maybe_unconsolidated() || force_reconsolidation;
        let updates = part.normalize(metrics);
        let updates: StructuredUpdates = sort.updates_from_blob(updates);
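        // If the part may not already be in our sort order, leave the columnar data in place
        // and sort a permutation of row indices instead (see `PartIndices`).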
        let cursor = if reconsolidate {
            let len = updates.len();
            let mut indices: Vec<_> = (0..len).collect();

            indices.sort_by_key(|i| updates.get::<T, D>(*i).map(|(kv, t, _d)| (kv, t)));

            PartIndices {
                sorted_indices: indices.into(),
                next_index: len,
            }
        } else {
            PartIndices::default()
        };

        ConsolidationPart::Encoded {
            part: updates,
            cursor,
        }
    }

    fn kvt_lower(&self) -> Option<(SortKV<'_>, T)> {
        match self {
            ConsolidationPart::Queued { data, .. } => Some((kv_lower(data)?, T::minimum())),
            ConsolidationPart::Encoded { part, cursor } => {
                let (kv, t, _d) = part.get::<T, D>(cursor.index())?;
                Some((kv, t))
            }
        }
    }

    /// Whether this part has been exhausted: true once the cursor has advanced past the last
    /// record of an encoded part. Queued parts are never considered empty.
    pub(crate) fn is_empty(&self) -> bool {
        match self {
            ConsolidationPart::Encoded { part, cursor, .. } => cursor.index() >= part.len(),
            ConsolidationPart::Queued { .. } => false,
        }
    }
}

/// A tool for incrementally consolidating a persist shard.
///
/// The naive way to consolidate a Persist shard would be to fetch every part, then consolidate
/// the whole thing. We'd like to improve on that in two ways:
/// - Concurrency: we'd like to be able to start consolidating and returning results before every
///   part is fetched. (And continue fetching while we're doing other work.)
/// - Memory usage: we'd like to limit the number of parts we have in memory at once, dropping
///   parts that are fully consolidated and fetching parts just a little before they're needed.
///
/// This interface supports this by consolidating in multiple steps. Each call to [Self::next]
/// will do some housekeeping work -- prefetching needed parts, dropping any unneeded parts -- and
/// return an iterator over a consolidated subset of the data. To read an entire dataset, the
/// client should call `next` until it returns `None`, which signals all data has been returned...
/// but it's also free to abandon the instance at any time if it only needs a few entries, for example.
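///
/// A minimal driver sketch (illustrative only; `handle` is a placeholder for whatever the
/// caller does with each chunk, and the consolidator is assumed to have had all of its runs
/// registered via [Self::enqueue_run]):
///
/// ```ignore
/// while let Some(chunk) = consolidator.next_chunk(max_len, max_bytes).await? {
///     // Each chunk is consolidated within itself; `None` means the data is exhausted.
///     handle(chunk);
/// }
/// ```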
#[derive(Debug)]
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D>> {
    context: String,
    cfg: FetchConfig,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    runs: Vec<VecDeque<(ConsolidationPart<T, D>, usize)>>,
    filter: FetchBatchFilter<T>,
    budget: usize,
    /// An optional exclusive lower bound for the KVTs that this consolidator will return.
    /// This can be used to filter out KVTs that are not strictly larger than the bound
    /// when "resuming" a consolidation run during incremental compaction.
    lower_bound: Option<LowerBound<T>>,
    // NB: this is the tricky part!
    // One hazard of streaming consolidation is that we may start consolidating a particular KVT,
    // but not be able to finish, because some other part that might also contain the same KVT
    // may not have been fetched yet. The `drop_stash` gives us somewhere
    // to store the streaming iterator's work-in-progress state between runs.
    drop_stash: Option<StructuredUpdates>,
}

#[derive(Debug)]
/// An owned lower bound type that can be exchanged for a SortKV and a timestamp.
pub struct LowerBound<T> {
    pub(crate) key_bound: ArrayBound,
    pub(crate) val_bound: ArrayBound,
    pub(crate) t: T,
}

impl<T: Clone> LowerBound<T> {
    /// Get a reference to the key and value bounds, along with the timestamp.
    pub fn kvt_bound(&self) -> (SortKV<'_>, T) {
        (
            (self.key_bound.get(), Some(self.val_bound.get())),
            self.t.clone(),
        )
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
    /// Create a new [Self] instance with the given prefetch budget. This budget is a "soft limit"
    /// on the size of the parts that the consolidator will fetch... we'll try and stay below the
    /// limit, but may burst above it if that's necessary to make progress.
    pub fn new(
        context: String,
        cfg: FetchConfig,
        shard_id: ShardId,
        sort: Sort,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        filter: FetchBatchFilter<T>,
        lower_bound: Option<LowerBound<T>>,
        prefetch_budget_bytes: usize,
    ) -> Self {
        Self {
            context,
            cfg,
            metrics,
            shard_id,
            sort,
            blob,
            read_metrics: Arc::new(read_metrics),
            shard_metrics,
            runs: vec![],
            filter,
            budget: prefetch_budget_bytes,
            drop_stash: None,
            lower_bound,
        }
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice + Sync,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
    /// Add another run of data to be consolidated.
    ///
    /// To ensure consolidation, every tuple in this run should be larger than any tuple already
    /// returned from the iterator. At the moment, this invariant is not checked. The simplest way
    /// to ensure this is to enqueue every run before any calls to next.
    // TODO(bkirwi): enforce this invariant, either by forcing all runs to be pre-registered or with an assert.
    pub fn enqueue_run(
        &mut self,
        desc: &Description<T>,
        run_meta: &RunMeta,
        parts: impl IntoIterator<Item = RunPart<T>>,
    ) {
        let run = parts
            .into_iter()
            .map(|part| {
                let bytes = part.encoded_size_bytes();
                let c_part = ConsolidationPart::Queued {
                    data: FetchData {
                        run_meta: run_meta.clone(),
                        part_desc: desc.clone(),
                        structured_lower: part.structured_key_lower(),
                        part,
                    },
                    task: None,
                    _diff: Default::default(),
                };
                (c_part, bytes)
            })
            .collect();
        self.push_run(run);
    }

    fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D>, usize)>) {
        // Normally unconsolidated parts are in their own run, but we can end up with unconsolidated
        // runs if we change our sort order or have bugs, for example. Defend against this by
        // splitting up a run if it contains possibly-unconsolidated parts.
        let wrong_sort = run.iter().any(|(p, _)| match p {
            ConsolidationPart::Queued { data, .. } => {
                data.run_meta.order != Some(RunOrder::Structured)
            }
            ConsolidationPart::Encoded { .. } => false,
        });

        if wrong_sort {
            self.metrics.consolidation.wrong_sort.inc();
        }

        if run.len() > 1 && wrong_sort {
            for part in run {
                self.runs.push(VecDeque::from([part]));
            }
        } else {
            self.runs.push(run);
        }
    }

    /// Tidy up: discard any empty parts, and discard any runs that have no parts left.
    fn trim(&mut self) {
        self.runs.retain_mut(|run| {
            while run.front_mut().map_or(false, |(part, _)| part.is_empty()) {
                run.pop_front();
            }
            !run.is_empty()
        });

        // Some budget may have just been freed up: start prefetching.
        self.start_prefetches();
    }

    /// Return an iterator over the next consolidated chunk of output, if there's any left.
    ///
    /// Requirement: at least the first part of each run should be fetched and nonempty.
    fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D>> {
        // If an incompletely-consolidated part has been stashed by the last iterator,
        // push that into state as a new run.
        // One might worry about the list of runs growing indefinitely, if we're adding a new
        // run to the list every iteration... but since this part has the smallest tuples
        // of any run, it should be fully processed by the next consolidation step.
        if let Some(part) = self.drop_stash.take() {
            self.runs.push(VecDeque::from_iter([(
                ConsolidationPart::Encoded {
                    part,
                    cursor: PartIndices::default(),
                },
                0,
            )]));
        }

        if self.runs.is_empty() {
            return None;
        }

        let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound());
        let mut iter =
            ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash);

        for run in &mut self.runs {
            let last_in_run = run.len() < 2;
            if let Some((part, _)) = run.front_mut() {
                match part {
                    ConsolidationPart::Encoded { part, cursor } => {
                        iter.push(part, cursor, last_in_run);
                    }
                    other @ ConsolidationPart::Queued { .. } => {
                        // We don't want the iterator to return anything at or above this bound,
                        // since it might require data that we haven't fetched yet.
                        if let Some(bound) = other.kvt_lower() {
                            iter.push_upper(bound);
                        }
                    }
                };
            }
        }

        Some(iter)
    }

    /// We don't need to have fetched every part to make progress, but we do at least need
    /// to have fetched _some_ parts: in particular, parts at the beginning of their runs
    /// which may include the smallest remaining KVT.
    ///
    /// Returns successfully once we've fetched enough parts to be able to make progress.
    async fn unblock_progress(&mut self) -> anyhow::Result<()> {
        if self.runs.is_empty() {
            return Ok(());
        }
        self.runs
            .sort_by(|a, b| a[0].0.kvt_lower().cmp(&b[0].0.kvt_lower()));

        let first_larger = {
            let run = &self.runs[0];
            let min_lower = run[0].0.kvt_lower();
            self.runs
                .iter()
                .position(|q| q[0].0.kvt_lower() > min_lower)
                .unwrap_or(self.runs.len())
        };

        let mut ready_futures: FuturesUnordered<_> = self.runs[0..first_larger]
            .iter_mut()
            .map(|run| async {
                // It's possible for there to be multiple layers of indirection between us and the first available encoded part:
                // if the first part is a `HollowRun`, we'll need to fetch both that and the first part in the run to have data
                // to consolidate. So: we loop, and bail out of the loop when either the first part in the run is available or we
                // hit some unrecoverable error.
                loop {
                    let (mut part, size) = run.pop_front().expect("trimmed run should be nonempty");

                    let ConsolidationPart::Queued { data, task, .. } = &mut part else {
                        run.push_front((part, size));
                        return Ok(true);
                    };

                    let is_prefetched = task.as_ref().map_or(false, |t| t.is_finished());
                    if is_prefetched {
                        self.metrics.compaction.parts_prefetched.inc();
                    } else {
                        self.metrics.compaction.parts_waited.inc()
                    }
                    self.metrics.consolidation.parts_fetched.inc();

                    let wrong_sort = data.run_meta.order != Some(RunOrder::Structured);
                    let fetch_result: anyhow::Result<FetchResult<T>> = match task.take() {
                        Some(handle) => handle
                            .await
                            .unwrap_or_else(|join_err| Err(anyhow!(join_err))),
                        None => {
                            data.clone()
                                .fetch(
                                    &self.cfg,
                                    self.shard_id,
                                    &*self.blob,
                                    &*self.metrics,
                                    &*self.shard_metrics,
                                    &self.read_metrics,
                                )
                                .await
                        }
                    };
                    match fetch_result {
                        Err(err) => {
                            run.push_front((part, size));
                            return Err(err);
                        }
                        Ok(Err(run_part)) => {
                            // Since we're pushing these onto the _front_ of the queue, we need to
                            // iterate in reverse order.
                            for part in run_part.parts.into_iter().rev() {
                                let structured_lower = part.structured_key_lower();
                                let size = part.max_part_bytes();
                                run.push_front((
                                    ConsolidationPart::Queued {
                                        data: FetchData {
                                            run_meta: data.run_meta.clone(),
                                            part_desc: data.part_desc.clone(),
                                            part,
                                            structured_lower,
                                        },
                                        task: None,
                                        _diff: Default::default(),
                                    },
                                    size,
                                ));
                            }
                        }
                        Ok(Ok(part)) => {
                            run.push_front((
                                ConsolidationPart::from_encoded(
                                    part,
                                    wrong_sort,
                                    &self.metrics.columnar,
                                    &self.sort,
                                ),
                                size,
                            ));
                        }
                    }
                }
            })
            .collect();

        // Wait for all the needed parts to be fetched, and assert that there's at least one.
        let mut total_ready = 0;
        while let Some(awaited) = ready_futures.next().await {
            if awaited? {
                total_ready += 1;
            }
        }
        assert!(
            total_ready > 0,
            "at least one part should be fetched and ready to go"
        );

        Ok(())
    }

    /// Wait until data is available, then return an iterator over the next
    /// consolidated chunk of output. If this method returns `None`, all the data has been
    /// exhausted and the full consolidated dataset has been returned.
    #[allow(unused)]
    pub(crate) async fn next(
        &mut self,
    ) -> anyhow::Result<Option<impl Iterator<Item = (SortKV<'_>, T, D)>>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
    }

    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
        let Some(mut iter) = self.iter() else {
            return None;
        };

        let parts = iter.parts.clone();

        // Keep a running estimate of the size left in the budget, returning None once
        // budget is 0.
        // Note that we can't use take_while here - that method drops the first non-matching
        // element, but we want to leave any data that we don't return in state for future
        // calls to `next`/`next_chunk`.
        let mut budget = max_bytes;
        let iter = std::iter::from_fn(move || {
            if budget == 0 {
                return None;
            }
            let update @ (_, kv, _, _) = iter.next()?;
            // Budget for the K/V size plus two 8-byte Codec64 values.
            budget = budget.saturating_sub(kv_size(kv) + 16);
            Some(update)
        });

        let updates = StructuredUpdates::interleave_updates(&parts, iter.take(max_len));
        let updates = self.sort.updates_to_blob(updates);
        Some(updates)
    }

    /// Wait until data is available, then return the next consolidated chunk of output.
    /// If this method returns `None`, all the data has been exhausted and the full
    /// consolidated dataset has been returned.
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
        max_bytes: usize,
    ) -> anyhow::Result<Option<Part>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.chunk(max_len, max_bytes))
    }

    /// The size of the data that we _might_ be holding concurrently in memory. While this is
    /// normally kept less than the budget, it may burst over it temporarily, since we need at
    /// least one part in every run to continue making progress.
    fn live_bytes(&self) -> usize {
        self.runs
            .iter()
            .flat_map(|run| {
                run.iter().map(|(part, size)| match part {
                    ConsolidationPart::Queued { task: None, .. } => 0,
                    ConsolidationPart::Queued { task: Some(_), .. }
                    | ConsolidationPart::Encoded { .. } => *size,
                })
            })
            .sum()
    }

    /// Returns None if the budget was exhausted, or Some(remaining_bytes) if it is not.
    pub(crate) fn start_prefetches(&mut self) -> Option<usize> {
        let mut prefetch_budget_bytes = self.budget;

        let mut check_budget = |size| {
            // Subtract the amount from the budget, returning None if the budget is exhausted.
            prefetch_budget_bytes
                .checked_sub(size)
                .map(|remaining| prefetch_budget_bytes = remaining)
        };

        // First account for how much budget has already been used
        let live_bytes = self.live_bytes();
        check_budget(live_bytes)?;
        // Iterate through parts in a certain order (attempting to match the
        // order in which they'll be fetched), prefetching until we run out of
        // budget.
        //
        // The order used here is the first part of each run, then the second, etc.
        // There's a bunch of heuristics we could use here, but we'd get it exactly
        // correct if we stored on HollowBatchPart the actual kv bounds of data
        // contained in each part and go in sorted order of that. This information
        // would also be useful for pushing MFP down into persist reads, so it seems
        // like we might want to do it at some point. As a result, don't think too
        // hard about this heuristic at first.
        let max_run_len = self.runs.iter().map(|x| x.len()).max().unwrap_or_default();
        for idx in 0..max_run_len {
            for run in self.runs.iter_mut() {
                if let Some((c_part, size)) = run.get_mut(idx) {
                    let (data, task) = match c_part {
                        ConsolidationPart::Queued { data, task, .. } if task.is_none() => {
                            check_budget(*size)?;
                            (data, task)
                        }
                        _ => continue,
                    };
                    let span = debug_span!("compaction::prefetch");
                    let data = data.clone();
                    let handle = mz_ore::task::spawn(|| "persist::compaction::prefetch", {
                        let shard_id = self.shard_id;
                        let blob = Arc::clone(&self.blob);
                        let metrics = Arc::clone(&self.metrics);
                        let shard_metrics = Arc::clone(&self.shard_metrics);
                        let read_metrics = Arc::clone(&self.read_metrics);
                        let fetch_config = self.cfg.clone();
                        async move {
                            data.fetch(
                                &fetch_config,
                                shard_id,
                                &*blob,
                                &*metrics,
                                &*shard_metrics,
                                &*read_metrics,
                            )
                            .instrument(span)
                            .await
                        }
                    });
                    *task = Some(handle);
                }
            }
        }

        Some(prefetch_budget_bytes)
    }
}

impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort> {
    fn drop(&mut self) {
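        // Record how queued parts were (or weren't) used: parts we never started fetching count
        // as skipped, while parts with a prefetch task we never consumed count as wasted.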
        for run in &self.runs {
            for (part, _) in run {
                match part {
                    ConsolidationPart::Queued { task: None, .. } => {
                        self.metrics.consolidation.parts_skipped.inc();
                    }
                    ConsolidationPart::Queued { task: Some(_), .. } => {
                        self.metrics.consolidation.parts_wasted.inc();
                    }
                    _ => {}
                }
            }
        }
    }
}

/// A pair of indices, referencing a specific row in a specific part.
/// In the consolidating iterator, this is used to track the coordinates of some part that
/// holds a particular K and V.
type Indices = (usize, usize);

/// This is used as a max-heap entry: the ordering of the fields is important!
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct PartRef<'a, T: Timestamp, D> {
    /// The smallest KVT that might be emitted from this run in the future.
    /// This is reverse-sorted: Nones will sort largest (and be popped first on the heap)
    /// and smaller keys will be popped before larger keys.
    next_kvt: Reverse<Option<(SortKV<'a>, T, D)>>,
    /// The index of the corresponding part within the [ConsolidatingIter]'s list of parts.
    part_index: usize,
    /// The index of the next row within that part.
    /// This is a mutable pointer to long-lived state; we must only advance this index once
    /// we've rolled any rows before this index into our state.
    row_index: &'a mut PartIndices,
    /// Whether or not the iterator for the part is the last in its run, or whether there may be
    /// iterators for the same part in the future.
    last_in_run: bool,
    _phantom: PhantomData<D>,
}

impl<'a, T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup> PartRef<'a, T, D> {
    fn update_peek(&mut self, part: &'a StructuredUpdates, filter: &FetchBatchFilter<T>) {
        let mut peek = part.get(self.row_index.index());
        while let Some((_kv, t, _d)) = &mut peek {
            let keep = filter.filter_ts(t);
            if keep {
                break;
            } else {
                self.row_index.inc();
                peek = part.get(self.row_index.index());
            }
        }
        self.next_kvt = Reverse(peek);
    }

    fn pop(
        &mut self,
        from: &[&'a StructuredUpdates],
        filter: &FetchBatchFilter<T>,
    ) -> Option<(Indices, SortKV<'a>, T, D)> {
        let part = &from[self.part_index];
        let Reverse(popped) = mem::take(&mut self.next_kvt);
        let indices = (self.part_index, self.row_index.index());
        self.row_index.inc();
        self.update_peek(part, filter);
        let (kv, t, d) = popped?;
        Some((indices, kv, t, d))
    }
}

#[derive(Debug)]
pub(crate) struct ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    context: &'a str,
    filter: &'a FetchBatchFilter<T>,
    parts: Vec<&'a StructuredUpdates>,
    heap: BinaryHeap<PartRef<'a, T, D>>,
    upper_bound: Option<(SortKV<'a>, T)>,
    lower_bound: Option<(SortKV<'a>, T)>,
    state: Option<(Indices, SortKV<'a>, T, D)>,
    drop_stash: &'a mut Option<StructuredUpdates>,
}

impl<'a, T, D> ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    fn new(
        context: &'a str,
        filter: &'a FetchBatchFilter<T>,
        lower_bound: Option<(SortKV<'a>, T)>,
        drop_stash: &'a mut Option<StructuredUpdates>,
    ) -> Self {
        Self {
            context,
            filter,
            parts: vec![],
            heap: BinaryHeap::new(),
            upper_bound: None,
            state: None,
            drop_stash,
            lower_bound,
        }
    }

    fn push(&mut self, iter: &'a StructuredUpdates, index: &'a mut PartIndices, last_in_run: bool) {
        let mut part_ref = PartRef {
            next_kvt: Reverse(None),
            part_index: self.parts.len(),
            row_index: index,
            last_in_run,
            _phantom: Default::default(),
        };
        part_ref.update_peek(iter, self.filter);
        self.parts.push(iter);
        self.heap.push(part_ref);
    }

    /// Set an upper bound based on the stats from an unfetched part. If there's already
    /// an upper bound set, keep the most conservative / smallest one.
    fn push_upper(&mut self, upper: (SortKV<'a>, T)) {
        let update_bound = self
            .upper_bound
            .as_ref()
            .map_or(true, |existing| *existing > upper);
        if update_bound {
            self.upper_bound = Some(upper);
        }
    }

    /// Attempt to consolidate as much into the current state as possible.
    fn consolidate(&mut self) -> Option<(Indices, SortKV<'a>, T, D)> {
        loop {
            let Some(mut part) = self.heap.peek_mut() else {
                break;
            };
            if let Some((kv1, t1, _)) = part.next_kvt.0.as_ref() {
                if let Some((idx0, kv0, t0, d0)) = &mut self.state {
                    let consolidates = match (*kv0, &*t0).cmp(&(*kv1, t1)) {
                        Ordering::Less => false,
                        Ordering::Equal => true,
                        Ordering::Greater => {
                            // Don't want to log the entire KV, but it's interesting to know
                            // whether it's KVs going backwards or 'just' timestamps.
                            panic!(
                                "data arrived at the consolidator out of order ({}, kvs equal? {}, {t0:?}, {t1:?})",
                                self.context,
                                (*kv0) == (*kv1)
                            );
                        }
                    };
                    if consolidates {
                        let (idx1, _, _, d1) = part
                            .pop(&self.parts, self.filter)
                            .expect("popping from a non-empty iterator");
                        d0.plus_equals(&d1);
                        *idx0 = idx1;
                    } else {
                        break;
                    }
                } else {
                    // Don't start consolidating a new KVT that's past our provided upper bound,
                    // since that data may also live in some unfetched part.
                    if let Some((kv0, t0)) = &self.upper_bound {
                        if (kv0, t0) <= (kv1, t1) {
                            return None;
                        }
                    }

                    // Discard this KVT if it does not strictly exceed the lower bound.
                    if let Some((kv_lower, t_lower)) = &self.lower_bound {
                        if (kv_lower, t_lower) >= (kv1, t1) {
                            // Drop this item from the part, since it does not exceed our lower bound.
                            let _ = part.pop(&self.parts, self.filter);

                            // Re-check the heap, since later entries may still be relevant.
                            continue;
                        }
                    }

                    self.state = part.pop(&self.parts, self.filter);
                }
            } else {
                if part.last_in_run {
                    PeekMut::pop(part);
                } else {
                    // There may be more instances of the KVT in a later part of the same run;
                    // exit without returning the current state.
                    return None;
                }
            }
        }

        self.state.take()
    }
}

impl<'a, T, D> Iterator for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    type Item = (Indices, SortKV<'a>, T, D);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.consolidate() {
                Some((_, _, _, d)) if d.is_zero() => continue,
                other => break other,
            }
        }
    }
}

impl<'a, T, D> Drop for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    fn drop(&mut self) {
        // Make sure to stash any incomplete state in a place where we'll pick it up on the next run.
        // See the comment on `Consolidator` for more on why this is necessary.
        if let Some(update) = self.state.take() {
            let part = StructuredUpdates::interleave_updates(&self.parts, [update]);
            *self.drop_stash = Some(part);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::ShardId;
    use crate::cfg::PersistConfig;
    use crate::internal::paths::PartialBatchKey;
    use crate::internal::state::{BatchPart, HollowBatchPart};
    use crate::metrics::Metrics;
    use arrow::array::BinaryArray;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::ColumnarRecordsBuilder;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::location::Blob;
    use mz_persist::mem::{MemBlob, MemBlobConfig};
    use mz_persist_types::codec_impls::VecU8Schema;
    use mz_persist_types::part::PartBuilder;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use timely::progress::Antichain;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn consolidation() {
        // Check that output consolidated via this logic matches output consolidated via timely's!
        type Rows = Vec<((Vec<u8>, Vec<u8>), u64, i64)>;

        fn check(
            metrics: &Arc<Metrics>,
            parts: Vec<(Rows, usize)>,
            lower_bound: (Vec<u8>, Vec<u8>, u64),
        ) {
            let schemas = Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            };
            let original = {
                let mut rows = parts
                    .iter()
                    .flat_map(|(p, _)| p.clone())
                    .filter(|((k, v), t, _)| {
                        let (k_lower, v_lower, t_lower) = &lower_bound;
                        // Discard any row not strictly past the lower bound.
                        ((k_lower, v_lower), t_lower) < ((k, v), t)
                    })
                    .collect::<Vec<_>>();

                consolidate_updates(&mut rows);
                let mut builder = PartBuilder::new(&*schemas.key, &*schemas.val);
                for ((k, v), t, d) in &rows {
                    builder.push(k, v, *t, *d);
                }
                let part = builder.finish();
                part
            };
            let filter = FetchBatchFilter::Compaction {
                since: Antichain::from_elem(0),
            };
            let desc = Description::new(
                Antichain::from_elem(0),
                Antichain::new(),
                Antichain::from_elem(0),
            );
            let key_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.0]);
            let val_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.1]);
            let lower_bound = LowerBound {
                key_bound: ArrayBound::new(Arc::new(key_lower_bound_array), 0),
                val_bound: ArrayBound::new(Arc::new(val_lower_bound_array), 0),
                t: lower_bound.2,
            };
            let sort: StructuredSort<Vec<u8>, Vec<u8>, u64, i64> =
                StructuredSort::new(schemas.clone());
            let streaming = {
                // Toy compaction loop!
                let fetch_cfg = FetchConfig {
                    validate_bounds_on_read: true,
                };
                let mut consolidator = Consolidator {
                    cfg: fetch_cfg.clone(),
                    context: "test".to_string(),
                    shard_id: ShardId::new(),
                    sort: sort.clone(),
                    blob: Arc::new(MemBlob::open(MemBlobConfig::default())),
                    metrics: Arc::clone(metrics),
                    shard_metrics: metrics.shards.shard(&ShardId::new(), "test"),
                    read_metrics: Arc::new(metrics.read.snapshot.clone()),
                    // Generated runs of data that are sorted, but not necessarily consolidated.
                    // This is because timestamp-advancement may cause us to have duplicate KVTs,
                    // including those that span runs.
                    runs: parts
                        .into_iter()
                        .map(|(mut part, cut)| {
                            part.sort();
                            let part_2 = part.split_off(cut.min(part.len()));
                            [part, part_2]
                                .into_iter()
                                .map(|part| {
                                    let mut records = ColumnarRecordsBuilder::default();
                                    for ((k, v), t, d) in &part {
                                        assert!(records.push((
                                            (k, v),
                                            u64::encode(t),
                                            i64::encode(d)
                                        )));
                                    }
                                    let part = EncodedPart::new(
                                        &fetch_cfg,
                                        metrics.read.snapshot.clone(),
                                        desc.clone(),
                                        "part",
                                        None,
                                        BlobTraceBatchPart {
                                            desc: desc.clone(),
                                            index: 0,
                                            updates: BlobTraceUpdates::Row(
                                                records.finish(&metrics.columnar),
                                            ),
                                        },
                                    );
                                    (
                                        ConsolidationPart::from_encoded(
                                            part,
                                            true,
                                            &metrics.columnar,
                                            &sort,
                                        ),
                                        0,
                                    )
                                })
                                .collect::<VecDeque<_>>()
                        })
                        .collect::<Vec<_>>(),
                    filter,
                    budget: 0,
                    drop_stash: None,
                    lower_bound: Some(lower_bound),
                };

                let mut out = vec![];
                loop {
                    consolidator.trim();
                    let Some(chunk) = consolidator.chunk(1000, 1000) else {
                        break;
                    };
                    if chunk.len() > 0 {
                        out.push(chunk);
                    }
                }
                Part::concat(&out).expect("same schema")
            };

            assert_eq!((original.len() > 0).then_some(original), streaming);
        }

        let metrics = Arc::new(Metrics::new(
            &PersistConfig::new_for_tests(),
            &MetricsRegistry::new(),
        ));

        // Restricting the ranges to help make sure we have frequent collisions
        let key_gen = (0..4usize).prop_map(|i| i.to_string().into_bytes()).boxed();
        let part_gen = vec(
            ((key_gen.clone(), key_gen.clone()), 0..10u64, -3..=3i64),
            0..10,
        );
        let kvt_gen = (key_gen.clone(), key_gen.clone(), 0..10u64);
        let run_gen = vec((part_gen, 0..10usize), 0..5);
        proptest!(|(state in run_gen, bound in kvt_gen)| {
            check(&metrics, state, bound)
        });
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn prefetches() {
        fn check(budget: usize, runs: Vec<Vec<usize>>, prefetch_all: bool) {
            let desc = Description::new(
                Antichain::from_elem(0u64),
                Antichain::new(),
                Antichain::from_elem(0u64),
            );

            let total_size: usize = runs.iter().flat_map(|run| run.iter().map(|p| *p)).sum();

            let shard_id = ShardId::new();
            let blob: Arc<dyn Blob> = Arc::new(MemBlob::open(MemBlobConfig::default()));
            let metrics = Arc::new(Metrics::new(
                &PersistConfig::new_for_tests(),
                &MetricsRegistry::new(),
            ));
            let shard_metrics = metrics.shards.shard(&shard_id, "");
            let sort: StructuredSort<Vec<u8>, Vec<u8>, _, _> = StructuredSort::new(Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            });

            let fetch_cfg = FetchConfig {
                validate_bounds_on_read: true,
            };

            let mut consolidator: Consolidator<u64, i64, StructuredSort<_, _, _, _>> =
                Consolidator::new(
                    "test".to_string(),
                    fetch_cfg,
                    shard_id,
                    sort,
                    blob,
                    Arc::clone(&metrics),
                    shard_metrics,
                    metrics.read.batch_fetcher.clone(),
                    FetchBatchFilter::Compaction {
                        since: desc.since().clone(),
                    },
                    None,
                    budget,
                );

            for run in runs {
                let parts: Vec<_> = run
                    .into_iter()
                    .map(|encoded_size_bytes| {
                        RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                            key: PartialBatchKey(
                                "n0000000/p00000000-0000-0000-0000-000000000000".into(),
                            ),
                            encoded_size_bytes,
                            key_lower: vec![],
                            structured_key_lower: None,
                            stats: None,
                            ts_rewrite: None,
                            diffs_sum: None,
                            format: None,
                            schema_id: None,
                            deprecated_schema_id: None,
                        }))
                    })
                    .collect();
                consolidator.enqueue_run(&desc, &RunMeta::default(), parts)
            }

            // No matter what, the budget should be respected.
            let remaining = consolidator.start_prefetches();
            let live_bytes = consolidator.live_bytes();
            assert!(live_bytes <= budget, "budget should be respected");
            match remaining {
                None => assert!(live_bytes < total_size, "not all parts fetched"),
                Some(remaining) => assert_eq!(
                    live_bytes + remaining,
                    budget,
                    "remaining should match budget"
                ),
            }

            if prefetch_all {
                // If we up the budget to match the total size, we should prefetch everything.
                consolidator.budget = total_size;
                assert_eq!(consolidator.start_prefetches(), Some(0));
            } else {
                // Let the consolidator drop without fetching everything to check the Drop
                // impl works when not all parts are prefetched.
            }
        }

        let run_gen = vec(vec(0..20usize, 0..5usize), 0..5usize);
        proptest!(|(budget in 0..20usize, state in run_gen, prefetch_all in any::<bool>())| {
            check(budget, state, prefetch_all)
        });
    }
}