mz_persist_client/iter.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code for iterating through one or more parts, including streaming consolidation.

use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use itertools::Itertools;
use mz_ore::soft_assert_eq_or_log;
use mz_ore::task::JoinHandle;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist::location::Blob;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd};
use mz_persist_types::columnar::data_type;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use semver::Version;
use timely::progress::Timestamp;
use tracing::{Instrument, debug_span};

use crate::ShardId;
use crate::fetch::{EncodedPart, FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
use crate::metrics::Metrics;

/// Versions prior to this had bugs in consolidation, or used a different sort. However,
/// we can assume that consolidated parts at this version or higher were consolidated
/// according to the current definition.
pub const MINIMUM_CONSOLIDATED_VERSION: Version = Version::new(0, 67, 0);

/// The data needed to fetch a batch part, bundled up to make it easy
/// to send between threads.
#[derive(Debug, Clone)]
pub(crate) struct FetchData<T> {
    run_meta: RunMeta,
    part_desc: Description<T>,
    part: RunPart<T>,
    structured_lower: Option<ArrayBound>,
}

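/// Converts between the blob-encoded representation of a part's updates and the
/// structured, sort-ready representation used during consolidation.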
pub(crate) trait RowSort<T, D> {
    fn updates_from_blob(&self, updates: BlobTraceUpdates) -> StructuredUpdates;

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part;
}

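/// Stitch the selected rows from several parts into a single output [Part], using Arrow's
/// `interleave` kernel for the key and value columns and rebuilding the time and diff
/// columns from the decoded values.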
fn interleave_updates<T: Codec64, D: Codec64>(
    updates: &[&Part],
    elements: impl IntoIterator<Item = (Indices, T, D)>,
) -> Part {
    let (indices, timestamps, diffs): (Vec<_>, Vec<_>, Vec<_>) = elements
        .into_iter()
        .map(|(idx, t, d)| {
            (
                idx,
                i64::from_le_bytes(T::encode(&t)),
                i64::from_le_bytes(D::encode(&d)),
            )
        })
        .multiunzip();

    let mut arrays: Vec<&dyn Array> = Vec::with_capacity(updates.len());
    let mut interleave = |get_array: fn(&Part) -> &dyn Array| {
        arrays.clear();
        for part in updates {
            arrays.push(get_array(part));
        }
        ::arrow::compute::interleave(arrays.as_slice(), &indices).expect("type-aligned input")
    };

    let key = interleave(|p| &*p.key);
    let val = interleave(|p| &*p.val);
    Part {
        key,
        val,
        time: Int64Array::from(timestamps),
        diff: Int64Array::from(diffs),
    }
}

/// An opaque update set for use by StructuredSort.
#[derive(Clone, Debug)]
pub struct StructuredUpdates {
    key_ord: ArrayOrd,
    val_ord: ArrayOrd,
    data: Part,
}

impl StructuredUpdates {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get<T: Codec64, D: Codec64>(&self, index: usize) -> Option<(SortKV<'_>, T, D)> {
        let t = self.data.time.values().get(index)?.to_le_bytes();
        let d = self.data.diff.values().get(index)?.to_le_bytes();
        Some((
            (self.key_ord.at(index), Some(self.val_ord.at(index))),
            T::decode(t),
            D::decode(d),
        ))
    }

    fn interleave_updates<'a, T: Codec64, D: Codec64>(
        updates: &[&'a StructuredUpdates],
        elements: impl IntoIterator<Item = (Indices, SortKV<'a>, T, D)>,
    ) -> StructuredUpdates {
        let updates: Vec<_> = updates.iter().map(|u| &u.data).collect();
        let interleaved = interleave_updates(
            &updates,
            elements.into_iter().map(|(idx, _, t, d)| (idx, t, d)),
        );
        let key_ord = ArrayOrd::new(interleaved.key.as_ref());
        let val_ord = ArrayOrd::new(interleaved.val.as_ref());
        StructuredUpdates {
            key_ord,
            val_ord,
            data: interleaved,
        }
    }
}

/// Sort parts ordered by the structured key and value columns.
#[derive(Debug, Clone)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
    schemas: Schemas<K, V>,
    _time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
    /// A sort for structured data with the given schema.
    pub fn new(schemas: Schemas<K, V>) -> Self {
        Self {
            schemas,
            _time_diff: Default::default(),
        }
    }
}

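/// A lightweight sort key for a single update: an index into a key array, plus
/// (optionally) an index into a value array.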
type SortKV<'a> = (ArrayIdx<'a>, Option<ArrayIdx<'a>>);

fn kv_lower<T>(data: &FetchData<T>) -> Option<SortKV<'_>> {
    let key_idx = data.structured_lower.as_ref().map(|l| l.get())?;
    Some((key_idx, None))
}

fn kv_size((key, value): SortKV<'_>) -> usize {
    key.goodbytes() + value.map_or(0, |v| v.goodbytes())
}

impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
    fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> StructuredUpdates {
        let structured = updates
            .get_or_make_structured::<K, V>(&*self.schemas.key, &*self.schemas.val)
            .clone();
        soft_assert_eq_or_log!(
            Some(structured.key.data_type()),
            data_type::<K>(&*self.schemas.key).ok().as_ref(),
            "migrated key type should match"
        );
        soft_assert_eq_or_log!(
            Some(structured.val.data_type()),
            data_type::<V>(&*self.schemas.val).ok().as_ref(),
            "migrated val type should match"
        );
        let key_ord = ArrayOrd::new(&structured.key);
        let val_ord = ArrayOrd::new(&structured.val);
        StructuredUpdates {
            key_ord,
            val_ord,
            data: Part {
                key: structured.key,
                val: structured.val,
                time: updates.timestamps().clone(),
                diff: updates.diffs().clone(),
            },
        }
    }

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part {
        updates.data
    }
}

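/// The result of fetching a [RunPart]: either an encoded part that is ready to consolidate,
/// or the contents of a hollow run whose parts still need to be enqueued and fetched
/// individually.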
type FetchResult<T> = Result<EncodedPart<T>, HollowRun<T>>;

impl<T: Codec64 + Timestamp + Lattice> FetchData<T> {
    async fn fetch(
        self,
        cfg: &FetchConfig,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
    ) -> anyhow::Result<FetchResult<T>> {
        match self.part {
            RunPart::Single(part) => {
                let part = EncodedPart::fetch(
                    cfg,
                    &shard_id,
                    &*blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    &self.part_desc,
                    &part,
                )
                .await
                .map_err(|blob_key| anyhow!("missing unleased key {blob_key}"))?;
                Ok(Ok(part))
            }
            RunPart::Many(run_ref) => {
                let runs = run_ref
                    .get(shard_id, blob, metrics)
                    .await
                    .ok_or_else(|| anyhow!("missing run ref {}", run_ref.key))?;
                Ok(Err(runs))
            }
        }
    }
}

/// Indices into a part. For most parts, all we need is a single index to the current entry...
/// but for parts that have never been consolidated, this would return entries in the "wrong"
/// order, and it's expensive to re-sort the columnar data. Instead, we sort a list of indices
/// and then use this helper to hand them out in the correct order.
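///
/// An illustrative sketch of the intended behaviour (the indices here are made up):
///
/// ```ignore
/// // Suppose the part's rows sort as: row 2, then row 0, then row 1.
/// let mut cursor = PartIndices {
///     sorted_indices: VecDeque::from([2, 0, 1]),
///     next_index: 3,
/// };
/// assert_eq!(cursor.index(), 2);
/// cursor.inc();
/// assert_eq!(cursor.index(), 0);
/// cursor.inc();
/// assert_eq!(cursor.index(), 1);
/// cursor.inc();
/// // With the sorted list exhausted, `index()` falls back to `next_index`, which is at
/// // least the part length, so the part now reads as empty.
/// assert_eq!(cursor.index(), 3);
/// ```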
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Default)]
struct PartIndices {
    sorted_indices: VecDeque<usize>,
    next_index: usize,
}

impl PartIndices {
    fn index(&self) -> usize {
        self.sorted_indices
            .front()
            .copied()
            .unwrap_or(self.next_index)
    }

    fn inc(&mut self) {
        if self.sorted_indices.pop_front().is_none() {
            self.next_index += 1;
        }
    }
}

#[derive(Debug)]
enum ConsolidationPart<T, D> {
    Queued {
        data: FetchData<T>,
        task: Option<JoinHandle<anyhow::Result<FetchResult<T>>>>,
        _diff: PhantomData<D>,
    },
    Encoded {
        part: StructuredUpdates,
        cursor: PartIndices,
    },
}

impl<T: Timestamp + Codec64 + Lattice, D: Codec64> ConsolidationPart<T, D> {
    pub(crate) fn from_encoded(
        part: EncodedPart<T>,
        force_reconsolidation: bool,
        metrics: &ColumnarMetrics,
        sort: &impl RowSort<T, D>,
    ) -> Self {
        let reconsolidate = part.maybe_unconsolidated() || force_reconsolidation;
        let updates = part.normalize(metrics);
        let updates: StructuredUpdates = sort.updates_from_blob(updates);
        let cursor = if reconsolidate {
            let len = updates.len();
            let mut indices: Vec<_> = (0..len).collect();

            indices.sort_by_key(|i| updates.get::<T, D>(*i).map(|(kv, t, _d)| (kv, t)));

            PartIndices {
                sorted_indices: indices.into(),
                next_index: len,
            }
        } else {
            PartIndices::default()
        };

        ConsolidationPart::Encoded {
            part: updates,
            cursor,
        }
    }

    fn kvt_lower(&self) -> Option<(SortKV<'_>, T)> {
        match self {
            ConsolidationPart::Queued { data, .. } => Some((kv_lower(data)?, T::minimum())),
            ConsolidationPart::Encoded { part, cursor } => {
                let (kv, t, _d) = part.get::<T, D>(cursor.index())?;
                Some((kv, t))
            }
        }
    }

    /// Whether this part has run out of data: true once the cursor has advanced past the end
    /// of the part. Queued parts are never considered empty.
    pub(crate) fn is_empty(&self) -> bool {
        match self {
            ConsolidationPart::Encoded { part, cursor, .. } => cursor.index() >= part.len(),
            ConsolidationPart::Queued { .. } => false,
        }
    }
}

/// A tool for incrementally consolidating a persist shard.
///
/// The naive way to consolidate a Persist shard would be to fetch every part, then consolidate
/// the whole thing. We'd like to improve on that in two ways:
/// - Concurrency: we'd like to be able to start consolidating and returning results before every
///   part is fetched. (And continue fetching while we're doing other work.)
/// - Memory usage: we'd like to limit the number of parts we have in memory at once, dropping
///   parts that are fully consolidated and fetching parts just a little before they're needed.
///
/// This interface supports this by consolidating in multiple steps. Each call to [Self::next]
/// will do some housekeeping work -- prefetching needed parts, dropping any unneeded parts -- and
/// return an iterator over a consolidated subset of the data. To read an entire dataset, the
/// client should call `next` until it returns `None`, which signals all data has been returned...
/// but it's also free to abandon the instance at any time if it e.g. only needs a few entries.
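///
/// A minimal sketch of the intended call pattern (constructor arguments are elided; `process`
/// is a stand-in for whatever the caller does with each chunk):
///
/// ```ignore
/// // Register every run up front, then repeatedly ask for consolidated chunks.
/// for (desc, meta, parts) in runs {
///     consolidator.enqueue_run(&desc, &meta, parts);
/// }
/// while let Some(chunk) = consolidator.next_chunk(max_len, max_bytes).await? {
///     process(chunk);
/// }
/// // `None` means the full consolidated dataset has been returned.
/// ```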
#[derive(Debug)]
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D>> {
    context: String,
    cfg: FetchConfig,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    runs: Vec<VecDeque<(ConsolidationPart<T, D>, usize)>>,
    filter: FetchBatchFilter<T>,
    budget: usize,
    /// An optional exclusive lower bound for the KVTs that this consolidator will return.
    /// This can be used to filter out KVTs that are not strictly larger than the bound
    /// when "resuming" a consolidation run during incremental compaction.
    lower_bound: Option<LowerBound<T>>,
    // NB: this is the tricky part!
    // One hazard of streaming consolidation is that we may start consolidating a particular KVT,
    // but not be able to finish, because some other part that might also contain the same KVT
    // may not have been fetched yet. The `drop_stash` gives us somewhere
    // to store the streaming iterator's work-in-progress state between runs.
    drop_stash: Option<StructuredUpdates>,
}

/// An owned lower bound type that can be exchanged for a SortKV and a timestamp.
#[derive(Debug)]
pub struct LowerBound<T> {
    pub(crate) key_bound: ArrayBound,
    pub(crate) val_bound: ArrayBound,
    pub(crate) t: T,
}

impl<T: Clone> LowerBound<T> {
    /// Get a reference to the key and value bounds, along with the timestamp.
    pub fn kvt_bound(&self) -> (SortKV<'_>, T) {
        (
            (self.key_bound.get(), Some(self.val_bound.get())),
            self.t.clone(),
        )
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
    Sort: RowSort<T, D>,
{
    /// Create a new [Self] instance with the given prefetch budget. This budget is a "soft limit"
    /// on the size of the parts that the consolidator will fetch... we'll try to stay below the
    /// limit, but may burst above it if that's necessary to make progress.
    pub fn new(
        context: String,
        cfg: FetchConfig,
        shard_id: ShardId,
        sort: Sort,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        filter: FetchBatchFilter<T>,
        lower_bound: Option<LowerBound<T>>,
        prefetch_budget_bytes: usize,
    ) -> Self {
        Self {
            context,
            cfg,
            metrics,
            shard_id,
            sort,
            blob,
            read_metrics: Arc::new(read_metrics),
            shard_metrics,
            runs: vec![],
            filter,
            budget: prefetch_budget_bytes,
            drop_stash: None,
            lower_bound,
        }
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice + Sync,
    D: Codec64 + Monoid + Ord,
    Sort: RowSort<T, D>,
{
    /// Add another run of data to be consolidated.
    ///
    /// To ensure consolidation, every tuple in this run should be larger than any tuple already
    /// returned from the iterator. At the moment, this invariant is not checked. The simplest way
    /// to ensure this is to enqueue every run before any calls to next.
    // TODO(bkirwi): enforce this invariant, either by forcing all runs to be pre-registered or with an assert.
    pub fn enqueue_run(
        &mut self,
        desc: &Description<T>,
        run_meta: &RunMeta,
        parts: impl IntoIterator<Item = RunPart<T>>,
    ) {
        let run = parts
            .into_iter()
            .map(|part| {
                let bytes = part.encoded_size_bytes();
                let c_part = ConsolidationPart::Queued {
                    data: FetchData {
                        run_meta: run_meta.clone(),
                        part_desc: desc.clone(),
                        structured_lower: part.structured_key_lower(),
                        part,
                    },
                    task: None,
                    _diff: Default::default(),
                };
                (c_part, bytes)
            })
            .collect();
        self.push_run(run);
    }

    fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D>, usize)>) {
        // Normally unconsolidated parts are in their own run, but we can end up with unconsolidated
        // runs if we change our sort order or have bugs, for example. Defend against this by
        // splitting up a run if it contains possibly-unconsolidated parts.
        let wrong_sort = run.iter().any(|(p, _)| match p {
            ConsolidationPart::Queued { data, .. } => {
                data.run_meta.order != Some(RunOrder::Structured)
            }
            ConsolidationPart::Encoded { .. } => false,
        });

        if wrong_sort {
            self.metrics.consolidation.wrong_sort.inc();
        }

        if run.len() > 1 && wrong_sort {
            for part in run {
                self.runs.push(VecDeque::from([part]));
            }
        } else {
            self.runs.push(run);
        }
    }

    /// Tidy up: discard any empty parts, and discard any runs that have no parts left.
    fn trim(&mut self) {
        self.runs.retain_mut(|run| {
            while run.front_mut().map_or(false, |(part, _)| part.is_empty()) {
                run.pop_front();
            }
            !run.is_empty()
        });

        // Some budget may have just been freed up: start prefetching.
        self.start_prefetches();
    }

    /// Return an iterator over the next consolidated chunk of output, if there's any left.
    ///
    /// Requirement: at least the first part of each run should be fetched and nonempty.
    fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D>> {
        // If an incompletely-consolidated part has been stashed by the last iterator,
        // push that into state as a new run.
        // One might worry about the list of runs growing indefinitely, if we're adding a new
        // run to the list every iteration... but since this part has the smallest tuples
        // of any run, it should be fully processed by the next consolidation step.
        if let Some(part) = self.drop_stash.take() {
            self.runs.push(VecDeque::from_iter([(
                ConsolidationPart::Encoded {
                    part,
                    cursor: PartIndices::default(),
                },
                0,
            )]));
        }

        if self.runs.is_empty() {
            return None;
        }

        let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound());
        let mut iter =
            ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash);

        for run in &mut self.runs {
            let last_in_run = run.len() < 2;
            if let Some((part, _)) = run.front_mut() {
                match part {
                    ConsolidationPart::Encoded { part, cursor } => {
                        iter.push(part, cursor, last_in_run);
                    }
                    other @ ConsolidationPart::Queued { .. } => {
                        // We don't want the iterator to return anything at or above this bound,
                        // since it might require data that we haven't fetched yet.
                        if let Some(bound) = other.kvt_lower() {
                            iter.push_upper(bound);
                        }
                    }
                };
            }
        }

        Some(iter)
    }

    /// We don't need to have fetched every part to make progress, but we do at least need
    /// to have fetched _some_ parts: in particular, parts at the beginning of their runs
    /// which may include the smallest remaining KVT.
    ///
    /// Returns once we've fetched enough parts to be able to make progress.
    async fn unblock_progress(&mut self) -> anyhow::Result<()> {
        if self.runs.is_empty() {
            return Ok(());
        }
        self.runs
            .sort_by(|a, b| a[0].0.kvt_lower().cmp(&b[0].0.kvt_lower()));

        let first_larger = {
            let run = &self.runs[0];
            let min_lower = run[0].0.kvt_lower();
            self.runs
                .iter()
                .position(|q| q[0].0.kvt_lower() > min_lower)
                .unwrap_or(self.runs.len())
        };

        let mut ready_futures: FuturesUnordered<_> = self.runs[0..first_larger]
            .iter_mut()
            .map(|run| async {
                // It's possible for there to be multiple layers of indirection between us and the first available encoded part:
                // if the first part is a `HollowRuns`, we'll need to fetch both that and the first part in the run to have data
                // to consolidate. So: we loop, and bail out of the loop when either the first part in the run is available or we
                // hit some unrecoverable error.
                loop {
                    let (mut part, size) = run.pop_front().expect("trimmed run should be nonempty");

                    let ConsolidationPart::Queued { data, task, .. } = &mut part else {
                        run.push_front((part, size));
                        return Ok(true);
                    };

                    let is_prefetched = task.as_ref().map_or(false, |t| t.is_finished());
                    if is_prefetched {
                        self.metrics.compaction.parts_prefetched.inc();
                    } else {
                        self.metrics.compaction.parts_waited.inc()
                    }
                    self.metrics.consolidation.parts_fetched.inc();

                    let wrong_sort = data.run_meta.order != Some(RunOrder::Structured);
                    let fetch_result: anyhow::Result<FetchResult<T>> = match task.take() {
                        Some(handle) => handle
                            .await
                            .unwrap_or_else(|join_err| Err(anyhow!(join_err))),
                        None => {
                            data.clone()
                                .fetch(
                                    &self.cfg,
                                    self.shard_id,
                                    &*self.blob,
                                    &*self.metrics,
                                    &*self.shard_metrics,
                                    &self.read_metrics,
                                )
                                .await
                        }
                    };
                    match fetch_result {
                        Err(err) => {
                            run.push_front((part, size));
                            return Err(err);
                        }
                        Ok(Err(run_part)) => {
                            // Since we're pushing these onto the _front_ of the queue, we need to
                            // iterate in reverse order.
                            for part in run_part.parts.into_iter().rev() {
                                let structured_lower = part.structured_key_lower();
                                let size = part.max_part_bytes();
                                run.push_front((
                                    ConsolidationPart::Queued {
                                        data: FetchData {
                                            run_meta: data.run_meta.clone(),
                                            part_desc: data.part_desc.clone(),
                                            part,
                                            structured_lower,
                                        },
                                        task: None,
                                        _diff: Default::default(),
                                    },
                                    size,
                                ));
                            }
                        }
                        Ok(Ok(part)) => {
                            run.push_front((
                                ConsolidationPart::from_encoded(
                                    part,
                                    wrong_sort,
                                    &self.metrics.columnar,
                                    &self.sort,
                                ),
                                size,
                            ));
                        }
                    }
                }
            })
            .collect();

        // Wait for all the needed parts to be fetched, and assert that there's at least one.
        let mut total_ready = 0;
        while let Some(awaited) = ready_futures.next().await {
            if awaited? {
                total_ready += 1;
            }
        }
        assert!(
            total_ready > 0,
            "at least one part should be fetched and ready to go"
        );

        Ok(())
    }

    /// Wait until data is available, then return an iterator over the next
    /// consolidated chunk of output. If this method returns `None`, all the data has been
    /// exhausted and the full consolidated dataset has been returned.
    #[allow(unused)]
    pub(crate) async fn next(
        &mut self,
    ) -> anyhow::Result<Option<impl Iterator<Item = (SortKV<'_>, T, D)>>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
    }

    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
        let Some(mut iter) = self.iter() else {
            return None;
        };

        let parts = iter.parts.clone();

        // Keep a running estimate of the size left in the budget, returning None once
        // budget is 0.
        // Note that we can't use take_while here - that method drops the first non-matching
        // element, but we want to leave any data that we don't return in state for future
        // calls to `next`/`next_chunk`.
        let mut budget = max_bytes;
        let iter = std::iter::from_fn(move || {
            if budget == 0 {
                return None;
            }
            let update @ (_, kv, _, _) = iter.next()?;
            // Budget for the K/V size plus two 8-byte Codec64 values.
            budget = budget.saturating_sub(kv_size(kv) + 16);
            Some(update)
        });

        let updates = StructuredUpdates::interleave_updates(&parts, iter.take(max_len));
        let updates = self.sort.updates_to_blob(updates);
        Some(updates)
    }

    /// Wait until data is available, then return the next consolidated chunk of output.
    /// If this method returns `None`, all the data has been exhausted and the full
    /// consolidated dataset has been returned.
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
        max_bytes: usize,
    ) -> anyhow::Result<Option<Part>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.chunk(max_len, max_bytes))
    }

    /// The size of the data that we _might_ be holding concurrently in memory. While this is
    /// normally kept less than the budget, it may burst over it temporarily, since we need at
    /// least one part in every run to continue making progress.
    fn live_bytes(&self) -> usize {
        self.runs
            .iter()
            .flat_map(|run| {
                run.iter().map(|(part, size)| match part {
                    ConsolidationPart::Queued { task: None, .. } => 0,
                    ConsolidationPart::Queued { task: Some(_), .. }
                    | ConsolidationPart::Encoded { .. } => *size,
                })
            })
            .sum()
    }

    /// Returns None if the budget was exhausted, or Some(remaining_bytes) if it is not.
    pub(crate) fn start_prefetches(&mut self) -> Option<usize> {
        let mut prefetch_budget_bytes = self.budget;

        let mut check_budget = |size| {
            // Subtract the amount from the budget, returning None if the budget is exhausted.
            prefetch_budget_bytes
                .checked_sub(size)
                .map(|remaining| prefetch_budget_bytes = remaining)
        };

        // First account for how much budget has already been used
        let live_bytes = self.live_bytes();
        check_budget(live_bytes)?;
        // Iterate through parts in a certain order (attempting to match the
        // order in which they'll be fetched), prefetching until we run out of
        // budget.
        //
        // The order used here is the first part of each run, then the second, etc.
        // There's a bunch of heuristics we could use here, but we'd get it exactly
        // correct if we stored on HollowBatchPart the actual kv bounds of data
        // contained in each part and go in sorted order of that. This information
        // would also be useful for pushing MFP down into persist reads, so it seems
        // like we might want to do it at some point. As a result, don't think too
        // hard about this heuristic at first.
        let max_run_len = self.runs.iter().map(|x| x.len()).max().unwrap_or_default();
        for idx in 0..max_run_len {
            for run in self.runs.iter_mut() {
                if let Some((c_part, size)) = run.get_mut(idx) {
                    let (data, task) = match c_part {
                        ConsolidationPart::Queued { data, task, .. } if task.is_none() => {
                            check_budget(*size)?;
                            (data, task)
                        }
                        _ => continue,
                    };
                    let span = debug_span!("compaction::prefetch");
                    let data = data.clone();
                    let handle = mz_ore::task::spawn(|| "persist::compaction::prefetch", {
                        let shard_id = self.shard_id;
                        let blob = Arc::clone(&self.blob);
                        let metrics = Arc::clone(&self.metrics);
                        let shard_metrics = Arc::clone(&self.shard_metrics);
                        let read_metrics = Arc::clone(&self.read_metrics);
                        let fetch_config = self.cfg.clone();
                        async move {
                            data.fetch(
                                &fetch_config,
                                shard_id,
                                &*blob,
                                &*metrics,
                                &*shard_metrics,
                                &*read_metrics,
                            )
                            .instrument(span)
                            .await
                        }
                    });
                    *task = Some(handle);
                }
            }
        }

        Some(prefetch_budget_bytes)
    }
}

impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort> {
    fn drop(&mut self) {
        for run in &self.runs {
            for (part, _) in run {
                match part {
                    ConsolidationPart::Queued { task: None, .. } => {
                        self.metrics.consolidation.parts_skipped.inc();
                    }
                    ConsolidationPart::Queued { task: Some(_), .. } => {
                        self.metrics.consolidation.parts_wasted.inc();
                    }
                    _ => {}
                }
            }
        }
    }
}

/// A pair of indices, referencing a specific row in a specific part.
/// In the consolidating iterator, this is used to track the coordinates of some part that
/// holds a particular K and V.
type Indices = (usize, usize);

/// This is used as a max-heap entry: the ordering of the fields is important!
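///
/// A small illustration of the ordering this relies on (standard library semantics, shown
/// here only as a sketch): `None < Some(_)`, so wrapping in `Reverse` makes exhausted parts
/// sort largest and smaller keys pop before larger ones.
///
/// ```ignore
/// use std::cmp::Reverse;
/// assert!(Reverse(None::<u64>) > Reverse(Some(5)));
/// assert!(Reverse(Some(1)) > Reverse(Some(2)));
/// ```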
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct PartRef<'a, T: Timestamp, D> {
    /// The smallest KVT that might be emitted from this run in the future.
    /// This is reverse-sorted: Nones will sort largest (and be popped first on the heap)
    /// and smaller keys will be popped before larger keys.
    next_kvt: Reverse<Option<(SortKV<'a>, T, D)>>,
    /// The index of the corresponding part within the [ConsolidatingIter]'s list of parts.
    part_index: usize,
    /// The index of the next row within that part.
    /// This is a mutable pointer to long-lived state; we must only advance this index once
    /// we've rolled any rows before this index into our state.
    row_index: &'a mut PartIndices,
    /// Whether or not the iterator for the part is the last in its run, or whether there may
    /// be iterators for the same run in the future.
    last_in_run: bool,
    _phantom: PhantomData<D>,
}

impl<'a, T: Timestamp + Codec64 + Lattice, D: Codec64 + Monoid> PartRef<'a, T, D> {
    fn update_peek(&mut self, part: &'a StructuredUpdates, filter: &FetchBatchFilter<T>) {
        let mut peek = part.get(self.row_index.index());
        while let Some((_kv, t, _d)) = &mut peek {
            let keep = filter.filter_ts(t);
            if keep {
                break;
            } else {
                self.row_index.inc();
                peek = part.get(self.row_index.index());
            }
        }
        self.next_kvt = Reverse(peek);
    }

    fn pop(
        &mut self,
        from: &[&'a StructuredUpdates],
        filter: &FetchBatchFilter<T>,
    ) -> Option<(Indices, SortKV<'a>, T, D)> {
        let part = &from[self.part_index];
        let Reverse(popped) = mem::take(&mut self.next_kvt);
        let indices = (self.part_index, self.row_index.index());
        self.row_index.inc();
        self.update_peek(part, filter);
        let (kv, t, d) = popped?;
        Some((indices, kv, t, d))
    }
}

#[derive(Debug)]
pub(crate) struct ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    context: &'a str,
    filter: &'a FetchBatchFilter<T>,
    parts: Vec<&'a StructuredUpdates>,
    heap: BinaryHeap<PartRef<'a, T, D>>,
    upper_bound: Option<(SortKV<'a>, T)>,
    lower_bound: Option<(SortKV<'a>, T)>,
    state: Option<(Indices, SortKV<'a>, T, D)>,
    drop_stash: &'a mut Option<StructuredUpdates>,
}

impl<'a, T, D> ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
{
    fn new(
        context: &'a str,
        filter: &'a FetchBatchFilter<T>,
        lower_bound: Option<(SortKV<'a>, T)>,
        drop_stash: &'a mut Option<StructuredUpdates>,
    ) -> Self {
        Self {
            context,
            filter,
            parts: vec![],
            heap: BinaryHeap::new(),
            upper_bound: None,
            state: None,
            drop_stash,
            lower_bound,
        }
    }

    fn push(&mut self, iter: &'a StructuredUpdates, index: &'a mut PartIndices, last_in_run: bool) {
        let mut part_ref = PartRef {
            next_kvt: Reverse(None),
            part_index: self.parts.len(),
            row_index: index,
            last_in_run,
            _phantom: Default::default(),
        };
        part_ref.update_peek(iter, self.filter);
        self.parts.push(iter);
        self.heap.push(part_ref);
    }

    /// Set an upper bound based on the stats from an unfetched part. If there's already
    /// an upper bound set, keep the most conservative / smallest one.
    fn push_upper(&mut self, upper: (SortKV<'a>, T)) {
        let update_bound = self
            .upper_bound
            .as_ref()
            .map_or(true, |existing| *existing > upper);
        if update_bound {
            self.upper_bound = Some(upper);
        }
    }

    /// Attempt to consolidate as much into the current state as possible.
    fn consolidate(&mut self) -> Option<(Indices, SortKV<'a>, T, D)> {
        loop {
            let Some(mut part) = self.heap.peek_mut() else {
                break;
            };
            if let Some((kv1, t1, _)) = part.next_kvt.0.as_ref() {
                if let Some((idx0, kv0, t0, d0)) = &mut self.state {
                    let consolidates = match (*kv0, &*t0).cmp(&(*kv1, t1)) {
                        Ordering::Less => false,
                        Ordering::Equal => true,
                        Ordering::Greater => {
                            // Don't want to log the entire KV, but it's interesting to know
                            // whether it's KVs going backwards or 'just' timestamps.
                            panic!(
                                "data arrived at the consolidator out of order ({}, kvs equal? {}, {t0:?}, {t1:?})",
                                self.context,
                                (*kv0) == (*kv1)
                            );
                        }
                    };
                    if consolidates {
                        let (idx1, _, _, d1) = part
                            .pop(&self.parts, self.filter)
                            .expect("popping from a non-empty iterator");
                        d0.plus_equals(&d1);
                        *idx0 = idx1;
                    } else {
                        break;
                    }
                } else {
                    // Don't start consolidating a new KVT that's past our provided upper bound,
                    // since that data may also live in some unfetched part.
                    if let Some((kv0, t0)) = &self.upper_bound {
                        if (kv0, t0) <= (kv1, t1) {
                            return None;
                        }
                    }

                    // Discard this KVT if it does not strictly exceed the lower bound.
                    if let Some((kv_lower, t_lower)) = &self.lower_bound {
                        if (kv_lower, t_lower) >= (kv1, t1) {
                            // Discard this item from the part, since it doesn't exceed our lower bound.
                            let _ = part.pop(&self.parts, self.filter);

                            // Keep looping, since the next entry in the heap might still be relevant.
                            continue;
                        }
                    }

                    self.state = part.pop(&self.parts, self.filter);
                }
            } else {
                if part.last_in_run {
                    PeekMut::pop(part);
                } else {
                    // There may be more instances of the KVT in a later part of the same run;
                    // exit without returning the current state.
                    return None;
                }
            }
        }

        self.state.take()
    }
}

impl<'a, T, D> Iterator for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
{
    type Item = (Indices, SortKV<'a>, T, D);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.consolidate() {
                Some((_, _, _, d)) if d.is_zero() => continue,
                other => break other,
            }
        }
    }
}

impl<'a, T, D> Drop for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    fn drop(&mut self) {
        // Make sure to stash any incomplete state in a place where we'll pick it up on the next run.
        // See the comment on `Consolidator` for more on why this is necessary.
        if let Some(update) = self.state.take() {
            let part = StructuredUpdates::interleave_updates(&self.parts, [update]);
            *self.drop_stash = Some(part);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::ShardId;
    use crate::cfg::PersistConfig;
    use crate::internal::paths::PartialBatchKey;
    use crate::internal::state::{BatchPart, HollowBatchPart};
    use crate::metrics::Metrics;
    use arrow::array::BinaryArray;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::ColumnarRecordsBuilder;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::location::Blob;
    use mz_persist::mem::{MemBlob, MemBlobConfig};
    use mz_persist_types::codec_impls::VecU8Schema;
    use mz_persist_types::part::PartBuilder;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use timely::progress::Antichain;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn consolidation() {
        // Check that output consolidated via this logic matches output consolidated via differential's!
        type Rows = Vec<((Vec<u8>, Vec<u8>), u64, i64)>;

        fn check(
            metrics: &Arc<Metrics>,
            parts: Vec<(Rows, usize)>,
            lower_bound: (Vec<u8>, Vec<u8>, u64),
        ) {
            let schemas = Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            };
            let original = {
                let mut rows = parts
                    .iter()
                    .flat_map(|(p, _)| p.clone())
                    .filter(|((k, v), t, _)| {
                        let (k_lower, v_lower, t_lower) = &lower_bound;
                        // Discard any row not strictly past the lower bound.
                        ((k_lower, v_lower), t_lower) < ((k, v), t)
                    })
                    .collect::<Vec<_>>();

                consolidate_updates(&mut rows);
                let mut builder = PartBuilder::new(&*schemas.key, &*schemas.val);
                for ((k, v), t, d) in &rows {
                    builder.push(k, v, *t, *d);
                }
                let part = builder.finish();
                part
            };
            let filter = FetchBatchFilter::Compaction {
                since: Antichain::from_elem(0),
            };
            let desc = Description::new(
                Antichain::from_elem(0),
                Antichain::new(),
                Antichain::from_elem(0),
            );
            let key_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.0]);
            let val_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.1]);
            let lower_bound = LowerBound {
                key_bound: ArrayBound::new(Arc::new(key_lower_bound_array), 0),
                val_bound: ArrayBound::new(Arc::new(val_lower_bound_array), 0),
                t: lower_bound.2,
            };
            let sort: StructuredSort<Vec<u8>, Vec<u8>, u64, i64> =
                StructuredSort::new(schemas.clone());
            let streaming = {
                // Toy compaction loop!
                let fetch_cfg = FetchConfig {
                    validate_bounds_on_read: true,
                };
                let mut consolidator = Consolidator {
                    cfg: fetch_cfg.clone(),
                    context: "test".to_string(),
                    shard_id: ShardId::new(),
                    sort: sort.clone(),
                    blob: Arc::new(MemBlob::open(MemBlobConfig::default())),
                    metrics: Arc::clone(metrics),
                    shard_metrics: metrics.shards.shard(&ShardId::new(), "test"),
                    read_metrics: Arc::new(metrics.read.snapshot.clone()),
                    // Generated runs of data that are sorted, but not necessarily consolidated.
                    // This is because timestamp-advancement may cause us to have duplicate KVTs,
                    // including those that span runs.
                    runs: parts
                        .into_iter()
                        .map(|(mut part, cut)| {
                            part.sort();
                            let part_2 = part.split_off(cut.min(part.len()));
                            [part, part_2]
                                .into_iter()
                                .map(|part| {
                                    let mut records = ColumnarRecordsBuilder::default();
                                    for ((k, v), t, d) in &part {
                                        assert!(records.push((
                                            (k, v),
                                            u64::encode(t),
                                            i64::encode(d)
                                        )));
                                    }
                                    let part = EncodedPart::new(
                                        &fetch_cfg,
                                        metrics.read.snapshot.clone(),
                                        desc.clone(),
                                        "part",
                                        None,
                                        BlobTraceBatchPart {
                                            desc: desc.clone(),
                                            index: 0,
                                            updates: BlobTraceUpdates::Row(
                                                records.finish(&metrics.columnar),
                                            ),
                                        },
                                    );
                                    (
                                        ConsolidationPart::from_encoded(
                                            part,
                                            true,
                                            &metrics.columnar,
                                            &sort,
                                        ),
                                        0,
                                    )
                                })
                                .collect::<VecDeque<_>>()
                        })
                        .collect::<Vec<_>>(),
                    filter,
                    budget: 0,
                    drop_stash: None,
                    lower_bound: Some(lower_bound),
                };

                let mut out = vec![];
                loop {
                    consolidator.trim();
                    let Some(chunk) = consolidator.chunk(1000, 1000) else {
                        break;
                    };
                    if chunk.len() > 0 {
                        out.push(chunk);
                    }
                }
                Part::concat(&out).expect("same schema")
            };

            assert_eq!((original.len() > 0).then_some(original), streaming);
        }

        let metrics = Arc::new(Metrics::new(
            &PersistConfig::new_for_tests(),
            &MetricsRegistry::new(),
        ));

        // Restricting the ranges to help make sure we have frequent collisions
        let key_gen = (0..4usize).prop_map(|i| i.to_string().into_bytes()).boxed();
        let part_gen = vec(
            ((key_gen.clone(), key_gen.clone()), 0..10u64, -3..=3i64),
            0..10,
        );
        let kvt_gen = (key_gen.clone(), key_gen.clone(), 0..10u64);
        let run_gen = vec((part_gen, 0..10usize), 0..5);
        proptest!(|(state in run_gen, bound in kvt_gen)| {
            check(&metrics, state, bound)
        });
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn prefetches() {
        fn check(budget: usize, runs: Vec<Vec<usize>>, prefetch_all: bool) {
            let desc = Description::new(
                Antichain::from_elem(0u64),
                Antichain::new(),
                Antichain::from_elem(0u64),
            );

            let total_size: usize = runs.iter().flat_map(|run| run.iter().map(|p| *p)).sum();

            let shard_id = ShardId::new();
            let blob: Arc<dyn Blob> = Arc::new(MemBlob::open(MemBlobConfig::default()));
            let metrics = Arc::new(Metrics::new(
                &PersistConfig::new_for_tests(),
                &MetricsRegistry::new(),
            ));
            let shard_metrics = metrics.shards.shard(&shard_id, "");
            let sort: StructuredSort<Vec<u8>, Vec<u8>, _, _> = StructuredSort::new(Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            });

            let fetch_cfg = FetchConfig {
                validate_bounds_on_read: true,
            };

            let mut consolidator: Consolidator<u64, i64, StructuredSort<_, _, _, _>> =
                Consolidator::new(
                    "test".to_string(),
                    fetch_cfg,
                    shard_id,
                    sort,
                    blob,
                    Arc::clone(&metrics),
                    shard_metrics,
                    metrics.read.batch_fetcher.clone(),
                    FetchBatchFilter::Compaction {
                        since: desc.since().clone(),
                    },
                    None,
                    budget,
                );

            for run in runs {
                let parts: Vec<_> = run
                    .into_iter()
                    .map(|encoded_size_bytes| {
                        RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                            key: PartialBatchKey(
                                "n0000000/p00000000-0000-0000-0000-000000000000".into(),
                            ),
                            encoded_size_bytes,
                            key_lower: vec![],
                            structured_key_lower: None,
                            stats: None,
                            ts_rewrite: None,
                            diffs_sum: None,
                            format: None,
                            schema_id: None,
                            deprecated_schema_id: None,
                        }))
                    })
                    .collect();
                consolidator.enqueue_run(&desc, &RunMeta::default(), parts)
            }

            // No matter what, the budget should be respected.
            let remaining = consolidator.start_prefetches();
            let live_bytes = consolidator.live_bytes();
            assert!(live_bytes <= budget, "budget should be respected");
            match remaining {
                None => assert!(live_bytes < total_size, "not all parts fetched"),
                Some(remaining) => assert_eq!(
                    live_bytes + remaining,
                    budget,
                    "remaining should match budget"
                ),
            }

            if prefetch_all {
                // If we up the budget to match the total size, we should prefetch everything.
                consolidator.budget = total_size;
                assert_eq!(consolidator.start_prefetches(), Some(0));
            } else {
                // Let the consolidator drop without fetching everything to check the Drop
                // impl works when not all parts are prefetched.
            }
        }

        let run_gen = vec(vec(0..20usize, 0..5usize), 0..5usize);
        proptest!(|(budget in 0..20usize, state in run_gen, prefetch_all in any::<bool>())| {
            check(budget, state, prefetch_all)
        });
    }
}