mz_persist_client/internal/trace.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An append-only collection of compactable update batches. The Spine below is
11//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
12//! original Spine code is designed for incremental (via "fuel"ing) synchronous
13//! merge of in-memory batches. Persist doesn't want compaction to block
14//! incoming writes and, in fact, may in the future elect to push the work of
15//! compaction onto another machine entirely via RPC. As a result, we abuse the
16//! Spine code as follows:
17//!
18//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
19//!
20//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
21//!   the Spine. As progress is made, the Spine will merge two batches together
22//!   by: constructing a [Batch::Merger], giving it bits of fuel to
23//!   incrementally perform the merge (which spreads out the work, keeping
24//!   latencies even), and then once it's done fueling extracting the new single
25//!   output `Batch` and discarding the inputs.
26//! - Persist instead represents a batch of blob data with a [HollowBatch]
27//!   pointer which contains the normal `Batch` metadata plus the keys necessary
28//!   to retrieve the updates.
29//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
30//!   (analogous to `Batch::Merger`) that allows us to represent a merge as it
31//!   is fueling. Normally, this would represent real incremental compaction
32//!   progress, but in persist, it's simply a bookkeeping mechanism. Once fully
33//!   fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
34//!   which to the Spine is indistinguishable from a merged batch. At this
35//!   point, it is eligible for asynchronous compaction and a `FueledMergeReq`
36//!   is generated.
37//! - At any later point, this request may be answered via
38//!   [Trace::apply_merge_res_checked] or [Trace::apply_merge_res_unchecked].
39//!   This internally replaces the `SpineBatch`, which has no
40//!   effect on the structure of `Spine` but replaces the metadata
41//!   in persist's state to point at the new batch.
42//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
43//!   This decouples compaction from Spine progress and also allows us to reduce
44//!   write amplification by merging `N` batches at once where `N` can be
45//!   greater than 2.
46//!
47//! [Batch]: differential_dataflow::trace::Batch
48//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
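//!
//! A minimal sketch of the lifecycle described above, using hypothetical
//! helpers (`some_hollow_batch`, `compact`), since these are crate-internal
//! types and the compaction machinery lives elsewhere:
//!
//! ```ignore
//! let mut trace: Trace<u64> = Trace::default();
//! // Adding a batch may fully fuel pending merges, generating FueledMergeReqs.
//! let reqs = trace.push_batch(some_hollow_batch());
//! for req in reqs {
//!     // Compaction runs asynchronously (possibly on another process); its
//!     // result is applied back to the trace at some later point.
//!     let res: FueledMergeRes<u64> = compact(req);
//!     trace.apply_merge_res_unchecked(&res);
//! }
//! ```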
49
50use std::cmp::Ordering;
51use std::collections::{BTreeMap, BTreeSet};
52use std::fmt::Debug;
53use std::mem;
54use std::ops::Range;
55use std::sync::Arc;
56
57use arrayvec::ArrayVec;
58use differential_dataflow::difference::Semigroup;
59use differential_dataflow::lattice::Lattice;
60use differential_dataflow::trace::Description;
61use itertools::Itertools;
62use mz_ore::cast::CastFrom;
63use mz_persist::metrics::ColumnarMetrics;
64use mz_persist_types::Codec64;
65use serde::{Serialize, Serializer};
66use timely::PartialOrder;
67use timely::progress::frontier::AntichainRef;
68use timely::progress::{Antichain, Timestamp};
69use tracing::error;
70
71use crate::internal::paths::WriterKey;
72use crate::internal::state::{HollowBatch, RunId};
73
74use super::state::RunPart;
75
76#[derive(Debug, Clone, PartialEq)]
77pub struct FueledMergeReq<T> {
78    pub id: SpineId,
79    pub desc: Description<T>,
80    pub inputs: Vec<IdHollowBatch<T>>,
81}
82
83#[derive(Debug)]
84pub struct FueledMergeRes<T> {
85    pub output: HollowBatch<T>,
86    pub input: CompactionInput,
87    pub new_active_compaction: Option<ActiveCompaction>,
88}
89
90/// An append-only collection of compactable update batches.
91///
92/// In an effort to keep our fork of Spine as close as possible to the original,
93/// we push as many changes as possible into this wrapper.
94#[derive(Debug, Clone)]
95pub struct Trace<T> {
96    spine: Spine<T>,
97    pub(crate) roundtrip_structure: bool,
98}
99
100#[cfg(any(test, debug_assertions))]
101impl<T: PartialEq> PartialEq for Trace<T> {
102    fn eq(&self, other: &Self) -> bool {
103        // Deconstruct self and other so we get a compile failure if new fields
104        // are added.
105        let Trace {
106            spine: _,
107            roundtrip_structure: _,
108        } = self;
109        let Trace {
110            spine: _,
111            roundtrip_structure: _,
112        } = other;
113
114        // Intentionally use HollowBatches for this comparison so we ignore
115        // differences in spine layers.
116        self.batches().eq(other.batches())
117    }
118}
119
120impl<T: Timestamp + Lattice> Default for Trace<T> {
121    fn default() -> Self {
122        Self {
123            spine: Spine::new(),
124            roundtrip_structure: true,
125        }
126    }
127}
128
129#[derive(Clone, Debug, Serialize)]
130pub struct ThinSpineBatch<T> {
131    pub(crate) level: usize,
132    pub(crate) desc: Description<T>,
133    pub(crate) parts: Vec<SpineId>,
134    /// NB: this exists to validate legacy batch bounds during the migration;
135    /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
136    pub(crate) descs: Vec<Description<T>>,
137}
138
139impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
140    fn eq(&self, other: &Self) -> bool {
141        // Ignore the temporary descs vector when comparing for equality.
142        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
143    }
144}
145
146#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
147pub struct ThinMerge<T> {
148    pub(crate) since: Antichain<T>,
149    pub(crate) remaining_work: usize,
150    pub(crate) active_compaction: Option<ActiveCompaction>,
151}
152
153impl<T: Clone> ThinMerge<T> {
154    fn fueling(merge: &FuelingMerge<T>) -> Self {
155        ThinMerge {
156            since: merge.since.clone(),
157            remaining_work: merge.remaining_work,
158            active_compaction: None,
159        }
160    }
161
162    fn fueled(batch: &SpineBatch<T>) -> Self {
163        ThinMerge {
164            since: batch.desc.since().clone(),
165            remaining_work: 0,
166            active_compaction: batch.active_compaction.clone(),
167        }
168    }
169}
170
171/// This is a "flattened" representation of a Trace. Goals:
172/// - small updates to the trace should result in small differences in the `FlatTrace`;
173/// - two `FlatTrace`s should be efficient to diff;
174/// - converting to and from a `Trace` should be relatively straightforward.
175///
176/// These goals are all somewhat in tension, and the space of possible representations is pretty
177/// large. See individual fields for comments on some of the tradeoffs.
178#[derive(Clone, Debug)]
179pub struct FlatTrace<T> {
180    pub(crate) since: Antichain<T>,
181    /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
182    /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
183    /// Previously, we serialized a trace as just this list of batches. Keeping this data around
184    /// helps ensure backwards compatibility. In the near future, we may still keep some batches
185    /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
186    /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
187    /// collection below.
188    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
189    /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
190    /// by id directly.
191    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
192    /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
193    /// to make differential updates smaller when two batches merge together. We also store the
194    /// level on the batch, instead of mapping from level to a list of batches... the level of a
195    /// spine batch doesn't change over time, but the list of batches at a particular level does.
196    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
197    /// In-progress merges. We store this by spine id instead of level to prepare for some possible
198    /// generalizations to spine (merging N of M batches at a level). This is also a natural place
199    /// to store incremental merge progress in the future.
200    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
201}
202
203impl<T: Timestamp + Lattice> Trace<T> {
204    pub(crate) fn flatten(&self) -> FlatTrace<T> {
205        let since = self.spine.since.clone();
206        let mut legacy_batches = BTreeMap::new();
207        let mut hollow_batches = BTreeMap::new();
208        let mut spine_batches = BTreeMap::new();
209        let mut merges = BTreeMap::new();
210
211        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
212            let id = batch.id();
213            let desc = batch.desc.clone();
214            let mut parts = Vec::with_capacity(batch.parts.len());
215            let mut descs = Vec::with_capacity(batch.parts.len());
216            for IdHollowBatch { id, batch } in &batch.parts {
217                parts.push(*id);
218                descs.push(batch.desc.clone());
219                // Ideally, we'd like to put all batches in the hollow_batches collection, since
220                // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
221                // we currently keep most batches in the legacy collection for backwards
222                // compatibility.
223                // As an exception, we add batches with empty time ranges to hollow_batches:
224                // they're otherwise not guaranteed to be unique, and since we only started writing
225                // them down recently there's no backwards compatibility risk.
226                if batch.desc.lower() == batch.desc.upper() {
227                    hollow_batches.insert(*id, Arc::clone(batch));
228                } else {
229                    legacy_batches.insert(Arc::clone(batch), ());
230                }
231            }
232
233            let spine_batch = ThinSpineBatch {
234                level,
235                desc,
236                parts,
237                descs,
238            };
239            spine_batches.insert(id, spine_batch);
240        };
241
242        for (level, state) in self.spine.merging.iter().enumerate() {
243            for batch in &state.batches {
244                push_spine_batch(level, batch);
245                if let Some(c) = &batch.active_compaction {
246                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
247                    assert!(
248                        previous.is_none(),
249                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
250                        batch.id,
251                    )
252                }
253            }
254            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
255                let previous = merges.insert(*id, ThinMerge::fueling(merge));
256                assert!(
257                    previous.is_none(),
258                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
259                )
260            }
261        }
262
263        if !self.roundtrip_structure {
264            assert!(hollow_batches.is_empty());
265            spine_batches.clear();
266            merges.clear();
267        }
268
269        FlatTrace {
270            since,
271            legacy_batches,
272            hollow_batches,
273            spine_batches,
274            merges,
275        }
276    }
277    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
278        let FlatTrace {
279            since,
280            legacy_batches,
281            mut hollow_batches,
282            spine_batches,
283            mut merges,
284        } = value;
285
286        // If the flattened representation has spine batches (or is empty),
287        // we know to preserve the structure for this trace.
288        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
289
290        // We need to look up legacy batches somehow, but we don't have a spine id for them.
291        // Instead, we rely on the fact that the spine must store them in antichain order.
292        // Our timestamp type may not be totally ordered, so we need to implement our own comparator
293        // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
294        // though.
295        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
296            if PartialOrder::less_than(left, right) {
297                Ordering::Less
298            } else if PartialOrder::less_than(right, left) {
299                Ordering::Greater
300            } else {
301                Ordering::Equal
302            }
303        };
304        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
305        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
306
307        let mut pop_batch =
308            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
309                if let Some(batch) = hollow_batches.remove(&id) {
310                    if let Some(desc) = expected_desc {
311                        assert_eq!(*desc, batch.desc);
312                    }
313                    return Ok(IdHollowBatch { id, batch });
314                }
315                let mut batch = legacy_batches
316                    .pop()
317                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
318
319                let Some(expected_desc) = expected_desc else {
320                    return Ok(IdHollowBatch { id, batch });
321                };
322
323                if expected_desc.lower() != batch.desc.lower() {
324                    return Err(format!(
325                        "hollow batch lower {:?} did not match expected lower {:?}",
326                        batch.desc.lower().elements(),
327                        expected_desc.lower().elements()
328                    ));
329                }
330
331                // Empty legacy batches are not deterministic: different nodes may split them up
332                // in different ways. For now, we rearrange them to match the spine data.
333                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
334                    let mut new_upper = batch.desc.upper().clone();
335
336                    // While our current batch is too small, and there's another empty batch
337                    // in the list, roll it in.
338                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
339                        let Some(next_batch) = legacy_batches.pop() else {
340                            break;
341                        };
342                        if next_batch.is_empty() {
343                            new_upper.clone_from(next_batch.desc.upper());
344                        } else {
345                            legacy_batches.push(next_batch);
346                            break;
347                        }
348                    }
349
350                    // If our current batch is too large, split it by the expected upper
351                    // and preserve the remainder.
352                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
353                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
354                            expected_desc.upper().clone(),
355                            new_upper.clone(),
356                            batch.desc.since().clone(),
357                        ))));
358                        new_upper.clone_from(expected_desc.upper());
359                    }
360                    batch = Arc::new(HollowBatch::empty(Description::new(
361                        batch.desc.lower().clone(),
362                        new_upper,
363                        batch.desc.since().clone(),
364                    )))
365                }
366
367                if expected_desc.upper() != batch.desc.upper() {
368                    return Err(format!(
369                        "hollow batch upper {:?} did not match expected upper {:?}",
370                        batch.desc.upper().elements(),
371                        expected_desc.upper().elements()
372                    ));
373                }
374
375                Ok(IdHollowBatch { id, batch })
376            };
377
378        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
379            (batch.desc.upper().clone(), id.1)
380        } else {
381            (Antichain::from_elem(T::minimum()), 0)
382        };
383        let levels = spine_batches
384            .first_key_value()
385            .map(|(_, batch)| batch.level + 1)
386            .unwrap_or(0);
387        let mut merging = vec![MergeState::default(); levels];
388        for (id, batch) in spine_batches {
389            let level = batch.level;
390
391            let descs = batch.descs.iter().map(Some).chain(std::iter::repeat_n(
392                None,
393                batch.parts.len() - batch.descs.len(),
394            ));
395            let parts = batch
396                .parts
397                .into_iter()
398                .zip_eq(descs)
399                .map(|(id, desc)| pop_batch(id, desc))
400                .collect::<Result<Vec<_>, _>>()?;
401            let len = parts.iter().map(|p| (*p).batch.len).sum();
402            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
403            let batch = SpineBatch {
404                id,
405                desc: batch.desc,
406                parts,
407                active_compaction,
408                len,
409            };
410
411            let state = &mut merging[level];
412
413            state.push_batch(batch);
414            if let Some(id) = state.id() {
415                if let Some(merge) = merges.remove(&id) {
416                    state.merge = Some(IdFuelingMerge {
417                        id,
418                        merge: FuelingMerge {
419                            since: merge.since,
420                            remaining_work: merge.remaining_work,
421                        },
422                    })
423                }
424            }
425        }
426
427        let mut trace = Trace {
428            spine: Spine {
429                effort: 1,
430                next_id,
431                since,
432                upper,
433                merging,
434            },
435            roundtrip_structure,
436        };
437
438        fn check_empty(name: &str, len: usize) -> Result<(), String> {
439            if len != 0 {
440                Err(format!("{len} {name} left after reconstructing spine"))
441            } else {
442                Ok(())
443            }
444        }
445
446        if roundtrip_structure {
447            check_empty("legacy batches", legacy_batches.len())?;
448        } else {
449            // If the structure wasn't actually serialized, we may have legacy batches left over.
450            for batch in legacy_batches.into_iter().rev() {
451                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
452            }
453        }
454        check_empty("hollow batches", hollow_batches.len())?;
455        check_empty("merges", merges.len())?;
456
457        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
458
459        Ok(trace)
460    }
461}
462
463#[derive(Clone, Debug, Default)]
464pub(crate) struct SpineMetrics {
465    pub compact_batches: u64,
466    pub compacting_batches: u64,
467    pub noncompact_batches: u64,
468}
469
470impl<T> Trace<T> {
471    pub fn since(&self) -> &Antichain<T> {
472        &self.spine.since
473    }
474
475    pub fn upper(&self) -> &Antichain<T> {
476        &self.spine.upper
477    }
478
479    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
480        for batch in self.batches() {
481            f(batch);
482        }
483    }
484
485    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
486        self.spine
487            .spine_batches()
488            .flat_map(|b| b.parts.as_slice())
489            .map(|b| &*b.batch)
490    }
491
492    pub fn num_spine_batches(&self) -> usize {
493        self.spine.spine_batches().count()
494    }
495
496    #[cfg(test)]
497    pub fn num_hollow_batches(&self) -> usize {
498        self.batches().count()
499    }
500
501    #[cfg(test)]
502    pub fn num_updates(&self) -> usize {
503        self.batches().map(|b| b.len).sum()
504    }
505}
506
507impl<T: Timestamp + Lattice> Trace<T> {
508    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
509        self.spine.since.clone_from(since);
510    }
511
512    #[must_use]
513    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
514        let mut merge_reqs = Vec::new();
515        self.spine.insert(
516            batch,
517            &mut SpineLog::Enabled {
518                merge_reqs: &mut merge_reqs,
519            },
520        );
521        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
522        // Spine::roll_up (internally used by insert) clears all batches out of
523        // levels below a target by walking up from level 0 and merging each
524        // level into the next (providing the necessary fuel). In practice, this
525        // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
526        // It's a waste to do all of these (we'll throw away the results), so we
527        // filter out any that are entirely covered by some other request.
528        Self::remove_redundant_merge_reqs(merge_reqs)
529    }
530
531    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
532        // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
533        // In the meantime, search backwards, since most compactions are for recent batches.
534        for batch in self.spine.spine_batches_mut().rev() {
535            if batch.id == id {
536                batch.active_compaction = Some(compaction);
537                break;
538            }
539        }
540    }
541
542    /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
543    /// account for a surprising amount of CPU in prod. database-issues#5411
544    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
545        self.spine.insert(batch, &mut SpineLog::Disabled);
546    }
547
548    /// Apply some amount of effort to trace maintenance.
549    ///
550    /// The units of effort are updates, and the method should be thought of as
551    /// analogous to inserting that many empty updates, with the trace permitted
552    /// to perform a proportionate amount of work.
553    ///
554    /// Returns the generated merge requests and whether this did any work; false means the spine was left unchanged.
555    #[must_use]
556    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
557        let mut merge_reqs = Vec::new();
558        let did_work = self.spine.exert(
559            fuel,
560            &mut SpineLog::Enabled {
561                merge_reqs: &mut merge_reqs,
562            },
563        );
564        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
565        // See the comment in [Self::push_batch].
566        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
567        (merge_reqs, did_work)
568    }
569
570    /// Validates invariants.
571    ///
572    /// See `Spine::validate` for details.
573    pub fn validate(&self) -> Result<(), String> {
574        self.spine.validate()
575    }
576
577    /// Obtain all fueled merge reqs that either have no active compaction or whose previous
578    /// compaction was started at or before the threshold time, in order from oldest to newest.
579    pub(crate) fn fueled_merge_reqs_before_ms(
580        &self,
581        threshold_ms: u64,
582        threshold_writer: Option<WriterKey>,
583    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
584        self.spine
585            .spine_batches()
586            .filter(move |b| {
587                let noncompact = !b.is_compact();
588                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
589                    b.parts.iter().any(|b| {
590                        b.batch
591                            .parts
592                            .iter()
593                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
594                    })
595                });
596                noncompact || old_writer
597            })
598            .filter(move |b| {
599                // Either there's no active compaction, or the last active compaction
600                // is not after the timeout timestamp.
601                b.active_compaction
602                    .as_ref()
603                    .map_or(true, move |c| c.start_ms <= threshold_ms)
604            })
605            .map(|b| FueledMergeReq {
606                id: b.id,
607                desc: b.desc.clone(),
608                inputs: b.parts.clone(),
609            })
610    }
611
612    // This is only called with the results of one `insert` and so the length of
613    // `merge_reqs` is bounded by the number of levels in the spine (or possibly
614    // some small constant multiple?). The number of levels is logarithmic in the
615    // number of updates in the spine, so this number should stay very small. As
616    // a result, we simply use the naive O(n^2) algorithm here instead of doing
617    // anything fancy with e.g. interval trees.
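    // As a hedged example of the redundancy: a req spanning SpineId(0, 4)
    // covers a req spanning SpineId(1, 3) with the same since, so the latter
    // is dropped.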
618    fn remove_redundant_merge_reqs(
619        mut merge_reqs: Vec<FueledMergeReq<T>>,
620    ) -> Vec<FueledMergeReq<T>> {
621        // Returns true if b0 covers b1, false otherwise.
622        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
623            // TODO: can we relax or remove this since check?
624            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
625        }
626
627        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
628        // In practice, merge_reqs will come in sorted such that the "large"
629        // requests are later. Take advantage of this by processing back to
630        // front.
631        while let Some(merge_req) = merge_reqs.pop() {
632            let covered = ret.iter().any(|r| covers(r, &merge_req));
633            if !covered {
634                // Now check if anything we've already staged is covered by this
635                // new req. In practice, the merge_reqs come in sorted and so
636                // this `retain` is a no-op.
637                ret.retain(|r| !covers(&merge_req, r));
638                ret.push(merge_req);
639            }
640        }
641        ret
642    }
643
644    pub fn spine_metrics(&self) -> SpineMetrics {
645        let mut metrics = SpineMetrics::default();
646        for batch in self.spine.spine_batches() {
647            if batch.is_compact() {
648                metrics.compact_batches += 1;
649            } else if batch.is_merging() {
650                metrics.compacting_batches += 1;
651            } else {
652                metrics.noncompact_batches += 1;
653            }
654        }
655        metrics
656    }
657}
658
659impl<T: Timestamp + Lattice + Codec64> Trace<T> {
660    pub fn apply_merge_res_checked_classic<D: Codec64 + Semigroup + PartialEq>(
661        &mut self,
662        res: &FueledMergeRes<T>,
663        metrics: &ColumnarMetrics,
664    ) -> ApplyMergeResult {
665        for batch in self.spine.spine_batches_mut().rev() {
666            let result = batch.maybe_replace_checked_classic::<D>(res, metrics);
667            if result.matched() {
668                return result;
669            }
670        }
671        ApplyMergeResult::NotAppliedNoMatch
672    }
673
674    pub fn apply_merge_res_checked<D: Codec64 + Semigroup + PartialEq>(
675        &mut self,
676        res: &FueledMergeRes<T>,
677        metrics: &ColumnarMetrics,
678    ) -> ApplyMergeResult {
679        for batch in self.spine.spine_batches_mut().rev() {
680            let result = batch.maybe_replace_checked::<D>(res, metrics);
681            if result.matched() {
682                return result;
683            }
684        }
685        ApplyMergeResult::NotAppliedNoMatch
686    }
687
688    pub fn apply_merge_res_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
689        for batch in self.spine.spine_batches_mut().rev() {
690            let result = batch.maybe_replace_unchecked(res);
691            if result.matched() {
692                return result;
693            }
694        }
695        ApplyMergeResult::NotAppliedNoMatch
696    }
697
698    pub fn apply_tombstone_merge(&mut self, desc: &Description<T>) -> ApplyMergeResult {
699        for batch in self.spine.spine_batches_mut().rev() {
700            let result = batch.maybe_replace_with_tombstone(desc);
701            if result.matched() {
702                return result;
703            }
704        }
705        ApplyMergeResult::NotAppliedNoMatch
706    }
707}
708
709/// A log of what transitively happened during a Spine operation: e.g.
710/// FueledMergeReqs were generated.
711enum SpineLog<'a, T> {
712    Enabled {
713        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
714    },
715    Disabled,
716}
717
718#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
719pub enum CompactionInput {
720    /// We don't know what our inputs were; this should only be used for
721    /// unchecked legacy replacements.
722    Legacy,
723    /// This compaction output is a total replacement for all batches in this id range.
724    IdRange(SpineId),
725    /// This compaction output replaces the specified runs in this id range.
726    PartialBatch(SpineId, BTreeSet<RunId>),
727}
728
729#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
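/// Identifies a contiguous range of spine batch ids, in the spirit of a
/// half-open interval `[lo, hi)` (see `covers` and `id_range` below). When
/// serialized, it is rendered as the string `"lo-hi"`, e.g. `SpineId(0, 4)`
/// becomes `"0-4"`.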
730pub struct SpineId(pub usize, pub usize);
731
732impl Serialize for SpineId {
733    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
734    where
735        S: Serializer,
736    {
737        let SpineId(lo, hi) = self;
738        serializer.serialize_str(&format!("{lo}-{hi}"))
739    }
740}
741
742/// Creates a `SpineId` that covers the range of ids in the set.
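///
/// Panics if the set is empty.
///
/// A hedged sketch of the intended behavior (not a doctest, since these are
/// crate-internal types):
///
/// ```ignore
/// let ids: BTreeSet<SpineId> = [SpineId(0, 2), SpineId(2, 5)].into_iter().collect();
/// assert_eq!(id_range(ids), SpineId(0, 5));
/// ```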
743pub fn id_range(ids: BTreeSet<SpineId>) -> SpineId {
744    let lower_spine_bound = ids
745        .first()
746        .map(|id| id.0)
747        .expect("at least one batch must be present");
748    let upper_spine_bound = ids
749        .last()
750        .map(|id| id.1)
751        .expect("at least one batch must be present");
752
753    SpineId(lower_spine_bound, upper_spine_bound)
754}
755
756impl SpineId {
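    /// Whether `self`'s id range fully contains `other`'s; a quick sketch:
    ///
    /// ```ignore
    /// assert!(SpineId(0, 4).covers(SpineId(1, 3)));
    /// assert!(!SpineId(1, 3).covers(SpineId(0, 4)));
    /// ```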
757    fn covers(self, other: SpineId) -> bool {
758        self.0 <= other.0 && other.1 <= self.1
759    }
760}
761
762#[derive(Debug, Clone, PartialEq)]
763pub struct IdHollowBatch<T> {
764    pub id: SpineId,
765    pub batch: Arc<HollowBatch<T>>,
766}
767
768#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
769pub struct ActiveCompaction {
770    pub start_ms: u64,
771}
772
773#[derive(Debug, Clone, PartialEq)]
774struct SpineBatch<T> {
775    id: SpineId,
776    desc: Description<T>,
777    parts: Vec<IdHollowBatch<T>>,
778    active_compaction: Option<ActiveCompaction>,
779    // A cached version of parts.iter().map(|x| x.batch.len).sum()
780    len: usize,
781}
782
783impl<T> SpineBatch<T> {
784    fn merged(batch: IdHollowBatch<T>) -> Self
785    where
786        T: Clone,
787    {
788        Self {
789            id: batch.id,
790            desc: batch.batch.desc.clone(),
791            len: batch.batch.len,
792            parts: vec![batch],
793            active_compaction: None,
794        }
795    }
796}
797
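/// The outcome of attempting to apply a compaction result to a spine batch.
/// The `Applied*` variants mean the batch was replaced; `NotAppliedTooManyUpdates`
/// means a matching batch was found but the replacement would have increased its
/// length, which Spine's invariants do not allow.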
798#[derive(Debug, Copy, Clone)]
799pub enum ApplyMergeResult {
800    AppliedExact,
801    AppliedSubset,
802    NotAppliedNoMatch,
803    NotAppliedInvalidSince,
804    NotAppliedTooManyUpdates,
805}
806
807impl ApplyMergeResult {
808    pub fn applied(&self) -> bool {
809        match self {
810            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
811            _ => false,
812        }
813    }
814    pub fn matched(&self) -> bool {
815        match self {
816            ApplyMergeResult::AppliedExact
817            | ApplyMergeResult::AppliedSubset
818            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
819            _ => false,
820        }
821    }
822}
823
824impl<T: Timestamp + Lattice> SpineBatch<T> {
825    pub fn lower(&self) -> &Antichain<T> {
826        self.desc().lower()
827    }
828
829    pub fn upper(&self) -> &Antichain<T> {
830        self.desc().upper()
831    }
832
833    fn id(&self) -> SpineId {
834        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
835        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
836        self.id
837    }
838
839    pub fn is_compact(&self) -> bool {
840        // A compact batch has at most one run.
841        // This check used to require at most one hollow batch with at most one run,
842        // but that became too strict once incremental compaction was introduced.
843        // Incremental compaction can result in a batch with a single run, but multiple empty
844        // hollow batches, which we still consider compact. As levels are merged, we
845        // will eventually clean up the empty hollow batches.
846        self.parts
847            .iter()
848            .map(|p| p.batch.run_meta.len())
849            .sum::<usize>()
850            <= 1
851    }
852
853    pub fn is_merging(&self) -> bool {
854        self.active_compaction.is_some()
855    }
856
857    fn desc(&self) -> &Description<T> {
858        &self.desc
859    }
860
861    pub fn len(&self) -> usize {
862        // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
863        // we compact it.
864        debug_assert_eq!(
865            self.len,
866            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
867        );
868        self.len
869    }
870
871    pub fn is_empty(&self) -> bool {
872        self.len() == 0
873    }
874
875    pub fn empty(
876        id: SpineId,
877        lower: Antichain<T>,
878        upper: Antichain<T>,
879        since: Antichain<T>,
880    ) -> Self {
881        SpineBatch::merged(IdHollowBatch {
882            id,
883            batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
884        })
885    }
886
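    /// Begins a (bookkeeping-only) merge of the given batches: the resulting id
    /// spans all inputs, the since is the join of the input sinces and the
    /// compaction frontier (if any), and the remaining work is the sum of the
    /// input lengths.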
887    pub fn begin_merge(
888        bs: &[Self],
889        compaction_frontier: Option<AntichainRef<T>>,
890    ) -> Option<IdFuelingMerge<T>> {
891        let from = bs.first()?.id().0;
892        let until = bs.last()?.id().1;
893        let id = SpineId(from, until);
894        let mut sinces = bs.iter().map(|b| b.desc().since());
895        let mut since = sinces.next()?.clone();
896        for b in bs {
897            since.join_assign(b.desc().since())
898        }
899        if let Some(compaction_frontier) = compaction_frontier {
900            since.join_assign(&compaction_frontier.to_owned());
901        }
902        let remaining_work = bs.iter().map(|x| x.len()).sum();
903        Some(IdFuelingMerge {
904            id,
905            merge: FuelingMerge {
906                since,
907                remaining_work,
908            },
909        })
910    }
911
912    #[cfg(test)]
913    fn describe(&self, extended: bool) -> String {
914        let SpineBatch {
915            id,
916            parts,
917            desc,
918            active_compaction,
919            len,
920        } = self;
921        let compaction = match active_compaction {
922            None => "".to_owned(),
923            Some(c) => format!(" (c@{})", c.start_ms),
924        };
925        match extended {
926            false => format!(
927                "[{}-{}]{:?}{:?}{}/{}{compaction}",
928                id.0,
929                id.1,
930                desc.lower().elements(),
931                desc.upper().elements(),
932                parts.len(),
933                len
934            ),
935            true => {
936                format!(
937                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
938                    id.0,
939                    id.1,
940                    desc.lower().elements(),
941                    desc.upper().elements(),
942                    desc.since().elements(),
943                    parts.len(),
944                    len,
945                    parts
946                        .iter()
947                        .flat_map(|x| x.batch.parts.iter())
948                        .map(|x| format!(" {}", x.printable_name()))
949                        .collect::<Vec<_>>()
950                        .join("")
951                )
952            }
953        }
954    }
955}
956
957impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
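    /// Sums the diffs across the given run parts, returning `None` if the
    /// iterator is empty or if the sum is unavailable for any part.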
958    fn diffs_sum<'a, D: Semigroup + Codec64>(
959        parts: impl Iterator<Item = &'a RunPart<T>>,
960        metrics: &ColumnarMetrics,
961    ) -> Option<D> {
962        parts
963            .map(|p| p.diffs_sum::<D>(metrics))
964            .reduce(|a, b| match (a, b) {
965                (Some(mut a), Some(b)) => {
966                    a.plus_equals(&b);
967                    Some(a)
968                }
969                _ => None,
970            })
971            .flatten()
972    }
973
974    fn diffs_sum_for_runs<D: Semigroup + Codec64>(
975        batch: &HollowBatch<T>,
976        run_ids: &[RunId],
977        metrics: &ColumnarMetrics,
978    ) -> Option<D> {
979        if run_ids.is_empty() {
980            return None;
981        }
982
983        let parts = batch
984            .runs()
985            .filter(|(meta, _)| {
986                run_ids.contains(&meta.id.expect("id should be present at this point"))
987            })
988            .flat_map(|(_, parts)| parts);
989
990        Self::diffs_sum(parts, metrics)
991    }
992
993    fn maybe_replace_with_tombstone(&mut self, desc: &Description<T>) -> ApplyMergeResult {
994        let exact_match =
995            desc.lower() == self.desc().lower() && desc.upper() == self.desc().upper();
996
997        let empty_batch = HollowBatch::empty(desc.clone());
998        if exact_match {
999            *self = SpineBatch::merged(IdHollowBatch {
1000                id: self.id(),
1001                batch: Arc::new(empty_batch),
1002            });
1003            return ApplyMergeResult::AppliedExact;
1004        }
1005
1006        if let Some((id, range)) = self.find_replacement_range(desc) {
1007            self.perform_subset_replacement(&empty_batch, id, range, None)
1008        } else {
1009            ApplyMergeResult::NotAppliedNoMatch
1010        }
1011    }
1012
1013    fn construct_batch_with_runs_replaced(
1014        original: &HollowBatch<T>,
1015        run_ids: &[RunId],
1016        replacement: &HollowBatch<T>,
1017    ) -> Result<HollowBatch<T>, ApplyMergeResult> {
1018        if run_ids.is_empty() {
1019            return Err(ApplyMergeResult::NotAppliedNoMatch);
1020        }
1021
1022        let orig_run_ids: BTreeSet<_> = original.runs().filter_map(|(meta, _)| meta.id).collect();
1023        let run_ids: BTreeSet<_> = run_ids.iter().cloned().collect();
1024        if !orig_run_ids.is_superset(&run_ids) {
1025            return Err(ApplyMergeResult::NotAppliedNoMatch);
1026        }
1027
1028        let runs: Vec<_> = original
1029            .runs()
1030            .filter(|(meta, _)| {
1031                !run_ids.contains(&meta.id.expect("id should be present at this point"))
1032            })
1033            .chain(replacement.runs())
1034            .collect();
1035
1036        let len = runs.iter().filter_map(|(meta, _)| meta.len).sum::<usize>();
1037
1038        let run_meta = runs
1039            .iter()
1040            .map(|(meta, _)| *meta)
1041            .cloned()
1042            .collect::<Vec<_>>();
1043
1044        let parts = runs
1045            .iter()
1046            .flat_map(|(_, parts)| *parts)
1047            .cloned()
1048            .collect::<Vec<_>>();
1049
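        // Recompute the run boundaries for the flattened `parts` vec above. As a
        // hedged example (assuming no empty runs): if the surviving runs have 2, 3,
        // and 1 parts respectively, `parts` has 6 entries and `run_splits` is [2, 5].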
1050        let run_splits = {
1051            let mut splits = Vec::with_capacity(run_meta.len().saturating_sub(1));
1052            let mut pointer = 0;
1053            for (i, (_, parts)) in runs.into_iter().enumerate() {
1054                if parts.is_empty() {
1055                    continue;
1056                }
1057                if i < run_meta.len() - 1 {
1058                    splits.push(pointer + parts.len());
1059                }
1060                pointer += parts.len();
1061            }
1062            splits
1063        };
1064
1065        Ok(HollowBatch::new(
1066            replacement.desc.clone(),
1067            parts,
1068            len,
1069            run_meta,
1070            run_splits,
1071        ))
1072    }
1073
1074    fn maybe_replace_checked<D>(
1075        &mut self,
1076        res: &FueledMergeRes<T>,
1077        metrics: &ColumnarMetrics,
1078    ) -> ApplyMergeResult
1079    where
1080        D: Semigroup + Codec64 + PartialEq + Debug,
1081    {
1082        // The spine's and merge res's sinces don't need to match (a mismatch can occur if
1083        // Spine has been reloaded from state due to a compare_and_set conflict), but if so,
1084        // the Spine since must be at least the merge res since.
1085        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1086            return ApplyMergeResult::NotAppliedInvalidSince;
1087        }
1088
1089        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1090        let num_batches = self.parts.len();
1091
1092        let result = match &res.input {
1093            CompactionInput::IdRange(id) => {
1094                self.handle_id_range_replacement::<D>(res, id, new_diffs_sum, metrics)
1095            }
1096            CompactionInput::PartialBatch(id, runs) => {
1097                self.handle_partial_batch_replacement::<D>(res, *id, runs, new_diffs_sum, metrics)
1098            }
1099            CompactionInput::Legacy => {
1100                error!("legacy compaction input is not supported");
1101                return ApplyMergeResult::NotAppliedNoMatch;
1102            }
1103        };
1104
1105        let num_batches_after = self.parts.len();
1106        assert!(
1107            num_batches_after <= num_batches,
1108            "replacing parts should not increase the number of batches"
1109        );
1110        result
1111    }
1112
1113    fn handle_id_range_replacement<D>(
1114        &mut self,
1115        res: &FueledMergeRes<T>,
1116        id: &SpineId,
1117        new_diffs_sum: Option<D>,
1118        metrics: &ColumnarMetrics,
1119    ) -> ApplyMergeResult
1120    where
1121        D: Semigroup + Codec64 + PartialEq + Debug,
1122    {
1123        let range = self
1124            .parts
1125            .iter()
1126            .enumerate()
1127            .filter_map(|(i, p)| {
1128                if id.covers(p.id) {
1129                    Some((i, p.id))
1130                } else {
1131                    None
1132                }
1133            })
1134            .collect::<Vec<_>>();
1135
1136        let ids: BTreeSet<_> = range.iter().map(|(_, id)| *id).collect();
1137
1138        // If ids is empty, it means that we didn't find any parts that match the id range.
1139        // We also check that the id matches the range of ids we found.
1140        // At scale, sometimes regular compaction will race forced compaction,
1141        // for things like the catalog. In that case, we may have a
1142        // replacement that no longer lines up with the spine batches.
1143        // I think this is because forced compaction ignores the active_compaction
1144        // and just goes for it. This is slightly annoying but probably the right behavior
1145        // for a function whose prefix is `force_`, so we just return
1146        // NotAppliedNoMatch here.
1147        if ids.is_empty() || id != &id_range(ids) {
1148            return ApplyMergeResult::NotAppliedNoMatch;
1149        }
1150
1151        let range: BTreeSet<_> = range.iter().map(|(i, _)| *i).collect();
1152
1153        // This is the range of hollow batches that we will replace.
1154        let min = *range.iter().min().unwrap();
1155        let max = *range.iter().max().unwrap();
1156        let replacement_range = min..max + 1;
1157
1158        // We need to replace a range of parts. Here we don't care about the run_indices
1159        // because we must be replacing the entire part(s)
1160        let old_diffs_sum = Self::diffs_sum::<D>(
1161            self.parts[replacement_range.clone()]
1162                .iter()
1163                .flat_map(|p| p.batch.parts.iter()),
1164            metrics,
1165        );
1166
1167        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "id range replacement");
1168
1169        self.perform_subset_replacement(
1170            &res.output,
1171            *id,
1172            replacement_range,
1173            res.new_active_compaction.clone(),
1174        )
1175    }
1176
1177    fn handle_partial_batch_replacement<D>(
1178        &mut self,
1179        res: &FueledMergeRes<T>,
1180        id: SpineId,
1181        runs: &BTreeSet<RunId>,
1182        new_diffs_sum: Option<D>,
1183        metrics: &ColumnarMetrics,
1184    ) -> ApplyMergeResult
1185    where
1186        D: Semigroup + Codec64 + PartialEq + Debug,
1187    {
1188        if runs.is_empty() {
1189            return ApplyMergeResult::NotAppliedNoMatch;
1190        }
1191
1192        let part = self.parts.iter().enumerate().find(|(_, p)| p.id == id);
1193        let Some((i, batch)) = part else {
1194            return ApplyMergeResult::NotAppliedNoMatch;
1195        };
1196        let replacement_range = i..(i + 1);
1197
1198        let batch = &batch.batch;
1199        let run_ids = runs.iter().cloned().collect::<Vec<_>>();
1200
1201        let old_batch_diff_sum = Self::diffs_sum::<D>(batch.parts.iter(), metrics);
1202        let old_diffs_sum = Self::diffs_sum_for_runs::<D>(batch, &run_ids, metrics);
1203
1204        self.validate_diffs_sum_match(old_diffs_sum, new_diffs_sum, "partial batch replacement");
1205
1206        match Self::construct_batch_with_runs_replaced(batch, &run_ids, &res.output) {
1207            Ok(new_batch) => {
1208                let new_batch_diff_sum = Self::diffs_sum::<D>(new_batch.parts.iter(), metrics);
1209                self.validate_diffs_sum_match(
1210                    old_batch_diff_sum,
1211                    new_batch_diff_sum,
1212                    "sanity checking diffs sum for replaced runs",
1213                );
1214                self.perform_subset_replacement(
1215                    &new_batch,
1216                    id,
1217                    replacement_range,
1218                    res.new_active_compaction.clone(),
1219                )
1220            }
1221            Err(err) => err,
1222        }
1223    }
1224
1225    fn validate_diffs_sum_match<D>(
1226        &self,
1227        old_diffs_sum: Option<D>,
1228        new_diffs_sum: Option<D>,
1229        context: &str,
1230    ) where
1231        D: Semigroup + Codec64 + PartialEq + Debug,
1232    {
1233        match (new_diffs_sum, old_diffs_sum) {
1234            (None, Some(old)) => {
1235                if !D::is_zero(&old) {
1236                    panic!(
1237                        "merge res diffs sum is None, but spine batch diffs sum ({:?}) is not zero ({})",
1238                        old, context
1239                    );
1240                }
1241            }
1242            (Some(new_diffs_sum), Some(old_diffs_sum)) => {
1243                assert_eq!(
1244                    old_diffs_sum, new_diffs_sum,
1245                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?}) ({})",
1246                    new_diffs_sum, old_diffs_sum, context
1247                );
1248            }
1249            _ => {}
1250        };
1251    }
1252
1253    /// This is the "legacy" way of replacing a spine batch with a merge result.
1254    /// It is used when we don't have the full compaction input
1255    /// information.
1256    /// Eventually we should strive to roundtrip Spine IDs everywhere and
1257    /// deprecate this method.
1258    fn maybe_replace_checked_classic<D>(
1259        &mut self,
1260        res: &FueledMergeRes<T>,
1261        metrics: &ColumnarMetrics,
1262    ) -> ApplyMergeResult
1263    where
1264        D: Semigroup + Codec64 + PartialEq + Debug,
1265    {
1266        // The spine's and merge res's sinces don't need to match (a mismatch can occur if
1267        // Spine has been reloaded from state due to a compare_and_set conflict), but if so,
1268        // the Spine since must be at least the merge res since.
1269        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1270            return ApplyMergeResult::NotAppliedInvalidSince;
1271        }
1272
1273        let new_diffs_sum = Self::diffs_sum(res.output.parts.iter(), metrics);
1274
1275        // If our merge result exactly matches a spine batch, we can swap it in directly
1276        let exact_match = res.output.desc.lower() == self.desc().lower()
1277            && res.output.desc.upper() == self.desc().upper();
1278        if exact_match {
1279            let old_diffs_sum = Self::diffs_sum::<D>(
1280                self.parts.iter().flat_map(|p| p.batch.parts.iter()),
1281                metrics,
1282            );
1283
1284            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1285                assert_eq!(
1286                    old_diffs_sum, new_diffs_sum,
1287                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1288                    new_diffs_sum, old_diffs_sum
1289                );
1290            }
1291
1292            // Spine internally has an invariant about a batch being at some level
1293            // or higher based on the len. We could end up violating this invariant
1294            // if we increased the length of the batch.
1295            //
1296            // A res output with length greater than the existing spine batch implies
1297            // a compaction has already been applied to this range, and with a higher
1298            // rate of consolidation than this one. This could happen as a result of
1299            // compaction's memory bound limiting the amount of consolidation possible.
1300            if res.output.len > self.len() {
1301                return ApplyMergeResult::NotAppliedTooManyUpdates;
1302            }
1303            *self = SpineBatch::merged(IdHollowBatch {
1304                id: self.id(),
1305                batch: Arc::new(res.output.clone()),
1306            });
1307            return ApplyMergeResult::AppliedExact;
1308        }
1309
1310        // Try subset replacement
1311        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1312            let old_diffs_sum = Self::diffs_sum::<D>(
1313                self.parts[range.clone()]
1314                    .iter()
1315                    .flat_map(|p| p.batch.parts.iter()),
1316                metrics,
1317            );
1318
1319            if let (Some(old_diffs_sum), Some(new_diffs_sum)) = (old_diffs_sum, new_diffs_sum) {
1320                assert_eq!(
1321                    old_diffs_sum, new_diffs_sum,
1322                    "merge res diffs sum ({:?}) did not match spine batch diffs sum ({:?})",
1323                    new_diffs_sum, old_diffs_sum
1324                );
1325            }
1326
1327            self.perform_subset_replacement(
1328                &res.output,
1329                id,
1330                range,
1331                res.new_active_compaction.clone(),
1332            )
1333        } else {
1334            ApplyMergeResult::NotAppliedNoMatch
1335        }
1336    }
1337
1338    /// This is the even more legacy way of replacing a spine batch with a merge result.
1339    /// It is used when we don't have the full compaction input
1340    /// information, and we don't have the diffs sum.
1341    /// Eventually we should strive to roundtrip Spine IDs and diffs sums everywhere and
1342    /// deprecate this method.
1343    fn maybe_replace_unchecked(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
1344        // The spine's and merge res's sinces don't need to match (a mismatch can occur if
1345        // Spine has been reloaded from state due to a compare_and_set conflict), but if so,
1346        // the Spine since must be at least the merge res since.
1347        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
1348            return ApplyMergeResult::NotAppliedInvalidSince;
1349        }
1350
1351        // If our merge result exactly matches a spine batch, we can swap it in directly
1352        let exact_match = res.output.desc.lower() == self.desc().lower()
1353            && res.output.desc.upper() == self.desc().upper();
1354        if exact_match {
1355            // Spine internally has an invariant about a batch being at some level
1356            // or higher based on the len. We could end up violating this invariant
1357            // if we increased the length of the batch.
1358            //
1359            // A res output with length greater than the existing spine batch implies
1360            // a compaction has already been applied to this range, and with a higher
1361            // rate of consolidation than this one. This could happen as a result of
1362            // compaction's memory bound limiting the amount of consolidation possible.
1363            if res.output.len > self.len() {
1364                return ApplyMergeResult::NotAppliedTooManyUpdates;
1365            }
1366
1367            *self = SpineBatch::merged(IdHollowBatch {
1368                id: self.id(),
1369                batch: Arc::new(res.output.clone()),
1370            });
1371            return ApplyMergeResult::AppliedExact;
1372        }
1373
1374        // Try subset replacement
1375        if let Some((id, range)) = self.find_replacement_range(&res.output.desc) {
1376            self.perform_subset_replacement(
1377                &res.output,
1378                id,
1379                range,
1380                res.new_active_compaction.clone(),
1381            )
1382        } else {
1383            ApplyMergeResult::NotAppliedNoMatch
1384        }
1385    }
1386
1387    /// Find the range of parts that can be replaced by the merge result
1388    fn find_replacement_range(&self, desc: &Description<T>) -> Option<(SpineId, Range<usize>)> {
1389        // It is possible the structure of the spine has changed since the merge res
1390        // was created, such that it no longer exactly matches the description of a
1391        // spine batch. This can happen if another merge has happened in the interim,
1392        // or if spine needed to be rebuilt from state.
1393        //
1394        // When this occurs, we can still attempt to slot the merge res in to replace
1395        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
1396        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
1397
1398        let mut lower = None;
1399        let mut upper = None;
1400
1401        for (i, batch) in self.parts.iter().enumerate() {
1402            if batch.batch.desc.lower() == desc.lower() {
1403                lower = Some((i, batch.id.0));
1404            }
1405            if batch.batch.desc.upper() == desc.upper() {
1406                upper = Some((i, batch.id.1));
1407            }
1408            if lower.is_some() && upper.is_some() {
1409                break;
1410            }
1411        }
1412
1413        match (lower, upper) {
1414            (Some((lower_idx, id_lower)), Some((upper_idx, id_upper))) => {
1415                Some((SpineId(id_lower, id_upper), lower_idx..(upper_idx + 1)))
1416            }
1417            _ => None,
1418        }
1419    }
1420
1421    /// Perform the actual subset replacement
1422    fn perform_subset_replacement(
1423        &mut self,
1424        res: &HollowBatch<T>,
1425        spine_id: SpineId,
1426        range: Range<usize>,
1427        new_active_compaction: Option<ActiveCompaction>,
1428    ) -> ApplyMergeResult {
1429        let SpineBatch {
1430            id,
1431            parts,
1432            desc,
1433            active_compaction: _,
1434            len: _,
1435        } = self;
1436
1437        let mut new_parts = vec![];
1438        new_parts.extend_from_slice(&parts[..range.start]);
1439        new_parts.push(IdHollowBatch {
1440            id: spine_id,
1441            batch: Arc::new(res.clone()),
1442        });
1443        new_parts.extend_from_slice(&parts[range.end..]);
1444
1445        let new_spine_batch = SpineBatch {
1446            id: *id,
1447            desc: desc.to_owned(),
1448            len: new_parts.iter().map(|x| x.batch.len).sum(),
1449            parts: new_parts,
1450            active_compaction: new_active_compaction,
1451        };
1452
1453        if new_spine_batch.len() > self.len() {
1454            return ApplyMergeResult::NotAppliedTooManyUpdates;
1455        }
1456
1457        *self = new_spine_batch;
1458        ApplyMergeResult::AppliedSubset
1459    }
1460}
1461
1462#[derive(Debug, Clone, PartialEq, Serialize)]
1463pub struct FuelingMerge<T> {
1464    pub(crate) since: Antichain<T>,
1465    pub(crate) remaining_work: usize,
1466}
1467
1468#[derive(Debug, Clone, PartialEq, Serialize)]
1469pub struct IdFuelingMerge<T> {
1470    id: SpineId,
1471    merge: FuelingMerge<T>,
1472}
1473
1474impl<T: Timestamp + Lattice> FuelingMerge<T> {
1475    /// Perform some amount of work, decrementing `fuel`.
1476    ///
1477    /// If `fuel` is non-zero after the call, the merging is complete and one
1478    /// should call `done` to extract the merged results.
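    ///
    /// A rough sketch of the bookkeeping, with illustrative numbers: starting
    /// from `remaining_work = 10`, a call with `fuel = 4` leaves
    /// `remaining_work = 6` and `fuel = 0`, while a subsequent call with
    /// `fuel = 8` leaves `remaining_work = 0` and `fuel = 2`; the leftover
    /// fuel is the signal that `done` may be called.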
1479    // TODO(benesch): rewrite to avoid usage of `as`.
1480    #[allow(clippy::as_conversions)]
1481    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
1482        let used = std::cmp::min(*fuel as usize, self.remaining_work);
1483        self.remaining_work = self.remaining_work.saturating_sub(used);
1484        *fuel -= used as isize;
1485    }
1486
1487    /// Extracts merged results.
1488    ///
1489    /// This method should only be called after `work` has been called and has
1490    /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
1491    fn done(
1492        self,
1493        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1494        log: &mut SpineLog<'_, T>,
1495    ) -> Option<SpineBatch<T>> {
1496        let first = bs.first()?;
1497        let last = bs.last()?;
1498        let id = SpineId(first.id().0, last.id().1);
1499        assert!(id.0 < id.1);
1500        let lower = first.desc().lower().clone();
1501        let upper = last.desc().upper().clone();
1502        let since = self.since;
1503
1504        // Special case empty batches.
1505        if bs.iter().all(SpineBatch::is_empty) {
1506            return Some(SpineBatch::empty(id, lower, upper, since));
1507        }
1508
1509        let desc = Description::new(lower, upper, since);
1510        let len = bs.iter().map(SpineBatch::len).sum();
1511
1512        // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1513        // in the worst case, the double iteration is absolutely worth having
1514        // merged_parts pre-sized.
1515        let mut merged_parts_len = 0;
1516        for b in &bs {
1517            merged_parts_len += b.parts.len();
1518        }
1519        let mut merged_parts = Vec::with_capacity(merged_parts_len);
1520        for b in bs {
1521            merged_parts.extend(b.parts)
1522        }
1523        // Sanity check the pre-size code.
1524        debug_assert_eq!(merged_parts.len(), merged_parts_len);
1525
1526        if let SpineLog::Enabled { merge_reqs } = log {
1527            merge_reqs.push(FueledMergeReq {
1528                id,
1529                desc: desc.clone(),
1530                inputs: merged_parts.clone(),
1531            });
1532        }
1533
1534        Some(SpineBatch {
1535            id,
1536            desc,
1537            len,
1538            parts: merged_parts,
1539            active_compaction: None,
1540        })
1541    }
1542}
1543
1544/// The maximum number of batches per level in the spine.
1545/// In practice, we probably want a larger max and a configurable soft cap, but using a
1546/// stack-friendly data structure and keeping this number low makes this safer during the
1547/// initial rollout.
1548const BATCHES_PER_LEVEL: usize = 2;
1549
1550/// An append-only collection of update batches.
1551///
1552/// The `Spine` is a general-purpose trace implementation based on collection
1553/// and merging immutable batches of updates. It is generic with respect to the
1554/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1555///
1556/// ## Design
1557///
1558/// This spine is represented as a list of layers, where each element in the
1559/// list is either
1560///
1561///   1. `MergeState::Vacant`: empty
1562///   2. `MergeState::Single`: a single batch
1563///   3. `MergeState::Double`: a pair of batches
1564///
1565/// Each "batch" has the option to be `None`, indicating a non-batch that
1566/// nonetheless acts as a number of updates proportionate to the level at which
1567/// it exists (for bookkeeping).
1568///
1569/// Each of the batches at layer i contains at most 2^i elements. The sequence
1570/// of batches should have the upper bound of one match the lower bound of the
1571/// next. Batches may be logically empty, with matching upper and lower bounds,
1572/// as a bookkeeping mechanism.
1573///
1574/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1575/// even though it may actually contain fewer elements. This allows us to
1576/// decouple the physical representation from logical amounts of effort invested
1577/// in each batch. It allows us to begin compaction and to reduce the number of
1578/// updates, without compromising our ability to continue to move updates along
1579/// the spine. We are explicitly making the trade-off that while some batches
1580/// might compact at lower levels, we want to treat them as if they contained
1581/// their full set of updates for accounting reasons (to apply work to higher
1582/// levels).
1583///
1584/// We maintain the invariant that for any in-progress merge at level k there
1585/// should be fewer than 2^k records at levels lower than k. That is, even if we
1586/// were to apply an unbounded amount of effort to those records, we would not
1587/// have enough records to prompt a merge into the in-progress merge. Ideally,
1588/// we maintain the extended invariant that for any in-progress merge at level
1589/// k, the remaining effort required (number of records minus applied effort) is
1590/// less than the number of records that would need to be added to reach 2^k
1591/// records in layers below.
1592///
1593/// ## Mathematics
1594///
1595/// When a merge is initiated, there should be a non-negative *deficit* of
1596/// updates before the layers below could plausibly produce a new batch for the
1597/// currently merging layer. We must determine a factor of proportionality, so
1598/// that newly arrived updates provide at least that amount of "fuel" towards
1599/// the merging layer, so that the merge completes before lower levels invade.
1600///
1601/// ### Deficit:
1602///
1603/// A new merge is initiated only in response to the completion of a prior
1604/// merge, or the introduction of new records from outside. The latter case is
1605/// special, and will maintain our invariant trivially, so we will focus on the
1606/// former case.
1607///
1608/// When a merge at level k completes, assuming we have maintained our invariant
1609/// then there should be fewer than 2^k records at lower levels. The newly
1610/// created merge at level k+1 will require up to 2^(k+2) units of work, and
1611/// should not expect a new batch until strictly more than 2^k records are
1612/// added. This means that a factor of proportionality of four should be
1613/// sufficient to ensure that the merge completes before a new merge is
1614/// initiated.
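///
/// A worked instance of the argument, with illustrative numbers: when a merge
/// at level 3 completes, lower levels hold fewer than 2^3 = 8 records. The
/// resulting merge at level 4 needs at most 2^5 = 32 units of work, and it
/// should not see a new input batch until strictly more than 8 further records
/// arrive, so a proportionality factor of 32 / 8 = 4 units of fuel per
/// introduced record is enough to finish the merge first.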
1615///
1616/// When new records get introduced, we will need to roll up any batches at
1617/// lower levels, which we treat as the introduction of records. Each of these
1618/// virtual records introduced should be accounted for in the fuel it should
1619/// contribute, as it results in the promotion of batches closer to in-progress
1620/// merges.
1621///
1622/// ### Fuel sharing
1623///
1624/// We like the idea of applying fuel preferentially to merges at *lower*
1625/// levels, under the idea that they are easier to complete, and we benefit from
1626/// fewer total merges in progress. This does delay the completion of merges at
1627/// higher levels, and may not obviously be a total win. If we choose to do
1628/// this, we should make sure that we correctly account for completed merges at
1629/// low layers: they should still extract fuel from new updates even though they
1630/// have completed, at least until they have paid back any "debt" to higher
1631/// layers by continuing to provide fuel as updates arrive.
1632#[derive(Debug, Clone)]
1633struct Spine<T> {
1634    effort: usize,
1635    next_id: usize,
1636    since: Antichain<T>,
1637    upper: Antichain<T>,
1638    merging: Vec<MergeState<T>>,
1639}
1640
1641impl<T> Spine<T> {
1642    /// All batches in the spine, oldest to newest.
1643    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1644        self.merging.iter().rev().flat_map(|m| &m.batches)
1645    }
1646
1647    /// All (mutable) batches in the spine, oldest to newest.
1648    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1649        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1650    }
1651}
1652
1653impl<T: Timestamp + Lattice> Spine<T> {
1654    /// Allocates a fueled `Spine`.
1655    ///
1656    /// This trace will merge batches progressively, with each inserted batch
1657    /// applying a multiple of the batch's length in effort to each merge. The
1658    /// `effort` parameter is that multiplier. This value should be at least one
1659    /// for the merging to happen; a value of zero is not helpful.
1660    pub fn new() -> Self {
1661        Spine {
1662            effort: 1,
1663            next_id: 0,
1664            since: Antichain::from_elem(T::minimum()),
1665            upper: Antichain::from_elem(T::minimum()),
1666            merging: Vec::new(),
1667        }
1668    }
1669
1670    /// Apply some amount of effort to trace maintenance.
1671    ///
1672    /// The units of effort are updates, and the method should be thought of as
1673    /// analogous to inserting as many empty updates, where the trace is
1674    /// permitted to perform proportionate work.
1675    ///
1676    /// Returns true if this did work and false if it left the spine unchanged.
1677    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1678        self.tidy_layers();
1679        if self.reduced() {
1680            return false;
1681        }
1682
1683        if self.merging.iter().any(|b| b.merge.is_some()) {
1684            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1685            // If any merges exist, we can directly call `apply_fuel`.
1686            self.apply_fuel(&fuel, log);
1687        } else {
1688            // Otherwise, we'll need to introduce fake updates to move merges
1689            // along.
1690
1691            // Introduce an empty batch with roughly `effort` virtual updates.
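            // E.g. (illustrative): effort = 100 rounds up to 128 = 2^7, so the
            // empty batch is introduced at level 7.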
1692            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1693            let id = self.next_id();
1694            self.introduce_batch(
1695                SpineBatch::empty(
1696                    id,
1697                    self.upper.clone(),
1698                    self.upper.clone(),
1699                    self.since.clone(),
1700                ),
1701                level,
1702                log,
1703            );
1704        }
1705        true
1706    }
1707
1708    pub fn next_id(&mut self) -> SpineId {
1709        let id = self.next_id;
1710        self.next_id += 1;
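        // E.g. (illustrative): if next_id was 4, this returns SpineId(4, 5) and
        // leaves next_id at 5.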
1711        SpineId(id, self.next_id)
1712    }
1713
1714    // Ideally, this method acts as insertion of `batch`, even if we are not yet
1715    // able to begin merging the batch. This means it is a good time to perform
1716    // amortized work proportional to the size of batch.
1717    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1718        assert!(batch.desc.lower() != batch.desc.upper());
1719        assert_eq!(batch.desc.lower(), &self.upper);
1720
1721        let id = self.next_id();
1722        let batch = SpineBatch::merged(IdHollowBatch {
1723            id,
1724            batch: Arc::new(batch),
1725        });
1726
1727        self.upper.clone_from(batch.upper());
1728
1729        // If `batch` and the most recently inserted batch are both empty,
1730        // we can just fuse them.
1731        if batch.is_empty() {
1732            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1733                if self.merging[position].is_single() && self.merging[position].is_empty() {
1734                    self.insert_at(batch, position);
1735                    // Since we just inserted a batch, we should always have work to complete...
1736                    // but otherwise we just leave this layer vacant.
1737                    if let Some(merged) = self.complete_at(position, log) {
1738                        self.merging[position] = MergeState::single(merged);
1739                    }
1740                    return;
1741                }
1742            }
1743        }
1744
1745        // Normal insertion for the batch.
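        // (Illustrative: a batch of len 5 rounds up to 8 = 2^3 and lands at
        // level 3, while an empty batch rounds up to 1 = 2^0 and lands at
        // level 0.)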
1746        let index = batch.len().next_power_of_two();
1747        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1748    }
1749
1750    /// True iff there is at most one HollowBatch in `self.merging`.
1751    ///
1752    /// When true, there is no maintenance work to perform in the trace, other
1753    /// than compaction. We do not yet have logic in place to determine if
1754    /// compaction would improve a trace, so for now we are ignoring that.
1755    fn reduced(&self) -> bool {
1756        self.spine_batches()
1757            .flat_map(|b| b.parts.as_slice())
1758            .count()
1759            < 2
1760    }
1761
1762    /// Describes the merge progress of layers in the trace.
1763    ///
1764    /// Intended for diagnostics rather than public consumption.
1765    #[allow(dead_code)]
1766    fn describe(&self) -> Vec<(usize, usize)> {
1767        self.merging
1768            .iter()
1769            .map(|b| (b.batches.len(), b.len()))
1770            .collect()
1771    }
1772
1773    /// Introduces a batch at an indicated level.
1774    ///
1775    /// The level indication is often related to the size of the batch, but it
1776    /// can also be used to artificially fuel the computation by supplying empty
1777    /// batches at non-trivial indices, to move merges along.
1778    fn introduce_batch(
1779        &mut self,
1780        batch: SpineBatch<T>,
1781        batch_index: usize,
1782        log: &mut SpineLog<'_, T>,
1783    ) {
1784        // Step 0.  Determine an amount of fuel to use for the computation.
1785        //
1786        //          Fuel is used to drive maintenance of the data structure,
1787        //          and in particular are used to make progress through merges
1788        //          that are in progress. The amount of fuel to use should be
1789        //          proportional to the number of records introduced, so that
1790        //          we are guaranteed to complete all merges before they are
1791        //          required as arguments to merges again.
1792        //
1793        //          The fuel use policy is negotiable, in that we might aim
1794        //          to use relatively less when we can, so that we return
1795        //          control promptly, or we might account more work to larger
1796        //          batches. Not clear to me which is best, or if there
1797        //          should be a configuration knob controlling this.
1798
1799        // The amount of fuel to use is proportional to 2^batch_index, scaled by
1800        // a factor of self.effort which determines how eager we are in
1801        // performing maintenance work. We need to ensure that each merge in
1802        // progress receives fuel for each introduced batch, and so multiply by
1803        // that as well.
1804        if batch_index > 32 {
1805            tracing::warn!("large batch index: {}", batch_index);
1806        }
1807
1808        // We believe that eight units of fuel is sufficient for each introduced
1809        // record, accounted as four for each record, and a potential four more
1810        // for each virtual record associated with promoting existing smaller
1811        // batches. We could try and make this be less, or be scaled to merges
1812        // based on their deficit at time of instantiation. For now, we remain
1813        // conservative.
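        // For example (illustrative): with batch_index = 3 and effort = 1, this
        // works out to fuel = (8 << 3) * 1 = 64, i.e. eight units for each of
        // the 2^3 records the batch is treated as containing.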
1814        let mut fuel = 8 << batch_index;
1815        // Scale up by the effort parameter, which is calibrated to one as the
1816        // minimum amount of effort.
1817        fuel *= self.effort;
1818        // Convert to an `isize` so we can observe any fuel shortfall.
1819        // TODO(benesch): avoid dangerous usage of `as`.
1820        #[allow(clippy::as_conversions)]
1821        let fuel = fuel as isize;
1822
1823        // Step 1.  Apply fuel to each in-progress merge.
1824        //
1825        //          Before we can introduce new updates, we must apply any
1826        //          fuel to in-progress merges, as this fuel is what ensures
1827        //          that the merges will be complete by the time we insert
1828        //          the updates.
1829        self.apply_fuel(&fuel, log);
1830
1831        // Step 2.  We must ensure the invariant that adjacent layers do not
1832        //          contain two batches will be satisfied when we insert the
1833        //          batch. We forcibly completing all merges at layers lower
1834        //          than and including `batch_index`, so that the new batch is
1835        //          inserted into an empty layer.
1836        //
1837        //          We could relax this to "strictly less than `batch_index`"
1838        //          if the layer above has only a single batch in it, which
1839        //          seems not implausible if it has been the focus of effort.
1840        //
1841        //          This should be interpreted as the introduction of some
1842        //          volume of fake updates, and we will need to fuel merges
1843        //          by a proportional amount to ensure that they are not
1844        //          surprised later on. The number of fake updates should
1845        //          correspond to the deficit for the layer, which perhaps
1846        //          we should track explicitly.
1847        self.roll_up(batch_index, log);
1848
1849        // Step 3. This insertion should be into an empty layer. It is a logical
1850        //         error otherwise, as we may be violating our invariant, from
1851        //         which all wonderment derives.
1852        self.insert_at(batch, batch_index);
1853
1854        // Step 4. Tidy the largest layers.
1855        //
1856        //         It is important that we not tidy only smaller layers,
1857        //         as their ascension is what ensures the merging and
1858        //         eventual compaction of the largest layers.
1859        self.tidy_layers();
1860    }
1861
1862    /// Ensures that an insertion at layer `index` will succeed.
1863    ///
1864    /// This method is subject to the constraint that all existing batches
1865    /// should occur at higher levels, which requires it to "roll up" batches
1866    /// present at lower levels before the method is called. In doing this, we
1867    /// should not introduce more virtual records than 2^index, as that is the
1868    /// amount of excess fuel we have budgeted for completing merges.
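    ///
    /// E.g. (illustrative): rolling up to index 3 completes levels 0, 1, and 2
    /// in turn, folding each completed batch into the next level, then inserts
    /// the combined result at level 3; if that fills level 3, it too is
    /// completed and pushed up to level 4.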
1869    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1870        // Ensure entries sufficient for `index`.
1871        while self.merging.len() <= index {
1872            self.merging.push(MergeState::default());
1873        }
1874
1875        // We only need to roll up if there are non-vacant layers.
1876        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1877            // Collect and merge all batches at layers up to but not including
1878            // `index`.
1879            let mut merged = None;
1880            for i in 0..index {
1881                if let Some(merged) = merged.take() {
1882                    self.insert_at(merged, i);
1883                }
1884                merged = self.complete_at(i, log);
1885            }
1886
1887            // The merged results should be introduced at level `index`, which
1888            // should be ready to absorb them (possibly creating a new merge at
1889            // the time).
1890            if let Some(merged) = merged {
1891                self.insert_at(merged, index);
1892            }
1893
1894            // If the insertion results in a merge, we should complete it to
1895            // ensure the upcoming insertion at `index` does not panic.
1896            if self.merging[index].is_full() {
1897                let merged = self.complete_at(index, log).expect("double batch");
1898                self.insert_at(merged, index + 1);
1899            }
1900        }
1901    }
1902
1903    /// Applies an amount of fuel to merges in progress.
1904    ///
1905    /// The supplied `fuel` is for each in progress merge, and if we want to
1906    /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1907    /// could do so in order to maintain fewer batches on average (at the risk
1908    /// of completing merges of large batches later, but tbh probably not much
1909    /// later).
1910    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1911        // For the moment our strategy is to apply fuel independently to each
1912        // merge in progress, rather than prioritizing small merges. This sounds
1913        // like a great idea, but we need better accounting in place to ensure
1914        // that merges that borrow against later layers but then complete still
1915        // "acquire" fuel to pay back their debts.
1916        for index in 0..self.merging.len() {
1917            // Give each level independent fuel, for now.
1918            let mut fuel = *fuel;
1919            // Pass along various logging stuffs, in case we need to report
1920            // success.
1921            self.merging[index].work(&mut fuel);
1922            // `fuel` could have a deficit at this point, meaning we over-spent
1923            // when we took a merge step. We could ignore this, or maintain the
1924            // deficit and account future fuel against it before spending again.
1925            // It isn't clear why that would be especially helpful to do; we
1926            // might want to avoid overspends at multiple layers in the same
1927            // invocation (to limit latencies), but there is probably a rich
1928            // policy space here.
1929
1930            // If a merge completes, we can immediately merge it in to the next
1931            // level, which is "guaranteed" to be complete at this point, by our
1932            // fueling discipline.
1933            if self.merging[index].is_complete() {
1934                let complete = self.complete_at(index, log).expect("complete batch");
1935                self.insert_at(complete, index + 1);
1936            }
1937        }
1938    }
1939
1940    /// Inserts a batch at a specific location.
1941    ///
1942    /// This is a non-public internal method that can panic if we try and insert
1943    /// into a layer which already contains two batches (and is still in the
1944    /// process of merging).
1945    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1946        // Ensure the spine is large enough.
1947        while self.merging.len() <= index {
1948            self.merging.push(MergeState::default());
1949        }
1950
1951        // Insert the batch at the location.
1952        let merging = &mut self.merging[index];
1953        merging.push_batch(batch);
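        // With BATCHES_PER_LEVEL = 2, pushing a second batch fills the layer,
        // at which point we begin a fueling merge for the whole level.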
1954        if merging.batches.is_full() {
1955            let compaction_frontier = Some(self.since.borrow());
1956            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1957        }
1958    }
1959
1960    /// Completes and extracts whatever is at layer `index`, leaving this layer vacant.
1961    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1962        self.merging[index].complete(log)
1963    }
1964
1965    /// Attempts to draw down large layers to size appropriate layers.
1966    fn tidy_layers(&mut self) {
1967        // If the largest layer is complete (not merging), we can attempt to
1968        // draw it down to the next layer. This is permitted if we can maintain
1969        // our invariant that below each merge there are at most half the
1970        // records that would be required to invade the merge.
1971        if !self.merging.is_empty() {
1972            let mut length = self.merging.len();
1973            if self.merging[length - 1].is_single() {
1974                // To move a batch down, we require that it contain few enough
1975                // records that the lower level is appropriate, and that moving
1976                // the batch would not create a merge violating our invariant.
1977                let appropriate_level = usize::cast_from(
1978                    self.merging[length - 1]
1979                        .len()
1980                        .next_power_of_two()
1981                        .trailing_zeros(),
1982                );
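                // (Illustrative: a single batch of len 100 rounds up to
                // 128 = 2^7, so its appropriate level is 7.)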
1983
1984                // Continue only as far as is appropriate
1985                while appropriate_level < length - 1 {
1986                    let current = &mut self.merging[length - 2];
1987                    if current.is_vacant() {
1988                        // Vacant batches can be absorbed.
1989                        self.merging.remove(length - 2);
1990                        length = self.merging.len();
1991                    } else {
1992                        if !current.is_full() {
1993                            // Single batches may initiate a merge, if sizes are
1994                            // within bounds, but terminate the loop either way.
1995
1996                            // Determine the number of records that might lead
1997                            // to a merge. Importantly, this is not the number
1998                            // of actual records, but the sum of upper bounds
1999                            // based on indices.
2000                            let mut smaller = 0;
2001                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
2002                                smaller += batch.batches.len() << index;
2003                            }
2004
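                            // (Illustrative: with length = 6 the merge is only
                            // initiated if the lower levels account for at most
                            // (1 << 6) / 8 = 8 virtual records.)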
2005                            if smaller <= (1 << length) / 8 {
2006                                // Remove the batch under consideration (shifting the deeper batches up a level),
2007                                // then merge in the single batch at the current level.
2008                                let state = self.merging.remove(length - 2);
2009                                assert_eq!(state.batches.len(), 1);
2010                                for batch in state.batches {
2011                                    self.insert_at(batch, length - 2);
2012                                }
2013                            }
2014                        }
2015                        break;
2016                    }
2017                }
2018            }
2019        }
2020    }
2021
2022    /// Checks invariants:
2023    /// - The lowers and uppers of all batches "line up".
2024    /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
2025    /// - The upper of the "maximum" batch is `== self.upper`.
2026    /// - The since of each batch is `less_equal self.since`.
2027    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
2028    /// - TODO: Verify fuel and level invariants.
2029    fn validate(&self) -> Result<(), String> {
2030        let mut id = SpineId(0, 0);
2031        let mut frontier = Antichain::from_elem(T::minimum());
2032        for x in self.merging.iter().rev() {
2033            if x.is_full() != x.merge.is_some() {
2034                return Err(format!(
2035                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
2036                    x.is_full(),
2037                    x.merge,
2038                ));
2039            }
2040
2041            if let Some(m) = &x.merge {
2042                if !x.is_full() {
2043                    return Err(format!(
2044                        "merge should only exist for full batches (len={:?}, merge={:?})",
2045                        x.batches.len(),
2046                        m.id,
2047                    ));
2048                }
2049                if x.id() != Some(m.id) {
2050                    return Err(format!(
2051                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
2052                        x.id(),
2053                        m.id,
2054                    ));
2055                }
2056            }
2057
2058            // TODO: Anything we can validate about x.merge? It'd
2059            // be nice to assert that it's bigger than the len of the
2060            // two batches, but apply_merge_res might swap those lengths
2061            // out from under us.
2062            for batch in &x.batches {
2063                if batch.id().0 != id.1 {
2064                    return Err(format!(
2065                        "batch id {:?} does not match the previous id {:?}: {:?}",
2066                        batch.id(),
2067                        id,
2068                        self
2069                    ));
2070                }
2071                id = batch.id();
2072                if batch.desc().lower() != &frontier {
2073                    return Err(format!(
2074                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
2075                        batch.desc().lower(),
2076                        frontier,
2077                        self
2078                    ));
2079                }
2080                frontier.clone_from(batch.desc().upper());
2081                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
2082                    return Err(format!(
2083                        "since of batch {:?} past the spine since {:?}: {:?}",
2084                        batch.desc().since(),
2085                        self.since,
2086                        self
2087                    ));
2088                }
2089            }
2090        }
2091        if self.next_id != id.1 {
2092            return Err(format!(
2093                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
2094                self.next_id, id, self
2095            ));
2096        }
2097        if self.upper != frontier {
2098            return Err(format!(
2099                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
2100                self.upper, frontier, self
2101            ));
2102        }
2103        Ok(())
2104    }
2105}
2106
2107/// Describes the state of a layer.
2108///
2109/// A layer can be empty, contain a single batch, or contain a pair of batches
2110/// that are in the process of merging into a batch for the next layer.
2111#[derive(Debug, Clone)]
2112struct MergeState<T> {
2113    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
2114    merge: Option<IdFuelingMerge<T>>,
2115}
2116
2117impl<T> Default for MergeState<T> {
2118    fn default() -> Self {
2119        Self {
2120            batches: ArrayVec::new(),
2121            merge: None,
2122        }
2123    }
2124}
2125
2126impl<T: Timestamp + Lattice> MergeState<T> {
2127    /// An id that covers all the batches in the given merge state, assuming there are any.
2128    fn id(&self) -> Option<SpineId> {
2129        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
2130            Some(SpineId(first.id().0, last.id().1))
2131        } else {
2132            None
2133        }
2134    }
2135
2136    /// A new single-batch merge state.
2137    fn single(batch: SpineBatch<T>) -> Self {
2138        let mut state = Self::default();
2139        state.push_batch(batch);
2140        state
2141    }
2142
2143    /// Push a new batch at this level, checking invariants.
2144    fn push_batch(&mut self, batch: SpineBatch<T>) {
2145        if let Some(last) = self.batches.last() {
2146            assert_eq!(last.id().1, batch.id().0);
2147            assert_eq!(last.upper(), batch.lower());
2148        }
2149        assert!(
2150            self.merge.is_none(),
2151            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
2152            batch.id,
2153            self.batches.len(),
2154        );
2155        self.batches
2156            .try_push(batch)
2157            .expect("Attempted to insert batch into full layer!");
2158    }
2159
2160    /// The number of actual updates contained in the level.
2161    fn len(&self) -> usize {
2162        self.batches.iter().map(SpineBatch::len).sum()
2163    }
2164
2165    /// True if this merge state contains no updates.
2166    fn is_empty(&self) -> bool {
2167        self.batches.iter().all(SpineBatch::is_empty)
2168    }
2169
2170    /// True if this level contains no batches.
2171    fn is_vacant(&self) -> bool {
2172        self.batches.is_empty()
2173    }
2174
2175    /// True only for a single-batch state.
2176    fn is_single(&self) -> bool {
2177        self.batches.len() == 1
2178    }
2179
2180    /// True if this merge cannot hold any more batches.
2181    /// (i.e. for a binary merge tree, true if this layer holds two batches.)
2182    fn is_full(&self) -> bool {
2183        self.batches.is_full()
2184    }
2185
2186    /// Immediately complete any merge.
2187    ///
2188    /// The result is either a batch, if there is a non-trivial batch to return,
2189    /// or `None` if there is no meaningful batch to return.
2190    ///
2191    /// If this level holds at most one batch, that batch is returned directly without merging.
2192    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
2193        let mut this = mem::take(self);
2194        if this.batches.len() <= 1 {
2195            this.batches.pop()
2196        } else {
2197            // Merge the remaining batches, regardless of whether we have a fully fueled merge.
2198            let id_merge = this
2199                .merge
2200                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
2201            id_merge.merge.done(this.batches, log)
2202        }
2203    }
2204
2205    /// True iff the layer is a complete merge, ready for extraction.
2206    fn is_complete(&self) -> bool {
2207        match &self.merge {
2208            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
2209            None => false,
2210        }
2211    }
2212
2213    /// Performs a bounded amount of work towards a merge.
2214    fn work(&mut self, fuel: &mut isize) {
2215        // We only perform work for merges in progress.
2216        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
2217            merge.work(&self.batches[..], fuel)
2218        }
2219    }
2220}
2221
2222#[cfg(test)]
2223pub mod datadriven {
2224    use mz_ore::fmt::FormatBuffer;
2225
2226    use crate::internal::datadriven::DirectiveArgs;
2227
2228    use super::*;
2229
2230    /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
2231    #[derive(Debug, Default)]
2232    pub struct TraceState {
2233        pub trace: Trace<u64>,
2234        pub merge_reqs: Vec<FueledMergeReq<u64>>,
2235    }
2236
2237    pub fn since_upper(
2238        datadriven: &TraceState,
2239        _args: DirectiveArgs,
2240    ) -> Result<String, anyhow::Error> {
2241        Ok(format!(
2242            "{:?}{:?}\n",
2243            datadriven.trace.since().elements(),
2244            datadriven.trace.upper().elements()
2245        ))
2246    }
2247
2248    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
2249        let mut s = String::new();
2250        for b in datadriven.trace.spine.spine_batches() {
2251            s.push_str(b.describe(true).as_str());
2252            s.push('\n');
2253        }
2254        Ok(s)
2255    }
2256
2257    pub fn insert(
2258        datadriven: &mut TraceState,
2259        args: DirectiveArgs,
2260    ) -> Result<String, anyhow::Error> {
2261        for x in args
2262            .input
2263            .trim()
2264            .split('\n')
2265            .map(DirectiveArgs::parse_hollow_batch)
2266        {
2267            datadriven
2268                .merge_reqs
2269                .append(&mut datadriven.trace.push_batch(x));
2270        }
2271        Ok("ok\n".to_owned())
2272    }
2273
2274    pub fn downgrade_since(
2275        datadriven: &mut TraceState,
2276        args: DirectiveArgs,
2277    ) -> Result<String, anyhow::Error> {
2278        let since = args.expect("since");
2279        datadriven
2280            .trace
2281            .downgrade_since(&Antichain::from_elem(since));
2282        Ok("ok\n".to_owned())
2283    }
2284
2285    pub fn take_merge_req(
2286        datadriven: &mut TraceState,
2287        _args: DirectiveArgs,
2288    ) -> Result<String, anyhow::Error> {
2289        let mut s = String::new();
2290        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
2291            write!(
2292                s,
2293                "{:?}{:?}{:?} {}\n",
2294                merge_req.desc.lower().elements(),
2295                merge_req.desc.upper().elements(),
2296                merge_req.desc.since().elements(),
2297                merge_req
2298                    .inputs
2299                    .iter()
2300                    .flat_map(|x| x.batch.parts.iter())
2301                    .map(|x| x.printable_name())
2302                    .collect::<Vec<_>>()
2303                    .join(" ")
2304            );
2305        }
2306        Ok(s)
2307    }
2308
2309    pub fn apply_merge_res(
2310        datadriven: &mut TraceState,
2311        args: DirectiveArgs,
2312    ) -> Result<String, anyhow::Error> {
2313        let res = FueledMergeRes {
2314            output: DirectiveArgs::parse_hollow_batch(args.input),
2315            input: CompactionInput::Legacy,
2316            new_active_compaction: None,
2317        };
2318        match datadriven.trace.apply_merge_res_unchecked(&res) {
2319            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
2320            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
2321            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
2322            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
2323            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
2324        }
2325    }
2326}
2327
2328#[cfg(test)]
2329pub(crate) mod tests {
2330    use std::ops::Range;
2331
2332    use proptest::prelude::*;
2333    use semver::Version;
2334
2335    use crate::internal::state::tests::{any_hollow_batch, any_hollow_batch_with_exact_runs};
2336
2337    use super::*;
2338
2339    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
2340        num_batches: Range<usize>,
2341    ) -> impl Strategy<Value = Trace<T>> {
2342        Strategy::prop_map(
2343            (
2344                any::<Option<T>>(),
2345                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
2346                any::<bool>(),
2347                any::<u64>(),
2348            ),
2349            |(since, mut batches, roundtrip_structure, timeout_ms)| {
2350                let mut trace = Trace::<T>::default();
2351                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
2352
2353                // Fix up the arbitrary HollowBatches so the lowers and uppers
2354                // align.
2355                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
2356                let mut lower = Antichain::from_elem(T::minimum());
2357                for mut batch in batches {
2358                    // Overall trace since has to be past each batch's since.
2359                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
2360                        trace.downgrade_since(batch.desc.since());
2361                    }
2362                    batch.desc = Description::new(
2363                        lower.clone(),
2364                        batch.desc.upper().clone(),
2365                        batch.desc.since().clone(),
2366                    );
2367                    lower.clone_from(batch.desc.upper());
2368                    let _merge_req = trace.push_batch(batch);
2369                }
2370                let reqs: Vec<_> = trace
2371                    .fueled_merge_reqs_before_ms(timeout_ms, None)
2372                    .collect();
2373                for req in reqs {
2374                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
2375                }
2376                trace.roundtrip_structure = roundtrip_structure;
2377                trace
2378            },
2379        )
2380    }
2381
2382    #[mz_ore::test]
2383    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2384    fn test_roundtrips() {
2385        fn check(trace: Trace<i64>) {
2386            trace.validate().unwrap();
2387            let flat = trace.flatten();
2388            let unflat = Trace::unflatten(flat).unwrap();
2389            assert_eq!(trace, unflat);
2390        }
2391
2392        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
2393    }
2394
2395    #[mz_ore::test]
2396    fn fueled_merge_reqs() {
2397        let mut trace: Trace<u64> = Trace::default();
2398        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
2399            0,
2400            10,
2401            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
2402            1000,
2403        ));
2404
2405        assert!(fueled_reqs.is_empty());
2406        assert_eq!(
2407            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
2408            0,
2409            "no merge reqs when not filtering by version"
2410        );
2411        assert_eq!(
2412            trace
2413                .fueled_merge_reqs_before_ms(
2414                    u64::MAX,
2415                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
2416                )
2417                .count(),
2418            0,
2419            "zero batches are older than a past version"
2420        );
2421        assert_eq!(
2422            trace
2423                .fueled_merge_reqs_before_ms(
2424                    u64::MAX,
2425                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
2426                )
2427                .count(),
2428            1,
2429            "one batch is older than a future version"
2430        );
2431    }
2432
2433    #[mz_ore::test]
2434    fn remove_redundant_merge_reqs() {
2435        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
2436            FueledMergeReq {
2437                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
2438                desc: Description::new(
2439                    Antichain::from_elem(lower),
2440                    Antichain::from_elem(upper),
2441                    Antichain::new(),
2442                ),
2443                inputs: vec![],
2444            }
2445        }
2446
2447        // Empty
2448        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
2449
2450        // Single
2451        assert_eq!(
2452            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
2453            vec![req(0, 1)]
2454        );
2455
2456        // Duplicate
2457        assert_eq!(
2458            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
2459            vec![req(0, 1)]
2460        );
2461
2462        // Nothing covered
2463        assert_eq!(
2464            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
2465            vec![req(1, 2), req(0, 1)]
2466        );
2467
2468        // Covered
2469        assert_eq!(
2470            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
2471            vec![req(0, 3)]
2472        );
2473
2474        // Covered, lower equal
2475        assert_eq!(
2476            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
2477            vec![req(0, 3)]
2478        );
2479
2480        // Covered, upper equal
2481        assert_eq!(
2482            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
2483            vec![req(0, 3)]
2484        );
2485
2486        // Covered, unexpected order (doesn't happen in practice)
2487        assert_eq!(
2488            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
2489            vec![req(0, 3)]
2490        );
2491
2492        // Partially overlapping
2493        assert_eq!(
2494            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2495            vec![req(1, 3), req(0, 2)]
2496        );
2497
2498        // Partially overlapping, the other order
2499        assert_eq!(
2500            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2501            vec![req(0, 2), req(1, 3)]
2502        );
2503
2504        // Different sinces (doesn't happen in practice)
2505        let req015 = FueledMergeReq {
2506            id: SpineId(0, 1),
2507            desc: Description::new(
2508                Antichain::from_elem(0),
2509                Antichain::from_elem(1),
2510                Antichain::from_elem(5),
2511            ),
2512            inputs: vec![],
2513        };
2514        assert_eq!(
2515            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2516            vec![req015, req(0, 1)]
2517        );
2518    }
2519
2520    #[mz_ore::test]
2521    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
2522    fn construct_batch_with_runs_replaced_test() {
2523        let batch_strategy = any_hollow_batch::<u64>();
2524        let to_replace_strategy = any_hollow_batch_with_exact_runs::<u64>(1);
2525
2526        let combined_strategy = (batch_strategy, to_replace_strategy)
2527            .prop_filter("non-empty batch", |(batch, _)| batch.run_meta.len() >= 1);
2528
2529        let final_strategy = combined_strategy.prop_flat_map(|(batch, to_replace)| {
2530            let batch_len = batch.run_meta.len();
2531            let batch_clone = batch.clone();
2532            let to_replace_clone = to_replace.clone();
2533
2534            proptest::collection::vec(any::<bool>(), batch_len)
2535                .prop_filter("at least one run selected", |mask| mask.iter().any(|&x| x))
2536                .prop_map(move |mask| {
2537                    let indices: Vec<usize> = mask
2538                        .iter()
2539                        .enumerate()
2540                        .filter_map(|(i, &selected)| if selected { Some(i) } else { None })
2541                        .collect();
2542                    (batch_clone.clone(), to_replace_clone.clone(), indices)
2543                })
2544        });
2545
2546        proptest!(|(
2547            (batch, to_replace, runs) in final_strategy
2548        )| {
2549            let original_run_ids: Vec<_> = batch.run_meta.iter().map(|x|
2550                x.id.unwrap().clone()
2551            ).collect();
2552
2553            let run_ids = runs.iter().map(|&i| original_run_ids[i].clone()).collect::<Vec<_>>();
2554
2555            let new_batch = SpineBatch::construct_batch_with_runs_replaced(
2556                &batch,
2557                &run_ids,
2558                &to_replace,
2559            ).unwrap();
2560
2561            prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len());
2562        });
2563    }
2564}