Skip to main content

mz_persist_client/internal/
trace.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An append-only collection of compactable update batches. The Spine below is
11//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
12//! original Spine code is designed for incremental (via "fuel"ing) synchronous
13//! merge of in-memory batches. Persist doesn't want compaction to block
14//! incoming writes and, in fact, may in the future elect to push the work of
15//! compaction onto another machine entirely via RPC. As a result, we abuse the
16//! Spine code as follows:
17//!
18//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
19//!
20//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
21//!   the Spine. As progress is made, the Spine will merge two batches together
22//!   by: constructing a [Batch::Merger], giving it bits of fuel to
23//!   incrementally perform the merge (which spreads out the work, keeping
24//!   latencies even), and then once it's done fueling extracting the new single
25//!   output `Batch` and discarding the inputs.
26//! - Persist instead represents a batch of blob data with a [HollowBatch]
27//!   pointer which contains the normal `Batch` metadata plus the keys necessary
28//!   to retrieve the updates.
29//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
30//!   (analogous to `Batch::Merger`) that allows us to represent a merge as it
31//!   is fueling. Normally, this would represent real incremental compaction
32//!   progress, but in persist, it's simply a bookkeeping mechanism. Once fully
33//!   fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
34//!   which to the Spine is indistinguishable from a merged batch. At this
35//!   point, it is eligible for asynchronous compaction and a `FueledMergeReq`
36//!   is generated.
37//! - At any later point, this request may be answered via
38//!   [Trace::apply_merge_res_checked] or [Trace::apply_merge_res_unchecked].
//!   This internally replaces the `SpineBatch`, which has no
//!   effect on the structure of `Spine` but replaces the metadata
//!   in persist's state to point at the new batch.
//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
43//!   This decouples compaction from Spine progress and also allows us to reduce
44//!   write amplification by merging `N` batches at once where `N` can be
45//!   greater than 2.
46//!
47//! [Batch]: differential_dataflow::trace::Batch
48//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
49
50use std::cmp::Ordering;
51use std::collections::{BTreeMap, BTreeSet};
52use std::fmt::{Debug, Display};
53use std::mem;
54use std::ops::Range;
55use std::sync::Arc;
56
57use arrayvec::ArrayVec;
58use differential_dataflow::difference::Monoid;
59use differential_dataflow::lattice::Lattice;
60use differential_dataflow::trace::Description;
61use itertools::Itertools;
62use mz_ore::cast::CastFrom;
63use mz_persist::metrics::ColumnarMetrics;
64use mz_persist_types::Codec64;
65use serde::{Serialize, Serializer};
66use timely::PartialOrder;
67use timely::progress::frontier::AntichainRef;
68use timely::progress::{Antichain, Timestamp};
69use tracing::{error, warn};
70
71use crate::internal::paths::WriterKey;
72use crate::internal::state::{HollowBatch, RunId};
73
74use super::state::RunPart;
75
76#[derive(Debug, Clone, PartialEq)]
77pub struct FueledMergeReq<T> {
78    pub id: SpineId,
79    pub desc: Description<T>,
80    pub inputs: Vec<IdHollowBatch<T>>,
81}
82
83#[derive(Debug)]
84pub struct FueledMergeRes<T> {
85    pub output: HollowBatch<T>,
86    pub input: CompactionInput,
87    pub new_active_compaction: Option<ActiveCompaction>,
88}
89
90/// An append-only collection of compactable update batches.
91///
92/// In an effort to keep our fork of Spine as close as possible to the original,
93/// we push as many changes as possible into this wrapper.
94#[derive(Debug, Clone)]
95pub struct Trace<T> {
96    spine: Spine<T>,
97    pub(crate) roundtrip_structure: bool,
98}
99
100#[cfg(any(test, debug_assertions))]
101impl<T: PartialEq> PartialEq for Trace<T> {
102    fn eq(&self, other: &Self) -> bool {
103        // Deconstruct self and other so we get a compile failure if new fields
104        // are added.
105        let Trace {
106            spine: _,
107            roundtrip_structure: _,
108        } = self;
109        let Trace {
110            spine: _,
111            roundtrip_structure: _,
112        } = other;
113
114        // Intentionally use HollowBatches for this comparison so we ignore
115        // differences in spine layers.
116        self.batches().eq(other.batches())
117    }
118}
119
120impl<T: Timestamp + Lattice> Default for Trace<T> {
121    fn default() -> Self {
122        Self {
123            spine: Spine::new(),
124            roundtrip_structure: true,
125        }
126    }
127}
128
129#[derive(Clone, Debug, Serialize)]
130pub struct ThinSpineBatch<T> {
131    pub(crate) level: usize,
132    pub(crate) desc: Description<T>,
133    pub(crate) parts: Vec<SpineId>,
134    /// NB: this exists to validate legacy batch bounds during the migration;
135    /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
136    pub(crate) descs: Vec<Description<T>>,
137}
138
139impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
140    fn eq(&self, other: &Self) -> bool {
141        // Ignore the temporary descs vector when comparing for equality.
142        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
143    }
144}
145
146#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
147pub struct ThinMerge<T> {
148    pub(crate) since: Antichain<T>,
149    pub(crate) remaining_work: usize,
150    pub(crate) active_compaction: Option<ActiveCompaction>,
151}
152
153impl<T: Clone> ThinMerge<T> {
154    fn fueling(merge: &FuelingMerge<T>) -> Self {
155        ThinMerge {
156            since: merge.since.clone(),
157            remaining_work: merge.remaining_work,
158            active_compaction: None,
159        }
160    }
161
162    fn fueled(batch: &SpineBatch<T>) -> Self {
163        ThinMerge {
164            since: batch.desc.since().clone(),
165            remaining_work: 0,
166            active_compaction: batch.active_compaction.clone(),
167        }
168    }
169}
170
171/// This is a "flattened" representation of a Trace. Goals:
172/// - small updates to the trace should result in small differences in the `FlatTrace`;
173/// - two `FlatTrace`s should be efficient to diff;
174/// - converting to and from a `Trace` should be relatively straightforward.
175///
176/// These goals are all somewhat in tension, and the space of possible representations is pretty
177/// large. See individual fields for comments on some of the tradeoffs.
178#[derive(Clone, Debug)]
179pub struct FlatTrace<T> {
180    pub(crate) since: Antichain<T>,
181    /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
182    /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
183    /// Previously, we serialized a trace as just this list of batches. Keeping this data around
184    /// helps ensure backwards compatibility. In the near future, we may still keep some batches
185    /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
186    /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
187    /// collection below.
188    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
189    /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
190    /// by id directly.
191    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
192    /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
193    /// to make differential updates smaller when two batches merge together. We also store the
194    /// level on the batch, instead of mapping from level to a list of batches... the level of a
195    /// spine batch doesn't change over time, but the list of batches at a particular level does.
196    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
197    /// In-progress merges. We store this by spine id instead of level to prepare for some possible
198    /// generalizations to spine (merging N of M batches at a level). This is also a natural place
199    /// to store incremental merge progress in the future.
200    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
201}
202
203impl<T: Timestamp + Lattice> Trace<T> {
204    pub(crate) fn flatten(&self) -> FlatTrace<T> {
205        let since = self.spine.since.clone();
206        let mut legacy_batches = BTreeMap::new();
207        let mut hollow_batches = BTreeMap::new();
208        let mut spine_batches = BTreeMap::new();
209        let mut merges = BTreeMap::new();
210
211        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
212            let id = batch.id();
213            let desc = batch.desc.clone();
214            let mut parts = Vec::with_capacity(batch.parts.len());
215            let mut descs = Vec::with_capacity(batch.parts.len());
216            for IdHollowBatch { id, batch } in &batch.parts {
217                parts.push(*id);
218                descs.push(batch.desc.clone());
219                // Ideally, we'd like to put all batches in the hollow_batches collection, since
220                // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
221                // we currently keep most batches in the legacy collection for backwards
222                // compatibility.
223                // As an exception, we add batches with empty time ranges to hollow_batches:
224                // they're otherwise not guaranteed to be unique, and since we only started writing
225                // them down recently there's no backwards compatibility risk.
226                if batch.desc.lower() == batch.desc.upper() {
227                    hollow_batches.insert(*id, Arc::clone(batch));
228                } else {
229                    legacy_batches.insert(Arc::clone(batch), ());
230                }
231            }
232
233            let spine_batch = ThinSpineBatch {
234                level,
235                desc,
236                parts,
237                descs,
238            };
239            spine_batches.insert(id, spine_batch);
240        };
241
242        for (level, state) in self.spine.merging.iter().enumerate() {
243            for batch in &state.batches {
244                push_spine_batch(level, batch);
245                if let Some(c) = &batch.active_compaction {
246                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
247                    assert!(
248                        previous.is_none(),
249                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
250                        batch.id,
251                    )
252                }
253            }
254            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
255                let previous = merges.insert(*id, ThinMerge::fueling(merge));
256                assert!(
257                    previous.is_none(),
258                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
259                )
260            }
261        }
262
263        if !self.roundtrip_structure {
264            assert!(hollow_batches.is_empty());
265            spine_batches.clear();
266            merges.clear();
267        }
268
269        FlatTrace {
270            since,
271            legacy_batches,
272            hollow_batches,
273            spine_batches,
274            merges,
275        }
276    }
277    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
278        let FlatTrace {
279            since,
280            legacy_batches,
281            mut hollow_batches,
282            spine_batches,
283            mut merges,
284        } = value;
285
286        // If the flattened representation has spine batches (or is empty)
287        // we know to preserve the structure for this trace.
288        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
289
290        // We need to look up legacy batches somehow, but we don't have a spine id for them.
291        // Instead, we rely on the fact that the spine must store them in antichain order.
292        // Our timestamp type may not be totally ordered, so we need to implement our own comparator
293        // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
294        // though.
295        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
296            if PartialOrder::less_than(left, right) {
297                Ordering::Less
298            } else if PartialOrder::less_than(right, left) {
299                Ordering::Greater
300            } else {
301                Ordering::Equal
302            }
303        };
304        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
305        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
306
307        let mut pop_batch =
308            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
309                if let Some(batch) = hollow_batches.remove(&id) {
310                    if let Some(desc) = expected_desc {
311                        // We don't expect the desc's upper and lower to change for a given spine id.
312                        assert_eq!(desc.lower(), batch.desc.lower());
313                        assert_eq!(desc.upper(), batch.desc.upper());
314                        // Due to the way thin spine batches are diffed, the sinces can be out of sync.
315                        // This should be rare, and hopefully impossible once we change how diffs work.
316                        if desc.since() != batch.desc.since() {
317                            warn!(
318                                "unexpected since out of sync for spine batch: {:?} != {:?}",
319                                desc.since().elements(),
320                                batch.desc.since().elements()
321                            );
322                        }
323                    }
324                    return Ok(IdHollowBatch { id, batch });
325                }
326                let mut batch = legacy_batches
327                    .pop()
328                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
329
330                let Some(expected_desc) = expected_desc else {
331                    return Ok(IdHollowBatch { id, batch });
332                };
333
334                if expected_desc.lower() != batch.desc.lower() {
335                    return Err(format!(
336                        "hollow batch lower {:?} did not match expected lower {:?}",
337                        batch.desc.lower().elements(),
338                        expected_desc.lower().elements()
339                    ));
340                }
341
342                // Empty legacy batches are not deterministic: different nodes may split them up
343                // in different ways. For now, we rearrange them such to match the spine data.
344                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
345                    let mut new_upper = batch.desc.upper().clone();
346
347                    // While our current batch is too small, and there's another empty batch
348                    // in the list, roll it in.
349                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
350                        let Some(next_batch) = legacy_batches.pop() else {
351                            break;
352                        };
353                        if next_batch.is_empty() {
354                            new_upper.clone_from(next_batch.desc.upper());
355                        } else {
356                            legacy_batches.push(next_batch);
357                            break;
358                        }
359                    }
360
361                    // If our current batch is too large, split it by the expected upper
362                    // and preserve the remainder.
363                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
364                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
365                            expected_desc.upper().clone(),
366                            new_upper.clone(),
367                            batch.desc.since().clone(),
368                        ))));
369                        new_upper.clone_from(expected_desc.upper());
370                    }
371                    batch = Arc::new(HollowBatch::empty(Description::new(
372                        batch.desc.lower().clone(),
373                        new_upper,
374                        batch.desc.since().clone(),
375                    )))
376                }
377
378                if expected_desc.upper() != batch.desc.upper() {
379                    return Err(format!(
380                        "hollow batch upper {:?} did not match expected upper {:?}",
381                        batch.desc.upper().elements(),
382                        expected_desc.upper().elements()
383                    ));
384                }
385
386                Ok(IdHollowBatch { id, batch })
387            };
388
389        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
390            (batch.desc.upper().clone(), id.1)
391        } else {
392            (Antichain::from_elem(T::minimum()), 0)
393        };
394        let levels = spine_batches
395            .first_key_value()
396            .map(|(_, batch)| batch.level + 1)
397            .unwrap_or(0);
398        let mut merging = vec![MergeState::default(); levels];
399        for (id, batch) in spine_batches {
400            let level = batch.level;
401
402            let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n(
403                None,
404                batch.parts.len() - batch.descs.len(),
405            ));
406            let parts = batch
407                .parts
408                .into_iter()
409                .zip_eq(descs)
410                .map(|(id, desc)| pop_batch(id, desc))
411                .collect::<Result<Vec<_>, _>>()?;
412            let len = parts.iter().map(|p| (*p).batch.len).sum();
413            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
414            let batch = SpineBatch {
415                id,
416                desc: batch.desc,
417                parts,
418                active_compaction,
419                len,
420            };
421
422            let state = &mut merging[level];
423
424            state.push_batch(batch);
425            if let Some(id) = state.id() {
426                if let Some(merge) = merges.remove(&id) {
427                    state.merge = Some(IdFuelingMerge {
428                        id,
429                        merge: FuelingMerge {
430                            since: merge.since,
431                            remaining_work: merge.remaining_work,
432                        },
433                    })
434                }
435            }
436        }
437
438        let mut trace = Trace {
439            spine: Spine {
440                effort: 1,
441                next_id,
442                since,
443                upper,
444                merging,
445            },
446            roundtrip_structure,
447        };
448
449        fn check_empty(name: &str, len: usize) -> Result<(), String> {
450            if len != 0 {
451                Err(format!("{len} {name} left after reconstructing spine"))
452            } else {
453                Ok(())
454            }
455        }
456
457        if roundtrip_structure {
458            check_empty("legacy batches", legacy_batches.len())?;
459        } else {
460            // If the structure wasn't actually serialized, we may have legacy batches left over.
461            for batch in legacy_batches.into_iter().rev() {
462                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
463            }
464        }
465        check_empty("hollow batches", hollow_batches.len())?;
466        check_empty("merges", merges.len())?;
467
468        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
469
470        Ok(trace)
471    }
472}
473
/// Counts of spine batches, bucketed by their compaction status.
#[derive(Debug, Default, Clone)]
pub(crate) struct SpineMetrics {
    /// Batches that are already compact.
    pub compact_batches: u64,
    /// Batches that are not compact but have a merge in progress.
    pub compacting_batches: u64,
    /// Batches that are neither compact nor being merged.
    pub noncompact_batches: u64,
}
480
481impl<T> Trace<T> {
482    pub fn since(&self) -> &Antichain<T> {
483        &self.spine.since
484    }
485
486    pub fn upper(&self) -> &Antichain<T> {
487        &self.spine.upper
488    }
489
490    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
491        for batch in self.batches() {
492            f(batch);
493        }
494    }
495
496    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
497        self.spine
498            .spine_batches()
499            .flat_map(|b| b.parts.as_slice())
500            .map(|b| &*b.batch)
501    }
502
503    pub fn num_spine_batches(&self) -> usize {
504        self.spine.spine_batches().count()
505    }
506
507    #[cfg(test)]
508    pub fn num_hollow_batches(&self) -> usize {
509        self.batches().count()
510    }
511
512    #[cfg(test)]
513    pub fn num_updates(&self) -> usize {
514        self.batches().map(|b| b.len).sum()
515    }
516}
517
518impl<T: Timestamp + Lattice> Trace<T> {
519    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
520        self.spine.since.clone_from(since);
521    }
522
523    #[must_use]
524    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
525        let mut merge_reqs = Vec::new();
526        self.spine.insert(
527            batch,
528            &mut SpineLog::Enabled {
529                merge_reqs: &mut merge_reqs,
530            },
531        );
532        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
533        // Spine::roll_up (internally used by insert) clears all batches out of
534        // levels below a target by walking up from level 0 and merging each
535        // level into the next (providing the necessary fuel). In practice, this
536        // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
537        // It's a waste to do all of these (we'll throw away the results), so we
538        // filter out any that are entirely covered by some other request.
539        Self::remove_redundant_merge_reqs(merge_reqs)
540    }
541
542    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
543        // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
544        // In the meantime, search backwards, since most compactions are for recent batches.
545        for batch in self.spine.spine_batches_mut().rev() {
546            if batch.id == id {
547                batch.active_compaction = Some(compaction);
548                break;
549            }
550        }
551    }
552
553    /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
554    /// account for a surprising amount of cpu in prod. database-issues#5411
555    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
556        self.spine.insert(batch, &mut SpineLog::Disabled);
557    }
558
559    /// Apply some amount of effort to trace maintenance.
560    ///
561    /// The units of effort are updates, and the method should be thought of as
562    /// analogous to inserting as many empty updates, where the trace is
563    /// permitted to perform proportionate work.
564    ///
565    /// Returns true if this did work and false if it left the spine unchanged.
566    #[must_use]
567    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
568        let mut merge_reqs = Vec::new();
569        let did_work = self.spine.exert(
570            fuel,
571            &mut SpineLog::Enabled {
572                merge_reqs: &mut merge_reqs,
573            },
574        );
575        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
576        // See the comment in [Self::push_batch].
577        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
578        (merge_reqs, did_work)
579    }
580
581    /// Validates invariants.
582    ///
583    /// See `Spine::validate` for details.
584    pub fn validate(&self) -> Result<(), String> {
585        self.spine.validate()
586    }
587
588    /// Obtain all fueled merge reqs that either have no active compaction, or the previous
589    /// compaction was started at or before the threshold time, in order from oldest to newest.
590    pub(crate) fn fueled_merge_reqs_before_ms(
591        &self,
592        threshold_ms: u64,
593        threshold_writer: Option<WriterKey>,
594    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
595        self.spine
596            .spine_batches()
597            .filter(move |b| {
598                let noncompact = !b.is_compact();
599                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
600                    b.parts.iter().any(|b| {
601                        b.batch
602                            .parts
603                            .iter()
604                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
605                    })
606                });
607                noncompact || old_writer
608            })
609            .filter(move |b| {
610                // Either there's no active compaction, or the last active compaction
611                // is not after the timeout timestamp.
612                b.active_compaction
613                    .as_ref()
614                    .map_or(true, move |c| c.start_ms <= threshold_ms)
615            })
616            .map(|b| FueledMergeReq {
617                id: b.id,
618                desc: b.desc.clone(),
619                inputs: b.parts.clone(),
620            })
621    }
622
623    // This is only called with the results of one `insert` and so the length of
624    // `merge_reqs` is bounded by the number of levels in the spine (or possibly
625    // some small constant multiple?). The number of levels is logarithmic in the
626    // number of updates in the spine, so this number should stay very small. As
627    // a result, we simply use the naive O(n^2) algorithm here instead of doing
628    // anything fancy with e.g. interval trees.
629    fn remove_redundant_merge_reqs(
630        mut merge_reqs: Vec<FueledMergeReq<T>>,
631    ) -> Vec<FueledMergeReq<T>> {
632        // Returns true if b0 covers b1, false otherwise.
633        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
634            // TODO: can we relax or remove this since check?
635            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
636        }
637
638        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
639        // In practice, merge_reqs will come in sorted such that the "large"
640        // requests are later. Take advantage of this by processing back to
641        // front.
642        while let Some(merge_req) = merge_reqs.pop() {
643            let covered = ret.iter().any(|r| covers(r, &merge_req));
644            if !covered {
645                // Now check if anything we've already staged is covered by this
646                // new req. In practice, the merge_reqs come in sorted and so
647                // this `retain` is a no-op.
648                ret.retain(|r| !covers(&merge_req, r));
649                ret.push(merge_req);
650            }
651        }
652        ret
653    }
654
655    pub fn spine_metrics(&self) -> SpineMetrics {
656        let mut metrics = SpineMetrics::default();
657        for batch in self.spine.spine_batches() {
658            if batch.is_compact() {
659                metrics.compact_batches += 1;
660            } else if batch.is_merging() {
661                metrics.compacting_batches += 1;
662            } else {
663                metrics.noncompact_batches += 1;
664            }
665        }
666        metrics
667    }
668}
669
670impl<T: Timestamp + Lattice + Codec64> Trace<T> {
671    pub fn apply_merge_res_checked<D: Codec64 + Monoid + PartialEq>(
672        &mut self,
673        res: &FueledMergeRes<T>,
674        metrics: &ColumnarMetrics,
675    ) -> ApplyMergeResult {
676        for batch in self.spine.spine_batches_mut().rev() {
677            let result = batch.maybe_replace_checked::<D>(res, metrics);
678            if result.matched() {
679                return result;
680            }
681        }
682        ApplyMergeResult::NotAppliedNoMatch
683    }
684
685    pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
686        for batch in self.spine.spine_batches_mut().rev() {
687            let result = batch.maybe_replace_unchecked(res);
688            if result.matched() {
689                return result;
690            }
691        }
692        ApplyMergeResult::NotAppliedNoMatch
693    }
694
695    pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
696        for batch in self.spine.spine_batches_mut().rev() {
697            let result = batch.maybe_replace_with_tombstone(desc);
698            if result.matched() {
699                return result;
700            }
701        }
702        ApplyMergeResult::NotAppliedNoMatch
703    }
704}
705
706/// A log of what transitively happened during a Spine operation: e.g.
707/// FueledMergeReqs were generated.
708enum SpineLog<'a, T> {
709    Enabled {
710        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
711    },
712    Disabled,
713}
714
715#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
716pub enum CompactionInput {
717    /// We don't know what our inputs were; this should only be used for
718    /// unchecked legacy replacements.
719    Legacy,
720    /// This compaction output is a total replacement for all batches in this id range.
721    IdRange(SpineId),
722    /// This compaction output replaces the specified runs in this id range.
723    PartialBatch(SpineId, BTreeSet<RunId>),
724}
725
/// A half-open id range `[lo, hi)` identifying a batch's position in the spine.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SpineId(pub usize, pub usize);
728
729impl Display for SpineId {
730    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731        write!(f, "[{}, {})", self.0, self.1)
732    }
733}
734
735impl Serialize for SpineId {
736    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
737    where
738        S: Serializer,
739    {
740        let SpineId(lo, hi) = self;
741        serializer.serialize_str(&format!("{lo}-{hi}"))
742    }
743}
744
745/// Creates a `SpineId` that covers the range of ids in the set.
746pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
747    let mut id_iter = ids.iter().copied();
748    let Some(mut result) = id_iter.next() else {
749        panic!("at least one batch must be present")
750    };
751
752    for id in id_iter {
753        assert_eq!(
754            result.1, id.0,
755            "expected contiguous ids, but {result:?} is not adjacent to {id:?} in ids {ids:?}"
756        );
757        result.1 = id.1;
758    }
759    result
760}
761
762impl SpineId {
763    fn covers(self, other: SpineId) -> bool {
764        self.0 <= other.0 && other.1 <= self.1
765    }
766}
767
/// A hollow batch tagged with the range of spine ids it covers. These are the
/// `parts` of a `SpineBatch`.
#[derive(Debug, Clone, PartialEq)]
pub struct IdHollowBatch<T> {
    /// The range of spine ids this hollow batch covers.
    pub id: SpineId,
    /// The (shared) hollow batch itself.
    pub batch: Arc<HollowBatch<T>>,
}
773
/// Bookkeeping for a compaction in progress on a spine batch. A batch with
/// `active_compaction: Some(..)` reports `is_merging() == true`.
#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
pub struct ActiveCompaction {
    /// When the compaction started, in milliseconds.
    /// NOTE(review): presumably wall-clock ms since the Unix epoch; confirm
    /// against the code that constructs this.
    pub start_ms: u64,
}
778
/// A batch in the spine: an ordered list of hollow batches (`parts`) that
/// together make up `desc` and the id range `id`.
///
/// `SpineBatch::merged` builds one with a single part. `FuelingMerge::done`
/// builds one whose parts are the concatenated parts of its inputs; those parts
/// stay separate until a compaction result replaces them.
#[derive(Debug, Clone, PartialEq)]
struct SpineBatch<T> {
    /// The range of spine ids covered; it runs from the first part's start to
    /// the last part's end (checked by `id()` in debug builds).
    id: SpineId,
    /// The description (lower, upper, since) of the batch as a whole.
    desc: Description<T>,
    /// The constituent hollow batches, in id order.
    parts: Vec<IdHollowBatch<T>>,
    /// `Some` while a compaction of this batch is in progress (see `is_merging`).
    active_compaction: Option<ActiveCompaction>,
    // A cached version of parts.iter().map(|x| x.len).sum()
    len: usize,
}
788
789impl<T> SpineBatch<T> {
790    fn merged(batch: IdHollowBatch<T>) -> Self
791    where
792        T: Clone,
793    {
794        Self {
795            id: batch.id,
796            desc: batch.batch.desc.clone(),
797            len: batch.batch.len,
798            parts: vec![batch],
799            active_compaction: None,
800        }
801    }
802}
803
/// The outcome of attempting to apply a compaction result (or a tombstone) to
/// the spine.
#[derive(Debug, Copy, Clone)]
pub enum ApplyMergeResult {
    /// Applied, and the replacement covered every part of the matching spine batch.
    AppliedExact,
    /// Applied to a proper subset of the matching spine batch's parts.
    AppliedSubset,
    /// No spine batch (or range of parts) matched the replacement.
    NotAppliedNoMatch,
    /// A since check between the replacement and the existing batch failed.
    NotAppliedInvalidSince,
    /// A match was found, but applying it would have increased the batch's len.
    NotAppliedTooManyUpdates,
}
812
813impl ApplyMergeResult {
814    pub fn applied(&self) -> bool {
815        match self {
816            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
817            _ => false,
818        }
819    }
820    pub fn matched(&self) -> bool {
821        match self {
822            ApplyMergeResult::AppliedExact
823            | ApplyMergeResult::AppliedSubset
824            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
825            _ => false,
826        }
827    }
828}
829
830impl<T: Timestamp + Lattice> SpineBatch<T> {
831    pub fn lower(&self) -> &Antichain<T> {
832        self.desc().lower()
833    }
834
835    pub fn upper(&self) -> &Antichain<T> {
836        self.desc().upper()
837    }
838
839    fn id(&self) -> SpineId {
840        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
841        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
842        self.id
843    }
844
845    pub fn is_compact(&self) -> bool {
846        // A compact batch has at most one run.
847        // This check used to be if there was at most one hollow batch with at most one run,
848        // but that was a bit too strict since introducing incremental compaction.
849        // Incremental compaction can result in a batch with a single run, but multiple empty
850        // hollow batches, which we still consider compact. As levels are merged, we
851        // will eventually clean up the empty hollow batches.
852        self.parts
853            .iter()
854            .map(|p| p.batch.run_meta.len())
855            .sum::<usize>()
856            <= 1
857    }
858
859    pub fn is_merging(&self) -> bool {
860        self.active_compaction.is_some()
861    }
862
863    fn desc(&self) -> &Description<T> {
864        &self.desc
865    }
866
867    pub fn len(&self) -> usize {
868        // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
869        // we compact it.
870        debug_assert_eq!(
871            self.len,
872            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
873        );
874        self.len
875    }
876
877    pub fn is_empty(&self) -> bool {
878        self.len() == 0
879    }
880
881    pub fn empty(
882        id: SpineId,
883        lower: Antichain<T>,
884        upper: Antichain<T>,
885        since: Antichain<T>,
886    ) -> Self {
887        SpineBatch::merged(IdHollowBatch {
888            id,
889            batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
890        })
891    }
892
893    pub fn begin_merge(
894        bs: &[Self],
895        compaction_frontier: Option<AntichainRef<T>>,
896    ) -> Option<IdFuelingMerge<T>> {
897        let from = bs.first()?.id().0;
898        let until = bs.last()?.id().1;
899        let id = SpineId(from, until);
900        let mut sinces = bs.iter().map(|b| b.desc().since());
901        let mut since = sinces.next()?.clone();
902        for b in bs {
903            since.join_assign(b.desc().since())
904        }
905        if let Some(compaction_frontier) = compaction_frontier {
906            since.join_assign(&compaction_frontier.to_owned());
907        }
908        let remaining_work = bs.iter().map(|x| x.len()).sum();
909        Some(IdFuelingMerge {
910            id,
911            merge: FuelingMerge {
912                since,
913                remaining_work,
914            },
915        })
916    }
917
918    #[cfg(test)]
919    fn describe(&self, extended: bool) -> String {
920        let SpineBatch {
921            id,
922            parts,
923            desc,
924            active_compaction,
925            len,
926        } = self;
927        let compaction = match active_compaction {
928            None => "".to_owned(),
929            Some(c) => format!(" (c@{})", c.start_ms),
930        };
931        match extended {
932            false => format!(
933                "[{}-{}]{:?}{:?}{}/{}{compaction}",
934                id.0,
935                id.1,
936                desc.lower().elements(),
937                desc.upper().elements(),
938                parts.len(),
939                len
940            ),
941            true => {
942                format!(
943                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
944                    id.0,
945                    id.1,
946                    desc.lower().elements(),
947                    desc.upper().elements(),
948                    desc.since().elements(),
949                    parts.len(),
950                    len,
951                    parts
952                        .iter()
953                        .flat_map(|x| x.batch.parts.iter())
954                        .map(|x| format!(" {}", x.printable_name()))
955                        .collect::<Vec<_>>()
956                        .join("")
957                )
958            }
959        }
960    }
961}
962
963impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
964    fn diffs_sum<'a, D: Monoid + Codec64>(
965        parts: impl IntoIterator<Item = &'a RunPart<T>>,
966        metrics: &ColumnarMetrics,
967    ) -> Option<D> {
968        let mut sum = D::zero();
969        for part in parts {
970            sum.plus_equals(&part.diffs_sum::<D>(metrics)?);
971        }
972        Some(sum)
973    }
974
975    /// Get the diff sum from the given batch for the given runs.
976    /// Returns `None` if the runs aren't present or any parts don't have statistics.
977    fn diffs_sum_for_runs<D: Monoid + Codec64>(
978        batch: &HollowBatch<T>,
979        run_ids: &[RunId],
980        metrics: &ColumnarMetrics,
981    ) -> Option<D> {
982        let mut run_ids = BTreeSet::from_iter(run_ids.iter().copied());
983        let mut sum = D::zero();
984
985        for (meta, run) in batch.runs() {
986            let id = meta.id?;
987            if run_ids.remove(&id) {
988                sum.plus_equals(&Self::diffs_sum(run, metrics)?);
989            }
990        }
991
992        run_ids.is_empty().then_some(sum)
993    }
994
995    fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
996        let exact_match =
997            desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();
998
999        let empty_batch = HollowBatch::empty(desc.clone());
1000        if exact_match {
1001            *self = SpineBatch::merged(IdHollowBatch {
1002                id: self.id(),
1003                batch: Arc::new(empty_batch),
1004            });
1005            return ApplyMergeResult::AppliedExact;
1006        }
1007
1008        if let Some((id, range)) = self.find_replacement_range(desc) {
1009            self.perform_subset_replacement(&empty_batch, id, range, None)
1010        } else {
1011            ApplyMergeResult::NotAppliedNoMatch
1012        }
1013    }
1014
1015    fn construct_batch_with_runs_replaced(
1016        original: &HollowBatch<T>,
1017        run_ids: &[RunId],
1018        replacement: &HollowBatch<T>,
1019    ) -> Result<HollowBatch<T>, ApplyMergeResult> {
1020        if run_ids.is_empty() {
1021            return Err(ApplyMergeResult::NotAppliedNoMatch);
1022        }
1023
1024        let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
1025        let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
1026        if !orig_run_ids.is_superset(&run_ids) {
1027            return Err(ApplyMergeResult::NotAppliedNoMatch);
1028        }
1029
1030        let runs: Vec<_> = original
1031            .runs()
1032            .filter(|(meta, _)| {
1033                !run_ids.contains(&meta.id.expect("id should be present at this point"))
1034            })
1035            .chain(replacement.runs())
1036            .collect();
1037
1038        let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();
1039
1040        let run_meta = runs
1041            .iter()
1042            .map(|(meta, _)| *meta)
1043            .cloned()
1044            .collect::<Vec<_>>();
1045
1046        let parts = runs
1047            .iter()
1048            .flat_map(|(_, parts)| *parts)
1049            .cloned()
1050            .collect::<Vec<_>>();
1051
1052        let run_splits = {
1053            let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
1054            let mut pointer = 0;
1055            for (i, (_, parts)) in runs.into_iter().enumerate() {
1056                if parts.is_empty() {
1057                    continue;
1058                }
1059                if i < run_meta.len() - 1 {
1060                    splits.push(pointer + parts.len());
1061                }
1062                pointer += parts.len();
1063            }
1064            splits
1065        };
1066
1067        Ok(HollowBatch::new(
1068            replacement.desc.clone(),
1069            parts,
1070            len,
1071            run_meta,
1072            run_splits,
1073        ))
1074    }
1075
1076    fn maybe_replace_checked<D>(
1077        &mut self,
1078        res: &FueledMergeRes<T>,
1079        metrics: &ColumnarMetrics,
1080    ) -> ApplyMergeResult
1081    where
1082        D: Monoid + Codec64 + PartialEq + Debug,
1083    {
1084        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1085        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1086        // since must be in advance of the merge res since.
1087        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1088            return ApplyMergeResult::NotAppliedInvalidSince;
1089        }
1090
1091        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1092        let num_batches = self.parts.len();
1093
1094        let result = match &res.input {
1095            CompactionInput::IdRange(id) => {
1096                self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
1097            }
1098            CompactionInput::PartialBatch(id, runs) => {
1099                self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
1100            }
1101            CompactionInput::Legacy => self.maybe_replace_checked_classic::<D>(res, metrics),
1102        };
1103
1104        let num_batches_after = self.parts.len();
1105        assert!(
1106            num_batches_after <= num_batches,
1107            "replacing parts should not increase the number of batches"
1108        );
1109        result
1110    }
1111
1112    fn handle_id_range_replacement<D>(
1113        &mut self,
1114        res: &FueledMergeRes<T>,
1115        id: &SpineId,
1116        new_diffs_sum: Option<D>,
1117        metrics: &ColumnarMetrics,
1118    ) -> ApplyMergeResult
1119    where
1120        D: Monoid + Codec64 + PartialEq + Debug,
1121    {
1122        let range = self
1123            .parts
1124            .iter()
1125            .enumerate()
1126            .filter_map(|(i, p)| {
1127                if id.covers(p.id) {
1128                    Some((i, p.id))
1129                } else {
1130                    None
1131                }
1132            })
1133            .collect::<Vec<_>>();
1134
1135        let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();
1136
1137        // If ids is empty, it means that we didn't find any parts that match the id range.
1138        // We also check that the id matches the range of ids we found.
1139        // At scale, sometimes regular compaction will race forced compaction,
1140        // for things like the catalog. In that case, we may have a
1141        // replacement that no longer lines up with the spine batches.
1142        // I think this is because forced compaction ignores the active_compaction
1143        // and just goes for it. This is slightly annoying but probably the right behavior
1144        // for a functions whose prefix is `force_`, so we just return
1145        // NotAppliedNoMatch here.
1146        if ids.is_empty() || id != &id_range(ids) {
1147            return ApplyMergeResult::NotAppliedNoMatch;
1148        }
1149
1150        // This is the range of hollow batches that we will replace.
1151        let (min, max) = match range.iter().map(|(i, _)| *i).minmax() {
1152            itertools::MinMaxResult::NoElements => return ApplyMergeResult::NotAppliedNoMatch,
1153            itertools::MinMaxResult::OneElement(elt) => (elt, elt),
1154            itertools::MinMaxResult::MinMax(min, max) => (min, max),
1155        };
1156        let replacement_range = min..max + 1;
1157
1158        // We need to replace a range of parts. Here we don't care about the run_indices
1159        // because we must be replacing the entire part(s)
1160        let old_diffs_sum = Self::diffs_sum::<D>(
1161            self.parts[replacement_range.clone()]
1162                .iter()
1163                .flat_map(|p| p.batch.parts.iter()),
1164            metrics,
1165        );
1166
1167        Self::validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
1168
1169        self.perform_subset_replacement(
1170            &res.output,
1171            *id,
1172            replacement_range,
1173            res.new_active_compaction.clone(),
1174        )
1175    }
1176
1177    fn handle_partial_batch_replacement<D>(
1178        &mut self,
1179        res: &FueledMergeRes<T>,
1180        id: SpineId,
1181        runs: &BTreeSet<RunId>,
1182        new_diffs_sum: Option<D>,
1183        metrics: &ColumnarMetrics,
1184    ) -> ApplyMergeResult
1185    where
1186        D: Monoid + Codec64 + PartialEq + Debug,
1187    {
1188        if runs.is_empty() {
1189            return ApplyMergeResult::NotAppliedNoMatch;
1190        }
1191
1192        let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
1193        let Some((i, batch)) = part else {
1194            return ApplyMergeResult::NotAppliedNoMatch;
1195        };
1196        let replacement_range = i..(i + 1);
1197
1198        let replacement_desc = &res.output.desc;
1199        let existing_desc = &batch.batch.desc;
1200        assert_eq!(
1201            replacement_desc.lower(),
1202            existing_desc.lower(),
1203            "batch lower should match, but {:?} != {:?}",
1204            replacement_desc.lower(),
1205            existing_desc.lower()
1206        );
1207        assert_eq!(
1208            replacement_desc.upper(),
1209            existing_desc.upper(),
1210            "batch upper should match, but {:?} != {:?}",
1211            replacement_desc.upper(),
1212            existing_desc.upper()
1213        );
1214        if !PartialOrder::less_equal(existing_desc.since(), replacement_desc.since()) {
1215            error!(
1216                "batch since should advance, but {:?} !<= {:?}",
1217                existing_desc.since(),
1218                replacement_desc.since()
1219            );
1220            return ApplyMergeResult::NotAppliedInvalidSince;
1221        }
1222
1223        let batch = &batch.batch;
1224        let run_ids = runs.iter().cloned().collect::<Vec<_>>();
1225
1226        match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
1227            Ok(new_batch) => {
1228                let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
1229                Self::validate_diffs_sum_match(
1230                    old_diffs_sum,
1231                    new_diffs_sum,
1232                    "partial batch replacement",
1233                );
1234                let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
1235                let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
1236                Self::validate_diffs_sum_match(
1237                    old_batch_diff_sum,
1238                    new_batch_diff_sum,
1239                    "sanity checking diffs sum for replaced runs",
1240                );
1241                self.perform_subset_replacement(
1242                    &new_batch,
1243                    id,
1244                    replacement_range,
1245                    res.new_active_compaction.clone(),
1246                )
1247            }
1248            Err(err) => err,
1249        }
1250    }
1251
1252    fn validate_diffs_sum_match<D>(
1253        old_diffs_sum: Option<D>,
1254        new_diffs_sum: Option<D>,
1255        context: &str,
1256    ) where
1257        D: Monoid + Codec64 + PartialEq + Debug,
1258    {
1259        let new_diffs_sum = new_diffs_sum.unwrap_or_else(D::zero);
1260        if let Some(old_diffs_sum) = old_diffs_sum {
1261            assert_eq!(
1262                old_diffs_sum, new_diffs_sum,
1263                "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
1264                new_diffs_sum, old_diffs_sum, context
1265            )
1266        }
1267    }
1268
1269    /// This is the "legacy" way of replacing a spine batch with a merge result.
1270    /// It is used in moments when we don't have the full compaction input
1271    /// information.
1272    /// Eventually we should strive to roundtrip Spine IDs everywhere and
1273    /// deprecate this method.
1274    fn maybe_replace_checked_classic<D>(
1275        &mut self,
1276        res: &FueledMergeRes<T>,
1277        metrics: &ColumnarMetrics,
1278    ) -> ApplyMergeResult
1279    where
1280        D: Monoid + Codec64 + PartialEq + Debug,
1281    {
1282        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1283        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1284        // since must be in advance of the merge res since.
1285        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1286            return ApplyMergeResult::NotAppliedInvalidSince;
1287        }
1288
1289        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1290
1291        // If our merge result exactly matches a spine batch, we can swap it in directly
1292        let exact_match = res.output.desc.lower() == self.desc().lower()
1293            && res.output.desc.upper() == self.desc().upper();
1294        if exact_match {
1295            let old_diffs_sum = Self::diffs_sum::<D>(
1296                self.parts.iter().flat_map(|p| p.batch.parts.iter()),
1297                metrics,
1298            );
1299
1300            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1301                assert_eq!(
1302                    old_diffs_sum, new_diffs_sum,
1303                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1304                    new_diffs_sum, old_diffs_sum
1305                );
1306            }
1307
1308            // Spine internally has an invariant about a batch being at some level
1309            // or higher based on the len. We could end up violating this invariant
1310            // if we increased the length of the batch.
1311            //
1312            // A res output with length greater than the existing spine batch implies
1313            // a compaction has already been applied to this range, and with a higher
1314            // rate of consolidation than this one. This could happen as a result of
1315            // compaction's memory bound limiting the amount of consolidation possible.
1316            if res.output.len > self.len() {
1317                return ApplyMergeResult::NotAppliedTooManyUpdates;
1318            }
1319            *self = SpineBatch::merged(IdHollowBatch {
1320                id: self.id(),
1321                batch: Arc::new(res.output.clone()),
1322            });
1323            return ApplyMergeResult::AppliedExact;
1324        }
1325
1326        // Try subset replacement
1327        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1328            let old_diffs_sum = Self::diffs_sum::<D>(
1329                self.parts[range.clone()]
1330                    .iter()
1331                    .flat_map(|p| p.batch.parts.iter()),
1332                metrics,
1333            );
1334
1335            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1336                assert_eq!(
1337                    old_diffs_sum, new_diffs_sum,
1338                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1339                    new_diffs_sum, old_diffs_sum
1340                );
1341            }
1342
1343            self.perform_subset_replacement(
1344                &res.output,
1345                id,
1346                range,
1347                res.new_active_compaction.clone(),
1348            )
1349        } else {
1350            ApplyMergeResult::NotAppliedNoMatch
1351        }
1352    }
1353
1354    /// This is the even more legacy way of replacing a spine batch with a merge result.
1355    /// It is used in moments when we don't have the full compaction input
1356    /// information, and we don't have the diffs sum.
1357    /// Eventually we should strive to roundtrip Spine IDs and diffs sums everywhere and
1358    /// deprecate this method.
1359    fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
1360        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1361        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1362        // since must be in advance of the merge res since.
1363        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1364            return ApplyMergeResult::NotAppliedInvalidSince;
1365        }
1366
1367        // If our merge result exactly matches a spine batch, we can swap it in directly
1368        let exact_match = res.output.desc.lower() == self.desc().lower()
1369            && res.output.desc.upper() == self.desc().upper();
1370        if exact_match {
1371            // Spine internally has an invariant about a batch being at some level
1372            // or higher based on the len. We could end up violating this invariant
1373            // if we increased the length of the batch.
1374            //
1375            // A res output with length greater than the existing spine batch implies
1376            // a compaction has already been applied to this range, and with a higher
1377            // rate of consolidation than this one. This could happen as a result of
1378            // compaction's memory bound limiting the amount of consolidation possible.
1379            if res.output.len > self.len() {
1380                return ApplyMergeResult::NotAppliedTooManyUpdates;
1381            }
1382
1383            *self = SpineBatch::merged(IdHollowBatch {
1384                id: self.id(),
1385                batch: Arc::new(res.output.clone()),
1386            });
1387            return ApplyMergeResult::AppliedExact;
1388        }
1389
1390        // Try subset replacement
1391        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1392            self.perform_subset_replacement(
1393                &res.output,
1394                id,
1395                range,
1396                res.new_active_compaction.clone(),
1397            )
1398        } else {
1399            ApplyMergeResult::NotAppliedNoMatch
1400        }
1401    }
1402
1403    /// Find the range of parts that can be replaced by the merge result
1404    fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
1405        // It is possible the structure of the spine has changed since the merge res
1406        // was created, such that it no longer exactly matches the description of a
1407        // spine batch. This can happen if another merge has happened in the interim,
1408        // or if spine needed to be rebuilt from state.
1409        //
1410        // When this occurs, we can still attempt to slot the merge res in to replace
1411        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
1412        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
1413
1414        let mut lower = None;
1415        let mut upper = None;
1416
1417        for (i, batch) in self.parts.iter().enumerate() {
1418            if batch.batch.desc.lower() == desc.lower() {
1419                lower = Some((i, batch.id.0));
1420            }
1421            if batch.batch.desc.upper() == desc.upper() {
1422                upper = Some((i, batch.id.1));
1423            }
1424            if lower.is_some() && upper.is_some() {
1425                break;
1426            }
1427        }
1428
1429        match (lower, upper) {
1430            (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
1431                Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
1432            }
1433            _ => None,
1434        }
1435    }
1436
1437    /// Perform the actual subset replacement
1438    fn perform_subset_replacement(
1439        &mut self,
1440        res: &HollowBatch<T>,
1441        spine_id: SpineId,
1442        range: Range<usize>,
1443        new_active_compaction: Option<ActiveCompaction>,
1444    ) -> ApplyMergeResult {
1445        let SpineBatch {
1446            id,
1447            parts,
1448            desc,
1449            active_compaction: _,
1450            len: _,
1451        } = self;
1452
1453        let mut new_parts = vec![];
1454        new_parts.extend_from_slice(&parts[..range.start]);
1455        new_parts.push(IdHollowBatch {
1456            id: spine_id,
1457            batch: Arc::new(res.clone()),
1458        });
1459        new_parts.extend_from_slice(&parts[range.end..]);
1460
1461        let res = if range.len() == parts.len() {
1462            ApplyMergeResult::AppliedExact
1463        } else {
1464            ApplyMergeResult::AppliedSubset
1465        };
1466
1467        let new_spine_batch = SpineBatch {
1468            id: *id,
1469            desc: desc.to_owned(),
1470            len: new_parts.iter().map(|x| x.batch.len).sum(),
1471            parts: new_parts,
1472            active_compaction: new_active_compaction,
1473        };
1474
1475        if new_spine_batch.len() > self.len() {
1476            return ApplyMergeResult::NotAppliedTooManyUpdates;
1477        }
1478
1479        *self = new_spine_batch;
1480        res
1481    }
1482}
1483
/// The bookkeeping state of an in-progress ("fueling") merge. In persist no
/// merge work is done here; fuel is only tracked so the Spine logic can decide
/// when the merge is complete.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct FuelingMerge<T> {
    /// The since of the merged output. `SpineBatch::begin_merge` sets it to the
    /// join of the inputs' sinces and any compaction frontier.
    pub(crate) since: Antichain<T>,
    /// Units of work left before the merge is complete; `work` consumes them.
    pub(crate) remaining_work: usize,
}
1489
/// A `FuelingMerge` together with the range of spine ids being merged.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct IdFuelingMerge<T> {
    /// The id range spanned by all inputs of the merge.
    id: SpineId,
    /// The merge's bookkeeping state.
    merge: FuelingMerge<T>,
}
1494}
1495
1496impl<T: Timestamp + Lattice> FuelingMerge<T> {
1497    /// Perform some amount of work, decrementing `fuel`.
1498    ///
1499    /// If `fuel` is non-zero after the call, the merging is complete and one
1500    /// should call `done` to extract the merged results.
1501    // TODO(benesch): rewrite to avoid usage of `as`.
1502    #[allow(clippy::as_conversions)]
1503    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
1504        let used = std::cmp::min(*fuel as usize, self.remaining_work);
1505        self.remaining_work = self.remaining_work.saturating_sub(used);
1506        *fuel -= used as isize;
1507    }
1508
1509    /// Extracts merged results.
1510    ///
1511    /// This method should only be called after `work` has been called and has
1512    /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
1513    fn done(
1514        self,
1515        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1516        log: &mut SpineLog<'_, T>,
1517    ) -> Option<SpineBatch<T>> {
1518        let first = bs.first()?;
1519        let last = bs.last()?;
1520        let id = SpineId(first.id().0, last.id().1);
1521        assert!(id.0 < id.1);
1522        let lower = first.desc().lower().clone();
1523        let upper = last.desc().upper().clone();
1524        let since = self.since;
1525
1526        // Special case empty batches.
1527        if bs.iter().all(SpineBatch::is_empty) {
1528            return Some(SpineBatch::empty(id, lower, upper, since));
1529        }
1530
1531        let desc = Description::new(lower, upper, since);
1532        let len = bs.iter().map(SpineBatch::len).sum();
1533
1534        // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1535        // in the worst case, the double iteration is absolutely worth having
1536        // merged_parts pre-sized.
1537        let mut merged_parts_len = 0;
1538        for b in &bs {
1539            merged_parts_len += b.parts.len();
1540        }
1541        let mut merged_parts = Vec::with_capacity(merged_parts_len);
1542        for b in bs {
1543            merged_parts.extend(b.parts)
1544        }
1545        // Sanity check the pre-size code.
1546        debug_assert_eq!(merged_parts.len(), merged_parts_len);
1547
1548        if let SpineLog::Enabled { merge_reqs } = log {
1549            merge_reqs.push(FueledMergeReq {
1550                id,
1551                desc: desc.clone(),
1552                inputs: merged_parts.clone(),
1553            });
1554        }
1555
1556        Some(SpineBatch {
1557            id,
1558            desc,
1559            len,
1560            parts: merged_parts,
1561            active_compaction: None,
1562        })
1563    }
1564}
1565
1566/// The maximum number of batches per level in the spine.
1567/// In practice, we probably want a larger max and a configurable soft cap, but using a
1568/// stack-friendly data structure and keeping this number low makes this safer during the
1569/// initial rollout.
1570const BATCHES_PER_LEVEL: usize = 2;
1571
1572/// An append-only collection of update batches.
1573///
1574/// The `Spine` is a general-purpose trace implementation based on collection
1575/// and merging immutable batches of updates. It is generic with respect to the
1576/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1577///
1578/// ## Design
1579///
1580/// This spine is represented as a list of layers, where each element in the
1581/// list is either
1582///
1583///   1. MergeState::Vacant  empty
1584///   2. MergeState::Single  a single batch
1585///   3. MergeState::Double  a pair of batches
1586///
1587/// Each "batch" has the option to be `None`, indicating a non-batch that
1588/// nonetheless acts as a number of updates proportionate to the level at which
1589/// it exists (for bookkeeping).
1590///
1591/// Each of the batches at layer i contains at most 2^i elements. The sequence
1592/// of batches should have the upper bound of one match the lower bound of the
1593/// next. Batches may be logically empty, with matching upper and lower bounds,
1594/// as a bookkeeping mechanism.
1595///
1596/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1597/// even though it may actually contain fewer elements. This allows us to
1598/// decouple the physical representation from logical amounts of effort invested
1599/// in each batch. It allows us to begin compaction and to reduce the number of
1600/// updates, without compromising our ability to continue to move updates along
1601/// the spine. We are explicitly making the trade-off that while some batches
1602/// might compact at lower levels, we want to treat them as if they contained
1603/// their full set of updates for accounting reasons (to apply work to higher
1604/// levels).
1605///
1606/// We maintain the invariant that for any in-progress merge at level k there
1607/// should be fewer than 2^k records at levels lower than k. That is, even if we
1608/// were to apply an unbounded amount of effort to those records, we would not
1609/// have enough records to prompt a merge into the in-progress merge. Ideally,
1610/// we maintain the extended invariant that for any in-progress merge at level
1611/// k, the remaining effort required (number of records minus applied effort) is
1612/// less than the number of records that would need to be added to reach 2^k
1613/// records in layers below.
1614///
1615/// ## Mathematics
1616///
1617/// When a merge is initiated, there should be a non-negative *deficit* of
1618/// updates before the layers below could plausibly produce a new batch for the
1619/// currently merging layer. We must determine a factor of proportionality, so
1620/// that newly arrived updates provide at least that amount of "fuel" towards
1621/// the merging layer, so that the merge completes before lower levels invade.
1622///
1623/// ### Deficit:
1624///
1625/// A new merge is initiated only in response to the completion of a prior
1626/// merge, or the introduction of new records from outside. The latter case is
1627/// special, and will maintain our invariant trivially, so we will focus on the
1628/// former case.
1629///
1630/// When a merge at level k completes, assuming we have maintained our invariant
1631/// then there should be fewer than 2^k records at lower levels. The newly
1632/// created merge at level k+1 will require up to 2^k+2 units of work, and
1633/// should not expect a new batch until strictly more than 2^k records are
1634/// added. This means that a factor of proportionality of four should be
1635/// sufficient to ensure that the merge completes before a new merge is
1636/// initiated.
1637///
1638/// When new records get introduced, we will need to roll up any batches at
1639/// lower levels, which we treat as the introduction of records. Each of these
1640/// virtual records introduced should either be accounted for the fuel it should
1641/// contribute, as it results in the promotion of batches closer to in-progress
1642/// merges.
1643///
1644/// ### Fuel sharing
1645///
1646/// We like the idea of applying fuel preferentially to merges at *lower*
1647/// levels, under the idea that they are easier to complete, and we benefit from
1648/// fewer total merges in progress. This does delay the completion of merges at
1649/// higher levels, and may not obviously be a total win. If we choose to do
1650/// this, we should make sure that we correctly account for completed merges at
1651/// low layers: they should still extract fuel from new updates even though they
1652/// have completed, at least until they have paid back any "debt" to higher
1653/// layers by continuing to provide fuel as updates arrive.
1654#[derive(Debug, Clone)]
1655struct Spine<T> {
1656    effort: usize,
1657    next_id: usize,
1658    since: Antichain<T>,
1659    upper: Antichain<T>,
1660    merging: Vec<MergeState<T>>,
1661}
1662
1663impl<T> Spine<T> {
1664    /// All batches in the spine, oldest to newest.
1665    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1666        self.merging.iter().rev().flat_map(|m| &m.batches)
1667    }
1668
1669    /// All (mutable) batches in the spine, oldest to newest.
1670    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1671        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1672    }
1673}
1674
1675impl<T: Timestamp + Lattice> Spine<T> {
1676    /// Allocates a fueled `Spine`.
1677    ///
1678    /// This trace will merge batches progressively, with each inserted batch
1679    /// applying a multiple of the batch's length in effort to each merge. The
1680    /// `effort` parameter is that multiplier. This value should be at least one
1681    /// for the merging to happen; a value of zero is not helpful.
1682    pub fn new() -> Self {
1683        Spine {
1684            effort: 1,
1685            next_id: 0,
1686            since: Antichain::from_elem(T::minimum()),
1687            upper: Antichain::from_elem(T::minimum()),
1688            merging: Vec::new(),
1689        }
1690    }
1691
1692    /// Apply some amount of effort to trace maintenance.
1693    ///
1694    /// The units of effort are updates, and the method should be thought of as
1695    /// analogous to inserting as many empty updates, where the trace is
1696    /// permitted to perform proportionate work.
1697    ///
1698    /// Returns true if this did work and false if it left the spine unchanged.
1699    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1700        self.tidy_layers();
1701        if self.reduced() {
1702            return false;
1703        }
1704
1705        if self.merging.iter().any(|b| b.merge.is_some()) {
1706            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1707            // If any merges exist, we can directly call `apply_fuel`.
1708            self.apply_fuel(&fuel, log);
1709        } else {
1710            // Otherwise, we'll need to introduce fake updates to move merges
1711            // along.
1712
1713            // Introduce an empty batch with roughly *effort number of virtual updates.
1714            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1715            let id = self.next_id();
1716            self.introduce_batch(
1717                SpineBatch::empty(
1718                    id,
1719                    self.upper.clone(),
1720                    self.upper.clone(),
1721                    self.since.clone(),
1722                ),
1723                level,
1724                log,
1725            );
1726        }
1727        true
1728    }
1729
1730    pub fn next_id(&mut self) -> SpineId {
1731        let id = self.next_id;
1732        self.next_id += 1;
1733        SpineId(id, self.next_id)
1734    }
1735
1736    // Ideally, this method acts as insertion of `batch`, even if we are not yet
1737    // able to begin merging the batch. This means it is a good time to perform
1738    // amortized work proportional to the size of batch.
1739    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1740        assert!(batch.desc.lower() != batch.desc.upper());
1741        assert_eq!(batch.desc.lower(), &self.upper);
1742
1743        let id = self.next_id();
1744        let batch = SpineBatch::merged(IdHollowBatch {
1745            id,
1746            batch: Arc::new(batch),
1747        });
1748
1749        self.upper.clone_from(batch.upper());
1750
1751        // If `batch` and the most recently inserted batch are both empty,
1752        // we can just fuse them.
1753        if batch.is_empty() {
1754            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1755                if self.merging[position].is_single() && self.merging[position].is_empty() {
1756                    self.insert_at(batch, position);
1757                    // Since we just inserted a batch, we should always have work to complete...
1758                    // but otherwise we just leave this layer vacant.
1759                    if let Some(merged) = self.complete_at(position, log) {
1760                        self.merging[position] = MergeState::single(merged);
1761                    }
1762                    return;
1763                }
1764            }
1765        }
1766
1767        // Normal insertion for the batch.
1768        let index = batch.len().next_power_of_two();
1769        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1770    }
1771
1772    /// Returns true when the trace is considered *structurally reduced*.
1773    ///
1774    /// Reduced == the total number of runs (across every
1775    /// `SpineBatch` and all of their inner hollow batches) is < 2. In other
1776    /// words, there are either zero runs (fully empty) or exactly one logical
1777    /// run of data remaining.
1778    fn reduced(&self) -> bool {
1779        self.spine_batches()
1780            .map(|b| {
1781                b.parts
1782                    .iter()
1783                    .map(|p| p.batch.run_meta.len())
1784                    .sum::<usize>()
1785            })
1786            .sum::<usize>()
1787            < 2
1788    }
1789
1790    /// Describes the merge progress of layers in the trace.
1791    ///
1792    /// Intended for diagnostics rather than public consumption.
1793    #[allow(dead_code)]
1794    fn describe(&self) -> Vec<(usize, usize)> {
1795        self.merging
1796            .iter()
1797            .map(|b| (b.batches.len(), b.len()))
1798            .collect()
1799    }
1800
1801    /// Introduces a batch at an indicated level.
1802    ///
1803    /// The level indication is often related to the size of the batch, but it
1804    /// can also be used to artificially fuel the computation by supplying empty
1805    /// batches at non-trivial indices, to move merges along.
1806    fn introduce_batch(
1807        &mut self,
1808        batch: SpineBatch<T>,
1809        batch_index: usize,
1810        log: &mut SpineLog<'_, T>,
1811    ) {
1812        // Step 0.  Determine an amount of fuel to use for the computation.
1813        //
1814        //          Fuel is used to drive maintenance of the data structure,
1815        //          and in particular are used to make progress through merges
1816        //          that are in progress. The amount of fuel to use should be
1817        //          proportional to the number of records introduced, so that
1818        //          we are guaranteed to complete all merges before they are
1819        //          required as arguments to merges again.
1820        //
1821        //          The fuel use policy is negotiable, in that we might aim
1822        //          to use relatively less when we can, so that we return
1823        //          control promptly, or we might account more work to larger
1824        //          batches. Not clear to me which are best, of if there
1825        //          should be a configuration knob controlling this.
1826
1827        // The amount of fuel to use is proportional to 2^batch_index, scaled by
1828        // a factor of self.effort which determines how eager we are in
1829        // performing maintenance work. We need to ensure that each merge in
1830        // progress receives fuel for each introduced batch, and so multiply by
1831        // that as well.
1832        if batch_index > 32 {
1833            println!("Large batch index: {}", batch_index);
1834        }
1835
1836        // We believe that eight units of fuel is sufficient for each introduced
1837        // record, accounted as four for each record, and a potential four more
1838        // for each virtual record associated with promoting existing smaller
1839        // batches. We could try and make this be less, or be scaled to merges
1840        // based on their deficit at time of instantiation. For now, we remain
1841        // conservative.
1842        let mut fuel = 8 << batch_index;
1843        // Scale up by the effort parameter, which is calibrated to one as the
1844        // minimum amount of effort.
1845        fuel *= self.effort;
1846        // Convert to an `isize` so we can observe any fuel shortfall.
1847        // TODO(benesch): avoid dangerous usage of `as`.
1848        #[allow(clippy::as_conversions)]
1849        let fuel = fuel as isize;
1850
1851        // Step 1.  Apply fuel to each in-progress merge.
1852        //
1853        //          Before we can introduce new updates, we must apply any
1854        //          fuel to in-progress merges, as this fuel is what ensures
1855        //          that the merges will be complete by the time we insert
1856        //          the updates.
1857        self.apply_fuel(&fuel, log);
1858
1859        // Step 2.  We must ensure the invariant that adjacent layers do not
1860        //          contain two batches will be satisfied when we insert the
1861        //          batch. We forcibly completing all merges at layers lower
1862        //          than and including `batch_index`, so that the new batch is
1863        //          inserted into an empty layer.
1864        //
1865        //          We could relax this to "strictly less than `batch_index`"
1866        //          if the layer above has only a single batch in it, which
1867        //          seems not implausible if it has been the focus of effort.
1868        //
1869        //          This should be interpreted as the introduction of some
1870        //          volume of fake updates, and we will need to fuel merges
1871        //          by a proportional amount to ensure that they are not
1872        //          surprised later on. The number of fake updates should
1873        //          correspond to the deficit for the layer, which perhaps
1874        //          we should track explicitly.
1875        self.roll_up(batch_index, log);
1876
1877        // Step 3. This insertion should be into an empty layer. It is a logical
1878        //         error otherwise, as we may be violating our invariant, from
1879        //         which all wonderment derives.
1880        self.insert_at(batch, batch_index);
1881
1882        // Step 4. Tidy the largest layers.
1883        //
1884        //         It is important that we not tidy only smaller layers,
1885        //         as their ascension is what ensures the merging and
1886        //         eventual compaction of the largest layers.
1887        self.tidy_layers();
1888    }
1889
1890    /// Ensures that an insertion at layer `index` will succeed.
1891    ///
1892    /// This method is subject to the constraint that all existing batches
1893    /// should occur at higher levels, which requires it to "roll up" batches
1894    /// present at lower levels before the method is called. In doing this, we
1895    /// should not introduce more virtual records than 2^index, as that is the
1896    /// amount of excess fuel we have budgeted for completing merges.
1897    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1898        // Ensure entries sufficient for `index`.
1899        while self.merging.len() <= index {
1900            self.merging.push(MergeState::default());
1901        }
1902
1903        // We only need to roll up if there are non-vacant layers.
1904        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1905            // Collect and merge all batches at layers up to but not including
1906            // `index`.
1907            let mut merged = None;
1908            for i in 0..index {
1909                if let Some(merged) = merged.take() {
1910                    self.insert_at(merged, i);
1911                }
1912                merged = self.complete_at(i, log);
1913            }
1914
1915            // The merged results should be introduced at level `index`, which
1916            // should be ready to absorb them (possibly creating a new merge at
1917            // the time).
1918            if let Some(merged) = merged {
1919                self.insert_at(merged, index);
1920            }
1921
1922            // If the insertion results in a merge, we should complete it to
1923            // ensure the upcoming insertion at `index` does not panic.
1924            if self.merging[index].is_full() {
1925                let merged = self.complete_at(index, log).expect("double batch");
1926                self.insert_at(merged, index + 1);
1927            }
1928        }
1929    }
1930
1931    /// Applies an amount of fuel to merges in progress.
1932    ///
1933    /// The supplied `fuel` is for each in progress merge, and if we want to
1934    /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1935    /// could do so in order to maintain fewer batches on average (at the risk
1936    /// of completing merges of large batches later, but tbh probably not much
1937    /// later).
1938    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1939        // For the moment our strategy is to apply fuel independently to each
1940        // merge in progress, rather than prioritizing small merges. This sounds
1941        // like a great idea, but we need better accounting in place to ensure
1942        // that merges that borrow against later layers but then complete still
1943        // "acquire" fuel to pay back their debts.
1944        for index in 0..self.merging.len() {
1945            // Give each level independent fuel, for now.
1946            let mut fuel = *fuel;
1947            // Pass along various logging stuffs, in case we need to report
1948            // success.
1949            self.merging[index].work(&mut fuel);
1950            // `fuel` could have a deficit at this point, meaning we over-spent
1951            // when we took a merge step. We could ignore this, or maintain the
1952            // deficit and account future fuel against it before spending again.
1953            // It isn't clear why that would be especially helpful to do; we
1954            // might want to avoid overspends at multiple layers in the same
1955            // invocation (to limit latencies), but there is probably a rich
1956            // policy space here.
1957
1958            // If a merge completes, we can immediately merge it in to the next
1959            // level, which is "guaranteed" to be complete at this point, by our
1960            // fueling discipline.
1961            if self.merging[index].is_complete() {
1962                let complete = self.complete_at(index, log).expect("complete batch");
1963                self.insert_at(complete, index + 1);
1964            }
1965        }
1966    }
1967
1968    /// Inserts a batch at a specific location.
1969    ///
1970    /// This is a non-public internal method that can panic if we try and insert
1971    /// into a layer which already contains two batches (and is still in the
1972    /// process of merging).
1973    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1974        // Ensure the spine is large enough.
1975        while self.merging.len() <= index {
1976            self.merging.push(MergeState::default());
1977        }
1978
1979        // Insert the batch at the location.
1980        let merging = &mut self.merging[index];
1981        merging.push_batch(batch);
1982        if merging.batches.is_full() {
1983            let compaction_frontier = Some(self.since.borrow());
1984            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1985        }
1986    }
1987
1988    /// Completes and extracts what ever is at layer `index`, leaving this layer vacant.
1989    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1990        self.merging[index].complete(log)
1991    }
1992
1993    /// Attempts to draw down large layers to size appropriate layers.
1994    fn tidy_layers(&mut self) {
1995        // If the largest layer is complete (not merging), we can attempt to
1996        // draw it down to the next layer. This is permitted if we can maintain
1997        // our invariant that below each merge there are at most half the
1998        // records that would be required to invade the merge.
1999        if !self.merging.is_empty() {
2000            let mut length = self.merging.len();
2001            if self.merging[length - 1].is_single() {
2002                // To move a batch down, we require that it contain few enough
2003                // records that the lower level is appropriate, and that moving
2004                // the batch would not create a merge violating our invariant.
2005                let appropriate_level = usize::cast_from(
2006                    self.merging[length - 1]
2007                        .len()
2008                        .next_power_of_two()
2009                        .trailing_zeros(),
2010                );
2011
2012                // Continue only as far as is appropriate
2013                while appropriate_level < length - 1 {
2014                    let current = &mut self.merging[length - 2];
2015                    if current.is_vacant() {
2016                        // Vacant batches can be absorbed.
2017                        self.merging.remove(length - 2);
2018                        length = self.merging.len();
2019                    } else {
2020                        if !current.is_full() {
2021                            // Single batches may initiate a merge, if sizes are
2022                            // within bounds, but terminate the loop either way.
2023
2024                            // Determine the number of records that might lead
2025                            // to a merge. Importantly, this is not the number
2026                            // of actual records, but the sum of upper bounds
2027                            // based on indices.
2028                            let mut smaller = 0;
2029                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
2030                                smaller += batch.batches.len() << index;
2031                            }
2032
2033                            if smaller <= (1 << length) / 8 {
2034                                // Remove the batch under consideration (shifting the deeper batches up a level),
2035                                // then merge in the single batch at the current level.
2036                                let state = self.merging.remove(length - 2);
2037                                assert_eq!(state.batches.len(), 1);
2038                                for batch in state.batches {
2039                                    self.insert_at(batch, length - 2);
2040                                }
2041                            }
2042                        }
2043                        break;
2044                    }
2045                }
2046            }
2047        }
2048    }
2049
2050    /// Checks invariants:
2051    /// - The lowers and uppers of all batches "line up".
2052    /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
2053    /// - The upper of the "maximum" batch is `== self.upper`.
2054    /// - The since of each batch is `less_equal self.since`.
2055    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
2056    /// - TODO: Verify fuel and level invariants.
2057    fn validate(&self) -> Result<(), String> {
2058        let mut id = SpineId(0, 0);
2059        let mut frontier = Antichain::from_elem(T::minimum());
2060        for x in self.merging.iter().rev() {
2061            if x.is_full() != x.merge.is_some() {
2062                return Err(format!(
2063                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
2064                    x.is_full(),
2065                    x.merge,
2066                ));
2067            }
2068
2069            if let Some(m) = &x.merge {
2070                if !x.is_full() {
2071                    return Err(format!(
2072                        "merge should only exist for full batches (len={:?}, merge={:?})",
2073                        x.batches.len(),
2074                        m.id,
2075                    ));
2076                }
2077                if x.id() != Some(m.id) {
2078                    return Err(format!(
2079                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
2080                        x.id(),
2081                        m.id,
2082                    ));
2083                }
2084            }
2085
2086            // TODO: Anything we can validate about x.merge? It'd
2087            // be nice to assert that it's bigger than the len of the
2088            // two batches, but apply_merge_res might swap those lengths
2089            // out from under us.
2090            for batch in &x.batches {
2091                if batch.id().0 != id.1 {
2092                    return Err(format!(
2093                        "batch id {:?} does not match the previous id {:?}: {:?}",
2094                        batch.id(),
2095                        id,
2096                        self
2097                    ));
2098                }
2099                id = batch.id();
2100                if batch.desc().lower() != &frontier {
2101                    return Err(format!(
2102                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
2103                        batch.desc().lower(),
2104                        frontier,
2105                        self
2106                    ));
2107                }
2108                frontier.clone_from(batch.desc().upper());
2109                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
2110                    return Err(format!(
2111                        "since of batch {:?} past the spine since {:?}: {:?}",
2112                        batch.desc().since(),
2113                        self.since,
2114                        self
2115                    ));
2116                }
2117            }
2118        }
2119        if self.next_id != id.1 {
2120            return Err(format!(
2121                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
2122                self.next_id, id, self
2123            ));
2124        }
2125        if self.upper != frontier {
2126            return Err(format!(
2127                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
2128                self.upper, frontier, self
2129            ));
2130        }
2131        Ok(())
2132    }
2133}
2134
2135/// Describes the state of a layer.
2136///
2137/// A layer can be empty, contain a single batch, or contain a pair of batches
2138/// that are in the process of merging into a batch for the next layer.
2139#[derive(Debug, Clone)]
2140struct MergeState<T> {
2141    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
2142    merge: Option<IdFuelingMerge<T>>,
2143}
2144
2145impl<T> Default for MergeState<T> {
2146    fn default() -> Self {
2147        Self {
2148            batches: ArrayVec::new(),
2149            merge: None,
2150        }
2151    }
2152}
2153
2154impl<T: Timestamp + Lattice> MergeState<T> {
2155    /// An id that covers all the batches in the given merge state, assuming there are any.
2156    fn id(&self) -> Option<SpineId> {
2157        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
2158            Some(SpineId(first.id().0, last.id().1))
2159        } else {
2160            None
2161        }
2162    }
2163
2164    /// A new single-batch merge state.
2165    fn single(batch: SpineBatch<T>) -> Self {
2166        let mut state = Self::default();
2167        state.push_batch(batch);
2168        state
2169    }
2170
2171    /// Push a new batch at this level, checking invariants.
2172    fn push_batch(&mut self, batch: SpineBatch<T>) {
2173        if let Some(last) = self.batches.last() {
2174            assert_eq!(last.id().1, batch.id().0);
2175            assert_eq!(last.upper(), batch.lower());
2176        }
2177        assert!(
2178            self.merge.is_none(),
2179            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
2180            batch.id,
2181            self.batches.len(),
2182        );
2183        self.batches
2184            .try_push(batch)
2185            .expect("Attempted to insert batch into full layer!");
2186    }
2187
2188    /// The number of actual updates contained in the level.
2189    fn len(&self) -> usize {
2190        self.batches.iter().map(SpineBatch::len).sum()
2191    }
2192
2193    /// True if this merge state contains no updates.
2194    fn is_empty(&self) -> bool {
2195        self.batches.iter().all(SpineBatch::is_empty)
2196    }
2197
2198    /// True if this level contains no batches.
2199    fn is_vacant(&self) -> bool {
2200        self.batches.is_empty()
2201    }
2202
2203    /// True only for a single-batch state.
2204    fn is_single(&self) -> bool {
2205        self.batches.len() == 1
2206    }
2207
2208    /// True if this merge cannot hold any more batches.
2209    /// (i.e. for a binary merge tree, true if this layer holds two batches.)
2210    fn is_full(&self) -> bool {
2211        self.batches.is_full()
2212    }
2213
2214    /// Immediately complete any merge.
2215    ///
2216    /// The result is either a batch, if there is a non-trivial batch to return
2217    /// or `None` if there is no meaningful batch to return.
2218    ///
2219    /// There is the additional option of input batches.
2220    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
2221        let mut this = mem::take(self);
2222        if this.batches.len() <= 1 {
2223            this.batches.pop()
2224        } else {
2225            // Merge the remaining batches, regardless of whether we have a fully fueled merge.
2226            let id_merge = this
2227                .merge
2228                .or_else(|| SpineBatch::begin_merge(&self.batches[..], None))?;
2229            id_merge.merge.done(this.batches, log)
2230        }
2231    }
2232
2233    /// True iff the layer is a complete merge, ready for extraction.
2234    fn is_complete(&self) -> bool {
2235        match &self.merge {
2236            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
2237            None => false,
2238        }
2239    }
2240
2241    /// Performs a bounded amount of work towards a merge.
2242    fn work(&mut self, fuel: &mut isize) {
2243        // We only perform work for merges in progress.
2244        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
2245            merge.work(&self.batches[..], fuel)
2246        }
2247    }
2248}
2249
2250#[cfg(test)]
2251pub mod datadriven {
2252    use mz_ore::fmt::FormatBuffer;
2253
2254    use crate::internal::datadriven::DirectiveArgs;
2255
2256    use super::*;
2257
2258    /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
2259    #[derive(Debug, Default)]
2260    pub struct TraceState {
2261        pub trace: Trace<u64>,
2262        pub merge_reqs: Vec<FueledMergeReq<u64>>,
2263    }
2264
2265    pub fn since_upper(
2266        datadriven: &TraceState,
2267        _args: DirectiveArgs,
2268    ) -> Result<String, anyhow::Error> {
2269        Ok(format!(
2270            "{:?}{:?}\n",
2271            datadriven.trace.since().elements(),
2272            datadriven.trace.upper().elements()
2273        ))
2274    }
2275
2276    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
2277        let mut s = String::new();
2278        for b in datadriven.trace.spine.spine_batches() {
2279            s.push_str(b.describe(true).as_str());
2280            s.push('\n');
2281        }
2282        Ok(s)
2283    }
2284
2285    pub fn insert(
2286        datadriven: &mut TraceState,
2287        args: DirectiveArgs,
2288    ) -> Result<String, anyhow::Error> {
2289        for x in args
2290            .input
2291            .trim()
2292            .split('\n')
2293            .map(DirectiveArgs::parse_hollow_batch)
2294        {
2295            datadriven
2296                .merge_reqs
2297                .append(&mut datadriven.trace.push_batch(x));
2298        }
2299        Ok("ok\n".to_owned())
2300    }
2301
2302    pub fn downgrade_since(
2303        datadriven: &mut TraceState,
2304        args: DirectiveArgs,
2305    ) -> Result<String, anyhow::Error> {
2306        let since = args.expect("since");
2307        datadriven
2308            .trace
2309            .downgrade_since(&Antichain::from_elem(since));
2310        Ok("ok\n".to_owned())
2311    }
2312
2313    pub fn take_merge_req(
2314        datadriven: &mut TraceState,
2315        _args: DirectiveArgs,
2316    ) -> Result<String, anyhow::Error> {
2317        let mut s = String::new();
2318        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
2319            write!(
2320                s,
2321                "{:?}{:?}{:?} {}\n",
2322                merge_req.desc.lower().elements(),
2323                merge_req.desc.upper().elements(),
2324                merge_req.desc.since().elements(),
2325                merge_req
2326                    .inputs
2327                    .iter()
2328                    .flat_map(|x| x.batch.parts.iter())
2329                    .map(|x| x.printable_name())
2330                    .collect::<Vec<_>>()
2331                    .join(" ")
2332            );
2333        }
2334        Ok(s)
2335    }
2336
2337    pub fn apply_merge_res(
2338        datadriven: &mut TraceState,
2339        args: DirectiveArgs,
2340    ) -> Result<String, anyhow::Error> {
2341        let res = FueledMergeRes {
2342            output: DirectiveArgs::parse_hollow_batch(args.input),
2343            input: CompactionInput::Legacy,
2344            new_active_compaction: None,
2345        };
2346        match datadriven.trace.apply_merge_res_unchecked(&res) {
2347            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
2348            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
2349            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
2350            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
2351            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
2352        }
2353    }
2354}
2355
2356#[cfg(test)]
2357pub(crate) mod tests {
2358    use std::ops::Range;
2359
2360    use proptest::prelude::*;
2361    use semver::Version;
2362
2363    use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};
2364
2365    use super::*;
2366
2367    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
2368        num_batches: Range<usize>,
2369    ) -> impl Strategy<Value = Trace<T>> {
2370        Strategy::prop_map(
2371            (
2372                any::<Option<T>>(),
2373                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
2374                any::<bool>(),
2375                any::<u64>(),
2376            ),
2377            |(since, mut batches, roundtrip_structure, timeout_ms)| {
2378                let mut trace = Trace::<T>::default();
2379                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
2380
2381                // Fix up the arbitrary HollowBatches so the lowers and uppers
2382                // align.
2383                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
2384                let mut lower = Antichain::from_elem(T::minimum());
2385                for mut batch in batches {
2386                    // Overall trace since has to be past each batch's since.
2387                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
2388                        trace.downgrade_since(batch.desc.since());
2389                    }
2390                    batch.desc = Description::new(
2391                        lower.clone(),
2392                        batch.desc.upper().clone(),
2393                        batch.desc.since().clone(),
2394                    );
2395                    lower.clone_from(batch.desc.upper());
2396                    let _merge_req = trace.push_batch(batch);
2397                }
2398                let reqs: Vec<_> = trace
2399                    .fueled_merge_reqs_before_ms(timeout_ms, None)
2400                    .collect();
2401                for req in reqs {
2402                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
2403                }
2404                trace.roundtrip_structure = roundtrip_structure;
2405                trace
2406            },
2407        )
2408    }
2409
2410    #[mz_ore::test]
2411    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2412    fn test_roundtrips() {
2413        fn check(trace: Trace<i64>) {
2414            trace.validate().unwrap();
2415            let flat = trace.flatten();
2416            let unflat = Trace::unflatten(flat).unwrap();
2417            assert_eq!(trace, unflat);
2418        }
2419
2420        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
2421    }
2422
2423    #[mz_ore::test]
2424    fn fueled_merge_reqs() {
2425        let mut trace: Trace<u64> = Trace::default();
2426        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
2427            0,
2428            10,
2429            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
2430            1000,
2431        ));
2432
2433        assert!(fueled_reqs.is_empty());
2434        assert_eq!(
2435            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
2436            0,
2437            "no merge reqs when not filtering by version"
2438        );
2439        assert_eq!(
2440            trace
2441                .fueled_merge_reqs_before_ms(
2442                    u64::MAX,
2443                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
2444                )
2445                .count(),
2446            0,
2447            "zero batches are older than a past version"
2448        );
2449        assert_eq!(
2450            trace
2451                .fueled_merge_reqs_before_ms(
2452                    u64::MAX,
2453                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
2454                )
2455                .count(),
2456            1,
2457            "one batch is older than a future version"
2458        );
2459    }
2460
2461    #[mz_ore::test]
2462    fn remove_redundant_merge_reqs() {
2463        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
2464            FueledMergeReq {
2465                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
2466                desc: Description::new(
2467                    Antichain::from_elem(lower),
2468                    Antichain::from_elem(upper),
2469                    Antichain::new(),
2470                ),
2471                inputs: vec![],
2472            }
2473        }
2474
2475        // Empty
2476        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
2477
2478        // Single
2479        assert_eq!(
2480            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
2481            vec![req(0, 1)]
2482        );
2483
2484        // Duplicate
2485        assert_eq!(
2486            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
2487            vec![req(0, 1)]
2488        );
2489
2490        // Nothing covered
2491        assert_eq!(
2492            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
2493            vec![req(1, 2), req(0, 1)]
2494        );
2495
2496        // Covered
2497        assert_eq!(
2498            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
2499            vec![req(0, 3)]
2500        );
2501
2502        // Covered, lower equal
2503        assert_eq!(
2504            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
2505            vec![req(0, 3)]
2506        );
2507
2508        // Covered, upper equal
2509        assert_eq!(
2510            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
2511            vec![req(0, 3)]
2512        );
2513
2514        // Covered, unexpected order (doesn't happen in practice)
2515        assert_eq!(
2516            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
2517            vec![req(0, 3)]
2518        );
2519
2520        // Partially overlapping
2521        assert_eq!(
2522            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2523            vec![req(1, 3), req(0, 2)]
2524        );
2525
2526        // Partially overlapping, the other order
2527        assert_eq!(
2528            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2529            vec![req(0, 2), req(1, 3)]
2530        );
2531
2532        // Different sinces (doesn't happen in practice)
2533        let req015 = FueledMergeReq {
2534            id: SpineId(0, 1),
2535            desc: Description::new(
2536                Antichain::from_elem(0),
2537                Antichain::from_elem(1),
2538                Antichain::from_elem(5),
2539            ),
2540            inputs: vec![],
2541        };
2542        assert_eq!(
2543            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2544            vec![req015, req(0, 1)]
2545        );
2546    }
2547
2548    #[mz_ore::test]
2549    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2550    fn construct_batch_with_runs_replaced_test() {
2551        let batch_strategy = any_hollow_batch::<u64>();
2552        let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);
2553
2554        let combined_strategy = (batch_strategy, to_replace_strategy)
2555            .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);
2556
2557        let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
2558            let batch_len = batch.run_meta.len();
2559            let batch_clone = batch.clone();
2560            let to_replace_clone = to_replace.clone();
2561
2562            proptest::collection::vec(any::<bool>(), batch_len)
2563                .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
2564                .prop_map(move |mask| {
2565                    let indices: Vec<usize> = mask
2566                        .iter()
2567                        .enumerate()
2568                        .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
2569                        .collect();
2570                    (batch_clone.clone(), to_replace_clone.clone(), indices)
2571                })
2572        });
2573
2574        proptest!(|(
2575            (batch, to_replace, runs) in final_strategy
2576        )| {
2577            let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
2578                x.id.unwrap().clone()
2579            ).collect();
2580
2581            let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();
2582
2583            let new_batch = SpineBatch::construct_batch_with_runs_replaced(
2584                &batch,
2585                &run_ids,
2586                &to_replace,
2587            ).unwrap();
2588
2589            let expected_len = batch.run_meta.len() - runs.len()
2590                + to_replace.run_meta.len();
2591            prop_assert!(new_batch.run_meta.len() == expected_len);
2592        });
2593    }
2594
2595    #[mz_ore::test]
2596    fn test_perform_subset_replacement() {
2597        let batch1 = crate::internal::state::tests::hollow::<u64>(0, 10, &["a"], 10);
2598        let batch2 = crate::internal::state::tests::hollow::<u64>(10, 20, &["b"], 10);
2599        let batch3 = crate::internal::state::tests::hollow::<u64>(20, 30, &["c"], 10);
2600
2601        let id_batch1 = IdHollowBatch {
2602            id: SpineId(0, 1),
2603            batch: Arc::new(batch1.clone()),
2604        };
2605        let id_batch2 = IdHollowBatch {
2606            id: SpineId(1, 2),
2607            batch: Arc::new(batch2.clone()),
2608        };
2609        let id_batch3 = IdHollowBatch {
2610            id: SpineId(2, 3),
2611            batch: Arc::new(batch3.clone()),
2612        };
2613
2614        let spine_batch = SpineBatch {
2615            id: SpineId(0, 3),
2616            desc: Description::new(
2617                Antichain::from_elem(0),
2618                Antichain::from_elem(30),
2619                Antichain::from_elem(0),
2620            ),
2621            parts: vec![id_batch1, id_batch2, id_batch3],
2622            active_compaction: None,
2623            len: 30,
2624        };
2625
2626        let res_exact = crate::internal::state::tests::hollow::<u64>(0, 30, &["d"], 30);
2627        let mut sb_exact = spine_batch.clone();
2628        let result = sb_exact.perform_subset_replacement(&res_exact, SpineId(0, 3), 0..3, None);
2629        assert!(matches!(result, ApplyMergeResult::AppliedExact));
2630        assert_eq!(sb_exact.parts.len(), 1);
2631        assert_eq!(sb_exact.len(), 30);
2632
2633        let res_subset = crate::internal::state::tests::hollow::<u64>(0, 20, &["e"], 20);
2634        let mut sb_subset = spine_batch.clone();
2635        let result = sb_subset.perform_subset_replacement(&res_subset, SpineId(0, 2), 0..2, None);
2636        assert!(matches!(result, ApplyMergeResult::AppliedSubset));
2637        assert_eq!(sb_subset.parts.len(), 2); // One new part + one old part
2638        assert_eq!(sb_subset.len(), 30);
2639
2640        let res_too_big = crate::internal::state::tests::hollow::<u64>(0, 30, &["f"], 31);
2641        let mut sb_too_big = spine_batch.clone();
2642        let result = sb_too_big.perform_subset_replacement(&res_too_big, SpineId(0, 3), 0..3, None);
2643        assert!(matches!(result, ApplyMergeResult::NotAppliedTooManyUpdates));
2644        assert_eq!(sb_too_big.parts.len(), 3);
2645        assert_eq!(sb_too_big.len(), 30);
2646    }
2647}