use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::mem;
use std::ops::Range;
use std::sync::Arc;

use arrayvec::ArrayVec;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::Codec64;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use tracing::error;

use crate::internal::paths::WriterKey;
use crate::internal::state::{HollowBatch, RunId};

use super::state::RunPart;
75
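/// A request to compact the `inputs` batches into a single batch covering `desc`.
///
/// Emitted when a fueled merge completes or when existing batches are selected
/// for compaction.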
#[derive(Debug, Clone, PartialEq)]
pub struct FueledMergeReq<T> {
    pub id: SpineId,
    pub desc: Description<T>,
    pub inputs: Vec<IdHollowBatch<T>>,
}
82
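/// The result of compacting a set of input batches: the compacted `output`, a
/// description of the spine input it replaces, and optionally a new active
/// compaction to record on the replaced batch.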
#[derive(Debug)]
pub struct FueledMergeRes<T> {
    pub output: HollowBatch<T>,
    pub input: CompactionInput,
    pub new_active_compaction: Option<ActiveCompaction>,
}
89
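/// An append-only collection of update batches, organized internally as a
/// `Spine` of exponentially sized levels.
///
/// When `roundtrip_structure` is true, the spine's internal structure (batch
/// ids, levels, and in-progress merges) is preserved by `flatten`/`unflatten`;
/// otherwise only the hollow batches themselves are round-tripped.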
#[derive(Debug, Clone)]
pub struct Trace<T> {
    spine: Spine<T>,
    pub(crate) roundtrip_structure: bool,
}
99
100#[cfg(any(test, debug_assertions))]
101impl<T: PartialEq> PartialEq for Trace<T> {
102 fn eq(&self, other: &Self) -> bool {
103 let Trace {
106 spine: _,
107 roundtrip_structure: _,
108 } = self;
109 let Trace {
110 spine: _,
111 roundtrip_structure: _,
112 } = other;
113
114 self.batches().eq(other.batches())
117 }
118}
119
120impl<T: Timestamp + Lattice> Default for Trace<T> {
121 fn default() -> Self {
122 Self {
123 spine: Spine::new(),
124 roundtrip_structure: true,
125 }
126 }
127}
128
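/// A serializable, "thin" representation of a spine batch: the level it lives
/// at plus the ids (and descriptions) of the hollow batches it references,
/// rather than the batches themselves.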
#[derive(Clone, Debug, Serialize)]
pub struct ThinSpineBatch<T> {
    pub(crate) level: usize,
    pub(crate) desc: Description<T>,
    pub(crate) parts: Vec<SpineId>,
    pub(crate) descs: Vec<Description<T>>,
}
138
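/// Note: this comparison covers only `level`, `desc`, and `parts`; the
/// per-part `descs` are not included.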
139impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
140 fn eq(&self, other: &Self) -> bool {
141 (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
143 }
144}
145
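/// A serializable representation of merge state, constructed either from an
/// in-progress `FuelingMerge` (via `fueling`) or from an already-fueled spine
/// batch's active compaction (via `fueled`).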
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct ThinMerge<T> {
    pub(crate) since: Antichain<T>,
    pub(crate) remaining_work: usize,
    pub(crate) active_compaction: Option<ActiveCompaction>,
}
152
153impl<T: Clone> ThinMerge<T> {
154 fn fueling(merge: &FuelingMerge<T>) -> Self {
155 ThinMerge {
156 since: merge.since.clone(),
157 remaining_work: merge.remaining_work,
158 active_compaction: None,
159 }
160 }
161
162 fn fueled(batch: &SpineBatch<T>) -> Self {
163 ThinMerge {
164 since: batch.desc.since().clone(),
165 remaining_work: 0,
166 active_compaction: batch.active_compaction.clone(),
167 }
168 }
169}
170
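/// A flattened representation of a `Trace`, suitable for serialization.
///
/// `legacy_batches` hold hollow batches that are not referenced by id (kept in
/// description order), while `hollow_batches`, `spine_batches`, and `merges`
/// capture the spine structure when it is round-tripped.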
#[derive(Clone, Debug)]
pub struct FlatTrace<T> {
    pub(crate) since: Antichain<T>,
    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
}
202
203impl<T: Timestamp + Lattice> Trace<T> {
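    /// Flattens this trace into its serializable representation, keeping the
    /// spine structure (batch ids, levels, and merges) alongside the hollow
    /// batches when `roundtrip_structure` is set.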
204 pub(crate) fn flatten(&self) -> FlatTrace<T> {
205 let since = self.spine.since.clone();
206 let mut legacy_batches = BTreeMap::new();
207 let mut hollow_batches = BTreeMap::new();
208 let mut spine_batches = BTreeMap::new();
209 let mut merges = BTreeMap::new();
210
211 let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
212 let id = batch.id();
213 let desc = batch.desc.clone();
214 let mut parts = Vec::with_capacity(batch.parts.len());
215 let mut descs = Vec::with_capacity(batch.parts.len());
216 for IdHollowBatch { id, batch } in &batch.parts {
217 parts.push(*id);
218 descs.push(batch.desc.clone());
219 if batch.desc.lower() == batch.desc.upper() {
227 hollow_batches.insert(*id, Arc::clone(batch));
228 } else {
229 legacy_batches.insert(Arc::clone(batch), ());
230 }
231 }
232
233 let spine_batch = ThinSpineBatch {
234 level,
235 desc,
236 parts,
237 descs,
238 };
239 spine_batches.insert(id, spine_batch);
240 };
241
242 for (level, state) in self.spine.merging.iter().enumerate() {
243 for batch in &state.batches {
244 push_spine_batch(level, batch);
245 if let Some(c) = &batch.active_compaction {
246 let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
247 assert!(
248 previous.is_none(),
249 "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
250 batch.id,
251 )
252 }
253 }
254 if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
255 let previous = merges.insert(*id, ThinMerge::fueling(merge));
256 assert!(
257 previous.is_none(),
258 "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
259 )
260 }
261 }
262
263 if !self.roundtrip_structure {
264 assert!(hollow_batches.is_empty());
265 spine_batches.clear();
266 merges.clear();
267 }
268
269 FlatTrace {
270 since,
271 legacy_batches,
272 hollow_batches,
273 spine_batches,
274 merges,
275 }
276 }
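    /// Reconstructs a `Trace` from its flattened representation, re-deriving
    /// the spine levels and in-progress merges and stitching any legacy
    /// batches back into place.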
277 pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
278 let FlatTrace {
279 since,
280 legacy_batches,
281 mut hollow_batches,
282 spine_batches,
283 mut merges,
284 } = value;
285
286 let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
289
290 let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
296 if PartialOrder::less_than(left, right) {
297 Ordering::Less
298 } else if PartialOrder::less_than(right, left) {
299 Ordering::Greater
300 } else {
301 Ordering::Equal
302 }
303 };
304 let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
305 legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
306
307 let mut pop_batch =
308 |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
309 if let Some(batch) = hollow_batches.remove(&id) {
310 if let Some(desc) = expected_desc {
311 assert_eq!(*desc, batch.desc);
312 }
313 return Ok(IdHollowBatch { id, batch });
314 }
315 let mut batch = legacy_batches
316 .pop()
317 .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
318
319 let Some(expected_desc) = expected_desc else {
320 return Ok(IdHollowBatch { id, batch });
321 };
322
323 if expected_desc.lower() != batch.desc.lower() {
324 return Err(format!(
325 "hollow batch lower {:?} did not match expected lower {:?}",
326 batch.desc.lower().elements(),
327 expected_desc.lower().elements()
328 ));
329 }
330
331 if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
334 let mut new_upper = batch.desc.upper().clone();
335
336 while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
339 let Some(next_batch) = legacy_batches.pop() else {
340 break;
341 };
342 if next_batch.is_empty() {
343 new_upper.clone_from(next_batch.desc.upper());
344 } else {
345 legacy_batches.push(next_batch);
346 break;
347 }
348 }
349
350 if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
353 legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
354 expected_desc.upper().clone(),
355 new_upper.clone(),
356 batch.desc.since().clone(),
357 ))));
358 new_upper.clone_from(expected_desc.upper());
359 }
360 batch = Arc::new(HollowBatch::empty(Description::new(
361 batch.desc.lower().clone(),
362 new_upper,
363 batch.desc.since().clone(),
364 )))
365 }
366
367 if expected_desc.upper() != batch.desc.upper() {
368 return Err(format!(
369 "hollow batch upper {:?} did not match expected upper {:?}",
370 batch.desc.upper().elements(),
371 expected_desc.upper().elements()
372 ));
373 }
374
375 Ok(IdHollowBatch { id, batch })
376 };
377
378 let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
379 (batch.desc.upper().clone(), id.1)
380 } else {
381 (Antichain::from_elem(T::minimum()), 0)
382 };
383 let levels = spine_batches
384 .first_key_value()
385 .map(|(_, batch)| batch.level + 1)
386 .unwrap_or(0);
387 let mut merging = vec![MergeState::default(); levels];
388 for (id, batch) in spine_batches {
389 let level = batch.level;
390
391 let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n(
392 None,
393 batch.parts.len() - batch.descs.len(),
394 ));
395 let parts = batch
396 .parts
397 .into_iter()
398 .zip_eq(descs)
399 .map(|(id, desc)| pop_batch(id, desc))
400 .collect::<Result<Vec<_>, _>>()?;
401 let len = parts.iter().map(|p| (*p).batch.len).sum();
402 let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
403 let batch = SpineBatch {
404 id,
405 desc: batch.desc,
406 parts,
407 active_compaction,
408 len,
409 };
410
411 let state = &mut merging[level];
412
413 state.push_batch(batch);
414 if let Some(id) = state.id() {
415 if let Some(merge) = merges.remove(&id) {
416 state.merge = Some(IdFuelingMerge {
417 id,
418 merge: FuelingMerge {
419 since: merge.since,
420 remaining_work: merge.remaining_work,
421 },
422 })
423 }
424 }
425 }
426
427 let mut trace = Trace {
428 spine: Spine {
429 effort: 1,
430 next_id,
431 since,
432 upper,
433 merging,
434 },
435 roundtrip_structure,
436 };
437
438 fn check_empty(name: &str, len: usize) -> Result<(), String> {
439 if len != 0 {
440 Err(format!("{len} {name} left after reconstructing spine"))
441 } else {
442 Ok(())
443 }
444 }
445
446 if roundtrip_structure {
447 check_empty("legacy batches", legacy_batches.len())?;
448 } else {
449 for batch in legacy_batches.into_iter().rev() {
451 trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
452 }
453 }
454 check_empty("hollow batches", hollow_batches.len())?;
455 check_empty("merges", merges.len())?;
456
457 debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
458
459 Ok(trace)
460 }
461}
462
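/// Counts of spine batches by compaction state, reported via `spine_metrics`.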
#[derive(Debug, Clone, Default)]
pub(crate) struct SpineMetrics {
    pub compact_batches: u64,
    pub compacting_batches: u64,
    pub noncompact_batches: u64,
}
469
470impl<T> Trace<T> {
471 pub fn since(&self) -> &Antichain<T> {
472 &self.spine.since
473 }
474
475 pub fn upper(&self) -> &Antichain<T> {
476 &self.spine.upper
477 }
478
479 pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
480 for batch in self.batches() {
481 f(batch);
482 }
483 }
484
485 pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
486 self.spine
487 .spine_batches()
488 .flat_map(|b| b.parts.as_slice())
489 .map(|b| &*b.batch)
490 }
491
492 pub fn num_spine_batches(&self) -> usize {
493 self.spine.spine_batches().count()
494 }
495
496 #[cfg(test)]
497 pub fn num_hollow_batches(&self) -> usize {
498 self.batches().count()
499 }
500
501 #[cfg(test)]
502 pub fn num_updates(&self) -> usize {
503 self.batches().map(|b| b.len).sum()
504 }
505}
506
507impl<T: Timestamp + Lattice> Trace<T> {
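    /// Replaces the trace's since frontier with `since`.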
508 pub fn downgrade_since(&mut self, since: &Antichain<T>) {
509 self.spine.since.clone_from(since);
510 }
511
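    /// Pushes a new batch onto the spine, returning the (deduplicated) merge
    /// requests for any merges that complete as a result.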
512 #[must_use]
513 pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
514 let mut merge_reqs = Vec::new();
515 self.spine.insert(
516 batch,
517 &mut SpineLog::Enabled {
518 merge_reqs: &mut merge_reqs,
519 },
520 );
521 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
522 Self::remove_redundant_merge_reqs(merge_reqs)
529 }
530
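    /// Records that a compaction with the given start time has been claimed
    /// for the spine batch with this id.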
531 pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
532 for batch in self.spine.spine_batches_mut().rev() {
535 if batch.id == id {
536 batch.active_compaction = Some(compaction);
537 break;
538 }
539 }
540 }
541
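    /// Like `push_batch`, but discards any merge requests; used when
    /// reconstructing a spine from its flattened form.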
542 pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
545 self.spine.insert(batch, &mut SpineLog::Disabled);
546 }
547
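    /// Exerts the given amount of fuel on the spine, returning any merge
    /// requests that complete as a result plus whether the spine did any work.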
548 #[must_use]
556 pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
557 let mut merge_reqs = Vec::new();
558 let did_work = self.spine.exert(
559 fuel,
560 &mut SpineLog::Enabled {
561 merge_reqs: &mut merge_reqs,
562 },
563 );
564 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
565 let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
567 (merge_reqs, did_work)
568 }
569
570 pub fn validate(&self) -> Result<(), String> {
574 self.spine.validate()
575 }
576
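    /// Returns merge requests for spine batches that still need compaction:
    /// batches that are not yet compact, or that contain parts from a writer
    /// older than `threshold_writer`, skipping any whose active compaction
    /// started after `threshold_ms`.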
577 pub(crate) fn fueled_merge_reqs_before_ms(
580 &self,
581 threshold_ms: u64,
582 threshold_writer: Option<WriterKey>,
583 ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
584 self.spine
585 .spine_batches()
586 .filter(move |b| {
587 let noncompact = !b.is_compact();
588 let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
589 b.parts.iter().any(|b| {
590 b.batch
591 .parts
592 .iter()
593 .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
594 })
595 });
596 noncompact || old_writer
597 })
598 .filter(move |b| {
599 b.active_compaction
602 .as_ref()
603 .map_or(true, move |c| c.start_ms <= threshold_ms)
604 })
605 .map(|b| FueledMergeReq {
606 id: b.id,
607 desc: b.desc.clone(),
608 inputs: b.parts.clone(),
609 })
610 }
611
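    /// Drops merge requests that are covered by another request in the list: a
    /// request is redundant if some retained request covers its spine id range
    /// at the same since. Later requests take precedence over earlier ones.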
612 fn remove_redundant_merge_reqs(
619 mut merge_reqs: Vec<FueledMergeReq<T>>,
620 ) -> Vec<FueledMergeReq<T>> {
621 fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
623 b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
625 }
626
627 let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
628 while let Some(merge_req) = merge_reqs.pop() {
632 let covered = ret.iter().any(|r| covers(r, &merge_req));
633 if !covered {
634 ret.retain(|r| !covers(&merge_req, r));
638 ret.push(merge_req);
639 }
640 }
641 ret
642 }
643
644 pub fn spine_metrics(&self) -> SpineMetrics {
645 let mut metrics = SpineMetrics::default();
646 for batch in self.spine.spine_batches() {
647 if batch.is_compact() {
648 metrics.compact_batches += 1;
649 } else if batch.is_merging() {
650 metrics.compacting_batches += 1;
651 } else {
652 metrics.noncompact_batches += 1;
653 }
654 }
655 metrics
656 }
657}
658
659impl<T: Timestamp + Lattice + Codec64> Trace<T> {
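    /// Applies a compaction result to the first spine batch that matches it,
    /// validating diff sums along the way.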
660 pub fn apply_merge_res_checked_classic<D: Codec64 + Semigroup + PartialEq>(
661 &mut self,
662 res: &FueledMergeRes<T>,
663 metrics: &ColumnarMetrics,
664 ) -> ApplyMergeResult {
665 for batch in self.spine.spine_batches_mut().rev() {
666 let result = batch.maybe_replace_checked_classic::<D>(res, metrics);
667 if result.matched() {
668 return result;
669 }
670 }
671 ApplyMergeResult::NotAppliedNoMatch
672 }
673
674 pub fn apply_merge_res_checked<D: Codec64 + Semigroup + PartialEq>(
675 &mut self,
676 res: &FueledMergeRes<T>,
677 metrics: &ColumnarMetrics,
678 ) -> ApplyMergeResult {
679 for batch in self.spine.spine_batches_mut().rev() {
680 let result = batch.maybe_replace_checked::<D>(res, metrics);
681 if result.matched() {
682 return result;
683 }
684 }
685 ApplyMergeResult::NotAppliedNoMatch
686 }
687
688 pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
689 for batch in self.spine.spine_batches_mut().rev() {
690 let result = batch.maybe_replace_unchecked(res);
691 if result.matched() {
692 return result;
693 }
694 }
695 ApplyMergeResult::NotAppliedNoMatch
696 }
697
698 pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
699 for batch in self.spine.spine_batches_mut().rev() {
700 let result = batch.maybe_replace_with_tombstone(desc);
701 if result.matched() {
702 return result;
703 }
704 }
705 ApplyMergeResult::NotAppliedNoMatch
706 }
707}
708
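/// Controls whether spine operations log the merge requests they generate.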
709enum SpineLog<'a, T> {
712 Enabled {
713 merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
714 },
715 Disabled,
716}
717
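/// Identifies which portion of the spine a compaction output replaces: an
/// unspecified legacy input, a contiguous range of spine ids, or a subset of
/// runs within a single spine batch.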
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CompactionInput {
    Legacy,
    IdRange(SpineId),
    PartialBatch(SpineId, BTreeSet<RunId>),
}
728
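/// The id of a spine batch, as the half-open range `[lo, hi)` of insertion ids
/// it covers. Freshly inserted batches cover a single id; merged batches cover
/// the union of their inputs' ids.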
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SpineId(pub usize, pub usize);
731
732impl Serialize for SpineId {
733 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
734 where
735 S: Serializer,
736 {
737 let SpineId(lo, hi) = self;
738 serializer.serialize_str(&format!("{lo}-{hi}"))
739 }
740}
741
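/// Returns the `SpineId` spanning from the lower bound of the first id in
/// `ids` to the upper bound of the last.
///
/// Panics if `ids` is empty.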
742pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
744 let lower_spine_bound = ids
745 .first()
746 .map(|id| id.0)
747 .expect("at least one batch must be present");
748 let upper_spine_bound = ids
749 .last()
750 .map(|id| id.1)
751 .expect("at least one batch must be present");
752
753 SpineId(lower_spine_bound, upper_spine_bound)
754}
755
756impl SpineId {
757 fn covers(self, other: SpineId) -> bool {
758 self.0 <= other.0 && other.1 <= self.1
759 }
760}
761
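/// A hollow batch paired with the spine id it was assigned when it was added
/// to the spine.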
#[derive(Debug, Clone, PartialEq)]
pub struct IdHollowBatch<T> {
    pub id: SpineId,
    pub batch: Arc<HollowBatch<T>>,
}
767
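/// Records that a compaction of a batch was claimed at `start_ms`.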
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct ActiveCompaction {
    pub start_ms: u64,
}
772
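/// A batch at some level of the spine: one or more hollow batches with
/// contiguous descriptions, the spine id range they cover, any claimed
/// compaction, and a cached total length.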
#[derive(Debug, Clone, PartialEq)]
struct SpineBatch<T> {
    id: SpineId,
    desc: Description<T>,
    parts: Vec<IdHollowBatch<T>>,
    active_compaction: Option<ActiveCompaction>,
    len: usize,
}
782
783impl<T> SpineBatch<T> {
784 fn merged(batch: IdHollowBatch<T>) -> Self
785 where
786 T: Clone,
787 {
788 Self {
789 id: batch.id,
790 desc: batch.batch.desc.clone(),
791 len: batch.batch.len,
792 parts: vec![batch],
793 active_compaction: None,
794 }
795 }
796}
797
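/// The result of attempting to apply a compaction result to the spine.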
#[derive(Debug, Copy, Clone)]
pub enum ApplyMergeResult {
    AppliedExact,
    AppliedSubset,
    NotAppliedNoMatch,
    NotAppliedInvalidSince,
    NotAppliedTooManyUpdates,
}
806
807impl ApplyMergeResult {
808 pub fn applied(&self) -> bool {
809 match self {
810 ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
811 _ => false,
812 }
813 }
814 pub fn matched(&self) -> bool {
815 match self {
816 ApplyMergeResult::AppliedExact
817 | ApplyMergeResult::AppliedSubset
818 | ApplyMergeResult::NotAppliedTooManyUpdates => true,
819 _ => false,
820 }
821 }
822}
823
824impl<T: Timestamp + Lattice> SpineBatch<T> {
825 pub fn lower(&self) -> &Antichain<T> {
826 self.desc().lower()
827 }
828
829 pub fn upper(&self) -> &Antichain<T> {
830 self.desc().upper()
831 }
832
833 fn id(&self) -> SpineId {
834 debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
835 debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
836 self.id
837 }
838
839 pub fn is_compact(&self) -> bool {
840 self.parts
847 .iter()
848 .map(|p| p.batch.run_meta.len())
849 .sum::<usize>()
850 <= 1
851 }
852
853 pub fn is_merging(&self) -> bool {
854 self.active_compaction.is_some()
855 }
856
857 fn desc(&self) -> &Description<T> {
858 &self.desc
859 }
860
861 pub fn len(&self) -> usize {
862 debug_assert_eq!(
865 self.len,
866 self.parts.iter().map(|x| x.batch.len).sum::<usize>()
867 );
868 self.len
869 }
870
871 pub fn is_empty(&self) -> bool {
872 self.len() == 0
873 }
874
875 pub fn empty(
876 id: SpineId,
877 lower: Antichain<T>,
878 upper: Antichain<T>,
879 since: Antichain<T>,
880 ) -> Self {
881 SpineBatch::merged(IdHollowBatch {
882 id,
883 batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
884 })
885 }
886
887 pub fn begin_merge(
888 bs: &[Self],
889 compaction_frontier: Option<AntichainRef<T>>,
890 ) -> Option<IdFuelingMerge<T>> {
891 let from = bs.first()?.id().0;
892 let until = bs.last()?.id().1;
893 let id = SpineId(from, until);
894 let mut sinces = bs.iter().map(|b| b.desc().since());
895 let mut since = sinces.next()?.clone();
896 for b in bs {
897 since.join_assign(b.desc().since())
898 }
899 if let Some(compaction_frontier) = compaction_frontier {
900 since.join_assign(&compaction_frontier.to_owned());
901 }
902 let remaining_work = bs.iter().map(|x| x.len()).sum();
903 Some(IdFuelingMerge {
904 id,
905 merge: FuelingMerge {
906 since,
907 remaining_work,
908 },
909 })
910 }
911
912 #[cfg(test)]
913 fn describe(&self, extended: bool) -> String {
914 let SpineBatch {
915 id,
916 parts,
917 desc,
918 active_compaction,
919 len,
920 } = self;
921 let compaction = match active_compaction {
922 None => "".to_owned(),
923 Some(c) => format!(" (c@{})", c.start_ms),
924 };
925 match extended {
926 false => format!(
927 "[{}-{}]{:?}{:?}{}/{}{compaction}",
928 id.0,
929 id.1,
930 desc.lower().elements(),
931 desc.upper().elements(),
932 parts.len(),
933 len
934 ),
935 true => {
936 format!(
937 "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
938 id.0,
939 id.1,
940 desc.lower().elements(),
941 desc.upper().elements(),
942 desc.since().elements(),
943 parts.len(),
944 len,
945 parts
946 .iter()
947 .flat_map(|x| x.batch.parts.iter())
948 .map(|x| format!(" {}", x.printable_name()))
949 .collect::<Vec<_>>()
950 .join("")
951 )
952 }
953 }
954 }
955}
956
957impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
958 fn diffs_sum<'a, D: Semigroup + Codec64>(
959 parts: impl Iterator<Item = &'a RunPart<T>>,
960 metrics: &ColumnarMetrics,
961 ) -> Option<D> {
962 parts
963 .map(|p| p.diffs_sum::<D>(metrics))
964 .reduce(|a, b| match (a, b) {
965 (Some(mut a), Some(b)) => {
966 a.plus_equals(&b);
967 Some(a)
968 }
969 _ => None,
970 })
971 .flatten()
972 }
973
974 fn diffs_sum_for_runs<D: Semigroup + Codec64>(
975 batch: &HollowBatch<T>,
976 run_ids: &[RunId],
977 metrics: &ColumnarMetrics,
978 ) -> Option<D> {
979 if run_ids.is_empty() {
980 return None;
981 }
982
983 let parts = batch
984 .runs()
985 .filter(|(meta, _)| {
986 run_ids.contains(&meta.id.expect("id should be present at this point"))
987 })
988 .flat_map(|(_, parts)| parts);
989
990 Self::diffs_sum(parts, metrics)
991 }
992
993 fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
994 let exact_match =
995 desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();
996
997 let empty_batch = HollowBatch::empty(desc.clone());
998 if exact_match {
999 *self = SpineBatch::merged(IdHollowBatch {
1000 id: self.id(),
1001 batch: Arc::new(empty_batch),
1002 });
1003 return ApplyMergeResult::AppliedExact;
1004 }
1005
1006 if let Some((id, range)) = self.find_replacement_range(desc) {
1007 self.perform_subset_replacement(&empty_batch, id, range, None)
1008 } else {
1009 ApplyMergeResult::NotAppliedNoMatch
1010 }
1011 }
1012
1013 fn construct_batch_with_runs_replaced(
1014 original: &HollowBatch<T>,
1015 run_ids: &[RunId],
1016 replacement: &HollowBatch<T>,
1017 ) -> Result<HollowBatch<T>, ApplyMergeResult> {
1018 if run_ids.is_empty() {
1019 return Err(ApplyMergeResult::NotAppliedNoMatch);
1020 }
1021
1022 let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
1023 let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
1024 if !orig_run_ids.is_superset(&run_ids) {
1025 return Err(ApplyMergeResult::NotAppliedNoMatch);
1026 }
1027
1028 let runs: Vec<_> = original
1029 .runs()
1030 .filter(|(meta, _)| {
1031 !run_ids.contains(&meta.id.expect("id should be present at this point"))
1032 })
1033 .chain(replacement.runs())
1034 .collect();
1035
1036 let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();
1037
1038 let run_meta = runs
1039 .iter()
1040 .map(|(meta, _)| *meta)
1041 .cloned()
1042 .collect::<Vec<_>>();
1043
1044 let parts = runs
1045 .iter()
1046 .flat_map(|(_, parts)| *parts)
1047 .cloned()
1048 .collect::<Vec<_>>();
1049
1050 let run_splits = {
1051 let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
1052 let mut pointer = 0;
1053 for (i, (_, parts)) in runs.into_iter().enumerate() {
1054 if parts.is_empty() {
1055 continue;
1056 }
1057 if i < run_meta.len() - 1 {
1058 splits.push(pointer + parts.len());
1059 }
1060 pointer += parts.len();
1061 }
1062 splits
1063 };
1064
1065 Ok(HollowBatch::new(
1066 replacement.desc.clone(),
1067 parts,
1068 len,
1069 run_meta,
1070 run_splits,
1071 ))
1072 }
1073
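    /// Attempts to replace the parts identified by `res.input` with the
    /// compaction output, checking that the replacement's diffs sum to the
    /// same value as the diffs being replaced.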
1074 fn maybe_replace_checked<D>(
1075 &mut self,
1076 res: &FueledMergeRes<T>,
1077 metrics: &ColumnarMetrics,
1078 ) -> ApplyMergeResult
1079 where
1080 D: Semigroup + Codec64 + PartialEq + Debug,
1081 {
1082 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1086 return ApplyMergeResult::NotAppliedInvalidSince;
1087 }
1088
1089 let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1090 let num_batches = self.parts.len();
1091
1092 let result = match &res.input {
1093 CompactionInput::IdRange(id) => {
1094 self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
1095 }
1096 CompactionInput::PartialBatch(id, runs) => {
1097 self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
1098 }
1099 CompactionInput::Legacy => {
1100 error!("legacy compaction input is not supported");
1101 return ApplyMergeResult::NotAppliedNoMatch;
1102 }
1103 };
1104
1105 let num_batches_after = self.parts.len();
1106 assert!(
1107 num_batches_after <= num_batches,
1108 "replacing parts should not increase the number of batches"
1109 );
1110 result
1111 }
1112
1113 fn handle_id_range_replacement<D>(
1114 &mut self,
1115 res: &FueledMergeRes<T>,
1116 id: &SpineId,
1117 new_diffs_sum: Option<D>,
1118 metrics: &ColumnarMetrics,
1119 ) -> ApplyMergeResult
1120 where
1121 D: Semigroup + Codec64 + PartialEq + Debug,
1122 {
1123 let range = self
1124 .parts
1125 .iter()
1126 .enumerate()
1127 .filter_map(|(i, p)| {
1128 if id.covers(p.id) {
1129 Some((i, p.id))
1130 } else {
1131 None
1132 }
1133 })
1134 .collect::<Vec<_>>();
1135
1136 let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();
1137
1138 if ids.is_empty() || id != &id_range(ids) {
1148 return ApplyMergeResult::NotAppliedNoMatch;
1149 }
1150
1151 let range: BTreeSet<_> = range.iter().map(|(i, _)| *i).collect();
1152
1153 let min = *range.iter().min().unwrap();
1155 let max = *range.iter().max().unwrap();
1156 let replacement_range = min..max + 1;
1157
1158 let old_diffs_sum = Self::diffs_sum::<D>(
1161 self.parts[replacement_range.clone()]
1162 .iter()
1163 .flat_map(|p| p.batch.parts.iter()),
1164 metrics,
1165 );
1166
1167 self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
1168
1169 self.perform_subset_replacement(
1170 &res.output,
1171 *id,
1172 replacement_range,
1173 res.new_active_compaction.clone(),
1174 )
1175 }
1176
1177 fn handle_partial_batch_replacement<D>(
1178 &mut self,
1179 res: &FueledMergeRes<T>,
1180 id: SpineId,
1181 runs: &BTreeSet<RunId>,
1182 new_diffs_sum: Option<D>,
1183 metrics: &ColumnarMetrics,
1184 ) -> ApplyMergeResult
1185 where
1186 D: Semigroup + Codec64 + PartialEq + Debug,
1187 {
1188 if runs.is_empty() {
1189 return ApplyMergeResult::NotAppliedNoMatch;
1190 }
1191
1192 let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
1193 let Some((i, batch)) = part else {
1194 return ApplyMergeResult::NotAppliedNoMatch;
1195 };
1196 let replacement_range = i..(i + 1);
1197
1198 let batch = &batch.batch;
1199 let run_ids = runs.iter().cloned().collect::<Vec<_>>();
1200
1201 let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
1202 let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
1203
1204 self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "partial batch replacement");
1205
1206 match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
1207 Ok(new_batch) => {
1208 let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
1209 self.validate_diffs_sum_match(
1210 old_batch_diff_sum,
1211 new_batch_diff_sum,
1212 "sanity checking diffs sum for replaced runs",
1213 );
1214 self.perform_subset_replacement(
1215 &new_batch,
1216 id,
1217 replacement_range,
1218 res.new_active_compaction.clone(),
1219 )
1220 }
1221 Err(err) => err,
1222 }
1223 }
1224
1225 fn validate_diffs_sum_match<D>(
1226 &self,
1227 old_diffs_sum: Option<D>,
1228 new_diffs_sum: Option<D>,
1229 context: &str,
1230 ) where
1231 D: Semigroup + Codec64 + PartialEq + Debug,
1232 {
1233 match (new_diffs_sum, old_diffs_sum) {
1234 (None, Some(old)) => {
1235 if !D::is_zero(&old) {
1236 panic!(
1237 "merge res diffs sum is None, but spine batch diffs sum ({:?}) is not zero ({})",
1238 old, context
1239 );
1240 }
1241 }
1242 (Some(new_diffs_sum), Some(old_diffs_sum)) => {
1243 assert_eq!(
1244 old_diffs_sum, new_diffs_sum,
1245 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
1246 new_diffs_sum, old_diffs_sum, context
1247 );
1248 }
1249 _ => {}
1250 };
1251 }
1252
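    /// Like `maybe_replace_checked`, but matches the replacement against batch
    /// descriptions rather than against spine ids, while still validating diff
    /// sums.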
1253 fn maybe_replace_checked_classic<D>(
1259 &mut self,
1260 res: &FueledMergeRes<T>,
1261 metrics: &ColumnarMetrics,
1262 ) -> ApplyMergeResult
1263 where
1264 D: Semigroup + Codec64 + PartialEq + Debug,
1265 {
1266 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1270 return ApplyMergeResult::NotAppliedInvalidSince;
1271 }
1272
1273 let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1274
1275 let exact_match = res.output.desc.lower() == self.desc().lower()
1277 && res.output.desc.upper() == self.desc().upper();
1278 if exact_match {
1279 let old_diffs_sum = Self::diffs_sum::<D>(
1280 self.parts.iter().flat_map(|p| p.batch.parts.iter()),
1281 metrics,
1282 );
1283
1284 if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1285 assert_eq!(
1286 old_diffs_sum, new_diffs_sum,
1287 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1288 new_diffs_sum, old_diffs_sum
1289 );
1290 }
1291
1292 if res.output.len > self.len() {
1301 return ApplyMergeResult::NotAppliedTooManyUpdates;
1302 }
1303 *self = SpineBatch::merged(IdHollowBatch {
1304 id: self.id(),
1305 batch: Arc::new(res.output.clone()),
1306 });
1307 return ApplyMergeResult::AppliedExact;
1308 }
1309
1310 if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1312 let old_diffs_sum = Self::diffs_sum::<D>(
1313 self.parts[range.clone()]
1314 .iter()
1315 .flat_map(|p| p.batch.parts.iter()),
1316 metrics,
1317 );
1318
1319 if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1320 assert_eq!(
1321 old_diffs_sum, new_diffs_sum,
1322 "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1323 new_diffs_sum, old_diffs_sum
1324 );
1325 }
1326
1327 self.perform_subset_replacement(
1328 &res.output,
1329 id,
1330 range,
1331 res.new_active_compaction.clone(),
1332 )
1333 } else {
1334 ApplyMergeResult::NotAppliedNoMatch
1335 }
1336 }
1337
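    /// Like `maybe_replace_checked_classic`, but without validating diff sums.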
1338 fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
1344 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1348 return ApplyMergeResult::NotAppliedInvalidSince;
1349 }
1350
1351 let exact_match = res.output.desc.lower() == self.desc().lower()
1353 && res.output.desc.upper() == self.desc().upper();
1354 if exact_match {
1355 if res.output.len > self.len() {
1364 return ApplyMergeResult::NotAppliedTooManyUpdates;
1365 }
1366
1367 *self = SpineBatch::merged(IdHollowBatch {
1368 id: self.id(),
1369 batch: Arc::new(res.output.clone()),
1370 });
1371 return ApplyMergeResult::AppliedExact;
1372 }
1373
1374 if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1376 self.perform_subset_replacement(
1377 &res.output,
1378 id,
1379 range,
1380 res.new_active_compaction.clone(),
1381 )
1382 } else {
1383 ApplyMergeResult::NotAppliedNoMatch
1384 }
1385 }
1386
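    /// Finds the contiguous range of parts whose lower and upper exactly
    /// bracket `desc`, returning the covering spine id and the index range
    /// that a replacement would splice out.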
1387 fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
1389 let mut lower = None;
1399 let mut upper = None;
1400
1401 for (i, batch) in self.parts.iter().enumerate() {
1402 if batch.batch.desc.lower() == desc.lower() {
1403 lower = Some((i, batch.id.0));
1404 }
1405 if batch.batch.desc.upper() == desc.upper() {
1406 upper = Some((i, batch.id.1));
1407 }
1408 if lower.is_some() && upper.is_some() {
1409 break;
1410 }
1411 }
1412
1413 match (lower, upper) {
1414 (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
1415 Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
1416 }
1417 _ => None,
1418 }
1419 }
1420
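    /// Splices `res` in place of the parts in `range`, refusing the swap if it
    /// would increase the batch's total update count.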
1421 fn perform_subset_replacement(
1423 &mut self,
1424 res: &HollowBatch<T>,
1425 spine_id: SpineId,
1426 range: Range<usize>,
1427 new_active_compaction: Option<ActiveCompaction>,
1428 ) -> ApplyMergeResult {
1429 let SpineBatch {
1430 id,
1431 parts,
1432 desc,
1433 active_compaction: _,
1434 len: _,
1435 } = self;
1436
1437 let mut new_parts = vec![];
1438 new_parts.extend_from_slice(&parts[..range.start]);
1439 new_parts.push(IdHollowBatch {
1440 id: spine_id,
1441 batch: Arc::new(res.clone()),
1442 });
1443 new_parts.extend_from_slice(&parts[range.end..]);
1444
1445 let new_spine_batch = SpineBatch {
1446 id: *id,
1447 desc: desc.to_owned(),
1448 len: new_parts.iter().map(|x| x.batch.len).sum(),
1449 parts: new_parts,
1450 active_compaction: new_active_compaction,
1451 };
1452
1453 if new_spine_batch.len() > self.len() {
1454 return ApplyMergeResult::NotAppliedTooManyUpdates;
1455 }
1456
1457 *self = new_spine_batch;
1458 ApplyMergeResult::AppliedSubset
1459 }
1460}
1461
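/// A merge that is still accumulating fuel: the since the merged batch will
/// have and the amount of work remaining before the merge can complete.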
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct FuelingMerge<T> {
    pub(crate) since: Antichain<T>,
    pub(crate) remaining_work: usize,
}
1467
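/// A `FuelingMerge` tagged with the `SpineId` that the merged batch will be
/// assigned.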
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct IdFuelingMerge<T> {
    id: SpineId,
    merge: FuelingMerge<T>,
}
1473
1474impl<T: Timestamp + Lattice> FuelingMerge<T> {
1475 #[allow(clippy::as_conversions)]
1481 fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
1482 let used = std::cmp::min(*fuel as usize, self.remaining_work);
1483 self.remaining_work = self.remaining_work.saturating_sub(used);
1484 *fuel -= used as isize;
1485 }
1486
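    /// Completes the merge, combining `bs` into a single spine batch and, when
    /// logging is enabled, emitting a `FueledMergeReq` for the new batch.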
1487 fn done(
1492 self,
1493 bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1494 log: &mut SpineLog<'_, T>,
1495 ) -> Option<SpineBatch<T>> {
1496 let first = bs.first()?;
1497 let last = bs.last()?;
1498 let id = SpineId(first.id().0, last.id().1);
1499 assert!(id.0 < id.1);
1500 let lower = first.desc().lower().clone();
1501 let upper = last.desc().upper().clone();
1502 let since = self.since;
1503
1504 if bs.iter().all(SpineBatch::is_empty) {
1506 return Some(SpineBatch::empty(id, lower, upper, since));
1507 }
1508
1509 let desc = Description::new(lower, upper, since);
1510 let len = bs.iter().map(SpineBatch::len).sum();
1511
1512 let mut merged_parts_len = 0;
1516 for b in &bs {
1517 merged_parts_len += b.parts.len();
1518 }
1519 let mut merged_parts = Vec::with_capacity(merged_parts_len);
1520 for b in bs {
1521 merged_parts.extend(b.parts)
1522 }
1523 debug_assert_eq!(merged_parts.len(), merged_parts_len);
1525
1526 if let SpineLog::Enabled { merge_reqs } = log {
1527 merge_reqs.push(FueledMergeReq {
1528 id,
1529 desc: desc.clone(),
1530 inputs: merged_parts.clone(),
1531 });
1532 }
1533
1534 Some(SpineBatch {
1535 id,
1536 desc,
1537 len,
1538 parts: merged_parts,
1539 active_compaction: None,
1540 })
1541 }
1542}
1543
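/// The maximum number of batches per spine level; once a level is full, a
/// fueling merge of its batches begins.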
const BATCHES_PER_LEVEL: usize = 2;
1549
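/// An append-only collection of update batches, organized into levels of
/// geometrically increasing size whose merges are driven incrementally by
/// "fuel" as new batches arrive, in the style of differential dataflow's
/// spine.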
#[derive(Debug, Clone)]
struct Spine<T> {
    effort: usize,
    next_id: usize,
    since: Antichain<T>,
    upper: Antichain<T>,
    merging: Vec<MergeState<T>>,
}
1640
1641impl<T> Spine<T> {
1642 pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1644 self.merging.iter().rev().flat_map(|m| &m.batches)
1645 }
1646
1647 pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1649 self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1650 }
1651}
1652
1653impl<T: Timestamp + Lattice> Spine<T> {
1654 pub fn new() -> Self {
1661 Spine {
1662 effort: 1,
1663 next_id: 0,
1664 since: Antichain::from_elem(T::minimum()),
1665 upper: Antichain::from_elem(T::minimum()),
1666 merging: Vec::new(),
1667 }
1668 }
1669
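    /// Applies `effort` units of fuel to in-progress merges or, if no merge is
    /// underway, introduces an empty batch at a level proportional to the
    /// effort to force progress. Returns false once the spine is fully
    /// reduced.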
1670 fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1678 self.tidy_layers();
1679 if self.reduced() {
1680 return false;
1681 }
1682
1683 if self.merging.iter().any(|b| b.merge.is_some()) {
1684 let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1685 self.apply_fuel(&fuel, log);
1687 } else {
1688 let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1693 let id = self.next_id();
1694 self.introduce_batch(
1695 SpineBatch::empty(
1696 id,
1697 self.upper.clone(),
1698 self.upper.clone(),
1699 self.since.clone(),
1700 ),
1701 level,
1702 log,
1703 );
1704 }
1705 true
1706 }
1707
1708 pub fn next_id(&mut self) -> SpineId {
1709 let id = self.next_id;
1710 self.next_id += 1;
1711 SpineId(id, self.next_id)
1712 }
1713
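    /// Inserts a batch at the end of the spine. The batch's lower bound must
    /// match the spine's current upper. Batches with no updates may be folded
    /// into an existing empty single-batch level rather than opening a new
    /// level.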
1714 pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1718 assert!(batch.desc.lower() != batch.desc.upper());
1719 assert_eq!(batch.desc.lower(), &self.upper);
1720
1721 let id = self.next_id();
1722 let batch = SpineBatch::merged(IdHollowBatch {
1723 id,
1724 batch: Arc::new(batch),
1725 });
1726
1727 self.upper.clone_from(batch.upper());
1728
1729 if batch.is_empty() {
1732 if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1733 if self.merging[position].is_single() && self.merging[position].is_empty() {
1734 self.insert_at(batch, position);
1735 if let Some(merged) = self.complete_at(position, log) {
1738 self.merging[position] = MergeState::single(merged);
1739 }
1740 return;
1741 }
1742 }
1743 }
1744
1745 let index = batch.len().next_power_of_two();
1747 self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1748 }
1749
1750 fn reduced(&self) -> bool {
1757 self.spine_batches()
1758 .map(|b| {
1759 b.parts
1760 .iter()
1761 .map(|p| p.batch.run_meta.len())
1762 .sum::<usize>()
1763 })
1764 .sum::<usize>()
1765 < 2
1766 }
1767
1768 #[allow(dead_code)]
1772 fn describe(&self) -> Vec<(usize, usize)> {
1773 self.merging
1774 .iter()
1775 .map(|b| (b.batches.len(), b.len()))
1776 .collect()
1777 }
1778
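    /// Introduces a batch at the indicated level, first applying fuel to
    /// in-progress merges and rolling up all smaller levels beneath it.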
1779 fn introduce_batch(
1785 &mut self,
1786 batch: SpineBatch<T>,
1787 batch_index: usize,
1788 log: &mut SpineLog<'_, T>,
1789 ) {
1790 if batch_index > 32 {
1811 println!("Large batch index: {}", batch_index);
1812 }
1813
1814 let mut fuel = 8 << batch_index;
1821 fuel *= self.effort;
1824 #[allow(clippy::as_conversions)]
1827 let fuel = fuel as isize;
1828
1829 self.apply_fuel(&fuel, log);
1836
1837 self.roll_up(batch_index, log);
1854
1855 self.insert_at(batch, batch_index);
1859
1860 self.tidy_layers();
1866 }
1867
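    /// Collapses all levels below `index` into at most a single batch placed
    /// at `index`, completing their merges along the way.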
1868 fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1876 while self.merging.len() <= index {
1878 self.merging.push(MergeState::default());
1879 }
1880
1881 if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1883 let mut merged = None;
1886 for i in 0..index {
1887 if let Some(merged) = merged.take() {
1888 self.insert_at(merged, i);
1889 }
1890 merged = self.complete_at(i, log);
1891 }
1892
1893 if let Some(merged) = merged {
1897 self.insert_at(merged, index);
1898 }
1899
1900 if self.merging[index].is_full() {
1903 let merged = self.complete_at(index, log).expect("double batch");
1904 self.insert_at(merged, index + 1);
1905 }
1906 }
1907 }
1908
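    /// Applies the given amount of fuel to each level's in-progress merge,
    /// promoting any merge that completes to the next level up.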
1909 pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1917 for index in 0..self.merging.len() {
1923 let mut fuel = *fuel;
1925 self.merging[index].work(&mut fuel);
1928 if self.merging[index].is_complete() {
1940 let complete = self.complete_at(index, log).expect("complete batch");
1941 self.insert_at(complete, index + 1);
1942 }
1943 }
1944 }
1945
1946 fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1952 while self.merging.len() <= index {
1954 self.merging.push(MergeState::default());
1955 }
1956
1957 let merging = &mut self.merging[index];
1959 merging.push_batch(batch);
1960 if merging.batches.is_full() {
1961 let compaction_frontier = Some(self.since.borrow());
1962 merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1963 }
1964 }
1965
1966 fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1968 self.merging[index].complete(log)
1969 }
1970
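    /// Tidies the top of the spine: when the highest level holds a single
    /// batch, vacant levels beneath it may be dropped and the batch may be
    /// moved down to a level appropriate for its length.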
1971 fn tidy_layers(&mut self) {
1973 if !self.merging.is_empty() {
1978 let mut length = self.merging.len();
1979 if self.merging[length - 1].is_single() {
1980 let appropriate_level = usize::cast_from(
1984 self.merging[length - 1]
1985 .len()
1986 .next_power_of_two()
1987 .trailing_zeros(),
1988 );
1989
1990 while appropriate_level < length - 1 {
1992 let current = &mut self.merging[length - 2];
1993 if current.is_vacant() {
1994 self.merging.remove(length - 2);
1996 length = self.merging.len();
1997 } else {
1998 if !current.is_full() {
1999 let mut smaller = 0;
2007 for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
2008 smaller += batch.batches.len() << index;
2009 }
2010
2011 if smaller <= (1 << length) / 8 {
2012 let state = self.merging.remove(length - 2);
2015 assert_eq!(state.batches.len(), 1);
2016 for batch in state.batches {
2017 self.insert_at(batch, length - 2);
2018 }
2019 }
2020 }
2021 break;
2022 }
2023 }
2024 }
2025 }
2026 }
2027
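    /// Checks the spine's invariants: full levels carry fueling merges, batch
    /// ids and descriptions tile the id and time domains contiguously, and no
    /// batch's since is beyond the spine's since.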
2028 fn validate(&self) -> Result<(), String> {
2036 let mut id = SpineId(0, 0);
2037 let mut frontier = Antichain::from_elem(T::minimum());
2038 for x in self.merging.iter().rev() {
2039 if x.is_full() != x.merge.is_some() {
2040 return Err(format!(
2041 "all (and only) full batches should have fueling merges (full={}, merge={:?})",
2042 x.is_full(),
2043 x.merge,
2044 ));
2045 }
2046
2047 if let Some(m) = &x.merge {
2048 if !x.is_full() {
2049 return Err(format!(
2050 "merge should only exist for full batches (len={:?}, merge={:?})",
2051 x.batches.len(),
2052 m.id,
2053 ));
2054 }
2055 if x.id() != Some(m.id) {
2056 return Err(format!(
2057 "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
2058 x.id(),
2059 m.id,
2060 ));
2061 }
2062 }
2063
2064 for batch in &x.batches {
2069 if batch.id().0 != id.1 {
2070 return Err(format!(
2071 "batch id {:?} does not match the previous id {:?}: {:?}",
2072 batch.id(),
2073 id,
2074 self
2075 ));
2076 }
2077 id = batch.id();
2078 if batch.desc().lower() != &frontier {
2079 return Err(format!(
2080 "batch lower {:?} does not match the previous upper {:?}: {:?}",
2081 batch.desc().lower(),
2082 frontier,
2083 self
2084 ));
2085 }
2086 frontier.clone_from(batch.desc().upper());
2087 if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
2088 return Err(format!(
2089 "since of batch {:?} past the spine since {:?}: {:?}",
2090 batch.desc().since(),
2091 self.since,
2092 self
2093 ));
2094 }
2095 }
2096 }
2097 if self.next_id != id.1 {
2098 return Err(format!(
2099 "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
2100 self.next_id, id, self
2101 ));
2102 }
2103 if self.upper != frontier {
2104 return Err(format!(
2105 "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
2106 self.upper, frontier, self
2107 ));
2108 }
2109 Ok(())
2110 }
2111}
2112
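/// The batches at a single spine level (at most `BATCHES_PER_LEVEL`), plus the
/// in-progress merge once the level is full.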
#[derive(Debug, Clone)]
struct MergeState<T> {
    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
    merge: Option<IdFuelingMerge<T>>,
}
2122
2123impl<T> Default for MergeState<T> {
2124 fn default() -> Self {
2125 Self {
2126 batches: ArrayVec::new(),
2127 merge: None,
2128 }
2129 }
2130}
2131
2132impl<T: Timestamp + Lattice> MergeState<T> {
2133 fn id(&self) -> Option<SpineId> {
2135 if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
2136 Some(SpineId(first.id().0, last.id().1))
2137 } else {
2138 None
2139 }
2140 }
2141
2142 fn single(batch: SpineBatch<T>) -> Self {
2144 let mut state = Self::default();
2145 state.push_batch(batch);
2146 state
2147 }
2148
2149 fn push_batch(&mut self, batch: SpineBatch<T>) {
2151 if let Some(last) = self.batches.last() {
2152 assert_eq!(last.id().1, batch.id().0);
2153 assert_eq!(last.upper(), batch.lower());
2154 }
2155 assert!(
2156 self.merge.is_none(),
2157 "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
2158 batch.id,
2159 self.batches.len(),
2160 );
2161 self.batches
2162 .try_push(batch)
2163 .expect("Attempted to insert batch into full layer!");
2164 }
2165
2166 fn len(&self) -> usize {
2168 self.batches.iter().map(SpineBatch::len).sum()
2169 }
2170
2171 fn is_empty(&self) -> bool {
2173 self.batches.iter().all(SpineBatch::is_empty)
2174 }
2175
2176 fn is_vacant(&self) -> bool {
2178 self.batches.is_empty()
2179 }
2180
2181 fn is_single(&self) -> bool {
2183 self.batches.len() == 1
2184 }
2185
2186 fn is_full(&self) -> bool {
2189 self.batches.is_full()
2190 }
2191
2192 fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
2199 let mut this = mem::take(self);
2200 if this.batches.len() <= 1 {
2201 this.batches.pop()
2202 } else {
2203 let id_merge = this
2205 .merge
                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
2207 id_merge.merge.done(this.batches, log)
2208 }
2209 }
2210
2211 fn is_complete(&self) -> bool {
2213 match &self.merge {
2214 Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
2215 None => false,
2216 }
2217 }
2218
2219 fn work(&mut self, fuel: &mut isize) {
2221 if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
2223 merge.work(&self.batches[..], fuel)
2224 }
2225 }
2226}
2227
2228#[cfg(test)]
2229pub mod datadriven {
2230 use mz_ore::fmt::FormatBuffer;
2231
2232 use crate::internal::datadriven::DirectiveArgs;
2233
2234 use super::*;
2235
2236 #[derive(Debug, Default)]
2238 pub struct TraceState {
2239 pub trace: Trace<u64>,
2240 pub merge_reqs: Vec<FueledMergeReq<u64>>,
2241 }
2242
2243 pub fn since_upper(
2244 datadriven: &TraceState,
2245 _args: DirectiveArgs,
2246 ) -> Result<String, anyhow::Error> {
2247 Ok(format!(
2248 "{:?}{:?}\n",
2249 datadriven.trace.since().elements(),
2250 datadriven.trace.upper().elements()
2251 ))
2252 }
2253
2254 pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
2255 let mut s = String::new();
2256 for b in datadriven.trace.spine.spine_batches() {
2257 s.push_str(b.describe(true).as_str());
2258 s.push('\n');
2259 }
2260 Ok(s)
2261 }
2262
2263 pub fn insert(
2264 datadriven: &mut TraceState,
2265 args: DirectiveArgs,
2266 ) -> Result<String, anyhow::Error> {
2267 for x in args
2268 .input
2269 .trim()
2270 .split('\n')
2271 .map(DirectiveArgs::parse_hollow_batch)
2272 {
2273 datadriven
2274 .merge_reqs
2275 .append(&mut datadriven.trace.push_batch(x));
2276 }
2277 Ok("ok\n".to_owned())
2278 }
2279
2280 pub fn downgrade_since(
2281 datadriven: &mut TraceState,
2282 args: DirectiveArgs,
2283 ) -> Result<String, anyhow::Error> {
2284 let since = args.expect("since");
2285 datadriven
2286 .trace
2287 .downgrade_since(&Antichain::from_elem(since));
2288 Ok("ok\n".to_owned())
2289 }
2290
2291 pub fn take_merge_req(
2292 datadriven: &mut TraceState,
2293 _args: DirectiveArgs,
2294 ) -> Result<String, anyhow::Error> {
2295 let mut s = String::new();
2296 for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
2297 write!(
2298 s,
2299 "{:?}{:?}{:?} {}\n",
2300 merge_req.desc.lower().elements(),
2301 merge_req.desc.upper().elements(),
2302 merge_req.desc.since().elements(),
2303 merge_req
2304 .inputs
2305 .iter()
2306 .flat_map(|x| x.batch.parts.iter())
2307 .map(|x| x.printable_name())
2308 .collect::<Vec<_>>()
2309 .join(" ")
2310 );
2311 }
2312 Ok(s)
2313 }
2314
2315 pub fn apply_merge_res(
2316 datadriven: &mut TraceState,
2317 args: DirectiveArgs,
2318 ) -> Result<String, anyhow::Error> {
2319 let res = FueledMergeRes {
2320 output: DirectiveArgs::parse_hollow_batch(args.input),
2321 input: CompactionInput::Legacy,
2322 new_active_compaction: None,
2323 };
2324 match datadriven.trace.apply_merge_res_unchecked(&res) {
2325 ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
2326 ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
2327 ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
2328 ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
2329 ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
2330 }
2331 }
2332}
2333
2334#[cfg(test)]
2335pub(crate) mod tests {
2336 use std::ops::Range;
2337
2338 use proptest::prelude::*;
2339 use semver::Version;
2340
2341 use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};
2342
2343 use super::*;
2344
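    /// Generates an arbitrary `Trace` by pushing a sorted sequence of hollow
    /// batches with contiguous descriptions, optionally claiming compactions
    /// for the resulting merge requests.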
2345 pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
2346 num_batches: Range<usize>,
2347 ) -> impl Strategy<Value = Trace<T>> {
2348 Strategy::prop_map(
2349 (
2350 any::<Option<T>>(),
2351 proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
2352 any::<bool>(),
2353 any::<u64>(),
2354 ),
2355 |(since, mut batches, roundtrip_structure, timeout_ms)| {
2356 let mut trace = Trace::<T>::default();
2357 trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
2358
2359 batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
2362 let mut lower = Antichain::from_elem(T::minimum());
2363 for mut batch in batches {
2364 if PartialOrder::less_than(trace.since(), batch.desc.since()) {
2366 trace.downgrade_since(batch.desc.since());
2367 }
2368 batch.desc = Description::new(
2369 lower.clone(),
2370 batch.desc.upper().clone(),
2371 batch.desc.since().clone(),
2372 );
2373 lower.clone_from(batch.desc.upper());
2374 let _merge_req = trace.push_batch(batch);
2375 }
2376 let reqs: Vec<_> = trace
2377 .fueled_merge_reqs_before_ms(timeout_ms, None)
2378 .collect();
2379 for req in reqs {
2380 trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
2381 }
2382 trace.roundtrip_structure = roundtrip_structure;
2383 trace
2384 },
2385 )
2386 }
2387
2388 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_roundtrips() {
2391 fn check(trace: Trace<i64>) {
2392 trace.validate().unwrap();
2393 let flat = trace.flatten();
2394 let unflat = Trace::unflatten(flat).unwrap();
2395 assert_eq!(trace, unflat);
2396 }
2397
2398 proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
2399 }
2400
2401 #[mz_ore::test]
2402 fn fueled_merge_reqs() {
2403 let mut trace: Trace<u64> = Trace::default();
2404 let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
2405 0,
2406 10,
2407 &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
2408 1000,
2409 ));
2410
2411 assert!(fueled_reqs.is_empty());
2412 assert_eq!(
2413 trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
2414 0,
2415 "no merge reqs when not filtering by version"
2416 );
2417 assert_eq!(
2418 trace
2419 .fueled_merge_reqs_before_ms(
2420 u64::MAX,
2421 Some(WriterKey::for_version(&Version::new(0, 50, 0)))
2422 )
2423 .count(),
2424 0,
2425 "zero batches are older than a past version"
2426 );
2427 assert_eq!(
2428 trace
2429 .fueled_merge_reqs_before_ms(
2430 u64::MAX,
2431 Some(WriterKey::for_version(&Version::new(99, 99, 0)))
2432 )
2433 .count(),
2434 1,
2435 "one batch is older than a future version"
2436 );
2437 }
2438
2439 #[mz_ore::test]
2440 fn remove_redundant_merge_reqs() {
2441 fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
2442 FueledMergeReq {
2443 id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
2444 desc: Description::new(
2445 Antichain::from_elem(lower),
2446 Antichain::from_elem(upper),
2447 Antichain::new(),
2448 ),
2449 inputs: vec![],
2450 }
2451 }
2452
2453 assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
2455
2456 assert_eq!(
2458 Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
2459 vec![req(0, 1)]
2460 );
2461
2462 assert_eq!(
2464 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
2465 vec![req(0, 1)]
2466 );
2467
2468 assert_eq!(
2470 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
2471 vec![req(1, 2), req(0, 1)]
2472 );
2473
2474 assert_eq!(
2476 Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
2477 vec![req(0, 3)]
2478 );
2479
2480 assert_eq!(
2482 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
2483 vec![req(0, 3)]
2484 );
2485
2486 assert_eq!(
2488 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
2489 vec![req(0, 3)]
2490 );
2491
2492 assert_eq!(
2494 Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
2495 vec![req(0, 3)]
2496 );
2497
2498 assert_eq!(
2500 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2501 vec![req(1, 3), req(0, 2)]
2502 );
2503
2504 assert_eq!(
2506 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2507 vec![req(0, 2), req(1, 3)]
2508 );
2509
2510 let req015 = FueledMergeReq {
2512 id: SpineId(0, 1),
2513 desc: Description::new(
2514 Antichain::from_elem(0),
2515 Antichain::from_elem(1),
2516 Antichain::from_elem(5),
2517 ),
2518 inputs: vec![],
2519 };
2520 assert_eq!(
2521 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2522 vec![req015, req(0, 1)]
2523 );
2524 }
2525
2526 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn construct_batch_with_runs_replaced_test() {
2529 let batch_strategy = any_hollow_batch::<u64>();
2530 let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);
2531
2532 let combined_strategy = (batch_strategy, to_replace_strategy)
2533 .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);
2534
2535 let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
2536 let batch_len = batch.run_meta.len();
2537 let batch_clone = batch.clone();
2538 let to_replace_clone = to_replace.clone();
2539
2540 proptest::collection::vec(any::<bool>(), batch_len)
2541 .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
2542 .prop_map(move |mask| {
2543 let indices: Vec<usize> = mask
2544 .iter()
2545 .enumerate()
2546 .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
2547 .collect();
2548 (batch_clone.clone(), to_replace_clone.clone(), indices)
2549 })
2550 });
2551
2552 proptest!(|(
2553 (batch, to_replace, runs) in final_strategy
2554 )| {
2555 let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
2556 x.id.unwrap().clone()
2557 ).collect();
2558
2559 let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();
2560
2561 let new_batch = SpineBatch::construct_batch_with_runs_replaced(
2562 &batch,
2563 &run_ids,
2564 &to_replace,
2565 ).unwrap();
2566
2567 prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len());
2568 });
2569 }
2570}