mz_persist_client/internal/
trace.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! An append-only collection of compactable update batches. The Spine below is
//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
//! original Spine code is designed for incremental (via "fuel"ing) synchronous
//! merge of in-memory batches. Persist doesn't want compaction to block
//! incoming writes and, in fact, may in the future elect to push the work of
//! compaction onto another machine entirely via RPC. As a result, we abuse the
//! Spine code as follows:
//!
//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
//!
//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
//!   the Spine. As progress is made, the Spine will merge two batches together
//!   by: constructing a [Batch::Merger], giving it bits of fuel to
//!   incrementally perform the merge (which spreads out the work, keeping
//!   latencies even), and then, once fueling is done, extracting the single new
//!   output `Batch` and discarding the inputs.
//! - Persist instead represents a batch of blob data with a [HollowBatch]
//!   pointer which contains the normal `Batch` metadata plus the keys necessary
//!   to retrieve the updates.
//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
//!   (analogous to `Batch::Merger`) that allows us to represent a merge as it
//!   is fueling. Normally, this would represent real incremental compaction
//!   progress, but in persist, it's simply a bookkeeping mechanism. Once fully
//!   fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
//!   which to the Spine is indistinguishable from a merged batch. At this
//!   point, it is eligible for asynchronous compaction and a `FueledMergeReq`
//!   is generated.
//! - At any later point, this request may be answered via
//!   [Trace::apply_merge_res]. This internally replaces the
//!   `SpineBatch`, which has no effect on the structure of `Spine`
//!   but replaces the metadata in persist's state to point at the
//!   new batch.
//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
//!   This decouples compaction from Spine progress and also allows us to reduce
//!   write amplification by merging `N` batches at once, where `N` can be
//!   greater than 2.
//!
//! [Batch]: differential_dataflow::trace::Batch
//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
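
// A rough sketch of the lifecycle described above, using only types and
// methods defined in this file. The concrete timestamps and the use of empty
// batches are illustrative; real callers push `HollowBatch`es that point at
// blob data, and compaction runs asynchronously elsewhere.
//
//     let mut trace = Trace::<u64>::default();
//     // Appending a batch may hand back merge requests once enough batches
//     // have been fueled together.
//     let reqs = trace.push_batch(HollowBatch::empty(Description::new(
//         Antichain::from_elem(0),
//         Antichain::from_elem(5),
//         Antichain::from_elem(0),
//     )));
//     // A compactor (possibly on another machine) later answers a request and
//     // the result is slotted back in without changing Spine's structure.
//     for req in reqs {
//         let output = HollowBatch::empty(req.desc.clone()); // stand-in output
//         let _result = trace.apply_merge_res(&FueledMergeRes { output });
//     }
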
50use arrayvec::ArrayVec;
51use std::cmp::Ordering;
52use std::collections::BTreeMap;
53use std::fmt::Debug;
54use std::mem;
55use std::sync::Arc;
56
57use crate::internal::paths::WriterKey;
58use differential_dataflow::lattice::Lattice;
59use differential_dataflow::trace::Description;
60use mz_ore::cast::CastFrom;
61#[allow(unused_imports)] // False positive.
62use mz_ore::fmt::FormatBuffer;
63use serde::{Serialize, Serializer};
64use timely::PartialOrder;
65use timely::progress::frontier::AntichainRef;
66use timely::progress::{Antichain, Timestamp};
67
68use crate::internal::state::HollowBatch;
69
70#[derive(Debug, Clone, PartialEq)]
71pub struct FueledMergeReq<T> {
72    pub id: SpineId,
73    pub desc: Description<T>,
74    pub inputs: Vec<IdHollowBatch<T>>,
75}
76
77#[derive(Debug)]
78pub struct FueledMergeRes<T> {
79    pub output: HollowBatch<T>,
80}
81
82/// An append-only collection of compactable update batches.
83///
84/// In an effort to keep our fork of Spine as close as possible to the original,
85/// we push as many changes as possible into this wrapper.
86#[derive(Debug, Clone)]
87pub struct Trace<T> {
88    spine: Spine<T>,
89    pub(crate) roundtrip_structure: bool,
90}
91
92#[cfg(any(test, debug_assertions))]
93impl<T: PartialEq> PartialEq for Trace<T> {
94    fn eq(&self, other: &Self) -> bool {
95        // Deconstruct self and other so we get a compile failure if new fields
96        // are added.
97        let Trace {
98            spine: _,
99            roundtrip_structure: _,
100        } = self;
101        let Trace {
102            spine: _,
103            roundtrip_structure: _,
104        } = other;
105
106        // Intentionally use HollowBatches for this comparison so we ignore
107        // differences in spine layers.
108        self.batches().eq(other.batches())
109    }
110}
111
112impl<T: Timestamp + Lattice> Default for Trace<T> {
113    fn default() -> Self {
114        Self {
115            spine: Spine::new(),
116            roundtrip_structure: true,
117        }
118    }
119}
120
121#[derive(Clone, Debug, Serialize)]
122pub struct ThinSpineBatch<T> {
123    pub(crate) level: usize,
124    pub(crate) desc: Description<T>,
125    pub(crate) parts: Vec<SpineId>,
126    /// NB: this exists to validate legacy batch bounds during the migration;
127    /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
128    pub(crate) descs: Vec<Description<T>>,
129}
130
131impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
132    fn eq(&self, other: &Self) -> bool {
133        // Ignore the temporary descs vector when comparing for equality.
134        (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
135    }
136}
137
138#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
139pub struct ThinMerge<T> {
140    pub(crate) since: Antichain<T>,
141    pub(crate) remaining_work: usize,
142    pub(crate) active_compaction: Option<ActiveCompaction>,
143}
144
145impl<T: Clone> ThinMerge<T> {
146    fn fueling(merge: &FuelingMerge<T>) -> Self {
147        ThinMerge {
148            since: merge.since.clone(),
149            remaining_work: merge.remaining_work,
150            active_compaction: None,
151        }
152    }
153
154    fn fueled(batch: &SpineBatch<T>) -> Self {
155        ThinMerge {
156            since: batch.desc.since().clone(),
157            remaining_work: 0,
158            active_compaction: batch.active_compaction.clone(),
159        }
160    }
161}
162
163/// This is a "flattened" representation of a Trace. Goals:
164/// - small updates to the trace should result in small differences in the `FlatTrace`;
165/// - two `FlatTrace`s should be efficient to diff;
166/// - converting to and from a `Trace` should be relatively straightforward.
167///
168/// These goals are all somewhat in tension, and the space of possible representations is pretty
169/// large. See individual fields for comments on some of the tradeoffs.
170#[derive(Clone, Debug)]
171pub struct FlatTrace<T> {
172    pub(crate) since: Antichain<T>,
173    /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
174    /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
175    /// Previously, we serialized a trace as just this list of batches. Keeping this data around
176    /// helps ensure backwards compatibility. In the near future, we may still keep some batches
177    /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
178    /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
179    /// collection below.
180    pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
181    /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
182    /// by id directly.
183    pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
184    /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
185    /// to make differential updates smaller when two batches merge together. We also store the
186    /// level on the batch, instead of mapping from level to a list of batches... the level of a
187    /// spine batch doesn't change over time, but the list of batches at a particular level does.
188    pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
189    /// In-progress merges. We store this by spine id instead of level to prepare for some possible
190    /// generalizations to spine (merging N of M batches at a level). This is also a natural place
191    /// to store incremental merge progress in the future.
192    pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
193}
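
// A rough sketch of how this flattened form is meant to round-trip, given some
// `trace: Trace<u64>` (both methods are `pub(crate)`, so this only applies
// within the crate; values are illustrative):
//
//     let flat: FlatTrace<u64> = trace.flatten();
//     // ... `flat` is what gets diffed and written into persist state ...
//     let restored = Trace::unflatten(flat).expect("valid flattened trace");
//     // Trace equality is defined over hollow batches, ignoring spine layers.
//     assert!(restored.batches().eq(trace.batches()));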
194
195impl<T: Timestamp + Lattice> Trace<T> {
196    pub(crate) fn flatten(&self) -> FlatTrace<T> {
197        let since = self.spine.since.clone();
198        let mut legacy_batches = BTreeMap::new();
199        let mut hollow_batches = BTreeMap::new();
200        let mut spine_batches = BTreeMap::new();
201        let mut merges = BTreeMap::new();
202
203        let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
204            let id = batch.id();
205            let desc = batch.desc.clone();
206            let mut parts = Vec::with_capacity(batch.parts.len());
207            let mut descs = Vec::with_capacity(batch.parts.len());
208            for IdHollowBatch { id, batch } in &batch.parts {
209                parts.push(*id);
210                descs.push(batch.desc.clone());
211                // Ideally, we'd like to put all batches in the hollow_batches collection, since
212                // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
213                // we currently keep most batches in the legacy collection for backwards
214                // compatibility.
215                // As an exception, we add batches with empty time ranges to hollow_batches:
216                // they're otherwise not guaranteed to be unique, and since we only started writing
217                // them down recently there's no backwards compatibility risk.
218                if batch.desc.lower() == batch.desc.upper() {
219                    hollow_batches.insert(*id, Arc::clone(batch));
220                } else {
221                    legacy_batches.insert(Arc::clone(batch), ());
222                }
223            }
224
225            let spine_batch = ThinSpineBatch {
226                level,
227                desc,
228                parts,
229                descs,
230            };
231            spine_batches.insert(id, spine_batch);
232        };
233
234        for (level, state) in self.spine.merging.iter().enumerate() {
235            for batch in &state.batches {
236                push_spine_batch(level, batch);
237                if let Some(c) = &batch.active_compaction {
238                    let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
239                    assert!(
240                        previous.is_none(),
241                        "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
242                        batch.id,
243                    )
244                }
245            }
246            if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
247                let previous = merges.insert(*id, ThinMerge::fueling(merge));
248                assert!(
249                    previous.is_none(),
250                    "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
251                )
252            }
253        }
254
255        if !self.roundtrip_structure {
256            assert!(hollow_batches.is_empty());
257            spine_batches.clear();
258            merges.clear();
259        }
260
261        FlatTrace {
262            since,
263            legacy_batches,
264            hollow_batches,
265            spine_batches,
266            merges,
267        }
268    }
269    pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
270        let FlatTrace {
271            since,
272            legacy_batches,
273            mut hollow_batches,
274            spine_batches,
275            mut merges,
276        } = value;
277
278        // If the flattened representation has spine batches (or is empty)
279        // we know to preserve the structure for this trace.
280        let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
281
282        // We need to look up legacy batches somehow, but we don't have a spine id for them.
283        // Instead, we rely on the fact that the spine must store them in antichain order.
284        // Our timestamp type may not be totally ordered, so we need to implement our own comparator
285        // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
286        // though.
287        let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
288            if PartialOrder::less_than(left, right) {
289                Ordering::Less
290            } else if PartialOrder::less_than(right, left) {
291                Ordering::Greater
292            } else {
293                Ordering::Equal
294            }
295        };
296        let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
297        legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
298
299        let mut pop_batch =
300            |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
301                if let Some(batch) = hollow_batches.remove(&id) {
302                    if let Some(desc) = expected_desc {
303                        assert_eq!(*desc, batch.desc);
304                    }
305                    return Ok(IdHollowBatch { id, batch });
306                }
307                let mut batch = legacy_batches
308                    .pop()
309                    .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
310
311                let Some(expected_desc) = expected_desc else {
312                    return Ok(IdHollowBatch { id, batch });
313                };
314
315                if expected_desc.lower() != batch.desc.lower() {
316                    return Err(format!(
317                        "hollow batch lower {:?} did not match expected lower {:?}",
318                        batch.desc.lower().elements(),
319                        expected_desc.lower().elements()
320                    ));
321                }
322
                // Empty legacy batches are not deterministic: different nodes may split them up
                // in different ways. For now, we rearrange them to match the spine data.
325                if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
326                    let mut new_upper = batch.desc.upper().clone();
327
328                    // While our current batch is too small, and there's another empty batch
329                    // in the list, roll it in.
330                    while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
331                        let Some(next_batch) = legacy_batches.pop() else {
332                            break;
333                        };
334                        if next_batch.is_empty() {
335                            new_upper.clone_from(next_batch.desc.upper());
336                        } else {
337                            legacy_batches.push(next_batch);
338                            break;
339                        }
340                    }
341
342                    // If our current batch is too large, split it by the expected upper
343                    // and preserve the remainder.
344                    if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
345                        legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
346                            expected_desc.upper().clone(),
347                            new_upper.clone(),
348                            batch.desc.since().clone(),
349                        ))));
350                        new_upper.clone_from(expected_desc.upper());
351                    }
352                    batch = Arc::new(HollowBatch::empty(Description::new(
353                        batch.desc.lower().clone(),
354                        new_upper,
355                        expected_desc.since().clone(),
356                    )))
357                }
358
359                if expected_desc.upper() != batch.desc.upper() {
360                    return Err(format!(
361                        "hollow batch upper {:?} did not match expected upper {:?}",
362                        batch.desc.upper().elements(),
363                        expected_desc.upper().elements()
364                    ));
365                }
366
367                Ok(IdHollowBatch { id, batch })
368            };
369
370        let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
371            (batch.desc.upper().clone(), id.1)
372        } else {
373            (Antichain::from_elem(T::minimum()), 0)
374        };
375        let levels = spine_batches
376            .first_key_value()
377            .map(|(_, batch)| batch.level + 1)
378            .unwrap_or(0);
379        let mut merging = vec![MergeState::default(); levels];
380        for (id, batch) in spine_batches {
381            let level = batch.level;
382
383            let parts = batch
384                .parts
385                .into_iter()
386                .zip(batch.descs.iter().map(Some).chain(std::iter::repeat(None)))
387                .map(|(id, desc)| pop_batch(id, desc))
388                .collect::<Result<Vec<_>, _>>()?;
389            let len = parts.iter().map(|p| (*p).batch.len).sum();
390            let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
391            let batch = SpineBatch {
392                id,
393                desc: batch.desc,
394                parts,
395                active_compaction,
396                len,
397            };
398
399            let state = &mut merging[level];
400
401            state.push_batch(batch);
402            if let Some(id) = state.id() {
403                if let Some(merge) = merges.remove(&id) {
404                    state.merge = Some(IdFuelingMerge {
405                        id,
406                        merge: FuelingMerge {
407                            since: merge.since,
408                            remaining_work: merge.remaining_work,
409                        },
410                    })
411                }
412            }
413        }
414
415        let mut trace = Trace {
416            spine: Spine {
417                effort: 1,
418                next_id,
419                since,
420                upper,
421                merging,
422            },
423            roundtrip_structure,
424        };
425
426        fn check_empty(name: &str, len: usize) -> Result<(), String> {
427            if len != 0 {
428                Err(format!("{len} {name} left after reconstructing spine"))
429            } else {
430                Ok(())
431            }
432        }
433
434        if roundtrip_structure {
435            check_empty("legacy batches", legacy_batches.len())?;
436        } else {
437            // If the structure wasn't actually serialized, we may have legacy batches left over.
438            for batch in legacy_batches.into_iter().rev() {
439                trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
440            }
441        }
442        check_empty("hollow batches", hollow_batches.len())?;
443        check_empty("merges", merges.len())?;
444
445        debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
446
447        Ok(trace)
448    }
449}
450
451#[derive(Clone, Debug, Default)]
452pub(crate) struct SpineMetrics {
453    pub compact_batches: u64,
454    pub compacting_batches: u64,
455    pub noncompact_batches: u64,
456}
457
458impl<T> Trace<T> {
459    pub fn since(&self) -> &Antichain<T> {
460        &self.spine.since
461    }
462
463    pub fn upper(&self) -> &Antichain<T> {
464        &self.spine.upper
465    }
466
467    pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
468        for batch in self.batches() {
469            f(batch);
470        }
471    }
472
473    pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
474        self.spine
475            .spine_batches()
476            .flat_map(|b| b.parts.as_slice())
477            .map(|b| &*b.batch)
478    }
479
480    pub fn num_spine_batches(&self) -> usize {
481        self.spine.spine_batches().count()
482    }
483
484    #[cfg(test)]
485    pub fn num_hollow_batches(&self) -> usize {
486        self.batches().count()
487    }
488
489    #[cfg(test)]
490    pub fn num_updates(&self) -> usize {
491        self.batches().map(|b| b.len).sum()
492    }
493}
494
495impl<T: Timestamp + Lattice> Trace<T> {
496    pub fn downgrade_since(&mut self, since: &Antichain<T>) {
497        self.spine.since.clone_from(since);
498    }
499
500    #[must_use]
501    pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
502        let mut merge_reqs = Vec::new();
503        self.spine.insert(
504            batch,
505            &mut SpineLog::Enabled {
506                merge_reqs: &mut merge_reqs,
507            },
508        );
509        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
510        // Spine::roll_up (internally used by insert) clears all batches out of
511        // levels below a target by walking up from level 0 and merging each
512        // level into the next (providing the necessary fuel). In practice, this
513        // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
514        // It's a waste to do all of these (we'll throw away the results), so we
515        // filter out any that are entirely covered by some other request.
516        Self::remove_redundant_merge_reqs(merge_reqs)
517    }
518
519    pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
520        // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
521        // In the meantime, search backwards, since most compactions are for recent batches.
522        for batch in self.spine.spine_batches_mut().rev() {
523            if batch.id == id {
524                batch.active_compaction = Some(compaction);
525                break;
526            }
527        }
528    }
529
530    /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
531    /// account for a surprising amount of cpu in prod. database-issues#5411
532    pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
533        self.spine.insert(batch, &mut SpineLog::Disabled);
534    }
535
536    /// Apply some amount of effort to trace maintenance.
537    ///
538    /// The units of effort are updates, and the method should be thought of as
539    /// analogous to inserting as many empty updates, where the trace is
540    /// permitted to perform proportionate work.
541    ///
542    /// Returns true if this did work and false if it left the spine unchanged.
543    #[must_use]
544    pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
545        let mut merge_reqs = Vec::new();
546        let did_work = self.spine.exert(
547            fuel,
548            &mut SpineLog::Enabled {
549                merge_reqs: &mut merge_reqs,
550            },
551        );
552        debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
553        // See the comment in [Self::push_batch].
554        let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
555        (merge_reqs, did_work)
556    }
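
    // A rough usage sketch (illustrative fuel amount): exerting maintenance
    // effort from outside the write path, e.g. a background task.
    //
    //     let (reqs, did_work) = trace.exert(1024);
    //     if did_work {
    //         // hand `reqs` to the compactor, just like for `push_batch`
    //     }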
557
558    /// Validates invariants.
559    ///
560    /// See `Spine::validate` for details.
561    pub fn validate(&self) -> Result<(), String> {
562        self.spine.validate()
563    }
564
565    pub fn apply_merge_res(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
566        for batch in self.spine.spine_batches_mut().rev() {
567            let result = batch.maybe_replace(res);
568            if result.matched() {
569                return result;
570            }
571        }
572        ApplyMergeResult::NotAppliedNoMatch
573    }
574
575    /// Obtain all fueled merge reqs that either have no active compaction, or the previous
576    /// compaction was started at or before the threshold time, in order from oldest to newest.
577    pub(crate) fn fueled_merge_reqs_before_ms(
578        &self,
579        threshold_ms: u64,
580        threshold_writer: Option<WriterKey>,
581    ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
582        self.spine
583            .spine_batches()
584            .filter(move |b| {
585                let noncompact = !b.is_compact();
586                let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
587                    b.parts.iter().any(|b| {
588                        b.batch
589                            .parts
590                            .iter()
591                            .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
592                    })
593                });
594                noncompact || old_writer
595            })
596            .filter(move |b| {
597                // Either there's no active compaction, or the last active compaction
598                // is not after the timeout timestamp.
599                b.active_compaction
600                    .as_ref()
601                    .map_or(true, move |c| c.start_ms <= threshold_ms)
602            })
603            .map(|b| FueledMergeReq {
604                id: b.id,
605                desc: b.desc.clone(),
606                inputs: b.parts.clone(),
607            })
608    }
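
    // A rough usage sketch (illustrative threshold; `now_ms` is assumed to be
    // the current wall-clock time in millis): re-request compaction for
    // batches whose active compaction, if any, started more than 30s ago.
    //
    //     let reqs = trace.fueled_merge_reqs_before_ms(now_ms.saturating_sub(30_000), None);
    //     for req in reqs {
    //         // re-enqueue `req` with the compactor
    //     }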
609
610    // This is only called with the results of one `insert` and so the length of
611    // `merge_reqs` is bounded by the number of levels in the spine (or possibly
612    // some small constant multiple?). The number of levels is logarithmic in the
613    // number of updates in the spine, so this number should stay very small. As
614    // a result, we simply use the naive O(n^2) algorithm here instead of doing
615    // anything fancy with e.g. interval trees.
616    fn remove_redundant_merge_reqs(
617        mut merge_reqs: Vec<FueledMergeReq<T>>,
618    ) -> Vec<FueledMergeReq<T>> {
619        // Returns true if b0 covers b1, false otherwise.
620        fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
621            // TODO: can we relax or remove this since check?
622            b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
623        }
624
625        let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
626        // In practice, merge_reqs will come in sorted such that the "large"
627        // requests are later. Take advantage of this by processing back to
628        // front.
629        while let Some(merge_req) = merge_reqs.pop() {
630            let covered = ret.iter().any(|r| covers(r, &merge_req));
631            if !covered {
632                // Now check if anything we've already staged is covered by this
633                // new req. In practice, the merge_reqs come in sorted and so
634                // this `retain` is a no-op.
635                ret.retain(|r| !covers(&merge_req, r));
636                ret.push(merge_req);
637            }
638        }
639        ret
640    }
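
    // To illustrate the filtering above with hypothetical requests `req_0_2`,
    // `req_0_4`, and `req_0_6` (ids SpineId(0, 2), SpineId(0, 4), SpineId(0, 6),
    // all sharing a since), only the widest request survives, mirroring the
    // `(a, b), (a, b, c), ...` pattern noted in [Self::push_batch]:
    //
    //     let reqs = vec![req_0_2, req_0_4, req_0_6];
    //     let kept = Self::remove_redundant_merge_reqs(reqs);
    //     assert_eq!(kept.len(), 1); // only the SpineId(0, 6) request remains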
641
642    pub fn spine_metrics(&self) -> SpineMetrics {
643        let mut metrics = SpineMetrics::default();
644        for batch in self.spine.spine_batches() {
645            if batch.is_compact() {
646                metrics.compact_batches += 1;
647            } else if batch.is_merging() {
648                metrics.compacting_batches += 1;
649            } else {
650                metrics.noncompact_batches += 1;
651            }
652        }
653        metrics
654    }
655}
656
657/// A log of what transitively happened during a Spine operation: e.g.
658/// FueledMergeReqs were generated.
659enum SpineLog<'a, T> {
660    Enabled {
661        merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
662    },
663    Disabled,
664}
665
666#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
667pub struct SpineId(pub usize, pub usize);
668
669impl Serialize for SpineId {
670    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
671    where
672        S: Serializer,
673    {
674        let SpineId(lo, hi) = self;
675        serializer.serialize_str(&format!("{lo}-{hi}"))
676    }
677}
678
679impl SpineId {
680    fn covers(self, other: SpineId) -> bool {
681        self.0 <= other.0 && other.1 <= self.1
682    }
683}
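
// For intuition: a `SpineId` spans a half-open range of insertion ids, so a
// merged batch's id covers the ids of its inputs. Hypothetical values:
//
//     assert!(SpineId(0, 4).covers(SpineId(1, 3)));
//     assert!(SpineId(0, 4).covers(SpineId(0, 4)));
//     assert!(!SpineId(1, 3).covers(SpineId(0, 4)));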
684
685#[derive(Debug, Clone, PartialEq)]
686pub struct IdHollowBatch<T> {
687    pub id: SpineId,
688    pub batch: Arc<HollowBatch<T>>,
689}
690
691#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
692pub struct ActiveCompaction {
693    pub start_ms: u64,
694}
695
696#[derive(Debug, Clone, PartialEq)]
697struct SpineBatch<T> {
698    id: SpineId,
699    desc: Description<T>,
700    parts: Vec<IdHollowBatch<T>>,
701    active_compaction: Option<ActiveCompaction>,
702    // A cached version of parts.iter().map(|x| x.len).sum()
703    len: usize,
704}
705
706impl<T> SpineBatch<T> {
707    fn merged(batch: IdHollowBatch<T>) -> Self
708    where
709        T: Clone,
710    {
711        Self {
712            id: batch.id,
713            desc: batch.batch.desc.clone(),
714            len: batch.batch.len,
715            parts: vec![batch],
716            active_compaction: None,
717        }
718    }
719}
720
721#[derive(Debug, Copy, Clone)]
722pub enum ApplyMergeResult {
723    AppliedExact,
724    AppliedSubset,
725    NotAppliedNoMatch,
726    NotAppliedInvalidSince,
727    NotAppliedTooManyUpdates,
728}
729
730impl ApplyMergeResult {
731    pub fn applied(&self) -> bool {
732        match self {
733            ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
734            _ => false,
735        }
736    }
737    pub fn matched(&self) -> bool {
738        match self {
739            ApplyMergeResult::AppliedExact
740            | ApplyMergeResult::AppliedSubset
741            | ApplyMergeResult::NotAppliedTooManyUpdates => true,
742            _ => false,
743        }
744    }
745}
746
747impl<T: Timestamp + Lattice> SpineBatch<T> {
748    pub fn lower(&self) -> &Antichain<T> {
749        self.desc().lower()
750    }
751
752    pub fn upper(&self) -> &Antichain<T> {
753        self.desc().upper()
754    }
755
756    fn id(&self) -> SpineId {
757        debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
758        debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
759        self.id
760    }
761
762    pub fn is_compact(&self) -> bool {
763        // This definition is extremely likely to change, but for now, we consider a batch
764        // "compact" if it has at most one hollow batch with at most one run.
765        self.parts.len() <= 1 && self.parts.iter().all(|p| p.batch.run_splits.is_empty())
766    }
767
768    pub fn is_merging(&self) -> bool {
769        self.active_compaction.is_some()
770    }
771
772    fn desc(&self) -> &Description<T> {
773        &self.desc
774    }
775
776    pub fn len(&self) -> usize {
777        // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
778        // we compact it.
779        debug_assert_eq!(
780            self.len,
781            self.parts.iter().map(|x| x.batch.len).sum::<usize>()
782        );
783        self.len
784    }
785
786    pub fn is_empty(&self) -> bool {
787        self.len() == 0
788    }
789
790    pub fn empty(
791        id: SpineId,
792        lower: Antichain<T>,
793        upper: Antichain<T>,
794        since: Antichain<T>,
795    ) -> Self {
796        SpineBatch::merged(IdHollowBatch {
797            id,
798            batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
799        })
800    }
801
802    pub fn begin_merge(
803        bs: &[Self],
804        compaction_frontier: Option<AntichainRef<T>>,
805    ) -> Option<IdFuelingMerge<T>> {
806        let from = bs.first()?.id().0;
807        let until = bs.last()?.id().1;
808        let id = SpineId(from, until);
809        let mut sinces = bs.iter().map(|b| b.desc().since());
810        let mut since = sinces.next()?.clone();
811        for b in bs {
812            since.join_assign(b.desc().since())
813        }
814        if let Some(compaction_frontier) = compaction_frontier {
815            since.join_assign(&compaction_frontier.to_owned());
816        }
817        let remaining_work = bs.iter().map(|x| x.len()).sum();
818        Some(IdFuelingMerge {
819            id,
820            merge: FuelingMerge {
821                since,
822                remaining_work,
823            },
824        })
825    }
826
827    // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes?
828    fn maybe_replace(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
829        // The spine's and merge res's sinces don't need to match (which could occur if Spine
830        // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
831        // since must be in advance of the merge res since.
832        if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
833            return ApplyMergeResult::NotAppliedInvalidSince;
834        }
835
836        // If our merge result exactly matches a spine batch, we can swap it in directly
837        let exact_match = res.output.desc.lower() == self.desc().lower()
838            && res.output.desc.upper() == self.desc().upper();
839        if exact_match {
840            // Spine internally has an invariant about a batch being at some level
841            // or higher based on the len. We could end up violating this invariant
842            // if we increased the length of the batch.
843            //
844            // A res output with length greater than the existing spine batch implies
845            // a compaction has already been applied to this range, and with a higher
846            // rate of consolidation than this one. This could happen as a result of
847            // compaction's memory bound limiting the amount of consolidation possible.
848            if res.output.len > self.len() {
849                return ApplyMergeResult::NotAppliedTooManyUpdates;
850            }
851            *self = SpineBatch::merged(IdHollowBatch {
852                id: self.id(),
853                batch: Arc::new(res.output.clone()),
854            });
855            return ApplyMergeResult::AppliedExact;
856        }
857
858        // It is possible the structure of the spine has changed since the merge res
859        // was created, such that it no longer exactly matches the description of a
860        // spine batch. This can happen if another merge has happened in the interim,
861        // or if spine needed to be rebuilt from state.
862        //
863        // When this occurs, we can still attempt to slot the merge res in to replace
864        // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
865        // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
866        let SpineBatch {
867            id,
868            parts,
869            desc,
870            active_compaction: _,
871            len: _,
872        } = self;
873        // first, determine if a subset of parts can be cleanly replaced by the merge res
874        let mut lower = None;
875        let mut upper = None;
876        for (i, batch) in parts.iter().enumerate() {
877            if batch.batch.desc.lower() == res.output.desc.lower() {
878                lower = Some((i, batch.id.0));
879            }
880            if batch.batch.desc.upper() == res.output.desc.upper() {
881                upper = Some((i, batch.id.1));
882            }
883            if lower.is_some() && upper.is_some() {
884                break;
885            }
886        }
887        // next, replace parts with the merge res batch if we can
888        match (lower, upper) {
889            (Some((lower, id_lower)), Some((upper, id_upper))) => {
890                let mut new_parts = vec![];
891                new_parts.extend_from_slice(&parts[..lower]);
892                new_parts.push(IdHollowBatch {
893                    id: SpineId(id_lower, id_upper),
894                    batch: Arc::new(res.output.clone()),
895                });
896                new_parts.extend_from_slice(&parts[upper + 1..]);
897                let new_spine_batch = SpineBatch {
898                    id: *id,
899                    desc: desc.to_owned(),
900                    len: new_parts.iter().map(|x| x.batch.len).sum(),
901                    parts: new_parts,
902                    active_compaction: None,
903                };
904                if new_spine_batch.len() > self.len() {
905                    return ApplyMergeResult::NotAppliedTooManyUpdates;
906                }
907                *self = new_spine_batch;
908                ApplyMergeResult::AppliedSubset
909            }
910            _ => ApplyMergeResult::NotAppliedNoMatch,
911        }
912    }
913
914    #[cfg(test)]
915    fn describe(&self, extended: bool) -> String {
916        let SpineBatch {
917            id,
918            parts,
919            desc,
920            active_compaction,
921            len,
922        } = self;
923        let compaction = match active_compaction {
924            None => "".to_owned(),
925            Some(c) => format!(" (c@{})", c.start_ms),
926        };
927        match extended {
928            false => format!(
929                "[{}-{}]{:?}{:?}{}/{}{compaction}",
930                id.0,
931                id.1,
932                desc.lower().elements(),
933                desc.upper().elements(),
934                parts.len(),
935                len
936            ),
937            true => {
938                format!(
939                    "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
940                    id.0,
941                    id.1,
942                    desc.lower().elements(),
943                    desc.upper().elements(),
944                    desc.since().elements(),
945                    parts.len(),
946                    len,
947                    parts
948                        .iter()
949                        .flat_map(|x| x.batch.parts.iter())
950                        .map(|x| format!(" {}", x.printable_name()))
951                        .collect::<Vec<_>>()
952                        .join("")
953                )
954            }
955        }
956    }
957}
958
959#[derive(Debug, Clone, PartialEq, Serialize)]
960pub struct FuelingMerge<T> {
961    pub(crate) since: Antichain<T>,
962    pub(crate) remaining_work: usize,
963}
964
965#[derive(Debug, Clone, PartialEq, Serialize)]
966pub struct IdFuelingMerge<T> {
967    id: SpineId,
968    merge: FuelingMerge<T>,
969}
970
971impl<T: Timestamp + Lattice> FuelingMerge<T> {
972    /// Perform some amount of work, decrementing `fuel`.
973    ///
974    /// If `fuel` is non-zero after the call, the merging is complete and one
975    /// should call `done` to extract the merged results.
976    // TODO(benesch): rewrite to avoid usage of `as`.
977    #[allow(clippy::as_conversions)]
978    fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
979        let used = std::cmp::min(*fuel as usize, self.remaining_work);
980        self.remaining_work = self.remaining_work.saturating_sub(used);
981        *fuel -= used as isize;
982    }
983
984    /// Extracts merged results.
985    ///
986    /// This method should only be called after `work` has been called and has
987    /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
988    fn done(
989        self,
990        bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
991        log: &mut SpineLog<'_, T>,
992    ) -> Option<SpineBatch<T>> {
993        let first = bs.first()?;
994        let last = bs.last()?;
995        let id = SpineId(first.id().0, last.id().1);
996        assert!(id.0 < id.1);
997        let lower = first.desc().lower().clone();
998        let upper = last.desc().upper().clone();
999        let since = self.since;
1000
1001        // Special case empty batches.
1002        if bs.iter().all(SpineBatch::is_empty) {
1003            return Some(SpineBatch::empty(id, lower, upper, since));
1004        }
1005
1006        let desc = Description::new(lower, upper, since);
1007        let len = bs.iter().map(SpineBatch::len).sum();
1008
1009        // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1010        // in the worst case, the double iteration is absolutely worth having
1011        // merged_parts pre-sized.
1012        let mut merged_parts_len = 0;
1013        for b in &bs {
1014            merged_parts_len += b.parts.len();
1015        }
1016        let mut merged_parts = Vec::with_capacity(merged_parts_len);
1017        for b in bs {
1018            merged_parts.extend(b.parts)
1019        }
1020        // Sanity check the pre-size code.
1021        debug_assert_eq!(merged_parts.len(), merged_parts_len);
1022
1023        if let SpineLog::Enabled { merge_reqs } = log {
1024            merge_reqs.push(FueledMergeReq {
1025                id,
1026                desc: desc.clone(),
1027                inputs: merged_parts.clone(),
1028            });
1029        }
1030
1031        Some(SpineBatch {
1032            id,
1033            desc,
1034            len,
1035            parts: merged_parts,
1036            active_compaction: None,
1037        })
1038    }
1039}
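
// A worked example of the fueling arithmetic above, with hypothetical numbers:
//
//     let mut merge = FuelingMerge { since: Antichain::from_elem(0u64), remaining_work: 10 };
//     let mut fuel: isize = 7;
//     merge.work(&[], &mut fuel);
//     assert_eq!((merge.remaining_work, fuel), (3, 0)); // fuel exhausted, merge incomplete
//     let mut fuel: isize = 7;
//     merge.work(&[], &mut fuel);
//     assert_eq!((merge.remaining_work, fuel), (0, 4)); // fuel left over, so `done` may be called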
1040
1041/// The maximum number of batches per level in the spine.
1042/// In practice, we probably want a larger max and a configurable soft cap, but using a
1043/// stack-friendly data structure and keeping this number low makes this safer during the
1044/// initial rollout.
1045const BATCHES_PER_LEVEL: usize = 2;
1046
1047/// An append-only collection of update batches.
1048///
1049/// The `Spine` is a general-purpose trace implementation based on collection
1050/// and merging immutable batches of updates. It is generic with respect to the
1051/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1052///
1053/// ## Design
1054///
1055/// This spine is represented as a list of layers, where each element in the
1056/// list is either
1057///
1058///   1. MergeState::Vacant  empty
1059///   2. MergeState::Single  a single batch
1060///   3. MergeState::Double  a pair of batches
1061///
1062/// Each "batch" has the option to be `None`, indicating a non-batch that
1063/// nonetheless acts as a number of updates proportionate to the level at which
1064/// it exists (for bookkeeping).
1065///
1066/// Each of the batches at layer i contains at most 2^i elements. The sequence
1067/// of batches should have the upper bound of one match the lower bound of the
1068/// next. Batches may be logically empty, with matching upper and lower bounds,
1069/// as a bookkeeping mechanism.
1070///
1071/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1072/// even though it may actually contain fewer elements. This allows us to
1073/// decouple the physical representation from logical amounts of effort invested
1074/// in each batch. It allows us to begin compaction and to reduce the number of
1075/// updates, without compromising our ability to continue to move updates along
1076/// the spine. We are explicitly making the trade-off that while some batches
1077/// might compact at lower levels, we want to treat them as if they contained
1078/// their full set of updates for accounting reasons (to apply work to higher
1079/// levels).
1080///
1081/// We maintain the invariant that for any in-progress merge at level k there
1082/// should be fewer than 2^k records at levels lower than k. That is, even if we
1083/// were to apply an unbounded amount of effort to those records, we would not
1084/// have enough records to prompt a merge into the in-progress merge. Ideally,
1085/// we maintain the extended invariant that for any in-progress merge at level
1086/// k, the remaining effort required (number of records minus applied effort) is
1087/// less than the number of records that would need to be added to reach 2^k
1088/// records in layers below.
1089///
1090/// ## Mathematics
1091///
1092/// When a merge is initiated, there should be a non-negative *deficit* of
1093/// updates before the layers below could plausibly produce a new batch for the
1094/// currently merging layer. We must determine a factor of proportionality, so
1095/// that newly arrived updates provide at least that amount of "fuel" towards
1096/// the merging layer, so that the merge completes before lower levels invade.
1097///
1098/// ### Deficit:
1099///
1100/// A new merge is initiated only in response to the completion of a prior
1101/// merge, or the introduction of new records from outside. The latter case is
1102/// special, and will maintain our invariant trivially, so we will focus on the
1103/// former case.
1104///
/// When a merge at level k completes, assuming we have maintained our invariant
/// then there should be fewer than 2^k records at lower levels. The newly
/// created merge at level k+1 will require up to 2^(k+2) units of work, and
/// should not expect a new batch until strictly more than 2^k records are
/// added. This means that a factor of proportionality of four should be
/// sufficient to ensure that the merge completes before a new merge is
/// initiated.
1112///
/// When new records get introduced, we will need to roll up any batches at
/// lower levels, which we treat as the introduction of records. Each of these
/// virtual records should be accounted for in the fuel it contributes, as it
/// results in the promotion of batches closer to in-progress merges.
1118///
1119/// ### Fuel sharing
1120///
1121/// We like the idea of applying fuel preferentially to merges at *lower*
1122/// levels, under the idea that they are easier to complete, and we benefit from
1123/// fewer total merges in progress. This does delay the completion of merges at
1124/// higher levels, and may not obviously be a total win. If we choose to do
1125/// this, we should make sure that we correctly account for completed merges at
1126/// low layers: they should still extract fuel from new updates even though they
1127/// have completed, at least until they have paid back any "debt" to higher
1128/// layers by continuing to provide fuel as updates arrive.
1129#[derive(Debug, Clone)]
1130struct Spine<T> {
1131    effort: usize,
1132    next_id: usize,
1133    since: Antichain<T>,
1134    upper: Antichain<T>,
1135    merging: Vec<MergeState<T>>,
1136}
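
// To make the deficit arithmetic above concrete (hypothetical k): when a merge
// at level k = 3 completes, the invariant says fewer than 2^3 = 8 records sit
// below it. The merge then created at level 4 needs at most 2^(3+2) = 32 units
// of work, and no new batch can reach level 4 until strictly more than 8
// records arrive below, so a proportionality factor of 32 / 8 = 4 units of
// fuel per introduced record is enough to finish the merge in time.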
1137
1138impl<T> Spine<T> {
1139    /// All batches in the spine, oldest to newest.
1140    pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1141        self.merging.iter().rev().flat_map(|m| &m.batches)
1142    }
1143
1144    /// All (mutable) batches in the spine, oldest to newest.
1145    pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1146        self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1147    }
1148}
1149
1150impl<T: Timestamp + Lattice> Spine<T> {
1151    /// Allocates a fueled `Spine`.
1152    ///
1153    /// This trace will merge batches progressively, with each inserted batch
1154    /// applying a multiple of the batch's length in effort to each merge. The
1155    /// `effort` parameter is that multiplier. This value should be at least one
1156    /// for the merging to happen; a value of zero is not helpful.
1157    pub fn new() -> Self {
1158        Spine {
1159            effort: 1,
1160            next_id: 0,
1161            since: Antichain::from_elem(T::minimum()),
1162            upper: Antichain::from_elem(T::minimum()),
1163            merging: Vec::new(),
1164        }
1165    }
1166
1167    /// Apply some amount of effort to trace maintenance.
1168    ///
1169    /// The units of effort are updates, and the method should be thought of as
1170    /// analogous to inserting as many empty updates, where the trace is
1171    /// permitted to perform proportionate work.
1172    ///
1173    /// Returns true if this did work and false if it left the spine unchanged.
1174    fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1175        self.tidy_layers();
1176        if self.reduced() {
1177            return false;
1178        }
1179
1180        if self.merging.iter().any(|b| b.merge.is_some()) {
1181            let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1182            // If any merges exist, we can directly call `apply_fuel`.
1183            self.apply_fuel(&fuel, log);
1184        } else {
1185            // Otherwise, we'll need to introduce fake updates to move merges
1186            // along.
1187
            // Introduce an empty batch with roughly `effort` virtual updates.
1189            let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1190            let id = self.next_id();
1191            self.introduce_batch(
1192                SpineBatch::empty(
1193                    id,
1194                    self.upper.clone(),
1195                    self.upper.clone(),
1196                    self.since.clone(),
1197                ),
1198                level,
1199                log,
1200            );
1201        }
1202        true
1203    }
1204
1205    pub fn next_id(&mut self) -> SpineId {
1206        let id = self.next_id;
1207        self.next_id += 1;
1208        SpineId(id, self.next_id)
1209    }
1210
1211    // Ideally, this method acts as insertion of `batch`, even if we are not yet
1212    // able to begin merging the batch. This means it is a good time to perform
1213    // amortized work proportional to the size of batch.
1214    pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1215        assert!(batch.desc.lower() != batch.desc.upper());
1216        assert_eq!(batch.desc.lower(), &self.upper);
1217
1218        let id = self.next_id();
1219        let batch = SpineBatch::merged(IdHollowBatch {
1220            id,
1221            batch: Arc::new(batch),
1222        });
1223
1224        self.upper.clone_from(batch.upper());
1225
1226        // If `batch` and the most recently inserted batch are both empty,
1227        // we can just fuse them.
1228        if batch.is_empty() {
1229            if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1230                if self.merging[position].is_single() && self.merging[position].is_empty() {
1231                    self.insert_at(batch, position);
1232                    // Since we just inserted a batch, we should always have work to complete...
1233                    // but otherwise we just leave this layer vacant.
1234                    if let Some(merged) = self.complete_at(position, log) {
1235                        self.merging[position] = MergeState::single(merged);
1236                    }
1237                    return;
1238                }
1239            }
1240        }
1241
1242        // Normal insertion for the batch.
1243        let index = batch.len().next_power_of_two();
1244        self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1245    }
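
    // For intuition about the level selection above (hypothetical length): a
    // batch with `len() == 1000` rounds up to `next_power_of_two() == 1024`,
    // which has `trailing_zeros() == 10`, so the batch is introduced at
    // level 10.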
1246
1247    /// True iff there is at most one HollowBatch in `self.merging`.
1248    ///
1249    /// When true, there is no maintenance work to perform in the trace, other
1250    /// than compaction. We do not yet have logic in place to determine if
1251    /// compaction would improve a trace, so for now we are ignoring that.
1252    fn reduced(&self) -> bool {
1253        self.spine_batches()
1254            .flat_map(|b| b.parts.as_slice())
1255            .count()
1256            < 2
1257    }
1258
1259    /// Describes the merge progress of layers in the trace.
1260    ///
1261    /// Intended for diagnostics rather than public consumption.
1262    #[allow(dead_code)]
1263    fn describe(&self) -> Vec<(usize, usize)> {
1264        self.merging
1265            .iter()
1266            .map(|b| (b.batches.len(), b.len()))
1267            .collect()
1268    }
1269
1270    /// Introduces a batch at an indicated level.
1271    ///
1272    /// The level indication is often related to the size of the batch, but it
1273    /// can also be used to artificially fuel the computation by supplying empty
1274    /// batches at non-trivial indices, to move merges along.
1275    fn introduce_batch(
1276        &mut self,
1277        batch: SpineBatch<T>,
1278        batch_index: usize,
1279        log: &mut SpineLog<'_, T>,
1280    ) {
1281        // Step 0.  Determine an amount of fuel to use for the computation.
1282        //
1283        //          Fuel is used to drive maintenance of the data structure,
1284        //          and in particular are used to make progress through merges
1285        //          that are in progress. The amount of fuel to use should be
1286        //          proportional to the number of records introduced, so that
1287        //          we are guaranteed to complete all merges before they are
1288        //          required as arguments to merges again.
1289        //
1290        //          The fuel use policy is negotiable, in that we might aim
1291        //          to use relatively less when we can, so that we return
1292        //          control promptly, or we might account more work to larger
        //          batches. Not clear to me which is best, or if there
        //          should be a configuration knob controlling this.
1295
1296        // The amount of fuel to use is proportional to 2^batch_index, scaled by
1297        // a factor of self.effort which determines how eager we are in
1298        // performing maintenance work. We need to ensure that each merge in
1299        // progress receives fuel for each introduced batch, and so multiply by
1300        // that as well.
1301        if batch_index > 32 {
1302            println!("Large batch index: {}", batch_index);
1303        }
1304
1305        // We believe that eight units of fuel is sufficient for each introduced
1306        // record, accounted as four for each record, and a potential four more
1307        // for each virtual record associated with promoting existing smaller
1308        // batches. We could try and make this be less, or be scaled to merges
1309        // based on their deficit at time of instantiation. For now, we remain
1310        // conservative.
1311        let mut fuel = 8 << batch_index;
1312        // Scale up by the effort parameter, which is calibrated to one as the
1313        // minimum amount of effort.
1314        fuel *= self.effort;
1315        // Convert to an `isize` so we can observe any fuel shortfall.
1316        // TODO(benesch): avoid dangerous usage of `as`.
1317        #[allow(clippy::as_conversions)]
1318        let fuel = fuel as isize;
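
        // For example (hypothetical numbers): with `batch_index = 10` and the
        // default `effort = 1`, this seeds `8 << 10 = 8192` units of fuel for
        // every in-progress merge.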
1319
1320        // Step 1.  Apply fuel to each in-progress merge.
1321        //
1322        //          Before we can introduce new updates, we must apply any
1323        //          fuel to in-progress merges, as this fuel is what ensures
1324        //          that the merges will be complete by the time we insert
1325        //          the updates.
1326        self.apply_fuel(&fuel, log);
1327
        // Step 2.  We must ensure the invariant that adjacent layers do not
        //          contain two batches will be satisfied when we insert the
        //          batch. We do this by forcibly completing all merges at
        //          layers lower than and including `batch_index`, so that the
        //          new batch is inserted into an empty layer.
        //
        //          We could relax this to "strictly less than `batch_index`"
        //          if the layer above has only a single batch in it, which
        //          seems not implausible if it has been the focus of effort.
        //
        //          This should be interpreted as the introduction of some
        //          volume of fake updates, and we will need to fuel merges
        //          by a proportional amount to ensure that they are not
        //          surprised later on. The number of fake updates should
        //          correspond to the deficit for the layer, which perhaps
        //          we should track explicitly.
1344        self.roll_up(batch_index, log);
1345
1346        // Step 3. This insertion should be into an empty layer. It is a logical
1347        //         error otherwise, as we may be violating our invariant, from
1348        //         which all wonderment derives.
1349        self.insert_at(batch, batch_index);
1350
1351        // Step 4. Tidy the largest layers.
1352        //
1353        //         It is important that we not tidy only smaller layers,
1354        //         as their ascension is what ensures the merging and
1355        //         eventual compaction of the largest layers.
1356        self.tidy_layers();
1357    }
1358
1359    /// Ensures that an insertion at layer `index` will succeed.
1360    ///
1361    /// This method is subject to the constraint that all existing batches
1362    /// should occur at higher levels, which requires it to "roll up" batches
1363    /// present at lower levels before the method is called. In doing this, we
1364    /// should not introduce more virtual records than 2^index, as that is the
1365    /// amount of excess fuel we have budgeted for completing merges.
1366    fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1367        // Ensure entries sufficient for `index`.
1368        while self.merging.len() <= index {
1369            self.merging.push(MergeState::default());
1370        }
1371
1372        // We only need to roll up if there are non-vacant layers.
1373        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1374            // Collect and merge all batches at layers up to but not including
1375            // `index`.
1376            let mut merged = None;
1377            for i in 0..index {
1378                if let Some(merged) = merged.take() {
1379                    self.insert_at(merged, i);
1380                }
1381                merged = self.complete_at(i, log);
1382            }
1383
1384            // The merged results should be introduced at level `index`, which
1385            // should be ready to absorb them (possibly creating a new merge at
1386            // the time).
1387            if let Some(merged) = merged {
1388                self.insert_at(merged, index);
1389            }
1390
1391            // If the insertion results in a merge, we should complete it to
1392            // ensure the upcoming insertion at `index` does not panic.
1393            if self.merging[index].is_full() {
1394                let merged = self.complete_at(index, log).expect("double batch");
1395                self.insert_at(merged, index + 1);
1396            }
1397        }
1398    }
1399
1400    /// Applies an amount of fuel to merges in progress.
1401    ///
1402    /// The supplied `fuel` is for each in-progress merge, and if we want to
1403    /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1404    /// could do so in order to maintain fewer batches on average (at the risk
1405    /// of completing merges of large batches later, though probably not much
1406    /// later).
1407    pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1408        // For the moment our strategy is to apply fuel independently to each
1409        // merge in progress, rather than prioritizing small merges. This sounds
1410        // like a great idea, but we need better accounting in place to ensure
1411        // that merges that borrow against later layers but then complete still
1412        // "acquire" fuel to pay back their debts.
1413        for index in 0..self.merging.len() {
1414            // Give each level independent fuel, for now.
1415            let mut fuel = *fuel;
1416            // Apply the fuel to any merge in progress at this level.
1418            self.merging[index].work(&mut fuel);
1419            // `fuel` could have a deficit at this point, meaning we over-spent
1420            // when we took a merge step. We could ignore this, or maintain the
1421            // deficit and account future fuel against it before spending again.
1422            // It isn't clear why that would be especially helpful to do; we
1423            // might want to avoid overspends at multiple layers in the same
1424            // invocation (to limit latencies), but there is probably a rich
1425            // policy space here.
1426
1427            // If a merge completes, we can immediately merge it into the next
1428            // level, which is "guaranteed" to be complete at this point, by our
1429            // fueling discipline.
1430            if self.merging[index].is_complete() {
1431                let complete = self.complete_at(index, log).expect("complete batch");
1432                self.insert_at(complete, index + 1);
1433            }
1434        }
1435    }
1436
1437    /// Inserts a batch at a specific location.
1438    ///
1439    /// This is a non-public internal method that can panic if we try to insert
1440    /// into a layer which already contains two batches (and is still in the
1441    /// process of merging).
1442    fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1443        // Ensure the spine is large enough.
1444        while self.merging.len() <= index {
1445            self.merging.push(MergeState::default());
1446        }
1447
1448        // Insert the batch at the location.
1449        let merging = &mut self.merging[index];
1450        merging.push_batch(batch);
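        // Once this level reaches capacity (`BATCHES_PER_LEVEL` batches), begin
        // a fueling merge of everything at the level, using the spine's current
        // `since` as the compaction frontier.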
1451        if merging.batches.is_full() {
1452            let compaction_frontier = Some(self.since.borrow());
1453            merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1454        }
1455    }
1456
1457    /// Completes and extracts whatever is at layer `index`, leaving this layer vacant.
1458    fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1459        self.merging[index].complete(log)
1460    }
1461
1462    /// Attempts to draw down large layers to size-appropriate layers.
1463    fn tidy_layers(&mut self) {
1464        // If the largest layer is complete (not merging), we can attempt to
1465        // draw it down to the next layer. This is permitted if we can maintain
1466        // our invariant that below each merge there are at most half the
1467        // records that would be required to invade the merge.
1468        if !self.merging.is_empty() {
1469            let mut length = self.merging.len();
1470            if self.merging[length - 1].is_single() {
1471                // To move a batch down, we require that it contain few enough
1472                // records that the lower level is appropriate, and that moving
1473                // the batch would not create a merge violating our invariant.
1474                let appropriate_level = usize::cast_from(
1475                    self.merging[length - 1]
1476                        .len()
1477                        .next_power_of_two()
1478                        .trailing_zeros(),
1479                );
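                // For example, a single batch holding 1000 updates has
                // `next_power_of_two() == 1024` and `trailing_zeros() == 10`,
                // so level 10 is the appropriate level for it.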
1480
1481                // Continue only as far as is appropriate
1482                while appropriate_level < length - 1 {
1483                    let current = &mut self.merging[length - 2];
1484                    if current.is_vacant() {
1485                        // Vacant batches can be absorbed.
1486                        self.merging.remove(length - 2);
1487                        length = self.merging.len();
1488                    } else {
1489                        if !current.is_full() {
1490                            // Single batches may initiate a merge, if sizes are
1491                            // within bounds, but terminate the loop either way.
1492
1493                            // Determine the number of records that might lead
1494                            // to a merge. Importantly, this is not the number
1495                            // of actual records, but the sum of upper bounds
1496                            // based on indices.
1497                            let mut smaller = 0;
1498                            for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
1499                                smaller += batch.batches.len() << index;
1500                            }
1501
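                            // Initiate the merge only if this (index-weighted)
                            // bound is at most 1/8th of the capacity implied by
                            // the spine's height, i.e. `(1 << length) / 8`.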
1502                            if smaller <= (1 << length) / 8 {
1503                                // Remove the batch under consideration (shifting the deeper batches up a level),
1504                                // then merge in the single batch at the current level.
1505                                let state = self.merging.remove(length - 2);
1506                                assert_eq!(state.batches.len(), 1);
1507                                for batch in state.batches {
1508                                    self.insert_at(batch, length - 2);
1509                                }
1510                            }
1511                        }
1512                        break;
1513                    }
1514                }
1515            }
1516        }
1517    }
1518
1519    /// Checks invariants:
1520    /// - The lowers and uppers of all batches "line up".
1521    /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
1522    /// - The upper of the "maximum" batch is `== self.upper`.
1523    /// - The since of each batch is `less_equal self.since`.
1524    /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
1525    /// - TODO: Verify fuel and level invariants.
1526    fn validate(&self) -> Result<(), String> {
1527        let mut id = SpineId(0, 0);
1528        let mut frontier = Antichain::from_elem(T::minimum());
1529        for x in self.merging.iter().rev() {
1530            if x.is_full() != x.merge.is_some() {
1531                return Err(format!(
1532                    "all (and only) full batches should have fueling merges (full={}, merge={:?})",
1533                    x.is_full(),
1534                    x.merge,
1535                ));
1536            }
1537
1538            if let Some(m) = &x.merge {
1539                if !x.is_full() {
1540                    return Err(format!(
1541                        "merge should only exist for full batches (len={:?}, merge={:?})",
1542                        x.batches.len(),
1543                        m.id,
1544                    ));
1545                }
1546                if x.id() != Some(m.id) {
1547                    return Err(format!(
1548                        "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
1549                        x.id(),
1550                        m.id,
1551                    ));
1552                }
1553            }
1554
1555            // TODO: Anything we can validate about x.merge? It'd
1556            // be nice to assert that it's bigger than the len of the
1557            // two batches, but apply_merge_res might swap those lengths
1558            // out from under us.
1559            for batch in &x.batches {
1560                if batch.id().0 != id.1 {
1561                    return Err(format!(
1562                        "batch id {:?} does not match the previous id {:?}: {:?}",
1563                        batch.id(),
1564                        id,
1565                        self
1566                    ));
1567                }
1568                id = batch.id();
1569                if batch.desc().lower() != &frontier {
1570                    return Err(format!(
1571                        "batch lower {:?} does not match the previous upper {:?}: {:?}",
1572                        batch.desc().lower(),
1573                        frontier,
1574                        self
1575                    ));
1576                }
1577                frontier.clone_from(batch.desc().upper());
1578                if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
1579                    return Err(format!(
1580                        "since of batch {:?} past the spine since {:?}: {:?}",
1581                        batch.desc().since(),
1582                        self.since,
1583                        self
1584                    ));
1585                }
1586            }
1587        }
1588        if self.next_id != id.1 {
1589            return Err(format!(
1590                "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
1591                self.next_id, id, self
1592            ));
1593        }
1594        if self.upper != frontier {
1595            return Err(format!(
1596                "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
1597                self.upper, frontier, self
1598            ));
1599        }
1600        Ok(())
1601    }
1602}
1603
1604/// Describes the state of a layer.
1605///
1606/// A layer can be empty, contain a single batch, or contain a pair of batches
1607/// that are in the process of merging into a batch for the next layer.
1608#[derive(Debug, Clone)]
1609struct MergeState<T> {
1610    batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1611    merge: Option<IdFuelingMerge<T>>,
1612}
1613
1614impl<T> Default for MergeState<T> {
1615    fn default() -> Self {
1616        Self {
1617            batches: ArrayVec::new(),
1618            merge: None,
1619        }
1620    }
1621}
1622
1623impl<T: Timestamp + Lattice> MergeState<T> {
1624    /// An id that covers all the batches in the given merge state, assuming there are any.
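    ///
    /// For example, a level holding batches with ids `SpineId(0, 2)` and
    /// `SpineId(2, 5)` is covered by `SpineId(0, 5)`.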
1625    fn id(&self) -> Option<SpineId> {
1626        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
1627            Some(SpineId(first.id().0, last.id().1))
1628        } else {
1629            None
1630        }
1631    }
1632
1633    /// A new single-batch merge state.
1634    fn single(batch: SpineBatch<T>) -> Self {
1635        let mut state = Self::default();
1636        state.push_batch(batch);
1637        state
1638    }
1639
1640    /// Push a new batch at this level, checking invariants.
1641    fn push_batch(&mut self, batch: SpineBatch<T>) {
1642        if let Some(last) = self.batches.last() {
1643            assert_eq!(last.id().1, batch.id().0);
1644            assert_eq!(last.upper(), batch.lower());
1645        }
1646        assert!(
1647            self.merge.is_none(),
1648            "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
1649            batch.id,
1650            self.batches.len(),
1651        );
1652        self.batches
1653            .try_push(batch)
1654            .expect("Attempted to insert batch into full layer!");
1655    }
1656
1657    /// The number of actual updates contained in the level.
1658    fn len(&self) -> usize {
1659        self.batches.iter().map(SpineBatch::len).sum()
1660    }
1661
1662    /// True if this merge state contains no updates.
1663    fn is_empty(&self) -> bool {
1664        self.batches.iter().all(SpineBatch::is_empty)
1665    }
1666
1667    /// True if this level contains no batches.
1668    fn is_vacant(&self) -> bool {
1669        self.batches.is_empty()
1670    }
1671
1672    /// True only for a single-batch state.
1673    fn is_single(&self) -> bool {
1674        self.batches.len() == 1
1675    }
1676
1677    /// True if this merge cannot hold any more batches.
1678    /// (i.e. for a binary merge tree, true if this layer holds two batches.)
1679    fn is_full(&self) -> bool {
1680        self.batches.is_full()
1681    }
1682
1683    /// Immediately complete any merge.
1684    ///
1685    /// The result is either a batch, if there is a non-trivial batch to return,
1686    /// or `None` if there is no meaningful batch to return.
1687    ///
1688    /// If at most one batch is present, no merge is needed and it is returned directly.
1689    fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
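        // Take ownership of this level's batches and merge state, leaving the
        // level vacant.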
1690        let mut this = mem::take(self);
1691        if this.batches.len() <= 1 {
1692            this.batches.pop()
1693        } else {
1694            // Merge the remaining batches, regardless of whether we have a fully fueled merge.
1695            let id_merge = this
1696                .merge
1697                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
1698            id_merge.merge.done(this.batches, log)
1699        }
1700    }
1701
1702    /// True iff the layer is a complete merge, ready for extraction.
1703    fn is_complete(&self) -> bool {
1704        match &self.merge {
1705            Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
1706            None => false,
1707        }
1708    }
1709
1710    /// Performs a bounded amount of work towards a merge.
1711    fn work(&mut self, fuel: &mut isize) {
1712        // We only perform work for merges in progress.
1713        if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
1714            merge.work(&self.batches[..], fuel)
1715        }
1716    }
1717}
1718
1719#[cfg(test)]
1720pub mod datadriven {
1721    use crate::internal::datadriven::DirectiveArgs;
1722
1723    use super::*;
1724
1725    /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
1726    #[derive(Debug, Default)]
1727    pub struct TraceState {
1728        pub trace: Trace<u64>,
1729        pub merge_reqs: Vec<FueledMergeReq<u64>>,
1730    }
1731
1732    pub fn since_upper(
1733        datadriven: &TraceState,
1734        _args: DirectiveArgs,
1735    ) -> Result<String, anyhow::Error> {
1736        Ok(format!(
1737            "{:?}{:?}\n",
1738            datadriven.trace.since().elements(),
1739            datadriven.trace.upper().elements()
1740        ))
1741    }
1742
1743    pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
1744        let mut s = String::new();
1745        for b in datadriven.trace.spine.spine_batches() {
1746            s.push_str(b.describe(true).as_str());
1747            s.push('\n');
1748        }
1749        Ok(s)
1750    }
1751
1752    pub fn insert(
1753        datadriven: &mut TraceState,
1754        args: DirectiveArgs,
1755    ) -> Result<String, anyhow::Error> {
1756        for x in args
1757            .input
1758            .trim()
1759            .split('\n')
1760            .map(DirectiveArgs::parse_hollow_batch)
1761        {
1762            datadriven
1763                .merge_reqs
1764                .append(&mut datadriven.trace.push_batch(x));
1765        }
1766        Ok("ok\n".to_owned())
1767    }
1768
1769    pub fn downgrade_since(
1770        datadriven: &mut TraceState,
1771        args: DirectiveArgs,
1772    ) -> Result<String, anyhow::Error> {
1773        let since = args.expect("since");
1774        datadriven
1775            .trace
1776            .downgrade_since(&Antichain::from_elem(since));
1777        Ok("ok\n".to_owned())
1778    }
1779
1780    pub fn take_merge_req(
1781        datadriven: &mut TraceState,
1782        _args: DirectiveArgs,
1783    ) -> Result<String, anyhow::Error> {
1784        let mut s = String::new();
1785        for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
1786            write!(
1787                s,
1788                "{:?}{:?}{:?} {}\n",
1789                merge_req.desc.lower().elements(),
1790                merge_req.desc.upper().elements(),
1791                merge_req.desc.since().elements(),
1792                merge_req
1793                    .inputs
1794                    .iter()
1795                    .flat_map(|x| x.batch.parts.iter())
1796                    .map(|x| x.printable_name())
1797                    .collect::<Vec<_>>()
1798                    .join(" ")
1799            );
1800        }
1801        Ok(s)
1802    }
1803
1804    pub fn apply_merge_res(
1805        datadriven: &mut TraceState,
1806        args: DirectiveArgs,
1807    ) -> Result<String, anyhow::Error> {
1808        let res = FueledMergeRes {
1809            output: DirectiveArgs::parse_hollow_batch(args.input),
1810        };
1811        match datadriven.trace.apply_merge_res(&res) {
1812            ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
1813            ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
1814            ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
1815            ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
1816            ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
1817        }
1818    }
1819}
1820
1821#[cfg(test)]
1822pub(crate) mod tests {
1823    use std::ops::Range;
1824
1825    use proptest::prelude::*;
1826    use semver::Version;
1827
1828    use crate::internal::state::tests::any_hollow_batch;
1829
1830    use super::*;
1831
1832    pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
1833        num_batches: Range<usize>,
1834    ) -> impl Strategy<Value = Trace<T>> {
1835        Strategy::prop_map(
1836            (
1837                any::<Option<T>>(),
1838                proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
1839                any::<bool>(),
1840                any::<u64>(),
1841            ),
1842            |(since, mut batches, roundtrip_structure, timeout_ms)| {
1843                let mut trace = Trace::<T>::default();
1844                trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
1845
1846                // Fix up the arbitrary HollowBatches so the lowers and uppers
1847                // align.
1848                batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
1849                let mut lower = Antichain::from_elem(T::minimum());
1850                for mut batch in batches {
1851                    // Overall trace since has to be past each batch's since.
1852                    if PartialOrder::less_than(trace.since(), batch.desc.since()) {
1853                        trace.downgrade_since(batch.desc.since());
1854                    }
1855                    batch.desc = Description::new(
1856                        lower.clone(),
1857                        batch.desc.upper().clone(),
1858                        batch.desc.since().clone(),
1859                    );
1860                    lower.clone_from(batch.desc.upper());
1861                    let _merge_req = trace.push_batch(batch);
1862                }
1863                let reqs: Vec<_> = trace
1864                    .fueled_merge_reqs_before_ms(timeout_ms, None)
1865                    .collect();
1866                for req in reqs {
1867                    trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
1868                }
1869                trace.roundtrip_structure = roundtrip_structure;
1870                trace
1871            },
1872        )
1873    }
1874
1875    #[mz_ore::test]
1876    #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
1877    fn test_roundtrips() {
1878        fn check(trace: Trace<i64>) {
1879            trace.validate().unwrap();
1880            let flat = trace.flatten();
1881            let unflat = Trace::unflatten(flat).unwrap();
1882            assert_eq!(trace, unflat);
1883        }
1884
1885        proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
1886    }
1887
1888    #[mz_ore::test]
1889    fn fueled_merge_reqs() {
1890        let mut trace: Trace<u64> = Trace::default();
1891        let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
1892            0,
1893            10,
1894            &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
1895            1000,
1896        ));
1897
1898        assert!(fueled_reqs.is_empty());
1899        assert_eq!(
1900            trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
1901            0,
1902            "no merge reqs when not filtering by version"
1903        );
1904        assert_eq!(
1905            trace
1906                .fueled_merge_reqs_before_ms(
1907                    u64::MAX,
1908                    Some(WriterKey::for_version(&Version::new(0, 50, 0)))
1909                )
1910                .count(),
1911            0,
1912            "zero batches are older than a past version"
1913        );
1914        assert_eq!(
1915            trace
1916                .fueled_merge_reqs_before_ms(
1917                    u64::MAX,
1918                    Some(WriterKey::for_version(&Version::new(99, 99, 0)))
1919                )
1920                .count(),
1921            1,
1922            "one batch is older than a future version"
1923        );
1924    }
1925
1926    #[mz_ore::test]
1927    fn remove_redundant_merge_reqs() {
1928        fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
1929            FueledMergeReq {
1930                id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
1931                desc: Description::new(
1932                    Antichain::from_elem(lower),
1933                    Antichain::from_elem(upper),
1934                    Antichain::new(),
1935                ),
1936                inputs: vec![],
1937            }
1938        }
1939
1940        // Empty
1941        assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
1942
1943        // Single
1944        assert_eq!(
1945            Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
1946            vec![req(0, 1)]
1947        );
1948
1949        // Duplicate
1950        assert_eq!(
1951            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
1952            vec![req(0, 1)]
1953        );
1954
1955        // Nothing covered
1956        assert_eq!(
1957            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
1958            vec![req(1, 2), req(0, 1)]
1959        );
1960
1961        // Covered
1962        assert_eq!(
1963            Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
1964            vec![req(0, 3)]
1965        );
1966
1967        // Covered, lower equal
1968        assert_eq!(
1969            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
1970            vec![req(0, 3)]
1971        );
1972
1973        // Covered, upper equal
1974        assert_eq!(
1975            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
1976            vec![req(0, 3)]
1977        );
1978
1979        // Covered, unexpected order (doesn't happen in practice)
1980        assert_eq!(
1981            Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
1982            vec![req(0, 3)]
1983        );
1984
1985        // Partially overlapping
1986        assert_eq!(
1987            Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
1988            vec![req(1, 3), req(0, 2)]
1989        );
1990
1991        // Partially overlapping, the other order
1992        assert_eq!(
1993            Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
1994            vec![req(0, 2), req(1, 3)]
1995        );
1996
1997        // Different sinces (doesn't happen in practice)
1998        let req015 = FueledMergeReq {
1999            id: SpineId(0, 1),
2000            desc: Description::new(
2001                Antichain::from_elem(0),
2002                Antichain::from_elem(1),
2003                Antichain::from_elem(5),
2004            ),
2005            inputs: vec![],
2006        };
2007        assert_eq!(
2008            Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2009            vec![req015, req(0, 1)]
2010        );
2011    }
2012}