mz_persist_client/internal/trace.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An append-only collection of compactable update batches. The Spine below is
11//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
12//! original Spine code is designed for incremental (via "fuel"ing) synchronous
13//! merge of in-memory batches. Persist doesn't want compaction to block
14//! incoming writes and, in fact, may in the future elect to push the work of
15//! compaction onto another machine entirely via RPC. As a result, we abuse the
16//! Spine code as follows:
17//!
18//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
19//!
20//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
21//!   the Spine. As progress is made, the Spine will merge two batches together
22//!   by: constructing a [Batch::Merger], giving it bits of fuel to
23//!   incrementally perform the merge (which spreads out the work, keeping
24//!   latencies even), and then, once it's done fueling, extracting the new single
25//!   output `Batch` and discarding the inputs.
26//! - Persist instead represents a batch of blob data with a [HollowBatch]
27//!   pointer which contains the normal `Batch` metadata plus the keys necessary
28//!   to retrieve the updates.
29//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
30//!   (analogous to `Batch::Merger`) that allows us to represent a merge as it
31//!   is fueling. Normally, this would represent real incremental compaction
32//!   progress, but in persist, it's simply a bookkeeping mechanism. Once fully
33//!   fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
34//!   which to the Spine is indistinguishable from a merged batch. At this
35//!   point, it is eligible for asynchronous compaction and a `FueledMergeReq`
36//!   is generated.
37//! - At any later point, this request may be answered via
38//!   [Trace::apply_merge_res_checked] or [Trace::apply_merge_res_unchecked].
39//!   This internally replaces the `SpineBatch`, which has no
40//!   effect on the structure of `Spine` but updates the metadata
41//!   in persist's state to point at the new batch.
42//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
43//!   This decouples compaction from Spine progress and also allows us to reduce
44//!   write amplification by merging `N` batches at once where `N` can be
45//!   greater than 2.
46//!
47//! [Batch]: differential_dataflow::trace::Batch
48//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
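//!
//! As a rough sketch of that lifecycle (the names are the real items in this
//! file, but the wiring below is illustrative rather than a compilable test):
//!
//! ```text
//! // A writer appends a HollowBatch; Spine bookkeeping may hand back requests
//! // to compact some adjacent batches.
//! let merge_reqs: Vec<FueledMergeReq<T>> = trace.push_batch(hollow_batch);
//! // A compaction worker (potentially a different process) turns a request
//! // into a FueledMergeRes, which is later applied back to the trace.
//! let applied: ApplyMergeResult = trace.apply_merge_res_unchecked(&merge_res);
//! ```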
49
50use arrayvec::ArrayVec;
51use differential_dataflow::difference::Semigroup;
52use mz_persist::metrics::ColumnarMetrics;
53use mz_persist_types::Codec64;
54use std::cmp::Ordering;
55use std::collections::{BTreeMap, BTreeSet};
56use std::fmt::Debug;
57use std::mem;
58use std::ops::Range;
59use std::sync::Arc;
60use tracing::error;
61
62use crate::internal::paths::WriterKey;
63use differential_dataflow::lattice::Lattice;
64use differential_dataflow::trace::Description;
65use mz_ore::cast::CastFrom;
66#[allow(unused_imports)] // False positive.
67use mz_ore::fmt::FormatBuffer;
68use serde::{Serialize, Serializer};
69use timely::PartialOrder;
70use timely::progress::frontier::AntichainRef;
71use timely::progress::{Antichain, Timestamp};
72
73use crate::internal::state::{HollowBatch, RunId};
74
75use super::state::RunPart;
76
77#[derive(Debug, Clone, PartialEq)]
78pub struct FueledMergeReq<T> {
79    pub id: SpineId,
80    pub desc: Description<T>,
81    pub inputs: Vec<IdHollowBatch<T>>,
82}
83
84#[derive(Debug)]
85pub struct FueledMergeRes<T> {
86    pub output: HollowBatch<T>,
87    pub input: CompactionInput,
88    pub new_active_compaction: Option<ActiveCompaction>,
89}
90
91/// An append-only collection of compactable update batches.
92///
93/// In an effort to keep our fork of Spine as close as possible to the original,
94/// we push as many changes as possible into this wrapper.
95#[derive(Debug, Clone)]
96pub struct Trace<T> {
97    spine: Spine<T>,
98    pub(crate) roundtrip_structure: bool,
99}
100
101#[cfg(any(test, debug_assertions))]
102impl<T: PartialEq> PartialEq for Trace<T> {
103    fn eq(&self, other: &Self) -> bool {
104        // Deconstruct self and other so we get a compile failure if new fields
105        // are added.
106        let Trace {
107            spine: _,
108            roundtrip_structure: _,
109        } = self;
110        let Trace {
111            spine: _,
112            roundtrip_structure: _,
113        } = other;
114
115        // Intentionally use HollowBatches for this comparison so we ignore
116        // differences in spine layers.
117        self.batches().eq(other.batches())
118    }
119}
120
121impl<T: Timestamp + Lattice> Default for Trace<T> {
122    fn default() -> Self {
123        Self {
124            spine: Spine::new(),
125            roundtrip_structure: true,
126        }
127    }
128}
129
130#[derive(Clone, Debug, Serialize)]
131pub struct ThinSpineBatch<T> {
132    pub(crate) level: usize,
133    pub(crate) desc: Description<T>,
134    pub(crate) parts: Vec<SpineId>,
135    /// NB: this exists to validate legacy batch bounds during the migration;
136    /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
137    pub(crate) descs: Vec<Description<T>>,
138}
139
140impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
141    fn eq(&self, other: &Self) -> bool {
142        // Ignore the temporary descs vector when comparing for equality.
143        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
144    }
145}
146
147#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
148pub struct ThinMerge<T> {
149    pub(crate) since: Antichain<T>,
150    pub(crate) remaining_work: usize,
151    pub(crate) active_compaction: Option<ActiveCompaction>,
152}
153
154impl<T: Clone> ThinMerge<T> {
155    fn fueling(merge: &FuelingMerge<T>) -> Self {
156        ThinMerge {
157            since: merge.since.clone(),
158            remaining_work: merge.remaining_work,
159            active_compaction: None,
160        }
161    }
162
163    fn fueled(batch: &SpineBatch<T>) -> Self {
164        ThinMerge {
165            since: batch.desc.since().clone(),
166            remaining_work: 0,
167            active_compaction: batch.active_compaction.clone(),
168        }
169    }
170}
171
172/// This is a "flattened" representation of a Trace. Goals:
173/// - small updates to the trace should result in small differences in the `FlatTrace`;
174/// - two `FlatTrace`s should be efficient to diff;
175/// - converting to and from a `Trace` should be relatively straightforward.
176///
177/// These goals are all somewhat in tension, and the space of possible representations is pretty
178/// large. See individual fields for comments on some of the tradeoffs.
179#[derive(Clone, Debug)]
180pub struct FlatTrace<T> {
181    pub(crate) since: Antichain<T>,
182    /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
183    /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
184    /// Previously, we serialized a trace as just this list of batches. Keeping this data around
185    /// helps ensure backwards compatibility. In the near future, we may still keep some batches
186    /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
187    /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
188    /// collection below.
189    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
190    /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
191    /// by id directly.
192    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
193    /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
194    /// to make differential updates smaller when two batches merge together. We also store the
195    /// level on the batch, instead of mapping from level to a list of batches... the level of a
196    /// spine batch doesn't change over time, but the list of batches at a particular level does.
197    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
198    /// In-progress merges. We store this by spine id instead of level to prepare for some possible
199    /// generalizations to spine (merging N of M batches at a level). This is also a natural place
200    /// to store incremental merge progress in the future.
201    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
202}
203
204impl<T: Timestamp + Lattice> Trace<T> {
205    pub(crate) fn flatten(&self) -> FlatTrace<T> {
206        let since = self.spine.since.clone();
207        let mut legacy_batches = BTreeMap::new();
208        let mut hollow_batches = BTreeMap::new();
209        let mut spine_batches = BTreeMap::new();
210        let mut merges = BTreeMap::new();
211
212        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
213            let id = batch.id();
214            let desc = batch.desc.clone();
215            let mut parts = Vec::with_capacity(batch.parts.len());
216            let mut descs = Vec::with_capacity(batch.parts.len());
217            for IdHollowBatch { id, batch } in &batch.parts {
218                parts.push(*id);
219                descs.push(batch.desc.clone());
220                // Ideally, we'd like to put all batches in the hollow_batches collection, since
221                // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
222                // we currently keep most batches in the legacy collection for backwards
223                // compatibility.
224                // As an exception, we add batches with empty time ranges to hollow_batches:
225                // they're otherwise not guaranteed to be unique, and since we only started writing
226                // them down recently there's no backwards compatibility risk.
227                if batch.desc.lower() == batch.desc.upper() {
228                    hollow_batches.insert(*id, Arc::clone(batch));
229                } else {
230                    legacy_batches.insert(Arc::clone(batch), ());
231                }
232            }
233
234            let spine_batch = ThinSpineBatch {
235                level,
236                desc,
237                parts,
238                descs,
239            };
240            spine_batches.insert(id, spine_batch);
241        };
242
243        for (level, state) in self.spine.merging.iter().enumerate() {
244            for batch in &state.batches {
245                push_spine_batch(level, batch);
246                if let Some(c) = &batch.active_compaction {
247                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
248                    assert!(
249                        previous.is_none(),
250                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
251                        batch.id,
252                    )
253                }
254            }
255            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
256                let previous = merges.insert(*id, ThinMerge::fueling(merge));
257                assert!(
258                    previous.is_none(),
259                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
260                )
261            }
262        }
263
264        if !self.roundtrip_structure {
265            assert!(hollow_batches.is_empty());
266            spine_batches.clear();
267            merges.clear();
268        }
269
270        FlatTrace {
271            since,
272            legacy_batches,
273            hollow_batches,
274            spine_batches,
275            merges,
276        }
277    }
278    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
279        let FlatTrace {
280            since,
281            legacy_batches,
282            mut hollow_batches,
283            spine_batches,
284            mut merges,
285        } = value;
286
287        // If the flattened representation has spine batches (or is empty)
288        // we know to preserve the structure for this trace.
289        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
290
291        // We need to look up legacy batches somehow, but we don't have a spine id for them.
292        // Instead, we rely on the fact that the spine must store them in antichain order.
293        // Our timestamp type may not be totally ordered, so we need to implement our own comparator
294        // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
295        // though.
296        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
297            if PartialOrder::less_than(left, right) {
298                Ordering::Less
299            } else if PartialOrder::less_than(right, left) {
300                Ordering::Greater
301            } else {
302                Ordering::Equal
303            }
304        };
305        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
306        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
307
308        let mut pop_batch =
309            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
310                if let Some(batch) = hollow_batches.remove(&id) {
311                    if let Some(desc) = expected_desc {
312                        assert_eq!(*desc, batch.desc);
313                    }
314                    return Ok(IdHollowBatch { id, batch });
315                }
316                let mut batch = legacy_batches
317                    .pop()
318                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
319
320                let Some(expected_desc) = expected_desc else {
321                    return Ok(IdHollowBatch { id, batch });
322                };
323
324                if expected_desc.lower() != batch.desc.lower() {
325                    return Err(format!(
326                        "hollow batch lower {:?} did not match expected lower {:?}",
327                        batch.desc.lower().elements(),
328                        expected_desc.lower().elements()
329                    ));
330                }
331
332                // Empty legacy batches are not deterministic: different nodes may split them up
333                // in different ways. For now, we rearrange them to match the spine data.
334                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
335                    let mut new_upper = batch.desc.upper().clone();
336
337                    // While our current batch is too small, and there's another empty batch
338                    // in the list, roll it in.
339                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
340                        let Some(next_batch) = legacy_batches.pop() else {
341                            break;
342                        };
343                        if next_batch.is_empty() {
344                            new_upper.clone_from(next_batch.desc.upper());
345                        } else {
346                            legacy_batches.push(next_batch);
347                            break;
348                        }
349                    }
350
351                    // If our current batch is too large, split it by the expected upper
352                    // and preserve the remainder.
353                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
354                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
355                            expected_desc.upper().clone(),
356                            new_upper.clone(),
357                            batch.desc.since().clone(),
358                        ))));
359                        new_upper.clone_from(expected_desc.upper());
360                    }
361                    batch = Arc::new(HollowBatch::empty(Description::new(
362                        batch.desc.lower().clone(),
363                        new_upper,
364                        batch.desc.since().clone(),
365                    )))
366                }
367
368                if expected_desc.upper() != batch.desc.upper() {
369                    return Err(format!(
370                        "hollow batch upper {:?} did not match expected upper {:?}",
371                        batch.desc.upper().elements(),
372                        expected_desc.upper().elements()
373                    ));
374                }
375
376                Ok(IdHollowBatch { id, batch })
377            };
378
379        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
380            (batch.desc.upper().clone(), id.1)
381        } else {
382            (Antichain::from_elem(T::minimum()), 0)
383        };
384        let levels = spine_batches
385            .first_key_value()
386            .map(|(_, batch)| batch.level + 1)
387            .unwrap_or(0);
388        let mut merging = vec![MergeState::default(); levels];
389        for (id, batch) in spine_batches {
390            let level = batch.level;
391
392            let parts = batch
393                .parts
394                .into_iter()
395                .zip(batch.descs.iter().map(Some).chain(std::iter::repeat(None)))
396                .map(|(id, desc)| pop_batch(id, desc))
397                .collect::<Result<Vec<_>, _>>()?;
398            let len = parts.iter().map(|p| (*p).batch.len).sum();
399            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
400            let batch = SpineBatch {
401                id,
402                desc: batch.desc,
403                parts,
404                active_compaction,
405                len,
406            };
407
408            let state = &mut merging[level];
409
410            state.push_batch(batch);
411            if let Some(id) = state.id() {
412                if let Some(merge) = merges.remove(&id) {
413                    state.merge = Some(IdFuelingMerge {
414                        id,
415                        merge: FuelingMerge {
416                            since: merge.since,
417                            remaining_work: merge.remaining_work,
418                        },
419                    })
420                }
421            }
422        }
423
424        let mut trace = Trace {
425            spine: Spine {
426                effort: 1,
427                next_id,
428                since,
429                upper,
430                merging,
431            },
432            roundtrip_structure,
433        };
434
435        fn check_empty(name: &str, len: usize) -> Result<(), String> {
436            if len != 0 {
437                Err(format!("{len} {name} left after reconstructing spine"))
438            } else {
439                Ok(())
440            }
441        }
442
443        if roundtrip_structure {
444            check_empty("legacy batches", legacy_batches.len())?;
445        } else {
446            // If the structure wasn't actually serialized, we may have legacy batches left over.
447            for batch in legacy_batches.into_iter().rev() {
448                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
449            }
450        }
451        check_empty("hollow batches", hollow_batches.len())?;
452        check_empty("merges", merges.len())?;
453
454        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
455
456        Ok(trace)
457    }
458}
459
460#[derive(Clone, Debug, Default)]
461pub(crate) struct SpineMetrics {
462    pub compact_batches: u64,
463    pub compacting_batches: u64,
464    pub noncompact_batches: u64,
465}
466
467impl<T> Trace<T> {
468    pub fn since(&self) -> &Antichain<T> {
469        &self.spine.since
470    }
471
472    pub fn upper(&self) -> &Antichain<T> {
473        &self.spine.upper
474    }
475
476    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
477        for batch in self.batches() {
478            f(batch);
479        }
480    }
481
482    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
483        self.spine
484            .spine_batches()
485            .flat_map(|b| b.parts.as_slice())
486            .map(|b| &*b.batch)
487    }
488
489    pub fn num_spine_batches(&self) -> usize {
490        self.spine.spine_batches().count()
491    }
492
493    #[cfg(test)]
494    pub fn num_hollow_batches(&self) -> usize {
495        self.batches().count()
496    }
497
498    #[cfg(test)]
499    pub fn num_updates(&self) -> usize {
500        self.batches().map(|b| b.len).sum()
501    }
502}
503
504impl<T: Timestamp + Lattice> Trace<T> {
505    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
506        self.spine.since.clone_from(since);
507    }
508
509    #[must_use]
510    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
511        let mut merge_reqs = Vec::new();
512        self.spine.insert(
513            batch,
514            &mut SpineLog::Enabled {
515                merge_reqs: &mut merge_reqs,
516            },
517        );
518        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
519        // Spine::roll_up (internally used by insert) clears all batches out of
520        // levels below a target by walking up from level 0 and merging each
521        // level into the next (providing the necessary fuel). In practice, this
522        // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
523        // It's a waste to do all of these (we'll throw away the results), so we
524        // filter out any that are entirely covered by some other request.
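        // For example (illustrative ids only): a request covering SpineId(0, 2)
        // is dropped when another request covering SpineId(0, 4) with the same
        // since is present, because the latter covers the former.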
525        Self::remove_redundant_merge_reqs(merge_reqs)
526    }
527
528    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
529        // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
530        // In the meantime, search backwards, since most compactions are for recent batches.
531        for batch in self.spine.spine_batches_mut().rev() {
532            if batch.id == id {
533                batch.active_compaction = Some(compaction);
534                break;
535            }
536        }
537    }
538
539    /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
540    /// account for a surprising amount of CPU in prod. database-issues#5411
541    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
542        self.spine.insert(batch, &mut SpineLog::Disabled);
543    }
544
545    /// Apply some amount of effort to trace maintenance.
546    ///
547    /// The units of effort are updates, and the method should be thought of as
548    /// analogous to inserting as many empty updates, where the trace is
549    /// permitted to perform proportionate work.
550    ///
551    /// Returns true if this did work and false if it left the spine unchanged.
552    #[must_use]
553    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
554        let mut merge_reqs = Vec::new();
555        let did_work = self.spine.exert(
556            fuel,
557            &mut SpineLog::Enabled {
558                merge_reqs: &mut merge_reqs,
559            },
560        );
561        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
562        // See the comment in [Self::push_batch].
563        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
564        (merge_reqs, did_work)
565    }
566
567    /// Validates invariants.
568    ///
569    /// See `Spine::validate` for details.
570    pub fn validate(&self) -> Result<(), String> {
571        self.spine.validate()
572    }
573
574    /// Obtain all fueled merge reqs that either have no active compaction, or the previous
575    /// compaction was started at or before the threshold time, in order from oldest to newest.
576    pub(crate) fn fueled_merge_reqs_before_ms(
577        &self,
578        threshold_ms: u64,
579        threshold_writer: Option<WriterKey>,
580    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
581        self.spine
582            .spine_batches()
583            .filter(move |b| {
584                let noncompact = !b.is_compact();
585                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
586                    b.parts.iter().any(|b| {
587                        b.batch
588                            .parts
589                            .iter()
590                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
591                    })
592                });
593                noncompact || old_writer
594            })
595            .filter(move |b| {
596                // Either there's no active compaction, or the last active compaction
597                // is not after the threshold timestamp.
598                b.active_compaction
599                    .as_ref()
600                    .map_or(true, move |c| c.start_ms <= threshold_ms)
601            })
602            .map(|b| FueledMergeReq {
603                id: b.id,
604                desc: b.desc.clone(),
605                inputs: b.parts.clone(),
606            })
607    }
608
609    // This is only called with the results of one `insert` and so the length of
610    // `merge_reqs` is bounded by the number of levels in the spine (or possibly
611    // some small constant multiple?). The number of levels is logarithmic in the
612    // number of updates in the spine, so this number should stay very small. As
613    // a result, we simply use the naive O(n^2) algorithm here instead of doing
614    // anything fancy with e.g. interval trees.
615    fn remove_redundant_merge_reqs(
616        mut merge_reqs: Vec<FueledMergeReq<T>>,
617    ) -> Vec<FueledMergeReq<T>> {
618        // Returns true if b0 covers b1, false otherwise.
619        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
620            // TODO: can we relax or remove this since check?
621            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
622        }
623
624        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
625        // In practice, merge_reqs will come in sorted such that the "large"
626        // requests are later. Take advantage of this by processing back to
627        // front.
628        while let Some(merge_req) = merge_reqs.pop() {
629            let covered = ret.iter().any(|r| covers(r, &merge_req));
630            if !covered {
631                // Now check if anything we've already staged is covered by this
632                // new req. In practice, the merge_reqs come in sorted and so
633                // this `retain` is a no-op.
634                ret.retain(|r| !covers(&merge_req, r));
635                ret.push(merge_req);
636            }
637        }
638        ret
639    }
640
641    pub fn spine_metrics(&self) -> SpineMetrics {
642        let mut metrics = SpineMetrics::default();
643        for batch in self.spine.spine_batches() {
644            if batch.is_compact() {
645                metrics.compact_batches += 1;
646            } else if batch.is_merging() {
647                metrics.compacting_batches += 1;
648            } else {
649                metrics.noncompact_batches += 1;
650            }
651        }
652        metrics
653    }
654}
655
656impl<T: Timestamp + Lattice + Codec64> Trace<T> {
657    pub fn apply_merge_res_checked_classic<D: Codec64 + Semigroup + PartialEq>(
658        &mut self,
659        res: &FueledMergeRes<T>,
660        metrics: &ColumnarMetrics,
661    ) -> ApplyMergeResult {
662        for batch in self.spine.spine_batches_mut().rev() {
663            let result = batch.maybe_replace_checked_classic::<D>(res, metrics);
664            if result.matched() {
665                return result;
666            }
667        }
668        ApplyMergeResult::NotAppliedNoMatch
669    }
670
671    pub fn apply_merge_res_checked<D: Codec64 + Semigroup + PartialEq>(
672        &mut self,
673        res: &FueledMergeRes<T>,
674        metrics: &ColumnarMetrics,
675    ) -> ApplyMergeResult {
676        for batch in self.spine.spine_batches_mut().rev() {
677            let result = batch.maybe_replace_checked::<D>(res, metrics);
678            if result.matched() {
679                return result;
680            }
681        }
682        ApplyMergeResult::NotAppliedNoMatch
683    }
684
685    pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
686        for batch in self.spine.spine_batches_mut().rev() {
687            let result = batch.maybe_replace_unchecked(res);
688            if result.matched() {
689                return result;
690            }
691        }
692        ApplyMergeResult::NotAppliedNoMatch
693    }
694
695    pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
696        for batch in self.spine.spine_batches_mut().rev() {
697            let result = batch.maybe_replace_with_tombstone(desc);
698            if result.matched() {
699                return result;
700            }
701        }
702        ApplyMergeResult::NotAppliedNoMatch
703    }
704}
705
706/// A log of what transitively happened during a Spine operation: e.g.
707/// FueledMergeReqs were generated.
708enum SpineLog<'a, T> {
709    Enabled {
710        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
711    },
712    Disabled,
713}
714
715#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
716pub enum CompactionInput {
717    /// We don't know what our inputs were; this should only be used for
718    /// unchecked legacy replacements.
719    Legacy,
720    /// This compaction output is a total replacement for all batches in this id range.
721    IdRange(SpineId),
722    /// This compaction output replaces the specified runs in this id range.
723    PartialBatch(SpineId, BTreeSet<RunId>),
724}
725
726#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
727pub struct SpineId(pub usize, pub usize);
728
729impl Serialize for SpineId {
730    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
731    where
732        S: Serializer,
733    {
734        let SpineId(lo, hi) = self;
735        serializer.serialize_str(&format!("{lo}-{hi}"))
736    }
737}
738
739/// Creates a `SpineId` that covers the range of ids in the set.
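/// For example (illustrative values), a set containing `SpineId(0, 2)` and
/// `SpineId(2, 5)` yields `SpineId(0, 5)`.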
740pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
741    let lower_spine_bound = ids
742        .first()
743        .map(|id| id.0)
744        .expect("at least one batch must be present");
745    let upper_spine_bound = ids
746        .last()
747        .map(|id| id.1)
748        .expect("at least one batch must be present");
749
750    SpineId(lower_spine_bound, upper_spine_bound)
751}
752
753impl SpineId {
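    /// True iff `other`'s id range is contained within `self`'s. For example
    /// (illustrative values), `SpineId(0, 4)` covers `SpineId(1, 3)` but not
    /// `SpineId(3, 5)`.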
754    fn covers(self, other: SpineId) -> bool {
755        self.0 <= other.0 && other.1 <= self.1
756    }
757}
758
759#[derive(Debug, Clone, PartialEq)]
760pub struct IdHollowBatch<T> {
761    pub id: SpineId,
762    pub batch: Arc<HollowBatch<T>>,
763}
764
765#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
766pub struct ActiveCompaction {
767    pub start_ms: u64,
768}
769
770#[derive(Debug, Clone, PartialEq)]
771struct SpineBatch<T> {
772    id: SpineId,
773    desc: Description<T>,
774    parts: Vec<IdHollowBatch<T>>,
775    active_compaction: Option<ActiveCompaction>,
776    // A cached version of parts.iter().map(|x| x.batch.len).sum()
777    len: usize,
778}
779
780impl<T> SpineBatch<T> {
781    fn merged(batch: IdHollowBatch<T>) -> Self
782    where
783        T: Clone,
784    {
785        Self {
786            id: batch.id,
787            desc: batch.batch.desc.clone(),
788            len: batch.batch.len,
789            parts: vec![batch],
790            active_compaction: None,
791        }
792    }
793}
794
795#[derive(Debug, Copy, Clone)]
796pub enum ApplyMergeResult {
797    AppliedExact,
798    AppliedSubset,
799    NotAppliedNoMatch,
800    NotAppliedInvalidSince,
801    NotAppliedTooManyUpdates,
802}
803
804impl ApplyMergeResult {
805    pub fn applied(&self) -> bool {
806        match self {
807            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
808            _ => false,
809        }
810    }
811    pub fn matched(&self) -> bool {
812        match self {
813            ApplyMergeResult::AppliedExact
814            | ApplyMergeResult::AppliedSubset
815            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
816            _ => false,
817        }
818    }
819}
820
821impl<T: Timestamp + Lattice> SpineBatch<T> {
822    pub fn lower(&self) -> &Antichain<T> {
823        self.desc().lower()
824    }
825
826    pub fn upper(&self) -> &Antichain<T> {
827        self.desc().upper()
828    }
829
830    fn id(&self) -> SpineId {
831        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
832        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
833        self.id
834    }
835
836    pub fn is_compact(&self) -> bool {
837        // A compact batch has at most one run.
838        // This check used to require at most one hollow batch with at most one run,
839        // but that became too strict once incremental compaction was introduced.
840        // Incremental compaction can result in a batch with a single run, but multiple empty
841        // hollow batches, which we still consider compact. As levels are merged, we
842        // will eventually clean up the empty hollow batches.
843        self.parts
844            .iter()
845            .map(|p| p.batch.run_meta.len())
846            .sum::<usize>()
847            <= 1
848    }
849
850    pub fn is_merging(&self) -> bool {
851        self.active_compaction.is_some()
852    }
853
854    fn desc(&self) -> &Description<T> {
855        &self.desc
856    }
857
858    pub fn len(&self) -> usize {
859        // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
860        // we compact it.
861        debug_assert_eq!(
862            self.len,
863            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
864        );
865        self.len
866    }
867
868    pub fn is_empty(&self) -> bool {
869        self.len() == 0
870    }
871
872    pub fn empty(
873        id: SpineId,
874        lower: Antichain<T>,
875        upper: Antichain<T>,
876        since: Antichain<T>,
877    ) -> Self {
878        SpineBatch::merged(IdHollowBatch {
879            id,
880            batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
881        })
882    }
883
884    pub fn begin_merge(
885        bs: &[Self],
886        compaction_frontier: Option<AntichainRef<T>>,
887    ) -> Option<IdFuelingMerge<T>> {
888        let from = bs.first()?.id().0;
889        let until = bs.last()?.id().1;
890        let id = SpineId(from, until);
891        let mut sinces = bs.iter().map(|b| b.desc().since());
892        let mut since = sinces.next()?.clone();
893        for b in bs {
894            since.join_assign(b.desc().since())
895        }
896        if let Some(compaction_frontier) = compaction_frontier {
897            since.join_assign(&compaction_frontier.to_owned());
898        }
899        let remaining_work = bs.iter().map(|x| x.len()).sum();
900        Some(IdFuelingMerge {
901            id,
902            merge: FuelingMerge {
903                since,
904                remaining_work,
905            },
906        })
907    }
908
909    #[cfg(test)]
910    fn describe(&self, extended: bool) -> String {
911        let SpineBatch {
912            id,
913            parts,
914            desc,
915            active_compaction,
916            len,
917        } = self;
918        let compaction = match active_compaction {
919            None => "".to_owned(),
920            Some(c) => format!(" (c@{})", c.start_ms),
921        };
922        match extended {
923            false => format!(
924                "[{}-{}]{:?}{:?}{}/{}{compaction}",
925                id.0,
926                id.1,
927                desc.lower().elements(),
928                desc.upper().elements(),
929                parts.len(),
930                len
931            ),
932            true => {
933                format!(
934                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
935                    id.0,
936                    id.1,
937                    desc.lower().elements(),
938                    desc.upper().elements(),
939                    desc.since().elements(),
940                    parts.len(),
941                    len,
942                    parts
943                        .iter()
944                        .flat_map(|x| x.batch.parts.iter())
945                        .map(|x| format!(" {}", x.printable_name()))
946                        .collect::<Vec<_>>()
947                        .join("")
948                )
949            }
950        }
951    }
952}
953
954impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
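    /// Sums the diffs of the given parts, returning `None` if the sum is
    /// unavailable for any part (or if there are no parts at all).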
955    fn diffs_sum<'a, D: Semigroup + Codec64>(
956        parts: impl Iterator<Item = &'a RunPart<T>>,
957        metrics: &ColumnarMetrics,
958    ) -> Option<D> {
959        parts
960            .map(|p| p.diffs_sum::<D>(metrics))
961            .reduce(|a, b| match (a, b) {
962                (Some(mut a), Some(b)) => {
963                    a.plus_equals(&b);
964                    Some(a)
965                }
966                _ => None,
967            })
968            .flatten()
969    }
970
971    fn diffs_sum_for_runs<D: Semigroup + Codec64>(
972        batch: &HollowBatch<T>,
973        run_ids: &[RunId],
974        metrics: &ColumnarMetrics,
975    ) -> Option<D> {
976        if run_ids.is_empty() {
977            return None;
978        }
979
980        let parts = batch
981            .runs()
982            .filter(|(meta, _)| {
983                run_ids.contains(&meta.id.expect("id should be present at this point"))
984            })
985            .flat_map(|(_, parts)| parts);
986
987        Self::diffs_sum(parts, metrics)
988    }
989
990    fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
991        let exact_match =
992            desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();
993
994        let empty_batch = HollowBatch::empty(desc.clone());
995        if exact_match {
996            *self = SpineBatch::merged(IdHollowBatch {
997                id: self.id(),
998                batch: Arc::new(empty_batch),
999            });
1000            return ApplyMergeResult::AppliedExact;
1001        }
1002
1003        if let Some((id, range)) = self.find_replacement_range(desc) {
1004            self.perform_subset_replacement(&empty_batch, id, range, None)
1005        } else {
1006            ApplyMergeResult::NotAppliedNoMatch
1007        }
1008    }
1009
1010    fn construct_batch_with_runs_replaced(
1011        original: &HollowBatch<T>,
1012        run_ids: &[RunId],
1013        replacement: &HollowBatch<T>,
1014    ) -> Result<HollowBatch<T>, ApplyMergeResult> {
1015        if run_ids.is_empty() {
1016            return Err(ApplyMergeResult::NotAppliedNoMatch);
1017        }
1018
1019        let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
1020        let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
1021        if !orig_run_ids.is_superset(&run_ids) {
1022            return Err(ApplyMergeResult::NotAppliedNoMatch);
1023        }
1024
1025        let runs: Vec<_> = original
1026            .runs()
1027            .filter(|(meta, _)| {
1028                !run_ids.contains(&meta.id.expect("id should be present at this point"))
1029            })
1030            .chain(replacement.runs())
1031            .collect();
1032
1033        let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();
1034
1035        let run_meta = runs
1036            .iter()
1037            .map(|(meta, _)| *meta)
1038            .cloned()
1039            .collect::<Vec<_>>();
1040
1041        let parts = runs
1042            .iter()
1043            .flat_map(|(_, parts)| *parts)
1044            .cloned()
1045            .collect::<Vec<_>>();
1046
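        // Rebuild the run_splits index for the combined batch: each entry is
        // the offset in `parts` at which the next run begins; runs with no
        // parts contribute no split.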
1047        let run_splits = {
1048            let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
1049            let mut pointer = 0;
1050            for (i, (_, parts)) in runs.into_iter().enumerate() {
1051                if parts.is_empty() {
1052                    continue;
1053                }
1054                if i < run_meta.len() - 1 {
1055                    splits.push(pointer + parts.len());
1056                }
1057                pointer += parts.len();
1058            }
1059            splits
1060        };
1061
1062        Ok(HollowBatch::new(
1063            replacement.desc.clone(),
1064            parts,
1065            len,
1066            run_meta,
1067            run_splits,
1068        ))
1069    }
1070
1071    fn maybe_replace_checked<D>(
1072        &mut self,
1073        res: &FueledMergeRes<T>,
1074        metrics: &ColumnarMetrics,
1075    ) -> ApplyMergeResult
1076    where
1077        D: Semigroup + Codec64 + PartialEq + Debug,
1078    {
1079        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1080        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1081        // since must be in advance of the merge res since.
1082        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1083            return ApplyMergeResult::NotAppliedInvalidSince;
1084        }
1085
1086        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1087        let num_batches = self.parts.len();
1088
1089        let result = match &res.input {
1090            CompactionInput::IdRange(id) => {
1091                self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
1092            }
1093            CompactionInput::PartialBatch(id, runs) => {
1094                self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
1095            }
1096            CompactionInput::Legacy => {
1097                error!("legacy compaction input is not supported");
1098                return ApplyMergeResult::NotAppliedNoMatch;
1099            }
1100        };
1101
1102        let num_batches_after = self.parts.len();
1103        assert!(
1104            num_batches_after <= num_batches,
1105            "replacing parts should not increase the number of batches"
1106        );
1107        result
1108    }
1109
1110    fn handle_id_range_replacement<D>(
1111        &mut self,
1112        res: &FueledMergeRes<T>,
1113        id: &SpineId,
1114        new_diffs_sum: Option<D>,
1115        metrics: &ColumnarMetrics,
1116    ) -> ApplyMergeResult
1117    where
1118        D: Semigroup + Codec64 + PartialEq + Debug,
1119    {
1120        let range = self
1121            .parts
1122            .iter()
1123            .enumerate()
1124            .filter_map(|(i, p)| {
1125                if id.covers(p.id) {
1126                    Some((i, p.id))
1127                } else {
1128                    None
1129                }
1130            })
1131            .collect::<Vec<_>>();
1132
1133        let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();
1134
1135        // If ids is empty, it means that we didn't find any parts that match the id range.
1136        // We also check that the id matches the range of ids we found.
1137        // At scale, sometimes regular compaction will race forced compaction,
1138        // for things like the catalog. In that case, we may have a
1139        // replacement that no longer lines up with the spine batches.
1140        // I think this is because forced compaction ignores the active_compaction
1141        // and just goes for it. This is slightly annoying but probably the right behavior
1142        // for functions whose prefix is `force_`, so we just return
1143        // NotAppliedNoMatch here.
1144        if ids.is_empty() || id != &id_range(ids) {
1145            return ApplyMergeResult::NotAppliedNoMatch;
1146        }
1147
1148        let range: BTreeSet<_> = range.iter().map(|(i, _)| *i).collect();
1149
1150        // This is the range of hollow batches that we will replace.
1151        let min = *range.iter().min().unwrap();
1152        let max = *range.iter().max().unwrap();
1153        let replacement_range = min..max + 1;
1154
1155        // We need to replace a range of parts. Here we don't care about individual
1156        // runs, because we must be replacing the entire part(s).
1157        let old_diffs_sum = Self::diffs_sum::<D>(
1158            self.parts[replacement_range.clone()]
1159                .iter()
1160                .flat_map(|p| p.batch.parts.iter()),
1161            metrics,
1162        );
1163
1164        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
1165
1166        self.perform_subset_replacement(
1167            &res.output,
1168            *id,
1169            replacement_range,
1170            res.new_active_compaction.clone(),
1171        )
1172    }
1173
1174    fn handle_partial_batch_replacement<D>(
1175        &mut self,
1176        res: &FueledMergeRes<T>,
1177        id: SpineId,
1178        runs: &BTreeSet<RunId>,
1179        new_diffs_sum: Option<D>,
1180        metrics: &ColumnarMetrics,
1181    ) -> ApplyMergeResult
1182    where
1183        D: Semigroup + Codec64 + PartialEq + Debug,
1184    {
1185        if runs.is_empty() {
1186            return ApplyMergeResult::NotAppliedNoMatch;
1187        }
1188
1189        let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
1190        let Some((i, batch)) = part else {
1191            return ApplyMergeResult::NotAppliedNoMatch;
1192        };
1193        let replacement_range = i..(i + 1);
1194
1195        let batch = &batch.batch;
1196        let run_ids = runs.iter().cloned().collect::<Vec<_>>();
1197
1198        let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
1199        let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
1200
1201        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "partial batch replacement");
1202
1203        match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
1204            Ok(new_batch) => {
1205                let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
1206                self.validate_diffs_sum_match(
1207                    old_batch_diff_sum,
1208                    new_batch_diff_sum,
1209                    "sanity checking diffs sum for replaced runs",
1210                );
1211                self.perform_subset_replacement(
1212                    &new_batch,
1213                    id,
1214                    replacement_range,
1215                    res.new_active_compaction.clone(),
1216                )
1217            }
1218            Err(err) => err,
1219        }
1220    }
1221
1222    fn validate_diffs_sum_match<D>(
1223        &self,
1224        old_diffs_sum: Option<D>,
1225        new_diffs_sum: Option<D>,
1226        context: &str,
1227    ) where
1228        D: Semigroup + Codec64 + PartialEq + Debug,
1229    {
1230        match (new_diffs_sum, old_diffs_sum) {
1231            (None, Some(old)) => {
1232                if !D::is_zero(&old) {
1233                    panic!(
1234                        "merge res diffs sum is None, but spine batch diffs sum ({:?}) is not zero ({})",
1235                        old, context
1236                    );
1237                }
1238            }
1239            (Some(new_diffs_sum), Some(old_diffs_sum)) => {
1240                assert_eq!(
1241                    old_diffs_sum, new_diffs_sum,
1242                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
1243                    new_diffs_sum, old_diffs_sum, context
1244                );
1245            }
1246            _ => {}
1247        };
1248    }
1249
1250    /// This is the "legacy" way of replacing a spine batch with a merge result.
1251    /// It is used in cases where we don't have the full compaction input
1252    /// information.
1253    /// Eventually we should strive to roundtrip Spine IDs everywhere and
1254    /// deprecate this method.
1255    fn maybe_replace_checked_classic<D>(
1256        &mut self,
1257        res: &FueledMergeRes<T>,
1258        metrics: &ColumnarMetrics,
1259    ) -> ApplyMergeResult
1260    where
1261        D: Semigroup + Codec64 + PartialEq + Debug,
1262    {
1263        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1264        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1265        // since must be in advance of the merge res since.
1266        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1267            return ApplyMergeResult::NotAppliedInvalidSince;
1268        }
1269
1270        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1271
1272        // If our merge result exactly matches a spine batch, we can swap it in directly
1273        let exact_match = res.output.desc.lower() == self.desc().lower()
1274            && res.output.desc.upper() == self.desc().upper();
1275        if exact_match {
1276            let old_diffs_sum = Self::diffs_sum::<D>(
1277                self.parts.iter().flat_map(|p| p.batch.parts.iter()),
1278                metrics,
1279            );
1280
1281            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1282                assert_eq!(
1283                    old_diffs_sum, new_diffs_sum,
1284                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1285                    new_diffs_sum, old_diffs_sum
1286                );
1287            }
1288
1289            // Spine internally has an invariant about a batch being at some level
1290            // or higher based on the len. We could end up violating this invariant
1291            // if we increased the length of the batch.
1292            //
1293            // A res output with length greater than the existing spine batch implies
1294            // a compaction has already been applied to this range, and with a higher
1295            // rate of consolidation than this one. This could happen as a result of
1296            // compaction's memory bound limiting the amount of consolidation possible.
1297            if res.output.len > self.len() {
1298                return ApplyMergeResult::NotAppliedTooManyUpdates;
1299            }
1300            *self = SpineBatch::merged(IdHollowBatch {
1301                id: self.id(),
1302                batch: Arc::new(res.output.clone()),
1303            });
1304            return ApplyMergeResult::AppliedExact;
1305        }
1306
1307        // Try subset replacement
1308        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1309            let old_diffs_sum = Self::diffs_sum::<D>(
1310                self.parts[range.clone()]
1311                    .iter()
1312                    .flat_map(|p| p.batch.parts.iter()),
1313                metrics,
1314            );
1315
1316            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1317                assert_eq!(
1318                    old_diffs_sum, new_diffs_sum,
1319                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1320                    new_diffs_sum, old_diffs_sum
1321                );
1322            }
1323
1324            self.perform_subset_replacement(
1325                &res.output,
1326                id,
1327                range,
1328                res.new_active_compaction.clone(),
1329            )
1330        } else {
1331            ApplyMergeResult::NotAppliedNoMatch
1332        }
1333    }
1334
1335    /// This is the even more legacy way of replacing a spine batch with a merge result.
1336    /// It is used in cases where we have neither the full compaction input
1337    /// information nor the diffs sum.
1338    /// Eventually we should strive to roundtrip Spine IDs and diffs sums everywhere and
1339    /// deprecate this method.
1340    fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
1341        // The spine's and merge res's sinces don't need to match (which could occur if Spine
1342        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
1343        // since must be in advance of the merge res since.
1344        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1345            return ApplyMergeResult::NotAppliedInvalidSince;
1346        }
1347
1348        // If our merge result exactly matches a spine batch, we can swap it in directly
1349        let exact_match = res.output.desc.lower() == self.desc().lower()
1350            && res.output.desc.upper() == self.desc().upper();
1351        if exact_match {
1352            // Spine internally has an invariant about a batch being at some level
1353            // or higher based on the len. We could end up violating this invariant
1354            // if we increased the length of the batch.
1355            //
1356            // A res output with length greater than the existing spine batch implies
1357            // a compaction has already been applied to this range, and with a higher
1358            // rate of consolidation than this one. This could happen as a result of
1359            // compaction's memory bound limiting the amount of consolidation possible.
1360            if res.output.len > self.len() {
1361                return ApplyMergeResult::NotAppliedTooManyUpdates;
1362            }
1363
1364            *self = SpineBatch::merged(IdHollowBatch {
1365                id: self.id(),
1366                batch: Arc::new(res.output.clone()),
1367            });
1368            return ApplyMergeResult::AppliedExact;
1369        }
1370
1371        // Try subset replacement
1372        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1373            self.perform_subset_replacement(
1374                &res.output,
1375                id,
1376                range,
1377                res.new_active_compaction.clone(),
1378            )
1379        } else {
1380            ApplyMergeResult::NotAppliedNoMatch
1381        }
1382    }
1383
1384    /// Find the range of parts that can be replaced by the merge result
1385    fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
1386        // It is possible the structure of the spine has changed since the merge res
1387        // was created, such that it no longer exactly matches the description of a
1388        // spine batch. This can happen if another merge has happened in the interim,
1389        // or if spine needed to be rebuilt from state.
1390        //
1391        // When this occurs, we can still attempt to slot the merge res in to replace
1392        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
1393        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
1394
1395        let mut lower = None;
1396        let mut upper = None;
1397
1398        for (i, batch) in self.parts.iter().enumerate() {
1399            if batch.batch.desc.lower() == desc.lower() {
1400                lower = Some((i, batch.id.0));
1401            }
1402            if batch.batch.desc.upper() == desc.upper() {
1403                upper = Some((i, batch.id.1));
1404            }
1405            if lower.is_some() && upper.is_some() {
1406                break;
1407            }
1408        }
1409
1410        match (lower, upper) {
1411            (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
1412                Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
1413            }
1414            _ => None,
1415        }
1416    }
1417
1418    /// Perform the actual subset replacement
1419    fn perform_subset_replacement(
1420        &mut self,
1421        res: &HollowBatch<T>,
1422        spine_id: SpineId,
1423        range: Range<usize>,
1424        new_active_compaction: Option<ActiveCompaction>,
1425    ) -> ApplyMergeResult {
1426        let SpineBatch {
1427            id,
1428            parts,
1429            desc,
1430            active_compaction: _,
1431            len: _,
1432        } = self;
1433
1434        let mut new_parts = vec![];
1435        new_parts.extend_from_slice(&parts[..range.start]);
1436        new_parts.push(IdHollowBatch {
1437            id: spine_id,
1438            batch: Arc::new(res.clone()),
1439        });
1440        new_parts.extend_from_slice(&parts[range.end..]);
1441
1442        let new_spine_batch = SpineBatch {
1443            id: *id,
1444            desc: desc.to_owned(),
1445            len: new_parts.iter().map(|x| x.batch.len).sum(),
1446            parts: new_parts,
1447            active_compaction: new_active_compaction,
1448        };
1449
1450        if new_spine_batch.len() > self.len() {
1451            return ApplyMergeResult::NotAppliedTooManyUpdates;
1452        }
1453
1454        *self = new_spine_batch;
1455        ApplyMergeResult::AppliedSubset
1456    }
1457}
1458
1459#[derive(Debug, Clone, PartialEq, Serialize)]
1460pub struct FuelingMerge<T> {
1461    pub(crate) since: Antichain<T>,
1462    pub(crate) remaining_work: usize,
1463}
1464
1465#[derive(Debug, Clone, PartialEq, Serialize)]
1466pub struct IdFuelingMerge<T> {
1467    id: SpineId,
1468    merge: FuelingMerge<T>,
1469}
1470
1471impl<T: Timestamp + Lattice> FuelingMerge<T> {
1472    /// Perform some amount of work, decrementing `fuel`.
1473    ///
1474    /// If `fuel` is non-zero after the call, the merging is complete and one
1475    /// should call `done` to extract the merged results.
1476    // TODO(benesch): rewrite to avoid usage of `as`.
1477    #[allow(clippy::as_conversions)]
1478    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
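        // Illustrative arithmetic (hypothetical numbers): with `fuel == 100`
        // and `remaining_work == 40`, we use 40 units, leaving
        // `remaining_work == 0` and `fuel == 60`.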
1479        let used = std::cmp::min(*fuel as usize, self.remaining_work);
1480        self.remaining_work = self.remaining_work.saturating_sub(used);
1481        *fuel -= used as isize;
1482    }
1483
1484    /// Extracts merged results.
1485    ///
1486    /// This method should only be called after `work` has been called and has
1487    /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
1488    fn done(
1489        self,
1490        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1491        log: &mut SpineLog<'_, T>,
1492    ) -> Option<SpineBatch<T>> {
1493        let first = bs.first()?;
1494        let last = bs.last()?;
1495        let id = SpineId(first.id().0, last.id().1);
1496        assert!(id.0 < id.1);
1497        let lower = first.desc().lower().clone();
1498        let upper = last.desc().upper().clone();
1499        let since = self.since;
1500
1501        // Special case empty batches.
1502        if bs.iter().all(SpineBatch::is_empty) {
1503            return Some(SpineBatch::empty(id, lower, upper, since));
1504        }
1505
1506        let desc = Description::new(lower, upper, since);
1507        let len = bs.iter().map(SpineBatch::len).sum();
1508
1509        // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1510        // in the worst case, the double iteration is absolutely worth having
1511        // merged_parts pre-sized.
1512        let mut merged_parts_len = 0;
1513        for b in &bs {
1514            merged_parts_len += b.parts.len();
1515        }
1516        let mut merged_parts = Vec::with_capacity(merged_parts_len);
1517        for b in bs {
1518            merged_parts.extend(b.parts)
1519        }
1520        // Sanity check the pre-size code.
1521        debug_assert_eq!(merged_parts.len(), merged_parts_len);
1522
1523        if let SpineLog::Enabled { merge_reqs } = log {
1524            merge_reqs.push(FueledMergeReq {
1525                id,
1526                desc: desc.clone(),
1527                inputs: merged_parts.clone(),
1528            });
1529        }
1530
1531        Some(SpineBatch {
1532            id,
1533            desc,
1534            len,
1535            parts: merged_parts,
1536            active_compaction: None,
1537        })
1538    }
1539}
1540
1541/// The maximum number of batches per level in the spine.
1542/// In practice, we probably want a larger max and a configurable soft cap, but using a
1543/// stack-friendly data structure and keeping this number low makes this safer during the
1544/// initial rollout.
1545const BATCHES_PER_LEVEL: usize = 2;
1546
1547/// An append-only collection of update batches.
1548///
1549/// The `Spine` is a general-purpose trace implementation based on collection
1550/// and merging immutable batches of updates. It is generic with respect to the
1551/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1552///
1553/// ## Design
1554///
1555/// This spine is represented as a list of layers, where each element in the
1556/// list is either
1557///
1558///   1. vacant: containing no batches
1559///   2. a single batch
1560///   3. a pair of batches in the process of merging
1561///
1562/// A batch may be logically empty (with matching lower and upper bounds) while
1563/// nonetheless acting as a number of updates proportionate to the level at
1564/// which it exists (for bookkeeping).
1565///
1566/// Each of the batches at layer i contains at most 2^i elements. The sequence
1567/// of batches should have the upper bound of one match the lower bound of the
1568/// next. Batches may be logically empty, with matching upper and lower bounds,
1569/// as a bookkeeping mechanism.
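///
/// A minimal sketch of one possible arrangement (intervals and sizes are
/// illustrative only; the sizes shown are the per-level upper bounds, not
/// actual lengths):
///
/// ```text
/// layer 3: [a, b)           <= 8 updates (oldest)
/// layer 2: [b, c) [c, d)    <= 4 updates each (a full pair, so merging)
/// layer 1: (vacant)
/// layer 0: [d, e)           <= 1 update (newest)
/// ```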
1570///
1571/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1572/// even though it may actually contain fewer elements. This allows us to
1573/// decouple the physical representation from logical amounts of effort invested
1574/// in each batch. It allows us to begin compaction and to reduce the number of
1575/// updates, without compromising our ability to continue to move updates along
1576/// the spine. We are explicitly making the trade-off that while some batches
1577/// might compact at lower levels, we want to treat them as if they contained
1578/// their full set of updates for accounting reasons (to apply work to higher
1579/// levels).
1580///
1581/// We maintain the invariant that for any in-progress merge at level k there
1582/// should be fewer than 2^k records at levels lower than k. That is, even if we
1583/// were to apply an unbounded amount of effort to those records, we would not
1584/// have enough records to prompt a merge into the in-progress merge. Ideally,
1585/// we maintain the extended invariant that for any in-progress merge at level
1586/// k, the remaining effort required (number of records minus applied effort) is
1587/// less than the number of records that would need to be added to reach 2^k
1588/// records in layers below.
1589///
1590/// ## Mathematics
1591///
1592/// When a merge is initiated, there should be a non-negative *deficit* of
1593/// updates before the layers below could plausibly produce a new batch for the
1594/// currently merging layer. We must determine a factor of proportionality, so
1595/// that newly arrived updates provide at least that amount of "fuel" towards
1596/// the merging layer, so that the merge completes before lower levels invade.
1597///
1598/// ### Deficit:
1599///
1600/// A new merge is initiated only in response to the completion of a prior
1601/// merge, or the introduction of new records from outside. The latter case is
1602/// special, and will maintain our invariant trivially, so we will focus on the
1603/// former case.
1604///
1605/// When a merge at level k completes, assuming we have maintained our invariant
1606/// then there should be fewer than 2^k records at lower levels. The newly
1607/// created merge at level k+1 will require up to 2^(k+2) units of work, and
1608/// should not expect a new batch until strictly more than 2^k records are
1609/// added. This means that a factor of proportionality of four should be
1610/// sufficient to ensure that the merge completes before a new merge is
1611/// initiated.
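///
/// As an illustrative worked case (the level `k = 3` is chosen purely for
/// exposition):
///
/// ```text
/// k = 3: fewer than 2^3 = 8 records sit at levels below the completed merge.
/// The new merge at level k+1 = 4 needs up to 2^(k+2) = 32 units of work.
/// No new batch should reach level 4 until strictly more than 8 records arrive,
/// and 8 records * 4 units of fuel each = 32 units: enough to finish in time.
/// ```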
1612///
1613/// When new records get introduced, we will need to roll up any batches at
1614/// lower levels, which we treat as the introduction of records. Each of these
1615/// virtual records should be accounted for in the fuel it contributes, as it
1616/// results in the promotion of batches closer to in-progress
1617/// merges.
1618///
1619/// ### Fuel sharing
1620///
1621/// We like the idea of applying fuel preferentially to merges at *lower*
1622/// levels, under the idea that they are easier to complete, and we benefit from
1623/// fewer total merges in progress. This does delay the completion of merges at
1624/// higher levels, and may not obviously be a total win. If we choose to do
1625/// this, we should make sure that we correctly account for completed merges at
1626/// low layers: they should still extract fuel from new updates even though they
1627/// have completed, at least until they have paid back any "debt" to higher
1628/// layers by continuing to provide fuel as updates arrive.
1629#[derive(Debug, Clone)]
1630struct Spine<T> {
1631    effort: usize,
1632    next_id: usize,
1633    since: Antichain<T>,
1634    upper: Antichain<T>,
1635    merging: Vec<MergeState<T>>,
1636}
1637
1638impl<T> Spine<T> {
1639    /// All batches in the spine, oldest to newest.
1640    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1641        self.merging.iter().rev().flat_map(|m| &m.batches)
1642    }
1643
1644    /// All (mutable) batches in the spine, oldest to newest.
1645    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1646        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1647    }
1648}
1649
1650impl<T: Timestamp + Lattice> Spine<T> {
1651    /// Allocates a fueled `Spine`.
1652    ///
1653    /// This trace will merge batches progressively, with each inserted batch
1654    /// applying a multiple of the batch's length in effort to each merge. The
1655    /// `effort` parameter is that multiplier. This value should be at least one
1656    /// for the merging to happen; a value of zero is not helpful.
1657    pub fn new() -> Self {
1658        Spine {
1659            effort: 1,
1660            next_id: 0,
1661            since: Antichain::from_elem(T::minimum()),
1662            upper: Antichain::from_elem(T::minimum()),
1663            merging: Vec::new(),
1664        }
1665    }
1666
1667    /// Apply some amount of effort to trace maintenance.
1668    ///
1669    /// The units of effort are updates, and the method should be thought of as
1670    /// analogous to inserting as many empty updates, where the trace is
1671    /// permitted to perform proportionate work.
1672    ///
1673    /// Returns true if this did work and false if it left the spine unchanged.
1674    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1675        self.tidy_layers();
1676        if self.reduced() {
1677            return false;
1678        }
1679
1680        if self.merging.iter().any(|b| b.merge.is_some()) {
1681            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1682            // If any merges exist, we can directly call `apply_fuel`.
1683            self.apply_fuel(&fuel, log);
1684        } else {
1685            // Otherwise, we'll need to introduce fake updates to move merges
1686            // along.
1687
1688            // Introduce an empty batch with roughly `effort` many virtual updates.
1689            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1690            let id = self.next_id();
1691            self.introduce_batch(
1692                SpineBatch::empty(
1693                    id,
1694                    self.upper.clone(),
1695                    self.upper.clone(),
1696                    self.since.clone(),
1697                ),
1698                level,
1699                log,
1700            );
1701        }
1702        true
1703    }
1704
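    /// Returns the next id range; e.g. successive calls on a fresh `Spine`
    /// yield `SpineId(0, 1)`, `SpineId(1, 2)`, `SpineId(2, 3)`, ...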
1705    pub fn next_id(&mut self) -> SpineId {
1706        let id = self.next_id;
1707        self.next_id += 1;
1708        SpineId(id, self.next_id)
1709    }
1710
1711    // Ideally, this method acts as insertion of `batch`, even if we are not yet
1712    // able to begin merging the batch. This means it is a good time to perform
1713    // amortized work proportional to the size of batch.
1714    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1715        assert!(batch.desc.lower() != batch.desc.upper());
1716        assert_eq!(batch.desc.lower(), &self.upper);
1717
1718        let id = self.next_id();
1719        let batch = SpineBatch::merged(IdHollowBatch {
1720            id,
1721            batch: Arc::new(batch),
1722        });
1723
1724        self.upper.clone_from(batch.upper());
1725
1726        // If `batch` and the most recently inserted batch are both empty,
1727        // we can just fuse them.
1728        if batch.is_empty() {
1729            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1730                if self.merging[position].is_single() && self.merging[position].is_empty() {
1731                    self.insert_at(batch, position);
1732                    // Since we just inserted a batch, we should always have work to complete...
1733                    // but otherwise we just leave this layer vacant.
1734                    if let Some(merged) = self.complete_at(position, log) {
1735                        self.merging[position] = MergeState::single(merged);
1736                    }
1737                    return;
1738                }
1739            }
1740        }
1741
1742        // Normal insertion for the batch.
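        //
        // Illustrative arithmetic (hypothetical size): a batch of 1000 updates
        // has `next_power_of_two() == 1024 == 2^10`, so `trailing_zeros() == 10`
        // and the batch lands at level 10.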
1743        let index = batch.len().next_power_of_two();
1744        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1745    }
1746
1747    /// True iff there is at most one HollowBatch in `self.merging`.
1748    ///
1749    /// When true, there is no maintenance work to perform in the trace, other
1750    /// than compaction. We do not yet have logic in place to determine if
1751    /// compaction would improve a trace, so for now we are ignoring that.
1752    fn reduced(&self) -> bool {
1753        self.spine_batches()
1754            .flat_map(|b| b.parts.as_slice())
1755            .count()
1756            < 2
1757    }
1758
1759    /// Describes the merge progress of layers in the trace.
1760    ///
1761    /// Intended for diagnostics rather than public consumption.
1762    #[allow(dead_code)]
1763    fn describe(&self) -> Vec<(usize, usize)> {
1764        self.merging
1765            .iter()
1766            .map(|b| (b.batches.len(), b.len()))
1767            .collect()
1768    }
1769
1770    /// Introduces a batch at an indicated level.
1771    ///
1772    /// The level indication is often related to the size of the batch, but it
1773    /// can also be used to artificially fuel the computation by supplying empty
1774    /// batches at non-trivial indices, to move merges along.
1775    fn introduce_batch(
1776        &mut self,
1777        batch: SpineBatch<T>,
1778        batch_index: usize,
1779        log: &mut SpineLog<'_, T>,
1780    ) {
1781        // Step 0.  Determine an amount of fuel to use for the computation.
1782        //
1783        //          Fuel is used to drive maintenance of the data structure,
1784        //          and in particular is used to make progress through merges
1785        //          that are in progress. The amount of fuel to use should be
1786        //          proportional to the number of records introduced, so that
1787        //          we are guaranteed to complete all merges before they are
1788        //          required as arguments to merges again.
1789        //
1790        //          The fuel use policy is negotiable, in that we might aim
1791        //          to use relatively less when we can, so that we return
1792        //          control promptly, or we might account more work to larger
1793        //          batches. Not clear to me which is best, or if there
1794        //          should be a configuration knob controlling this.
1795
1796        // The amount of fuel to use is proportional to 2^batch_index, scaled by
1797        // a factor of self.effort which determines how eager we are in
1798        // performing maintenance work. We need to ensure that each merge in
1799        // progress receives fuel for each introduced batch, and so multiply by
1800        // that as well.
1801        if batch_index > 32 {
1802            println!("Large batch index: {}", batch_index);
1803        }
1804
1805        // We believe that eight units of fuel is sufficient for each introduced
1806        // record, accounted as four for each record, and a potential four more
1807        // for each virtual record associated with promoting existing smaller
1808        // batches. We could try and make this be less, or be scaled to merges
1809        // based on their deficit at time of instantiation. For now, we remain
1810        // conservative.
1811        let mut fuel = 8 << batch_index;
1812        // Scale up by the effort parameter, which is calibrated to one as the
1813        // minimum amount of effort.
1814        fuel *= self.effort;
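        // Illustrative arithmetic (hypothetical values): `batch_index == 10`
        // and `effort == 1` give `fuel == 8 << 10 == 8192`, i.e. eight units
        // per (possibly virtual) introduced record.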
1815        // Convert to an `isize` so we can observe any fuel shortfall.
1816        // TODO(benesch): avoid dangerous usage of `as`.
1817        #[allow(clippy::as_conversions)]
1818        let fuel = fuel as isize;
1819
1820        // Step 1.  Apply fuel to each in-progress merge.
1821        //
1822        //          Before we can introduce new updates, we must apply any
1823        //          fuel to in-progress merges, as this fuel is what ensures
1824        //          that the merges will be complete by the time we insert
1825        //          the updates.
1826        self.apply_fuel(&fuel, log);
1827
1828        // Step 2.  We must ensure the invariant that adjacent layers do not
1829        //          contain two batches will be satisfied when we insert the
1830        //          batch. We forcibly complete all merges at layers lower
1831        //          than and including `batch_index`, so that the new batch is
1832        //          inserted into an empty layer.
1833        //
1834        //          We could relax this to "strictly less than `batch_index`"
1835        //          if the layer above has only a single batch in it, which
1836        //          seems not implausible if it has been the focus of effort.
1837        //
1838        //          This should be interpreted as the introduction of some
1839        //          volume of fake updates, and we will need to fuel merges
1840        //          by a proportional amount to ensure that they are not
1841        //          surprised later on. The number of fake updates should
1842        //          correspond to the deficit for the layer, which perhaps
1843        //          we should track explicitly.
1844        self.roll_up(batch_index, log);
1845
1846        // Step 3. This insertion should be into an empty layer. It is a logical
1847        //         error otherwise, as we may be violating our invariant, from
1848        //         which all wonderment derives.
1849        self.insert_at(batch, batch_index);
1850
1851        // Step 4. Tidy the largest layers.
1852        //
1853        //         It is important that we not tidy only smaller layers,
1854        //         as their ascension is what ensures the merging and
1855        //         eventual compaction of the largest layers.
1856        self.tidy_layers();
1857    }
1858
1859    /// Ensures that an insertion at layer `index` will succeed.
1860    ///
1861    /// This method is subject to the constraint that all existing batches
1862    /// should occur at higher levels, which requires it to "roll up" batches
1863    /// present at lower levels before the method is called. In doing this, we
1864    /// should not introduce more virtual records than 2^index, as that is the
1865    /// amount of excess fuel we have budgeted for completing merges.
1866    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1867        // Ensure entries sufficient for `index`.
1868        while self.merging.len() <= index {
1869            self.merging.push(MergeState::default());
1870        }
1871
1872        // We only need to roll up if there are non-vacant layers.
1873        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1874            // Collect and merge all batches at layers up to but not including
1875            // `index`.
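            //
            // The running `merged` result cascades upward: whatever was
            // completed at layer i-1 is inserted at layer i, merged with
            // anything already there when layer i is completed in turn, and so
            // on, until everything below `index` is folded into a single batch.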
1876            let mut merged = None;
1877            for i in 0..index {
1878                if let Some(merged) = merged.take() {
1879                    self.insert_at(merged, i);
1880                }
1881                merged = self.complete_at(i, log);
1882            }
1883
1884            // The merged results should be introduced at level `index`, which
1885            // should be ready to absorb them (possibly creating a new merge at
1886            // the time).
1887            if let Some(merged) = merged {
1888                self.insert_at(merged, index);
1889            }
1890
1891            // If the insertion results in a merge, we should complete it to
1892            // ensure the upcoming insertion at `index` does not panic.
1893            if self.merging[index].is_full() {
1894                let merged = self.complete_at(index, log).expect("double batch");
1895                self.insert_at(merged, index + 1);
1896            }
1897        }
1898    }
1899
1900    /// Applies an amount of fuel to merges in progress.
1901    ///
1902    /// The supplied `fuel` is for each in-progress merge, and if we want to
1903    /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1904    /// could do so in order to maintain fewer batches on average (at the risk
1905    /// of completing merges of large batches later, but tbh probably not much
1906    /// later).
1907    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1908        // For the moment our strategy is to apply fuel independently to each
1909        // merge in progress, rather than prioritizing small merges. This sounds
1910        // like a great idea, but we need better accounting in place to ensure
1911        // that merges that borrow against later layers but then complete still
1912        // "acquire" fuel to pay back their debts.
1913        for index in 0..self.merging.len() {
1914            // Give each level independent fuel, for now.
1915            let mut fuel = *fuel;
1916            // Pass along various logging stuffs, in case we need to report
1917            // success.
1918            self.merging[index].work(&mut fuel);
1919            // `fuel` could have a deficit at this point, meaning we over-spent
1920            // when we took a merge step. We could ignore this, or maintain the
1921            // deficit and account future fuel against it before spending again.
1922            // It isn't clear why that would be especially helpful to do; we
1923            // might want to avoid overspends at multiple layers in the same
1924            // invocation (to limit latencies), but there is probably a rich
1925            // policy space here.
1926
1927            // If a merge completes, we can immediately merge it in to the next
1928            // level, which is "guaranteed" to be complete at this point, by our
1929            // fueling discipline.
1930            if self.merging[index].is_complete() {
1931                let complete = self.complete_at(index, log).expect("complete batch");
1932                self.insert_at(complete, index + 1);
1933            }
1934        }
1935    }
1936
1937    /// Inserts a batch at a specific location.
1938    ///
1939    /// This is a non-public internal method that can panic if we try and insert
1940    /// into a layer which already contains two batches (and is still in the
1941    /// process of merging).
1942    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1943        // Ensure the spine is large enough.
1944        while self.merging.len() <= index {
1945            self.merging.push(MergeState::default());
1946        }
1947
1948        // Insert the batch at the location.
1949        let merging = &mut self.merging[index];
1950        merging.push_batch(batch);
1951        if merging.batches.is_full() {
1952            let compaction_frontier = Some(self.since.borrow());
1953            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1954        }
1955    }
1956
1957    /// Completes and extracts whatever is at layer `index`, leaving this layer vacant.
1958    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1959        self.merging[index].complete(log)
1960    }
1961
1962    /// Attempts to draw down large layers to size-appropriate layers.
1963    fn tidy_layers(&mut self) {
1964        // If the largest layer is complete (not merging), we can attempt to
1965        // draw it down to the next layer. This is permitted if we can maintain
1966        // our invariant that below each merge there are at most half the
1967        // records that would be required to invade the merge.
1968        if !self.merging.is_empty() {
1969            let mut length = self.merging.len();
1970            if self.merging[length - 1].is_single() {
1971                // To move a batch down, we require that it contain few enough
1972                // records that the lower level is appropriate, and that moving
1973                // the batch would not create a merge violating our invariant.
1974                let appropriate_level = usize::cast_from(
1975                    self.merging[length - 1]
1976                        .len()
1977                        .next_power_of_two()
1978                        .trailing_zeros(),
1979                );
1980
1981                // Continue only as far as is appropriate
1982                while appropriate_level < length - 1 {
1983                    let current = &mut self.merging[length - 2];
1984                    if current.is_vacant() {
1985                        // Vacant batches can be absorbed.
1986                        self.merging.remove(length - 2);
1987                        length = self.merging.len();
1988                    } else {
1989                        if !current.is_full() {
1990                            // Single batches may initiate a merge, if sizes are
1991                            // within bounds, but terminate the loop either way.
1992
1993                            // Determine the number of records that might lead
1994                            // to a merge. Importantly, this is not the number
1995                            // of actual records, but the sum of upper bounds
1996                            // based on indices.
1997                            let mut smaller = 0;
1998                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
1999                                smaller += batch.batches.len() << index;
2000                            }
2001
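                            // Illustrative check (hypothetical spine height):
                            // with `length == 6` the threshold is
                            // `(1 << 6) / 8 == 8`, so the draw-down proceeds
                            // only if the levels below account for at most
                            // eight virtual records.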
2002                            if smaller <= (1 << length) / 8 {
2003                                // Remove the batch under consideration (shifting the deeper batches up a level),
2004                                // then merge in the single batch at the current level.
2005                                let state = self.merging.remove(length - 2);
2006                                assert_eq!(state.batches.len(), 1);
2007                                for batch in state.batches {
2008                                    self.insert_at(batch, length - 2);
2009                                }
2010                            }
2011                        }
2012                        break;
2013                    }
2014                }
2015            }
2016        }
2017    }
2018
2019    /// Checks invariants:
2020    /// - The lowers and uppers of all batches "line up".
2021    /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
2022    /// - The upper of the "maximum" batch is `== self.upper`.
2023    /// - The since of each batch is `less_equal self.since`.
2024    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
2025    /// - TODO: Verify fuel and level invariants.
2026    fn validate(&self) -> Result<(), String> {
2027        let mut id = SpineId(0, 0);
2028        let mut frontier = Antichain::from_elem(T::minimum());
2029        for x in self.merging.iter().rev() {
2030            if x.is_full() != x.merge.is_some() {
2031                return Err(format!(
2032                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
2033                    x.is_full(),
2034                    x.merge,
2035                ));
2036            }
2037
2038            if let Some(m) = &x.merge {
2039                if !x.is_full() {
2040                    return Err(format!(
2041                        "merge should only exist for full batches (len={:?}, merge={:?})",
2042                        x.batches.len(),
2043                        m.id,
2044                    ));
2045                }
2046                if x.id() != Some(m.id) {
2047                    return Err(format!(
2048                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
2049                        x.id(),
2050                        m.id,
2051                    ));
2052                }
2053            }
2054
2055            // TODO: Anything we can validate about x.merge? It'd
2056            // be nice to assert that it's bigger than the len of the
2057            // two batches, but apply_merge_res might swap those lengths
2058            // out from under us.
2059            for batch in &x.batches {
2060                if batch.id().0 != id.1 {
2061                    return Err(format!(
2062                        "batch id {:?} does not match the previous id {:?}: {:?}",
2063                        batch.id(),
2064                        id,
2065                        self
2066                    ));
2067                }
2068                id = batch.id();
2069                if batch.desc().lower() != &frontier {
2070                    return Err(format!(
2071                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
2072                        batch.desc().lower(),
2073                        frontier,
2074                        self
2075                    ));
2076                }
2077                frontier.clone_from(batch.desc().upper());
2078                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
2079                    return Err(format!(
2080                        "since of batch {:?} past the spine since {:?}: {:?}",
2081                        batch.desc().since(),
2082                        self.since,
2083                        self
2084                    ));
2085                }
2086            }
2087        }
2088        if self.next_id != id.1 {
2089            return Err(format!(
2090                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
2091                self.next_id, id, self
2092            ));
2093        }
2094        if self.upper != frontier {
2095            return Err(format!(
2096                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
2097                self.upper, frontier, self
2098            ));
2099        }
2100        Ok(())
2101    }
2102}
2103
2104/// Describes the state of a layer.
2105///
2106/// A layer can be empty, contain a single batch, or contain a pair of batches
2107/// that are in the process of merging into a batch for the next layer.
2108#[derive(Debug, Clone)]
2109struct MergeState<T> {
2110    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
2111    merge: Option<IdFuelingMerge<T>>,
2112}
2113
2114impl<T> Default for MergeState<T> {
2115    fn default() -> Self {
2116        Self {
2117            batches: ArrayVec::new(),
2118            merge: None,
2119        }
2120    }
2121}
2122
2123impl<T: Timestamp + Lattice> MergeState<T> {
2124    /// An id that covers all the batches in the given merge state, assuming there are any.
2125    fn id(&self) -> Option<SpineId> {
2126        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
2127            Some(SpineId(first.id().0, last.id().1))
2128        } else {
2129            None
2130        }
2131    }
2132
2133    /// A new single-batch merge state.
2134    fn single(batch: SpineBatch<T>) -> Self {
2135        let mut state = Self::default();
2136        state.push_batch(batch);
2137        state
2138    }
2139
2140    /// Push a new batch at this level, checking invariants.
2141    fn push_batch(&mut self, batch: SpineBatch<T>) {
2142        if let Some(last) = self.batches.last() {
2143            assert_eq!(last.id().1, batch.id().0);
2144            assert_eq!(last.upper(), batch.lower());
2145        }
2146        assert!(
2147            self.merge.is_none(),
2148            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
2149            batch.id,
2150            self.batches.len(),
2151        );
2152        self.batches
2153            .try_push(batch)
2154            .expect("Attempted to insert batch into full layer!");
2155    }
2156
2157    /// The number of actual updates contained in the level.
2158    fn len(&self) -> usize {
2159        self.batches.iter().map(SpineBatch::len).sum()
2160    }
2161
2162    /// True if this merge state contains no updates.
2163    fn is_empty(&self) -> bool {
2164        self.batches.iter().all(SpineBatch::is_empty)
2165    }
2166
2167    /// True if this level contains no batches.
2168    fn is_vacant(&self) -> bool {
2169        self.batches.is_empty()
2170    }
2171
2172    /// True only for a single-batch state.
2173    fn is_single(&self) -> bool {
2174        self.batches.len() == 1
2175    }
2176
2177    /// True if this merge cannot hold any more batches.
2178    /// (i.e. for a binary merge tree, true if this layer holds two batches.)
2179    fn is_full(&self) -> bool {
2180        self.batches.is_full()
2181    }
2182
2183    /// Immediately complete any merge.
2184    ///
2185    /// The result is either a batch, if there is a non-trivial batch to return,
2186    /// or `None` if there is no meaningful batch to return.
2187    ///
2188    /// If the merge is not yet fully fueled, the input batches are merged immediately anyway.
2189    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
2190        let mut this = mem::take(self);
2191        if this.batches.len() <= 1 {
2192            this.batches.pop()
2193        } else {
2194            // Merge the remaining batches, regardless of whether we have a fully fueled merge.
2195            let id_merge = this
2196                .merge
2197                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
2198            id_merge.merge.done(this.batches, log)
2199        }
2200    }
2201
2202    /// True iff the layer is a complete merge, ready for extraction.
2203    fn is_complete(&self) -> bool {
2204        match &self.merge {
2205            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
2206            None => false,
2207        }
2208    }
2209
2210    /// Performs a bounded amount of work towards a merge.
2211    fn work(&mut self, fuel: &mut isize) {
2212        // We only perform work for merges in progress.
2213        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
2214            merge.work(&self.batches[..], fuel)
2215        }
2216    }
2217}
2218
2219#[cfg(test)]
2220pub mod datadriven {
2221    use crate::internal::datadriven::DirectiveArgs;
2222
2223    use super::*;
2224
2225    /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
2226    #[derive(Debug, Default)]
2227    pub struct TraceState {
2228        pub trace: Trace<u64>,
2229        pub merge_reqs: Vec<FueledMergeReq<u64>>,
2230    }
2231
2232    pub fn since_upper(
2233        datadriven: &TraceState,
2234        _args: DirectiveArgs,
2235    ) -> Result<String, anyhow::Error> {
2236        Ok(format!(
2237            "{:?}{:?}\n",
2238            datadriven.trace.since().elements(),
2239            datadriven.trace.upper().elements()
2240        ))
2241    }
2242
2243    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
2244        let mut s = String::new();
2245        for b in datadriven.trace.spine.spine_batches() {
2246            s.push_str(b.describe(true).as_str());
2247            s.push('\n');
2248        }
2249        Ok(s)
2250    }
2251
2252    pub fn insert(
2253        datadriven: &mut TraceState,
2254        args: DirectiveArgs,
2255    ) -> Result<String, anyhow::Error> {
2256        for x in args
2257            .input
2258            .trim()
2259            .split('\n')
2260            .map(DirectiveArgs::parse_hollow_batch)
2261        {
2262            datadriven
2263                .merge_reqs
2264                .append(&mut datadriven.trace.push_batch(x));
2265        }
2266        Ok("ok\n".to_owned())
2267    }
2268
2269    pub fn downgrade_since(
2270        datadriven: &mut TraceState,
2271        args: DirectiveArgs,
2272    ) -> Result<String, anyhow::Error> {
2273        let since = args.expect("since");
2274        datadriven
2275            .trace
2276            .downgrade_since(&Antichain::from_elem(since));
2277        Ok("ok\n".to_owned())
2278    }
2279
2280    pub fn take_merge_req(
2281        datadriven: &mut TraceState,
2282        _args: DirectiveArgs,
2283    ) -> Result<String, anyhow::Error> {
2284        let mut s = String::new();
2285        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
2286            write!(
2287                s,
2288                "{:?}{:?}{:?} {}\n",
2289                merge_req.desc.lower().elements(),
2290                merge_req.desc.upper().elements(),
2291                merge_req.desc.since().elements(),
2292                merge_req
2293                    .inputs
2294                    .iter()
2295                    .flat_map(|x| x.batch.parts.iter())
2296                    .map(|x| x.printable_name())
2297                    .collect::<Vec<_>>()
2298                    .join(" ")
2299            );
2300        }
2301        Ok(s)
2302    }
2303
2304    pub fn apply_merge_res(
2305        datadriven: &mut TraceState,
2306        args: DirectiveArgs,
2307    ) -> Result<String, anyhow::Error> {
2308        let res = FueledMergeRes {
2309            output: DirectiveArgs::parse_hollow_batch(args.input),
2310            input: CompactionInput::Legacy,
2311            new_active_compaction: None,
2312        };
2313        match datadriven.trace.apply_merge_res_unchecked(&res) {
2314            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
2315            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
2316            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
2317            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
2318            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
2319        }
2320    }
2321}
2322
2323#[cfg(test)]
2324pub(crate) mod tests {
2325    use std::ops::Range;
2326
2327    use proptest::prelude::*;
2328    use semver::Version;
2329
2330    use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};
2331
2332    use super::*;
2333
2334    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
2335        num_batches: Range<usize>,
2336    ) -> impl Strategy<Value = Trace<T>> {
2337        Strategy::prop_map(
2338            (
2339                any::<Option<T>>(),
2340                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
2341                any::<bool>(),
2342                any::<u64>(),
2343            ),
2344            |(since, mut batches, roundtrip_structure, timeout_ms)| {
2345                let mut trace = Trace::<T>::default();
2346                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
2347
2348                // Fix up the arbitrary HollowBatches so the lowers and uppers
2349                // align.
2350                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
2351                let mut lower = Antichain::from_elem(T::minimum());
2352                for mut batch in batches {
2353                    // Overall trace since has to be past each batch's since.
2354                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
2355                        trace.downgrade_since(batch.desc.since());
2356                    }
2357                    batch.desc = Description::new(
2358                        lower.clone(),
2359                        batch.desc.upper().clone(),
2360                        batch.desc.since().clone(),
2361                    );
2362                    lower.clone_from(batch.desc.upper());
2363                    let _merge_req = trace.push_batch(batch);
2364                }
2365                let reqs: Vec<_> = trace
2366                    .fueled_merge_reqs_before_ms(timeout_ms, None)
2367                    .collect();
2368                for req in reqs {
2369                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
2370                }
2371                trace.roundtrip_structure = roundtrip_structure;
2372                trace
2373            },
2374        )
2375    }
2376
2377    #[mz_ore::test]
2378    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2379    fn test_roundtrips() {
2380        fn check(trace: Trace<i64>) {
2381            trace.validate().unwrap();
2382            let flat = trace.flatten();
2383            let unflat = Trace::unflatten(flat).unwrap();
2384            assert_eq!(trace, unflat);
2385        }
2386
2387        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
2388    }
2389
2390    #[mz_ore::test]
2391    fn fueled_merge_reqs() {
2392        let mut trace: Trace<u64> = Trace::default();
2393        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
2394            0,
2395            10,
2396            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
2397            1000,
2398        ));
2399
2400        assert!(fueled_reqs.is_empty());
2401        assert_eq!(
2402            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
2403            0,
2404            "no merge reqs when not filtering by version"
2405        );
2406        assert_eq!(
2407            trace
2408                .fueled_merge_reqs_before_ms(
2409                    u64::MAX,
2410                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
2411                )
2412                .count(),
2413            0,
2414            "zero batches are older than a past version"
2415        );
2416        assert_eq!(
2417            trace
2418                .fueled_merge_reqs_before_ms(
2419                    u64::MAX,
2420                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
2421                )
2422                .count(),
2423            1,
2424            "one batch is older than a future version"
2425        );
2426    }
2427
2428    #[mz_ore::test]
2429    fn remove_redundant_merge_reqs() {
2430        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
2431            FueledMergeReq {
2432                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
2433                desc: Description::new(
2434                    Antichain::from_elem(lower),
2435                    Antichain::from_elem(upper),
2436                    Antichain::new(),
2437                ),
2438                inputs: vec![],
2439            }
2440        }
2441
2442        // Empty
2443        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
2444
2445        // Single
2446        assert_eq!(
2447            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
2448            vec![req(0, 1)]
2449        );
2450
2451        // Duplicate
2452        assert_eq!(
2453            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
2454            vec![req(0, 1)]
2455        );
2456
2457        // Nothing covered
2458        assert_eq!(
2459            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
2460            vec![req(1, 2), req(0, 1)]
2461        );
2462
2463        // Covered
2464        assert_eq!(
2465            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
2466            vec![req(0, 3)]
2467        );
2468
2469        // Covered, lower equal
2470        assert_eq!(
2471            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
2472            vec![req(0, 3)]
2473        );
2474
2475        // Covered, upper equal
2476        assert_eq!(
2477            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
2478            vec![req(0, 3)]
2479        );
2480
2481        // Covered, unexpected order (doesn't happen in practice)
2482        assert_eq!(
2483            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
2484            vec![req(0, 3)]
2485        );
2486
2487        // Partially overlapping
2488        assert_eq!(
2489            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2490            vec![req(1, 3), req(0, 2)]
2491        );
2492
2493        // Partially overlapping, the other order
2494        assert_eq!(
2495            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2496            vec![req(0, 2), req(1, 3)]
2497        );
2498
2499        // Different sinces (doesn't happen in practice)
2500        let req015 = FueledMergeReq {
2501            id: SpineId(0, 1),
2502            desc: Description::new(
2503                Antichain::from_elem(0),
2504                Antichain::from_elem(1),
2505                Antichain::from_elem(5),
2506            ),
2507            inputs: vec![],
2508        };
2509        assert_eq!(
2510            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2511            vec![req015, req(0, 1)]
2512        );
2513    }
2514
2515    #[mz_ore::test]
2516    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2517    fn construct_batch_with_runs_replaced_test() {
2518        let batch_strategy = any_hollow_batch::<u64>();
2519        let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);
2520
2521        let combined_strategy = (batch_strategy, to_replace_strategy)
2522            .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);
2523
2524        let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
2525            let batch_len = batch.run_meta.len();
2526            let batch_clone = batch.clone();
2527            let to_replace_clone = to_replace.clone();
2528
2529            proptest::collection::vec(any::<bool>(), batch_len)
2530                .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
2531                .prop_map(move |mask| {
2532                    let indices: Vec<usize> = mask
2533                        .iter()
2534                        .enumerate()
2535                        .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
2536                        .collect();
2537                    (batch_clone.clone(), to_replace_clone.clone(), indices)
2538                })
2539        });
2540
2541        proptest!(|(
2542            (batch, to_replace, runs) in final_strategy
2543        )| {
2544            let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
2545                x.id.unwrap().clone()
2546            ).collect();
2547
2548            let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();
2549
2550            let new_batch = SpineBatch::construct_batch_with_runs_replaced(
2551                &batch,
2552                &run_ids,
2553                &to_replace,
2554            ).unwrap();
2555
2556            prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len());
2557        });
2558    }
2559}