use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use itertools::Itertools;
use mz_ore::task::JoinHandle;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist::location::Blob;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd};
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use semver::Version;
use timely::progress::Timestamp;
use tracing::{Instrument, debug_span};

use crate::ShardId;
use crate::fetch::{EncodedPart, FetchBatchFilter};
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
use crate::metrics::Metrics;

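/// The minimum code version at or above which runs are expected to have been written out
/// already consolidated.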
pub const MINIMUM_CONSOLIDATED_VERSION: Version = Version::new(0, 67, 0);

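/// Everything needed to fetch one [RunPart] from blob storage: the run's metadata, the part's
/// description and reference, and a structured lower bound on its keys, if one is known.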
#[derive(Debug, Clone)]
pub(crate) struct FetchData<T> {
    run_meta: RunMeta,
    part_desc: Description<T>,
    part: RunPart<T>,
    structured_lower: Option<ArrayBound>,
}

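/// A strategy for converting between blob-encoded updates and the sorted, structured
/// representation used while consolidating.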
pub(crate) trait RowSort<T, D> {
    fn updates_from_blob(&self, updates: BlobTraceUpdates) -> StructuredUpdates;

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part;
}

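/// Assemble a new [Part] by copying the key and value at each `(part, row)` index out of the
/// given parts, paired with the supplied timestamp and diff.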
fn interleave_updates<T: Codec64, D: Codec64>(
    updates: &[&Part],
    elements: impl IntoIterator<Item = (Indices, T, D)>,
) -> Part {
    let (indices, timestamps, diffs): (Vec<_>, Vec<_>, Vec<_>) = elements
        .into_iter()
        .map(|(idx, t, d)| {
            (
                idx,
                i64::from_le_bytes(T::encode(&t)),
                i64::from_le_bytes(D::encode(&d)),
            )
        })
        .multiunzip();

    let mut arrays: Vec<&dyn Array> = Vec::with_capacity(updates.len());
    let mut interleave = |get_array: fn(&Part) -> &dyn Array| {
        arrays.clear();
        for part in updates {
            arrays.push(get_array(part));
        }
        ::arrow::compute::interleave(arrays.as_slice(), &indices).expect("type-aligned input")
    };

    let key = interleave(|p| &*p.key);
    let val = interleave(|p| &*p.val);
    Part {
        key,
        val,
        time: Int64Array::from(timestamps),
        diff: Int64Array::from(diffs),
    }
}

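/// Structured update data, with the key and value columns wrapped in [ArrayOrd]s so that
/// individual rows can be fetched and compared.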
#[derive(Clone, Debug)]
pub struct StructuredUpdates {
    key_ord: ArrayOrd,
    val_ord: ArrayOrd,
    data: Part,
}

impl StructuredUpdates {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get<T: Codec64, D: Codec64>(&self, index: usize) -> Option<(SortKV<'_>, T, D)> {
        let t = self.data.time.values().get(index)?.to_le_bytes();
        let d = self.data.diff.values().get(index)?.to_le_bytes();
        Some((
            (self.key_ord.at(index), Some(self.val_ord.at(index))),
            T::decode(t),
            D::decode(d),
        ))
    }

    fn interleave_updates<'a, T: Codec64, D: Codec64>(
        updates: &[&'a StructuredUpdates],
        elements: impl IntoIterator<Item = (Indices, SortKV<'a>, T, D)>,
    ) -> StructuredUpdates {
        let updates: Vec<_> = updates.iter().map(|u| &u.data).collect();
        let interleaved = interleave_updates(
            &updates,
            elements.into_iter().map(|(idx, _, t, d)| (idx, t, d)),
        );
        let key_ord = ArrayOrd::new(interleaved.key.as_ref());
        let val_ord = ArrayOrd::new(interleaved.val.as_ref());
        StructuredUpdates {
            key_ord,
            val_ord,
            data: interleaved,
        }
    }
}

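/// A [RowSort] that decodes updates into structured columnar data using the provided key and
/// value schemas.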
#[derive(Debug, Clone)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
    schemas: Schemas<K, V>,
    _time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
    pub fn new(schemas: Schemas<K, V>) -> Self {
        Self {
            schemas,
            _time_diff: Default::default(),
        }
    }
}

type SortKV<'a> = (ArrayIdx<'a>, Option<ArrayIdx<'a>>);

fn kv_lower<T>(data: &FetchData<T>) -> Option<SortKV<'_>> {
    let key_idx = data.structured_lower.as_ref().map(|l| l.get())?;
    Some((key_idx, None))
}

fn kv_size((key, value): SortKV<'_>) -> usize {
    key.goodbytes() + value.map_or(0, |v| v.goodbytes())
}

impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
    fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> StructuredUpdates {
        let structured = updates
            .get_or_make_structured::<K, V>(&*self.schemas.key, &*self.schemas.val)
            .clone();
        let key_ord = ArrayOrd::new(&structured.key);
        let val_ord = ArrayOrd::new(&structured.val);
        StructuredUpdates {
            key_ord,
            val_ord,
            data: Part {
                key: structured.key,
                val: structured.val,
                time: updates.timestamps().clone(),
                diff: updates.diffs().clone(),
            },
        }
    }

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part {
        updates.data
    }
}

type FetchResult<T> = Result<EncodedPart<T>, HollowRun<T>>;

impl<T: Codec64 + Timestamp + Lattice> FetchData<T> {
    async fn fetch(
        self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
    ) -> anyhow::Result<FetchResult<T>> {
        match self.part {
            RunPart::Single(part) => {
                let part = EncodedPart::fetch(
                    &shard_id,
                    &*blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    &self.part_desc,
                    &part,
                )
                .await
                .map_err(|blob_key| anyhow!("missing unleased key {blob_key}"))?;
                Ok(Ok(part))
            }
            RunPart::Many(run_ref) => {
                let runs = run_ref
                    .get(shard_id, blob, metrics)
                    .await
                    .ok_or_else(|| anyhow!("missing run ref {}", run_ref.key))?;
                Ok(Err(runs))
            }
        }
    }
}

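/// A cursor into a part's rows. When the part had to be re-sorted, `sorted_indices` lists the
/// row indices in sorted order; once that list is empty, reads continue from `next_index` in
/// physical order.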
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Default)]
struct PartIndices {
    sorted_indices: VecDeque<usize>,
    next_index: usize,
}

impl PartIndices {
    fn index(&self) -> usize {
        self.sorted_indices
            .front()
            .copied()
            .unwrap_or(self.next_index)
    }

    fn inc(&mut self) {
        if self.sorted_indices.pop_front().is_none() {
            self.next_index += 1;
        }
    }
}

#[derive(Debug)]
enum ConsolidationPart<T, D> {
    Queued {
        data: FetchData<T>,
        task: Option<JoinHandle<anyhow::Result<FetchResult<T>>>>,
        _diff: PhantomData<D>,
    },
    Encoded {
        part: StructuredUpdates,
        cursor: PartIndices,
    },
}

impl<T: Timestamp + Codec64 + Lattice, D: Codec64> ConsolidationPart<T, D> {
    pub(crate) fn from_encoded(
        part: EncodedPart<T>,
        force_reconsolidation: bool,
        metrics: &ColumnarMetrics,
        sort: &impl RowSort<T, D>,
    ) -> Self {
        let reconsolidate = part.maybe_unconsolidated() || force_reconsolidation;
        let updates = part.normalize(metrics);
        let updates: StructuredUpdates = sort.updates_from_blob(updates);
        let cursor = if reconsolidate {
            let len = updates.len();
            let mut indices: Vec<_> = (0..len).collect();

            indices.sort_by_key(|i| updates.get::<T, D>(*i).map(|(kv, t, _d)| (kv, t)));

            PartIndices {
                sorted_indices: indices.into(),
                next_index: len,
            }
        } else {
            PartIndices::default()
        };

        ConsolidationPart::Encoded {
            part: updates,
            cursor,
        }
    }

    fn kvt_lower(&self) -> Option<(SortKV<'_>, T)> {
        match self {
            ConsolidationPart::Queued { data, .. } => Some((kv_lower(data)?, T::minimum())),
            ConsolidationPart::Encoded { part, cursor } => {
                let (kv, t, _d) = part.get::<T, D>(cursor.index())?;
                Some((kv, t))
            }
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        match self {
            ConsolidationPart::Encoded { part, cursor, .. } => cursor.index() >= part.len(),
            ConsolidationPart::Queued { .. } => false,
        }
    }
}

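/// A machine for incrementally consolidating the updates from a set of runs: parts are fetched
/// (and prefetched within a byte budget) as they're needed, merged in sorted order, and updates
/// with identical keys, values, and times have their diffs summed.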
#[derive(Debug)]
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D>> {
    context: String,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    runs: Vec<VecDeque<(ConsolidationPart<T, D>, usize)>>,
    filter: FetchBatchFilter<T>,
    budget: usize,
    drop_stash: Option<StructuredUpdates>,
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
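    /// Create a new consolidator with no runs enqueued. Parts are fetched from the given blob
    /// store, and background prefetching is limited to `prefetch_budget_bytes`.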
    pub fn new(
        context: String,
        shard_id: ShardId,
        sort: Sort,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        filter: FetchBatchFilter<T>,
        prefetch_budget_bytes: usize,
    ) -> Self {
        Self {
            context,
            metrics,
            shard_id,
            sort,
            blob,
            read_metrics: Arc::new(read_metrics),
            shard_metrics,
            runs: vec![],
            filter,
            budget: prefetch_budget_bytes,
            drop_stash: None,
        }
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice + Sync,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
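    /// Enqueue a new run of parts for consolidation. Parts are initially queued and are only
    /// fetched once they're needed to make progress (or prefetched within the budget).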
    pub fn enqueue_run(
        &mut self,
        desc: &Description<T>,
        run_meta: &RunMeta,
        parts: impl IntoIterator<Item = RunPart<T>>,
    ) {
        let run = parts
            .into_iter()
            .map(|part| {
                let bytes = part.encoded_size_bytes();
                let c_part = ConsolidationPart::Queued {
                    data: FetchData {
                        run_meta: run_meta.clone(),
                        part_desc: desc.clone(),
                        structured_lower: part.structured_key_lower(),
                        part,
                    },
                    task: None,
                    _diff: Default::default(),
                };
                (c_part, bytes)
            })
            .collect();
        self.push_run(run);
    }

    fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D>, usize)>) {
        let wrong_sort = run.iter().any(|(p, _)| match p {
            ConsolidationPart::Queued { data, .. } => {
                data.run_meta.order != Some(RunOrder::Structured)
            }
            ConsolidationPart::Encoded { .. } => false,
        });

        if wrong_sort {
            self.metrics.consolidation.wrong_sort.inc();
        }

        if run.len() > 1 && wrong_sort {
            // The parts in this run were not written in our structured sort order, so they
            // can't be merged as a single sorted run; treat each part as its own run instead.
            for part in run {
                self.runs.push(VecDeque::from([part]));
            }
        } else {
            self.runs.push(run);
        }
    }

    fn trim(&mut self) {
        self.runs.retain_mut(|run| {
            while run.front_mut().map_or(false, |(part, _)| part.is_empty()) {
                run.pop_front();
            }
            !run.is_empty()
        });

        self.start_prefetches();
    }

    fn iter(&mut self) -> Option<ConsolidatingIter<T, D>> {
        if let Some(part) = self.drop_stash.take() {
            self.runs.push(VecDeque::from_iter([(
                ConsolidationPart::Encoded {
                    part,
                    cursor: PartIndices::default(),
                },
                0,
            )]));
        }

        if self.runs.is_empty() {
            return None;
        }

        let mut iter = ConsolidatingIter::new(&self.context, &self.filter, &mut self.drop_stash);

        for run in &mut self.runs {
            let last_in_run = run.len() < 2;
            if let Some((part, _)) = run.front_mut() {
                match part {
                    ConsolidationPart::Encoded { part, cursor } => {
                        iter.push(part, cursor, last_in_run);
                    }
                    other @ ConsolidationPart::Queued { .. } => {
                        if let Some(bound) = other.kvt_lower() {
                            iter.push_upper(bound);
                        }
                    }
                };
            }
        }

        Some(iter)
    }

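    /// Fetch and decode enough data that the consolidating iterator can make progress: every
    /// run whose first part has the minimal lower bound gets parts fetched from its front until
    /// an already-decoded part is at the head.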
    async fn unblock_progress(&mut self) -> anyhow::Result<()> {
        if self.runs.is_empty() {
            return Ok(());
        }
        self.runs
            .sort_by(|a, b| a[0].0.kvt_lower().cmp(&b[0].0.kvt_lower()));

        let first_larger = {
            let run = &self.runs[0];
            let min_lower = run[0].0.kvt_lower();
            self.runs
                .iter()
                .position(|q| q[0].0.kvt_lower() > min_lower)
                .unwrap_or(self.runs.len())
        };

        let mut ready_futures: FuturesUnordered<_> = self.runs[0..first_larger]
            .iter_mut()
            .map(|run| async {
                loop {
                    let (mut part, size) = run.pop_front().expect("trimmed run should be nonempty");

                    let ConsolidationPart::Queued { data, task, .. } = &mut part else {
                        run.push_front((part, size));
                        return Ok(true);
                    };

                    let is_prefetched = task.as_ref().map_or(false, |t| t.is_finished());
                    if is_prefetched {
                        self.metrics.compaction.parts_prefetched.inc();
                    } else {
                        self.metrics.compaction.parts_waited.inc()
                    }
                    self.metrics.consolidation.parts_fetched.inc();

                    let wrong_sort = data.run_meta.order != Some(RunOrder::Structured);
                    let fetch_result: anyhow::Result<FetchResult<T>> = match task.take() {
                        Some(handle) => handle
                            .await
                            .unwrap_or_else(|join_err| Err(anyhow!(join_err))),
                        None => {
                            data.clone()
                                .fetch(
                                    self.shard_id,
                                    &*self.blob,
                                    &*self.metrics,
                                    &*self.shard_metrics,
                                    &self.read_metrics,
                                )
                                .await
                        }
                    };
                    match fetch_result {
                        Err(err) => {
                            run.push_front((part, size));
                            return Err(err);
                        }
                        Ok(Err(run_part)) => {
                            for part in run_part.parts.into_iter().rev() {
                                let structured_lower = part.structured_key_lower();
                                let size = part.max_part_bytes();
                                run.push_front((
                                    ConsolidationPart::Queued {
                                        data: FetchData {
                                            run_meta: data.run_meta.clone(),
                                            part_desc: data.part_desc.clone(),
                                            part,
                                            structured_lower,
                                        },
                                        task: None,
                                        _diff: Default::default(),
                                    },
                                    size,
                                ));
                            }
                        }
                        Ok(Ok(part)) => {
                            run.push_front((
                                ConsolidationPart::from_encoded(
                                    part,
                                    wrong_sort,
                                    &self.metrics.columnar,
                                    &self.sort,
                                ),
                                size,
                            ));
                        }
                    }
                }
            })
            .collect();

        let mut total_ready = 0;
        while let Some(awaited) = ready_futures.next().await {
            if awaited? {
                total_ready += 1;
            }
        }
        assert!(
            total_ready > 0,
            "at least one part should be fetched and ready to go"
        );

        Ok(())
    }

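    /// Wait for any needed fetches and return an iterator over the next consolidated batch of
    /// updates, or `None` if every run is exhausted.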
    #[allow(unused)]
    pub(crate) async fn next(
        &mut self,
    ) -> anyhow::Result<Option<impl Iterator<Item = (SortKV<'_>, T, D)>>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
    }

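    /// Consolidate the currently-fetched data into a single [Part], capped at `max_len` rows
    /// and at roughly `max_bytes` of key/value data.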
    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
        let Some(mut iter) = self.iter() else {
            return None;
        };

        let parts = iter.parts.clone();

        // Keep a running byte budget so the returned chunk stays near `max_bytes`.
        let mut budget = max_bytes;
        let iter = std::iter::from_fn(move || {
            if budget == 0 {
                return None;
            }
            let update @ (_, kv, _, _) = iter.next()?;
            // Charge the key/value bytes plus a fixed 16 bytes for the timestamp and diff.
            budget = budget.saturating_sub(kv_size(kv) + 16);
            Some(update)
        });

        let updates = StructuredUpdates::interleave_updates(&parts, iter.take(max_len));
        let updates = self.sort.updates_to_blob(updates);
        Some(updates)
    }

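    /// Like [Self::next], but encodes the consolidated updates back into a size-bounded [Part].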
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
        max_bytes: usize,
    ) -> anyhow::Result<Option<Part>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.chunk(max_len, max_bytes))
    }

    fn live_bytes(&self) -> usize {
        self.runs
            .iter()
            .flat_map(|run| {
                run.iter().map(|(part, size)| match part {
                    ConsolidationPart::Queued { task: None, .. } => 0,
                    ConsolidationPart::Queued { task: Some(_), .. }
                    | ConsolidationPart::Encoded { .. } => *size,
                })
            })
            .sum()
    }

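    /// Spawn background fetches for queued parts, spending at most the configured budget.
    /// Returns the budget left over, or `None` if the budget ran out before every queued part
    /// could be prefetched.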
    pub(crate) fn start_prefetches(&mut self) -> Option<usize> {
        let mut prefetch_budget_bytes = self.budget;

        let mut check_budget = |size| {
            prefetch_budget_bytes
                .checked_sub(size)
                .map(|remaining| prefetch_budget_bytes = remaining)
        };

        let live_bytes = self.live_bytes();
        check_budget(live_bytes)?;
        let max_run_len = self.runs.iter().map(|x| x.len()).max().unwrap_or_default();
        for idx in 0..max_run_len {
            for run in self.runs.iter_mut() {
                if let Some((c_part, size)) = run.get_mut(idx) {
                    let (data, task) = match c_part {
                        ConsolidationPart::Queued { data, task, .. } if task.is_none() => {
                            check_budget(*size)?;
                            (data, task)
                        }
                        _ => continue,
                    };
                    let span = debug_span!("compaction::prefetch");
                    let data = data.clone();
                    let handle = mz_ore::task::spawn(|| "persist::compaction::prefetch", {
                        let shard_id = self.shard_id;
                        let blob = Arc::clone(&self.blob);
                        let metrics = Arc::clone(&self.metrics);
                        let shard_metrics = Arc::clone(&self.shard_metrics);
                        let read_metrics = Arc::clone(&self.read_metrics);
                        async move {
                            data.fetch(shard_id, &*blob, &*metrics, &*shard_metrics, &*read_metrics)
                                .instrument(span)
                                .await
                        }
                    });
                    *task = Some(handle);
                }
            }
        }

        Some(prefetch_budget_bytes)
    }
}

impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort> {
    fn drop(&mut self) {
        for run in &self.runs {
            for (part, _) in run {
                match part {
                    ConsolidationPart::Queued { task: None, .. } => {
                        self.metrics.consolidation.parts_skipped.inc();
                    }
                    ConsolidationPart::Queued { task: Some(_), .. } => {
                        self.metrics.consolidation.parts_wasted.inc();
                    }
                    _ => {}
                }
            }
        }
    }
}

type Indices = (usize, usize);

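/// A pointer into one part's data, used as an entry in the consolidating heap. `next_kvt` is
/// the primary sort key and is wrapped in [Reverse] so that the std max-heap surfaces the
/// smallest pending update first.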
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct PartRef<'a, T: Timestamp, D> {
    next_kvt: Reverse<Option<(SortKV<'a>, T, D)>>,
    part_index: usize,
    row_index: &'a mut PartIndices,
    last_in_run: bool,
    _phantom: PhantomData<D>,
}

impl<'a, T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup> PartRef<'a, T, D> {
    fn update_peek(&mut self, part: &'a StructuredUpdates, filter: &FetchBatchFilter<T>) {
        let mut peek = part.get(self.row_index.index());
        while let Some((_kv, t, _d)) = &mut peek {
            let keep = filter.filter_ts(t);
            if keep {
                break;
            } else {
                self.row_index.inc();
                peek = part.get(self.row_index.index());
            }
        }
        self.next_kvt = Reverse(peek);
    }

    fn pop(
        &mut self,
        from: &[&'a StructuredUpdates],
        filter: &FetchBatchFilter<T>,
    ) -> Option<(Indices, SortKV<'a>, T, D)> {
        let part = &from[self.part_index];
        let Reverse(popped) = mem::take(&mut self.next_kvt);
        let indices = (self.part_index, self.row_index.index());
        self.row_index.inc();
        self.update_peek(part, filter);
        let (kv, t, d) = popped?;
        Some((indices, kv, t, d))
    }
}

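/// An iterator that merges a set of sorted parts, summing the diffs of updates that share a
/// key, value, and (filter-advanced) timestamp. Updates at or past `upper_bound` are held back,
/// since parts that are still queued may contain data that sorts before them.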
#[derive(Debug)]
pub(crate) struct ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    context: &'a str,
    filter: &'a FetchBatchFilter<T>,
    parts: Vec<&'a StructuredUpdates>,
    heap: BinaryHeap<PartRef<'a, T, D>>,
    upper_bound: Option<(SortKV<'a>, T)>,
    state: Option<(Indices, SortKV<'a>, T, D)>,
    drop_stash: &'a mut Option<StructuredUpdates>,
}

impl<'a, T, D> ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    fn new(
        context: &'a str,
        filter: &'a FetchBatchFilter<T>,
        drop_stash: &'a mut Option<StructuredUpdates>,
    ) -> Self {
        Self {
            context,
            filter,
            parts: vec![],
            heap: BinaryHeap::new(),
            upper_bound: None,
            state: None,
            drop_stash,
        }
    }

    fn push(&mut self, iter: &'a StructuredUpdates, index: &'a mut PartIndices, last_in_run: bool) {
        let mut part_ref = PartRef {
            next_kvt: Reverse(None),
            part_index: self.parts.len(),
            row_index: index,
            last_in_run,
            _phantom: Default::default(),
        };
        part_ref.update_peek(iter, self.filter);
        self.parts.push(iter);
        self.heap.push(part_ref);
    }

    fn push_upper(&mut self, upper: (SortKV<'a>, T)) {
        let update_bound = self
            .upper_bound
            .as_ref()
            .map_or(true, |existing| *existing > upper);
        if update_bound {
            self.upper_bound = Some(upper);
        }
    }

    fn consolidate(&mut self) -> Option<(Indices, SortKV<'a>, T, D)> {
        loop {
            let Some(mut part) = self.heap.peek_mut() else {
                break;
            };
            if let Some((kv1, t1, _)) = part.next_kvt.0.as_ref() {
                if let Some((idx0, kv0, t0, d0)) = &mut self.state {
                    let consolidates = match (*kv0, &*t0).cmp(&(*kv1, t1)) {
                        Ordering::Less => false,
                        Ordering::Equal => true,
                        Ordering::Greater => {
                            panic!(
                                "data arrived at the consolidator out of order ({}, kvs equal? {}, {t0:?}, {t1:?})",
                                self.context,
                                (*kv0) == (*kv1)
                            );
                        }
                    };
                    if consolidates {
                        let (idx1, _, _, d1) = part
                            .pop(&self.parts, self.filter)
                            .expect("popping from a non-empty iterator");
                        d0.plus_equals(&d1);
                        *idx0 = idx1;
                    } else {
                        break;
                    }
                } else {
                    if let Some((kv0, t0)) = &self.upper_bound {
                        if (kv0, t0) <= (kv1, t1) {
                            return None;
                        }
                    }

                    self.state = part.pop(&self.parts, self.filter);
                }
            } else {
                if part.last_in_run {
                    PeekMut::pop(part);
                } else {
                    return None;
                }
            }
        }

        self.state.take()
    }
}

impl<'a, T, D> Iterator for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    type Item = (Indices, SortKV<'a>, T, D);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.consolidate() {
                Some((_, _, _, d)) if d.is_zero() => continue,
                other => break other,
            }
        }
    }
}

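// If the iterator is dropped without being fully drained, stash any update that was popped but
// not yet returned back onto the consolidator so it isn't lost.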
impl<'a, T, D> Drop for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    fn drop(&mut self) {
        if let Some(update) = self.state.take() {
            let part = StructuredUpdates::interleave_updates(&self.parts, [update]);
            *self.drop_stash = Some(part);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::ShardId;
    use crate::cfg::PersistConfig;
    use crate::internal::paths::PartialBatchKey;
    use crate::internal::state::{BatchPart, HollowBatchPart};
    use crate::metrics::Metrics;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::ColumnarRecordsBuilder;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::location::Blob;
    use mz_persist::mem::{MemBlob, MemBlobConfig};
    use mz_persist_types::codec_impls::VecU8Schema;
    use mz_persist_types::part::PartBuilder;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use timely::progress::Antichain;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn consolidation() {
        type Rows = Vec<((Vec<u8>, Vec<u8>), u64, i64)>;

        fn check(metrics: &Arc<Metrics>, parts: Vec<(Rows, usize)>) {
            let schemas = Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            };
            let original = {
                let mut rows = parts
                    .iter()
                    .flat_map(|(p, _)| p.clone())
                    .collect::<Vec<_>>();

                consolidate_updates(&mut rows);
                let mut builder = PartBuilder::new(&*schemas.key, &*schemas.val);
                for ((k, v), t, d) in &rows {
                    builder.push(k, v, *t, *d);
                }
                let part = builder.finish();
                part
            };
            let filter = FetchBatchFilter::Compaction {
                since: Antichain::from_elem(0),
            };
            let desc = Description::new(
                Antichain::from_elem(0),
                Antichain::new(),
                Antichain::from_elem(0),
            );
            let sort: StructuredSort<Vec<u8>, Vec<u8>, u64, i64> =
                StructuredSort::new(schemas.clone());
            let streaming = {
                let mut consolidator = Consolidator {
                    context: "test".to_string(),
                    shard_id: ShardId::new(),
                    sort: sort.clone(),
                    blob: Arc::new(MemBlob::open(MemBlobConfig::default())),
                    metrics: Arc::clone(metrics),
                    shard_metrics: metrics.shards.shard(&ShardId::new(), "test"),
                    read_metrics: Arc::new(metrics.read.snapshot.clone()),
                    runs: parts
                        .into_iter()
                        .map(|(mut part, cut)| {
                            part.sort();
                            let part_2 = part.split_off(cut.min(part.len()));
                            [part, part_2]
                                .into_iter()
                                .map(|part| {
                                    let mut records = ColumnarRecordsBuilder::default();
                                    for ((k, v), t, d) in &part {
                                        assert!(records.push((
                                            (k, v),
                                            u64::encode(t),
                                            i64::encode(d)
                                        )));
                                    }
                                    let part = EncodedPart::new(
                                        metrics.read.snapshot.clone(),
                                        desc.clone(),
                                        "part",
                                        None,
                                        BlobTraceBatchPart {
                                            desc: desc.clone(),
                                            index: 0,
                                            updates: BlobTraceUpdates::Row(
                                                records.finish(&metrics.columnar),
                                            ),
                                        },
                                    );
                                    (
                                        ConsolidationPart::from_encoded(
                                            part,
                                            true,
                                            &metrics.columnar,
                                            &sort,
                                        ),
                                        0,
                                    )
                                })
                                .collect::<VecDeque<_>>()
                        })
                        .collect::<Vec<_>>(),
                    filter,
                    budget: 0,
                    drop_stash: None,
                };

                let mut out = vec![];
                loop {
                    consolidator.trim();
                    let Some(chunk) = consolidator.chunk(1000, 1000) else {
                        break;
                    };
                    if chunk.len() > 0 {
                        out.push(chunk);
                    }
                }
                Part::concat(&out).expect("same schema")
            };

            assert_eq!((original.len() > 0).then_some(original), streaming);
        }

        let metrics = Arc::new(Metrics::new(
            &PersistConfig::new_for_tests(),
            &MetricsRegistry::new(),
        ));

        let key_gen = (0..4usize).prop_map(|i| i.to_string().into_bytes()).boxed();
        let part_gen = vec(
            ((key_gen.clone(), key_gen.clone()), 0..10u64, -3..=3i64),
            0..10,
        );
        let run_gen = vec((part_gen, 0..10usize), 0..5);
        proptest!(|(state in run_gen)| {
            check(&metrics, state)
        });
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn prefetches() {
        fn check(budget: usize, runs: Vec<Vec<usize>>, prefetch_all: bool) {
            let desc = Description::new(
                Antichain::from_elem(0u64),
                Antichain::new(),
                Antichain::from_elem(0u64),
            );

            let total_size: usize = runs.iter().flat_map(|run| run.iter().map(|p| *p)).sum();

            let shard_id = ShardId::new();
            let blob: Arc<dyn Blob> = Arc::new(MemBlob::open(MemBlobConfig::default()));
            let metrics = Arc::new(Metrics::new(
                &PersistConfig::new_for_tests(),
                &MetricsRegistry::new(),
            ));
            let shard_metrics = metrics.shards.shard(&shard_id, "");
            let sort: StructuredSort<Vec<u8>, Vec<u8>, _, _> = StructuredSort::new(Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            });
            let mut consolidator: Consolidator<u64, i64, StructuredSort<_, _, _, _>> =
                Consolidator::new(
                    "test".to_string(),
                    shard_id,
                    sort,
                    blob,
                    Arc::clone(&metrics),
                    shard_metrics,
                    metrics.read.batch_fetcher.clone(),
                    FetchBatchFilter::Compaction {
                        since: desc.since().clone(),
                    },
                    budget,
                );

            for run in runs {
                let parts: Vec<_> = run
                    .into_iter()
                    .map(|encoded_size_bytes| {
                        RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                            key: PartialBatchKey(
                                "n0000000/p00000000-0000-0000-0000-000000000000".into(),
                            ),
                            encoded_size_bytes,
                            key_lower: vec![],
                            structured_key_lower: None,
                            stats: None,
                            ts_rewrite: None,
                            diffs_sum: None,
                            format: None,
                            schema_id: None,
                            deprecated_schema_id: None,
                        }))
                    })
                    .collect();
                consolidator.enqueue_run(&desc, &RunMeta::default(), parts)
            }

            let remaining = consolidator.start_prefetches();
            let live_bytes = consolidator.live_bytes();
            assert!(live_bytes <= budget, "budget should be respected");
            match remaining {
                None => assert!(live_bytes < total_size, "not all parts fetched"),
                Some(remaining) => assert_eq!(
                    live_bytes + remaining,
                    budget,
                    "remaining should match budget"
                ),
            }

            if prefetch_all {
                // If the budget covers the total size, every queued part should get a prefetch
                // task, leaving no budget unspent.
                consolidator.budget = total_size;
                assert_eq!(consolidator.start_prefetches(), Some(0));
            } else {
                // Otherwise, drop the consolidator without fetching the remaining parts.
            }
        }

        let run_gen = vec(vec(0..20usize, 0..5usize), 0..5usize);
        proptest!(|(budget in 0..20usize, state in run_gen, prefetch_all in any::<bool>())| {
            check(budget, state, prefetch_all)
        });
    }
}