mz_persist_client/internal/trace.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An append-only collection of compactable update batches. The Spine below is
11//! a fork of Differential Dataflow's [Spine] with minimal modifications. The
12//! original Spine code is designed for incremental (via "fuel"ing) synchronous
13//! merge of in-memory batches. Persist doesn't want compaction to block
14//! incoming writes and, in fact, may in the future elect to push the work of
15//! compaction onto another machine entirely via RPC. As a result, we abuse the
16//! Spine code as follows:
17//!
18//! [Spine]: differential_dataflow::trace::implementations::spine_fueled::Spine
19//!
20//! - The normal Spine works in terms of [Batch] impls. A `Batch` is added to
21//! the Spine. As progress is made, the Spine will merge two batches together
22//! by: constructing a [Batch::Merger], giving it bits of fuel to
23//! incrementally perform the merge (which spreads out the work, keeping
//! latencies even), and then, once it's done fueling, extracting the new single
25//! output `Batch` and discarding the inputs.
26//! - Persist instead represents a batch of blob data with a [HollowBatch]
27//! pointer which contains the normal `Batch` metadata plus the keys necessary
28//! to retrieve the updates.
29//! - [SpineBatch] wraps `HollowBatch` and has a [FuelingMerge] companion
30//! (analogous to `Batch::Merger`) that allows us to represent a merge as it
31//! is fueling. Normally, this would represent real incremental compaction
32//! progress, but in persist, it's simply a bookkeeping mechanism. Once fully
33//! fueled, the `FuelingMerge` is turned into a fueled [SpineBatch],
34//! which to the Spine is indistinguishable from a merged batch. At this
35//! point, it is eligible for asynchronous compaction and a `FueledMergeReq`
36//! is generated.
37//! - At any later point, this request may be answered via
38//! [Trace::apply_merge_res]. This internally replaces the
39//! `SpineBatch`, which has no effect on the structure of `Spine`
40//! but replaces the metadata in persist's state to point at the
41//! new batch.
//! - `SpineBatch` is explicitly allowed to accumulate a list of `HollowBatch`s.
43//! This decouples compaction from Spine progress and also allows us to reduce
44//! write amplification by merging `N` batches at once where `N` can be
45//! greater than 2.
46//!
47//! [Batch]: differential_dataflow::trace::Batch
48//! [Batch::Merger]: differential_dataflow::trace::Batch::Merger
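//!
//! As an illustrative (not compiled) sketch of the intended flow, where
//! `compact` stands in for whatever asynchronous compaction machinery answers
//! the request:
//!
//! ```ignore
//! // Adding a batch of blob data may, via Spine bookkeeping, emit merge requests.
//! let reqs: Vec<FueledMergeReq<T>> = trace.push_batch(hollow_batch);
//! for req in reqs {
//!     // Some external process (possibly on another machine) performs the
//!     // actual compaction and produces a single output HollowBatch.
//!     let res = FueledMergeRes { output: compact(req) };
//!     // Swap the compacted batch into persist's state; the Spine structure
//!     // itself is unchanged.
//!     let _applied = trace.apply_merge_res(&res);
//! }
//! ```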
49
50use arrayvec::ArrayVec;
51use std::cmp::Ordering;
52use std::collections::BTreeMap;
53use std::fmt::Debug;
54use std::mem;
55use std::sync::Arc;
56
57use crate::internal::paths::WriterKey;
58use differential_dataflow::lattice::Lattice;
59use differential_dataflow::trace::Description;
60use mz_ore::cast::CastFrom;
61#[allow(unused_imports)] // False positive.
62use mz_ore::fmt::FormatBuffer;
63use serde::{Serialize, Serializer};
64use timely::PartialOrder;
65use timely::progress::frontier::AntichainRef;
66use timely::progress::{Antichain, Timestamp};
67
68use crate::internal::state::HollowBatch;
69
70#[derive(Debug, Clone, PartialEq)]
71pub struct FueledMergeReq<T> {
72 pub id: SpineId,
73 pub desc: Description<T>,
74 pub inputs: Vec<IdHollowBatch<T>>,
75}
76
77#[derive(Debug)]
78pub struct FueledMergeRes<T> {
79 pub output: HollowBatch<T>,
80}
81
82/// An append-only collection of compactable update batches.
83///
84/// In an effort to keep our fork of Spine as close as possible to the original,
85/// we push as many changes as possible into this wrapper.
86#[derive(Debug, Clone)]
87pub struct Trace<T> {
88 spine: Spine<T>,
89 pub(crate) roundtrip_structure: bool,
90}
91
92#[cfg(any(test, debug_assertions))]
93impl<T: PartialEq> PartialEq for Trace<T> {
94 fn eq(&self, other: &Self) -> bool {
95 // Deconstruct self and other so we get a compile failure if new fields
96 // are added.
97 let Trace {
98 spine: _,
99 roundtrip_structure: _,
100 } = self;
101 let Trace {
102 spine: _,
103 roundtrip_structure: _,
104 } = other;
105
106 // Intentionally use HollowBatches for this comparison so we ignore
107 // differences in spine layers.
108 self.batches().eq(other.batches())
109 }
110}
111
112impl<T: Timestamp + Lattice> Default for Trace<T> {
113 fn default() -> Self {
114 Self {
115 spine: Spine::new(),
116 roundtrip_structure: true,
117 }
118 }
119}
120
121#[derive(Clone, Debug, Serialize)]
122pub struct ThinSpineBatch<T> {
123 pub(crate) level: usize,
124 pub(crate) desc: Description<T>,
125 pub(crate) parts: Vec<SpineId>,
126 /// NB: this exists to validate legacy batch bounds during the migration;
127 /// it can be deleted once the roundtrip_structure flag is permanently rolled out.
128 pub(crate) descs: Vec<Description<T>>,
129}
130
131impl<T: PartialEq> PartialEq for ThinSpineBatch<T> {
132 fn eq(&self, other: &Self) -> bool {
133 // Ignore the temporary descs vector when comparing for equality.
134 (self.level, &self.desc, &self.parts).eq(&(other.level, &other.desc, &other.parts))
135 }
136}
137
138#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
139pub struct ThinMerge<T> {
140 pub(crate) since: Antichain<T>,
141 pub(crate) remaining_work: usize,
142 pub(crate) active_compaction: Option<ActiveCompaction>,
143}
144
145impl<T: Clone> ThinMerge<T> {
146 fn fueling(merge: &FuelingMerge<T>) -> Self {
147 ThinMerge {
148 since: merge.since.clone(),
149 remaining_work: merge.remaining_work,
150 active_compaction: None,
151 }
152 }
153
154 fn fueled(batch: &SpineBatch<T>) -> Self {
155 ThinMerge {
156 since: batch.desc.since().clone(),
157 remaining_work: 0,
158 active_compaction: batch.active_compaction.clone(),
159 }
160 }
161}
162
163/// This is a "flattened" representation of a Trace. Goals:
164/// - small updates to the trace should result in small differences in the `FlatTrace`;
165/// - two `FlatTrace`s should be efficient to diff;
166/// - converting to and from a `Trace` should be relatively straightforward.
167///
168/// These goals are all somewhat in tension, and the space of possible representations is pretty
169/// large. See individual fields for comments on some of the tradeoffs.
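///
/// As an illustrative (not compiled) sketch, a trace is expected to round-trip
/// through its flattened form:
///
/// ```ignore
/// let flat = trace.flatten();
/// let roundtripped = Trace::unflatten(flat).expect("valid flattened trace");
/// ```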
170#[derive(Clone, Debug)]
171pub struct FlatTrace<T> {
172 pub(crate) since: Antichain<T>,
173 /// Hollow batches without an associated ID. If this flattened trace contains spine batches,
174 /// we can figure out which legacy batch belongs in which spine batch by comparing the `desc`s.
175 /// Previously, we serialized a trace as just this list of batches. Keeping this data around
176 /// helps ensure backwards compatibility. In the near future, we may still keep some batches
177 /// here to help minimize the size of diffs -- rewriting all the hollow batches in a shard
178 /// can be prohibitively expensive. Eventually, we'd like to remove this in favour of the
179 /// collection below.
180 pub(crate) legacy_batches: BTreeMap<Arc<HollowBatch<T>>, ()>,
181 /// Hollow batches _with_ an associated ID. Spine batches can reference these hollow batches
182 /// by id directly.
183 pub(crate) hollow_batches: BTreeMap<SpineId, Arc<HollowBatch<T>>>,
184 /// Spine batches stored by ID. We reference hollow batches by ID, instead of inlining them,
185 /// to make differential updates smaller when two batches merge together. We also store the
186 /// level on the batch, instead of mapping from level to a list of batches... the level of a
187 /// spine batch doesn't change over time, but the list of batches at a particular level does.
188 pub(crate) spine_batches: BTreeMap<SpineId, ThinSpineBatch<T>>,
189 /// In-progress merges. We store this by spine id instead of level to prepare for some possible
190 /// generalizations to spine (merging N of M batches at a level). This is also a natural place
191 /// to store incremental merge progress in the future.
192 pub(crate) merges: BTreeMap<SpineId, ThinMerge<T>>,
193}
194
195impl<T: Timestamp + Lattice> Trace<T> {
196 pub(crate) fn flatten(&self) -> FlatTrace<T> {
197 let since = self.spine.since.clone();
198 let mut legacy_batches = BTreeMap::new();
199 let mut hollow_batches = BTreeMap::new();
200 let mut spine_batches = BTreeMap::new();
201 let mut merges = BTreeMap::new();
202
203 let mut push_spine_batch = |level: usize, batch: &SpineBatch<T>| {
204 let id = batch.id();
205 let desc = batch.desc.clone();
206 let mut parts = Vec::with_capacity(batch.parts.len());
207 let mut descs = Vec::with_capacity(batch.parts.len());
208 for IdHollowBatch { id, batch } in &batch.parts {
209 parts.push(*id);
210 descs.push(batch.desc.clone());
211 // Ideally, we'd like to put all batches in the hollow_batches collection, since
212 // tracking the spine id reduces ambiguity and makes diffing cheaper. However,
213 // we currently keep most batches in the legacy collection for backwards
214 // compatibility.
215 // As an exception, we add batches with empty time ranges to hollow_batches:
216 // they're otherwise not guaranteed to be unique, and since we only started writing
217 // them down recently there's no backwards compatibility risk.
218 if batch.desc.lower() == batch.desc.upper() {
219 hollow_batches.insert(*id, Arc::clone(batch));
220 } else {
221 legacy_batches.insert(Arc::clone(batch), ());
222 }
223 }
224
225 let spine_batch = ThinSpineBatch {
226 level,
227 desc,
228 parts,
229 descs,
230 };
231 spine_batches.insert(id, spine_batch);
232 };
233
234 for (level, state) in self.spine.merging.iter().enumerate() {
235 for batch in &state.batches {
236 push_spine_batch(level, batch);
237 if let Some(c) = &batch.active_compaction {
238 let previous = merges.insert(batch.id, ThinMerge::fueled(batch));
239 assert!(
240 previous.is_none(),
241 "recording a compaction for a batch that already exists! (level={level}, id={:?}, compaction={c:?})",
242 batch.id,
243 )
244 }
245 }
246 if let Some(IdFuelingMerge { id, merge }) = state.merge.as_ref() {
247 let previous = merges.insert(*id, ThinMerge::fueling(merge));
248 assert!(
249 previous.is_none(),
250 "fueling a merge for a batch that already exists! (level={level}, id={id:?}, merge={merge:?})"
251 )
252 }
253 }
254
255 if !self.roundtrip_structure {
256 assert!(hollow_batches.is_empty());
257 spine_batches.clear();
258 merges.clear();
259 }
260
261 FlatTrace {
262 since,
263 legacy_batches,
264 hollow_batches,
265 spine_batches,
266 merges,
267 }
268 }
269 pub(crate) fn unflatten(value: FlatTrace<T>) -> Result<Self, String> {
270 let FlatTrace {
271 since,
272 legacy_batches,
273 mut hollow_batches,
274 spine_batches,
275 mut merges,
276 } = value;
277
278 // If the flattened representation has spine batches (or is empty)
279 // we know to preserve the structure for this trace.
280 let roundtrip_structure = !spine_batches.is_empty() || legacy_batches.is_empty();
281
282 // We need to look up legacy batches somehow, but we don't have a spine id for them.
283 // Instead, we rely on the fact that the spine must store them in antichain order.
284 // Our timestamp type may not be totally ordered, so we need to implement our own comparator
285 // here. Persist's invariants ensure that all the frontiers we're comparing are comparable,
286 // though.
287 let compare_chains = |left: &Antichain<T>, right: &Antichain<T>| {
288 if PartialOrder::less_than(left, right) {
289 Ordering::Less
290 } else if PartialOrder::less_than(right, left) {
291 Ordering::Greater
292 } else {
293 Ordering::Equal
294 }
295 };
296 let mut legacy_batches: Vec<_> = legacy_batches.into_iter().map(|(k, _)| k).collect();
297 legacy_batches.sort_by(|a, b| compare_chains(a.desc.lower(), b.desc.lower()).reverse());
298
299 let mut pop_batch =
300 |id: SpineId, expected_desc: Option<&Description<T>>| -> Result<_, String> {
301 if let Some(batch) = hollow_batches.remove(&id) {
302 if let Some(desc) = expected_desc {
303 assert_eq!(*desc, batch.desc);
304 }
305 return Ok(IdHollowBatch { id, batch });
306 }
307 let mut batch = legacy_batches
308 .pop()
309 .ok_or_else(|| format!("missing referenced hollow batch {id:?}"))?;
310
311 let Some(expected_desc) = expected_desc else {
312 return Ok(IdHollowBatch { id, batch });
313 };
314
315 if expected_desc.lower() != batch.desc.lower() {
316 return Err(format!(
317 "hollow batch lower {:?} did not match expected lower {:?}",
318 batch.desc.lower().elements(),
319 expected_desc.lower().elements()
320 ));
321 }
322
323 // Empty legacy batches are not deterministic: different nodes may split them up
 // in different ways. For now, we rearrange them to match the spine data.
325 if batch.parts.is_empty() && batch.run_splits.is_empty() && batch.len == 0 {
326 let mut new_upper = batch.desc.upper().clone();
327
328 // While our current batch is too small, and there's another empty batch
329 // in the list, roll it in.
330 while PartialOrder::less_than(&new_upper, expected_desc.upper()) {
331 let Some(next_batch) = legacy_batches.pop() else {
332 break;
333 };
334 if next_batch.is_empty() {
335 new_upper.clone_from(next_batch.desc.upper());
336 } else {
337 legacy_batches.push(next_batch);
338 break;
339 }
340 }
341
342 // If our current batch is too large, split it by the expected upper
343 // and preserve the remainder.
344 if PartialOrder::less_than(expected_desc.upper(), &new_upper) {
345 legacy_batches.push(Arc::new(HollowBatch::empty(Description::new(
346 expected_desc.upper().clone(),
347 new_upper.clone(),
348 batch.desc.since().clone(),
349 ))));
350 new_upper.clone_from(expected_desc.upper());
351 }
352 batch = Arc::new(HollowBatch::empty(Description::new(
353 batch.desc.lower().clone(),
354 new_upper,
355 expected_desc.since().clone(),
356 )))
357 }
358
359 if expected_desc.upper() != batch.desc.upper() {
360 return Err(format!(
361 "hollow batch upper {:?} did not match expected upper {:?}",
362 batch.desc.upper().elements(),
363 expected_desc.upper().elements()
364 ));
365 }
366
367 Ok(IdHollowBatch { id, batch })
368 };
369
370 let (upper, next_id) = if let Some((id, batch)) = spine_batches.last_key_value() {
371 (batch.desc.upper().clone(), id.1)
372 } else {
373 (Antichain::from_elem(T::minimum()), 0)
374 };
375 let levels = spine_batches
376 .first_key_value()
377 .map(|(_, batch)| batch.level + 1)
378 .unwrap_or(0);
379 let mut merging = vec![MergeState::default(); levels];
380 for (id, batch) in spine_batches {
381 let level = batch.level;
382
383 let parts = batch
384 .parts
385 .into_iter()
386 .zip(batch.descs.iter().map(Some).chain(std::iter::repeat(None)))
387 .map(|(id, desc)| pop_batch(id, desc))
388 .collect::<Result<Vec<_>, _>>()?;
389 let len = parts.iter().map(|p| (*p).batch.len).sum();
390 let active_compaction = merges.remove(&id).and_then(|m| m.active_compaction);
391 let batch = SpineBatch {
392 id,
393 desc: batch.desc,
394 parts,
395 active_compaction,
396 len,
397 };
398
399 let state = &mut merging[level];
400
401 state.push_batch(batch);
402 if let Some(id) = state.id() {
403 if let Some(merge) = merges.remove(&id) {
404 state.merge = Some(IdFuelingMerge {
405 id,
406 merge: FuelingMerge {
407 since: merge.since,
408 remaining_work: merge.remaining_work,
409 },
410 })
411 }
412 }
413 }
414
415 let mut trace = Trace {
416 spine: Spine {
417 effort: 1,
418 next_id,
419 since,
420 upper,
421 merging,
422 },
423 roundtrip_structure,
424 };
425
426 fn check_empty(name: &str, len: usize) -> Result<(), String> {
427 if len != 0 {
428 Err(format!("{len} {name} left after reconstructing spine"))
429 } else {
430 Ok(())
431 }
432 }
433
434 if roundtrip_structure {
435 check_empty("legacy batches", legacy_batches.len())?;
436 } else {
437 // If the structure wasn't actually serialized, we may have legacy batches left over.
438 for batch in legacy_batches.into_iter().rev() {
439 trace.push_batch_no_merge_reqs(Arc::unwrap_or_clone(batch));
440 }
441 }
442 check_empty("hollow batches", hollow_batches.len())?;
443 check_empty("merges", merges.len())?;
444
445 debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
446
447 Ok(trace)
448 }
449}
450
451#[derive(Clone, Debug, Default)]
452pub(crate) struct SpineMetrics {
453 pub compact_batches: u64,
454 pub compacting_batches: u64,
455 pub noncompact_batches: u64,
456}
457
458impl<T> Trace<T> {
459 pub fn since(&self) -> &Antichain<T> {
460 &self.spine.since
461 }
462
463 pub fn upper(&self) -> &Antichain<T> {
464 &self.spine.upper
465 }
466
467 pub fn map_batches<'a, F: FnMut(&'a HollowBatch<T>)>(&'a self, mut f: F) {
468 for batch in self.batches() {
469 f(batch);
470 }
471 }
472
473 pub fn batches(&self) -> impl Iterator<Item = &HollowBatch<T>> {
474 self.spine
475 .spine_batches()
476 .flat_map(|b| b.parts.as_slice())
477 .map(|b| &*b.batch)
478 }
479
480 pub fn num_spine_batches(&self) -> usize {
481 self.spine.spine_batches().count()
482 }
483
484 #[cfg(test)]
485 pub fn num_hollow_batches(&self) -> usize {
486 self.batches().count()
487 }
488
489 #[cfg(test)]
490 pub fn num_updates(&self) -> usize {
491 self.batches().map(|b| b.len).sum()
492 }
493}
494
495impl<T: Timestamp + Lattice> Trace<T> {
496 pub fn downgrade_since(&mut self, since: &Antichain<T>) {
497 self.spine.since.clone_from(since);
498 }
499
500 #[must_use]
501 pub fn push_batch(&mut self, batch: HollowBatch<T>) -> Vec<FueledMergeReq<T>> {
502 let mut merge_reqs = Vec::new();
503 self.spine.insert(
504 batch,
505 &mut SpineLog::Enabled {
506 merge_reqs: &mut merge_reqs,
507 },
508 );
509 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
510 // Spine::roll_up (internally used by insert) clears all batches out of
511 // levels below a target by walking up from level 0 and merging each
512 // level into the next (providing the necessary fuel). In practice, this
513 // means we'll get a series of requests like `(a, b), (a, b, c), ...`.
514 // It's a waste to do all of these (we'll throw away the results), so we
515 // filter out any that are entirely covered by some other request.
516 Self::remove_redundant_merge_reqs(merge_reqs)
517 }
518
519 pub fn claim_compaction(&mut self, id: SpineId, compaction: ActiveCompaction) {
520 // TODO: we ought to be able to look up the id for a batch by binary searching the levels.
521 // In the meantime, search backwards, since most compactions are for recent batches.
522 for batch in self.spine.spine_batches_mut().rev() {
523 if batch.id == id {
524 batch.active_compaction = Some(compaction);
525 break;
526 }
527 }
528 }
529
530 /// The same as [Self::push_batch] but without the `FueledMergeReq`s, which
531 /// account for a surprising amount of cpu in prod. database-issues#5411
532 pub(crate) fn push_batch_no_merge_reqs(&mut self, batch: HollowBatch<T>) {
533 self.spine.insert(batch, &mut SpineLog::Disabled);
534 }
535
536 /// Apply some amount of effort to trace maintenance.
537 ///
538 /// The units of effort are updates, and the method should be thought of as
539 /// analogous to inserting as many empty updates, where the trace is
540 /// permitted to perform proportionate work.
541 ///
542 /// Returns true if this did work and false if it left the spine unchanged.
543 #[must_use]
544 pub fn exert(&mut self, fuel: usize) -> (Vec<FueledMergeReq<T>>, bool) {
545 let mut merge_reqs = Vec::new();
546 let did_work = self.spine.exert(
547 fuel,
548 &mut SpineLog::Enabled {
549 merge_reqs: &mut merge_reqs,
550 },
551 );
552 debug_assert_eq!(self.spine.validate(), Ok(()), "{:?}", self);
553 // See the comment in [Self::push_batch].
554 let merge_reqs = Self::remove_redundant_merge_reqs(merge_reqs);
555 (merge_reqs, did_work)
556 }
557
558 /// Validates invariants.
559 ///
560 /// See `Spine::validate` for details.
561 pub fn validate(&self) -> Result<(), String> {
562 self.spine.validate()
563 }
564
565 pub fn apply_merge_res(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
566 for batch in self.spine.spine_batches_mut().rev() {
567 let result = batch.maybe_replace(res);
568 if result.matched() {
569 return result;
570 }
571 }
572 ApplyMergeResult::NotAppliedNoMatch
573 }
574
575 /// Obtain all fueled merge reqs that either have no active compaction, or the previous
576 /// compaction was started at or before the threshold time, in order from oldest to newest.
577 pub(crate) fn fueled_merge_reqs_before_ms(
578 &self,
579 threshold_ms: u64,
580 threshold_writer: Option<WriterKey>,
581 ) -> impl Iterator<Item = FueledMergeReq<T>> + '_ {
582 self.spine
583 .spine_batches()
584 .filter(move |b| {
585 let noncompact = !b.is_compact();
586 let old_writer = threshold_writer.as_ref().map_or(false, |min_writer| {
587 b.parts.iter().any(|b| {
588 b.batch
589 .parts
590 .iter()
591 .any(|p| p.writer_key().map_or(false, |writer| writer < *min_writer))
592 })
593 });
594 noncompact || old_writer
595 })
596 .filter(move |b| {
597 // Either there's no active compaction, or the last active compaction
598 // is not after the timeout timestamp.
599 b.active_compaction
600 .as_ref()
601 .map_or(true, move |c| c.start_ms <= threshold_ms)
602 })
603 .map(|b| FueledMergeReq {
604 id: b.id,
605 desc: b.desc.clone(),
606 inputs: b.parts.clone(),
607 })
608 }
609
610 // This is only called with the results of one `insert` and so the length of
611 // `merge_reqs` is bounded by the number of levels in the spine (or possibly
612 // some small constant multiple?). The number of levels is logarithmic in the
613 // number of updates in the spine, so this number should stay very small. As
614 // a result, we simply use the naive O(n^2) algorithm here instead of doing
615 // anything fancy with e.g. interval trees.
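 // For example (with hypothetical ids), a request covering SpineId(0, 2) is
 // redundant with a later request covering SpineId(0, 3) at the same since:
 // the larger request covers the smaller one, so the smaller one is dropped.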
616 fn remove_redundant_merge_reqs(
617 mut merge_reqs: Vec<FueledMergeReq<T>>,
618 ) -> Vec<FueledMergeReq<T>> {
619 // Returns true if b0 covers b1, false otherwise.
620 fn covers<T: PartialOrder>(b0: &FueledMergeReq<T>, b1: &FueledMergeReq<T>) -> bool {
621 // TODO: can we relax or remove this since check?
622 b0.id.covers(b1.id) && b0.desc.since() == b1.desc.since()
623 }
624
625 let mut ret = Vec::<FueledMergeReq<T>>::with_capacity(merge_reqs.len());
626 // In practice, merge_reqs will come in sorted such that the "large"
627 // requests are later. Take advantage of this by processing back to
628 // front.
629 while let Some(merge_req) = merge_reqs.pop() {
630 let covered = ret.iter().any(|r| covers(r, &merge_req));
631 if !covered {
632 // Now check if anything we've already staged is covered by this
633 // new req. In practice, the merge_reqs come in sorted and so
634 // this `retain` is a no-op.
635 ret.retain(|r| !covers(&merge_req, r));
636 ret.push(merge_req);
637 }
638 }
639 ret
640 }
641
642 pub fn spine_metrics(&self) -> SpineMetrics {
643 let mut metrics = SpineMetrics::default();
644 for batch in self.spine.spine_batches() {
645 if batch.is_compact() {
646 metrics.compact_batches += 1;
647 } else if batch.is_merging() {
648 metrics.compacting_batches += 1;
649 } else {
650 metrics.noncompact_batches += 1;
651 }
652 }
653 metrics
654 }
655}
656
657/// A log of what transitively happened during a Spine operation: e.g.
658/// FueledMergeReqs were generated.
659enum SpineLog<'a, T> {
660 Enabled {
661 merge_reqs: &'a mut Vec<FueledMergeReq<T>>,
662 },
663 Disabled,
664}
665
666#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
667pub struct SpineId(pub usize, pub usize);
668
669impl Serialize for SpineId {
670 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
671 where
672 S: Serializer,
673 {
674 let SpineId(lo, hi) = self;
675 serializer.serialize_str(&format!("{lo}-{hi}"))
676 }
677}
678
679impl SpineId {
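 /// Whether this id's half-open range of positions contains `other`'s range.
 /// For example, `SpineId(0, 3)` covers `SpineId(1, 2)` (and itself), but not
 /// `SpineId(2, 4)`.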
680 fn covers(self, other: SpineId) -> bool {
681 self.0 <= other.0 && other.1 <= self.1
682 }
683}
684
685#[derive(Debug, Clone, PartialEq)]
686pub struct IdHollowBatch<T> {
687 pub id: SpineId,
688 pub batch: Arc<HollowBatch<T>>,
689}
690
691#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
692pub struct ActiveCompaction {
693 pub start_ms: u64,
694}
695
696#[derive(Debug, Clone, PartialEq)]
697struct SpineBatch<T> {
698 id: SpineId,
699 desc: Description<T>,
700 parts: Vec<IdHollowBatch<T>>,
701 active_compaction: Option<ActiveCompaction>,
 // A cached version of parts.iter().map(|x| x.batch.len).sum()
703 len: usize,
704}
705
706impl<T> SpineBatch<T> {
707 fn merged(batch: IdHollowBatch<T>) -> Self
708 where
709 T: Clone,
710 {
711 Self {
712 id: batch.id,
713 desc: batch.batch.desc.clone(),
714 len: batch.batch.len,
715 parts: vec![batch],
716 active_compaction: None,
717 }
718 }
719}
720
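/// The outcome of [Trace::apply_merge_res]: whether the compacted batch was
/// swapped in (either exactly replacing a spine batch or replacing a subset of
/// its parts), and if not, why it could not be applied.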
721#[derive(Debug, Copy, Clone)]
722pub enum ApplyMergeResult {
723 AppliedExact,
724 AppliedSubset,
725 NotAppliedNoMatch,
726 NotAppliedInvalidSince,
727 NotAppliedTooManyUpdates,
728}
729
730impl ApplyMergeResult {
731 pub fn applied(&self) -> bool {
732 match self {
733 ApplyMergeResult::AppliedExact | ApplyMergeResult::AppliedSubset => true,
734 _ => false,
735 }
736 }
737 pub fn matched(&self) -> bool {
738 match self {
739 ApplyMergeResult::AppliedExact
740 | ApplyMergeResult::AppliedSubset
741 | ApplyMergeResult::NotAppliedTooManyUpdates => true,
742 _ => false,
743 }
744 }
745}
746
747impl<T: Timestamp + Lattice> SpineBatch<T> {
748 pub fn lower(&self) -> &Antichain<T> {
749 self.desc().lower()
750 }
751
752 pub fn upper(&self) -> &Antichain<T> {
753 self.desc().upper()
754 }
755
756 fn id(&self) -> SpineId {
757 debug_assert_eq!(self.parts.first().map(|x| x.id.0), Some(self.id.0));
758 debug_assert_eq!(self.parts.last().map(|x| x.id.1), Some(self.id.1));
759 self.id
760 }
761
762 pub fn is_compact(&self) -> bool {
763 // This definition is extremely likely to change, but for now, we consider a batch
764 // "compact" if it has at most one hollow batch with at most one run.
765 self.parts.len() <= 1 && self.parts.iter().all(|p| p.batch.run_splits.is_empty())
766 }
767
768 pub fn is_merging(&self) -> bool {
769 self.active_compaction.is_some()
770 }
771
772 fn desc(&self) -> &Description<T> {
773 &self.desc
774 }
775
776 pub fn len(&self) -> usize {
777 // NB: This is an upper bound on len for a non-compact batch; we won't know for sure until
778 // we compact it.
779 debug_assert_eq!(
780 self.len,
781 self.parts.iter().map(|x| x.batch.len).sum::<usize>()
782 );
783 self.len
784 }
785
786 pub fn is_empty(&self) -> bool {
787 self.len() == 0
788 }
789
790 pub fn empty(
791 id: SpineId,
792 lower: Antichain<T>,
793 upper: Antichain<T>,
794 since: Antichain<T>,
795 ) -> Self {
796 SpineBatch::merged(IdHollowBatch {
797 id,
798 batch: Arc::new(HollowBatch::empty(Description::new(lower, upper, since))),
799 })
800 }
801
802 pub fn begin_merge(
803 bs: &[Self],
804 compaction_frontier: Option<AntichainRef<T>>,
805 ) -> Option<IdFuelingMerge<T>> {
806 let from = bs.first()?.id().0;
807 let until = bs.last()?.id().1;
808 let id = SpineId(from, until);
809 let mut sinces = bs.iter().map(|b| b.desc().since());
810 let mut since = sinces.next()?.clone();
811 for b in bs {
812 since.join_assign(b.desc().since())
813 }
814 if let Some(compaction_frontier) = compaction_frontier {
815 since.join_assign(&compaction_frontier.to_owned());
816 }
817 let remaining_work = bs.iter().map(|x| x.len()).sum();
818 Some(IdFuelingMerge {
819 id,
820 merge: FuelingMerge {
821 since,
822 remaining_work,
823 },
824 })
825 }
826
827 // TODO: Roundtrip the SpineId through FueledMergeReq/FueledMergeRes?
828 fn maybe_replace(&mut self, res: &FueledMergeRes<T>) -> ApplyMergeResult {
829 // The spine's and merge res's sinces don't need to match (which could occur if Spine
830 // has been reloaded from state due to compare_and_set mismatch), but if so, the Spine
831 // since must be in advance of the merge res since.
832 if !PartialOrder::less_equal(res.output.desc.since(), self.desc().since()) {
833 return ApplyMergeResult::NotAppliedInvalidSince;
834 }
835
836 // If our merge result exactly matches a spine batch, we can swap it in directly
837 let exact_match = res.output.desc.lower() == self.desc().lower()
838 && res.output.desc.upper() == self.desc().upper();
839 if exact_match {
840 // Spine internally has an invariant about a batch being at some level
841 // or higher based on the len. We could end up violating this invariant
842 // if we increased the length of the batch.
843 //
844 // A res output with length greater than the existing spine batch implies
845 // a compaction has already been applied to this range, and with a higher
846 // rate of consolidation than this one. This could happen as a result of
847 // compaction's memory bound limiting the amount of consolidation possible.
848 if res.output.len > self.len() {
849 return ApplyMergeResult::NotAppliedTooManyUpdates;
850 }
851 *self = SpineBatch::merged(IdHollowBatch {
852 id: self.id(),
853 batch: Arc::new(res.output.clone()),
854 });
855 return ApplyMergeResult::AppliedExact;
856 }
857
858 // It is possible the structure of the spine has changed since the merge res
859 // was created, such that it no longer exactly matches the description of a
860 // spine batch. This can happen if another merge has happened in the interim,
861 // or if spine needed to be rebuilt from state.
862 //
863 // When this occurs, we can still attempt to slot the merge res in to replace
864 // the parts of a fueled merge. e.g. if the res is for `[1,3)` and the parts
865 // are `[0,1),[1,2),[2,3),[3,4)`, we can swap out the middle two parts for res.
866 let SpineBatch {
867 id,
868 parts,
869 desc,
870 active_compaction: _,
871 len: _,
872 } = self;
873 // first, determine if a subset of parts can be cleanly replaced by the merge res
874 let mut lower = None;
875 let mut upper = None;
876 for (i, batch) in parts.iter().enumerate() {
877 if batch.batch.desc.lower() == res.output.desc.lower() {
878 lower = Some((i, batch.id.0));
879 }
880 if batch.batch.desc.upper() == res.output.desc.upper() {
881 upper = Some((i, batch.id.1));
882 }
883 if lower.is_some() && upper.is_some() {
884 break;
885 }
886 }
887 // next, replace parts with the merge res batch if we can
888 match (lower, upper) {
889 (Some((lower, id_lower)), Some((upper, id_upper))) => {
890 let mut new_parts = vec![];
891 new_parts.extend_from_slice(&parts[..lower]);
892 new_parts.push(IdHollowBatch {
893 id: SpineId(id_lower, id_upper),
894 batch: Arc::new(res.output.clone()),
895 });
896 new_parts.extend_from_slice(&parts[upper + 1..]);
897 let new_spine_batch = SpineBatch {
898 id: *id,
899 desc: desc.to_owned(),
900 len: new_parts.iter().map(|x| x.batch.len).sum(),
901 parts: new_parts,
902 active_compaction: None,
903 };
904 if new_spine_batch.len() > self.len() {
905 return ApplyMergeResult::NotAppliedTooManyUpdates;
906 }
907 *self = new_spine_batch;
908 ApplyMergeResult::AppliedSubset
909 }
910 _ => ApplyMergeResult::NotAppliedNoMatch,
911 }
912 }
913
914 #[cfg(test)]
915 fn describe(&self, extended: bool) -> String {
916 let SpineBatch {
917 id,
918 parts,
919 desc,
920 active_compaction,
921 len,
922 } = self;
923 let compaction = match active_compaction {
924 None => "".to_owned(),
925 Some(c) => format!(" (c@{})", c.start_ms),
926 };
927 match extended {
928 false => format!(
929 "[{}-{}]{:?}{:?}{}/{}{compaction}",
930 id.0,
931 id.1,
932 desc.lower().elements(),
933 desc.upper().elements(),
934 parts.len(),
935 len
936 ),
937 true => {
938 format!(
939 "[{}-{}]{:?}{:?}{:?} {}/{}{}{compaction}",
940 id.0,
941 id.1,
942 desc.lower().elements(),
943 desc.upper().elements(),
944 desc.since().elements(),
945 parts.len(),
946 len,
947 parts
948 .iter()
949 .flat_map(|x| x.batch.parts.iter())
950 .map(|x| format!(" {}", x.printable_name()))
951 .collect::<Vec<_>>()
952 .join("")
953 )
954 }
955 }
956 }
957}
958
959#[derive(Debug, Clone, PartialEq, Serialize)]
960pub struct FuelingMerge<T> {
961 pub(crate) since: Antichain<T>,
962 pub(crate) remaining_work: usize,
963}
964
965#[derive(Debug, Clone, PartialEq, Serialize)]
966pub struct IdFuelingMerge<T> {
967 id: SpineId,
968 merge: FuelingMerge<T>,
969}
970
971impl<T: Timestamp + Lattice> FuelingMerge<T> {
972 /// Perform some amount of work, decrementing `fuel`.
973 ///
974 /// If `fuel` is non-zero after the call, the merging is complete and one
975 /// should call `done` to extract the merged results.
976 // TODO(benesch): rewrite to avoid usage of `as`.
977 #[allow(clippy::as_conversions)]
978 fn work(&mut self, _: &[SpineBatch<T>], fuel: &mut isize) {
979 let used = std::cmp::min(*fuel as usize, self.remaining_work);
980 self.remaining_work = self.remaining_work.saturating_sub(used);
981 *fuel -= used as isize;
982 }
983
984 /// Extracts merged results.
985 ///
986 /// This method should only be called after `work` has been called and has
987 /// not brought `fuel` to zero. Otherwise, the merge is still in progress.
988 fn done(
989 self,
990 bs: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
991 log: &mut SpineLog<'_, T>,
992 ) -> Option<SpineBatch<T>> {
993 let first = bs.first()?;
994 let last = bs.last()?;
995 let id = SpineId(first.id().0, last.id().1);
996 assert!(id.0 < id.1);
997 let lower = first.desc().lower().clone();
998 let upper = last.desc().upper().clone();
999 let since = self.since;
1000
1001 // Special case empty batches.
1002 if bs.iter().all(SpineBatch::is_empty) {
1003 return Some(SpineBatch::empty(id, lower, upper, since));
1004 }
1005
1006 let desc = Description::new(lower, upper, since);
1007 let len = bs.iter().map(SpineBatch::len).sum();
1008
1009 // Pre-size the merged_parts Vec. Benchmarking has shown that, at least
1010 // in the worst case, the double iteration is absolutely worth having
1011 // merged_parts pre-sized.
1012 let mut merged_parts_len = 0;
1013 for b in &bs {
1014 merged_parts_len += b.parts.len();
1015 }
1016 let mut merged_parts = Vec::with_capacity(merged_parts_len);
1017 for b in bs {
1018 merged_parts.extend(b.parts)
1019 }
1020 // Sanity check the pre-size code.
1021 debug_assert_eq!(merged_parts.len(), merged_parts_len);
1022
1023 if let SpineLog::Enabled { merge_reqs } = log {
1024 merge_reqs.push(FueledMergeReq {
1025 id,
1026 desc: desc.clone(),
1027 inputs: merged_parts.clone(),
1028 });
1029 }
1030
1031 Some(SpineBatch {
1032 id,
1033 desc,
1034 len,
1035 parts: merged_parts,
1036 active_compaction: None,
1037 })
1038 }
1039}
1040
1041/// The maximum number of batches per level in the spine.
1042/// In practice, we probably want a larger max and a configurable soft cap, but using a
1043/// stack-friendly data structure and keeping this number low makes this safer during the
1044/// initial rollout.
1045const BATCHES_PER_LEVEL: usize = 2;
1046
1047/// An append-only collection of update batches.
1048///
/// The `Spine` is a general-purpose trace implementation based on collecting
/// and merging immutable batches of updates. It is generic with respect to the
1051/// batch type, and can be instantiated for any implementor of `trace::Batch`.
1052///
1053/// ## Design
1054///
1055/// This spine is represented as a list of layers, where each element in the
1056/// list is either
1057///
1058/// 1. MergeState::Vacant empty
1059/// 2. MergeState::Single a single batch
1060/// 3. MergeState::Double a pair of batches
1061///
1062/// Each "batch" has the option to be `None`, indicating a non-batch that
1063/// nonetheless acts as a number of updates proportionate to the level at which
1064/// it exists (for bookkeeping).
1065///
1066/// Each of the batches at layer i contains at most 2^i elements. The sequence
1067/// of batches should have the upper bound of one match the lower bound of the
1068/// next. Batches may be logically empty, with matching upper and lower bounds,
1069/// as a bookkeeping mechanism.
1070///
1071/// Each batch at layer i is treated as if it contains exactly 2^i elements,
1072/// even though it may actually contain fewer elements. This allows us to
1073/// decouple the physical representation from logical amounts of effort invested
1074/// in each batch. It allows us to begin compaction and to reduce the number of
1075/// updates, without compromising our ability to continue to move updates along
1076/// the spine. We are explicitly making the trade-off that while some batches
1077/// might compact at lower levels, we want to treat them as if they contained
1078/// their full set of updates for accounting reasons (to apply work to higher
1079/// levels).
1080///
1081/// We maintain the invariant that for any in-progress merge at level k there
1082/// should be fewer than 2^k records at levels lower than k. That is, even if we
1083/// were to apply an unbounded amount of effort to those records, we would not
1084/// have enough records to prompt a merge into the in-progress merge. Ideally,
1085/// we maintain the extended invariant that for any in-progress merge at level
1086/// k, the remaining effort required (number of records minus applied effort) is
1087/// less than the number of records that would need to be added to reach 2^k
1088/// records in layers below.
1089///
1090/// ## Mathematics
1091///
1092/// When a merge is initiated, there should be a non-negative *deficit* of
1093/// updates before the layers below could plausibly produce a new batch for the
1094/// currently merging layer. We must determine a factor of proportionality, so
1095/// that newly arrived updates provide at least that amount of "fuel" towards
1096/// the merging layer, so that the merge completes before lower levels invade.
1097///
1098/// ### Deficit:
1099///
1100/// A new merge is initiated only in response to the completion of a prior
1101/// merge, or the introduction of new records from outside. The latter case is
1102/// special, and will maintain our invariant trivially, so we will focus on the
1103/// former case.
1104///
/// When a merge at level k completes, assuming we have maintained our
/// invariant, there should be fewer than 2^k records at lower levels. The
/// newly created merge at level k+1 will require up to 2^(k+2) units of work,
/// and should not expect a new batch until strictly more than 2^k records are
/// added. This means that a factor of proportionality of four should be
/// sufficient to ensure that the merge completes before a new merge is
/// initiated.
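///
/// As a worked instance of this bound: with k = 3, fewer than 2^3 = 8 records
/// sit below the completed merge, the new merge at level 4 needs up to
/// 2^(3+2) = 32 units of work, and no new batch should arrive at level 4 until
/// strictly more than 8 records are added, so four units of fuel per
/// introduced record suffice.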
1112///
/// When new records get introduced, we will need to roll up any batches at
/// lower levels, which we treat as the introduction of records. Each of these
/// virtual records should also be accounted for, contributing its share of
/// fuel, since rolling up results in the promotion of batches closer to
/// in-progress merges.
1118///
1119/// ### Fuel sharing
1120///
1121/// We like the idea of applying fuel preferentially to merges at *lower*
1122/// levels, under the idea that they are easier to complete, and we benefit from
1123/// fewer total merges in progress. This does delay the completion of merges at
1124/// higher levels, and may not obviously be a total win. If we choose to do
1125/// this, we should make sure that we correctly account for completed merges at
1126/// low layers: they should still extract fuel from new updates even though they
1127/// have completed, at least until they have paid back any "debt" to higher
1128/// layers by continuing to provide fuel as updates arrive.
1129#[derive(Debug, Clone)]
1130struct Spine<T> {
1131 effort: usize,
1132 next_id: usize,
1133 since: Antichain<T>,
1134 upper: Antichain<T>,
1135 merging: Vec<MergeState<T>>,
1136}
1137
1138impl<T> Spine<T> {
1139 /// All batches in the spine, oldest to newest.
1140 pub fn spine_batches(&self) -> impl Iterator<Item = &SpineBatch<T>> {
1141 self.merging.iter().rev().flat_map(|m| &m.batches)
1142 }
1143
1144 /// All (mutable) batches in the spine, oldest to newest.
1145 pub fn spine_batches_mut(&mut self) -> impl DoubleEndedIterator<Item = &mut SpineBatch<T>> {
1146 self.merging.iter_mut().rev().flat_map(|m| &mut m.batches)
1147 }
1148}
1149
1150impl<T: Timestamp + Lattice> Spine<T> {
1151 /// Allocates a fueled `Spine`.
1152 ///
1153 /// This trace will merge batches progressively, with each inserted batch
1154 /// applying a multiple of the batch's length in effort to each merge. The
1155 /// `effort` parameter is that multiplier. This value should be at least one
1156 /// for the merging to happen; a value of zero is not helpful.
1157 pub fn new() -> Self {
1158 Spine {
1159 effort: 1,
1160 next_id: 0,
1161 since: Antichain::from_elem(T::minimum()),
1162 upper: Antichain::from_elem(T::minimum()),
1163 merging: Vec::new(),
1164 }
1165 }
1166
1167 /// Apply some amount of effort to trace maintenance.
1168 ///
1169 /// The units of effort are updates, and the method should be thought of as
1170 /// analogous to inserting as many empty updates, where the trace is
1171 /// permitted to perform proportionate work.
1172 ///
1173 /// Returns true if this did work and false if it left the spine unchanged.
1174 fn exert(&mut self, effort: usize, log: &mut SpineLog<'_, T>) -> bool {
1175 self.tidy_layers();
1176 if self.reduced() {
1177 return false;
1178 }
1179
1180 if self.merging.iter().any(|b| b.merge.is_some()) {
1181 let fuel = isize::try_from(effort).unwrap_or(isize::MAX);
1182 // If any merges exist, we can directly call `apply_fuel`.
1183 self.apply_fuel(&fuel, log);
1184 } else {
1185 // Otherwise, we'll need to introduce fake updates to move merges
1186 // along.
1187
 // Introduce an empty batch representing roughly `effort` virtual updates.
1189 let level = usize::cast_from(effort.next_power_of_two().trailing_zeros());
1190 let id = self.next_id();
1191 self.introduce_batch(
1192 SpineBatch::empty(
1193 id,
1194 self.upper.clone(),
1195 self.upper.clone(),
1196 self.since.clone(),
1197 ),
1198 level,
1199 log,
1200 );
1201 }
1202 true
1203 }
1204
1205 pub fn next_id(&mut self) -> SpineId {
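 /// Allocates the id for a single newly added batch: the half-open range
 /// `SpineId(n, n + 1)`.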
1206 let id = self.next_id;
1207 self.next_id += 1;
1208 SpineId(id, self.next_id)
1209 }
1210
1211 // Ideally, this method acts as insertion of `batch`, even if we are not yet
1212 // able to begin merging the batch. This means it is a good time to perform
1213 // amortized work proportional to the size of batch.
1214 pub fn insert(&mut self, batch: HollowBatch<T>, log: &mut SpineLog<'_, T>) {
1215 assert!(batch.desc.lower() != batch.desc.upper());
1216 assert_eq!(batch.desc.lower(), &self.upper);
1217
1218 let id = self.next_id();
1219 let batch = SpineBatch::merged(IdHollowBatch {
1220 id,
1221 batch: Arc::new(batch),
1222 });
1223
1224 self.upper.clone_from(batch.upper());
1225
1226 // If `batch` and the most recently inserted batch are both empty,
1227 // we can just fuse them.
1228 if batch.is_empty() {
1229 if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
1230 if self.merging[position].is_single() && self.merging[position].is_empty() {
1231 self.insert_at(batch, position);
1232 // Since we just inserted a batch, we should always have work to complete...
1233 // but otherwise we just leave this layer vacant.
1234 if let Some(merged) = self.complete_at(position, log) {
1235 self.merging[position] = MergeState::single(merged);
1236 }
1237 return;
1238 }
1239 }
1240 }
1241
1242 // Normal insertion for the batch.
1243 let index = batch.len().next_power_of_two();
1244 self.introduce_batch(batch, usize::cast_from(index.trailing_zeros()), log);
1245 }
1246
1247 /// True iff there is at most one HollowBatch in `self.merging`.
1248 ///
1249 /// When true, there is no maintenance work to perform in the trace, other
1250 /// than compaction. We do not yet have logic in place to determine if
1251 /// compaction would improve a trace, so for now we are ignoring that.
1252 fn reduced(&self) -> bool {
1253 self.spine_batches()
1254 .flat_map(|b| b.parts.as_slice())
1255 .count()
1256 < 2
1257 }
1258
1259 /// Describes the merge progress of layers in the trace.
1260 ///
1261 /// Intended for diagnostics rather than public consumption.
1262 #[allow(dead_code)]
1263 fn describe(&self) -> Vec<(usize, usize)> {
1264 self.merging
1265 .iter()
1266 .map(|b| (b.batches.len(), b.len()))
1267 .collect()
1268 }
1269
1270 /// Introduces a batch at an indicated level.
1271 ///
1272 /// The level indication is often related to the size of the batch, but it
1273 /// can also be used to artificially fuel the computation by supplying empty
1274 /// batches at non-trivial indices, to move merges along.
1275 fn introduce_batch(
1276 &mut self,
1277 batch: SpineBatch<T>,
1278 batch_index: usize,
1279 log: &mut SpineLog<'_, T>,
1280 ) {
1281 // Step 0. Determine an amount of fuel to use for the computation.
1282 //
1283 // Fuel is used to drive maintenance of the data structure,
1284 // and in particular are used to make progress through merges
1285 // that are in progress. The amount of fuel to use should be
1286 // proportional to the number of records introduced, so that
1287 // we are guaranteed to complete all merges before they are
1288 // required as arguments to merges again.
1289 //
1290 // The fuel use policy is negotiable, in that we might aim
1291 // to use relatively less when we can, so that we return
1292 // control promptly, or we might account more work to larger
 // batches. Not clear to me which are best, or if there
1294 // should be a configuration knob controlling this.
1295
1296 // The amount of fuel to use is proportional to 2^batch_index, scaled by
1297 // a factor of self.effort which determines how eager we are in
1298 // performing maintenance work. We need to ensure that each merge in
1299 // progress receives fuel for each introduced batch, and so multiply by
1300 // that as well.
1301 if batch_index > 32 {
1302 println!("Large batch index: {}", batch_index);
1303 }
1304
1305 // We believe that eight units of fuel is sufficient for each introduced
1306 // record, accounted as four for each record, and a potential four more
1307 // for each virtual record associated with promoting existing smaller
1308 // batches. We could try and make this be less, or be scaled to merges
1309 // based on their deficit at time of instantiation. For now, we remain
1310 // conservative.
1311 let mut fuel = 8 << batch_index;
1312 // Scale up by the effort parameter, which is calibrated to one as the
1313 // minimum amount of effort.
1314 fuel *= self.effort;
1315 // Convert to an `isize` so we can observe any fuel shortfall.
1316 // TODO(benesch): avoid dangerous usage of `as`.
1317 #[allow(clippy::as_conversions)]
1318 let fuel = fuel as isize;
1319
1320 // Step 1. Apply fuel to each in-progress merge.
1321 //
1322 // Before we can introduce new updates, we must apply any
1323 // fuel to in-progress merges, as this fuel is what ensures
1324 // that the merges will be complete by the time we insert
1325 // the updates.
1326 self.apply_fuel(&fuel, log);
1327
1328 // Step 2. We must ensure the invariant that adjacent layers do not
1329 // contain two batches will be satisfied when we insert the
 // batch. We forcibly complete all merges at layers lower
1331 // than and including `batch_index`, so that the new batch is
1332 // inserted into an empty layer.
1333 //
1334 // We could relax this to "strictly less than `batch_index`"
1335 // if the layer above has only a single batch in it, which
1336 // seems not implausible if it has been the focus of effort.
1337 //
1338 // This should be interpreted as the introduction of some
1339 // volume of fake updates, and we will need to fuel merges
1340 // by a proportional amount to ensure that they are not
1341 // surprised later on. The number of fake updates should
1342 // correspond to the deficit for the layer, which perhaps
1343 // we should track explicitly.
1344 self.roll_up(batch_index, log);
1345
1346 // Step 3. This insertion should be into an empty layer. It is a logical
1347 // error otherwise, as we may be violating our invariant, from
1348 // which all wonderment derives.
1349 self.insert_at(batch, batch_index);
1350
1351 // Step 4. Tidy the largest layers.
1352 //
1353 // It is important that we not tidy only smaller layers,
1354 // as their ascension is what ensures the merging and
1355 // eventual compaction of the largest layers.
1356 self.tidy_layers();
1357 }
1358
1359 /// Ensures that an insertion at layer `index` will succeed.
1360 ///
1361 /// This method is subject to the constraint that all existing batches
1362 /// should occur at higher levels, which requires it to "roll up" batches
1363 /// present at lower levels before the method is called. In doing this, we
1364 /// should not introduce more virtual records than 2^index, as that is the
1365 /// amount of excess fuel we have budgeted for completing merges.
1366 fn roll_up(&mut self, index: usize, log: &mut SpineLog<'_, T>) {
1367 // Ensure entries sufficient for `index`.
1368 while self.merging.len() <= index {
1369 self.merging.push(MergeState::default());
1370 }
1371
1372 // We only need to roll up if there are non-vacant layers.
1373 if self.merging[..index].iter().any(|m| !m.is_vacant()) {
1374 // Collect and merge all batches at layers up to but not including
1375 // `index`.
1376 let mut merged = None;
1377 for i in 0..index {
1378 if let Some(merged) = merged.take() {
1379 self.insert_at(merged, i);
1380 }
1381 merged = self.complete_at(i, log);
1382 }
1383
1384 // The merged results should be introduced at level `index`, which
1385 // should be ready to absorb them (possibly creating a new merge at
1386 // the time).
1387 if let Some(merged) = merged {
1388 self.insert_at(merged, index);
1389 }
1390
1391 // If the insertion results in a merge, we should complete it to
1392 // ensure the upcoming insertion at `index` does not panic.
1393 if self.merging[index].is_full() {
1394 let merged = self.complete_at(index, log).expect("double batch");
1395 self.insert_at(merged, index + 1);
1396 }
1397 }
1398 }
1399
1400 /// Applies an amount of fuel to merges in progress.
1401 ///
1402 /// The supplied `fuel` is for each in progress merge, and if we want to
1403 /// spend the fuel non-uniformly (e.g. prioritizing merges at low layers) we
1404 /// could do so in order to maintain fewer batches on average (at the risk
1405 /// of completing merges of large batches later, but tbh probably not much
1406 /// later).
1407 pub fn apply_fuel(&mut self, fuel: &isize, log: &mut SpineLog<'_, T>) {
1408 // For the moment our strategy is to apply fuel independently to each
1409 // merge in progress, rather than prioritizing small merges. This sounds
1410 // like a great idea, but we need better accounting in place to ensure
1411 // that merges that borrow against later layers but then complete still
1412 // "acquire" fuel to pay back their debts.
1413 for index in 0..self.merging.len() {
1414 // Give each level independent fuel, for now.
1415 let mut fuel = *fuel;
1416 // Pass along various logging stuffs, in case we need to report
1417 // success.
1418 self.merging[index].work(&mut fuel);
1419 // `fuel` could have a deficit at this point, meaning we over-spent
1420 // when we took a merge step. We could ignore this, or maintain the
1421 // deficit and account future fuel against it before spending again.
1422 // It isn't clear why that would be especially helpful to do; we
1423 // might want to avoid overspends at multiple layers in the same
1424 // invocation (to limit latencies), but there is probably a rich
1425 // policy space here.
1426
1427 // If a merge completes, we can immediately merge it in to the next
1428 // level, which is "guaranteed" to be complete at this point, by our
1429 // fueling discipline.
1430 if self.merging[index].is_complete() {
1431 let complete = self.complete_at(index, log).expect("complete batch");
1432 self.insert_at(complete, index + 1);
1433 }
1434 }
1435 }
1436
1437 /// Inserts a batch at a specific location.
1438 ///
1439 /// This is a non-public internal method that can panic if we try and insert
1440 /// into a layer which already contains two batches (and is still in the
1441 /// process of merging).
1442 fn insert_at(&mut self, batch: SpineBatch<T>, index: usize) {
1443 // Ensure the spine is large enough.
1444 while self.merging.len() <= index {
1445 self.merging.push(MergeState::default());
1446 }
1447
1448 // Insert the batch at the location.
1449 let merging = &mut self.merging[index];
1450 merging.push_batch(batch);
1451 if merging.batches.is_full() {
1452 let compaction_frontier = Some(self.since.borrow());
1453 merging.merge = SpineBatch::begin_merge(&merging.batches[..], compaction_frontier)
1454 }
1455 }
1456
 /// Completes and extracts whatever is at layer `index`, leaving this layer vacant.
1458 fn complete_at(&mut self, index: usize, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1459 self.merging[index].complete(log)
1460 }
1461
1462 /// Attempts to draw down large layers to size appropriate layers.
1463 fn tidy_layers(&mut self) {
1464 // If the largest layer is complete (not merging), we can attempt to
1465 // draw it down to the next layer. This is permitted if we can maintain
1466 // our invariant that below each merge there are at most half the
1467 // records that would be required to invade the merge.
1468 if !self.merging.is_empty() {
1469 let mut length = self.merging.len();
1470 if self.merging[length - 1].is_single() {
1471 // To move a batch down, we require that it contain few enough
1472 // records that the lower level is appropriate, and that moving
1473 // the batch would not create a merge violating our invariant.
1474 let appropriate_level = usize::cast_from(
1475 self.merging[length - 1]
1476 .len()
1477 .next_power_of_two()
1478 .trailing_zeros(),
1479 );
1480
1481 // Continue only as far as is appropriate
1482 while appropriate_level < length - 1 {
1483 let current = &mut self.merging[length - 2];
1484 if current.is_vacant() {
1485 // Vacant batches can be absorbed.
1486 self.merging.remove(length - 2);
1487 length = self.merging.len();
1488 } else {
1489 if !current.is_full() {
1490 // Single batches may initiate a merge, if sizes are
1491 // within bounds, but terminate the loop either way.
1492
1493 // Determine the number of records that might lead
1494 // to a merge. Importantly, this is not the number
1495 // of actual records, but the sum of upper bounds
1496 // based on indices.
1497 let mut smaller = 0;
1498 for (index, batch) in self.merging[..(length - 2)].iter().enumerate() {
1499 smaller += batch.batches.len() << index;
1500 }
1501
1502 if smaller <= (1 << length) / 8 {
1503 // Remove the batch under consideration (shifting the deeper batches up a level),
1504 // then merge in the single batch at the current level.
1505 let state = self.merging.remove(length - 2);
1506 assert_eq!(state.batches.len(), 1);
1507 for batch in state.batches {
1508 self.insert_at(batch, length - 2);
1509 }
1510 }
1511 }
1512 break;
1513 }
1514 }
1515 }
1516 }
1517 }
1518
1519 /// Checks invariants:
1520 /// - The lowers and uppers of all batches "line up".
1521 /// - The lower of the "minimum" batch is `antichain[T::minimum]`.
1522 /// - The upper of the "maximum" batch is `== self.upper`.
1523 /// - The since of each batch is `less_equal self.since`.
1524 /// - The `SpineIds` all "line up" and cover from `0` to `self.next_id`.
1525 /// - TODO: Verify fuel and level invariants.
1526 fn validate(&self) -> Result<(), String> {
1527 let mut id = SpineId(0, 0);
1528 let mut frontier = Antichain::from_elem(T::minimum());
1529 for x in self.merging.iter().rev() {
1530 if x.is_full() != x.merge.is_some() {
1531 return Err(format!(
1532 "all (and only) full batches should have fueling merges (full={}, merge={:?})",
1533 x.is_full(),
1534 x.merge,
1535 ));
1536 }
1537
1538 if let Some(m) = &x.merge {
1539 if !x.is_full() {
1540 return Err(format!(
1541 "merge should only exist for full batches (len={:?}, merge={:?})",
1542 x.batches.len(),
1543 m.id,
1544 ));
1545 }
1546 if x.id() != Some(m.id) {
1547 return Err(format!(
1548 "merge id should match the range of the batch ids (batch={:?}, merge={:?})",
1549 x.id(),
1550 m.id,
1551 ));
1552 }
1553 }
1554
1555 // TODO: Anything we can validate about x.merge? It'd
1556 // be nice to assert that it's bigger than the len of the
1557 // two batches, but apply_merge_res might swap those lengths
1558 // out from under us.
1559 for batch in &x.batches {
1560 if batch.id().0 != id.1 {
1561 return Err(format!(
1562 "batch id {:?} does not match the previous id {:?}: {:?}",
1563 batch.id(),
1564 id,
1565 self
1566 ));
1567 }
1568 id = batch.id();
1569 if batch.desc().lower() != &frontier {
1570 return Err(format!(
1571 "batch lower {:?} does not match the previous upper {:?}: {:?}",
1572 batch.desc().lower(),
1573 frontier,
1574 self
1575 ));
1576 }
1577 frontier.clone_from(batch.desc().upper());
1578 if !PartialOrder::less_equal(batch.desc().since(), &self.since) {
1579 return Err(format!(
1580 "since of batch {:?} past the spine since {:?}: {:?}",
1581 batch.desc().since(),
1582 self.since,
1583 self
1584 ));
1585 }
1586 }
1587 }
1588 if self.next_id != id.1 {
1589 return Err(format!(
1590 "spine next_id {:?} does not match the last batch's id {:?}: {:?}",
1591 self.next_id, id, self
1592 ));
1593 }
1594 if self.upper != frontier {
1595 return Err(format!(
1596 "spine upper {:?} does not match the last batch's upper {:?}: {:?}",
1597 self.upper, frontier, self
1598 ));
1599 }
1600 Ok(())
1601 }
1602}
1603
1604/// Describes the state of a layer.
1605///
/// A layer can be vacant, hold a single batch, or hold up to
/// `BATCHES_PER_LEVEL` batches that are in the process of merging into a
/// batch for the next layer.
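///
/// For example (a bookkeeping sketch, not a guarantee of the merge schedule):
/// a full layer carries an `IdFuelingMerge` tracking its merge progress; once
/// that merge completes, the layer's batches collapse into a single
/// `SpineBatch` for the next layer.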
1608#[derive(Debug, Clone)]
1609struct MergeState<T> {
1610 batches: ArrayVec<SpineBatch<T>, BATCHES_PER_LEVEL>,
1611 merge: Option<IdFuelingMerge<T>>,
1612}
1613
1614impl<T> Default for MergeState<T> {
1615 fn default() -> Self {
1616 Self {
1617 batches: ArrayVec::new(),
1618 merge: None,
1619 }
1620 }
1621}
1622
1623impl<T: Timestamp + Lattice> MergeState<T> {
1624 /// An id that covers all the batches in the given merge state, assuming there are any.
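    ///
    /// For example (illustrative ids only): a level holding batches with ids
    /// `(0, 2)` and `(2, 5)` is covered by `SpineId(0, 5)`.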
1625 fn id(&self) -> Option<SpineId> {
1626 if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
1627 Some(SpineId(first.id().0, last.id().1))
1628 } else {
1629 None
1630 }
1631 }
1632
1633 /// A new single-batch merge state.
1634 fn single(batch: SpineBatch<T>) -> Self {
1635 let mut state = Self::default();
1636 state.push_batch(batch);
1637 state
1638 }
1639
1640 /// Push a new batch at this level, checking invariants.
1641 fn push_batch(&mut self, batch: SpineBatch<T>) {
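        // The new batch must abut the last batch already in this level:
        // consecutive ids and a lower equal to the previous batch's upper.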
1642 if let Some(last) = self.batches.last() {
1643 assert_eq!(last.id().1, batch.id().0);
1644 assert_eq!(last.upper(), batch.lower());
1645 }
1646 assert!(
1647 self.merge.is_none(),
1648 "Attempted to insert batch into incomplete merge! (batch={:?}, batch_count={})",
1649 batch.id,
1650 self.batches.len(),
1651 );
1652 self.batches
1653 .try_push(batch)
1654 .expect("Attempted to insert batch into full layer!");
1655 }
1656
1657 /// The number of actual updates contained in the level.
1658 fn len(&self) -> usize {
1659 self.batches.iter().map(SpineBatch::len).sum()
1660 }
1661
1662 /// True if this merge state contains no updates.
1663 fn is_empty(&self) -> bool {
1664 self.batches.iter().all(SpineBatch::is_empty)
1665 }
1666
1667 /// True if this level contains no batches.
1668 fn is_vacant(&self) -> bool {
1669 self.batches.is_empty()
1670 }
1671
1672 /// True only for a single-batch state.
1673 fn is_single(&self) -> bool {
1674 self.batches.len() == 1
1675 }
1676
1677 /// True if this merge cannot hold any more batches.
1678 /// (i.e. for a binary merge tree, true if this layer holds two batches.)
1679 fn is_full(&self) -> bool {
1680 self.batches.is_full()
1681 }
1682
1683 /// Immediately complete any merge.
1684 ///
    /// The result is the merged batch if this level holds more than one batch,
    /// the single batch if it holds exactly one, or `None` if the level is
    /// vacant. If no fueling merge is already in progress, one is begun over
    /// the level's batches and completed immediately.
1689 fn complete(&mut self, log: &mut SpineLog<'_, T>) -> Option<SpineBatch<T>> {
1690 let mut this = mem::take(self);
1691 if this.batches.len() <= 1 {
1692 this.batches.pop()
1693 } else {
1694 // Merge the remaining batches, regardless of whether we have a fully fueled merge.
1695 let id_merge = this
1696 .merge
                .or_else(|| SpineBatch::begin_merge(&this.batches[..], None))?;
1698 id_merge.merge.done(this.batches, log)
1699 }
1700 }
1701
1702 /// True iff the layer is a complete merge, ready for extraction.
1703 fn is_complete(&self) -> bool {
1704 match &self.merge {
1705 Some(IdFuelingMerge { merge, .. }) => merge.remaining_work == 0,
1706 None => false,
1707 }
1708 }
1709
1710 /// Performs a bounded amount of work towards a merge.
1711 fn work(&mut self, fuel: &mut isize) {
1712 // We only perform work for merges in progress.
1713 if let Some(IdFuelingMerge { merge, .. }) = &mut self.merge {
1714 merge.work(&self.batches[..], fuel)
1715 }
1716 }
1717}
1718
1719#[cfg(test)]
1720pub mod datadriven {
1721 use crate::internal::datadriven::DirectiveArgs;
1722
1723 use super::*;
1724
1725 /// Shared state for a single [crate::internal::trace] [datadriven::TestFile].
1726 #[derive(Debug, Default)]
1727 pub struct TraceState {
1728 pub trace: Trace<u64>,
1729 pub merge_reqs: Vec<FueledMergeReq<u64>>,
1730 }
1731
1732 pub fn since_upper(
1733 datadriven: &TraceState,
1734 _args: DirectiveArgs,
1735 ) -> Result<String, anyhow::Error> {
1736 Ok(format!(
1737 "{:?}{:?}\n",
1738 datadriven.trace.since().elements(),
1739 datadriven.trace.upper().elements()
1740 ))
1741 }
1742
1743 pub fn batches(datadriven: &TraceState, _args: DirectiveArgs) -> Result<String, anyhow::Error> {
1744 let mut s = String::new();
1745 for b in datadriven.trace.spine.spine_batches() {
1746 s.push_str(b.describe(true).as_str());
1747 s.push('\n');
1748 }
1749 Ok(s)
1750 }
1751
1752 pub fn insert(
1753 datadriven: &mut TraceState,
1754 args: DirectiveArgs,
1755 ) -> Result<String, anyhow::Error> {
1756 for x in args
1757 .input
1758 .trim()
1759 .split('\n')
1760 .map(DirectiveArgs::parse_hollow_batch)
1761 {
1762 datadriven
1763 .merge_reqs
1764 .append(&mut datadriven.trace.push_batch(x));
1765 }
1766 Ok("ok\n".to_owned())
1767 }
1768
1769 pub fn downgrade_since(
1770 datadriven: &mut TraceState,
1771 args: DirectiveArgs,
1772 ) -> Result<String, anyhow::Error> {
1773 let since = args.expect("since");
1774 datadriven
1775 .trace
1776 .downgrade_since(&Antichain::from_elem(since));
1777 Ok("ok\n".to_owned())
1778 }
1779
1780 pub fn take_merge_req(
1781 datadriven: &mut TraceState,
1782 _args: DirectiveArgs,
1783 ) -> Result<String, anyhow::Error> {
1784 let mut s = String::new();
1785 for merge_req in std::mem::take(&mut datadriven.merge_reqs) {
1786 write!(
1787 s,
1788 "{:?}{:?}{:?} {}\n",
1789 merge_req.desc.lower().elements(),
1790 merge_req.desc.upper().elements(),
1791 merge_req.desc.since().elements(),
1792 merge_req
1793 .inputs
1794 .iter()
1795 .flat_map(|x| x.batch.parts.iter())
1796 .map(|x| x.printable_name())
1797 .collect::<Vec<_>>()
1798 .join(" ")
1799 );
1800 }
1801 Ok(s)
1802 }
1803
1804 pub fn apply_merge_res(
1805 datadriven: &mut TraceState,
1806 args: DirectiveArgs,
1807 ) -> Result<String, anyhow::Error> {
1808 let res = FueledMergeRes {
1809 output: DirectiveArgs::parse_hollow_batch(args.input),
1810 };
1811 match datadriven.trace.apply_merge_res(&res) {
1812 ApplyMergeResult::AppliedExact => Ok("applied exact\n".into()),
1813 ApplyMergeResult::AppliedSubset => Ok("applied subset\n".into()),
1814 ApplyMergeResult::NotAppliedNoMatch => Ok("no-op\n".into()),
1815 ApplyMergeResult::NotAppliedInvalidSince => Ok("no-op invalid since\n".into()),
1816 ApplyMergeResult::NotAppliedTooManyUpdates => Ok("no-op too many updates\n".into()),
1817 }
1818 }
1819}
1820
1821#[cfg(test)]
1822pub(crate) mod tests {
1823 use std::ops::Range;
1824
1825 use proptest::prelude::*;
1826 use semver::Version;
1827
1828 use crate::internal::state::tests::any_hollow_batch;
1829
1830 use super::*;
1831
1832 pub fn any_trace<T: Arbitrary + Timestamp + Lattice>(
1833 num_batches: Range<usize>,
1834 ) -> impl Strategy<Value = Trace<T>> {
1835 Strategy::prop_map(
1836 (
1837 any::<Option<T>>(),
1838 proptest::collection::vec(any_hollow_batch::<T>(), num_batches),
1839 any::<bool>(),
1840 any::<u64>(),
1841 ),
1842 |(since, mut batches, roundtrip_structure, timeout_ms)| {
1843 let mut trace = Trace::<T>::default();
1844 trace.downgrade_since(&since.map_or_else(Antichain::new, Antichain::from_elem));
1845
1846 // Fix up the arbitrary HollowBatches so the lowers and uppers
1847 // align.
1848 batches.sort_by(|x, y| x.desc.upper().elements().cmp(y.desc.upper().elements()));
1849 let mut lower = Antichain::from_elem(T::minimum());
1850 for mut batch in batches {
                    // The overall trace since has to be at or past each batch's since.
1852 if PartialOrder::less_than(trace.since(), batch.desc.since()) {
1853 trace.downgrade_since(batch.desc.since());
1854 }
1855 batch.desc = Description::new(
1856 lower.clone(),
1857 batch.desc.upper().clone(),
1858 batch.desc.since().clone(),
1859 );
1860 lower.clone_from(batch.desc.upper());
1861 let _merge_req = trace.push_batch(batch);
1862 }
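                // Claim compaction for any merge reqs that are currently
                // eligible, so generated traces also cover batches with an
                // active compaction.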
1863 let reqs: Vec<_> = trace
1864 .fueled_merge_reqs_before_ms(timeout_ms, None)
1865 .collect();
1866 for req in reqs {
1867 trace.claim_compaction(req.id, ActiveCompaction { start_ms: 0 })
1868 }
1869 trace.roundtrip_structure = roundtrip_structure;
1870 trace
1871 },
1872 )
1873 }
1874
1875 #[mz_ore::test]
1876 #[cfg_attr(miri, ignore)] // proptest is too heavy for miri!
1877 fn test_roundtrips() {
1878 fn check(trace: Trace<i64>) {
1879 trace.validate().unwrap();
1880 let flat = trace.flatten();
1881 let unflat = Trace::unflatten(flat).unwrap();
1882 assert_eq!(trace, unflat);
1883 }
1884
1885 proptest!(|(trace in any_trace::<i64>(1..10))| { check(trace) })
1886 }
1887
1888 #[mz_ore::test]
1889 fn fueled_merge_reqs() {
1890 let mut trace: Trace<u64> = Trace::default();
1891 let fueled_reqs = trace.push_batch(crate::internal::state::tests::hollow(
1892 0,
1893 10,
1894 &["n0011500/p3122e2a1-a0c7-429f-87aa-1019bf4f5f86"],
1895 1000,
1896 ));
1897
1898 assert!(fueled_reqs.is_empty());
1899 assert_eq!(
1900 trace.fueled_merge_reqs_before_ms(u64::MAX, None).count(),
1901 0,
1902 "no merge reqs when not filtering by version"
1903 );
1904 assert_eq!(
1905 trace
1906 .fueled_merge_reqs_before_ms(
1907 u64::MAX,
1908 Some(WriterKey::for_version(&Version::new(0, 50, 0)))
1909 )
1910 .count(),
1911 0,
1912 "zero batches are older than a past version"
1913 );
1914 assert_eq!(
1915 trace
1916 .fueled_merge_reqs_before_ms(
1917 u64::MAX,
1918 Some(WriterKey::for_version(&Version::new(99, 99, 0)))
1919 )
1920 .count(),
1921 1,
1922 "one batch is older than a future version"
1923 );
1924 }
1925
1926 #[mz_ore::test]
1927 fn remove_redundant_merge_reqs() {
1928 fn req(lower: u64, upper: u64) -> FueledMergeReq<u64> {
1929 FueledMergeReq {
1930 id: SpineId(usize::cast_from(lower), usize::cast_from(upper)),
1931 desc: Description::new(
1932 Antichain::from_elem(lower),
1933 Antichain::from_elem(upper),
1934 Antichain::new(),
1935 ),
1936 inputs: vec![],
1937 }
1938 }
1939
1940 // Empty
1941 assert_eq!(Trace::<u64>::remove_redundant_merge_reqs(vec![]), vec![]);
1942
1943 // Single
1944 assert_eq!(
1945 Trace::remove_redundant_merge_reqs(vec![req(0, 1)]),
1946 vec![req(0, 1)]
1947 );
1948
1949 // Duplicate
1950 assert_eq!(
1951 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(0, 1)]),
1952 vec![req(0, 1)]
1953 );
1954
1955 // Nothing covered
1956 assert_eq!(
1957 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req(1, 2)]),
1958 vec![req(1, 2), req(0, 1)]
1959 );
1960
1961 // Covered
1962 assert_eq!(
1963 Trace::remove_redundant_merge_reqs(vec![req(1, 2), req(0, 3)]),
1964 vec![req(0, 3)]
1965 );
1966
1967 // Covered, lower equal
1968 assert_eq!(
1969 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(0, 3)]),
1970 vec![req(0, 3)]
1971 );
1972
1973 // Covered, upper equal
1974 assert_eq!(
1975 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 3)]),
1976 vec![req(0, 3)]
1977 );
1978
1979 // Covered, unexpected order (doesn't happen in practice)
1980 assert_eq!(
1981 Trace::remove_redundant_merge_reqs(vec![req(0, 3), req(1, 2)]),
1982 vec![req(0, 3)]
1983 );
1984
1985 // Partially overlapping
1986 assert_eq!(
1987 Trace::remove_redundant_merge_reqs(vec![req(0, 2), req(1, 3)]),
1988 vec![req(1, 3), req(0, 2)]
1989 );
1990
1991 // Partially overlapping, the other order
1992 assert_eq!(
1993 Trace::remove_redundant_merge_reqs(vec![req(1, 3), req(0, 2)]),
1994 vec![req(0, 2), req(1, 3)]
1995 );
1996
1997 // Different sinces (doesn't happen in practice)
1998 let req015 = FueledMergeReq {
1999 id: SpineId(0, 1),
2000 desc: Description::new(
2001 Antichain::from_elem(0),
2002 Antichain::from_elem(1),
2003 Antichain::from_elem(5),
2004 ),
2005 inputs: vec![],
2006 };
2007 assert_eq!(
2008 Trace::remove_redundant_merge_reqs(vec![req(0, 1), req015.clone()]),
2009 vec![req015, req(0, 1)]
2010 );
2011 }
2012}