mz_persist_client/internal/trace.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An append-only collection of compactable update batches. The Spine below is
11//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
12//! original Spine code is designed for incremental (via "fuel"ing) synchronous
13//! merge of in-memory batches. Persist doesn't want compaction to block
14//! incoming writes and, in fact, may in the future elect to push the work of
15//! compaction onto another machine entirely via RPC. As a result, we abuse the
16//! Spine code as follows:
17//!
18//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
19//!
20//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
21//!   the Spine. As progress is made, the Spine will merge two batches together
22//!   by: constructing a [Batch::Merger], giving it bits of fuel to
23//!   incrementally perform the merge (which spreads out the work, keeping
//!   latencies even), and then, once it's done fueling, extracting the new single
25//!   output `Batch` and discarding the inputs.
26//! - Persist instead represents a batch of blob data with a [HollowBatch]
27//!   pointer which contains the normal `Batch` metadata plus the keys necessary
28//!   to retrieve the updates.
29//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
30//!   (analogous to `Batch::Merger`) that allows us to represent a merge as it
31//!   is fueling. Normally, this would represent real incremental compaction
32//!   progress, but in persist, it's simply a bookkeeping mechanism. Once fully
33//!   fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
34//!   which to the Spine is indistinguishable from a merged batch. At this
35//!   point, it is eligible for asynchronous compaction and a `FueledMergeReq`
36//!   is generated.
37//! - At any later point, this request may be answered via
38//!   [Trace::apply_merge_res]. This internally replaces the
39//!   `SpineBatch`, which has no effect on the structure of `Spine`
40//!   but replaces the metadata in persist's state to point at the
41//!   new batch.
//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
43//!   This decouples compaction from Spine progress and also allows us to reduce
44//!   write amplification by merging `N` batches at once where `N` can be
45//!   greater than 2.
46//!
47//! [Batch]: differential_dataflow::trace::Batch
48//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
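//!
//! To make the lifecycle concrete, here's a rough sketch of the intended flow
//! against the types in this file (`hollow_batch` and `compact` are
//! placeholders for the caller's data and for whatever component actually
//! answers the merge request):
//!
//! ```ignore
//! let mut trace: Trace<u64> = Trace::default();
//! // Appending a batch may return requests for batches that Spine has decided
//! // should be compacted together.
//! let reqs = trace.push_batch(hollow_batch);
//! for req in reqs {
//!     // Compaction runs asynchronously, possibly on another machine.
//!     let output = compact(&req);
//!     let res = FueledMergeRes { output, new_active_compaction: None };
//!     // Applying the result swaps the new HollowBatch into persist's state
//!     // without changing the structure of the Spine itself.
//!     match trace.apply_merge_res(&res) {
//!         ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => {}
//!         _ => { /* the spine has moved on; drop or retry the request */ }
//!     }
//! }
//! ```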
49
50use arrayvec::ArrayVec;
51use std::cmp::Ordering;
52use std::collections::BTreeMap;
53use std::fmt::Debug;
54use std::mem;
55use std::sync::Arc;
56
57use crate::internal::paths::WriterKey;
58use differential_dataflow::lattice::Lattice;
59use differential_dataflow::trace::Description;
60use mz_ore::cast::CastFrom;
61#[allow(unused_imports)] // False positive.
62use mz_ore::fmt::FormatBuffer;
63use serde::{Serialize, Serializer};
64use timely::PartialOrder;
65use timely::progress::frontier::AntichainRef;
66use timely::progress::{Antichain, Timestamp};
67
68use crate::internal::state::HollowBatch;
69
70#[derive(Debug, Clone, PartialEq)]
71pub struct FueledMergeReq<T> {
72    pub id: SpineId,
73    pub desc: Description<T>,
74    pub inputs: Vec<IdHollowBatch<T>>,
75}
76
77#[derive(Debug)]
78pub struct FueledMergeRes<T> {
79    pub output: HollowBatch<T>,
80    pub new_active_compaction: Option<ActiveCompaction>,
81}
82
83/// An append-only collection of compactable update batches.
84///
85/// In an effort to keep our fork of Spine as close as possible to the original,
86/// we push as many changes as possible into this wrapper.
87#[derive(Debug, Clone)]
88pub struct Trace<T> {
89    spine: Spine<T>,
90    pub(crate) roundtrip_structure: bool,
91}
92
93#[cfg(any(test, debug_assertions))]
94impl<T: PartialEq> PartialEq for Trace<T> {
95    fn eq(&self, other: &Self) -> bool {
96        // Deconstruct self and other so we get a compile failure if new fields
97        // are added.
98        let Trace {
99            spine: _,
100            roundtrip_structure: _,
101        } = self;
102        let Trace {
103            spine: _,
104            roundtrip_structure: _,
105        } = other;
106
107        // Intentionally use HollowBatches for this comparison so we ignore
108        // differences in spine layers.
109        self.batches().eq(other.batches())
110    }
111}
112
113impl<T: Timestamp + Lattice> Default for Trace<T> {
114    fn default() -> Self {
115        Self {
116            spine: Spine::new(),
117            roundtrip_structure: true,
118        }
119    }
120}
121
122#[derive(Clone, Debug, Serialize)]
123pub struct ThinSpineBatch<T> {
124    pub(crate) level: usize,
125    pub(crate) desc: Description<T>,
126    pub(crate) parts: Vec<SpineId>,
127    /// NB: this exists to validate legacy batch bounds during the migration;
128    /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
129    pub(crate) descs: Vec<Description<T>>,
130}
131
132impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
133    fn eq(&self, other: &Self) -> bool {
134        // Ignore the temporary descs vector when comparing for equality.
135        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
136    }
137}
138
139#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
140pub struct ThinMerge<T> {
141    pub(crate) since: Antichain<T>,
142    pub(crate) remaining_work: usize,
143    pub(crate) active_compaction: Option<ActiveCompaction>,
144}
145
146impl<T: Clone> ThinMerge<T> {
147    fn fueling(merge: &FuelingMerge<T>) -> Self {
148        ThinMerge {
149            since: merge.since.clone(),
150            remaining_work: merge.remaining_work,
151            active_compaction: None,
152        }
153    }
154
155    fn fueled(batch: &SpineBatch<T>) -> Self {
156        ThinMerge {
157            since: batch.desc.since().clone(),
158            remaining_work: 0,
159            active_compaction: batch.active_compaction.clone(),
160        }
161    }
162}
163
164/// This is a "flattened" representation of a Trace. Goals:
165/// - small updates to the trace should result in small differences in the `FlatTrace`;
166/// - two `FlatTrace`s should be efficient to diff;
167/// - converting to and from a `Trace` should be relatively straightforward.
168///
169/// These goals are all somewhat in tension, and the space of possible representations is pretty
170/// large. See individual fields for comments on some of the tradeoffs.
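///
/// As a rough sketch of why the first two goals matter (not a doctest; `trace`
/// is a `Trace` and `batch` a `HollowBatch` supplied by the caller):
///
/// ```ignore
/// let flat_before = trace.flatten();
/// trace.push_batch_no_merge_reqs(batch);
/// let flat_after = trace.flatten();
/// // The difference between the two flattened traces is limited to the map
/// // entries touched by the insert and any merges it kicked off, which is
/// // what keeps the persisted diffs small.
/// ```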
171#[derive(Clone, Debug)]
172pub struct FlatTrace<T> {
173    pub(crate) since: Antichain<T>,
174    /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
175    /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
176    /// Previously, we serialized a trace as just this list of batches. Keeping this data around
177    /// helps ensure backwards compatibility. In the near future, we may still keep some batches
178    /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
179    /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
180    /// collection below.
181    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
182    /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
183    /// by id directly.
184    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
185    /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
186    /// to make differential updates smaller when two batches merge together. We also store the
187    /// level on the batch, instead of mapping from level to a list of batches... the level of a
188    /// spine batch doesn't change over time, but the list of batches at a particular level does.
189    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
190    /// In-progress merges. We store this by spine id instead of level to prepare for some possible
191    /// generalizations to spine (merging N of M batches at a level). This is also a natural place
192    /// to store incremental merge progress in the future.
193    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
194}
195
196impl<T: Timestamp + Lattice> Trace<T> {
197    pub(crate) fn flatten(&self) -> FlatTrace<T> {
198        let since = self.spine.since.clone();
199        let mut legacy_batches = BTreeMap::new();
200        let mut hollow_batches = BTreeMap::new();
201        let mut spine_batches = BTreeMap::new();
202        let mut merges = BTreeMap::new();
203
204        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
205            let id = batch.id();
206            let desc = batch.desc.clone();
207            let mut parts = Vec::with_capacity(batch.parts.len());
208            let mut descs = Vec::with_capacity(batch.parts.len());
209            for IdHollowBatch { id, batch } in &batch.parts {
210                parts.push(*id);
211                descs.push(batch.desc.clone());
212                // Ideally, we'd like to put all batches in the hollow_batches collection, since
213                // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
214                // we currently keep most batches in the legacy collection for backwards
215                // compatibility.
216                // As an exception, we add batches with empty time ranges to hollow_batches:
217                // they're otherwise not guaranteed to be unique, and since we only started writing
218                // them down recently there's no backwards compatibility risk.
219                if batch.desc.lower() == batch.desc.upper() {
220                    hollow_batches.insert(*id, Arc::clone(batch));
221                } else {
222                    legacy_batches.insert(Arc::clone(batch), ());
223                }
224            }
225
226            let spine_batch = ThinSpineBatch {
227                level,
228                desc,
229                parts,
230                descs,
231            };
232            spine_batches.insert(id, spine_batch);
233        };
234
235        for (level, state) in self.spine.merging.iter().enumerate() {
236            for batch in &state.batches {
237                push_spine_batch(level, batch);
238                if let Some(c) = &batch.active_compaction {
239                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
240                    assert!(
241                        previous.is_none(),
242                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
243                        batch.id,
244                    )
245                }
246            }
247            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
248                let previous = merges.insert(*id, ThinMerge::fueling(merge));
249                assert!(
250                    previous.is_none(),
251                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
252                )
253            }
254        }
255
256        if !self.roundtrip_structure {
257            assert!(hollow_batches.is_empty());
258            spine_batches.clear();
259            merges.clear();
260        }
261
262        FlatTrace {
263            since,
264            legacy_batches,
265            hollow_batches,
266            spine_batches,
267            merges,
268        }
269    }
270    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
271        let FlatTrace {
272            since,
273            legacy_batches,
274            mut hollow_batches,
275            spine_batches,
276            mut merges,
277        } = value;
278
279        // If the flattened representation has spine batches (or is empty)
280        // we know to preserve the structure for this trace.
281        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
282
283        // We need to look up legacy batches somehow, but we don't have a spine id for them.
284        // Instead, we rely on the fact that the spine must store them in antichain order.
285        // Our timestamp type may not be totally ordered, so we need to implement our own comparator
286        // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
287        // though.
288        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
289            if PartialOrder::less_than(left, right) {
290                Ordering::Less
291            } else if PartialOrder::less_than(right, left) {
292                Ordering::Greater
293            } else {
294                Ordering::Equal
295            }
296        };
297        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
298        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
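        // E.g. legacy batches with lowers [0], [5], and [10] end up in this
        // vec as [10], [5], [0], so that `pop_batch` below hands them back
        // oldest-first.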
299
300        let mut pop_batch =
301            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
302                if let Some(batch) = hollow_batches.remove(&id) {
303                    if let Some(desc) = expected_desc {
304                        assert_eq!(*desc, batch.desc);
305                    }
306                    return Ok(IdHollowBatch { id, batch });
307                }
308                let mut batch = legacy_batches
309                    .pop()
310                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
311
312                let Some(expected_desc) = expected_desc else {
313                    return Ok(IdHollowBatch { id, batch });
314                };
315
316                if expected_desc.lower() != batch.desc.lower() {
317                    return Err(format!(
318                        "hollow batch lower {:?} did not match expected lower {:?}",
319                        batch.desc.lower().elements(),
320                        expected_desc.lower().elements()
321                    ));
322                }
323
324                // Empty legacy batches are not deterministic: different nodes may split them up
                // in different ways. For now, we rearrange them to match the spine data.
326                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
327                    let mut new_upper = batch.desc.upper().clone();
328
329                    // While our current batch is too small, and there's another empty batch
330                    // in the list, roll it in.
331                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
332                        let Some(next_batch) = legacy_batches.pop() else {
333                            break;
334                        };
335                        if next_batch.is_empty() {
336                            new_upper.clone_from(next_batch.desc.upper());
337                        } else {
338                            legacy_batches.push(next_batch);
339                            break;
340                        }
341                    }
342
343                    // If our current batch is too large, split it by the expected upper
344                    // and preserve the remainder.
345                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
346                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
347                            expected_desc.upper().clone(),
348                            new_upper.clone(),
349                            batch.desc.since().clone(),
350                        ))));
351                        new_upper.clone_from(expected_desc.upper());
352                    }
353                    batch = Arc::new(HollowBatch::empty(Description::new(
354                        batch.desc.lower().clone(),
355                        new_upper,
356                        expected_desc.since().clone(),
357                    )))
358                }
359
360                if expected_desc.upper() != batch.desc.upper() {
361                    return Err(format!(
362                        "hollow batch upper {:?} did not match expected upper {:?}",
363                        batch.desc.upper().elements(),
364                        expected_desc.upper().elements()
365                    ));
366                }
367
368                Ok(IdHollowBatch { id, batch })
369            };
370
371        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
372            (batch.desc.upper().clone(), id.1)
373        } else {
374            (Antichain::from_elem(T::minimum()), 0)
375        };
376        let levels = spine_batches
377            .first_key_value()
378            .map(|(_, batch)| batch.level + 1)
379            .unwrap_or(0);
380        let mut merging = vec![MergeState::default(); levels];
381        for (id, batch) in spine_batches {
382            let level = batch.level;
383
384            let parts = batch
385                .parts
386                .into_iter()
387                .zip(batch.descs.iter().map(Some).chain(std::iter::repeat(None)))
388                .map(|(id, desc)| pop_batch(id, desc))
389                .collect::<Result<Vec<_>, _>>()?;
390            let len = parts.iter().map(|p| (*p).batch.len).sum();
391            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
392            let batch = SpineBatch {
393                id,
394                desc: batch.desc,
395                parts,
396                active_compaction,
397                len,
398            };
399
400            let state = &mut merging[level];
401
402            state.push_batch(batch);
403            if let Some(id) = state.id() {
404                if let Some(merge) = merges.remove(&id) {
405                    state.merge = Some(IdFuelingMerge {
406                        id,
407                        merge: FuelingMerge {
408                            since: merge.since,
409                            remaining_work: merge.remaining_work,
410                        },
411                    })
412                }
413            }
414        }
415
416        let mut trace = Trace {
417            spine: Spine {
418                effort: 1,
419                next_id,
420                since,
421                upper,
422                merging,
423            },
424            roundtrip_structure,
425        };
426
427        fn check_empty(name: &str, len: usize) -> Result<(), String> {
428            if len != 0 {
429                Err(format!("{len} {name} left after reconstructing spine"))
430            } else {
431                Ok(())
432            }
433        }
434
435        if roundtrip_structure {
436            check_empty("legacy batches", legacy_batches.len())?;
437        } else {
438            // If the structure wasn't actually serialized, we may have legacy batches left over.
439            for batch in legacy_batches.into_iter().rev() {
440                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
441            }
442        }
443        check_empty("hollow batches", hollow_batches.len())?;
444        check_empty("merges", merges.len())?;
445
446        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
447
448        Ok(trace)
449    }
450}
451
452#[derive(Clone, Debug, Default)]
453pub(crate) struct SpineMetrics {
454    pub compact_batches: u64,
455    pub compacting_batches: u64,
456    pub noncompact_batches: u64,
457}
458
459impl<T> Trace<T> {
460    pub fn since(&self) -> &Antichain<T> {
461        &self.spine.since
462    }
463
464    pub fn upper(&self) -> &Antichain<T> {
465        &self.spine.upper
466    }
467
468    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
469        for batch in self.batches() {
470            f(batch);
471        }
472    }
473
474    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
475        self.spine
476            .spine_batches()
477            .flat_map(|b| b.parts.as_slice())
478            .map(|b| &*b.batch)
479    }
480
481    pub fn num_spine_batches(&self) -> usize {
482        self.spine.spine_batches().count()
483    }
484
485    #[cfg(test)]
486    pub fn num_hollow_batches(&self) -> usize {
487        self.batches().count()
488    }
489
490    #[cfg(test)]
491    pub fn num_updates(&self) -> usize {
492        self.batches().map(|b| b.len).sum()
493    }
494}
495
496impl<T: Timestamp + Lattice> Trace<T> {
497    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
498        self.spine.since.clone_from(since);
499    }
500
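    /// Appends a batch to the trace, returning any merge requests (compaction
    /// work) that the spine would now like to see done.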
501    #[must_use]
502    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
503        let mut merge_reqs = Vec::new();
504        self.spine.insert(
505            batch,
506            &mut SpineLog::Enabled {
507                merge_reqs: &mut merge_reqs,
508            },
509        );
510        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
511        // Spine::roll_up (internally used by insert) clears all batches out of
512        // levels below a target by walking up from level 0 and merging each
513        // level into the next (providing the necessary fuel). In practice, this
514        // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
515        // It's a waste to do all of these (we'll throw away the results), so we
516        // filter out any that are entirely covered by some other request.
517        Self::remove_redundant_merge_reqs(merge_reqs)
518    }
519
520    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
521        // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
522        // In the meantime, search backwards, since most compactions are for recent batches.
523        for batch in self.spine.spine_batches_mut().rev() {
524            if batch.id == id {
525                batch.active_compaction = Some(compaction);
526                break;
527            }
528        }
529    }
530
531    /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
    /// account for a surprising amount of CPU in prod. database-issues#5411
533    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
534        self.spine.insert(batch, &mut SpineLog::Disabled);
535    }
536
537    /// Apply some amount of effort to trace maintenance.
538    ///
539    /// The units of effort are updates, and the method should be thought of as
540    /// analogous to inserting as many empty updates, where the trace is
541    /// permitted to perform proportionate work.
542    ///
543    /// Returns true if this did work and false if it left the spine unchanged.
544    #[must_use]
545    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
546        let mut merge_reqs = Vec::new();
547        let did_work = self.spine.exert(
548            fuel,
549            &mut SpineLog::Enabled {
550                merge_reqs: &mut merge_reqs,
551            },
552        );
553        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
554        // See the comment in [Self::push_batch].
555        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
556        (merge_reqs, did_work)
557    }
558
559    /// Validates invariants.
560    ///
561    /// See `Spine::validate` for details.
562    pub fn validate(&self) -> Result<(), String> {
563        self.spine.validate()
564    }
565
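    /// Attempts to apply the result of a previously requested merge, replacing
    /// the hollow batches of the matching `SpineBatch` if the descriptions
    /// still line up. See [ApplyMergeResult] for the possible outcomes.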
566    pub fn apply_merge_res(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
567        for batch in self.spine.spine_batches_mut().rev() {
568            let result = batch.maybe_replace(res);
569            if result.matched() {
570                return result;
571            }
572        }
573        ApplyMergeResult::NotAppliedNoMatch
574    }
575
576    /// Obtain all fueled merge reqs that either have no active compaction, or the previous
577    /// compaction was started at or before the threshold time, in order from oldest to newest.
578    pub(crate) fn fueled_merge_reqs_before_ms(
579        &self,
580        threshold_ms: u64,
581        threshold_writer: Option<WriterKey>,
582    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
583        self.spine
584            .spine_batches()
585            .filter(move |b| {
586                let noncompact = !b.is_compact();
587                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
588                    b.parts.iter().any(|b| {
589                        b.batch
590                            .parts
591                            .iter()
592                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
593                    })
594                });
595                noncompact || old_writer
596            })
597            .filter(move |b| {
598                // Either there's no active compaction, or the last active compaction
599                // is not after the timeout timestamp.
600                b.active_compaction
601                    .as_ref()
602                    .map_or(true, move |c| c.start_ms <= threshold_ms)
603            })
604            .map(|b| FueledMergeReq {
605                id: b.id,
606                desc: b.desc.clone(),
607                inputs: b.parts.clone(),
608            })
609    }
610
611    // This is only called with the results of one `insert` and so the length of
612    // `merge_reqs` is bounded by the number of levels in the spine (or possibly
613    // some small constant multiple?). The number of levels is logarithmic in the
614    // number of updates in the spine, so this number should stay very small. As
615    // a result, we simply use the naive O(n^2) algorithm here instead of doing
616    // anything fancy with e.g. interval trees.
617    fn remove_redundant_merge_reqs(
618        mut merge_reqs: Vec<FueledMergeReq<T>>,
619    ) -> Vec<FueledMergeReq<T>> {
620        // Returns true if b0 covers b1, false otherwise.
621        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
622            // TODO: can we relax or remove this since check?
623            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
624        }
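        // E.g. a req whose id spans [0, 4) covers a req whose id spans [1, 3)
        // if their sinces are equal, so the smaller one can be dropped.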
625
626        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
627        // In practice, merge_reqs will come in sorted such that the "large"
628        // requests are later. Take advantage of this by processing back to
629        // front.
630        while let Some(merge_req) = merge_reqs.pop() {
631            let covered = ret.iter().any(|r| covers(r, &merge_req));
632            if !covered {
633                // Now check if anything we've already staged is covered by this
634                // new req. In practice, the merge_reqs come in sorted and so
635                // this `retain` is a no-op.
636                ret.retain(|r| !covers(&merge_req, r));
637                ret.push(merge_req);
638            }
639        }
640        ret
641    }
642
643    pub fn spine_metrics(&self) -> SpineMetrics {
644        let mut metrics = SpineMetrics::default();
645        for batch in self.spine.spine_batches() {
646            if batch.is_compact() {
647                metrics.compact_batches += 1;
648            } else if batch.is_merging() {
649                metrics.compacting_batches += 1;
650            } else {
651                metrics.noncompact_batches += 1;
652            }
653        }
654        metrics
655    }
656}
657
658/// A log of what transitively happened during a Spine operation: e.g.
659/// FueledMergeReqs were generated.
660enum SpineLog<'a, T> {
661    Enabled {
662        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
663    },
664    Disabled,
665}
666
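/// A half-open range of positions in the sequence of batches inserted into the
/// spine: a batch inserted directly gets `SpineId(n, n + 1)` (see
/// `Spine::next_id`), and a merge of adjacent batches `[a, b)` and `[b, c)`
/// gets `SpineId(a, c)`.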
667#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
668pub struct SpineId(pub usize, pub usize);
669
670impl Serialize for SpineId {
671    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
672    where
673        S: Serializer,
674    {
675        let SpineId(lo, hi) = self;
676        serializer.serialize_str(&format!("{lo}-{hi}"))
677    }
678}
679
680impl SpineId {
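    /// Whether this id's range of positions contains `other`'s. For example,
    /// `SpineId(0, 4)` covers `SpineId(1, 3)` and itself, but not
    /// `SpineId(3, 5)`.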
681    fn covers(self, other: SpineId) -> bool {
682        self.0 <= other.0 && other.1 <= self.1
683    }
684}
685
686#[derive(Debug, Clone, PartialEq)]
687pub struct IdHollowBatch<T> {
688    pub id: SpineId,
689    pub batch: Arc<HollowBatch<T>>,
690}
691
692#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
693pub struct ActiveCompaction {
694    pub start_ms: u64,
695}
696
697#[derive(Debug, Clone, PartialEq)]
698struct SpineBatch<T> {
699    id: SpineId,
700    desc: Description<T>,
701    parts: Vec<IdHollowBatch<T>>,
702    active_compaction: Option<ActiveCompaction>,
    // A cached version of parts.iter().map(|x| x.batch.len).sum()
704    len: usize,
705}
706
707impl<T> SpineBatch<T> {
708    fn merged(batch: IdHollowBatch<T>, active_compaction: Option<ActiveCompaction>) -> Self
709    where
710        T: Clone,
711    {
712        Self {
713            id: batch.id,
714            desc: batch.batch.desc.clone(),
715            len: batch.batch.len,
716            parts: vec![batch],
717            active_compaction,
718        }
719    }
720}
721
722#[derive(Debug, Copy, Clone)]
723pub enum ApplyMergeResult {
724    AppliedExact,
725    AppliedSubset,
726    NotAppliedNoMatch,
727    NotAppliedInvalidSince,
728    NotAppliedTooManyUpdates,
729}
730
731impl ApplyMergeResult {
732    pub fn applied(&self) -> bool {
733        match self {
734            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
735            _ => false,
736        }
737    }
738    pub fn matched(&self) -> bool {
739        match self {
740            ApplyMergeResult::AppliedExact
741            | ApplyMergeResult::AppliedSubset
742            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
743            _ => false,
744        }
745    }
746}
747
748impl<T: Timestamp + Lattice> SpineBatch<T> {
749    pub fn lower(&self) -> &Antichain<T> {
750        self.desc().lower()
751    }
752
753    pub fn upper(&self) -> &Antichain<T> {
754        self.desc().upper()
755    }
756
757    fn id(&self) -> SpineId {
758        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
759        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
760        self.id
761    }
762
763    pub fn is_compact(&self) -> bool {
764        // This definition is extremely likely to change, but for now, we consider a batch
765        // "compact" if it has at most one hollow batch with at most one run.
766        self.parts.len() <= 1 && self.parts.iter().all(|p| p.batch.run_splits.is_empty())
767    }
768
769    pub fn is_merging(&self) -> bool {
770        self.active_compaction.is_some()
771    }
772
773    fn desc(&self) -> &Description<T> {
774        &self.desc
775    }
776
777    pub fn len(&self) -> usize {
778        // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
779        // we compact it.
780        debug_assert_eq!(
781            self.len,
782            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
783        );
784        self.len
785    }
786
787    pub fn is_empty(&self) -> bool {
788        self.len() == 0
789    }
790
791    pub fn empty(
792        id: SpineId,
793        lower: Antichain<T>,
794        upper: Antichain<T>,
795        since: Antichain<T>,
796    ) -> Self {
797        SpineBatch::merged(
798            IdHollowBatch {
799                id,
800                batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
801            },
802            None,
803        )
804    }
805
806    pub fn begin_merge(
807        bs: &[Self],
808        compaction_frontier: Option<AntichainRef<T>>,
809    ) -> Option<IdFuelingMerge<T>> {
810        let from = bs.first()?.id().0;
811        let until = bs.last()?.id().1;
812        let id = SpineId(from, until);
813        let mut sinces = bs.iter().map(|b| b.desc().since());
814        let mut since = sinces.next()?.clone();
815        for b in bs {
816            since.join_assign(b.desc().since())
817        }
818        if let Some(compaction_frontier) = compaction_frontier {
819            since.join_assign(&compaction_frontier.to_owned());
820        }
821        let remaining_work = bs.iter().map(|x| x.len()).sum();
822        Some(IdFuelingMerge {
823            id,
824            merge: FuelingMerge {
825                since,
826                remaining_work,
827            },
828        })
829    }
830
831    // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes?
832    fn maybe_replace(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
833        // The spine's and merge res's sinces don't need to match (which could occur if Spine
834        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
835        // since must be in advance of the merge res since.
836        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
837            return ApplyMergeResult::NotAppliedInvalidSince;
838        }
839
840        // If our merge result exactly matches a spine batch, we can swap it in directly
841        let exact_match = res.output.desc.lower() == self.desc().lower()
842            && res.output.desc.upper() == self.desc().upper();
843        if exact_match {
844            // Spine internally has an invariant about a batch being at some level
845            // or higher based on the len. We could end up violating this invariant
846            // if we increased the length of the batch.
847            //
848            // A res output with length greater than the existing spine batch implies
849            // a compaction has already been applied to this range, and with a higher
850            // rate of consolidation than this one. This could happen as a result of
851            // compaction's memory bound limiting the amount of consolidation possible.
852            if res.output.len > self.len() {
853                return ApplyMergeResult::NotAppliedTooManyUpdates;
854            }
855            *self = SpineBatch::merged(
856                IdHollowBatch {
857                    id: self.id(),
858                    batch: Arc::new(res.output.clone()),
859                },
860                res.new_active_compaction.clone(),
861            );
862            return ApplyMergeResult::AppliedExact;
863        }
864
865        // It is possible the structure of the spine has changed since the merge res
866        // was created, such that it no longer exactly matches the description of a
867        // spine batch. This can happen if another merge has happened in the interim,
868        // or if spine needed to be rebuilt from state.
869        //
870        // When this occurs, we can still attempt to slot the merge res in to replace
871        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
872        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
873        let SpineBatch {
874            id,
875            parts,
876            desc,
877            active_compaction: _,
878            len: _,
879        } = self;
880        // first, determine if a subset of parts can be cleanly replaced by the merge res
881        let mut lower = None;
882        let mut upper = None;
883        for (i, batch) in parts.iter().enumerate() {
884            if batch.batch.desc.lower() == res.output.desc.lower() {
885                lower = Some((i, batch.id.0));
886            }
887            if batch.batch.desc.upper() == res.output.desc.upper() {
888                upper = Some((i, batch.id.1));
889            }
890            if lower.is_some() && upper.is_some() {
891                break;
892            }
893        }
894
895        // next, replace parts with the merge res batch if we can
896        match (lower, upper) {
897            (Some((lower, id_lower)), Some((upper, id_upper))) => {
898                let mut new_parts = vec![];
899                new_parts.extend_from_slice(&parts[..lower]);
900                new_parts.push(IdHollowBatch {
901                    id: SpineId(id_lower, id_upper),
902                    batch: Arc::new(res.output.clone()),
903                });
904                new_parts.extend_from_slice(&parts[upper + 1..]);
905                let new_spine_batch = SpineBatch {
906                    id: *id,
907                    desc: desc.to_owned(),
908                    len: new_parts.iter().map(|x| x.batch.len).sum(),
909                    parts: new_parts,
910                    active_compaction: res.new_active_compaction.clone(),
911                };
912                if new_spine_batch.len() > self.len() {
913                    return ApplyMergeResult::NotAppliedTooManyUpdates;
914                }
915                *self = new_spine_batch;
916                ApplyMergeResult::AppliedSubset
917            }
918            _ => ApplyMergeResult::NotAppliedNoMatch,
919        }
920    }
921
922    #[cfg(test)]
923    fn describe(&self, extended: bool) -> String {
924        let SpineBatch {
925            id,
926            parts,
927            desc,
928            active_compaction,
929            len,
930        } = self;
931        let compaction = match active_compaction {
932            None => "".to_owned(),
933            Some(c) => format!(" (c@{})", c.start_ms),
934        };
935        match extended {
936            false => format!(
937                "[{}-{}]{:?}{:?}{}/{}{compaction}",
938                id.0,
939                id.1,
940                desc.lower().elements(),
941                desc.upper().elements(),
942                parts.len(),
943                len
944            ),
945            true => {
946                format!(
947                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
948                    id.0,
949                    id.1,
950                    desc.lower().elements(),
951                    desc.upper().elements(),
952                    desc.since().elements(),
953                    parts.len(),
954                    len,
955                    parts
956                        .iter()
957                        .flat_map(|x| x.batch.parts.iter())
958                        .map(|x| format!(" {}", x.printable_name()))
959                        .collect::<Vec<_>>()
960                        .join("")
961                )
962            }
963        }
964    }
965}
966
967#[derive(Debug, Clone, PartialEq, Serialize)]
968pub struct FuelingMerge<T> {
969    pub(crate) since: Antichain<T>,
970    pub(crate) remaining_work: usize,
971}
972
973#[derive(Debug, Clone, PartialEq, Serialize)]
974pub struct IdFuelingMerge<T> {
975    id: SpineId,
976    merge: FuelingMerge<T>,
977}
978
979impl<T: Timestamp + Lattice> FuelingMerge<T> {
980    /// Perform some amount of work, decrementing `fuel`.
981    ///
982    /// If `fuel` is non-zero after the call, the merging is complete and one
983    /// should call `done` to extract the merged results.
984    // TODO(benesch): rewrite to avoid usage of `as`.
985    #[allow(clippy::as_conversions)]
986    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
987        let used = std::cmp::min(*fuel as usize, self.remaining_work);
988        self.remaining_work = self.remaining_work.saturating_sub(used);
989        *fuel -= used as isize;
990    }
991
992    /// Extracts merged results.
993    ///
994    /// This method should only be called after `work` has been called and has
995    /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
996    fn done(
997        self,
998        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
999        log: &mut SpineLog<'_, T>,
1000    ) -> Option<SpineBatch<T>> {
1001        let first = bs.first()?;
1002        let last = bs.last()?;
1003        let id = SpineId(first.id().0, last.id().1);
1004        assert!(id.0 < id.1);
1005        let lower = first.desc().lower().clone();
1006        let upper = last.desc().upper().clone();
1007        let since = self.since;
1008
1009        // Special case empty batches.
1010        if bs.iter().all(SpineBatch::is_empty) {
1011            return Some(SpineBatch::empty(id, lower, upper, since));
1012        }
1013
1014        let desc = Description::new(lower, upper, since);
1015        let len = bs.iter().map(SpineBatch::len).sum();
1016
1017        // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1018        // in the worst case, the double iteration is absolutely worth having
1019        // merged_parts pre-sized.
1020        let mut merged_parts_len = 0;
1021        for b in &bs {
1022            merged_parts_len += b.parts.len();
1023        }
1024        let mut merged_parts = Vec::with_capacity(merged_parts_len);
1025        for b in bs {
1026            merged_parts.extend(b.parts)
1027        }
1028        // Sanity check the pre-size code.
1029        debug_assert_eq!(merged_parts.len(), merged_parts_len);
1030
1031        if let SpineLog::Enabled { merge_reqs } = log {
1032            merge_reqs.push(FueledMergeReq {
1033                id,
1034                desc: desc.clone(),
1035                inputs: merged_parts.clone(),
1036            });
1037        }
1038
1039        Some(SpineBatch {
1040            id,
1041            desc,
1042            len,
1043            parts: merged_parts,
1044            active_compaction: None,
1045        })
1046    }
1047}
1048
1049/// The maximum number of batches per level in the spine.
1050/// In practice, we probably want a larger max and a configurable soft cap, but using a
1051/// stack-friendly data structure and keeping this number low makes this safer during the
1052/// initial rollout.
1053const BATCHES_PER_LEVEL: usize = 2;
1054
1055/// An append-only collection of update batches.
1056///
1057/// The `Spine` is a general-purpose trace implementation based on collection
1058/// and merging immutable batches of updates. It is generic with respect to the
1059/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1060///
1061/// ## Design
1062///
1063/// This spine is represented as a list of layers, where each element in the
1064/// list is either
1065///
1066///   1. MergeState::Vacant  empty
1067///   2. MergeState::Single  a single batch
1068///   3. MergeState::Double  a pair of batches
1069///
1070/// Each "batch" has the option to be `None`, indicating a non-batch that
1071/// nonetheless acts as a number of updates proportionate to the level at which
1072/// it exists (for bookkeeping).
1073///
1074/// Each of the batches at layer i contains at most 2^i elements. The sequence
1075/// of batches should have the upper bound of one match the lower bound of the
1076/// next. Batches may be logically empty, with matching upper and lower bounds,
1077/// as a bookkeeping mechanism.
1078///
1079/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1080/// even though it may actually contain fewer elements. This allows us to
1081/// decouple the physical representation from logical amounts of effort invested
1082/// in each batch. It allows us to begin compaction and to reduce the number of
1083/// updates, without compromising our ability to continue to move updates along
1084/// the spine. We are explicitly making the trade-off that while some batches
1085/// might compact at lower levels, we want to treat them as if they contained
1086/// their full set of updates for accounting reasons (to apply work to higher
1087/// levels).
1088///
1089/// We maintain the invariant that for any in-progress merge at level k there
1090/// should be fewer than 2^k records at levels lower than k. That is, even if we
1091/// were to apply an unbounded amount of effort to those records, we would not
1092/// have enough records to prompt a merge into the in-progress merge. Ideally,
1093/// we maintain the extended invariant that for any in-progress merge at level
1094/// k, the remaining effort required (number of records minus applied effort) is
1095/// less than the number of records that would need to be added to reach 2^k
1096/// records in layers below.
1097///
1098/// ## Mathematics
1099///
1100/// When a merge is initiated, there should be a non-negative *deficit* of
1101/// updates before the layers below could plausibly produce a new batch for the
1102/// currently merging layer. We must determine a factor of proportionality, so
1103/// that newly arrived updates provide at least that amount of "fuel" towards
1104/// the merging layer, so that the merge completes before lower levels invade.
1105///
1106/// ### Deficit:
1107///
1108/// A new merge is initiated only in response to the completion of a prior
1109/// merge, or the introduction of new records from outside. The latter case is
1110/// special, and will maintain our invariant trivially, so we will focus on the
1111/// former case.
1112///
1113/// When a merge at level k completes, assuming we have maintained our invariant
1114/// then there should be fewer than 2^k records at lower levels. The newly
/// created merge at level k+1 will require up to 2^(k+2) units of work, and
1116/// should not expect a new batch until strictly more than 2^k records are
1117/// added. This means that a factor of proportionality of four should be
1118/// sufficient to ensure that the merge completes before a new merge is
1119/// initiated.
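///
/// Working through that arithmetic with k = 3: when a merge at level 3
/// completes there are fewer than 2^3 = 8 records at lower levels, the new
/// merge at level 4 requires at most 2^5 = 32 units of work, and a new batch
/// should not arrive at level 4 until more than 8 records are added, so a
/// proportionality factor of four (32 / 8) suffices.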
1120///
1121/// When new records get introduced, we will need to roll up any batches at
1122/// lower levels, which we treat as the introduction of records. Each of these
/// virtual records should contribute its share of fuel, as it results in the
/// promotion of batches closer to in-progress merges.
1126///
1127/// ### Fuel sharing
1128///
1129/// We like the idea of applying fuel preferentially to merges at *lower*
1130/// levels, under the idea that they are easier to complete, and we benefit from
1131/// fewer total merges in progress. This does delay the completion of merges at
1132/// higher levels, and may not obviously be a total win. If we choose to do
1133/// this, we should make sure that we correctly account for completed merges at
1134/// low layers: they should still extract fuel from new updates even though they
1135/// have completed, at least until they have paid back any "debt" to higher
1136/// layers by continuing to provide fuel as updates arrive.
1137#[derive(Debug, Clone)]
1138struct Spine<T> {
1139    effort: usize,
1140    next_id: usize,
1141    since: Antichain<T>,
1142    upper: Antichain<T>,
1143    merging: Vec<MergeState<T>>,
1144}
1145
1146impl<T> Spine<T> {
1147    /// All batches in the spine, oldest to newest.
1148    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1149        self.merging.iter().rev().flat_map(|m| &m.batches)
1150    }
1151
1152    /// All (mutable) batches in the spine, oldest to newest.
1153    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1154        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1155    }
1156}
1157
1158impl<T: Timestamp + Lattice> Spine<T> {
1159    /// Allocates a fueled `Spine`.
1160    ///
1161    /// This trace will merge batches progressively, with each inserted batch
1162    /// applying a multiple of the batch's length in effort to each merge. The
    /// `effort` field is that multiplier (this constructor initializes it to
    /// one). It should be at least one for the merging to happen; a value of
    /// zero is not helpful.
1165    pub fn new() -> Self {
1166        Spine {
1167            effort: 1,
1168            next_id: 0,
1169            since: Antichain::from_elem(T::minimum()),
1170            upper: Antichain::from_elem(T::minimum()),
1171            merging: Vec::new(),
1172        }
1173    }
1174
1175    /// Apply some amount of effort to trace maintenance.
1176    ///
1177    /// The units of effort are updates, and the method should be thought of as
1178    /// analogous to inserting as many empty updates, where the trace is
1179    /// permitted to perform proportionate work.
1180    ///
1181    /// Returns true if this did work and false if it left the spine unchanged.
1182    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1183        self.tidy_layers();
1184        if self.reduced() {
1185            return false;
1186        }
1187
1188        if self.merging.iter().any(|b| b.merge.is_some()) {
1189            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1190            // If any merges exist, we can directly call `apply_fuel`.
1191            self.apply_fuel(&fuel, log);
1192        } else {
1193            // Otherwise, we'll need to introduce fake updates to move merges
1194            // along.
1195
            // Introduce an empty batch with roughly `effort` virtual updates.
1197            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1198            let id = self.next_id();
1199            self.introduce_batch(
1200                SpineBatch::empty(
1201                    id,
1202                    self.upper.clone(),
1203                    self.upper.clone(),
1204                    self.since.clone(),
1205                ),
1206                level,
1207                log,
1208            );
1209        }
1210        true
1211    }
1212
1213    pub fn next_id(&mut self) -> SpineId {
1214        let id = self.next_id;
1215        self.next_id += 1;
1216        SpineId(id, self.next_id)
1217    }
1218
1219    // Ideally, this method acts as insertion of `batch`, even if we are not yet
1220    // able to begin merging the batch. This means it is a good time to perform
1221    // amortized work proportional to the size of batch.
1222    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1223        assert!(batch.desc.lower() != batch.desc.upper());
1224        assert_eq!(batch.desc.lower(), &self.upper);
1225
1226        let id = self.next_id();
1227        let batch = SpineBatch::merged(
1228            IdHollowBatch {
1229                id,
1230                batch: Arc::new(batch),
1231            },
1232            None,
1233        );
1234
1235        self.upper.clone_from(batch.upper());
1236
1237        // If `batch` and the most recently inserted batch are both empty,
1238        // we can just fuse them.
1239        if batch.is_empty() {
1240            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1241                if self.merging[position].is_single() && self.merging[position].is_empty() {
1242                    self.insert_at(batch, position);
1243                    // Since we just inserted a batch, we should always have work to complete...
1244                    // but otherwise we just leave this layer vacant.
1245                    if let Some(merged) = self.complete_at(position, log) {
1246                        self.merging[position] = MergeState::single(merged);
1247                    }
1248                    return;
1249                }
1250            }
1251        }
1252
1253        // Normal insertion for the batch.
1254        let index = batch.len().next_power_of_two();
1255        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1256    }
1257
1258    /// True iff there is at most one HollowBatch in `self.merging`.
1259    ///
1260    /// When true, there is no maintenance work to perform in the trace, other
1261    /// than compaction. We do not yet have logic in place to determine if
1262    /// compaction would improve a trace, so for now we are ignoring that.
1263    fn reduced(&self) -> bool {
1264        self.spine_batches()
1265            .flat_map(|b| b.parts.as_slice())
1266            .count()
1267            < 2
1268    }
1269
1270    /// Describes the merge progress of layers in the trace.
1271    ///
1272    /// Intended for diagnostics rather than public consumption.
1273    #[allow(dead_code)]
1274    fn describe(&self) -> Vec<(usize, usize)> {
1275        self.merging
1276            .iter()
1277            .map(|b| (b.batches.len(), b.len()))
1278            .collect()
1279    }
1280
1281    /// Introduces a batch at an indicated level.
1282    ///
1283    /// The level indication is often related to the size of the batch, but it
1284    /// can also be used to artificially fuel the computation by supplying empty
1285    /// batches at non-trivial indices, to move merges along.
1286    fn introduce_batch(
1287        &mut self,
1288        batch: SpineBatch<T>,
1289        batch_index: usize,
1290        log: &mut SpineLog<'_, T>,
1291    ) {
1292        // Step 0.  Determine an amount of fuel to use for the computation.
1293        //
1294        //          Fuel is used to drive maintenance of the data structure,
        //          and in particular is used to make progress through merges
1296        //          that are in progress. The amount of fuel to use should be
1297        //          proportional to the number of records introduced, so that
1298        //          we are guaranteed to complete all merges before they are
1299        //          required as arguments to merges again.
1300        //
1301        //          The fuel use policy is negotiable, in that we might aim
1302        //          to use relatively less when we can, so that we return
1303        //          control promptly, or we might account more work to larger
        //          batches. Not clear to me which is best, or if there
1305        //          should be a configuration knob controlling this.
1306
1307        // The amount of fuel to use is proportional to 2^batch_index, scaled by
1308        // a factor of self.effort which determines how eager we are in
1309        // performing maintenance work. We need to ensure that each merge in
1310        // progress receives fuel for each introduced batch, and so multiply by
1311        // that as well.
1312        if batch_index > 32 {
1313            println!("Large batch index: {}", batch_index);
1314        }
1315
1316        // We believe that eight units of fuel is sufficient for each introduced
1317        // record, accounted as four for each record, and a potential four more
1318        // for each virtual record associated with promoting existing smaller
1319        // batches. We could try and make this be less, or be scaled to merges
1320        // based on their deficit at time of instantiation. For now, we remain
1321        // conservative.
1322        let mut fuel = 8 << batch_index;
1323        // Scale up by the effort parameter, which is calibrated to one as the
1324        // minimum amount of effort.
1325        fuel *= self.effort;
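        // E.g. a batch introduced at index 10 (roughly 1024 records) with an
        // effort of one receives 8 << 10 = 8192 units of fuel.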
1326        // Convert to an `isize` so we can observe any fuel shortfall.
1327        // TODO(benesch): avoid dangerous usage of `as`.
1328        #[allow(clippy::as_conversions)]
1329        let fuel = fuel as isize;
1330
1331        // Step 1.  Apply fuel to each in-progress merge.
1332        //
1333        //          Before we can introduce new updates, we must apply any
1334        //          fuel to in-progress merges, as this fuel is what ensures
1335        //          that the merges will be complete by the time we insert
1336        //          the updates.
1337        self.apply_fuel(&fuel, log);
1338
        // Step 2.  We must ensure that the invariant that adjacent layers do
        //          not contain two batches will be satisfied when we insert
        //          the batch. We do this by forcibly completing all merges at
        //          layers lower than and including `batch_index`, so that the
        //          new batch is inserted into an empty layer.
1344        //
1345        //          We could relax this to "strictly less than `batch_index`"
1346        //          if the layer above has only a single batch in it, which
1347        //          seems not implausible if it has been the focus of effort.
1348        //
1349        //          This should be interpreted as the introduction of some
1350        //          volume of fake updates, and we will need to fuel merges
1351        //          by a proportional amount to ensure that they are not
1352        //          surprised later on. The number of fake updates should
1353        //          correspond to the deficit for the layer, which perhaps
1354        //          we should track explicitly.
1355        self.roll_up(batch_index, log);
1356
1357        // Step 3. This insertion should be into an empty layer. It is a logical
1358        //         error otherwise, as we may be violating our invariant, from
1359        //         which all wonderment derives.
1360        self.insert_at(batch, batch_index);
1361
1362        // Step 4. Tidy the largest layers.
1363        //
1364        //         It is important that we not tidy only smaller layers,
1365        //         as their ascension is what ensures the merging and
1366        //         eventual compaction of the largest layers.
1367        self.tidy_layers();
1368    }
1369
1370    /// Ensures that an insertion at layer `index` will succeed.
1371    ///
1372    /// This method is subject to the constraint that all existing batches
1373    /// should occur at higher levels, which requires it to "roll up" batches
1374    /// present at lower levels before the method is called. In doing this, we
1375    /// should not introduce more virtual records than 2^index, as that is the
1376    /// amount of excess fuel we have budgeted for completing merges.
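    ///
    /// As a sketch of the mechanics: if layers 0 and 1 each hold batches and we
    /// are about to insert at index 2, those layers are completed in order,
    /// their results folded together, and the single rolled-up batch is
    /// inserted at layer 2 before the new batch arrives.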
1377    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1378        // Ensure entries sufficient for `index`.
1379        while self.merging.len() <= index {
1380            self.merging.push(MergeState::default());
1381        }
1382
1383        // We only need to roll up if there are non-vacant layers.
1384        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1385            // Collect and merge all batches at layers up to but not including
1386            // `index`.
1387            let mut merged = None;
1388            for i in 0..index {
1389                if let Some(merged) = merged.take() {
1390                    self.insert_at(merged, i);
1391                }
1392                merged = self.complete_at(i, log);
1393            }
1394
1395            // The merged results should be introduced at level `index`, which
1396            // should be ready to absorb them (possibly creating a new merge at
1397            // the time).
1398            if let Some(merged) = merged {
1399                self.insert_at(merged, index);
1400            }
1401
1402            // If the insertion results in a merge, we should complete it to
1403            // ensure the upcoming insertion at `index` does not panic.
1404            if self.merging[index].is_full() {
1405                let merged = self.complete_at(index, log).expect("double batch");
1406                self.insert_at(merged, index + 1);
1407            }
1408        }
1409    }
1410
1411    /// Applies an amount of fuel to merges in progress.
1412    ///
1413    /// The supplied `fuel` is for each in-progress merge, and if we want to
1414    /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1415    /// could do so in order to maintain fewer batches on average (at the risk
1416    /// of completing merges of large batches later, though probably not much
1417    /// later).
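    ///
    /// Note that today each level receives its own copy of `fuel` (see the loop
    /// below), so the total work applied scales with the number of levels.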
1418    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1419        // For the moment our strategy is to apply fuel independently to each
1420        // merge in progress, rather than prioritizing small merges. This sounds
1421        // like a great idea, but we need better accounting in place to ensure
1422        // that merges that borrow against later layers but then complete still
1423        // "acquire" fuel to pay back their debts.
1424        for index in 0..self.merging.len() {
1425            // Give each level independent fuel, for now.
1426            let mut fuel = *fuel;
1427            // Apply the fuel to any merge in progress at this level; levels
1428            // without an in-progress merge treat this as a no-op.
1429            self.merging[index].work(&mut fuel);
1430            // `fuel` could have a deficit at this point, meaning we over-spent
1431            // when we took a merge step. We could ignore this, or maintain the
1432            // deficit and account future fuel against it before spending again.
1433            // It isn't clear why that would be especially helpful to do; we
1434            // might want to avoid overspends at multiple layers in the same
1435            // invocation (to limit latencies), but there is probably a rich
1436            // policy space here.
1437
1438            // If a merge completes, we can immediately merge it in to the next
1439            // level, which is "guaranteed" to be complete at this point, by our
1440            // fueling discipline.
1441            if self.merging[index].is_complete() {
1442                let complete = self.complete_at(index, log).expect("complete batch");
1443                self.insert_at(complete, index + 1);
1444            }
1445        }
1446    }
1447
1448    /// Inserts a batch at a specific location.
1449    ///
1450    /// This is a non-public internal method that can panic if we try to insert
1451    /// into a layer which already contains `BATCHES_PER_LEVEL` batches (and is
1452    /// still in the process of merging them).
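    ///
    /// Filling a layer to capacity is also what kicks off a fueling merge: once
    /// the layer holds `BATCHES_PER_LEVEL` batches, the code below calls
    /// `SpineBatch::begin_merge` with the spine's current `since` as the
    /// compaction frontier.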
1453    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1454        // Ensure the spine is large enough.
1455        while self.merging.len() <= index {
1456            self.merging.push(MergeState::default());
1457        }
1458
1459        // Insert the batch at the location.
1460        let merging = &mut self.merging[index];
1461        merging.push_batch(batch);
1462        if merging.batches.is_full() {
1463            let compaction_frontier = Some(self.since.borrow());
1464            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1465        }
1466    }
1467
1468    /// Completes and extracts whatever is at layer `index`, leaving this layer vacant.
1469    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1470        self.merging[index].complete(log)
1471    }
1472
1473    /// Attempts to draw down large layers to size-appropriate layers.
1474    fn tidy_layers(&mut self) {
1475        // If the largest layer is complete (not merging), we can attempt to
1476        // draw it down to the next layer. This is permitted if we can maintain
1477        // our invariant that below each merge there are at most half the
1478        // records that would be required to invade the merge.
1479        if !self.merging.is_empty() {
1480            let mut length = self.merging.len();
1481            if self.merging[length - 1].is_single() {
1482                // To move a batch down, we require that it contain few enough
1483                // records that the lower level is appropriate, and that moving
1484                // the batch would not create a merge violating our invariant.
1485                let appropriate_level = usize::cast_from(
1486                    self.merging[length - 1]
1487                        .len()
1488                        .next_power_of_two()
1489                        .trailing_zeros(),
1490                );
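                // For example, a single batch holding 10 updates has
                // next_power_of_two() == 16 and trailing_zeros() == 4, so its
                // appropriate level is 4.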
1491
1492                // Continue only as far as is appropriate
1493                while appropriate_level < length - 1 {
1494                    let current = &mut self.merging[length - 2];
1495                    if current.is_vacant() {
1496                        // Vacant batches can be absorbed.
1497                        self.merging.remove(length - 2);
1498                        length = self.merging.len();
1499                    } else {
1500                        if !current.is_full() {
1501                            // Single batches may initiate a merge, if sizes are
1502                            // within bounds, but terminate the loop either way.
1503
1504                            // Determine the number of records that might lead
1505                            // to a merge. Importantly, this is not the number
1506                            // of actual records, but the sum of upper bounds
1507                            // based on indices.
1508                            let mut smaller = 0;
1509                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
1510                                smaller += batch.batches.len() << index;
1511                            }
1512
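                            // For example, with 10 levels in the spine, the
                            // single batch is drawn down a level only if this
                            // bound is at most (1 << 10) / 8 == 128.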
1513                            if smaller <= (1 << length) / 8 {
1514                                // Remove the batch under consideration (shifting the deeper batches up a level),
1515                                // then merge in the single batch at the current level.
1516                                let state = self.merging.remove(length - 2);
1517                                assert_eq!(state.batches.len(), 1);
1518                                for batch in state.batches {
1519                                    self.insert_at(batch, length - 2);
1520                                }
1521                            }
1522                        }
1523                        break;
1524                    }
1525                }
1526            }
1527        }
1528    }
1529
1530    /// Checks invariants:
1531    /// - The lowers and uppers of all batches "line up".
1532    /// - The lower of the earliest batch is `Antichain::from_elem(T::minimum())`.
1533    /// - The upper of the latest batch is `self.upper`.
1534    /// - The since of each batch is `less_equal self.since`.
1535    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
1536    /// - TODO: Verify fuel and level invariants.
1537    fn validate(&self) -> Result<(), String> {
1538        let mut id = SpineId(0, 0);
1539        let mut frontier = Antichain::from_elem(T::minimum());
1540        for x in self.merging.iter().rev() {
1541            if x.is_full() != x.merge.is_some() {
1542                return Err(format!(
1543                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
1544                    x.is_full(),
1545                    x.merge,
1546                ));
1547            }
1548
1549            if let Some(m) = &x.merge {
1550                if !x.is_full() {
1551                    return Err(format!(
1552                        "merge should only exist for full batches (len={:?}, merge={:?})",
1553                        x.batches.len(),
1554                        m.id,
1555                    ));
1556                }
1557                if x.id() != Some(m.id) {
1558                    return Err(format!(
1559                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
1560                        x.id(),
1561                        m.id,
1562                    ));
1563                }
1564            }
1565
1566            // TODO: Anything we can validate about x.merge? It'd
1567            // be nice to assert that it's bigger than the len of the
1568            // two batches, but apply_merge_res might swap those lengths
1569            // out from under us.
1570            for batch in &x.batches {
1571                if batch.id().0 != id.1 {
1572                    return Err(format!(
1573                        "batch id {:?} does not match the previous id {:?}: {:?}",
1574                        batch.id(),
1575                        id,
1576                        self
1577                    ));
1578                }
1579                id = batch.id();
1580                if batch.desc().lower() != &frontier {
1581                    return Err(format!(
1582                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
1583                        batch.desc().lower(),
1584                        frontier,
1585                        self
1586                    ));
1587                }
1588                frontier.clone_from(batch.desc().upper());
1589                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
1590                    return Err(format!(
1591                        "since of batch {:?} past the spine since {:?}: {:?}",
1592                        batch.desc().since(),
1593                        self.since,
1594                        self
1595                    ));
1596                }
1597            }
1598        }
1599        if self.next_id != id.1 {
1600            return Err(format!(
1601                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
1602                self.next_id, id, self
1603            ));
1604        }
1605        if self.upper != frontier {
1606            return Err(format!(
1607                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
1608                self.upper, frontier, self
1609            ));
1610        }
1611        Ok(())
1612    }
1613}
1614
1615/// Describes the state of a layer.
1616///
1617/// A layer can be empty, contain a single batch, or hold up to `BATCHES_PER_LEVEL`
1618/// batches that are in the process of merging into a batch for the next layer.
1619#[derive(Debug, Clone)]
1620struct MergeState<T> {
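    /// The batches currently at this level; adjacent batches have contiguous
    /// ids and abutting descriptions (see `push_batch`).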
1621    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
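    /// The in-progress fueling merge across `batches`; `Some` exactly when the
    /// level is full (see `validate`).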
1622    merge: Option<IdFuelingMerge<T>>,
1623}
1624
1625impl<T> Default for MergeState<T> {
1626    fn default() -> Self {
1627        Self {
1628            batches: ArrayVec::new(),
1629            merge: None,
1630        }
1631    }
1632}
1633
1634impl<T: Timestamp + Lattice> MergeState<T> {
1635    /// An id that covers all the batches in the given merge state, assuming there are any.
1636    fn id(&self) -> Option<SpineId> {
1637        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
1638            Some(SpineId(first.id().0, last.id().1))
1639        } else {
1640            None
1641        }
1642    }
1643
1644    /// A new single-batch merge state.
1645    fn single(batch: SpineBatch<T>) -> Self {
1646        let mut state = Self::default();
1647        state.push_batch(batch);
1648        state
1649    }
1650
1651    /// Push a new batch at this level, checking invariants.
1652    fn push_batch(&mut self, batch: SpineBatch<T>) {
1653        if let Some(last) = self.batches.last() {
1654            assert_eq!(last.id().1, batch.id().0);
1655            assert_eq!(last.upper(), batch.lower());
1656        }
1657        assert!(
1658            self.merge.is_none(),
1659            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
1660            batch.id,
1661            self.batches.len(),
1662        );
1663        self.batches
1664            .try_push(batch)
1665            .expect("Attempted to insert batch into full layer!");
1666    }
1667
1668    /// The number of actual updates contained in the level.
1669    fn len(&self) -> usize {
1670        self.batches.iter().map(SpineBatch::len).sum()
1671    }
1672
1673    /// True if this merge state contains no updates.
1674    fn is_empty(&self) -> bool {
1675        self.batches.iter().all(SpineBatch::is_empty)
1676    }
1677
1678    /// True if this level contains no batches.
1679    fn is_vacant(&self) -> bool {
1680        self.batches.is_empty()
1681    }
1682
1683    /// True only for a single-batch state.
1684    fn is_single(&self) -> bool {
1685        self.batches.len() == 1
1686    }
1687
1688    /// True if this merge cannot hold any more batches.
1689    /// (i.e. for a binary merge tree, true if this layer holds two batches.)
1690    fn is_full(&self) -> bool {
1691        self.batches.is_full()
1692    }
1693
1694    /// Immediately complete any merge.
1695    ///
1696    /// Returns the level's single batch, if it holds at most one, or `None` if
1697    /// the level is vacant. If the level holds more than one batch, they are
1698    /// merged immediately, regardless of how much fuel the in-progress merge
1699    /// has received so far.
1700    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
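        // Take ownership of this level's contents, leaving a vacant
        // `MergeState` in its place.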
1701        let mut this = mem::take(self);
1702        if this.batches.len() <= 1 {
1703            this.batches.pop()
1704        } else {
1705            // Merge the remaining batches, regardless of whether we have a fully fueled merge.
1706            let id_merge = this
1707                .merge
1708                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
1709            id_merge.merge.done(this.batches, log)
1710        }
1711    }
1712
1713    /// True iff the layer is a complete merge, ready for extraction.
1714    fn is_complete(&self) -> bool {
1715        match &self.merge {
1716            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
1717            None => false,
1718        }
1719    }
1720
1721    /// Performs a bounded amount of work towards a merge.
1722    fn work(&mut self, fuel: &mut isize) {
1723        // We only perform work for merges in progress.
1724        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
1725            merge.work(&self.batches[..], fuel)
1726        }
1727    }
1728}
1729
1730#[cfg(test)]
1731pub mod datadriven {
1732    use crate::internal::datadriven::DirectiveArgs;
1733
1734    use super::*;
1735
1736    /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
1737    #[derive(Debug, Default)]
1738    pub struct TraceState {
1739        pub trace: Trace<u64>,
1740        pub merge_reqs: Vec<FueledMergeReq<u64>>,
1741    }
1742
1743    pub fn since_upper(
1744        datadriven: &TraceState,
1745        _args: DirectiveArgs,
1746    ) -> Result<String, anyhow::Error> {
1747        Ok(format!(
1748            "{:?}{:?}\n",
1749            datadriven.trace.since().elements(),
1750            datadriven.trace.upper().elements()
1751        ))
1752    }
1753
1754    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
1755        let mut s = String::new();
1756        for b in datadriven.trace.spine.spine_batches() {
1757            s.push_str(b.describe(true).as_str());
1758            s.push('\n');
1759        }
1760        Ok(s)
1761    }
1762
1763    pub fn insert(
1764        datadriven: &mut TraceState,
1765        args: DirectiveArgs,
1766    ) -> Result<String, anyhow::Error> {
1767        for x in args
1768            .input
1769            .trim()
1770            .split('\n')
1771            .map(DirectiveArgs::parse_hollow_batch)
1772        {
1773            datadriven
1774                .merge_reqs
1775                .append(&mut datadriven.trace.push_batch(x));
1776        }
1777        Ok("ok\n".to_owned())
1778    }
1779
1780    pub fn downgrade_since(
1781        datadriven: &mut TraceState,
1782        args: DirectiveArgs,
1783    ) -> Result<String, anyhow::Error> {
1784        let since = args.expect("since");
1785        datadriven
1786            .trace
1787            .downgrade_since(&Antichain::from_elem(since));
1788        Ok("ok\n".to_owned())
1789    }
1790
1791    pub fn take_merge_req(
1792        datadriven: &mut TraceState,
1793        _args: DirectiveArgs,
1794    ) -> Result<String, anyhow::Error> {
1795        let mut s = String::new();
1796        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
1797            write!(
1798                s,
1799                "{:?}{:?}{:?} {}\n",
1800                merge_req.desc.lower().elements(),
1801                merge_req.desc.upper().elements(),
1802                merge_req.desc.since().elements(),
1803                merge_req
1804                    .inputs
1805                    .iter()
1806                    .flat_map(|x| x.batch.parts.iter())
1807                    .map(|x| x.printable_name())
1808                    .collect::<Vec<_>>()
1809                    .join(" ")
1810            );
1811        }
1812        Ok(s)
1813    }
1814
1815    pub fn apply_merge_res(
1816        datadriven: &mut TraceState,
1817        args: DirectiveArgs,
1818    ) -> Result<String, anyhow::Error> {
1819        let res = FueledMergeRes {
1820            output: DirectiveArgs::parse_hollow_batch(args.input),
1821            new_active_compaction: None,
1822        };
1823        match datadriven.trace.apply_merge_res(&res) {
1824            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
1825            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
1826            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
1827            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
1828            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
1829        }
1830    }
1831}
1832
1833#[cfg(test)]
1834pub(crate) mod tests {
1835    use std::ops::Range;
1836
1837    use proptest::prelude::*;
1838    use semver::Version;
1839
1840    use crate::internal::state::tests::any_hollow_batch;
1841
1842    use super::*;
1843
1844    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
1845        num_batches: Range<usize>,
1846    ) -> impl Strategy<Value = Trace<T>> {
1847        Strategy::prop_map(
1848            (
1849                any::<Option<T>>(),
1850                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
1851                any::<bool>(),
1852                any::<u64>(),
1853            ),
1854            |(since, mut batches, roundtrip_structure, timeout_ms)| {
1855                let mut trace = Trace::<T>::default();
1856                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
1857
1858                // Fix up the arbitrary HollowBatches so the lowers and uppers
1859                // align.
1860                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
1861                let mut lower = Antichain::from_elem(T::minimum());
1862                for mut batch in batches {
1863                    // Overall trace since has to be past each batch's since.
1864                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
1865                        trace.downgrade_since(batch.desc.since());
1866                    }
1867                    batch.desc = Description::new(
1868                        lower.clone(),
1869                        batch.desc.upper().clone(),
1870                        batch.desc.since().clone(),
1871                    );
1872                    lower.clone_from(batch.desc.upper());
1873                    let _merge_req = trace.push_batch(batch);
1874                }
1875                let reqs: Vec<_> = trace
1876                    .fueled_merge_reqs_before_ms(timeout_ms, None)
1877                    .collect();
1878                for req in reqs {
1879                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
1880                }
1881                trace.roundtrip_structure = roundtrip_structure;
1882                trace
1883            },
1884        )
1885    }
1886
1887    #[mz_ore::test]
1888    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
1889    fn test_roundtrips() {
1890        fn check(trace: Trace<i64>) {
1891            trace.validate().unwrap();
1892            let flat = trace.flatten();
1893            let unflat = Trace::unflatten(flat).unwrap();
1894            assert_eq!(trace, unflat);
1895        }
1896
1897        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
1898    }
1899
1900    #[mz_ore::test]
1901    fn fueled_merge_reqs() {
1902        let mut trace: Trace<u64> = Trace::default();
1903        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
1904            0,
1905            10,
1906            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
1907            1000,
1908        ));
1909
1910        assert!(fueled_reqs.is_empty());
1911        assert_eq!(
1912            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
1913            0,
1914            "no merge reqs when not filtering by version"
1915        );
1916        assert_eq!(
1917            trace
1918                .fueled_merge_reqs_before_ms(
1919                    u64::MAX,
1920                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
1921                )
1922                .count(),
1923            0,
1924            "zero batches are older than a past version"
1925        );
1926        assert_eq!(
1927            trace
1928                .fueled_merge_reqs_before_ms(
1929                    u64::MAX,
1930                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
1931                )
1932                .count(),
1933            1,
1934            "one batch is older than a future version"
1935        );
1936    }
1937
1938    #[mz_ore::test]
1939    fn remove_redundant_merge_reqs() {
1940        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
1941            FueledMergeReq {
1942                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
1943                desc: Description::new(
1944                    Antichain::from_elem(lower),
1945                    Antichain::from_elem(upper),
1946                    Antichain::new(),
1947                ),
1948                inputs: vec![],
1949            }
1950        }
1951
1952        // Empty
1953        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
1954
1955        // Single
1956        assert_eq!(
1957            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
1958            vec![req(0, 1)]
1959        );
1960
1961        // Duplicate
1962        assert_eq!(
1963            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
1964            vec![req(0, 1)]
1965        );
1966
1967        // Nothing covered
1968        assert_eq!(
1969            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
1970            vec![req(1, 2), req(0, 1)]
1971        );
1972
1973        // Covered
1974        assert_eq!(
1975            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
1976            vec![req(0, 3)]
1977        );
1978
1979        // Covered, lower equal
1980        assert_eq!(
1981            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
1982            vec![req(0, 3)]
1983        );
1984
1985        // Covered, upper equal
1986        assert_eq!(
1987            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
1988            vec![req(0, 3)]
1989        );
1990
1991        // Covered, unexpected order (doesn't happen in practice)
1992        assert_eq!(
1993            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
1994            vec![req(0, 3)]
1995        );
1996
1997        // Partially overlapping
1998        assert_eq!(
1999            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
2000            vec![req(1, 3), req(0, 2)]
2001        );
2002
2003        // Partially overlapping, the other order
2004        assert_eq!(
2005            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
2006            vec![req(0, 2), req(1, 3)]
2007        );
2008
2009        // Different sinces (doesn't happen in practice)
2010        let req015 = FueledMergeReq {
2011            id: SpineId(0, 1),
2012            desc: Description::new(
2013                Antichain::from_elem(0),
2014                Antichain::from_elem(1),
2015                Antichain::from_elem(5),
2016            ),
2017            inputs: vec![],
2018        };
2019        assert_eq!(
2020            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2021            vec![req015, req(0, 1)]
2022        );
2023    }
2024}