use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use itertools::Itertools;
use mz_ore::soft_assert_eq_or_log;
use mz_ore::task::JoinHandle;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist::location::Blob;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd};
use mz_persist_types::columnar::data_type;
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use semver::Version;
use timely::progress::Timestamp;
use tracing::{Instrument, debug_span};

use crate::ShardId;
use crate::fetch::{EncodedPart, FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
use crate::metrics::Metrics;

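/// Presumably the minimum persist version that reliably wrote consolidated parts; output
/// from earlier versions may need to be re-sorted before merging. (The original doc
/// comment was lost; this description is inferred from the constant's name and usage.)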
pub const MINIMUM_CONSOLIDATED_VERSION: Version = Version::new(0, 67, 0);

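/// Metadata for an as-yet-unfetched part of a run: enough context to fetch its data and
/// to compute a lower bound on its contents without fetching it.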
#[derive(Debug, Clone)]
pub(crate) struct FetchData<T> {
    run_meta: RunMeta,
    part_desc: Description<T>,
    part: RunPart<T>,
    structured_lower: Option<ArrayBound>,
}

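/// Converts between the serialized representation of updates (as stored in blob storage)
/// and the sorted, structured representation the consolidator works over.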
pub(crate) trait RowSort<T, D> {
    fn updates_from_blob(&self, updates: BlobTraceUpdates) -> StructuredUpdates;

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part;
}

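/// Stitches together a new `Part` from the given parts, copying rows in the order given
/// by `elements`: each `Indices` pair is a (part, row) coordinate into `updates`.
/// Timestamps and diffs are re-encoded as little-endian `i64` columns.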
fn interleave_updates<T: Codec64, D: Codec64>(
    updates: &[&Part],
    elements: impl IntoIterator<Item = (Indices, T, D)>,
) -> Part {
    let (indices, timestamps, diffs): (Vec<_>, Vec<_>, Vec<_>) = elements
        .into_iter()
        .map(|(idx, t, d)| {
            (
                idx,
                i64::from_le_bytes(T::encode(&t)),
                i64::from_le_bytes(D::encode(&d)),
            )
        })
        .multiunzip();

    let mut arrays: Vec<&dyn Array> = Vec::with_capacity(updates.len());
    let mut interleave = |get_array: fn(&Part) -> &dyn Array| {
        arrays.clear();
        for part in updates {
            arrays.push(get_array(part));
        }
        ::arrow::compute::interleave(arrays.as_slice(), &indices).expect("type-aligned input")
    };

    let key = interleave(|p| &*p.key);
    let val = interleave(|p| &*p.val);
    Part {
        key,
        val,
        time: Int64Array::from(timestamps),
        diff: Int64Array::from(diffs),
    }
}

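/// A part's worth of updates along with pre-computed orderings over its key and value
/// columns, so rows can be compared by (key, value) without re-decoding.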
#[derive(Clone, Debug)]
pub struct StructuredUpdates {
    key_ord: ArrayOrd,
    val_ord: ArrayOrd,
    data: Part,
}

impl StructuredUpdates {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get<T: Codec64, D: Codec64>(&self, index: usize) -> Option<(SortKV<'_>, T, D)> {
        let t = self.data.time.values().get(index)?.to_le_bytes();
        let d = self.data.diff.values().get(index)?.to_le_bytes();
        Some((
            (self.key_ord.at(index), Some(self.val_ord.at(index))),
            T::decode(t),
            D::decode(d),
        ))
    }

    fn interleave_updates<'a, T: Codec64, D: Codec64>(
        updates: &[&'a StructuredUpdates],
        elements: impl IntoIterator<Item = (Indices, SortKV<'a>, T, D)>,
    ) -> StructuredUpdates {
        let updates: Vec<_> = updates.iter().map(|u| &u.data).collect();
        let interleaved = interleave_updates(
            &updates,
            elements.into_iter().map(|(idx, _, t, d)| (idx, t, d)),
        );
        let key_ord = ArrayOrd::new(interleaved.key.as_ref());
        let val_ord = ArrayOrd::new(interleaved.val.as_ref());
        StructuredUpdates {
            key_ord,
            val_ord,
            data: interleaved,
        }
    }
}

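/// A `RowSort` implementation that sorts updates by their structured (columnar)
/// key/value representation.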
#[derive(Debug, Clone)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
    schemas: Schemas<K, V>,
    _time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
    pub fn new(schemas: Schemas<K, V>) -> Self {
        Self {
            schemas,
            _time_diff: Default::default(),
        }
    }
}

type SortKV<'a> = (ArrayIdx<'a>, Option<ArrayIdx<'a>>);

fn kv_lower<T>(data: &FetchData<T>) -> Option<SortKV<'_>> {
    let key_idx = data.structured_lower.as_ref().map(|l| l.get())?;
    Some((key_idx, None))
}

fn kv_size((key, value): SortKV<'_>) -> usize {
    key.goodbytes() + value.map_or(0, |v| v.goodbytes())
}

impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
    fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> StructuredUpdates {
        let structured = updates
            .get_or_make_structured::<K, V>(&*self.schemas.key, &*self.schemas.val)
            .clone();
        soft_assert_eq_or_log!(
            Some(structured.key.data_type()),
            data_type::<K>(&*self.schemas.key).ok().as_ref(),
            "migrated key type should match"
        );
        soft_assert_eq_or_log!(
            Some(structured.val.data_type()),
            data_type::<V>(&*self.schemas.val).ok().as_ref(),
            "migrated val type should match"
        );
        let key_ord = ArrayOrd::new(&structured.key);
        let val_ord = ArrayOrd::new(&structured.val);
        StructuredUpdates {
            key_ord,
            val_ord,
            data: Part {
                key: structured.key,
                val: structured.val,
                time: updates.timestamps().clone(),
                diff: updates.diffs().clone(),
            },
        }
    }

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part {
        updates.data
    }
}

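/// The result of fetching a `RunPart`: either an actual part's data, or the listing of a
/// hollow run whose constituent parts still need to be enqueued.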
type FetchResult<T> = Result<EncodedPart<T>, HollowRun<T>>;

impl<T: Codec64 + Timestamp + Lattice> FetchData<T> {
    async fn fetch(
        self,
        cfg: &FetchConfig,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
    ) -> anyhow::Result<FetchResult<T>> {
        match self.part {
            RunPart::Single(part) => {
                let part = EncodedPart::fetch(
                    cfg,
                    &shard_id,
                    &*blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    &self.part_desc,
                    &part,
                )
                .await
                .map_err(|blob_key| anyhow!("missing unleased key {blob_key}"))?;
                Ok(Ok(part))
            }
            RunPart::Many(run_ref) => {
                let runs = run_ref
                    .get(shard_id, blob, metrics)
                    .await
                    .ok_or_else(|| anyhow!("missing run ref {}", run_ref.key))?;
                Ok(Err(runs))
            }
        }
    }
}

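/// A cursor into a part's rows. If the part needed re-sorting, `sorted_indices` holds the
/// row indices in sorted order and is consumed from the front; once it's exhausted (or if
/// no re-sort was needed) the cursor counts up from `next_index`.
///
/// A small sketch: for a three-row part re-sorted as `sorted_indices = [2, 0, 1]`,
/// `index()` yields 2, 0, 1 and then continues from `next_index = 3`.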
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Default)]
struct PartIndices {
    sorted_indices: VecDeque<usize>,
    next_index: usize,
}

impl PartIndices {
    fn index(&self) -> usize {
        self.sorted_indices
            .front()
            .copied()
            .unwrap_or(self.next_index)
    }

    fn inc(&mut self) {
        if self.sorted_indices.pop_front().is_none() {
            self.next_index += 1;
        }
    }
}

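/// One part of a run: either still queued for fetching (possibly with a prefetch task in
/// flight) or fetched and decoded into sorted updates with a cursor into them.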
#[derive(Debug)]
enum ConsolidationPart<T, D> {
    Queued {
        data: FetchData<T>,
        task: Option<JoinHandle<anyhow::Result<FetchResult<T>>>>,
        _diff: PhantomData<D>,
    },
    Encoded {
        part: StructuredUpdates,
        cursor: PartIndices,
    },
}

impl<T: Timestamp + Codec64 + Lattice, D: Codec64> ConsolidationPart<T, D> {
    pub(crate) fn from_encoded(
        part: EncodedPart<T>,
        force_reconsolidation: bool,
        metrics: &ColumnarMetrics,
        sort: &impl RowSort<T, D>,
    ) -> Self {
        let reconsolidate = part.maybe_unconsolidated() || force_reconsolidation;
        let updates = part.normalize(metrics);
        let updates: StructuredUpdates = sort.updates_from_blob(updates);
        let cursor = if reconsolidate {
            let len = updates.len();
            let mut indices: Vec<_> = (0..len).collect();

            indices.sort_by_key(|i| updates.get::<T, D>(*i).map(|(kv, t, _d)| (kv, t)));

            PartIndices {
                sorted_indices: indices.into(),
                next_index: len,
            }
        } else {
            PartIndices::default()
        };

        ConsolidationPart::Encoded {
            part: updates,
            cursor,
        }
    }

    fn kvt_lower(&self) -> Option<(SortKV<'_>, T)> {
        match self {
            ConsolidationPart::Queued { data, .. } => Some((kv_lower(data)?, T::minimum())),
            ConsolidationPart::Encoded { part, cursor } => {
                let (kv, t, _d) = part.get::<T, D>(cursor.index())?;
                Some((kv, t))
            }
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        match self {
            ConsolidationPart::Encoded { part, cursor, .. } => cursor.index() >= part.len(),
            ConsolidationPart::Queued { .. } => false,
        }
    }
}

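/// Merges and consolidates updates from multiple sorted runs of parts, fetching parts
/// lazily (and prefetching up to a byte budget) so only the data needed to make progress
/// is resident at once.
///
/// A sketch of the intended call pattern, assuming a configured consolidator and
/// hypothetical `desc`, `meta`, `parts`, `max_len`, and `max_bytes` values:
///
/// ```ignore
/// consolidator.enqueue_run(&desc, &meta, parts);
/// while let Some(chunk) = consolidator.next_chunk(max_len, max_bytes).await? {
///     // `chunk` is a consolidated Part: sorted by (key, value, time), zero diffs dropped.
/// }
/// ```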
#[derive(Debug)]
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D>> {
    context: String,
    cfg: FetchConfig,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    runs: Vec<VecDeque<(ConsolidationPart<T, D>, usize)>>,
    filter: FetchBatchFilter<T>,
    budget: usize,
    // An optional lower bound on the output: updates at or below this (key, value, time)
    // are discarded during consolidation.
    lower_bound: Option<LowerBound<T>>,
    // If an iterator is dropped mid-consolidation, its partially-consolidated update is
    // stashed here so no data is lost; see `ConsolidatingIter`'s `Drop` impl.
    drop_stash: Option<StructuredUpdates>,
}

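/// An inclusive lower bound on (key, value, time): the consolidator discards any update
/// at or below this bound, so its output starts strictly above it.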
#[derive(Debug)]
pub struct LowerBound<T> {
    pub(crate) key_bound: ArrayBound,
    pub(crate) val_bound: ArrayBound,
    pub(crate) t: T,
}

impl<T: Clone> LowerBound<T> {
    pub fn kvt_bound(&self) -> (SortKV<'_>, T) {
        (
            (self.key_bound.get(), Some(self.val_bound.get())),
            self.t.clone(),
        )
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
    Sort: RowSort<T, D>,
{
    pub fn new(
        context: String,
        cfg: FetchConfig,
        shard_id: ShardId,
        sort: Sort,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        filter: FetchBatchFilter<T>,
        lower_bound: Option<LowerBound<T>>,
        prefetch_budget_bytes: usize,
    ) -> Self {
        Self {
            context,
            cfg,
            metrics,
            shard_id,
            sort,
            blob,
            read_metrics: Arc::new(read_metrics),
            shard_metrics,
            runs: vec![],
            filter,
            budget: prefetch_budget_bytes,
            drop_stash: None,
            lower_bound,
        }
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice + Sync,
    D: Codec64 + Monoid + Ord,
    Sort: RowSort<T, D>,
{
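    /// Adds a run of parts to the consolidator. Parts within a run are assumed to be
    /// internally sorted; runs not written in structured order are split up by
    /// `push_run` below.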
    pub fn enqueue_run(
        &mut self,
        desc: &Description<T>,
        run_meta: &RunMeta,
        parts: impl IntoIterator<Item = RunPart<T>>,
    ) {
        let run = parts
            .into_iter()
            .map(|part| {
                let bytes = part.encoded_size_bytes();
                let c_part = ConsolidationPart::Queued {
                    data: FetchData {
                        run_meta: run_meta.clone(),
                        part_desc: desc.clone(),
                        structured_lower: part.structured_key_lower(),
                        part,
                    },
                    task: None,
                    _diff: Default::default(),
                };
                (c_part, bytes)
            })
            .collect();
        self.push_run(run);
    }

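    /// Pushes a run onto the consolidator. A multi-part run that was not written in
    /// structured order can't be assumed to be globally sorted, so it's broken up into
    /// single-part runs, which are always safe to merge once re-sorted.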
    fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D>, usize)>) {
        let wrong_sort = run.iter().any(|(p, _)| match p {
            ConsolidationPart::Queued { data, .. } => {
                data.run_meta.order != Some(RunOrder::Structured)
            }
            ConsolidationPart::Encoded { .. } => false,
        });

        if wrong_sort {
            self.metrics.consolidation.wrong_sort.inc();
        }

        if run.len() > 1 && wrong_sort {
            for part in run {
                self.runs.push(VecDeque::from([part]));
            }
        } else {
            self.runs.push(run);
        }
    }

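    /// Drops any exhausted parts and runs, then tops up prefetches with the freed budget.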
    fn trim(&mut self) {
        self.runs.retain_mut(|run| {
            while run.front_mut().map_or(false, |(part, _)| part.is_empty()) {
                run.pop_front();
            }
            !run.is_empty()
        });

        self.start_prefetches();
    }

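    /// Returns an iterator over the next consolidated batch of updates, drawing only on
    /// already-fetched parts; unfetched parts contribute an upper bound instead.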
    fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D>> {
        // If an earlier iterator was dropped mid-consolidation, requeue its stashed
        // remainder as a zero-cost run so it gets merged back in.
        if let Some(part) = self.drop_stash.take() {
            self.runs.push(VecDeque::from_iter([(
                ConsolidationPart::Encoded {
                    part,
                    cursor: PartIndices::default(),
                },
                0,
            )]));
        }

        if self.runs.is_empty() {
            return None;
        }

        let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound());
        let mut iter =
            ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash);

        for run in &mut self.runs {
            let last_in_run = run.len() < 2;
            if let Some((part, _)) = run.front_mut() {
                match part {
                    ConsolidationPart::Encoded { part, cursor } => {
                        iter.push(part, cursor, last_in_run);
                    }
                    other @ ConsolidationPart::Queued { .. } => {
                        // We don't want to block on the fetch here: push the unfetched
                        // part's lower bound so the iterator stops before it.
                        if let Some(bound) = other.kvt_lower() {
                            iter.push_upper(bound);
                        }
                    }
                };
            }
        }

        Some(iter)
    }

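    /// Fetches enough parts that the next call to `iter` can make progress: every run
    /// whose lower bound ties for the minimum has its first part fetched and decoded.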
    async fn unblock_progress(&mut self) -> anyhow::Result<()> {
        if self.runs.is_empty() {
            return Ok(());
        }
        self.runs
            .sort_by(|a, b| a[0].0.kvt_lower().cmp(&b[0].0.kvt_lower()));

        let first_larger = {
            let run = &self.runs[0];
            let min_lower = run[0].0.kvt_lower();
            self.runs
                .iter()
                .position(|q| q[0].0.kvt_lower() > min_lower)
                .unwrap_or(self.runs.len())
        };

        let mut ready_futures: FuturesUnordered<_> = self.runs[0..first_larger]
            .iter_mut()
            .map(|run| async {
                // Loop, because fetching a run ref only enqueues more parts: keep going
                // until this run's front part is fetched and decoded.
                loop {
                    let (mut part, size) = run.pop_front().expect("trimmed run should be nonempty");

                    let ConsolidationPart::Queued { data, task, .. } = &mut part else {
                        run.push_front((part, size));
                        return Ok(true);
                    };

                    let is_prefetched = task.as_ref().map_or(false, |t| t.is_finished());
                    if is_prefetched {
                        self.metrics.compaction.parts_prefetched.inc();
                    } else {
                        self.metrics.compaction.parts_waited.inc()
                    }
                    self.metrics.consolidation.parts_fetched.inc();

                    let wrong_sort = data.run_meta.order != Some(RunOrder::Structured);
                    let fetch_result: anyhow::Result<FetchResult<T>> = match task.take() {
                        Some(handle) => handle
                            .await
                            .unwrap_or_else(|join_err| Err(anyhow!(join_err))),
                        None => {
                            data.clone()
                                .fetch(
                                    &self.cfg,
                                    self.shard_id,
                                    &*self.blob,
                                    &*self.metrics,
                                    &*self.shard_metrics,
                                    &self.read_metrics,
                                )
                                .await
                        }
                    };
                    match fetch_result {
                        Err(err) => {
                            run.push_front((part, size));
                            return Err(err);
                        }
                        Ok(Err(run_part)) => {
                            // A hollow run: replace the run reference with its
                            // constituent parts, preserving their order.
                            for part in run_part.parts.into_iter().rev() {
                                let structured_lower = part.structured_key_lower();
                                let size = part.max_part_bytes();
                                run.push_front((
                                    ConsolidationPart::Queued {
                                        data: FetchData {
                                            run_meta: data.run_meta.clone(),
                                            part_desc: data.part_desc.clone(),
                                            part,
                                            structured_lower,
                                        },
                                        task: None,
                                        _diff: Default::default(),
                                    },
                                    size,
                                ));
                            }
                        }
                        Ok(Ok(part)) => {
                            run.push_front((
                                ConsolidationPart::from_encoded(
                                    part,
                                    wrong_sort,
                                    &self.metrics.columnar,
                                    &self.sort,
                                ),
                                size,
                            ));
                        }
                    }
                }
            })
            .collect();

        let mut total_ready = 0;
        while let Some(awaited) = ready_futures.next().await {
            if awaited? {
                total_ready += 1;
            }
        }
        assert!(
            total_ready > 0,
            "at least one part should be fetched and ready to go"
        );

        Ok(())
    }

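    /// Returns the next consolidated batch of updates, fetching parts as necessary, or
    /// `None` if the input is exhausted.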
    #[allow(unused)]
    pub(crate) async fn next(
        &mut self,
    ) -> anyhow::Result<Option<impl Iterator<Item = (SortKV<'_>, T, D)>>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
    }

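    /// Assembles up to `max_len` rows (or roughly `max_bytes` of key/value data) from the
    /// current iterator into a single `Part`.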
    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
        let mut iter = self.iter()?;

        let parts = iter.parts.clone();

        // Budget the chunk by the encoded size of the keys and values, plus a fixed
        // 16-byte overhead per row for the timestamp and diff columns.
        let mut budget = max_bytes;
        let iter = std::iter::from_fn(move || {
            if budget == 0 {
                return None;
            }
            let update @ (_, kv, _, _) = iter.next()?;
            budget = budget.saturating_sub(kv_size(kv) + 16);
            Some(update)
        });

        let updates = StructuredUpdates::interleave_updates(&parts, iter.take(max_len));
        let updates = self.sort.updates_to_blob(updates);
        Some(updates)
    }

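    /// Like `next`, but returns the data as a single columnar `Part` chunk.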
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
        max_bytes: usize,
    ) -> anyhow::Result<Option<Part>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.chunk(max_len, max_bytes))
    }

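    /// The number of bytes that are, or soon will be, resident in memory: every part
    /// that has been fetched or has a prefetch task in flight.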
    fn live_bytes(&self) -> usize {
        self.runs
            .iter()
            .flat_map(|run| {
                run.iter().map(|(part, size)| match part {
                    ConsolidationPart::Queued { task: None, .. } => 0,
                    ConsolidationPart::Queued { task: Some(_), .. }
                    | ConsolidationPart::Encoded { .. } => *size,
                })
            })
            .sum()
    }

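    /// Spawns prefetch tasks for as many unfetched parts as fit in the budget, in
    /// round-robin order across runs. Returns the remaining budget, or `None` if the
    /// budget was exhausted before every part could be scheduled.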
    pub(crate) fn start_prefetches(&mut self) -> Option<usize> {
        let mut prefetch_budget_bytes = self.budget;

        let mut check_budget = |size| {
            // Subtract the size from the budget, or return None if it's exhausted.
            prefetch_budget_bytes
                .checked_sub(size)
                .map(|remaining| prefetch_budget_bytes = remaining)
        };

        // Account for the parts that are already fetched or in flight.
        let live_bytes = self.live_bytes();
        check_budget(live_bytes)?;
        // Prefetch breadth-first: the first part of every run, then the second, etc.
        let max_run_len = self.runs.iter().map(|x| x.len()).max().unwrap_or_default();
        for idx in 0..max_run_len {
            for run in self.runs.iter_mut() {
                if let Some((c_part, size)) = run.get_mut(idx) {
                    let (data, task) = match c_part {
                        ConsolidationPart::Queued { data, task, .. } if task.is_none() => {
                            check_budget(*size)?;
                            (data, task)
                        }
                        _ => continue,
                    };
                    let span = debug_span!("compaction::prefetch");
                    let data = data.clone();
                    let handle = mz_ore::task::spawn(|| "persist::compaction::prefetch", {
                        let shard_id = self.shard_id;
                        let blob = Arc::clone(&self.blob);
                        let metrics = Arc::clone(&self.metrics);
                        let shard_metrics = Arc::clone(&self.shard_metrics);
                        let read_metrics = Arc::clone(&self.read_metrics);
                        let fetch_config = self.cfg.clone();
                        async move {
                            data.fetch(
                                &fetch_config,
                                shard_id,
                                &*blob,
                                &*metrics,
                                &*shard_metrics,
                                &*read_metrics,
                            )
                            .instrument(span)
                            .await
                        }
                    });
                    *task = Some(handle);
                }
            }
        }

        Some(prefetch_budget_bytes)
    }
}

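/// On drop, record how many parts were never used: still-queued parts were skipped
/// outright, while parts with an in-flight prefetch were wasted work.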
impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort> {
    fn drop(&mut self) {
        for run in &self.runs {
            for (part, _) in run {
                match part {
                    ConsolidationPart::Queued { task: None, .. } => {
                        self.metrics.consolidation.parts_skipped.inc();
                    }
                    ConsolidationPart::Queued { task: Some(_), .. } => {
                        self.metrics.consolidation.parts_wasted.inc();
                    }
                    _ => {}
                }
            }
        }
    }
}

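/// A (part index, row index) coordinate identifying a single update across parts.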
type Indices = (usize, usize);

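/// A reference to the current position in a single part, ordered by its next update so
/// that a heap of `PartRef`s (see `ConsolidatingIter`) pops the smallest
/// (key, value, time) first.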
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct PartRef<'a, T: Timestamp, D> {
    // The next update in the part, wrapped in `Reverse` so the max-heap yields the
    // smallest (kv, t) first. The derived `Ord` compares fields in order, so this field
    // must remain first in the struct.
    next_kvt: Reverse<Option<(SortKV<'a>, T, D)>>,
    // The index of this part in the consolidating iterator's list of parts.
    part_index: usize,
    // The row cursor into the part, shared with the consolidator itself.
    row_index: &'a mut PartIndices,
    // Whether this is the last part in its run.
    last_in_run: bool,
    _phantom: PhantomData<D>,
}

impl<'a, T: Timestamp + Codec64 + Lattice, D: Codec64 + Monoid> PartRef<'a, T, D> {
    fn update_peek(&mut self, part: &'a StructuredUpdates, filter: &FetchBatchFilter<T>) {
        let mut peek = part.get(self.row_index.index());
        while let Some((_kv, t, _d)) = &mut peek {
            let keep = filter.filter_ts(t);
            if keep {
                break;
            } else {
                self.row_index.inc();
                peek = part.get(self.row_index.index());
            }
        }
        self.next_kvt = Reverse(peek);
    }

    fn pop(
        &mut self,
        from: &[&'a StructuredUpdates],
        filter: &FetchBatchFilter<T>,
    ) -> Option<(Indices, SortKV<'a>, T, D)> {
        let part = &from[self.part_index];
        let Reverse(popped) = mem::take(&mut self.next_kvt);
        let indices = (self.part_index, self.row_index.index());
        self.row_index.inc();
        self.update_peek(part, filter);
        let (kv, t, d) = popped?;
        Some((indices, kv, t, d))
    }
}

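/// A merge iterator over many parts: repeatedly pops the smallest update across all
/// parts, sums the diffs of consecutive updates with equal (key, value, time), and never
/// yields past the lower bound of unfetched data.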
#[derive(Debug)]
pub(crate) struct ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    context: &'a str,
    filter: &'a FetchBatchFilter<T>,
    parts: Vec<&'a StructuredUpdates>,
    heap: BinaryHeap<PartRef<'a, T, D>>,
    upper_bound: Option<(SortKV<'a>, T)>,
    lower_bound: Option<(SortKV<'a>, T)>,
    state: Option<(Indices, SortKV<'a>, T, D)>,
    drop_stash: &'a mut Option<StructuredUpdates>,
}

impl<'a, T, D> ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
{
    fn new(
        context: &'a str,
        filter: &'a FetchBatchFilter<T>,
        lower_bound: Option<(SortKV<'a>, T)>,
        drop_stash: &'a mut Option<StructuredUpdates>,
    ) -> Self {
        Self {
            context,
            filter,
            parts: vec![],
            heap: BinaryHeap::new(),
            upper_bound: None,
            state: None,
            drop_stash,
            lower_bound,
        }
    }

    fn push(&mut self, iter: &'a StructuredUpdates, index: &'a mut PartIndices, last_in_run: bool) {
        let mut part_ref = PartRef {
            next_kvt: Reverse(None),
            part_index: self.parts.len(),
            row_index: index,
            last_in_run,
            _phantom: Default::default(),
        };
        part_ref.update_peek(iter, self.filter);
        self.parts.push(iter);
        self.heap.push(part_ref);
    }

    fn push_upper(&mut self, upper: (SortKV<'a>, T)) {
        // Keep the smallest upper bound we've seen: iteration must stop before the
        // lower bound of any unfetched part.
        let update_bound = self
            .upper_bound
            .as_ref()
            .map_or(true, |existing| *existing > upper);
        if update_bound {
            self.upper_bound = Some(upper);
        }
    }

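    /// Attempts to consolidate the next update into `state`: updates from the heap with
    /// an equal (key, value, time) have their diffs added in, and `state` is yielded once
    /// the smallest remaining update no longer matches (or an unfetched bound is hit).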
    fn consolidate(&mut self) -> Option<(Indices, SortKV<'a>, T, D)> {
        loop {
            let Some(mut part) = self.heap.peek_mut() else {
                break;
            };
            if let Some((kv1, t1, _)) = part.next_kvt.0.as_ref() {
                if let Some((idx0, kv0, t0, d0)) = &mut self.state {
                    let consolidates = match (*kv0, &*t0).cmp(&(*kv1, t1)) {
                        Ordering::Less => false,
                        Ordering::Equal => true,
                        Ordering::Greater => {
                            // This should be impossible for well-formed inputs. (Note
                            // that the message avoids printing the keys and values
                            // themselves.)
                            panic!(
                                "data arrived at the consolidator out of order ({}, kvs equal? {}, {t0:?}, {t1:?})",
                                self.context,
                                (*kv0) == (*kv1)
                            );
                        }
                    };
                    if consolidates {
                        let (idx1, _, _, d1) = part
                            .pop(&self.parts, self.filter)
                            .expect("popping from a non-empty iterator");
                        d0.plus_equals(&d1);
                        *idx0 = idx1;
                    } else {
                        break;
                    }
                } else {
                    // Stop if the next update is at or past the upper bound: it may
                    // still consolidate with data we haven't fetched yet.
                    if let Some((kv0, t0)) = &self.upper_bound {
                        if (kv0, t0) <= (kv1, t1) {
                            return None;
                        }
                    }

                    // Discard any update at or below the lower bound.
                    if let Some((kv_lower, t_lower)) = &self.lower_bound {
                        if (kv_lower, t_lower) >= (kv1, t1) {
                            let _ = part.pop(&self.parts, self.filter);
                            continue;
                        }
                    }

                    self.state = part.pop(&self.parts, self.filter);
                }
            } else {
                if part.last_in_run {
                    PeekMut::pop(part);
                } else {
                    // There may be more data in this run that hasn't been fetched yet;
                    // we can't safely consolidate past it.
                    return None;
                }
            }
        }

        self.state.take()
    }
}

impl<'a, T, D> Iterator for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Monoid + Ord,
{
    type Item = (Indices, SortKV<'a>, T, D);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.consolidate() {
                Some((_, _, _, d)) if d.is_zero() => continue,
                other => break other,
            }
        }
    }
}

impl<'a, T, D> Drop for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    fn drop(&mut self) {
        // If we're dropped mid-consolidation, stash the in-progress update back on the
        // consolidator so it isn't lost.
        if let Some(update) = self.state.take() {
            let part = StructuredUpdates::interleave_updates(&self.parts, [update]);
            *self.drop_stash = Some(part);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::ShardId;
    use crate::cfg::PersistConfig;
    use crate::internal::paths::PartialBatchKey;
    use crate::internal::state::{BatchPart, HollowBatchPart};
    use crate::metrics::Metrics;
    use arrow::array::BinaryArray;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::ColumnarRecordsBuilder;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::location::Blob;
    use mz_persist::mem::{MemBlob, MemBlobConfig};
    use mz_persist_types::codec_impls::VecU8Schema;
    use mz_persist_types::part::PartBuilder;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use timely::progress::Antichain;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn consolidation() {
        type Rows = Vec<((Vec<u8>, Vec<u8>), u64, i64)>;

        // Check that the streaming consolidator produces the same output as
        // consolidating everything up front in memory.
        fn check(
            metrics: &Arc<Metrics>,
            parts: Vec<(Rows, usize)>,
            lower_bound: (Vec<u8>, Vec<u8>, u64),
        ) {
            let schemas = Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            };
            let original = {
                let mut rows = parts
                    .iter()
                    .flat_map(|(p, _)| p.clone())
                    .filter(|((k, v), t, _)| {
                        // Keep only updates strictly above the lower bound.
                        let (k_lower, v_lower, t_lower) = &lower_bound;
                        ((k_lower, v_lower), t_lower) < ((k, v), t)
                    })
                    .collect::<Vec<_>>();

                consolidate_updates(&mut rows);
                let mut builder = PartBuilder::new(&*schemas.key, &*schemas.val);
                for ((k, v), t, d) in &rows {
                    builder.push(k, v, *t, *d);
                }
                builder.finish()
            };
            let filter = FetchBatchFilter::Compaction {
                since: Antichain::from_elem(0),
            };
            let desc = Description::new(
                Antichain::from_elem(0),
                Antichain::new(),
                Antichain::from_elem(0),
            );
            let key_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.0]);
            let val_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.1]);
            let lower_bound = LowerBound {
                key_bound: ArrayBound::new(Arc::new(key_lower_bound_array), 0),
                val_bound: ArrayBound::new(Arc::new(val_lower_bound_array), 0),
                t: lower_bound.2,
            };
            let sort: StructuredSort<Vec<u8>, Vec<u8>, u64, i64> =
                StructuredSort::new(schemas.clone());
            let streaming = {
                let fetch_cfg = FetchConfig {
                    validate_bounds_on_read: true,
                };
                let mut consolidator = Consolidator {
                    cfg: fetch_cfg.clone(),
                    context: "test".to_string(),
                    shard_id: ShardId::new(),
                    sort: sort.clone(),
                    blob: Arc::new(MemBlob::open(MemBlobConfig::default())),
                    metrics: Arc::clone(metrics),
                    shard_metrics: metrics.shards.shard(&ShardId::new(), "test"),
                    read_metrics: Arc::new(metrics.read.snapshot.clone()),
                    // Split each generated part in two, to exercise the multi-part
                    // merge path.
                    runs: parts
                        .into_iter()
                        .map(|(mut part, cut)| {
                            part.sort();
                            let part_2 = part.split_off(cut.min(part.len()));
                            [part, part_2]
                                .into_iter()
                                .map(|part| {
                                    let mut records = ColumnarRecordsBuilder::default();
                                    for ((k, v), t, d) in &part {
                                        assert!(records.push((
                                            (k, v),
                                            u64::encode(t),
                                            i64::encode(d)
                                        )));
                                    }
                                    let part = EncodedPart::new(
                                        &fetch_cfg,
                                        metrics.read.snapshot.clone(),
                                        desc.clone(),
                                        "part",
                                        None,
                                        BlobTraceBatchPart {
                                            desc: desc.clone(),
                                            index: 0,
                                            updates: BlobTraceUpdates::Row(
                                                records.finish(&metrics.columnar),
                                            ),
                                        },
                                    );
                                    (
                                        ConsolidationPart::from_encoded(
                                            part,
                                            true,
                                            &metrics.columnar,
                                            &sort,
                                        ),
                                        0,
                                    )
                                })
                                .collect::<VecDeque<_>>()
                        })
                        .collect::<Vec<_>>(),
                    filter,
                    budget: 0,
                    drop_stash: None,
                    lower_bound: Some(lower_bound),
                };

                let mut out = vec![];
                loop {
                    consolidator.trim();
                    let Some(chunk) = consolidator.chunk(1000, 1000) else {
                        break;
                    };
                    if chunk.len() > 0 {
                        out.push(chunk);
                    }
                }
                Part::concat(&out).expect("same schema")
            };

            assert_eq!((original.len() > 0).then_some(original), streaming);
        }

        let metrics = Arc::new(Metrics::new(
            &PersistConfig::new_for_tests(),
            &MetricsRegistry::new(),
        ));

        // Restrict to a small set of keys, to maximize the chance of overlapping data
        // and interesting bound comparisons.
        let key_gen = (0..4usize).prop_map(|i| i.to_string().into_bytes()).boxed();
        let part_gen = vec(
            ((key_gen.clone(), key_gen.clone()), 0..10u64, -3..=3i64),
            0..10,
        );
        let kvt_gen = (key_gen.clone(), key_gen.clone(), 0..10u64);
        let run_gen = vec((part_gen, 0..10usize), 0..5);
        proptest!(|(state in run_gen, bound in kvt_gen)| {
            check(&metrics, state, bound)
        });
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn prefetches() {
        fn check(budget: usize, runs: Vec<Vec<usize>>, prefetch_all: bool) {
            let desc = Description::new(
                Antichain::from_elem(0u64),
                Antichain::new(),
                Antichain::from_elem(0u64),
            );

            let total_size: usize = runs.iter().flat_map(|run| run.iter().map(|p| *p)).sum();

            let shard_id = ShardId::new();
            let blob: Arc<dyn Blob> = Arc::new(MemBlob::open(MemBlobConfig::default()));
            let metrics = Arc::new(Metrics::new(
                &PersistConfig::new_for_tests(),
                &MetricsRegistry::new(),
            ));
            let shard_metrics = metrics.shards.shard(&shard_id, "");
            let sort: StructuredSort<Vec<u8>, Vec<u8>, _, _> = StructuredSort::new(Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            });

            let fetch_cfg = FetchConfig {
                validate_bounds_on_read: true,
            };

            let mut consolidator: Consolidator<u64, i64, StructuredSort<_, _, _, _>> =
                Consolidator::new(
                    "test".to_string(),
                    fetch_cfg,
                    shard_id,
                    sort,
                    blob,
                    Arc::clone(&metrics),
                    shard_metrics,
                    metrics.read.batch_fetcher.clone(),
                    FetchBatchFilter::Compaction {
                        since: desc.since().clone(),
                    },
                    None,
                    budget,
                );

            for run in runs {
                let parts: Vec<_> = run
                    .into_iter()
                    .map(|encoded_size_bytes| {
                        RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                            key: PartialBatchKey(
                                "n0000000/p00000000-0000-0000-0000-000000000000".into(),
                            ),
                            encoded_size_bytes,
                            key_lower: vec![],
                            structured_key_lower: None,
                            stats: None,
                            ts_rewrite: None,
                            diffs_sum: None,
                            format: None,
                            schema_id: None,
                            deprecated_schema_id: None,
                        }))
                    })
                    .collect();
                consolidator.enqueue_run(&desc, &RunMeta::default(), parts)
            }

            // Start prefetching and check that the budget is respected.
            let remaining = consolidator.start_prefetches();
            let live_bytes = consolidator.live_bytes();
            assert!(live_bytes <= budget, "budget should be respected");
            match remaining {
                None => assert!(live_bytes < total_size, "not all parts fetched"),
                Some(remaining) => assert_eq!(
                    live_bytes + remaining,
                    budget,
                    "remaining should match budget"
                ),
            }

            if prefetch_all {
                // If we raise the budget to cover the whole input, everything should
                // be scheduled for prefetch.
                consolidator.budget = total_size;
                assert_eq!(consolidator.start_prefetches(), Some(0));
            } else {
                // Otherwise, drop the consolidator with some parts unfetched; its Drop
                // impl records them as skipped or wasted.
            }
        }

        let run_gen = vec(vec(0..20usize, 0..5usize), 0..5usize);
        proptest!(|(budget in 0..20usize, state in run_gen, prefetch_all in any::<bool>())| {
            check(budget, state, prefetch_all)
        });
    }
}