use arrayvec::ArrayVec;
use differential_dataflow::difference::Semigroup;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::Codec64;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::mem;
use std::ops::Range;
use std::sync::Arc;
use tracing::error;

use crate::internal::paths::WriterKey;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastFrom;
#[allow(unused_imports)]
use mz_ore::fmt::FormatBuffer;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};

use crate::internal::state::{HollowBatch, RunId};

use super::state::RunPart;

/// A request to merge the given input batches together, to be handed off to
/// compaction.
#[derive(Debug, Clone, PartialEq)]
pub struct FueledMergeReq<T> {
    pub id: SpineId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}

/// The result of a completed merge: the compacted output batch, plus a
/// description of the input it replaces.
#[derive(Debug)]
pub struct FueledMergeRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
    pub new_active_compaction: Option<ActiveCompaction>,
}

/// An append-only collection of compactable update batches.
///
/// This is a persist-specific wrapper around the [Spine] data structure,
/// which additionally tracks the ids and compaction state of its batches.
#[derive(Debug, Clone)]
pub struct Trace<T> {
    spine: Spine<T>,
    pub(crate) roundtrip_structure: bool,
}

#[cfg(any(test, debug_assertions))]
impl<T: PartialEq> PartialEq for Trace<T> {
    fn eq(&self, other: &Self) -> bool {
        // Deconstruct self and other so we get a compile failure if new
        // fields are added.
        let Trace {
            spine: _,
            roundtrip_structure: _,
        } = self;
        let Trace {
            spine: _,
            roundtrip_structure: _,
        } = other;

        // Intentionally compare the hollow batches, so that differences in
        // spine structure are ignored.
        self.batches().eq(other.batches())
    }
}

impl<T: Timestamp + Lattice> Default for Trace<T> {
    fn default() -> Self {
        Self {
            spine: Spine::new(),
            roundtrip_structure: true,
        }
    }
}

#[derive(Clone, Debug, Serialize)]
pub struct ThinSpineBatch<T> {
    pub(crate) level: usize,
    pub(crate) desc: Description<T>,
    pub(crate) parts: Vec<SpineId>,
    /// The per-part descriptions, kept to sanity-check the batches we
    /// reassemble on read.
    pub(crate) descs: Vec<Description<T>>,
}

impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
    fn eq(&self, other: &Self) -> bool {
        // Ignore `descs`: they're derived from the parts themselves.
        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct ThinMerge<T> {
    pub(crate) since: Antichain<T>,
    pub(crate) remaining_work: usize,
    pub(crate) active_compaction: Option<ActiveCompaction>,
}

impl<T: Clone> ThinMerge<T> {
    fn fueling(merge: &FuelingMerge<T>) -> Self {
        ThinMerge {
            since: merge.since.clone(),
            remaining_work: merge.remaining_work,
            active_compaction: None,
        }
    }

    fn fueled(batch: &SpineBatch<T>) -> Self {
        ThinMerge {
            since: batch.desc.since().clone(),
            remaining_work: 0,
            active_compaction: batch.active_compaction.clone(),
        }
    }
}

/// A flattened representation of [Trace], as produced by [Trace::flatten]
/// and consumed by [Trace::unflatten]. Intended for serialization.
#[derive(Clone, Debug)]
pub struct FlatTrace<T> {
    pub(crate) since: Antichain<T>,
    /// Batches without an associated spine id, as written by older versions.
    /// Stored in a map keyed by the batch itself so they deduplicate cleanly.
    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
    /// Batches that are referenced by id from the spine batches below.
    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
    /// The structure of the spine itself: one entry per spine batch.
    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
    /// In-progress merges, keyed by the id of the batch they will produce.
    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
}

impl<T: Timestamp + Lattice> Trace<T> {
    pub(crate) fn flatten(&self) -> FlatTrace<T> {
        let since = self.spine.since.clone();
        let mut legacy_batches = BTreeMap::new();
        let mut hollow_batches = BTreeMap::new();
        let mut spine_batches = BTreeMap::new();
        let mut merges = BTreeMap::new();

        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
            let id = batch.id();
            let desc = batch.desc.clone();
            let mut parts = Vec::with_capacity(batch.parts.len());
            let mut descs = Vec::with_capacity(batch.parts.len());
            for IdHollowBatch { id, batch } in &batch.parts {
                parts.push(*id);
                descs.push(batch.desc.clone());
                // Batches that cover an empty interval of time (lower ==
                // upper) can't be recovered from the legacy list, which is
                // ordered by time range, so track them by id instead. All
                // other batches are stored in the legacy format for
                // backwards compatibility.
                if batch.desc.lower() == batch.desc.upper() {
                    hollow_batches.insert(*id, Arc::clone(batch));
                } else {
                    legacy_batches.insert(Arc::clone(batch), ());
                }
            }

            let spine_batch = ThinSpineBatch {
                level,
                desc,
                parts,
                descs,
            };
            spine_batches.insert(id, spine_batch);
        };

        for (level, state) in self.spine.merging.iter().enumerate() {
            for batch in &state.batches {
                push_spine_batch(level, batch);
                if let Some(c) = &batch.active_compaction {
                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
                    assert!(
                        previous.is_none(),
                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
                        batch.id,
                    )
                }
            }
            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
                let previous = merges.insert(*id, ThinMerge::fueling(merge));
                assert!(
                    previous.is_none(),
                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
                )
            }
        }

        if !self.roundtrip_structure {
            assert!(hollow_batches.is_empty());
            spine_batches.clear();
            merges.clear();
        }

        FlatTrace {
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        }
    }

    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
        let FlatTrace {
            since,
            legacy_batches,
            mut hollow_batches,
            spine_batches,
            mut merges,
        } = value;

        // If the flattened representation has spine batches (or no batches
        // at all), it was written by a version that maintains structure;
        // otherwise we must rebuild the structure from the legacy batches.
        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();

        // A total order on antichains that is consistent with the usual
        // partial order; incomparable antichains compare as equal. Only used
        // for sorting the legacy batches below.
        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
            if PartialOrder::less_than(left, right) {
                Ordering::Less
            } else if PartialOrder::less_than(right, left) {
                Ordering::Greater
            } else {
                Ordering::Equal
            }
        };
        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());

        let mut pop_batch =
            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
                if let Some(batch) = hollow_batches.remove(&id) {
                    if let Some(desc) = expected_desc {
                        assert_eq!(*desc, batch.desc);
                    }
                    return Ok(IdHollowBatch { id, batch });
                }
                let mut batch = legacy_batches
                    .pop()
                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;

                let Some(expected_desc) = expected_desc else {
                    return Ok(IdHollowBatch { id, batch });
                };

                if expected_desc.lower() != batch.desc.lower() {
                    return Err(format!(
                        "hollow batch lower {:?} did not match expected lower {:?}",
                        batch.desc.lower().elements(),
                        expected_desc.lower().elements()
                    ));
                }

                // Empty legacy batches are not deduplicated, so we may need
                // to absorb a run of them to cover the expected interval.
                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
                    let mut new_upper = batch.desc.upper().clone();

                    // While our upper is less than the expected upper, pop
                    // additional empty batches off the stack and absorb them.
                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
                        let Some(next_batch) = legacy_batches.pop() else {
                            break;
                        };
                        if next_batch.is_empty() {
                            new_upper.clone_from(next_batch.desc.upper());
                        } else {
                            legacy_batches.push(next_batch);
                            break;
                        }
                    }

                    // If we've gone past the expected upper, return the
                    // surplus interval to the stack as a new empty batch.
                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
                            expected_desc.upper().clone(),
                            new_upper.clone(),
                            batch.desc.since().clone(),
                        ))));
                        new_upper.clone_from(expected_desc.upper());
                    }
                    batch = Arc::new(HollowBatch::empty(Description::new(
                        batch.desc.lower().clone(),
                        new_upper,
                        batch.desc.since().clone(),
                    )))
                }

                if expected_desc.upper() != batch.desc.upper() {
                    return Err(format!(
                        "hollow batch upper {:?} did not match expected upper {:?}",
                        batch.desc.upper().elements(),
                        expected_desc.upper().elements()
                    ));
                }

                Ok(IdHollowBatch { id, batch })
            };

        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
            (batch.desc.upper().clone(), id.1)
        } else {
            (Antichain::from_elem(T::minimum()), 0)
        };
        let levels = spine_batches
            .first_key_value()
            .map(|(_, batch)| batch.level + 1)
            .unwrap_or(0);
        let mut merging = vec![MergeState::default(); levels];
        for (id, batch) in spine_batches {
            let level = batch.level;

            let parts = batch
                .parts
                .into_iter()
                .zip(batch.descs.iter().map(Some).chain(std::iter::repeat(None)))
                .map(|(id, desc)| pop_batch(id, desc))
                .collect::<Result<Vec<_>, _>>()?;
            let len = parts.iter().map(|p| p.batch.len).sum();
            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
            let batch = SpineBatch {
                id,
                desc: batch.desc,
                parts,
                active_compaction,
                len,
            };

            let state = &mut merging[level];

            state.push_batch(batch);
            if let Some(id) = state.id() {
                if let Some(merge) = merges.remove(&id) {
                    state.merge = Some(IdFuelingMerge {
                        id,
                        merge: FuelingMerge {
                            since: merge.since,
                            remaining_work: merge.remaining_work,
                        },
                    })
                }
            }
        }

        let mut trace = Trace {
            spine: Spine {
                effort: 1,
                next_id,
                since,
                upper,
                merging,
            },
            roundtrip_structure,
        };

        fn check_empty(name: &str, len: usize) -> Result<(), String> {
            if len != 0 {
                Err(format!("{len} {name} left after reconstructing spine"))
            } else {
                Ok(())
            }
        }

        if roundtrip_structure {
            check_empty("legacy batches", legacy_batches.len())?;
        } else {
            // We discarded the structure, so rebuild it by pushing the legacy
            // batches (sorted in descending order above) back in, oldest
            // first.
            for batch in legacy_batches.into_iter().rev() {
                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
            }
        }
        check_empty("hollow batches", hollow_batches.len())?;
        check_empty("merges", merges.len())?;

        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);

        Ok(trace)
    }
}
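
// A minimal sketch of the flatten/unflatten contract (the `test_roundtrips`
// proptest below checks the same property exhaustively): flattening a trace
// and unflattening the result must yield a logically identical trace.
//
//     let flat = trace.flatten();
//     let trace2 = Trace::unflatten(flat).expect("valid flat trace");
//     assert_eq!(trace, trace2); // via the test/debug-only PartialEq impl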

#[derive(Clone, Debug, Default)]
pub(crate) struct SpineMetrics {
    pub compact_batches: u64,
    pub compacting_batches: u64,
    pub noncompact_batches: u64,
}

impl<T> Trace<T> {
    pub fn since(&self) -> &Antichain<T> {
        &self.spine.since
    }

    pub fn upper(&self) -> &Antichain<T> {
        &self.spine.upper
    }

    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
        for batch in self.batches() {
            f(batch);
        }
    }

    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
        self.spine
            .spine_batches()
            .flat_map(|b| b.parts.as_slice())
            .map(|b| &*b.batch)
    }

    pub fn num_spine_batches(&self) -> usize {
        self.spine.spine_batches().count()
    }

    #[cfg(test)]
    pub fn num_hollow_batches(&self) -> usize {
        self.batches().count()
    }

    #[cfg(test)]
    pub fn num_updates(&self) -> usize {
        self.batches().map(|b| b.len).sum()
    }
}

impl<T: Timestamp + Lattice> Trace<T> {
    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
        self.spine.since.clone_from(since);
    }

    #[must_use]
    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
        let mut merge_reqs = Vec::new();
        self.spine.insert(
            batch,
            &mut SpineLog::Enabled {
                merge_reqs: &mut merge_reqs,
            },
        );
        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
        // A single insert can generate merge requests that are covered by a
        // later, larger one from the same insert; drop the redundant ones
        // before returning.
        Self::remove_redundant_merge_reqs(merge_reqs)
    }
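
    // A minimal usage sketch (the batch name is illustrative): each pushed
    // batch must abut the trace's current upper, and the returned merge
    // requests are handed off to compaction.
    //
    //     let mut trace = Trace::<u64>::default();
    //     for req in trace.push_batch(batch_from_0_to_10) {
    //         // spawn compaction for `req.inputs`, apply via FueledMergeRes
    //     }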

    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
        // Search backwards, since most compactions are claimed for recently
        // added batches.
        for batch in self.spine.spine_batches_mut().rev() {
            if batch.id == id {
                batch.active_compaction = Some(compaction);
                break;
            }
        }
    }

    /// The same as [Self::push_batch], but discarding any merge requests.
    /// Used when reconstructing a spine from its flattened form.
    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
        self.spine.insert(batch, &mut SpineLog::Disabled);
    }

    /// Applies some amount of effort to trace maintenance, driving any
    /// in-progress merges forward. Returns the resulting merge requests and
    /// whether any work was actually done.
    #[must_use]
    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
        let mut merge_reqs = Vec::new();
        let did_work = self.spine.exert(
            fuel,
            &mut SpineLog::Enabled {
                merge_reqs: &mut merge_reqs,
            },
        );
        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
        (merge_reqs, did_work)
    }

    /// Validates invariants. See [Spine::validate] for details.
    pub fn validate(&self) -> Result<(), String> {
        self.spine.validate()
    }

    /// Merge requests for all spine batches that are not yet compact (or
    /// that contain parts written by a writer older than `threshold_writer`),
    /// skipping any batch whose active compaction started after
    /// `threshold_ms`.
    pub(crate) fn fueled_merge_reqs_before_ms(
        &self,
        threshold_ms: u64,
        threshold_writer: Option<WriterKey>,
    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
        self.spine
            .spine_batches()
            .filter(move |b| {
                let noncompact = !b.is_compact();
                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
                    b.parts.iter().any(|b| {
                        b.batch
                            .parts
                            .iter()
                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
                    })
                });
                noncompact || old_writer
            })
            .filter(move |b| {
                b.active_compaction
                    .as_ref()
                    .map_or(true, move |c| c.start_ms <= threshold_ms)
            })
            .map(|b| FueledMergeReq {
                id: b.id,
                desc: b.desc.clone(),
                inputs: b.parts.clone(),
            })
    }

    // This is only ever called with the merge requests from a single insert,
    // whose count is bounded by the number of levels in the spine, so the
    // naive quadratic algorithm below is fine.
    fn remove_redundant_merge_reqs(
        mut merge_reqs: Vec<FueledMergeReq<T>>,
    ) -> Vec<FueledMergeReq<T>> {
        // Returns true if b0 covers b1: it spans at least b1's id range at
        // the same since. Reqs with different sinces are both kept.
        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
        }

        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
        // In practice, the requests are generated smallest to largest, so
        // process them in reverse to find the covering ones first.
        while let Some(merge_req) = merge_reqs.pop() {
            let covered = ret.iter().any(|r| covers(r, &merge_req));
            if !covered {
                // Drop anything already retained that this new req covers.
                ret.retain(|r| !covers(&merge_req, r));
                ret.push(merge_req);
            }
        }
        ret
    }
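
    // For example, reqs covering ids [0, 2) and [0, 3) with equal sinces
    // collapse to just [0, 3), while overlapping-but-not-nested ranges like
    // [0, 2) and [1, 3) are both kept. The `remove_redundant_merge_reqs`
    // test below walks the full matrix.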

    pub fn spine_metrics(&self) -> SpineMetrics {
        let mut metrics = SpineMetrics::default();
        for batch in self.spine.spine_batches() {
            if batch.is_compact() {
                metrics.compact_batches += 1;
            } else if batch.is_merging() {
                metrics.compacting_batches += 1;
            } else {
                metrics.noncompact_batches += 1;
            }
        }
        metrics
    }
}

impl<T: Timestamp + Lattice + Codec64> Trace<T> {
    /// Applies a merge res, matching by time description (the classic,
    /// pre-[CompactionInput] protocol) and validating diffs sums.
    pub fn apply_merge_res_checked_classic<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult {
        for batch in self.spine.spine_batches_mut().rev() {
            let result = batch.maybe_replace_checked_classic::<D>(res, metrics);
            if result.matched() {
                return result;
            }
        }
        ApplyMergeResult::NotAppliedNoMatch
    }

    /// Applies a merge res, matching by the ids in its [CompactionInput] and
    /// validating diffs sums.
    pub fn apply_merge_res_checked<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult {
        for batch in self.spine.spine_batches_mut().rev() {
            let result = batch.maybe_replace_checked::<D>(res, metrics);
            if result.matched() {
                return result;
            }
        }
        ApplyMergeResult::NotAppliedNoMatch
    }

    /// Applies a merge res by time description, without diffs-sum validation.
    pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
        for batch in self.spine.spine_batches_mut().rev() {
            let result = batch.maybe_replace_unchecked(res);
            if result.matched() {
                return result;
            }
        }
        ApplyMergeResult::NotAppliedNoMatch
    }

    /// Replaces the matching batch with an empty tombstone batch.
    pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
        for batch in self.spine.spine_batches_mut().rev() {
            let result = batch.maybe_replace_with_tombstone(desc);
            if result.matched() {
                return result;
            }
        }
        ApplyMergeResult::NotAppliedNoMatch
    }
}

/// A log of what transitively happened during a Spine operation: e.g. which
/// FueledMergeReqs were generated.
enum SpineLog<'a, T> {
    Enabled {
        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
    },
    Disabled,
}

/// The input a merge res was computed from, used to match the res back up
/// against the current state of the trace.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CompactionInput {
    /// A merge res from before compaction inputs were tracked; matched
    /// against batch descriptions only.
    Legacy,
    /// The entire contiguous range of batches covered by the given id.
    IdRange(SpineId),
    /// A subset of the runs within a single batch.
    PartialBatch(SpineId, BTreeSet<RunId>),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SpineId(pub usize, pub usize);

impl Serialize for SpineId {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let SpineId(lo, hi) = self;
        serializer.serialize_str(&format!("{lo}-{hi}"))
    }
}

/// The `SpineId` spanning the given set of ids, which must be non-empty and
/// is assumed contiguous: e.g. `{(0,1), (1,2)}` is valid, but `{(0,1),
/// (2,3)}` is not.
pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
    let lower_spine_bound = ids
        .first()
        .map(|id| id.0)
        .expect("at least one batch must be present");
    let upper_spine_bound = ids
        .last()
        .map(|id| id.1)
        .expect("at least one batch must be present");

    SpineId(lower_spine_bound, upper_spine_bound)
}

impl SpineId {
    fn covers(self, other: SpineId) -> bool {
        self.0 <= other.0 && other.1 <= self.1
    }
}
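
#[cfg(test)]
mod spine_id_examples {
    use super::*;

    // A minimal sketch of the `SpineId` conventions assumed above: ids are
    // half-open `[lo, hi)` intervals over batch insertion order, so the id
    // produced by `id_range` covers exactly the ids it was built from.
    #[mz_ore::test]
    fn spine_id_covers_and_range() {
        let ids: BTreeSet<_> = [SpineId(0, 1), SpineId(1, 3)].into_iter().collect();
        let merged = id_range(ids);
        assert_eq!(merged, SpineId(0, 3));
        assert!(merged.covers(SpineId(0, 1)));
        assert!(merged.covers(SpineId(1, 3)));
        assert!(merged.covers(merged));
        assert!(!SpineId(1, 3).covers(merged));
    }
}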

#[derive(Debug, Clone, PartialEq)]
pub struct IdHollowBatch<T> {
    pub id: SpineId,
    pub batch: Arc<HollowBatch<T>>,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct ActiveCompaction {
    pub start_ms: u64,
}

#[derive(Debug, Clone, PartialEq)]
struct SpineBatch<T> {
    id: SpineId,
    desc: Description<T>,
    parts: Vec<IdHollowBatch<T>>,
    active_compaction: Option<ActiveCompaction>,
    len: usize,
}

impl<T> SpineBatch<T> {
    fn merged(batch: IdHollowBatch<T>) -> Self
    where
        T: Clone,
    {
        Self {
            id: batch.id,
            desc: batch.batch.desc.clone(),
            len: batch.batch.len,
            parts: vec![batch],
            active_compaction: None,
        }
    }
}

#[derive(Debug, Copy, Clone)]
pub enum ApplyMergeResult {
    AppliedExact,
    AppliedSubset,
    NotAppliedNoMatch,
    NotAppliedInvalidSince,
    NotAppliedTooManyUpdates,
}

impl ApplyMergeResult {
    pub fn applied(&self) -> bool {
        match self {
            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
            _ => false,
        }
    }
    pub fn matched(&self) -> bool {
        match self {
            ApplyMergeResult::AppliedExact
            | ApplyMergeResult::AppliedSubset
            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
            _ => false,
        }
    }
}
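
#[cfg(test)]
mod apply_merge_result_examples {
    use super::*;

    // A small sketch of the matched/applied distinction: every applied
    // result matched some spine batch, but a match need not have been
    // applied (e.g. when the res carried more updates than the data it
    // would have replaced).
    #[mz_ore::test]
    fn matched_vs_applied() {
        assert!(ApplyMergeResult::AppliedExact.applied());
        assert!(ApplyMergeResult::AppliedSubset.matched());
        assert!(!ApplyMergeResult::NotAppliedTooManyUpdates.applied());
        assert!(ApplyMergeResult::NotAppliedTooManyUpdates.matched());
        assert!(!ApplyMergeResult::NotAppliedNoMatch.matched());
    }
}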

impl<T: Timestamp + Lattice> SpineBatch<T> {
    pub fn lower(&self) -> &Antichain<T> {
        self.desc().lower()
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.desc().upper()
    }

    fn id(&self) -> SpineId {
        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
        self.id
    }

    pub fn is_compact(&self) -> bool {
        // A batch is compact if, across all its parts, there is at most one
        // run of data.
        self.parts
            .iter()
            .map(|p| p.batch.run_meta.len())
            .sum::<usize>()
            <= 1
    }

    pub fn is_merging(&self) -> bool {
        self.active_compaction.is_some()
    }

    fn desc(&self) -> &Description<T> {
        &self.desc
    }

    pub fn len(&self) -> usize {
        // NB: Re-deriving the len from the parts is O(n), so only verify
        // this invariant in debug builds.
        debug_assert_eq!(
            self.len,
            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
        );
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn empty(
        id: SpineId,
        lower: Antichain<T>,
        upper: Antichain<T>,
        since: Antichain<T>,
    ) -> Self {
        SpineBatch::merged(IdHollowBatch {
            id,
            batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
        })
    }

    pub fn begin_merge(
        bs: &[Self],
        compaction_frontier: Option<AntichainRef<T>>,
    ) -> Option<IdFuelingMerge<T>> {
        let from = bs.first()?.id().0;
        let until = bs.last()?.id().1;
        let id = SpineId(from, until);
        // The merge's since is the join of the batches' sinces, advanced to
        // the compaction frontier if one was provided.
        let mut sinces = bs.iter().map(|b| b.desc().since());
        let mut since = sinces.next()?.clone();
        for b in bs {
            since.join_assign(b.desc().since())
        }
        if let Some(compaction_frontier) = compaction_frontier {
            since.join_assign(&compaction_frontier.to_owned());
        }
        let remaining_work = bs.iter().map(|x| x.len()).sum();
        Some(IdFuelingMerge {
            id,
            merge: FuelingMerge {
                since,
                remaining_work,
            },
        })
    }
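
    // For example (with a totally ordered time, where joins are maxes):
    // merging batches with sinces {3} and {5} yields a merge since of {5};
    // with a compaction frontier of {7}, the merge since is advanced to {7}.
    // The remaining work is the total number of updates across the inputs.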

    #[cfg(test)]
    fn describe(&self, extended: bool) -> String {
        let SpineBatch {
            id,
            parts,
            desc,
            active_compaction,
            len,
        } = self;
        let compaction = match active_compaction {
            None => "".to_owned(),
            Some(c) => format!(" (c@{})", c.start_ms),
        };
        match extended {
            false => format!(
                "[{}-{}]{:?}{:?}{}/{}{compaction}",
                id.0,
                id.1,
                desc.lower().elements(),
                desc.upper().elements(),
                parts.len(),
                len
            ),
            true => {
                format!(
                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
                    id.0,
                    id.1,
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                    parts.len(),
                    len,
                    parts
                        .iter()
                        .flat_map(|x| x.batch.parts.iter())
                        .map(|x| format!(" {}", x.printable_name()))
                        .collect::<Vec<_>>()
                        .join("")
                )
            }
        }
    }
}

impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
    /// Sums the diffs of the given parts, returning `None` if any part's sum
    /// is unavailable.
    fn diffs_sum<'a, D: Semigroup + Codec64>(
        parts: impl Iterator<Item = &'a RunPart<T>>,
        metrics: &ColumnarMetrics,
    ) -> Option<D> {
        parts
            .map(|p| p.diffs_sum::<D>(metrics))
            .reduce(|a, b| match (a, b) {
                (Some(mut a), Some(b)) => {
                    a.plus_equals(&b);
                    Some(a)
                }
                _ => None,
            })
            .flatten()
    }

    /// Sums the diffs of only the given runs within `batch`.
    fn diffs_sum_for_runs<D: Semigroup + Codec64>(
        batch: &HollowBatch<T>,
        run_ids: &[RunId],
        metrics: &ColumnarMetrics,
    ) -> Option<D> {
        if run_ids.is_empty() {
            return None;
        }

        let parts = batch
            .runs()
            .filter(|(meta, _)| {
                run_ids.contains(&meta.id.expect("id should be present at this point"))
            })
            .flat_map(|(_, parts)| parts);

        Self::diffs_sum(parts, metrics)
    }

    fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
        let exact_match =
            desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();

        let empty_batch = HollowBatch::empty(desc.clone());
        if exact_match {
            *self = SpineBatch::merged(IdHollowBatch {
                id: self.id(),
                batch: Arc::new(empty_batch),
            });
            return ApplyMergeResult::AppliedExact;
        }

        if let Some((id, range)) = self.find_replacement_range(desc) {
            self.perform_subset_replacement(&empty_batch, id, range, None)
        } else {
            ApplyMergeResult::NotAppliedNoMatch
        }
    }

    /// Builds a new `HollowBatch` equal to `original`, but with the given
    /// runs removed and the runs of `replacement` appended in their place.
    fn construct_batch_with_runs_replaced(
        original: &HollowBatch<T>,
        run_ids: &[RunId],
        replacement: &HollowBatch<T>,
    ) -> Result<HollowBatch<T>, ApplyMergeResult> {
        if run_ids.is_empty() {
            return Err(ApplyMergeResult::NotAppliedNoMatch);
        }

        let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
        let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
        if !orig_run_ids.is_superset(&run_ids) {
            return Err(ApplyMergeResult::NotAppliedNoMatch);
        }

        // Keep every run that isn't being replaced, then append the
        // replacement's runs at the end.
        let runs: Vec<_> = original
            .runs()
            .filter(|(meta, _)| {
                !run_ids.contains(&meta.id.expect("id should be present at this point"))
            })
            .chain(replacement.runs())
            .collect();

        let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();

        let run_meta = runs
            .iter()
            .map(|(meta, _)| *meta)
            .cloned()
            .collect::<Vec<_>>();

        let parts = runs
            .iter()
            .flat_map(|(_, parts)| *parts)
            .cloned()
            .collect::<Vec<_>>();

        // Recompute the indexes into `parts` at which each run after the
        // first begins.
        let run_splits = {
            let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
            let mut pointer = 0;
            for (i, (_, parts)) in runs.into_iter().enumerate() {
                if parts.is_empty() {
                    continue;
                }
                if i < run_meta.len() - 1 {
                    splits.push(pointer + parts.len());
                }
                pointer += parts.len();
            }
            splits
        };

        Ok(HollowBatch::new(
            replacement.desc.clone(),
            parts,
            len,
            run_meta,
            run_splits,
        ))
    }

    fn maybe_replace_checked<D>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult
    where
        D: Semigroup + Codec64 + PartialEq + Debug,
    {
        // The spine's since may have advanced since the merge was requested,
        // but a merge res whose since is in advance of this batch's can't be
        // applied.
        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
            return ApplyMergeResult::NotAppliedInvalidSince;
        }

        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
        let num_batches = self.parts.len();

        let result = match &res.input {
            CompactionInput::IdRange(id) => {
                self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
            }
            CompactionInput::PartialBatch(id, runs) => {
                self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
            }
            CompactionInput::Legacy => {
                error!("legacy compaction input is not supported");
                return ApplyMergeResult::NotAppliedNoMatch;
            }
        };

        let num_batches_after = self.parts.len();
        assert!(
            num_batches_after <= num_batches,
            "replacing parts should not increase the number of batches"
        );
        result
    }

    fn handle_id_range_replacement<D>(
        &mut self,
        res: &FueledMergeRes<T>,
        id: &SpineId,
        new_diffs_sum: Option<D>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult
    where
        D: Semigroup + Codec64 + PartialEq + Debug,
    {
        // Collect the indices (and ids) of the parts covered by `id`.
        let range = self
            .parts
            .iter()
            .enumerate()
            .filter_map(|(i, p)| {
                if id.covers(p.id) {
                    Some((i, p.id))
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();

        // The covered parts must reassemble into exactly the claimed id
        // range; otherwise the res was computed against state we no longer
        // have.
        if ids.is_empty() || id != &id_range(ids) {
            return ApplyMergeResult::NotAppliedNoMatch;
        }

        // The matched ids are contiguous, so their indices are too.
        let range: BTreeSet<_> = range.iter().map(|(i, _)| *i).collect();

        let min = *range.first().unwrap();
        let max = *range.last().unwrap();
        let replacement_range = min..max + 1;

        // Sanity-check that the replaced parts sum to the same diffs as the
        // merge res before swapping them out.
        let old_diffs_sum = Self::diffs_sum::<D>(
            self.parts[replacement_range.clone()]
                .iter()
                .flat_map(|p| p.batch.parts.iter()),
            metrics,
        );

        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");

        self.perform_subset_replacement(
            &res.output,
            *id,
            replacement_range,
            res.new_active_compaction.clone(),
        )
    }

    fn handle_partial_batch_replacement<D>(
        &mut self,
        res: &FueledMergeRes<T>,
        id: SpineId,
        runs: &BTreeSet<RunId>,
        new_diffs_sum: Option<D>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult
    where
        D: Semigroup + Codec64 + PartialEq + Debug,
    {
        if runs.is_empty() {
            return ApplyMergeResult::NotAppliedNoMatch;
        }

        // A partial batch replacement applies to exactly one part.
        let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
        let Some((i, batch)) = part else {
            return ApplyMergeResult::NotAppliedNoMatch;
        };
        let replacement_range = i..(i + 1);

        let batch = &batch.batch;
        let run_ids = runs.iter().cloned().collect::<Vec<_>>();

        let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
        let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);

        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "partial batch replacement");

        match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
            Ok(new_batch) => {
                let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
                self.validate_diffs_sum_match(
                    old_batch_diff_sum,
                    new_batch_diff_sum,
                    "sanity checking diffs sum for replaced runs",
                );
                self.perform_subset_replacement(
                    &new_batch,
                    id,
                    replacement_range,
                    res.new_active_compaction.clone(),
                )
            }
            Err(err) => err,
        }
    }

    fn validate_diffs_sum_match<D>(
        &self,
        old_diffs_sum: Option<D>,
        new_diffs_sum: Option<D>,
        context: &str,
    ) where
        D: Semigroup + Codec64 + PartialEq + Debug,
    {
        match (new_diffs_sum, old_diffs_sum) {
            (None, Some(old)) => {
                if !D::is_zero(&old) {
                    panic!(
                        "merge res diffs sum is None, but spine batch diffs sum ({:?}) is not zero ({})",
                        old, context
                    );
                }
            }
            (Some(new_diffs_sum), Some(old_diffs_sum)) => {
                assert_eq!(
                    old_diffs_sum, new_diffs_sum,
                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
                    new_diffs_sum, old_diffs_sum, context
                );
            }
            _ => {}
        };
    }

    /// Matches a merge res against this batch by its time description alone
    /// (the classic, pre-[CompactionInput] protocol), validating diffs sums.
    fn maybe_replace_checked_classic<D>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ApplyMergeResult
    where
        D: Semigroup + Codec64 + PartialEq + Debug,
    {
        // The spine's since may have advanced since the merge was requested,
        // but a merge res whose since is in advance of this batch's can't be
        // applied.
        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
            return ApplyMergeResult::NotAppliedInvalidSince;
        }

        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);

        // If the merge res exactly matches this spine batch, we can swap it
        // in directly.
        let exact_match = res.output.desc.lower() == self.desc().lower()
            && res.output.desc.upper() == self.desc().upper();
        if exact_match {
            let old_diffs_sum = Self::diffs_sum::<D>(
                self.parts.iter().flat_map(|p| p.batch.parts.iter()),
                metrics,
            );

            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
                assert_eq!(
                    old_diffs_sum, new_diffs_sum,
                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
                    new_diffs_sum, old_diffs_sum
                );
            }

            // Compaction should only ever shrink a batch (by consolidating
            // updates), and Spine maintains invariants relating a batch's
            // len to its level, so refuse a res with more updates than the
            // data it replaces.
            if res.output.len > self.len() {
                return ApplyMergeResult::NotAppliedTooManyUpdates;
            }
            *self = SpineBatch::merged(IdHollowBatch {
                id: self.id(),
                batch: Arc::new(res.output.clone()),
            });
            return ApplyMergeResult::AppliedExact;
        }

        // It's possible the structure of the spine has changed since the
        // merge res was created (e.g. another merge completed in the
        // meantime), such that it no longer exactly matches a spine batch.
        // Check whether the res matches a subset of this batch's parts
        // instead.
        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
            let old_diffs_sum = Self::diffs_sum::<D>(
                self.parts[range.clone()]
                    .iter()
                    .flat_map(|p| p.batch.parts.iter()),
                metrics,
            );

            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
                assert_eq!(
                    old_diffs_sum, new_diffs_sum,
                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
                    new_diffs_sum, old_diffs_sum
                );
            }

            self.perform_subset_replacement(
                &res.output,
                id,
                range,
                res.new_active_compaction.clone(),
            )
        } else {
            ApplyMergeResult::NotAppliedNoMatch
        }
    }

    /// Like [Self::maybe_replace_checked_classic], but without validating
    /// diffs sums.
    fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
            return ApplyMergeResult::NotAppliedInvalidSince;
        }

        // If the merge res exactly matches this spine batch, swap it in
        // directly.
        let exact_match = res.output.desc.lower() == self.desc().lower()
            && res.output.desc.upper() == self.desc().upper();
        if exact_match {
            // Compaction should only ever shrink a batch, so refuse a res
            // with more updates than the data it replaces.
            if res.output.len > self.len() {
                return ApplyMergeResult::NotAppliedTooManyUpdates;
            }

            *self = SpineBatch::merged(IdHollowBatch {
                id: self.id(),
                batch: Arc::new(res.output.clone()),
            });
            return ApplyMergeResult::AppliedExact;
        }

        // Otherwise, check whether the res matches a subset of this batch.
        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
            self.perform_subset_replacement(
                &res.output,
                id,
                range,
                res.new_active_compaction.clone(),
            )
        } else {
            ApplyMergeResult::NotAppliedNoMatch
        }
    }

    /// Looks for a contiguous sub-range of this batch's parts whose combined
    /// description matches `desc`: some part's lower must equal `desc`'s
    /// lower, and some (possibly different) part's upper must equal `desc`'s
    /// upper.
    fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
        let mut lower = None;
        let mut upper = None;

        for (i, batch) in self.parts.iter().enumerate() {
            if batch.batch.desc.lower() == desc.lower() {
                lower = Some((i, batch.id.0));
            }
            if batch.batch.desc.upper() == desc.upper() {
                upper = Some((i, batch.id.1));
            }
            if lower.is_some() && upper.is_some() {
                break;
            }
        }

        match (lower, upper) {
            (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
                Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
            }
            _ => None,
        }
    }

    /// Replaces the parts in `range` with a single part holding `res`,
    /// leaving the rest of the batch as-is.
    fn perform_subset_replacement(
        &mut self,
        res: &HollowBatch<T>,
        spine_id: SpineId,
        range: Range<usize>,
        new_active_compaction: Option<ActiveCompaction>,
    ) -> ApplyMergeResult {
        let SpineBatch {
            id,
            parts,
            desc,
            active_compaction: _,
            len: _,
        } = self;

        let mut new_parts = vec![];
        new_parts.extend_from_slice(&parts[..range.start]);
        new_parts.push(IdHollowBatch {
            id: spine_id,
            batch: Arc::new(res.clone()),
        });
        new_parts.extend_from_slice(&parts[range.end..]);

        let new_spine_batch = SpineBatch {
            id: *id,
            desc: desc.to_owned(),
            len: new_parts.iter().map(|x| x.batch.len).sum(),
            parts: new_parts,
            active_compaction: new_active_compaction,
        };

        if new_spine_batch.len() > self.len() {
            return ApplyMergeResult::NotAppliedTooManyUpdates;
        }

        *self = new_spine_batch;
        ApplyMergeResult::AppliedSubset
    }
}
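
#[cfg(test)]
mod replacement_range_examples {
    use super::*;

    // A minimal sketch of `find_replacement_range` (ids and bounds are
    // illustrative): a merge res whose description spans a contiguous prefix
    // of a spine batch's parts maps to that sub-range of part indices and
    // the corresponding `SpineId`.
    #[mz_ore::test]
    fn find_replacement_range_example() {
        let part = |id, lower: u64, upper: u64| IdHollowBatch {
            id,
            batch: Arc::new(HollowBatch::empty(Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(0),
            ))),
        };
        let batch = SpineBatch {
            id: SpineId(0, 3),
            desc: Description::new(
                Antichain::from_elem(0u64),
                Antichain::from_elem(30),
                Antichain::from_elem(0),
            ),
            parts: vec![
                part(SpineId(0, 1), 0, 10),
                part(SpineId(1, 2), 10, 20),
                part(SpineId(2, 3), 20, 30),
            ],
            active_compaction: None,
            len: 0,
        };
        // A description covering [0, 20) matches the first two parts.
        let desc = Description::new(
            Antichain::from_elem(0u64),
            Antichain::from_elem(20),
            Antichain::from_elem(0),
        );
        assert_eq!(
            batch.find_replacement_range(&desc),
            Some((SpineId(0, 2), 0..2))
        );
        // A description whose bounds don't line up with any parts does not.
        let desc = Description::new(
            Antichain::from_elem(5u64),
            Antichain::from_elem(20),
            Antichain::from_elem(0),
        );
        assert_eq!(batch.find_replacement_range(&desc), None);
    }
}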

#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct FuelingMerge<T> {
    pub(crate) since: Antichain<T>,
    pub(crate) remaining_work: usize,
}

#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct IdFuelingMerge<T> {
    id: SpineId,
    merge: FuelingMerge<T>,
}

impl<T: Timestamp + Lattice> FuelingMerge<T> {
    /// Performs some amount of work, decrementing `fuel`.
    ///
    /// If `fuel` is nonzero after the call, the merging is complete and one
    /// should call `done` to extract the merged results.
    #[allow(clippy::as_conversions)]
    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
        let used = std::cmp::min(*fuel as usize, self.remaining_work);
        self.remaining_work = self.remaining_work.saturating_sub(used);
        *fuel -= used as isize;
    }

    /// Completes the merge, producing a single batch that covers all the
    /// input batches (and, when logging is enabled and the result is
    /// non-empty, recording a merge request for its parts).
    fn done(
        self,
        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
        log: &mut SpineLog<'_, T>,
    ) -> Option<SpineBatch<T>> {
        let first = bs.first()?;
        let last = bs.last()?;
        let id = SpineId(first.id().0, last.id().1);
        assert!(id.0 < id.1);
        let lower = first.desc().lower().clone();
        let upper = last.desc().upper().clone();
        let since = self.since;

        // Special case empty batches.
        if bs.iter().all(SpineBatch::is_empty) {
            return Some(SpineBatch::empty(id, lower, upper, since));
        }

        let desc = Description::new(lower, upper, since);
        let len = bs.iter().map(SpineBatch::len).sum();

        // Pre-size the merged_parts Vec; the double iteration is worth
        // avoiding the reallocations.
        let mut merged_parts_len = 0;
        for b in &bs {
            merged_parts_len += b.parts.len();
        }
        let mut merged_parts = Vec::with_capacity(merged_parts_len);
        for b in bs {
            merged_parts.extend(b.parts)
        }
        // Sanity check the pre-sizing above.
        debug_assert_eq!(merged_parts.len(), merged_parts_len);

        if let SpineLog::Enabled { merge_reqs } = log {
            merge_reqs.push(FueledMergeReq {
                id,
                desc: desc.clone(),
                inputs: merged_parts.clone(),
            });
        }

        Some(SpineBatch {
            id,
            desc,
            len,
            parts: merged_parts,
            active_compaction: None,
        })
    }
}

/// The maximum number of batches at a single level of the spine: it's a
/// binary merge tree, so two batches at a level start a merge into the level
/// above.
const BATCHES_PER_LEVEL: usize = 2;

/// An append-only collection of update batches, maintained as a sequence of
/// geometrically sized levels.
///
/// Inserted batches land at a level determined by their size; when a level
/// fills, its batches begin a merge whose progress is driven by fuel
/// supplied with each insert, amortizing the cost of merging across inserts.
#[derive(Debug, Clone)]
struct Spine<T> {
    effort: usize,
    next_id: usize,
    since: Antichain<T>,
    upper: Antichain<T>,
    merging: Vec<MergeState<T>>,
}

impl<T> Spine<T> {
    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
        self.merging.iter().rev().flat_map(|m| &m.batches)
    }

    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
    }
}

impl<T: Timestamp + Lattice> Spine<T> {
    /// Allocates a fueled `Spine`.
    pub fn new() -> Self {
        Spine {
            effort: 1,
            next_id: 0,
            since: Antichain::from_elem(T::minimum()),
            upper: Antichain::from_elem(T::minimum()),
            merging: Vec::new(),
        }
    }

    /// Applies some amount of effort to spine maintenance, returning true if
    /// it did any work.
    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
        self.tidy_layers();
        if self.reduced() {
            return false;
        }

        if self.merging.iter().any(|b| b.merge.is_some()) {
            // If any merges exist, we can directly fuel them.
            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
            self.apply_fuel(&fuel, log);
        } else {
            // Otherwise, introduce an empty batch at a level appropriate to
            // the amount of effort, which may trigger merges of the existing
            // batches.
            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
            let id = self.next_id();
            self.introduce_batch(
                SpineBatch::empty(
                    id,
                    self.upper.clone(),
                    self.upper.clone(),
                    self.since.clone(),
                ),
                level,
                log,
            );
        }
        true
    }
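
    // For example (levels are size classes, so the level is the log2 of the
    // effort): `exert(1024, ..)` with no merge in progress introduces an
    // empty batch at level 10, since
    // `1024.next_power_of_two().trailing_zeros() == 10`; with a merge in
    // progress, the same call instead applies 1024 units of fuel to each
    // in-progress merge.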

    pub fn next_id(&mut self) -> SpineId {
        let id = self.next_id;
        self.next_id += 1;
        SpineId(id, self.next_id)
    }

    /// Inserts a new batch, which must be non-empty in time and abut the
    /// spine's current upper.
    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
        assert!(batch.desc.lower() != batch.desc.upper());
        assert_eq!(batch.desc.lower(), &self.upper);

        let id = self.next_id();
        let batch = SpineBatch::merged(IdHollowBatch {
            id,
            batch: Arc::new(batch),
        });

        self.upper.clone_from(batch.upper());

        // If we are inserting a batch with no updates, we may be able to
        // avoid a full introduction: fold it into the first non-vacant
        // level, if that level holds a single empty batch.
        if batch.is_empty() {
            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
                if self.merging[position].is_single() && self.merging[position].is_empty() {
                    self.insert_at(batch, position);
                    // Since we just inserted a batch, there should be
                    // something to complete here; if not, leave the level
                    // vacant.
                    if let Some(merged) = self.complete_at(position, log) {
                        self.merging[position] = MergeState::single(merged);
                    }
                    return;
                }
            }
        }

        // Normal insertion: the level is the size class of the batch's
        // update count.
        let index = batch.len().next_power_of_two();
        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
    }
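
    // Worked example for the level selection above: a batch of 1000 updates
    // has `next_power_of_two() == 1024`, whose `trailing_zeros() == 10`, so
    // it is introduced at level 10; a batch of exactly 1024 lands there too,
    // while a batch of 1025 rounds up to level 11.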

    /// True iff there are fewer than two hollow batches in the spine, in
    /// which case there is no merging work to perform.
    fn reduced(&self) -> bool {
        self.spine_batches()
            .flat_map(|b| b.parts.as_slice())
            .count()
            < 2
    }

    /// Describes the merge progress of the levels in the spine, as a
    /// sequence of (batch count, update count) pairs, one per level.
    #[allow(dead_code)]
    fn describe(&self) -> Vec<(usize, usize)> {
        self.merging
            .iter()
            .map(|b| (b.batches.len(), b.len()))
            .collect()
    }

    /// Introduces a batch at an indicated level.
    ///
    /// The level indication is usually the size class of the batch, but it
    /// can also be used to artificially fuel the merge machinery, e.g. by
    /// introducing empty batches at a level of our choosing.
    fn introduce_batch(
        &mut self,
        batch: SpineBatch<T>,
        batch_index: usize,
        log: &mut SpineLog<'_, T>,
    ) {
        // A batch index this large shouldn't happen in practice, and would
        // shift the fuel computation below past the useful bits.
        if batch_index > 32 {
            error!("Large batch index: {}", batch_index);
        }

        // Fuel proportional to the size class of the introduced batch,
        // scaled by the configured effort multiplier.
        let mut fuel = 8 << batch_index;
        fuel *= self.effort;
        #[allow(clippy::as_conversions)]
        let fuel = fuel as isize;

        // Apply fuel to each in-progress merge, which may result in merged
        // batches moving up the levels.
        self.apply_fuel(&fuel, log);

        // Roll up any batches at levels below `batch_index`, so the new
        // batch can be inserted at its own level.
        self.roll_up(batch_index, log);

        self.insert_at(batch, batch_index);

        // Having just inserted a batch, take the opportunity to drain any
        // over-sized top layers down to an appropriate level.
        self.tidy_layers();
    }

    /// Ensures the spine can accept a batch at level `index`, by merging and
    /// promoting everything at the levels below it.
    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
        // Ensure the spine has enough levels.
        while self.merging.len() <= index {
            self.merging.push(MergeState::default());
        }

        // We only need to do anything if some lower level is non-vacant.
        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
            // Merge upward from the lowest level, completing each level
            // into the one above it.
            let mut merged = None;
            for i in 0..index {
                if let Some(merged) = merged.take() {
                    self.insert_at(merged, i);
                }
                merged = self.complete_at(i, log);
            }

            // The merged result from the level below is inserted at `index`.
            if let Some(merged) = merged {
                self.insert_at(merged, index);
            }

            // If that insertion filled the level, complete it and promote
            // the result, so the level is ready for our new batch.
            if self.merging[index].is_full() {
                let merged = self.complete_at(index, log).expect("double batch");
                self.insert_at(merged, index + 1);
            }
        }
    }

    /// Applies an amount of fuel to each in-progress merge.
    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
        for index in 0..self.merging.len() {
            // Give each level a fresh copy of the fuel budget.
            let mut fuel = *fuel;
            self.merging[index].work(&mut fuel);
            // If the merge is now complete, insert the result at the next
            // level up, which may cascade further merges.
            if self.merging[index].is_complete() {
                let complete = self.complete_at(index, log).expect("complete batch");
                self.insert_at(complete, index + 1);
            }
        }
    }

    /// Inserts a batch at a specific level, starting a merge if the level is
    /// now full.
    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
        // Ensure the spine has enough levels.
        while self.merging.len() <= index {
            self.merging.push(MergeState::default());
        }

        // Insert the batch at the location.
        let merging = &mut self.merging[index];
        merging.push_batch(batch);
        if merging.batches.is_full() {
            let compaction_frontier = Some(self.since.borrow());
            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
        }
    }

    /// Completes and extracts whatever is at level `index`, leaving the
    /// level vacant.
    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
        self.merging[index].complete(log)
    }

    /// Attempts to draw down large levels to levels appropriate to their
    /// actual content.
    fn tidy_layers(&mut self) {
        // If the largest layer holds a single batch, we may be able to move
        // it down to a level that matches its size.
        if !self.merging.is_empty() {
            let mut length = self.merging.len();
            if self.merging[length - 1].is_single() {
                // To move a batch down, it must contain few enough updates
                // for the lower level.
                let appropriate_level = usize::cast_from(
                    self.merging[length - 1]
                        .len()
                        .next_power_of_two()
                        .trailing_zeros(),
                );

                // Continue only as far as is appropriate for the batch size.
                while appropriate_level < length - 1 {
                    let current = &mut self.merging[length - 2];
                    if current.is_vacant() {
                        // Vacant levels can be removed outright.
                        self.merging.remove(length - 2);
                        length = self.merging.len();
                    } else {
                        if !current.is_full() {
                            // Count the updates in the lower levels to decide
                            // whether pulling the occupied level up would
                            // start a merge that's disproportionately large
                            // for them.
                            let mut smaller = 0;
                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
                                smaller += batch.batches.len() << index;
                            }

                            if smaller <= (1 << length) / 8 {
                                // Remove the single-batch level and reinsert
                                // its batch into what is now the top level,
                                // which may start a merge.
                                let state = self.merging.remove(length - 2);
                                assert_eq!(state.batches.len(), 1);
                                for batch in state.batches {
                                    self.insert_at(batch, length - 2);
                                }
                            }
                        }
                        break;
                    }
                }
            }
        }
    }

    /// Checks invariants:
    ///
    /// - The lowers and uppers of all batches "line up".
    /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
    /// - The upper of the "maximum" batch is `== self.upper`.
    /// - The since of each batch is `less_equal self.since`.
    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
    /// - All full levels have fueling merges, and no other levels do.
    fn validate(&self) -> Result<(), String> {
        let mut id = SpineId(0, 0);
        let mut frontier = Antichain::from_elem(T::minimum());
        for x in self.merging.iter().rev() {
            if x.is_full() != x.merge.is_some() {
                return Err(format!(
                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
                    x.is_full(),
                    x.merge,
                ));
            }

            if let Some(m) = &x.merge {
                if !x.is_full() {
                    return Err(format!(
                        "merge should only exist for full batches (len={:?}, merge={:?})",
                        x.batches.len(),
                        m.id,
                    ));
                }
                if x.id() != Some(m.id) {
                    return Err(format!(
                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
                        x.id(),
                        m.id,
                    ));
                }
            }

            for batch in &x.batches {
                if batch.id().0 != id.1 {
                    return Err(format!(
                        "batch id {:?} does not match the previous id {:?}: {:?}",
                        batch.id(),
                        id,
                        self
                    ));
                }
                id = batch.id();
                if batch.desc().lower() != &frontier {
                    return Err(format!(
                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
                        batch.desc().lower(),
                        frontier,
                        self
                    ));
                }
                frontier.clone_from(batch.desc().upper());
                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
                    return Err(format!(
                        "since of batch {:?} past the spine since {:?}: {:?}",
                        batch.desc().since(),
                        self.since,
                        self
                    ));
                }
            }
        }
        if self.next_id != id.1 {
            return Err(format!(
                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
                self.next_id, id, self
            ));
        }
        if self.upper != frontier {
            return Err(format!(
                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
                self.upper, frontier, self
            ));
        }
        Ok(())
    }
}

/// Describes the state of one level of the spine.
///
/// A level can be vacant, hold a single batch, or hold [BATCHES_PER_LEVEL]
/// batches that are in the process of merging into a batch for the level
/// above.
#[derive(Debug, Clone)]
struct MergeState<T> {
    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
    merge: Option<IdFuelingMerge<T>>,
}

impl<T> Default for MergeState<T> {
    fn default() -> Self {
        Self {
            batches: ArrayVec::new(),
            merge: None,
        }
    }
}

impl<T: Timestamp + Lattice> MergeState<T> {
    /// An id covering all the batches in this level, if it's non-vacant.
    fn id(&self) -> Option<SpineId> {
        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
            Some(SpineId(first.id().0, last.id().1))
        } else {
            None
        }
    }

    /// A new level holding the single given batch.
    fn single(batch: SpineBatch<T>) -> Self {
        let mut state = Self::default();
        state.push_batch(batch);
        state
    }

    /// Pushes a batch into this level, checking invariants.
    fn push_batch(&mut self, batch: SpineBatch<T>) {
        if let Some(last) = self.batches.last() {
            assert_eq!(last.id().1, batch.id().0);
            assert_eq!(last.upper(), batch.lower());
        }
        assert!(
            self.merge.is_none(),
            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
            batch.id,
            self.batches.len(),
        );
        self.batches
            .try_push(batch)
            .expect("Attempted to insert batch into full layer!");
    }

    /// The number of updates across all batches in this level.
    fn len(&self) -> usize {
        self.batches.iter().map(SpineBatch::len).sum()
    }

    /// True iff all the batches in this level are empty.
    fn is_empty(&self) -> bool {
        self.batches.iter().all(SpineBatch::is_empty)
    }

    /// True iff this level holds no batches.
    fn is_vacant(&self) -> bool {
        self.batches.is_empty()
    }

    /// True only for a level containing a single batch.
    fn is_single(&self) -> bool {
        self.batches.len() == 1
    }

    /// True iff this level can hold no more batches.
    fn is_full(&self) -> bool {
        self.batches.is_full()
    }

    /// Immediately completes any merge and extracts the single batch that
    /// results, leaving the level vacant.
    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
        let mut this = mem::take(self);
        if this.batches.len() <= 1 {
            this.batches.pop()
        } else {
            // Merge the remaining batches, regardless of whether we have a
            // fully fueled merge. (NB: use `this`, not `self`: `self` was
            // just taken and is empty.)
            let id_merge = this
                .merge
                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
            id_merge.merge.done(this.batches, log)
        }
    }

    /// True iff the in-progress merge has consumed all its fuel.
    fn is_complete(&self) -> bool {
        match &self.merge {
            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
            None => false,
        }
    }

    /// Performs a bounded amount of work on the in-progress merge, if any.
    fn work(&mut self, fuel: &mut isize) {
        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
            merge.work(&self.batches[..], fuel)
        }
    }
}
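
#[cfg(test)]
mod merge_state_examples {
    use super::*;

    // A minimal sketch of a level's lifecycle, using empty batches: pushing
    // `BATCHES_PER_LEVEL` abutting batches fills the level, and completing
    // it yields a single batch covering the union of their intervals.
    #[mz_ore::test]
    fn merge_state_lifecycle() {
        let batch = |id: usize, lo: u64, hi: u64| {
            SpineBatch::empty(
                SpineId(id, id + 1),
                Antichain::from_elem(lo),
                Antichain::from_elem(hi),
                Antichain::from_elem(0),
            )
        };
        let mut state = MergeState::single(batch(0, 0, 10));
        assert!(state.is_single() && !state.is_full());
        state.push_batch(batch(1, 10, 20));
        assert!(state.is_full());
        let merged = state
            .complete(&mut SpineLog::Disabled)
            .expect("non-empty level");
        assert_eq!(merged.id(), SpineId(0, 2));
        assert_eq!(merged.lower(), &Antichain::from_elem(0));
        assert_eq!(merged.upper(), &Antichain::from_elem(20));
    }
}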

#[cfg(test)]
pub mod datadriven {
    use crate::internal::datadriven::DirectiveArgs;

    use super::*;

    #[derive(Debug, Default)]
    pub struct TraceState {
        pub trace: Trace<u64>,
        pub merge_reqs: Vec<FueledMergeReq<u64>>,
    }

    pub fn since_upper(
        datadriven: &TraceState,
        _args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        Ok(format!(
            "{:?}{:?}\n",
            datadriven.trace.since().elements(),
            datadriven.trace.upper().elements()
        ))
    }

    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for b in datadriven.trace.spine.spine_batches() {
            s.push_str(b.describe(true).as_str());
            s.push('\n');
        }
        Ok(s)
    }

    pub fn insert(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        for x in args
            .input
            .trim()
            .split('\n')
            .map(DirectiveArgs::parse_hollow_batch)
        {
            datadriven
                .merge_reqs
                .append(&mut datadriven.trace.push_batch(x));
        }
        Ok("ok\n".to_owned())
    }

    pub fn downgrade_since(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let since = args.expect("since");
        datadriven
            .trace
            .downgrade_since(&Antichain::from_elem(since));
        Ok("ok\n".to_owned())
    }

    pub fn take_merge_req(
        datadriven: &mut TraceState,
        _args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
            write!(
                s,
                "{:?}{:?}{:?} {}\n",
                merge_req.desc.lower().elements(),
                merge_req.desc.upper().elements(),
                merge_req.desc.since().elements(),
                merge_req
                    .inputs
                    .iter()
                    .flat_map(|x| x.batch.parts.iter())
                    .map(|x| x.printable_name())
                    .collect::<Vec<_>>()
                    .join(" ")
            );
        }
        Ok(s)
    }

    pub fn apply_merge_res(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let res = FueledMergeRes {
            output: DirectiveArgs::parse_hollow_batch(args.input),
            input: CompactionInput::Legacy,
            new_active_compaction: None,
        };
        match datadriven.trace.apply_merge_res_unchecked(&res) {
            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::ops::Range;

    use proptest::prelude::*;
    use semver::Version;

    use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};

    use super::*;

    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
        num_batches: Range<usize>,
    ) -> impl Strategy<Value = Trace<T>> {
        Strategy::prop_map(
            (
                any::<Option<T>>(),
                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
                any::<bool>(),
                any::<u64>(),
            ),
            |(since, mut batches, roundtrip_structure, timeout_ms)| {
                let mut trace = Trace::<T>::default();
                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));

                // Fix up the arbitrary hollow batches so the lowers and
                // uppers line up, and downgrade the trace's since so no
                // batch violates the since invariant.
                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
                let mut lower = Antichain::from_elem(T::minimum());
                for mut batch in batches {
                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
                        trace.downgrade_since(batch.desc.since());
                    }
                    batch.desc = Description::new(
                        lower.clone(),
                        batch.desc.upper().clone(),
                        batch.desc.since().clone(),
                    );
                    lower.clone_from(batch.desc.upper());
                    let _merge_req = trace.push_batch(batch);
                }
                let reqs: Vec<_> = trace
                    .fueled_merge_reqs_before_ms(timeout_ms, None)
                    .collect();
                for req in reqs {
                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
                }
                trace.roundtrip_structure = roundtrip_structure;
                trace
            },
        )
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_roundtrips() {
        fn check(trace: Trace<i64>) {
            trace.validate().unwrap();
            let flat = trace.flatten();
            let unflat = Trace::unflatten(flat).unwrap();
            assert_eq!(trace, unflat);
        }

        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
    }

    #[mz_ore::test]
    fn fueled_merge_reqs() {
        let mut trace: Trace<u64> = Trace::default();
        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
            0,
            10,
            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
            1000,
        ));

        assert!(fueled_reqs.is_empty());
        assert_eq!(
            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
            0,
            "no merge reqs when not filtering by version"
        );
        assert_eq!(
            trace
                .fueled_merge_reqs_before_ms(
                    u64::MAX,
                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
                )
                .count(),
            0,
            "zero batches are older than a past version"
        );
        assert_eq!(
            trace
                .fueled_merge_reqs_before_ms(
                    u64::MAX,
                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
                )
                .count(),
            1,
            "one batch is older than a future version"
        );
    }

    #[mz_ore::test]
    fn remove_redundant_merge_reqs() {
        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
            FueledMergeReq {
                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
                desc: Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::new(),
                ),
                inputs: vec![],
            }
        }

        // Empty.
        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);

        // Single.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
            vec![req(0, 1)]
        );

        // Duplicate.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
            vec![req(0, 1)]
        );

        // Nothing covered.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
            vec![req(1, 2), req(0, 1)]
        );

        // Covered in the middle.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
            vec![req(0, 3)]
        );

        // Covered at the lower bound.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
            vec![req(0, 3)]
        );

        // Covered at the upper bound.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
            vec![req(0, 3)]
        );

        // Covered, with the covering req first.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
            vec![req(0, 3)]
        );

        // Partially overlapping: both are kept.
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
            vec![req(1, 3), req(0, 2)]
        );

        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
            vec![req(0, 2), req(1, 3)]
        );

        // Same id range but different sinces: both are kept.
        let req015 = FueledMergeReq {
            id: SpineId(0, 1),
            desc: Description::new(
                Antichain::from_elem(0),
                Antichain::from_elem(1),
                Antichain::from_elem(5),
            ),
            inputs: vec![],
        };
        assert_eq!(
            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
            vec![req015, req(0, 1)]
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn construct_batch_with_runs_replaced_test() {
        let batch_strategy = any_hollow_batch::<u64>();
        let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);

        let combined_strategy = (batch_strategy, to_replace_strategy)
            .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);

        let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
            let batch_len = batch.run_meta.len();
            let batch_clone = batch.clone();
            let to_replace_clone = to_replace.clone();

            proptest::collection::vec(any::<bool>(), batch_len)
                .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
                .prop_map(move |mask| {
                    let indices: Vec<usize> = mask
                        .iter()
                        .enumerate()
                        .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
                        .collect();
                    (batch_clone.clone(), to_replace_clone.clone(), indices)
                })
        });

        proptest!(|(
            (batch, to_replace, runs) in final_strategy
        )| {
            let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
                x.id.unwrap().clone()
            ).collect();

            let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();

            let new_batch = SpineBatch::construct_batch_with_runs_replaced(
                &batch,
                &run_ids,
                &to_replace,
            ).unwrap();

            prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len());
        });
    }
}