use std::cmp::{Ordering, Reverse};
use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, Int64Array};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use itertools::Itertools;
use mz_ore::task::JoinHandle;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist::location::Blob;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::{ArrayBound, ArrayIdx, ArrayOrd};
use mz_persist_types::part::Part;
use mz_persist_types::{Codec, Codec64};
use semver::Version;
use timely::progress::Timestamp;
use tracing::{Instrument, debug_span};

use crate::ShardId;
use crate::fetch::{EncodedPart, FetchBatchFilter, FetchConfig};
use crate::internal::encoding::Schemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
use crate::metrics::Metrics;

/// The minimum persist version at which batch parts are written out in consolidated order.
pub const MINIMUM_CONSOLIDATED_VERSION: Version = Version::new(0, 67, 0);

/// Metadata and a pointer to a part that has not necessarily been fetched yet.
#[derive(Debug, Clone)]
pub(crate) struct FetchData<T> {
    run_meta: RunMeta,
    part_desc: Description<T>,
    part: RunPart<T>,
    structured_lower: Option<ArrayBound>,
}

/// Converts between the blob representation of a part's updates and the sorted,
/// structured representation used while consolidating.
pub(crate) trait RowSort<T, D> {
    fn updates_from_blob(&self, updates: BlobTraceUpdates) -> StructuredUpdates;

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part;
}

/// Stitches together a new `Part` from the given parts, selecting rows by their
/// `(part index, row index)` coordinates.
fn interleave_updates<T: Codec64, D: Codec64>(
    updates: &[&Part],
    elements: impl IntoIterator<Item = (Indices, T, D)>,
) -> Part {
    let (indices, timestamps, diffs): (Vec<_>, Vec<_>, Vec<_>) = elements
        .into_iter()
        .map(|(idx, t, d)| {
            (
                idx,
                i64::from_le_bytes(T::encode(&t)),
                i64::from_le_bytes(D::encode(&d)),
            )
        })
        .multiunzip();

    let mut arrays: Vec<&dyn Array> = Vec::with_capacity(updates.len());
    let mut interleave = |get_array: fn(&Part) -> &dyn Array| {
        arrays.clear();
        for part in updates {
            arrays.push(get_array(part));
        }
        ::arrow::compute::interleave(arrays.as_slice(), &indices).expect("type-aligned input")
    };

    let key = interleave(|p| &*p.key);
    let val = interleave(|p| &*p.val);
    Part {
        key,
        val,
        time: Int64Array::from(timestamps),
        diff: Int64Array::from(diffs),
    }
}
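
// A minimal sketch of the `arrow::compute::interleave` primitive used above, assuming
// two toy `Int64Array` columns (not part of this module): each `(array index, row index)`
// pair selects one row of the output, which is how rows from many parts are stitched
// into a single `Part`.
//
//     use arrow::array::{Array, Int64Array};
//     let a = Int64Array::from(vec![1, 2]);
//     let b = Int64Array::from(vec![10, 20]);
//     let arrays = vec![&a as &dyn Array, &b as &dyn Array];
//     let out = arrow::compute::interleave(&arrays, &[(0, 1), (1, 0)]).unwrap();
//     // `out` now contains [2, 10]: row 1 of `a` followed by row 0 of `b`.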

/// Updates in a columnar, structured representation, along with pre-computed orderings
/// for the key and value columns.
#[derive(Clone, Debug)]
pub struct StructuredUpdates {
    key_ord: ArrayOrd,
    val_ord: ArrayOrd,
    data: Part,
}

impl StructuredUpdates {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get<T: Codec64, D: Codec64>(&self, index: usize) -> Option<(SortKV<'_>, T, D)> {
        let t = self.data.time.values().get(index)?.to_le_bytes();
        let d = self.data.diff.values().get(index)?.to_le_bytes();
        Some((
            (self.key_ord.at(index), Some(self.val_ord.at(index))),
            T::decode(t),
            D::decode(d),
        ))
    }

    fn interleave_updates<'a, T: Codec64, D: Codec64>(
        updates: &[&'a StructuredUpdates],
        elements: impl IntoIterator<Item = (Indices, SortKV<'a>, T, D)>,
    ) -> StructuredUpdates {
        let updates: Vec<_> = updates.iter().map(|u| &u.data).collect();
        let interleaved = interleave_updates(
            &updates,
            elements.into_iter().map(|(idx, _, t, d)| (idx, t, d)),
        );
        let key_ord = ArrayOrd::new(interleaved.key.as_ref());
        let val_ord = ArrayOrd::new(interleaved.val.as_ref());
        StructuredUpdates {
            key_ord,
            val_ord,
            data: interleaved,
        }
    }
}

/// A `RowSort` for structured data, ordered by the key and value columns.
#[derive(Debug, Clone)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
    schemas: Schemas<K, V>,
    _time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
    /// A sort for structured data with the given schemas.
    pub fn new(schemas: Schemas<K, V>) -> Self {
        Self {
            schemas,
            _time_diff: Default::default(),
        }
    }
}

/// A key-value pair; the value may be absent, e.g. when only a key lower bound is known.
type SortKV<'a> = (ArrayIdx<'a>, Option<ArrayIdx<'a>>);

fn kv_lower<T>(data: &FetchData<T>) -> Option<SortKV<'_>> {
    let key_idx = data.structured_lower.as_ref().map(|l| l.get())?;
    Some((key_idx, None))
}

fn kv_size((key, value): SortKV<'_>) -> usize {
    key.goodbytes() + value.map_or(0, |v| v.goodbytes())
}

impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
    fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> StructuredUpdates {
        let structured = updates
            .get_or_make_structured::<K, V>(&*self.schemas.key, &*self.schemas.val)
            .clone();
        let key_ord = ArrayOrd::new(&structured.key);
        let val_ord = ArrayOrd::new(&structured.val);
        StructuredUpdates {
            key_ord,
            val_ord,
            data: Part {
                key: structured.key,
                val: structured.val,
                time: updates.timestamps().clone(),
                diff: updates.diffs().clone(),
            },
        }
    }

    fn updates_to_blob(&self, updates: StructuredUpdates) -> Part {
        updates.data
    }
}
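
// A hedged sketch of the round trip this impl provides (`schemas` and `blob_updates` are
// assumed to be in scope; this is not a complete example): updates read from blob storage
// are converted into the sorted, structured representation for consolidation, and back to
// a `Part` when a consolidated chunk is written out.
//
//     let sort = StructuredSort::<K, V, T, D>::new(schemas.clone());
//     let structured: StructuredUpdates = sort.updates_from_blob(blob_updates);
//     let part: Part = sort.updates_to_blob(structured);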

/// Either an already-encoded part, or a nested run of parts that still needs expanding.
type FetchResult<T> = Result<EncodedPart<T>, HollowRun<T>>;

impl<T: Codec64 + Timestamp + Lattice> FetchData<T> {
    async fn fetch(
        self,
        cfg: &FetchConfig,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
    ) -> anyhow::Result<FetchResult<T>> {
        match self.part {
            RunPart::Single(part) => {
                let part = EncodedPart::fetch(
                    cfg,
                    &shard_id,
                    &*blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    &self.part_desc,
                    &part,
                )
                .await
                .map_err(|blob_key| anyhow!("missing unleased key {blob_key}"))?;
                Ok(Ok(part))
            }
            RunPart::Many(run_ref) => {
                let runs = run_ref
                    .get(shard_id, blob, metrics)
                    .await
                    .ok_or_else(|| anyhow!("missing run ref {}", run_ref.key))?;
                Ok(Err(runs))
            }
        }
    }
}

/// A cursor into a part: rows are visited in the order given by `sorted_indices` while it
/// has entries, and by `next_index` counting upwards once it is drained.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Default)]
struct PartIndices {
    sorted_indices: VecDeque<usize>,
    next_index: usize,
}

impl PartIndices {
    fn index(&self) -> usize {
        self.sorted_indices
            .front()
            .copied()
            .unwrap_or(self.next_index)
    }

    fn inc(&mut self) {
        if self.sorted_indices.pop_front().is_none() {
            self.next_index += 1;
        }
    }
}

/// A part that is either still queued for fetching or already fetched and decoded.
#[derive(Debug)]
enum ConsolidationPart<T, D> {
    Queued {
        data: FetchData<T>,
        task: Option<JoinHandle<anyhow::Result<FetchResult<T>>>>,
        _diff: PhantomData<D>,
    },
    Encoded {
        part: StructuredUpdates,
        cursor: PartIndices,
    },
}

impl<T: Timestamp + Codec64 + Lattice, D: Codec64> ConsolidationPart<T, D> {
    pub(crate) fn from_encoded(
        part: EncodedPart<T>,
        force_reconsolidation: bool,
        metrics: &ColumnarMetrics,
        sort: &impl RowSort<T, D>,
    ) -> Self {
        let reconsolidate = part.maybe_unconsolidated() || force_reconsolidation;
        let updates = part.normalize(metrics);
        let updates: StructuredUpdates = sort.updates_from_blob(updates);
        let cursor = if reconsolidate {
            let len = updates.len();
            let mut indices: Vec<_> = (0..len).collect();

            // Sort the row indices by (key, value, time) so we can visit them in consolidated order.
            indices.sort_by_key(|i| updates.get::<T, D>(*i).map(|(kv, t, _d)| (kv, t)));

            PartIndices {
                sorted_indices: indices.into(),
                next_index: len,
            }
        } else {
            PartIndices::default()
        };

        ConsolidationPart::Encoded {
            part: updates,
            cursor,
        }
    }

    /// The lower bound of the unconsumed data in this part: the structured lower for a
    /// queued part, or the cursor's current entry for a part that has been fetched.
    fn kvt_lower(&self) -> Option<(SortKV<'_>, T)> {
        match self {
            ConsolidationPart::Queued { data, .. } => Some((kv_lower(data)?, T::minimum())),
            ConsolidationPart::Encoded { part, cursor } => {
                let (kv, t, _d) = part.get::<T, D>(cursor.index())?;
                Some((kv, t))
            }
        }
    }

    pub(crate) fn is_empty(&self) -> bool {
        match self {
            ConsolidationPart::Encoded { part, cursor, .. } => cursor.index() >= part.len(),
            ConsolidationPart::Queued { .. } => false,
        }
    }
}

/// Incrementally consolidates the updates from a set of runs, fetching parts as they are
/// needed and bounding how much data is prefetched ahead of time.
#[derive(Debug)]
pub(crate) struct Consolidator<T, D, Sort: RowSort<T, D>> {
    context: String,
    cfg: FetchConfig,
    shard_id: ShardId,
    sort: Sort,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_metrics: Arc<ShardMetrics>,
    read_metrics: Arc<ReadMetrics>,
    /// The in-progress runs, each a queue of parts paired with their approximate encoded
    /// size in bytes.
    runs: Vec<VecDeque<(ConsolidationPart<T, D>, usize)>>,
    filter: FetchBatchFilter<T>,
    /// A soft limit, in bytes, on how much data may be fetched but not yet returned.
    budget: usize,
    /// An optional exclusive lower bound: only updates strictly greater than this
    /// (key, value, time) are returned.
    lower_bound: Option<LowerBound<T>>,
    /// An update stashed by a `ConsolidatingIter` that was dropped mid-consolidation, to
    /// be fed back in the next time an iterator is created.
    drop_stash: Option<StructuredUpdates>,
}

/// An exclusive lower bound on the (key, value, time) tuples emitted by the consolidator.
#[derive(Debug)]
pub struct LowerBound<T> {
    pub(crate) key_bound: ArrayBound,
    pub(crate) val_bound: ArrayBound,
    pub(crate) t: T,
}

impl<T: Clone> LowerBound<T> {
    /// Returns the bound as a `(key-value, time)` tuple.
    pub fn kvt_bound(&self) -> (SortKV<'_>, T) {
        (
            (self.key_bound.get(), Some(self.val_bound.get())),
            self.t.clone(),
        )
    }
}

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
    /// Create a new consolidator with the given prefetch budget, in bytes.
    pub fn new(
        context: String,
        cfg: FetchConfig,
        shard_id: ShardId,
        sort: Sort,
        blob: Arc<dyn Blob>,
        metrics: Arc<Metrics>,
        shard_metrics: Arc<ShardMetrics>,
        read_metrics: ReadMetrics,
        filter: FetchBatchFilter<T>,
        lower_bound: Option<LowerBound<T>>,
        prefetch_budget_bytes: usize,
    ) -> Self {
        Self {
            context,
            cfg,
            metrics,
            shard_id,
            sort,
            blob,
            read_metrics: Arc::new(read_metrics),
            shard_metrics,
            runs: vec![],
            filter,
            budget: prefetch_budget_bytes,
            drop_stash: None,
            lower_bound,
        }
    }
}
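
// A hedged usage sketch (setup and error handling elided; `runs`, `desc`, `meta`, `parts`,
// `max_len`, and `max_bytes` are assumptions, not names from this module): runs are
// enqueued up front, and consolidated output is then pulled out in bounded-size chunks
// until the consolidator is exhausted.
//
//     let mut consolidator = Consolidator::new(/* context, cfg, shard, sort, blob, metrics, ... */);
//     for (desc, meta, parts) in runs {
//         consolidator.enqueue_run(&desc, &meta, parts);
//     }
//     while let Some(chunk) = consolidator.next_chunk(max_len, max_bytes).await? {
//         // write `chunk` out, or hand it to the caller
//     }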

impl<T, D, Sort> Consolidator<T, D, Sort>
where
    T: Timestamp + Codec64 + Lattice + Sync,
    D: Codec64 + Semigroup + Ord,
    Sort: RowSort<T, D>,
{
    /// Add another run of data to be consolidated.
    pub fn enqueue_run(
        &mut self,
        desc: &Description<T>,
        run_meta: &RunMeta,
        parts: impl IntoIterator<Item = RunPart<T>>,
    ) {
        let run = parts
            .into_iter()
            .map(|part| {
                let bytes = part.encoded_size_bytes();
                let c_part = ConsolidationPart::Queued {
                    data: FetchData {
                        run_meta: run_meta.clone(),
                        part_desc: desc.clone(),
                        structured_lower: part.structured_key_lower(),
                        part,
                    },
                    task: None,
                    _diff: Default::default(),
                };
                (c_part, bytes)
            })
            .collect();
        self.push_run(run);
    }

    fn push_run(&mut self, run: VecDeque<(ConsolidationPart<T, D>, usize)>) {
        // Check whether the run is already in our structured sort order.
        let wrong_sort = run.iter().any(|(p, _)| match p {
            ConsolidationPart::Queued { data, .. } => {
                data.run_meta.order != Some(RunOrder::Structured)
            }
            ConsolidationPart::Encoded { .. } => false,
        });

        if wrong_sort {
            self.metrics.consolidation.wrong_sort.inc();
        }

        if run.len() > 1 && wrong_sort {
            // The parts of a wrongly-sorted run are not necessarily ordered relative to
            // one another, so treat each part as its own single-part run.
            for part in run {
                self.runs.push(VecDeque::from([part]));
            }
        } else {
            self.runs.push(run);
        }
    }

    /// Drop any parts or runs that have been fully consumed, and kick off any prefetches
    /// that now fit in the budget.
    fn trim(&mut self) {
        self.runs.retain_mut(|run| {
            while run.front_mut().map_or(false, |(part, _)| part.is_empty()) {
                run.pop_front();
            }
            !run.is_empty()
        });

        self.start_prefetches();
    }

    /// Builds a `ConsolidatingIter` over the first unconsumed part of every run.
    fn iter(&mut self) -> Option<ConsolidatingIter<'_, T, D>> {
        // If an earlier iterator was dropped mid-consolidation, it may have stashed a
        // partially-consolidated update; push it back in as a tiny synthetic run.
        if let Some(part) = self.drop_stash.take() {
            self.runs.push(VecDeque::from_iter([(
                ConsolidationPart::Encoded {
                    part,
                    cursor: PartIndices::default(),
                },
                0,
            )]));
        }

        if self.runs.is_empty() {
            return None;
        }

        let bound = self.lower_bound.as_ref().map(|b| b.kvt_bound());
        let mut iter =
            ConsolidatingIter::new(&self.context, &self.filter, bound, &mut self.drop_stash);

        for run in &mut self.runs {
            let last_in_run = run.len() < 2;
            if let Some((part, _)) = run.front_mut() {
                match part {
                    ConsolidationPart::Encoded { part, cursor } => {
                        iter.push(part, cursor, last_in_run);
                    }
                    other @ ConsolidationPart::Queued { .. } => {
                        // A queued part hasn't been fetched yet, but its lower bound still
                        // tells the iterator how far it can safely progress.
                        if let Some(bound) = other.kvt_lower() {
                            iter.push_upper(bound);
                        }
                    }
                };
            }
        }

        Some(iter)
    }

    /// Wait until the first part of every run that might contain the minimum remaining
    /// (key, value, time) has been fetched, so that iteration can make progress.
    async fn unblock_progress(&mut self) -> anyhow::Result<()> {
        if self.runs.is_empty() {
            return Ok(());
        }
        self.runs
            .sort_by(|a, b| a[0].0.kvt_lower().cmp(&b[0].0.kvt_lower()));

        let first_larger = {
            let run = &self.runs[0];
            let min_lower = run[0].0.kvt_lower();
            self.runs
                .iter()
                .position(|q| q[0].0.kvt_lower() > min_lower)
                .unwrap_or(self.runs.len())
        };

        let mut ready_futures: FuturesUnordered<_> = self.runs[0..first_larger]
            .iter_mut()
            .map(|run| async {
                loop {
                    // Fetch (or wait for the prefetch of) the first part in the run, looping
                    // because fetching a nested run may replace it with several smaller parts.
                    let (mut part, size) = run.pop_front().expect("trimmed run should be nonempty");

                    let ConsolidationPart::Queued { data, task, .. } = &mut part else {
                        run.push_front((part, size));
                        return Ok(true);
                    };

                    let is_prefetched = task.as_ref().map_or(false, |t| t.is_finished());
                    if is_prefetched {
                        self.metrics.compaction.parts_prefetched.inc();
                    } else {
                        self.metrics.compaction.parts_waited.inc()
                    }
                    self.metrics.consolidation.parts_fetched.inc();

                    let wrong_sort = data.run_meta.order != Some(RunOrder::Structured);
                    let fetch_result: anyhow::Result<FetchResult<T>> = match task.take() {
                        Some(handle) => handle
                            .await
                            .unwrap_or_else(|join_err| Err(anyhow!(join_err))),
                        None => {
                            data.clone()
                                .fetch(
                                    &self.cfg,
                                    self.shard_id,
                                    &*self.blob,
                                    &*self.metrics,
                                    &*self.shard_metrics,
                                    &self.read_metrics,
                                )
                                .await
                        }
                    };
                    match fetch_result {
                        Err(err) => {
                            run.push_front((part, size));
                            return Err(err);
                        }
                        Ok(Err(run_part)) => {
                            // This was a reference to a nested run: splice its parts back
                            // onto the front of the queue and try again.
                            for part in run_part.parts.into_iter().rev() {
                                let structured_lower = part.structured_key_lower();
                                let size = part.max_part_bytes();
                                run.push_front((
                                    ConsolidationPart::Queued {
                                        data: FetchData {
                                            run_meta: data.run_meta.clone(),
                                            part_desc: data.part_desc.clone(),
                                            part,
                                            structured_lower,
                                        },
                                        task: None,
                                        _diff: Default::default(),
                                    },
                                    size,
                                ));
                            }
                        }
                        Ok(Ok(part)) => {
                            run.push_front((
                                ConsolidationPart::from_encoded(
                                    part,
                                    wrong_sort,
                                    &self.metrics.columnar,
                                    &self.sort,
                                ),
                                size,
                            ));
                        }
                    }
                }
            })
            .collect();

        // At least one of these futures must complete successfully for the consolidator
        // to be able to produce more output.
        let mut total_ready = 0;
        while let Some(awaited) = ready_futures.next().await {
            if awaited? {
                total_ready += 1;
            }
        }
        assert!(
            total_ready > 0,
            "at least one part should be fetched and ready to go"
        );

        Ok(())
    }

    /// Returns an iterator over the next consolidated batch of updates, if any remain.
    #[allow(unused)]
    pub(crate) async fn next(
        &mut self,
    ) -> anyhow::Result<Option<impl Iterator<Item = (SortKV<'_>, T, D)>>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.iter().map(|i| i.map(|(_idx, kv, t, d)| (kv, t, d))))
    }

    /// Assembles the next chunk of consolidated output, limited to at most `max_len`
    /// updates and approximately `max_bytes` of key/value data.
    fn chunk(&mut self, max_len: usize, max_bytes: usize) -> Option<Part> {
        let Some(mut iter) = self.iter() else {
            return None;
        };

        let parts = iter.parts.clone();

        let mut budget = max_bytes;
        // Take updates until the byte budget is exhausted. The extra 16 bytes per update
        // account for the 8-byte timestamp and 8-byte diff columns.
        let iter = std::iter::from_fn(move || {
            if budget == 0 {
                return None;
            }
            let update @ (_, kv, _, _) = iter.next()?;
            budget = budget.saturating_sub(kv_size(kv) + 16);
            Some(update)
        });

        let updates = StructuredUpdates::interleave_updates(&parts, iter.take(max_len));
        let updates = self.sort.updates_to_blob(updates);
        Some(updates)
    }

    /// Returns the next consolidated chunk of output, if any remains.
    pub(crate) async fn next_chunk(
        &mut self,
        max_len: usize,
        max_bytes: usize,
    ) -> anyhow::Result<Option<Part>> {
        self.trim();
        self.unblock_progress().await?;
        Ok(self.chunk(max_len, max_bytes))
    }

    /// The bytes counted against the prefetch budget: every part that is either already
    /// encoded in memory or has a fetch in flight.
    fn live_bytes(&self) -> usize {
        self.runs
            .iter()
            .flat_map(|run| {
                run.iter().map(|(part, size)| match part {
                    ConsolidationPart::Queued { task: None, .. } => 0,
                    ConsolidationPart::Queued { task: Some(_), .. }
                    | ConsolidationPart::Encoded { .. } => *size,
                })
            })
            .sum()
    }

    /// Starts prefetching parts, breadth-first across runs (the first part of every run,
    /// then the second, and so on), until the budget is exhausted. Returns the remaining
    /// budget if every part fit, or `None` if the budget ran out first.
    pub(crate) fn start_prefetches(&mut self) -> Option<usize> {
        let mut prefetch_budget_bytes = self.budget;

        let mut check_budget = |size| {
            // Subtract the size from the budget if it fits, or return `None` if it doesn't.
            prefetch_budget_bytes
                .checked_sub(size)
                .map(|remaining| prefetch_budget_bytes = remaining)
        };

        // Account for the data that is already fetched or being fetched.
        let live_bytes = self.live_bytes();
        check_budget(live_bytes)?;
        // Prefetch breadth-first: the head of every run before anything deeper, stopping
        // as soon as a part does not fit in the remaining budget.
        let max_run_len = self.runs.iter().map(|x| x.len()).max().unwrap_or_default();
        for idx in 0..max_run_len {
            for run in self.runs.iter_mut() {
                if let Some((c_part, size)) = run.get_mut(idx) {
                    let (data, task) = match c_part {
                        ConsolidationPart::Queued { data, task, .. } if task.is_none() => {
                            check_budget(*size)?;
                            (data, task)
                        }
                        _ => continue,
                    };
                    let span = debug_span!("compaction::prefetch");
                    let data = data.clone();
                    let handle = mz_ore::task::spawn(|| "persist::compaction::prefetch", {
                        let shard_id = self.shard_id;
                        let blob = Arc::clone(&self.blob);
                        let metrics = Arc::clone(&self.metrics);
                        let shard_metrics = Arc::clone(&self.shard_metrics);
                        let read_metrics = Arc::clone(&self.read_metrics);
                        let fetch_config = self.cfg.clone();
                        async move {
                            data.fetch(
                                &fetch_config,
                                shard_id,
                                &*blob,
                                &*metrics,
                                &*shard_metrics,
                                &*read_metrics,
                            )
                            .instrument(span)
                            .await
                        }
                    });
                    *task = Some(handle);
                }
            }
        }

        Some(prefetch_budget_bytes)
    }
}
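
// A minimal sketch of the budget accounting in `start_prefetches`, assuming a 100-byte
// budget: `checked_sub` both tests whether a part fits and debits the budget, and the
// first part that does not fit aborts prefetching via `?`.
//
//     let mut budget: usize = 100;
//     let mut check = |size: usize| budget.checked_sub(size).map(|rest| budget = rest);
//     assert_eq!(check(60), Some(())); // fits: budget is now 40
//     assert_eq!(check(60), None);     // does not fit: budget stays at 40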

impl<T, D, Sort: RowSort<T, D>> Drop for Consolidator<T, D, Sort> {
    fn drop(&mut self) {
        for run in &self.runs {
            for (part, _) in run {
                match part {
                    ConsolidationPart::Queued { task: None, .. } => {
                        // Never fetched: no fetch work was wasted.
                        self.metrics.consolidation.parts_skipped.inc();
                    }
                    ConsolidationPart::Queued { task: Some(_), .. } => {
                        // A prefetch was started but the data was never used.
                        self.metrics.consolidation.parts_wasted.inc();
                    }
                    _ => {}
                }
            }
        }
    }
}

/// The `(part index, row index)` coordinates of an update within a set of parts.
type Indices = (usize, usize);

/// A reference into a part's data, with a cached copy of the next update so the heap can
/// order parts by whichever has the smallest update to emit next.
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq)]
struct PartRef<'a, T: Timestamp, D> {
    /// The next update in the part, if any. Wrapped in `Reverse` so the max-heap surfaces
    /// the part with the smallest next update.
    next_kvt: Reverse<Option<(SortKV<'a>, T, D)>>,
    /// The index of this part in the full list of parts.
    part_index: usize,
    /// The cursor into the part's rows.
    row_index: &'a mut PartIndices,
    /// Whether this is the last part remaining in its run. If it is not, exhausting it
    /// means we must stop, since unfetched parts of the run may hold smaller data.
    last_in_run: bool,
    _phantom: PhantomData<D>,
}
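
// A minimal sketch of the `Reverse<Option<_>>` ordering that drives the heap below, using
// plain integers in place of full updates: `BinaryHeap` is a max-heap, so wrapping in
// `Reverse` surfaces the *smallest* next update, and `None` (an exhausted part) sorts
// ahead of every `Some` so it is dealt with first.
//
//     use std::cmp::Reverse;
//     use std::collections::BinaryHeap;
//     let mut heap = BinaryHeap::from([Reverse(Some(3)), Reverse(Some(1)), Reverse(None)]);
//     assert_eq!(heap.pop(), Some(Reverse(None)));
//     assert_eq!(heap.pop(), Some(Reverse(Some(1))));
//     assert_eq!(heap.pop(), Some(Reverse(Some(3))));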

impl<'a, T: Timestamp + Codec64 + Lattice, D: Codec64 + Semigroup> PartRef<'a, T, D> {
    fn update_peek(&mut self, part: &'a StructuredUpdates, filter: &FetchBatchFilter<T>) {
        // Skip over any updates the filter tells us to drop, then cache the next
        // remaining update (if any).
        let mut peek = part.get(self.row_index.index());
        while let Some((_kv, t, _d)) = &mut peek {
            let keep = filter.filter_ts(t);
            if keep {
                break;
            } else {
                self.row_index.inc();
                peek = part.get(self.row_index.index());
            }
        }
        self.next_kvt = Reverse(peek);
    }

    fn pop(
        &mut self,
        from: &[&'a StructuredUpdates],
        filter: &FetchBatchFilter<T>,
    ) -> Option<(Indices, SortKV<'a>, T, D)> {
        let part = &from[self.part_index];
        let Reverse(popped) = mem::take(&mut self.next_kvt);
        let indices = (self.part_index, self.row_index.index());
        self.row_index.inc();
        self.update_peek(part, filter);
        let (kv, t, d) = popped?;
        Some((indices, kv, t, d))
    }
}

/// An iterator that merges together the updates from a set of parts, consolidating the
/// diffs of identical `(key, value, time)` tuples as it goes.
#[derive(Debug)]
pub(crate) struct ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    context: &'a str,
    filter: &'a FetchBatchFilter<T>,
    parts: Vec<&'a StructuredUpdates>,
    heap: BinaryHeap<PartRef<'a, T, D>>,
    upper_bound: Option<(SortKV<'a>, T)>,
    lower_bound: Option<(SortKV<'a>, T)>,
    state: Option<(Indices, SortKV<'a>, T, D)>,
    drop_stash: &'a mut Option<StructuredUpdates>,
}

impl<'a, T, D> ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    fn new(
        context: &'a str,
        filter: &'a FetchBatchFilter<T>,
        lower_bound: Option<(SortKV<'a>, T)>,
        drop_stash: &'a mut Option<StructuredUpdates>,
    ) -> Self {
        Self {
            context,
            filter,
            parts: vec![],
            heap: BinaryHeap::new(),
            upper_bound: None,
            state: None,
            drop_stash,
            lower_bound,
        }
    }

    fn push(&mut self, iter: &'a StructuredUpdates, index: &'a mut PartIndices, last_in_run: bool) {
        let mut part_ref = PartRef {
            next_kvt: Reverse(None),
            part_index: self.parts.len(),
            row_index: index,
            last_in_run,
            _phantom: Default::default(),
        };
        part_ref.update_peek(iter, self.filter);
        self.parts.push(iter);
        self.heap.push(part_ref);
    }

    /// Set an upper bound on the updates this iterator may return, based on data that has
    /// not been fetched yet. The bound only ever ratchets downwards.
    fn push_upper(&mut self, upper: (SortKV<'a>, T)) {
        let update_bound = self
            .upper_bound
            .as_ref()
            .map_or(true, |existing| *existing > upper);
        if update_bound {
            self.upper_bound = Some(upper);
        }
    }

    /// Attempt to consolidate as much as possible into the current state, returning it
    /// once the next update no longer matches.
    fn consolidate(&mut self) -> Option<(Indices, SortKV<'a>, T, D)> {
        loop {
            let Some(mut part) = self.heap.peek_mut() else {
                break;
            };
            if let Some((kv1, t1, _)) = part.next_kvt.0.as_ref() {
                if let Some((idx0, kv0, t0, d0)) = &mut self.state {
                    let consolidates = match (*kv0, &*t0).cmp(&(*kv1, t1)) {
                        Ordering::Less => false,
                        Ordering::Equal => true,
                        Ordering::Greater => {
                            // Consolidation requires the input to arrive in ascending order.
                            panic!(
                                "data arrived at the consolidator out of order ({}, kvs equal? {}, {t0:?}, {t1:?})",
                                self.context,
                                (*kv0) == (*kv1)
                            );
                        }
                    };
                    if consolidates {
                        let (idx1, _, _, d1) = part
                            .pop(&self.parts, self.filter)
                            .expect("popping from a non-empty iterator");
                        d0.plus_equals(&d1);
                        *idx0 = idx1;
                    } else {
                        break;
                    }
                } else {
                    // Don't start consolidating an update at or past the upper bound:
                    // data we haven't fetched yet may sort before it.
                    if let Some((kv0, t0)) = &self.upper_bound {
                        if (kv0, t0) <= (kv1, t1) {
                            return None;
                        }
                    }

                    // Skip over any updates at or below the exclusive lower bound.
                    if let Some((kv_lower, t_lower)) = &self.lower_bound {
                        if (kv_lower, t_lower) >= (kv1, t1) {
                            let _ = part.pop(&self.parts, self.filter);
                            continue;
                        }
                    }

                    self.state = part.pop(&self.parts, self.filter);
                }
            } else {
                if part.last_in_run {
                    PeekMut::pop(part);
                } else {
                    // This part is exhausted, but its run may have more parts that haven't
                    // been fetched yet; we can't safely consolidate past this point.
                    return None;
                }
            }
        }

        self.state.take()
    }
}

impl<'a, T, D> Iterator for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64 + Lattice,
    D: Codec64 + Semigroup + Ord,
{
    type Item = (Indices, SortKV<'a>, T, D);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.consolidate() {
                // Updates that consolidate to a zero diff are not interesting; skip them.
                Some((_, _, _, d)) if d.is_zero() => continue,
                other => break other,
            }
        }
    }
}

impl<'a, T, D> Drop for ConsolidatingIter<'a, T, D>
where
    T: Timestamp + Codec64,
    D: Codec64,
{
    fn drop(&mut self) {
        // Don't lose a partially-consolidated update if the iterator is dropped before it
        // is returned: stash it so the consolidator can feed it back in next time.
        if let Some(update) = self.state.take() {
            let part = StructuredUpdates::interleave_updates(&self.parts, [update]);
            *self.drop_stash = Some(part);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use crate::ShardId;
    use crate::cfg::PersistConfig;
    use crate::internal::paths::PartialBatchKey;
    use crate::internal::state::{BatchPart, HollowBatchPart};
    use crate::metrics::Metrics;
    use arrow::array::BinaryArray;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::ColumnarRecordsBuilder;
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist::location::Blob;
    use mz_persist::mem::{MemBlob, MemBlobConfig};
    use mz_persist_types::codec_impls::VecU8Schema;
    use mz_persist_types::part::PartBuilder;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use timely::progress::Antichain;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn consolidation() {
        // Compares the output of the streaming consolidator against a batch consolidation
        // of the same updates.
        type Rows = Vec<((Vec<u8>, Vec<u8>), u64, i64)>;

        fn check(
            metrics: &Arc<Metrics>,
            parts: Vec<(Rows, usize)>,
            lower_bound: (Vec<u8>, Vec<u8>, u64),
        ) {
            let schemas = Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            };
            let original = {
                let mut rows = parts
                    .iter()
                    .flat_map(|(p, _)| p.clone())
                    .filter(|((k, v), t, _)| {
                        let (k_lower, v_lower, t_lower) = &lower_bound;
                        // Only rows strictly greater than the lower bound should be emitted.
                        ((k_lower, v_lower), t_lower) < ((k, v), t)
                    })
                    .collect::<Vec<_>>();

                consolidate_updates(&mut rows);
                let mut builder = PartBuilder::new(&*schemas.key, &*schemas.val);
                for ((k, v), t, d) in &rows {
                    builder.push(k, v, *t, *d);
                }
                let part = builder.finish();
                part
            };
            let filter = FetchBatchFilter::Compaction {
                since: Antichain::from_elem(0),
            };
            let desc = Description::new(
                Antichain::from_elem(0),
                Antichain::new(),
                Antichain::from_elem(0),
            );
            let key_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.0]);
            let val_lower_bound_array = BinaryArray::from_vec(vec![&lower_bound.1]);
            let lower_bound = LowerBound {
                key_bound: ArrayBound::new(Arc::new(key_lower_bound_array), 0),
                val_bound: ArrayBound::new(Arc::new(val_lower_bound_array), 0),
                t: lower_bound.2,
            };
            let sort: StructuredSort<Vec<u8>, Vec<u8>, u64, i64> =
                StructuredSort::new(schemas.clone());
            let streaming = {
                let fetch_cfg = FetchConfig {
                    validate_bounds_on_read: true,
                };
                let mut consolidator = Consolidator {
                    cfg: fetch_cfg.clone(),
                    context: "test".to_string(),
                    shard_id: ShardId::new(),
                    sort: sort.clone(),
                    blob: Arc::new(MemBlob::open(MemBlobConfig::default())),
                    metrics: Arc::clone(metrics),
                    shard_metrics: metrics.shards.shard(&ShardId::new(), "test"),
                    read_metrics: Arc::new(metrics.read.snapshot.clone()),
                    // Each generated part is split in two at `cut` and pushed as a single
                    // run of two parts.
                    runs: parts
                        .into_iter()
                        .map(|(mut part, cut)| {
                            part.sort();
                            let part_2 = part.split_off(cut.min(part.len()));
                            [part, part_2]
                                .into_iter()
                                .map(|part| {
                                    let mut records = ColumnarRecordsBuilder::default();
                                    for ((k, v), t, d) in &part {
                                        assert!(records.push((
                                            (k, v),
                                            u64::encode(t),
                                            i64::encode(d)
                                        )));
                                    }
                                    let part = EncodedPart::new(
                                        &fetch_cfg,
                                        metrics.read.snapshot.clone(),
                                        desc.clone(),
                                        "part",
                                        None,
                                        BlobTraceBatchPart {
                                            desc: desc.clone(),
                                            index: 0,
                                            updates: BlobTraceUpdates::Row(
                                                records.finish(&metrics.columnar),
                                            ),
                                        },
                                    );
                                    (
                                        ConsolidationPart::from_encoded(
                                            part,
                                            true,
                                            &metrics.columnar,
                                            &sort,
                                        ),
                                        0,
                                    )
                                })
                                .collect::<VecDeque<_>>()
                        })
                        .collect::<Vec<_>>(),
                    filter,
                    budget: 0,
                    drop_stash: None,
                    lower_bound: Some(lower_bound),
                };

                let mut out = vec![];
                loop {
                    consolidator.trim();
                    let Some(chunk) = consolidator.chunk(1000, 1000) else {
                        break;
                    };
                    if chunk.len() > 0 {
                        out.push(chunk);
                    }
                }
                Part::concat(&out).expect("same schema")
            };

            assert_eq!((original.len() > 0).then_some(original), streaming);
        }

        let metrics = Arc::new(Metrics::new(
            &PersistConfig::new_for_tests(),
            &MetricsRegistry::new(),
        ));

        // A small key space makes it likely that different parts contain matching updates.
        let key_gen = (0..4usize).prop_map(|i| i.to_string().into_bytes()).boxed();
        let part_gen = vec(
            ((key_gen.clone(), key_gen.clone()), 0..10u64, -3..=3i64),
            0..10,
        );
        let kvt_gen = (key_gen.clone(), key_gen.clone(), 0..10u64);
        let run_gen = vec((part_gen, 0..10usize), 0..5);
        proptest!(|(state in run_gen, bound in kvt_gen)| {
            check(&metrics, state, bound)
        });
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn prefetches() {
        fn check(budget: usize, runs: Vec<Vec<usize>>, prefetch_all: bool) {
            let desc = Description::new(
                Antichain::from_elem(0u64),
                Antichain::new(),
                Antichain::from_elem(0u64),
            );

            let total_size: usize = runs.iter().flat_map(|run| run.iter().map(|p| *p)).sum();

            let shard_id = ShardId::new();
            let blob: Arc<dyn Blob> = Arc::new(MemBlob::open(MemBlobConfig::default()));
            let metrics = Arc::new(Metrics::new(
                &PersistConfig::new_for_tests(),
                &MetricsRegistry::new(),
            ));
            let shard_metrics = metrics.shards.shard(&shard_id, "");
            let sort: StructuredSort<Vec<u8>, Vec<u8>, _, _> = StructuredSort::new(Schemas {
                id: None,
                key: Arc::new(VecU8Schema),
                val: Arc::new(VecU8Schema),
            });

            let fetch_cfg = FetchConfig {
                validate_bounds_on_read: true,
            };

            let mut consolidator: Consolidator<u64, i64, StructuredSort<_, _, _, _>> =
                Consolidator::new(
                    "test".to_string(),
                    fetch_cfg,
                    shard_id,
                    sort,
                    blob,
                    Arc::clone(&metrics),
                    shard_metrics,
                    metrics.read.batch_fetcher.clone(),
                    FetchBatchFilter::Compaction {
                        since: desc.since().clone(),
                    },
                    None,
                    budget,
                );

            for run in runs {
                let parts: Vec<_> = run
                    .into_iter()
                    .map(|encoded_size_bytes| {
                        RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                            key: PartialBatchKey(
                                "n0000000/p00000000-0000-0000-0000-000000000000".into(),
                            ),
                            encoded_size_bytes,
                            key_lower: vec![],
                            structured_key_lower: None,
                            stats: None,
                            ts_rewrite: None,
                            diffs_sum: None,
                            format: None,
                            schema_id: None,
                            deprecated_schema_id: None,
                        }))
                    })
                    .collect();
                consolidator.enqueue_run(&desc, &RunMeta::default(), parts)
            }

            // Prefetch within the budget and check that the accounting adds up.
            let remaining = consolidator.start_prefetches();
            let live_bytes = consolidator.live_bytes();
            assert!(live_bytes <= budget, "budget should be respected");
            match remaining {
                None => assert!(live_bytes < total_size, "not all parts fetched"),
                Some(remaining) => assert_eq!(
                    live_bytes + remaining,
                    budget,
                    "remaining should match budget"
                ),
            }

            if prefetch_all {
                // With a budget that covers every part, prefetching should use it exactly.
                consolidator.budget = total_size;
                assert_eq!(consolidator.start_prefetches(), Some(0));
            }
        }

        let run_gen = vec(vec(0..20usize, 0..5usize), 0..5usize);
        proptest!(|(budget in 0..20usize, state in run_gen, prefetch_all in any::<bool>())| {
            check(budget, state, prefetch_all)
        });
    }
}