1use std::cmp::Ordering;
51use std::collections::{BTreeMap, BTreeSet};
52use std::fmt::{Debug, Display};
53use std::mem;
54use std::ops::Range;
55use std::sync::Arc;
56
57use arrayvec::ArrayVec;
58use differential_dataflow::difference::Monoid;
59use differential_dataflow::lattice::Lattice;
60use differential_dataflow::trace::Description;
61use itertools::Itertools;
62use mz_ore::cast::CastFrom;
63use mz_persist::metrics::ColumnarMetrics;
64use mz_persist_types::Codec64;
65use serde::{Serialize, Serializer};
66use timely::PartialOrder;
67use timely::progress::frontier::AntichainRef;
68use timely::progress::{Antichain, Timestamp};
69use tracing::{error, warn};
70
71use crate::internal::paths::WriterKey;
72use crate::internal::state::{HollowBatch, RunId};
73
74use super::state::RunPart;
75
/// A merge request handed out by the trace (see `Trace::push_batch`,
/// `Trace::exert` and `Trace::fueled_merge_reqs_before_ms`).
#[derive(Debug, Clone, PartialEq)]
pub struct FueledMergeReq<T> {
    /// Spine id of the request. `fueled_merge_reqs_before_ms` fills this with
    /// the id of the spine batch the request was built from.
    pub id: SpineId,
    /// Description attached to the request; copied from the originating spine
    /// batch in `fueled_merge_reqs_before_ms`.
    pub desc: Description<T>,
    /// The hollow batches (tagged with their spine ids) that form the input.
    pub inputs: Vec<IdHollowBatch<T>>,
}
82
/// The outcome of a merge, applied to the spine through the
/// `Trace::apply_merge_res_*` methods.
#[derive(Debug)]
pub struct FueledMergeRes<T> {
    /// The batch that takes the place of the merged inputs.
    pub output: HollowBatch<T>,
    /// Selects which portion of the spine `output` is meant to replace.
    pub input: CompactionInput,
    /// Compaction state installed on the spine batch when the result is
    /// applied as a subset replacement.
    pub new_active_compaction: Option<ActiveCompaction>,
}
89
/// A collection of hollow batches, organised by a `Spine`.
#[derive(Debug, Clone)]
pub struct Trace<T> {
    /// The spine holding the batches and any in-progress merges.
    spine: Spine<T>,
    /// When `false`, `flatten` discards the spine-batch and merge structure
    /// and only emits the legacy batch collection.
    pub(crate) roundtrip_structure: bool,
}
99
100#[cfg(any(test, debug_assertions))]
101impl<T: PartialEq> PartialEq for Trace<T> {
102 fn eq(&self, other: &Self) -> bool {
103 let Trace {
106 spine: _,
107 roundtrip_structure: _,
108 } = self;
109 let Trace {
110 spine: _,
111 roundtrip_structure: _,
112 } = other;
113
114 self.batches().eq(other.batches())
117 }
118}
119
120impl<T: Timestamp + Lattice> Default for Trace<T> {
121 fn default() -> Self {
122 Self {
123 spine: Spine::new(),
124 roundtrip_structure: true,
125 }
126 }
127}
128
/// The flattened form of a spine batch: it references its parts by id instead
/// of owning them (see `Trace::flatten`).
#[derive(Clone, Debug, Serialize)]
pub struct ThinSpineBatch<T> {
    /// Index of the spine level the batch was found at.
    pub(crate) level: usize,
    /// Description of the spine batch as a whole.
    pub(crate) desc: Description<T>,
    /// Spine ids of the parts, in order.
    pub(crate) parts: Vec<SpineId>,
    /// Descriptions of the parts, parallel to `parts`. `Trace::unflatten`
    /// accepts a `descs` that is shorter than `parts`. Not considered by this
    /// type's `PartialEq` implementation.
    pub(crate) descs: Vec<Description<T>>,
}
138
139impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
140 fn eq(&self, other: &Self) -> bool {
141 (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
143 }
144}
145
/// The flattened form of merge state, built either from a `FuelingMerge` or
/// from a spine batch with an active compaction (see `ThinMerge::fueling` and
/// `ThinMerge::fueled`).
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct ThinMerge<T> {
    /// The since frontier associated with the merge.
    pub(crate) since: Antichain<T>,
    /// Work left for a fueling merge; `fueled` always records zero here.
    pub(crate) remaining_work: usize,
    /// The active compaction of the batch, if any; `fueling` always records
    /// `None` here.
    pub(crate) active_compaction: Option<ActiveCompaction>,
}
152
153impl<T: Clone> ThinMerge<T> {
154 fn fueling(merge: &FuelingMerge<T>) -> Self {
155 ThinMerge {
156 since: merge.since.clone(),
157 remaining_work: merge.remaining_work,
158 active_compaction: None,
159 }
160 }
161
162 fn fueled(batch: &SpineBatch<T>) -> Self {
163 ThinMerge {
164 since: batch.desc.since().clone(),
165 remaining_work: 0,
166 active_compaction: batch.active_compaction.clone(),
167 }
168 }
169}
170
/// A flattened representation of a `Trace`, produced by `Trace::flatten` and
/// consumed by `Trace::unflatten`.
#[derive(Clone, Debug)]
pub struct FlatTrace<T> {
    /// The since frontier of the spine.
    pub(crate) since: Antichain<T>,
    /// Hollow batches stored without a spine id. `flatten` places every batch
    /// whose lower differs from its upper here.
    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
    /// Hollow batches keyed by spine id. `flatten` places batches whose lower
    /// equals their upper here.
    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
    /// The spine batches, keyed by spine id, referencing their parts by id.
    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
    /// Merge state keyed by spine id: active compactions of spine batches and
    /// in-progress fueling merges of spine levels.
    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
}
202
203impl<T: Timestamp + Lattice> Trace<T> {
204 pub(crate) fn flatten(&self) -> FlatTrace<T> {
205 let since = self.spine.since.clone();
206 let mut legacy_batches = BTreeMap::new();
207 let mut hollow_batches = BTreeMap::new();
208 let mut spine_batches = BTreeMap::new();
209 let mut merges = BTreeMap::new();
210
211 let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
212 let id = batch.id();
213 let desc = batch.desc.clone();
214 let mut parts = Vec::with_capacity(batch.parts.len());
215 let mut descs = Vec::with_capacity(batch.parts.len());
216 for IdHollowBatch { id, batch } in &batch.parts {
217 parts.push(*id);
218 descs.push(batch.desc.clone());
219 if batch.desc.lower() == batch.desc.upper() {
227 hollow_batches.insert(*id, Arc::clone(batch));
228 } else {
229 legacy_batches.insert(Arc::clone(batch), ());
230 }
231 }
232
233 let spine_batch = ThinSpineBatch {
234 level,
235 desc,
236 parts,
237 descs,
238 };
239 spine_batches.insert(id, spine_batch);
240 };
241
242 for (level, state) in self.spine.merging.iter().enumerate() {
243 for batch in &state.batches {
244 push_spine_batch(level, batch);
245 if let Some(c) = &batch.active_compaction {
246 let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
247 assert!(
248 previous.is_none(),
249 "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
250 batch.id,
251 )
252 }
253 }
254 if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
255 let previous = merges.insert(*id, ThinMerge::fueling(merge));
256 assert!(
257 previous.is_none(),
258 "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
259 )
260 }
261 }
262
263 if !self.roundtrip_structure {
264 assert!(hollow_batches.is_empty());
265 spine_batches.clear();
266 merges.clear();
267 }
268
269 FlatTrace {
270 since,
271 legacy_batches,
272 hollow_batches,
273 spine_batches,
274 merges,
275 }
276 }
277 pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
278 let FlatTrace {
279 since,
280 legacy_batches,
281 mut hollow_batches,
282 spine_batches,
283 mut merges,
284 } = value;
285
286 let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
289
290 let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
296 if PartialOrder::less_than(left, right) {
297 Ordering::Less
298 } else if PartialOrder::less_than(right, left) {
299 Ordering::Greater
300 } else {
301 Ordering::Equal
302 }
303 };
304 let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
305 legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
306
307 let mut pop_batch =
308 |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
309 if let Some(batch) = hollow_batches.remove(&id) {
310 if let Some(desc) = expected_desc {
311 assert_eq!(desc.lower(), batch.desc.lower());
313 assert_eq!(desc.upper(), batch.desc.upper());
314 if desc.since() != batch.desc.since() {
317 warn!(
318 "unexpected since out of sync for spine batch: {:?} != {:?}",
319 desc.since().elements(),
320 batch.desc.since().elements()
321 );
322 }
323 }
324 return Ok(IdHollowBatch { id, batch });
325 }
326 let mut batch = legacy_batches
327 .pop()
328 .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
329
330 let Some(expected_desc) = expected_desc else {
331 return Ok(IdHollowBatch { id, batch });
332 };
333
334 if expected_desc.lower() != batch.desc.lower() {
335 return Err(format!(
336 "hollow batch lower {:?} did not match expected lower {:?}",
337 batch.desc.lower().elements(),
338 expected_desc.lower().elements()
339 ));
340 }
341
342 if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
345 let mut new_upper = batch.desc.upper().clone();
346
347 while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
350 let Some(next_batch) = legacy_batches.pop() else {
351 break;
352 };
353 if next_batch.is_empty() {
354 new_upper.clone_from(next_batch.desc.upper());
355 } else {
356 legacy_batches.push(next_batch);
357 break;
358 }
359 }
360
361 if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
364 legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
365 expected_desc.upper().clone(),
366 new_upper.clone(),
367 batch.desc.since().clone(),
368 ))));
369 new_upper.clone_from(expected_desc.upper());
370 }
371 batch = Arc::new(HollowBatch::empty(Description::new(
372 batch.desc.lower().clone(),
373 new_upper,
374 batch.desc.since().clone(),
375 )))
376 }
377
378 if expected_desc.upper() != batch.desc.upper() {
379 return Err(format!(
380 "hollow batch upper {:?} did not match expected upper {:?}",
381 batch.desc.upper().elements(),
382 expected_desc.upper().elements()
383 ));
384 }
385
386 Ok(IdHollowBatch { id, batch })
387 };
388
389 let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
390 (batch.desc.upper().clone(), id.1)
391 } else {
392 (Antichain::from_elem(T::minimum()), 0)
393 };
394 let levels = spine_batches
395 .first_key_value()
396 .map(|(_, batch)| batch.level + 1)
397 .unwrap_or(0);
398 let mut merging = vec![MergeState::default(); levels];
399 for (id, batch) in spine_batches {
400 let level = batch.level;
401
402 let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n(
403 None,
404 batch.parts.len() - batch.descs.len(),
405 ));
406 let parts = batch
407 .parts
408 .into_iter()
409 .zip_eq(descs)
410 .map(|(id, desc)| pop_batch(id, desc))
411 .collect::<Result<Vec<_>, _>>()?;
412 let len = parts.iter().map(|p| (*p).batch.len).sum();
413 let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
414 let batch = SpineBatch {
415 id,
416 desc: batch.desc,
417 parts,
418 active_compaction,
419 len,
420 };
421
422 let state = &mut merging[level];
423
424 state.push_batch(batch);
425 if let Some(id) = state.id() {
426 if let Some(merge) = merges.remove(&id) {
427 state.merge = Some(IdFuelingMerge {
428 id,
429 merge: FuelingMerge {
430 since: merge.since,
431 remaining_work: merge.remaining_work,
432 },
433 })
434 }
435 }
436 }
437
438 let mut trace = Trace {
439 spine: Spine {
440 effort: 1,
441 next_id,
442 since,
443 upper,
444 merging,
445 },
446 roundtrip_structure,
447 };
448
449 fn check_empty(name: &str, len: usize) -> Result<(), String> {
450 if len != 0 {
451 Err(format!("{len} {name} left after reconstructing spine"))
452 } else {
453 Ok(())
454 }
455 }
456
457 if roundtrip_structure {
458 check_empty("legacy batches", legacy_batches.len())?;
459 } else {
460 for batch in legacy_batches.into_iter().rev() {
462 trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
463 }
464 }
465 check_empty("hollow batches", hollow_batches.len())?;
466 check_empty("merges", merges.len())?;
467
468 debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
469
470 Ok(trace)
471 }
472}
473
/// Counts of spine batches by compaction state, as computed by
/// `Trace::spine_metrics`.
#[derive(Clone, Debug, Default)]
pub(crate) struct SpineMetrics {
    /// Spine batches for which `is_compact()` holds.
    pub compact_batches: u64,
    /// Spine batches that are not compact but have an active compaction.
    pub compacting_batches: u64,
    /// Spine batches that are neither compact nor being compacted.
    pub noncompact_batches: u64,
}
480
481impl<T> Trace<T> {
482 pub fn since(&self) -> &Antichain<T> {
483 &self.spine.since
484 }
485
486 pub fn upper(&self) -> &Antichain<T> {
487 &self.spine.upper
488 }
489
490 pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
491 for batch in self.batches() {
492 f(batch);
493 }
494 }
495
496 pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
497 self.spine
498 .spine_batches()
499 .flat_map(|b| b.parts.as_slice())
500 .map(|b| &*b.batch)
501 }
502
503 pub fn num_spine_batches(&self) -> usize {
504 self.spine.spine_batches().count()
505 }
506
507 #[cfg(test)]
508 pub fn num_hollow_batches(&self) -> usize {
509 self.batches().count()
510 }
511
512 #[cfg(test)]
513 pub fn num_updates(&self) -> usize {
514 self.batches().map(|b| b.len).sum()
515 }
516}
517
518impl<T: Timestamp + Lattice> Trace<T> {
519 pub fn downgrade_since(&mut self, since: &Antichain<T>) {
520 self.spine.since.clone_from(since);
521 }
522
523 #[must_use]
524 pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
525 let mut merge_reqs = Vec::new();
526 self.spine.insert(
527 batch,
528 &mut SpineLog::Enabled {
529 merge_reqs: &mut merge_reqs,
530 },
531 );
532 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
533 Self::remove_redundant_merge_reqs(merge_reqs)
540 }
541
542 pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
543 for batch in self.spine.spine_batches_mut().rev() {
546 if batch.id == id {
547 batch.active_compaction = Some(compaction);
548 break;
549 }
550 }
551 }
552
553 pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
556 self.spine.insert(batch, &mut SpineLog::Disabled);
557 }
558
559 #[must_use]
567 pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
568 let mut merge_reqs = Vec::new();
569 let did_work = self.spine.exert(
570 fuel,
571 &mut SpineLog::Enabled {
572 merge_reqs: &mut merge_reqs,
573 },
574 );
575 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
576 let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
578 (merge_reqs, did_work)
579 }
580
581 pub fn validate(&self) -> Result<(), String> {
585 self.spine.validate()
586 }
587
588 pub(crate) fn fueled_merge_reqs_before_ms(
591 &self,
592 threshold_ms: u64,
593 threshold_writer: Option<WriterKey>,
594 ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
595 self.spine
596 .spine_batches()
597 .filter(move |b| {
598 let noncompact = !b.is_compact();
599 let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
600 b.parts.iter().any(|b| {
601 b.batch
602 .parts
603 .iter()
604 .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
605 })
606 });
607 noncompact || old_writer
608 })
609 .filter(move |b| {
610 b.active_compaction
613 .as_ref()
614 .map_or(true, move |c| c.start_ms <= threshold_ms)
615 })
616 .map(|b| FueledMergeReq {
617 id: b.id,
618 desc: b.desc.clone(),
619 inputs: b.parts.clone(),
620 })
621 }
622
623 fn remove_redundant_merge_reqs(
630 mut merge_reqs: Vec<FueledMergeReq<T>>,
631 ) -> Vec<FueledMergeReq<T>> {
632 fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
634 b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
636 }
637
638 let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
639 while let Some(merge_req) = merge_reqs.pop() {
643 let covered = ret.iter().any(|r| covers(r, &merge_req));
644 if !covered {
645 ret.retain(|r| !covers(&merge_req, r));
649 ret.push(merge_req);
650 }
651 }
652 ret
653 }
654
655 pub fn spine_metrics(&self) -> SpineMetrics {
656 let mut metrics = SpineMetrics::default();
657 for batch in self.spine.spine_batches() {
658 if batch.is_compact() {
659 metrics.compact_batches += 1;
660 } else if batch.is_merging() {
661 metrics.compacting_batches += 1;
662 } else {
663 metrics.noncompact_batches += 1;
664 }
665 }
666 metrics
667 }
668}
669
670impl<T: Timestamp + Lattice + Codec64> Trace<T> {
671 pub fn apply_merge_res_checked<D: Codec64 + Monoid + PartialEq>(
672 &mut self,
673 res: &FueledMergeRes<T>,
674 metrics: &ColumnarMetrics,
675 ) -> ApplyMergeResult {
676 for batch in self.spine.spine_batches_mut().rev() {
677 let result = batch.maybe_replace_checked::<D>(res, metrics);
678 if result.matched() {
679 return result;
680 }
681 }
682 ApplyMergeResult::NotAppliedNoMatch
683 }
684
685 pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
686 for batch in self.spine.spine_batches_mut().rev() {
687 let result = batch.maybe_replace_unchecked(res);
688 if result.matched() {
689 return result;
690 }
691 }
692 ApplyMergeResult::NotAppliedNoMatch
693 }
694
695 pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
696 for batch in self.spine.spine_batches_mut().rev() {
697 let result = batch.maybe_replace_with_tombstone(desc);
698 if result.matched() {
699 return result;
700 }
701 }
702 ApplyMergeResult::NotAppliedNoMatch
703 }
704}
705
/// Passed to the spine's `insert` and `exert` to choose whether merge
/// requests are collected.
enum SpineLog<'a, T> {
    /// Carries the vector that `Trace::push_batch` and `Trace::exert` hand
    /// back to their callers after filtering.
    Enabled {
        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
    },
    /// Carries nothing; used by `Trace::push_batch_no_merge_reqs`.
    Disabled,
}
714
/// Identifies which portion of a spine batch a merge result replaces; see
/// `SpineBatch::maybe_replace_checked` for the dispatch.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CompactionInput {
    /// No explicit input: the replacement is located by matching the lower
    /// and upper of the output description.
    Legacy,
    /// The parts whose ids are covered by the given spine id range.
    IdRange(SpineId),
    /// The given runs within the single part that has the given spine id.
    PartialBatch(SpineId, BTreeSet<RunId>),
}
725
/// A range of spine positions, `SpineId(lo, hi)`. `Display` renders it as the
/// half-open interval `[lo, hi)`, and `id_range` treats two ids as adjacent
/// when one's `hi` equals the other's `lo`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SpineId(pub usize, pub usize);
728
729impl Display for SpineId {
730 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731 write!(f, "[{}, {})", self.0, self.1)
732 }
733}
734
735impl Serialize for SpineId {
736 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
737 where
738 S: Serializer,
739 {
740 let SpineId(lo, hi) = self;
741 serializer.serialize_str(&format!("{lo}-{hi}"))
742 }
743}
744
745pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
747 let mut id_iter = ids.iter().copied();
748 let Some(mut result) = id_iter.next() else {
749 panic!("at least one batch must be present")
750 };
751
752 for id in id_iter {
753 assert_eq!(
754 result.1, id.0,
755 "expected contiguous ids, but {result:?} is not adjacent to {id:?} in ids {ids:?}"
756 );
757 result.1 = id.1;
758 }
759 result
760}
761
762impl SpineId {
763 fn covers(self, other: SpineId) -> bool {
764 self.0 <= other.0 && other.1 <= self.1
765 }
766}
767
/// A hollow batch paired with the spine id it occupies.
#[derive(Debug, Clone, PartialEq)]
pub struct IdHollowBatch<T> {
    /// The spine id of the batch.
    pub id: SpineId,
    /// The batch itself, shared behind an `Arc`.
    pub batch: Arc<HollowBatch<T>>,
}
773
/// Marks a spine batch as having a compaction in progress.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct ActiveCompaction {
    /// Start time of the compaction; compared against `threshold_ms` in
    /// `Trace::fueled_merge_reqs_before_ms`. Presumably milliseconds, going by
    /// the name — TODO confirm the clock and epoch with the caller.
    pub start_ms: u64,
}
778
/// A batch in the spine: one or more hollow batches under one description.
#[derive(Debug, Clone, PartialEq)]
struct SpineBatch<T> {
    /// Spine id of the whole batch. `id()` debug-asserts that it starts where
    /// the first part starts and ends where the last part ends.
    id: SpineId,
    /// Description of the whole batch.
    desc: Description<T>,
    /// The hollow batches making up this spine batch, in order.
    parts: Vec<IdHollowBatch<T>>,
    /// Set while a compaction of this batch is in progress.
    active_compaction: Option<ActiveCompaction>,
    /// Cached sum of the parts' `len`; `len()` debug-asserts that it is
    /// up to date.
    len: usize,
}
788
789impl<T> SpineBatch<T> {
790 fn merged(batch: IdHollowBatch<T>) -> Self
791 where
792 T: Clone,
793 {
794 Self {
795 id: batch.id,
796 desc: batch.batch.desc.clone(),
797 len: batch.batch.len,
798 parts: vec![batch],
799 active_compaction: None,
800 }
801 }
802}
803
/// The outcome of attempting to apply a merge result to the spine.
#[derive(Debug, Copy, Clone)]
pub enum ApplyMergeResult {
    /// The result replaced every part of a spine batch.
    AppliedExact,
    /// The result replaced some of the parts of a spine batch.
    AppliedSubset,
    /// No spine batch matched the result.
    NotAppliedNoMatch,
    /// The since of the result was not compatible with the spine batch.
    NotAppliedInvalidSince,
    /// Applying the result would have increased the number of updates.
    NotAppliedTooManyUpdates,
}

impl ApplyMergeResult {
    /// Whether the merge result was actually applied.
    pub fn applied(&self) -> bool {
        matches!(self, Self::AppliedExact | Self::AppliedSubset)
    }

    /// Whether the merge result found its target, regardless of whether it was
    /// then applied.
    pub fn matched(&self) -> bool {
        matches!(
            self,
            Self::AppliedExact | Self::AppliedSubset | Self::NotAppliedTooManyUpdates
        )
    }
}
829
830impl<T: Timestamp + Lattice> SpineBatch<T> {
831 pub fn lower(&self) -> &Antichain<T> {
832 self.desc().lower()
833 }
834
835 pub fn upper(&self) -> &Antichain<T> {
836 self.desc().upper()
837 }
838
839 fn id(&self) -> SpineId {
840 debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
841 debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
842 self.id
843 }
844
845 pub fn is_compact(&self) -> bool {
846 self.parts
853 .iter()
854 .map(|p| p.batch.run_meta.len())
855 .sum::<usize>()
856 <= 1
857 }
858
859 pub fn is_merging(&self) -> bool {
860 self.active_compaction.is_some()
861 }
862
863 fn desc(&self) -> &Description<T> {
864 &self.desc
865 }
866
867 pub fn len(&self) -> usize {
868 debug_assert_eq!(
871 self.len,
872 self.parts.iter().map(|x| x.batch.len).sum::<usize>()
873 );
874 self.len
875 }
876
877 pub fn is_empty(&self) -> bool {
878 self.len() == 0
879 }
880
881 pub fn empty(
882 id: SpineId,
883 lower: Antichain<T>,
884 upper: Antichain<T>,
885 since: Antichain<T>,
886 ) -> Self {
887 SpineBatch::merged(IdHollowBatch {
888 id,
889 batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
890 })
891 }
892
893 pub fn begin_merge(
894 bs: &[Self],
895 compaction_frontier: Option<AntichainRef<T>>,
896 ) -> Option<IdFuelingMerge<T>> {
897 let from = bs.first()?.id().0;
898 let until = bs.last()?.id().1;
899 let id = SpineId(from, until);
900 let mut sinces = bs.iter().map(|b| b.desc().since());
901 let mut since = sinces.next()?.clone();
902 for b in bs {
903 since.join_assign(b.desc().since())
904 }
905 if let Some(compaction_frontier) = compaction_frontier {
906 since.join_assign(&compaction_frontier.to_owned());
907 }
908 let remaining_work = bs.iter().map(|x| x.len()).sum();
909 Some(IdFuelingMerge {
910 id,
911 merge: FuelingMerge {
912 since,
913 remaining_work,
914 },
915 })
916 }
917
918 #[cfg(test)]
919 fn describe(&self, extended: bool) -> String {
920 let SpineBatch {
921 id,
922 parts,
923 desc,
924 active_compaction,
925 len,
926 } = self;
927 let compaction = match active_compaction {
928 None => "".to_owned(),
929 Some(c) => format!(" (c@{})", c.start_ms),
930 };
931 match extended {
932 false => format!(
933 "[{}-{}]{:?}{:?}{}/{}{compaction}",
934 id.0,
935 id.1,
936 desc.lower().elements(),
937 desc.upper().elements(),
938 parts.len(),
939 len
940 ),
941 true => {
942 format!(
943 "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
944 id.0,
945 id.1,
946 desc.lower().elements(),
947 desc.upper().elements(),
948 desc.since().elements(),
949 parts.len(),
950 len,
951 parts
952 .iter()
953 .flat_map(|x| x.batch.parts.iter())
954 .map(|x| format!(" {}", x.printable_name()))
955 .collect::<Vec<_>>()
956 .join("")
957 )
958 }
959 }
960 }
961}
962
963impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
964 fn diffs_sum<'a, D: Monoid + Codec64>(
965 parts: impl IntoIterator<Item = &'a RunPart<T>>,
966 metrics: &ColumnarMetrics,
967 ) -> Option<D> {
968 let mut sum = D::zero();
969 for part in parts {
970 sum.plus_equals(&part.diffs_sum::<D>(metrics)?);
971 }
972 Some(sum)
973 }
974
975 fn diffs_sum_for_runs<D: Monoid + Codec64>(
978 batch: &HollowBatch<T>,
979 run_ids: &[RunId],
980 metrics: &ColumnarMetrics,
981 ) -> Option<D> {
982 let mut run_ids = BTreeSet::from_iter(run_ids.iter().copied());
983 let mut sum = D::zero();
984
985 for (meta, run) in batch.runs() {
986 let id = meta.id?;
987 if run_ids.remove(&id) {
988 sum.plus_equals(&Self::diffs_sum(run, metrics)?);
989 }
990 }
991
992 run_ids.is_empty().then_some(sum)
993 }
994
995 fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
996 let exact_match =
997 desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();
998
999 let empty_batch = HollowBatch::empty(desc.clone());
1000 if exact_match {
1001 *self = SpineBatch::merged(IdHollowBatch {
1002 id: self.id(),
1003 batch: Arc::new(empty_batch),
1004 });
1005 return ApplyMergeResult::AppliedExact;
1006 }
1007
1008 if let Some((id, range)) = self.find_replacement_range(desc) {
1009 self.perform_subset_replacement(&empty_batch, id, range, None)
1010 } else {
1011 ApplyMergeResult::NotAppliedNoMatch
1012 }
1013 }
1014
1015 fn construct_batch_with_runs_replaced(
1016 original: &HollowBatch<T>,
1017 run_ids: &[RunId],
1018 replacement: &HollowBatch<T>,
1019 ) -> Result<HollowBatch<T>, ApplyMergeResult> {
1020 if run_ids.is_empty() {
1021 return Err(ApplyMergeResult::NotAppliedNoMatch);
1022 }
1023
1024 let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
1025 let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
1026 if !orig_run_ids.is_superset(&run_ids) {
1027 return Err(ApplyMergeResult::NotAppliedNoMatch);
1028 }
1029
1030 let runs: Vec<_> = original
1031 .runs()
1032 .filter(|(meta, _)| {
1033 !run_ids.contains(&meta.id.expect("id should be present at this point"))
1034 })
1035 .chain(replacement.runs())
1036 .collect();
1037
1038 let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();
1039
1040 let run_meta = runs
1041 .iter()
1042 .map(|(meta, _)| *meta)
1043 .cloned()
1044 .collect::<Vec<_>>();
1045
1046 let parts = runs
1047 .iter()
1048 .flat_map(|(_, parts)| *parts)
1049 .cloned()
1050 .collect::<Vec<_>>();
1051
1052 let run_splits = {
1053 let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
1054 let mut pointer = 0;
1055 for (i, (_, parts)) in runs.into_iter().enumerate() {
1056 if parts.is_empty() {
1057 continue;
1058 }
1059 if i < run_meta.len() - 1 {
1060 splits.push(pointer + parts.len());
1061 }
1062 pointer += parts.len();
1063 }
1064 splits
1065 };
1066
1067 Ok(HollowBatch::new(
1068 replacement.desc.clone(),
1069 parts,
1070 len,
1071 run_meta,
1072 run_splits,
1073 ))
1074 }
1075
1076 fn maybe_replace_checked<D>(
1077 &mut self,
1078 res: &FueledMergeRes<T>,
1079 metrics: &ColumnarMetrics,
1080 ) -> ApplyMergeResult
1081 where
1082 D: Monoid + Codec64 + PartialEq + Debug,
1083 {
1084 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1088 return ApplyMergeResult::NotAppliedInvalidSince;
1089 }
1090
1091 let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1092 let num_batches = self.parts.len();
1093
1094 let result = match &res.input {
1095 CompactionInput::IdRange(id) => {
1096 self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
1097 }
1098 CompactionInput::PartialBatch(id, runs) => {
1099 self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
1100 }
1101 CompactionInput::Legacy => self.maybe_replace_checked_classic::<D>(res, metrics),
1102 };
1103
1104 let num_batches_after = self.parts.len();
1105 assert!(
1106 num_batches_after <= num_batches,
1107 "replacing parts should not increase the number of batches"
1108 );
1109 result
1110 }
1111
1112 fn handle_id_range_replacement<D>(
1113 &mut self,
1114 res: &FueledMergeRes<T>,
1115 id: &SpineId,
1116 new_diffs_sum: Option<D>,
1117 metrics: &ColumnarMetrics,
1118 ) -> ApplyMergeResult
1119 where
1120 D: Monoid + Codec64 + PartialEq + Debug,
1121 {
1122 let range = self
1123 .parts
1124 .iter()
1125 .enumerate()
1126 .filter_map(|(i, p)| {
1127 if id.covers(p.id) {
1128 Some((i, p.id))
1129 } else {
1130 None
1131 }
1132 })
1133 .collect::<Vec<_>>();
1134
1135 let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();
1136
1137 if ids.is_empty() || id != &id_range(ids) {
1147 return ApplyMergeResult::NotAppliedNoMatch;
1148 }
1149
1150 let (min, max) = match range.iter().map(|(i, _)| *i).minmax() {
1152 itertools::MinMaxResult::NoElements => return ApplyMergeResult::NotAppliedNoMatch,
1153 itertools::MinMaxResult::OneElement(elt) => (elt, elt),
1154 itertools::MinMaxResult::MinMax(min, max) => (min, max),
1155 };
1156 let replacement_range = min..max + 1;
1157
1158 let old_diffs_sum = Self::diffs_sum::<D>(
1161 self.parts[replacement_range.clone()]
1162 .iter()
1163 .flat_map(|p| p.batch.parts.iter()),
1164 metrics,
1165 );
1166
1167 Self::validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
1168
1169 self.perform_subset_replacement(
1170 &res.output,
1171 *id,
1172 replacement_range,
1173 res.new_active_compaction.clone(),
1174 )
1175 }
1176
1177 fn handle_partial_batch_replacement<D>(
1178 &mut self,
1179 res: &FueledMergeRes<T>,
1180 id: SpineId,
1181 runs: &BTreeSet<RunId>,
1182 new_diffs_sum: Option<D>,
1183 metrics: &ColumnarMetrics,
1184 ) -> ApplyMergeResult
1185 where
1186 D: Monoid + Codec64 + PartialEq + Debug,
1187 {
1188 if runs.is_empty() {
1189 return ApplyMergeResult::NotAppliedNoMatch;
1190 }
1191
1192 let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
1193 let Some((i, batch)) = part else {
1194 return ApplyMergeResult::NotAppliedNoMatch;
1195 };
1196 let replacement_range = i..(i + 1);
1197
1198 let replacement_desc = &res.output.desc;
1199 let existing_desc = &batch.batch.desc;
1200 assert_eq!(
1201 replacement_desc.lower(),
1202 existing_desc.lower(),
1203 "batch lower should match, but {:?} != {:?}",
1204 replacement_desc.lower(),
1205 existing_desc.lower()
1206 );
1207 assert_eq!(
1208 replacement_desc.upper(),
1209 existing_desc.upper(),
1210 "batch upper should match, but {:?} != {:?}",
1211 replacement_desc.upper(),
1212 existing_desc.upper()
1213 );
1214 if !PartialOrder::less_equal(existing_desc.since(), replacement_desc.since()) {
1215 error!(
1216 "batch since should advance, but {:?} !<= {:?}",
1217 existing_desc.since(),
1218 replacement_desc.since()
1219 );
1220 return ApplyMergeResult::NotAppliedInvalidSince;
1221 }
1222
1223 let batch = &batch.batch;
1224 let run_ids = runs.iter().cloned().collect::<Vec<_>>();
1225
1226 match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
1227 Ok(new_batch) => {
1228 let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
1229 Self::validate_diffs_sum_match(
1230 old_diffs_sum,
1231 new_diffs_sum,
1232 "partial batch replacement",
1233 );
1234 let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
1235 let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
1236 Self::validate_diffs_sum_match(
1237 old_batch_diff_sum,
1238 new_batch_diff_sum,
1239 "sanity checking diffs sum for replaced runs",
1240 );
1241 self.perform_subset_replacement(
1242 &new_batch,
1243 id,
1244 replacement_range,
1245 res.new_active_compaction.clone(),
1246 )
1247 }
1248 Err(err) => err,
1249 }
1250 }
1251
1252 fn validate_diffs_sum_match<D>(
1253 old_diffs_sum: Option<D>,
1254 new_diffs_sum: Option<D>,
1255 context: &str,
1256 ) where
1257 D: Monoid + Codec64 + PartialEq + Debug,
1258 {
1259 let new_diffs_sum = new_diffs_sum.unwrap_or_else(D::zero);
1260 if let Some(old_diffs_sum) = old_diffs_sum {
1261 assert_eq!(
1262 old_diffs_sum, new_diffs_sum,
1263 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
1264 new_diffs_sum, old_diffs_sum, context
1265 )
1266 }
1267 }
1268
1269 fn maybe_replace_checked_classic<D>(
1275 &mut self,
1276 res: &FueledMergeRes<T>,
1277 metrics: &ColumnarMetrics,
1278 ) -> ApplyMergeResult
1279 where
1280 D: Monoid + Codec64 + PartialEq + Debug,
1281 {
1282 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1286 return ApplyMergeResult::NotAppliedInvalidSince;
1287 }
1288
1289 let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1290
1291 let exact_match = res.output.desc.lower() == self.desc().lower()
1293 && res.output.desc.upper() == self.desc().upper();
1294 if exact_match {
1295 let old_diffs_sum = Self::diffs_sum::<D>(
1296 self.parts.iter().flat_map(|p| p.batch.parts.iter()),
1297 metrics,
1298 );
1299
1300 if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1301 assert_eq!(
1302 old_diffs_sum, new_diffs_sum,
1303 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1304 new_diffs_sum, old_diffs_sum
1305 );
1306 }
1307
1308 if res.output.len > self.len() {
1317 return ApplyMergeResult::NotAppliedTooManyUpdates;
1318 }
1319 *self = SpineBatch::merged(IdHollowBatch {
1320 id: self.id(),
1321 batch: Arc::new(res.output.clone()),
1322 });
1323 return ApplyMergeResult::AppliedExact;
1324 }
1325
1326 if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1328 let old_diffs_sum = Self::diffs_sum::<D>(
1329 self.parts[range.clone()]
1330 .iter()
1331 .flat_map(|p| p.batch.parts.iter()),
1332 metrics,
1333 );
1334
1335 if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1336 assert_eq!(
1337 old_diffs_sum, new_diffs_sum,
1338 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1339 new_diffs_sum, old_diffs_sum
1340 );
1341 }
1342
1343 self.perform_subset_replacement(
1344 &res.output,
1345 id,
1346 range,
1347 res.new_active_compaction.clone(),
1348 )
1349 } else {
1350 ApplyMergeResult::NotAppliedNoMatch
1351 }
1352 }
1353
1354 fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
1360 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1364 return ApplyMergeResult::NotAppliedInvalidSince;
1365 }
1366
1367 let exact_match = res.output.desc.lower() == self.desc().lower()
1369 && res.output.desc.upper() == self.desc().upper();
1370 if exact_match {
1371 if res.output.len > self.len() {
1380 return ApplyMergeResult::NotAppliedTooManyUpdates;
1381 }
1382
1383 *self = SpineBatch::merged(IdHollowBatch {
1384 id: self.id(),
1385 batch: Arc::new(res.output.clone()),
1386 });
1387 return ApplyMergeResult::AppliedExact;
1388 }
1389
1390 if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1392 self.perform_subset_replacement(
1393 &res.output,
1394 id,
1395 range,
1396 res.new_active_compaction.clone(),
1397 )
1398 } else {
1399 ApplyMergeResult::NotAppliedNoMatch
1400 }
1401 }
1402
1403 fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
1405 let mut lower = None;
1415 let mut upper = None;
1416
1417 for (i, batch) in self.parts.iter().enumerate() {
1418 if batch.batch.desc.lower() == desc.lower() {
1419 lower = Some((i, batch.id.0));
1420 }
1421 if batch.batch.desc.upper() == desc.upper() {
1422 upper = Some((i, batch.id.1));
1423 }
1424 if lower.is_some() && upper.is_some() {
1425 break;
1426 }
1427 }
1428
1429 match (lower, upper) {
1430 (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
1431 Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
1432 }
1433 _ => None,
1434 }
1435 }
1436
1437 fn perform_subset_replacement(
1439 &mut self,
1440 res: &HollowBatch<T>,
1441 spine_id: SpineId,
1442 range: Range<usize>,
1443 new_active_compaction: Option<ActiveCompaction>,
1444 ) -> ApplyMergeResult {
1445 let SpineBatch {
1446 id,
1447 parts,
1448 desc,
1449 active_compaction: _,
1450 len: _,
1451 } = self;
1452
1453 let mut new_parts = vec![];
1454 new_parts.extend_from_slice(&parts[..range.start]);
1455 new_parts.push(IdHollowBatch {
1456 id: spine_id,
1457 batch: Arc::new(res.clone()),
1458 });
1459 new_parts.extend_from_slice(&parts[range.end..]);
1460
1461 let res = if range.len() == parts.len() {
1462 ApplyMergeResult::AppliedExact
1463 } else {
1464 ApplyMergeResult::AppliedSubset
1465 };
1466
1467 let new_spine_batch = SpineBatch {
1468 id: *id,
1469 desc: desc.to_owned(),
1470 len: new_parts.iter().map(|x| x.batch.len).sum(),
1471 parts: new_parts,
1472 active_compaction: new_active_compaction,
1473 };
1474
1475 if new_spine_batch.len() > self.len() {
1476 return ApplyMergeResult::NotAppliedTooManyUpdates;
1477 }
1478
1479 *self = new_spine_batch;
1480 res
1481 }
1482}
1483
/// Bookkeeping for a merge that is advanced incrementally by applying fuel.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct FuelingMerge<T> {
    /// The since frontier given to the batch produced when the merge is
    /// finished via `done`.
    pub(crate) since: Antichain<T>,
    /// Units of work still outstanding. `work` subtracts the fuel it consumes
    /// from this counter.
    pub(crate) remaining_work: usize,
}
1489
/// A [`FuelingMerge`] tagged with a [`SpineId`].
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct IdFuelingMerge<T> {
    /// Id of the merge. `Spine::validate` requires it to equal the id range
    /// spanned by the batches of the level that owns the merge.
    id: SpineId,
    /// The fuel accounting for the merge itself.
    merge: FuelingMerge<T>,
}
1495
1496impl<T: Timestamp + Lattice> FuelingMerge<T> {
1497 #[allow(clippy::as_conversions)]
1503 fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
1504 let used = std::cmp::min(*fuel as usize, self.remaining_work);
1505 self.remaining_work = self.remaining_work.saturating_sub(used);
1506 *fuel -= used as isize;
1507 }
1508
1509 fn done(
1514 self,
1515 bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1516 log: &mut SpineLog<'_, T>,
1517 ) -> Option<SpineBatch<T>> {
1518 let first = bs.first()?;
1519 let last = bs.last()?;
1520 let id = SpineId(first.id().0, last.id().1);
1521 assert!(id.0 < id.1);
1522 let lower = first.desc().lower().clone();
1523 let upper = last.desc().upper().clone();
1524 let since = self.since;
1525
1526 if bs.iter().all(SpineBatch::is_empty) {
1528 return Some(SpineBatch::empty(id, lower, upper, since));
1529 }
1530
1531 let desc = Description::new(lower, upper, since);
1532 let len = bs.iter().map(SpineBatch::len).sum();
1533
1534 let mut merged_parts_len = 0;
1538 for b in &bs {
1539 merged_parts_len += b.parts.len();
1540 }
1541 let mut merged_parts = Vec::with_capacity(merged_parts_len);
1542 for b in bs {
1543 merged_parts.extend(b.parts)
1544 }
1545 debug_assert_eq!(merged_parts.len(), merged_parts_len);
1547
1548 if let SpineLog::Enabled { merge_reqs } = log {
1549 merge_reqs.push(FueledMergeReq {
1550 id,
1551 desc: desc.clone(),
1552 inputs: merged_parts.clone(),
1553 });
1554 }
1555
1556 Some(SpineBatch {
1557 id,
1558 desc,
1559 len,
1560 parts: merged_parts,
1561 active_compaction: None,
1562 })
1563 }
1564}
1565
/// Capacity of the fixed-size batch list held by each spine level; a level
/// holding this many batches reports itself as full.
const BATCHES_PER_LEVEL: usize = 2;
1571
/// A list of levels holding the batches of a trace, together with the
/// frontiers and id counter that describe them.
#[derive(Debug, Clone)]
struct Spine<T> {
    /// Multiplier applied to the fuel computed in `introduce_batch`; `new`
    /// initializes it to 1.
    effort: usize,
    /// The next unallocated id. `next_id()` hands out `SpineId(n, n + 1)` and
    /// advances this counter by one.
    next_id: usize,
    /// Passed as the compaction frontier when a level becomes full and a merge
    /// begins. `validate` requires every batch's since to be `less_equal` to it.
    since: Antichain<T>,
    /// Upper of the most recently inserted batch; `insert` asserts that the
    /// next batch's lower equals it.
    upper: Antichain<T>,
    /// The levels of the spine. `spine_batches` visits them from the last
    /// index down to the first.
    merging: Vec<MergeState<T>>,
}
1662
1663impl<T> Spine<T> {
1664 pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1666 self.merging.iter().rev().flat_map(|m| &m.batches)
1667 }
1668
1669 pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1671 self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1672 }
1673}
1674
1675impl<T: Timestamp + Lattice> Spine<T> {
1676 pub fn new() -> Self {
1683 Spine {
1684 effort: 1,
1685 next_id: 0,
1686 since: Antichain::from_elem(T::minimum()),
1687 upper: Antichain::from_elem(T::minimum()),
1688 merging: Vec::new(),
1689 }
1690 }
1691
1692 fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1700 self.tidy_layers();
1701 if self.reduced() {
1702 return false;
1703 }
1704
1705 if self.merging.iter().any(|b| b.merge.is_some()) {
1706 let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1707 self.apply_fuel(&fuel, log);
1709 } else {
1710 let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1715 let id = self.next_id();
1716 self.introduce_batch(
1717 SpineBatch::empty(
1718 id,
1719 self.upper.clone(),
1720 self.upper.clone(),
1721 self.since.clone(),
1722 ),
1723 level,
1724 log,
1725 );
1726 }
1727 true
1728 }
1729
1730 pub fn next_id(&mut self) -> SpineId {
1731 let id = self.next_id;
1732 self.next_id += 1;
1733 SpineId(id, self.next_id)
1734 }
1735
1736 pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1740 assert!(batch.desc.lower() != batch.desc.upper());
1741 assert_eq!(batch.desc.lower(), &self.upper);
1742
1743 let id = self.next_id();
1744 let batch = SpineBatch::merged(IdHollowBatch {
1745 id,
1746 batch: Arc::new(batch),
1747 });
1748
1749 self.upper.clone_from(batch.upper());
1750
1751 if batch.is_empty() {
1754 if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1755 if self.merging[position].is_single() && self.merging[position].is_empty() {
1756 self.insert_at(batch, position);
1757 if let Some(merged) = self.complete_at(position, log) {
1760 self.merging[position] = MergeState::single(merged);
1761 }
1762 return;
1763 }
1764 }
1765 }
1766
1767 let index = batch.len().next_power_of_two();
1769 self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1770 }
1771
1772 fn reduced(&self) -> bool {
1779 self.spine_batches()
1780 .map(|b| {
1781 b.parts
1782 .iter()
1783 .map(|p| p.batch.run_meta.len())
1784 .sum::<usize>()
1785 })
1786 .sum::<usize>()
1787 < 2
1788 }
1789
1790 #[allow(dead_code)]
1794 fn describe(&self) -> Vec<(usize, usize)> {
1795 self.merging
1796 .iter()
1797 .map(|b| (b.batches.len(), b.len()))
1798 .collect()
1799 }
1800
1801 fn introduce_batch(
1807 &mut self,
1808 batch: SpineBatch<T>,
1809 batch_index: usize,
1810 log: &mut SpineLog<'_, T>,
1811 ) {
1812 if batch_index > 32 {
1833 println!("Large batch index: {}", batch_index);
1834 }
1835
1836 let mut fuel = 8 << batch_index;
1843 fuel *= self.effort;
1846 #[allow(clippy::as_conversions)]
1849 let fuel = fuel as isize;
1850
1851 self.apply_fuel(&fuel, log);
1858
1859 self.roll_up(batch_index, log);
1876
1877 self.insert_at(batch, batch_index);
1881
1882 self.tidy_layers();
1888 }
1889
    /// Completes and promotes the contents of every level below `index`, so
    /// that those levels end up vacant and their combined contents sit at
    /// `index` (or one level above, if `index` became full).
    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
        // Make sure level `index` exists.
        while self.merging.len() <= index {
            self.merging.push(MergeState::default());
        }

        // Nothing to do unless some level below `index` holds batches.
        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
            // Walk upwards, carrying the result of each level into the next
            // before completing that next level in turn.
            let mut merged = None;
            for i in 0..index {
                if let Some(merged) = merged.take() {
                    self.insert_at(merged, i);
                }
                merged = self.complete_at(i, log);
            }

            // Whatever is still being carried lands at the target level.
            if let Some(merged) = merged {
                self.insert_at(merged, index);
            }

            // If that filled the target level, complete it and move the result
            // one level up so `index` can accept a new batch.
            if self.merging[index].is_full() {
                let merged = self.complete_at(index, log).expect("double batch");
                self.insert_at(merged, index + 1);
            }
        }
    }
1930
1931 pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1939 for index in 0..self.merging.len() {
1945 let mut fuel = *fuel;
1947 self.merging[index].work(&mut fuel);
1950 if self.merging[index].is_complete() {
1962 let complete = self.complete_at(index, log).expect("complete batch");
1963 self.insert_at(complete, index + 1);
1964 }
1965 }
1966 }
1967
1968 fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1974 while self.merging.len() <= index {
1976 self.merging.push(MergeState::default());
1977 }
1978
1979 let merging = &mut self.merging[index];
1981 merging.push_batch(batch);
1982 if merging.batches.is_full() {
1983 let compaction_frontier = Some(self.since.borrow());
1984 merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1985 }
1986 }
1987
1988 fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1990 self.merging[index].complete(log)
1991 }
1992
    /// Shrinks the spine when its last level holds a single batch that is too
    /// small for the level it occupies.
    ///
    /// Vacant levels directly below the last one are removed. When the level
    /// below is occupied but not full, and the levels beneath it are
    /// collectively small enough, that level is removed and its batch
    /// re-inserted so that it shares a level with the last batch.
    fn tidy_layers(&mut self) {
        if !self.merging.is_empty() {
            let mut length = self.merging.len();
            if self.merging[length - 1].is_single() {
                // The level this batch's length would be assigned to: log2 of
                // the length rounded up to a power of two.
                let appropriate_level = usize::cast_from(
                    self.merging[length - 1]
                        .len()
                        .next_power_of_two()
                        .trailing_zeros(),
                );

                // Keep going while the last batch sits above its natural level.
                while appropriate_level < length - 1 {
                    let current = &mut self.merging[length - 2];
                    if current.is_vacant() {
                        // Drop the vacant level; the last batch moves down by one.
                        self.merging.remove(length - 2);
                        length = self.merging.len();
                    } else {
                        if !current.is_full() {
                            // Weigh the levels beneath `current`: each batch
                            // counts as 2^level.
                            let mut smaller = 0;
                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
                                smaller += batch.batches.len() << index;
                            }

                            if smaller <= (1 << length) / 8 {
                                // Removing level `length - 2` shifts the last
                                // level down into that index, so re-inserting
                                // the removed batch there places both batches
                                // in the same level.
                                let state = self.merging.remove(length - 2);
                                assert_eq!(state.batches.len(), 1);
                                for batch in state.batches {
                                    self.insert_at(batch, length - 2);
                                }
                            }
                        }
                        // An occupied level below stops the tidying either way.
                        break;
                    }
                }
            }
        }
    }
2049
2050 fn validate(&self) -> Result<(), String> {
2058 let mut id = SpineId(0, 0);
2059 let mut frontier = Antichain::from_elem(T::minimum());
2060 for x in self.merging.iter().rev() {
2061 if x.is_full() != x.merge.is_some() {
2062 return Err(format!(
2063 "all (and only) full batches should have fueling merges (full={}, merge={:?})",
2064 x.is_full(),
2065 x.merge,
2066 ));
2067 }
2068
2069 if let Some(m) = &x.merge {
2070 if !x.is_full() {
2071 return Err(format!(
2072 "merge should only exist for full batches (len={:?}, merge={:?})",
2073 x.batches.len(),
2074 m.id,
2075 ));
2076 }
2077 if x.id() != Some(m.id) {
2078 return Err(format!(
2079 "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
2080 x.id(),
2081 m.id,
2082 ));
2083 }
2084 }
2085
2086 for batch in &x.batches {
2091 if batch.id().0 != id.1 {
2092 return Err(format!(
2093 "batch id {:?} does not match the previous id {:?}: {:?}",
2094 batch.id(),
2095 id,
2096 self
2097 ));
2098 }
2099 id = batch.id();
2100 if batch.desc().lower() != &frontier {
2101 return Err(format!(
2102 "batch lower {:?} does not match the previous upper {:?}: {:?}",
2103 batch.desc().lower(),
2104 frontier,
2105 self
2106 ));
2107 }
2108 frontier.clone_from(batch.desc().upper());
2109 if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
2110 return Err(format!(
2111 "since of batch {:?} past the spine since {:?}: {:?}",
2112 batch.desc().since(),
2113 self.since,
2114 self
2115 ));
2116 }
2117 }
2118 }
2119 if self.next_id != id.1 {
2120 return Err(format!(
2121 "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
2122 self.next_id, id, self
2123 ));
2124 }
2125 if self.upper != frontier {
2126 return Err(format!(
2127 "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
2128 self.upper, frontier, self
2129 ));
2130 }
2131 Ok(())
2132 }
2133}
2134
/// The contents of one level of the spine: up to `BATCHES_PER_LEVEL` batches
/// and, optionally, the merge that will combine them.
#[derive(Debug, Clone)]
struct MergeState<T> {
    /// The batches held at this level, in insertion order.
    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
    /// The merge over `batches`, if one has been started. `push_batch` refuses
    /// to add batches while this is set.
    merge: Option<IdFuelingMerge<T>>,
}
2144
2145impl<T> Default for MergeState<T> {
2146 fn default() -> Self {
2147 Self {
2148 batches: ArrayVec::new(),
2149 merge: None,
2150 }
2151 }
2152}
2153
2154impl<T: Timestamp + Lattice> MergeState<T> {
2155 fn id(&self) -> Option<SpineId> {
2157 if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
2158 Some(SpineId(first.id().0, last.id().1))
2159 } else {
2160 None
2161 }
2162 }
2163
2164 fn single(batch: SpineBatch<T>) -> Self {
2166 let mut state = Self::default();
2167 state.push_batch(batch);
2168 state
2169 }
2170
2171 fn push_batch(&mut self, batch: SpineBatch<T>) {
2173 if let Some(last) = self.batches.last() {
2174 assert_eq!(last.id().1, batch.id().0);
2175 assert_eq!(last.upper(), batch.lower());
2176 }
2177 assert!(
2178 self.merge.is_none(),
2179 "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
2180 batch.id,
2181 self.batches.len(),
2182 );
2183 self.batches
2184 .try_push(batch)
2185 .expect("Attempted to insert batch into full layer!");
2186 }
2187
2188 fn len(&self) -> usize {
2190 self.batches.iter().map(SpineBatch::len).sum()
2191 }
2192
2193 fn is_empty(&self) -> bool {
2195 self.batches.iter().all(SpineBatch::is_empty)
2196 }
2197
2198 fn is_vacant(&self) -> bool {
2200 self.batches.is_empty()
2201 }
2202
2203 fn is_single(&self) -> bool {
2205 self.batches.len() == 1
2206 }
2207
2208 fn is_full(&self) -> bool {
2211 self.batches.is_full()
2212 }
2213
2214 fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
2221 let mut this = mem::take(self);
2222 if this.batches.len() <= 1 {
2223 this.batches.pop()
2224 } else {
2225 let id_merge = this
2227 .merge
2228 .or_else(|| SpineBatch::begin_merge(&self.batches[..], None))?;
2229 id_merge.merge.done(this.batches, log)
2230 }
2231 }
2232
2233 fn is_complete(&self) -> bool {
2235 match &self.merge {
2236 Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
2237 None => false,
2238 }
2239 }
2240
2241 fn work(&mut self, fuel: &mut isize) {
2243 if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
2245 merge.work(&self.batches[..], fuel)
2246 }
2247 }
2248}
2249
#[cfg(test)]
pub mod datadriven {
    use mz_ore::fmt::FormatBuffer;

    use crate::internal::datadriven::DirectiveArgs;

    use super::*;

    /// State shared by the handlers in this module: the trace under test and
    /// the merge requests accumulated from inserts but not yet taken.
    #[derive(Debug, Default)]
    pub struct TraceState {
        pub trace: Trace<u64>,
        pub merge_reqs: Vec<FueledMergeReq<u64>>,
    }

    /// Prints the elements of the trace's since followed by those of its upper.
    pub fn since_upper(
        datadriven: &TraceState,
        _args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let trace = &datadriven.trace;
        Ok(format!(
            "{:?}{:?}\n",
            trace.since().elements(),
            trace.upper().elements()
        ))
    }

    /// Prints one line per spine batch, in `spine_batches` order.
    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
        let rendered = datadriven
            .trace
            .spine
            .spine_batches()
            .fold(String::new(), |mut out, b| {
                out.push_str(b.describe(true).as_str());
                out.push('\n');
                out
            });
        Ok(rendered)
    }

    /// Parses one hollow batch per input line and pushes each into the trace,
    /// collecting the merge requests that each push returns.
    pub fn insert(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let parsed = args
            .input
            .trim()
            .split('\n')
            .map(DirectiveArgs::parse_hollow_batch);
        for batch in parsed {
            let new_reqs = datadriven.trace.push_batch(batch);
            datadriven.merge_reqs.extend(new_reqs);
        }
        Ok("ok\n".to_owned())
    }

    /// Downgrades the trace's since to the single-element frontier given by
    /// the `since` argument.
    pub fn downgrade_since(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let frontier = Antichain::from_elem(args.expect("since"));
        datadriven.trace.downgrade_since(&frontier);
        Ok("ok\n".to_owned())
    }

    /// Drains the accumulated merge requests, printing for each its lower,
    /// upper and since elements followed by the names of its input parts.
    pub fn take_merge_req(
        datadriven: &mut TraceState,
        _args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
            let part_names = merge_req
                .inputs
                .iter()
                .flat_map(|x| x.batch.parts.iter())
                .map(|x| x.printable_name())
                .collect::<Vec<_>>()
                .join(" ");
            let desc = &merge_req.desc;
            write!(
                s,
                "{:?}{:?}{:?} {}\n",
                desc.lower().elements(),
                desc.upper().elements(),
                desc.since().elements(),
                part_names
            );
        }
        Ok(s)
    }

    /// Parses the input as a hollow batch, applies it to the trace as an
    /// unchecked merge result, and prints which outcome was reported.
    pub fn apply_merge_res(
        datadriven: &mut TraceState,
        args: DirectiveArgs,
    ) -> Result<String, anyhow::Error> {
        let res = FueledMergeRes {
            output: DirectiveArgs::parse_hollow_batch(args.input),
            input: CompactionInput::Legacy,
            new_active_compaction: None,
        };
        let outcome = match datadriven.trace.apply_merge_res_unchecked(&res) {
            ApplyMergeResult::AppliedExact => "applied exact\n",
            ApplyMergeResult::AppliedSubset => "applied subset\n",
            ApplyMergeResult::NotAppliedNoMatch => "no-op\n",
            ApplyMergeResult::NotAppliedInvalidSince => "no-op invalid since\n",
            ApplyMergeResult::NotAppliedTooManyUpdates => "no-op too many updates\n",
        };
        Ok(outcome.into())
    }
}
2355
2356#[cfg(test)]
2357pub(crate) mod tests {
2358 use std::ops::Range;
2359
2360 use proptest::prelude::*;
2361 use semver::Version;
2362
2363 use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};
2364
2365 use super::*;
2366
2367 pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
2368 num_batches: Range<usize>,
2369 ) -> impl Strategy<Value = Trace<T>> {
2370 Strategy::prop_map(
2371 (
2372 any::<Option<T>>(),
2373 proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
2374 any::<bool>(),
2375 any::<u64>(),
2376 ),
2377 |(since, mut batches, roundtrip_structure, timeout_ms)| {
2378 let mut trace = Trace::<T>::default();
2379 trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
2380
2381 batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
2384 let mut lower = Antichain::from_elem(T::minimum());
2385 for mut batch in batches {
2386 if PartialOrder::less_than(trace.since(), batch.desc.since()) {
2388 trace.downgrade_since(batch.desc.since());
2389 }
2390 batch.desc = Description::new(
2391 lower.clone(),
2392 batch.desc.upper().clone(),
2393 batch.desc.since().clone(),
2394 );
2395 lower.clone_from(batch.desc.upper());
2396 let _merge_req = trace.push_batch(batch);
2397 }
2398 let reqs: Vec<_> = trace
2399 .fueled_merge_reqs_before_ms(timeout_ms, None)
2400 .collect();
2401 for req in reqs {
2402 trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
2403 }
2404 trace.roundtrip_structure = roundtrip_structure;
2405 trace
2406 },
2407 )
2408 }
2409
2410 #[mz_ore::test]
2411 #[cfg_attr(miri, ignore)] fn test_roundtrips() {
2413 fn check(trace: Trace<i64>) {
2414 trace.validate().unwrap();
2415 let flat = trace.flatten();
2416 let unflat = Trace::unflatten(flat).unwrap();
2417 assert_eq!(trace, unflat);
2418 }
2419
2420 proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
2421 }
2422
2423 #[mz_ore::test]
2424 fn fueled_merge_reqs() {
2425 let mut trace: Trace<u64> = Trace::default();
2426 let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
2427 0,
2428 10,
2429 &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
2430 1000,
2431 ));
2432
2433 assert!(fueled_reqs.is_empty());
2434 assert_eq!(
2435 trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
2436 0,
2437 "no merge reqs when not filtering by version"
2438 );
2439 assert_eq!(
2440 trace
2441 .fueled_merge_reqs_before_ms(
2442 u64::MAX,
2443 Some(WriterKey::for_version(&Version::new(0, 50, 0)))
2444 )
2445 .count(),
2446 0,
2447 "zero batches are older than a past version"
2448 );
2449 assert_eq!(
2450 trace
2451 .fueled_merge_reqs_before_ms(
2452 u64::MAX,
2453 Some(WriterKey::for_version(&Version::new(99, 99, 0)))
2454 )
2455 .count(),
2456 1,
2457 "one batch is older than a future version"
2458 );
2459 }
2460
2461 #[mz_ore::test]
2462 fn remove_redundant_merge_reqs() {
2463 fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
2464 FueledMergeReq {
2465 id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
2466 desc: Description::new(
2467 Antichain::from_elem(lower),
2468 Antichain::from_elem(upper),
2469 Antichain::new(),
2470 ),
2471 inputs: vec![],
2472 }
2473 }
2474
2475 assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
2477
2478 assert_eq!(
2480 Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
2481 vec![req(0, 1)]
2482 );
2483
2484 assert_eq!(
2486 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
2487 vec![req(0, 1)]
2488 );
2489
2490 assert_eq!(
2492 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
2493 vec![req(1, 2), req(0, 1)]
2494 );
2495
2496 assert_eq!(
2498 Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
2499 vec![req(0, 3)]
2500 );
2501
2502 assert_eq!(
2504 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
2505 vec![req(0, 3)]
2506 );
2507
2508 assert_eq!(
2510 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
2511 vec![req(0, 3)]
2512 );
2513
2514 assert_eq!(
2516 Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
2517 vec![req(0, 3)]
2518 );
2519
2520 assert_eq!(
2522 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2523 vec![req(1, 3), req(0, 2)]
2524 );
2525
2526 assert_eq!(
2528 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2529 vec![req(0, 2), req(1, 3)]
2530 );
2531
2532 let req015 = FueledMergeReq {
2534 id: SpineId(0, 1),
2535 desc: Description::new(
2536 Antichain::from_elem(0),
2537 Antichain::from_elem(1),
2538 Antichain::from_elem(5),
2539 ),
2540 inputs: vec![],
2541 };
2542 assert_eq!(
2543 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2544 vec![req015, req(0, 1)]
2545 );
2546 }
2547
2548 #[mz_ore::test]
2549 #[cfg_attr(miri, ignore)] fn construct_batch_with_runs_replaced_test() {
2551 let batch_strategy = any_hollow_batch::<u64>();
2552 let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);
2553
2554 let combined_strategy = (batch_strategy, to_replace_strategy)
2555 .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);
2556
2557 let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
2558 let batch_len = batch.run_meta.len();
2559 let batch_clone = batch.clone();
2560 let to_replace_clone = to_replace.clone();
2561
2562 proptest::collection::vec(any::<bool>(), batch_len)
2563 .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
2564 .prop_map(move |mask| {
2565 let indices: Vec<usize> = mask
2566 .iter()
2567 .enumerate()
2568 .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
2569 .collect();
2570 (batch_clone.clone(), to_replace_clone.clone(), indices)
2571 })
2572 });
2573
2574 proptest!(|(
2575 (batch, to_replace, runs) in final_strategy
2576 )| {
2577 let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
2578 x.id.unwrap().clone()
2579 ).collect();
2580
2581 let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();
2582
2583 let new_batch = SpineBatch::construct_batch_with_runs_replaced(
2584 &batch,
2585 &run_ids,
2586 &to_replace,
2587 ).unwrap();
2588
2589 let expected_len = batch.run_meta.len() - runs.len()
2590 + to_replace.run_meta.len();
2591 prop_assert!(new_batch.run_meta.len() == expected_len);
2592 });
2593 }
2594
2595 #[mz_ore::test]
2596 fn test_perform_subset_replacement() {
2597 let batch1 = crate::internal::state::tests::hollow::<u64>(0, 10, &["a"], 10);
2598 let batch2 = crate::internal::state::tests::hollow::<u64>(10, 20, &["b"], 10);
2599 let batch3 = crate::internal::state::tests::hollow::<u64>(20, 30, &["c"], 10);
2600
2601 let id_batch1 = IdHollowBatch {
2602 id: SpineId(0, 1),
2603 batch: Arc::new(batch1.clone()),
2604 };
2605 let id_batch2 = IdHollowBatch {
2606 id: SpineId(1, 2),
2607 batch: Arc::new(batch2.clone()),
2608 };
2609 let id_batch3 = IdHollowBatch {
2610 id: SpineId(2, 3),
2611 batch: Arc::new(batch3.clone()),
2612 };
2613
2614 let spine_batch = SpineBatch {
2615 id: SpineId(0, 3),
2616 desc: Description::new(
2617 Antichain::from_elem(0),
2618 Antichain::from_elem(30),
2619 Antichain::from_elem(0),
2620 ),
2621 parts: vec![id_batch1, id_batch2, id_batch3],
2622 active_compaction: None,
2623 len: 30,
2624 };
2625
2626 let res_exact = crate::internal::state::tests::hollow::<u64>(0, 30, &["d"], 30);
2627 let mut sb_exact = spine_batch.clone();
2628 let result = sb_exact.perform_subset_replacement(&res_exact, SpineId(0, 3), 0..3, None);
2629 assert!(matches!(result, ApplyMergeResult::AppliedExact));
2630 assert_eq!(sb_exact.parts.len(), 1);
2631 assert_eq!(sb_exact.len(), 30);
2632
2633 let res_subset = crate::internal::state::tests::hollow::<u64>(0, 20, &["e"], 20);
2634 let mut sb_subset = spine_batch.clone();
2635 let result = sb_subset.perform_subset_replacement(&res_subset, SpineId(0, 2), 0..2, None);
2636 assert!(matches!(result, ApplyMergeResult::AppliedSubset));
2637 assert_eq!(sb_subset.parts.len(), 2); assert_eq!(sb_subset.len(), 30);
2639
2640 let res_too_big = crate::internal::state::tests::hollow::<u64>(0, 30, &["f"], 31);
2641 let mut sb_too_big = spine_batch.clone();
2642 let result = sb_too_big.perform_subset_replacement(&res_too_big, SpineId(0, 3), 0..3, None);
2643 assert!(matches!(result, ApplyMergeResult::NotAppliedTooManyUpdates));
2644 assert_eq!(sb_too_big.parts.len(), 3);
2645 assert_eq!(sb_too_big.len(), 30);
2646 }
2647}