mz_persist_client/internal/
state_diff.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::sync::Arc;
14
15use bytes::{Bytes, BytesMut};
16use differential_dataflow::lattice::Lattice;
17use differential_dataflow::trace::Description;
18use mz_ore::assert_none;
19use mz_ore::cast::CastFrom;
20use mz_persist::location::{SeqNo, VersionedData};
21use mz_persist_types::Codec64;
22use mz_persist_types::schema::SchemaId;
23use mz_proto::TryFromProtoError;
24use timely::PartialOrder;
25use timely::progress::{Antichain, Timestamp};
26use tracing::debug;
27
28use crate::critical::CriticalReaderId;
29use crate::internal::paths::PartialRollupKey;
30use crate::internal::state::{
31    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
32    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
33    State, StateCollections, WriterState,
34};
35use crate::internal::trace::CompactionInput;
36use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
37use crate::read::LeasedReaderId;
38use crate::write::WriterId;
39use crate::{Metrics, PersistConfig, ShardId};
40
41use StateFieldValDiff::*;
42
43use super::state::{ActiveGc, ActiveRollup};
44
45#[derive(Clone, Debug)]
46#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
47pub enum StateFieldValDiff<V> {
48    Insert(V),
49    Update(V, V),
50    Delete(V),
51}
52
53#[derive(Clone)]
54#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
55pub struct StateFieldDiff<K, V> {
56    pub key: K,
57    pub val: StateFieldValDiff<V>,
58}
59
60impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("StateFieldDiff")
63            // In the cases we've seen in the wild, it's been more useful to
64            // have the val printed first.
65            .field("val", &self.val)
66            .field("key", &self.key)
67            .finish()
68    }
69}
70
71#[derive(Debug)]
72#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
73pub struct StateDiff<T> {
74    pub(crate) applier_version: semver::Version,
75    pub(crate) seqno_from: SeqNo,
76    pub(crate) seqno_to: SeqNo,
77    pub(crate) walltime_ms: u64,
78    pub(crate) latest_rollup_key: PartialRollupKey,
79    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
80    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
81    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
82    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
83    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
84    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
85    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
86    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
87    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
88    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
89    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
90    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
91    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
92    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
93}
94
95impl<T: Timestamp + Codec64> StateDiff<T> {
96    pub fn new(
97        applier_version: semver::Version,
98        seqno_from: SeqNo,
99        seqno_to: SeqNo,
100        walltime_ms: u64,
101        latest_rollup_key: PartialRollupKey,
102    ) -> Self {
103        StateDiff {
104            applier_version,
105            seqno_from,
106            seqno_to,
107            walltime_ms,
108            latest_rollup_key,
109            rollups: Vec::default(),
110            active_rollup: Vec::default(),
111            active_gc: Vec::default(),
112            hostname: Vec::default(),
113            last_gc_req: Vec::default(),
114            leased_readers: Vec::default(),
115            critical_readers: Vec::default(),
116            writers: Vec::default(),
117            schemas: Vec::default(),
118            since: Vec::default(),
119            legacy_batches: Vec::default(),
120            hollow_batches: Vec::default(),
121            spine_batches: Vec::default(),
122            merges: Vec::default(),
123        }
124    }
125
126    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
127        let legacy_batches = self
128            .legacy_batches
129            .iter()
130            .filter_map(|diff| match diff.val {
131                Insert(()) => Some(Insert(&diff.key)),
132                Update((), ()) => None, // Ignoring a noop diff.
133                Delete(()) => Some(Delete(&diff.key)),
134            });
135        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
136            Insert(batch) => Insert(&**batch),
137            Update(before, after) => Update(&**before, &**after),
138            Delete(batch) => Delete(&**batch),
139        });
140        legacy_batches.chain(hollow_batches)
141    }
142}
143
144impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
145    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
146        // Deconstruct from and to so we get a compile failure if new
147        // fields are added.
148        let State {
149            applier_version: _,
150            shard_id: from_shard_id,
151            seqno: from_seqno,
152            hostname: from_hostname,
153            walltime_ms: _, // Intentionally unused
154            collections:
155                StateCollections {
156                    last_gc_req: from_last_gc_req,
157                    rollups: from_rollups,
158                    active_rollup: from_active_rollup,
159                    active_gc: from_active_gc,
160                    leased_readers: from_leased_readers,
161                    critical_readers: from_critical_readers,
162                    writers: from_writers,
163                    schemas: from_schemas,
164                    trace: from_trace,
165                },
166        } = from;
167        let State {
168            applier_version: to_applier_version,
169            shard_id: to_shard_id,
170            seqno: to_seqno,
171            walltime_ms: to_walltime_ms,
172            hostname: to_hostname,
173            collections:
174                StateCollections {
175                    last_gc_req: to_last_gc_req,
176                    rollups: to_rollups,
177                    active_rollup: to_active_rollup,
178                    active_gc: to_active_gc,
179                    leased_readers: to_leased_readers,
180                    critical_readers: to_critical_readers,
181                    writers: to_writers,
182                    schemas: to_schemas,
183                    trace: to_trace,
184                },
185        } = to;
186        assert_eq!(from_shard_id, to_shard_id);
187
188        let (_, latest_rollup) = to.latest_rollup();
189        let mut diffs = Self::new(
190            to_applier_version.clone(),
191            *from_seqno,
192            *to_seqno,
193            *to_walltime_ms,
194            latest_rollup.key.clone(),
195        );
196        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
197        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
198        diff_field_sorted_iter(
199            from_active_rollup.iter().map(|r| (&(), r)),
200            to_active_rollup.iter().map(|r| (&(), r)),
201            &mut diffs.active_rollup,
202        );
203        diff_field_sorted_iter(
204            from_active_gc.iter().map(|g| (&(), g)),
205            to_active_gc.iter().map(|g| (&(), g)),
206            &mut diffs.active_gc,
207        );
208        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
209        diff_field_sorted_iter(
210            from_leased_readers.iter(),
211            to_leased_readers,
212            &mut diffs.leased_readers,
213        );
214        diff_field_sorted_iter(
215            from_critical_readers.iter(),
216            to_critical_readers,
217            &mut diffs.critical_readers,
218        );
219        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
220        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
221        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);
222
223        let from_flat = from_trace.flatten();
224        let to_flat = to_trace.flatten();
225        diff_field_sorted_iter(
226            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
227            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
228            &mut diffs.legacy_batches,
229        );
230        diff_field_sorted_iter(
231            from_flat.hollow_batches.iter(),
232            to_flat.hollow_batches.iter(),
233            &mut diffs.hollow_batches,
234        );
235        diff_field_sorted_iter(
236            from_flat.spine_batches.iter(),
237            to_flat.spine_batches.iter(),
238            &mut diffs.spine_batches,
239        );
240        diff_field_sorted_iter(
241            from_flat.merges.iter(),
242            to_flat.merges.iter(),
243            &mut diffs.merges,
244        );
245        diffs
246    }
247
248    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
249        let batches = self
250            .referenced_batches()
251            .filter_map(|spine_diff| match spine_diff {
252                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
253                Delete(_) => None, // No-op
254            });
255        let rollups = self
256            .rollups
257            .iter()
258            .filter_map(|rollups_diff| match &rollups_diff.val {
259                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
260                    Some(HollowBlobRef::Rollup(x))
261                }
262                StateFieldValDiff::Delete(_) => None, // No-op
263            });
264        batches.chain(rollups)
265    }
266
267    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
268        // With the introduction of incremental compaction, we
269        // need to be more careful about what we consider "deleted".
270        // If there is a HollowBatch that we replace 2 out of the 4 runs of,
271        // we need to ensure that we only delete the runs that are actually
272        // no longer referenced.
273        let removed = self
274            .referenced_batches()
275            .filter_map(|spine_diff| match spine_diff {
276                Insert(_) => None,
277                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
278            });
279
280        let added: std::collections::BTreeSet<_> = self
281            .referenced_batches()
282            .filter_map(|spine_diff| match spine_diff {
283                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
284                Delete(_) => None,
285            })
286            .flatten()
287            .collect();
288
289        removed
290            .into_iter()
291            .flat_map(|x| x)
292            .filter(move |part| !added.contains(part))
293    }
294
295    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
296        self.rollups
297            .iter()
298            .filter_map(|rollups_diff| match &rollups_diff.val {
299                Insert(_) => None,
300                Update(a, _) | Delete(a) => Some(a),
301            })
302    }
303
304    #[cfg(any(test, debug_assertions))]
305    #[allow(dead_code)]
306    pub fn validate_roundtrip<K, V, D>(
307        metrics: &Metrics,
308        from_state: &crate::internal::state::TypedState<K, V, T, D>,
309        diff: &Self,
310        to_state: &crate::internal::state::TypedState<K, V, T, D>,
311    ) -> Result<(), String>
312    where
313        K: mz_persist_types::Codec + std::fmt::Debug,
314        V: mz_persist_types::Codec + std::fmt::Debug,
315        D: differential_dataflow::difference::Semigroup + Codec64,
316    {
317        use mz_proto::RustType;
318        use prost::Message;
319
320        use crate::internal::state::ProtoStateDiff;
321
322        let mut roundtrip_state = from_state.clone(
323            from_state.applier_version.clone(),
324            from_state.hostname.clone(),
325        );
326        roundtrip_state.apply_diff(metrics, diff.clone())?;
327
328        if &roundtrip_state != to_state {
329            // The weird spacing in this format string is so they all line up
330            // when printed out.
331            return Err(format!(
332                "state didn't roundtrip\n  from_state {:?}\n  to_state   {:?}\n  rt_state   {:?}\n  diff       {:?}\n",
333                from_state, to_state, roundtrip_state, diff
334            ));
335        }
336
337        let encoded_diff = diff.into_proto().encode_to_vec();
338        let roundtrip_diff = Self::from_proto(
339            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
340        )
341        .map_err(|err| err.to_string())?;
342
343        if &roundtrip_diff != diff {
344            // The weird spacing in this format string is so they all line up
345            // when printed out.
346            return Err(format!(
347                "diff didn't roundtrip\n  diff    {:?}\n  rt_diff {:?}",
348                diff, roundtrip_diff
349            ));
350        }
351
352        Ok(())
353    }
354}
355
356impl<T: Timestamp + Lattice + Codec64> State<T> {
357    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
358        &mut self,
359        cfg: &PersistConfig,
360        metrics: &Metrics,
361        diffs: I,
362    ) {
363        let mut state_seqno = self.seqno;
364        let diffs = diffs.into_iter().filter_map(move |x| {
365            if x.seqno != state_seqno.next() {
366                // No-op.
367                return None;
368            }
369            let data = x.data.clone();
370            let diff = metrics
371                .codecs
372                .state_diff
373                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
374                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
375            assert_eq!(diff.seqno_from, state_seqno);
376            state_seqno = diff.seqno_to;
377            Some((diff, data))
378        });
379        self.apply_diffs(metrics, diffs);
380    }
381}
382
383impl<T: Timestamp + Lattice + Codec64> State<T> {
384    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
385        &mut self,
386        metrics: &Metrics,
387        diffs: I,
388    ) {
389        for (diff, data) in diffs {
390            // TODO: This could special-case batch apply for diffs where it's
391            // more efficient (in particular, spine batches that hit the slow
392            // path).
393            match self.apply_diff(metrics, diff) {
394                Ok(()) => {}
395                Err(err) => {
396                    // Having the full diff in the error message is critical for debugging any
397                    // issues that may arise from diff application. We pass along the original
398                    // Bytes it decoded from just so we can decode in this error path, while
399                    // avoiding any extraneous clones in the expected Ok path.
400                    let diff = StateDiff::<T>::decode(&self.applier_version, data);
401                    panic!(
402                        "state diff should apply cleanly: {} diff {:?} state {:?}",
403                        err, diff, self
404                    )
405                }
406            }
407        }
408    }
409
410    // Intentionally not even pub(crate) because all callers should use
411    // [Self::apply_diffs].
412    pub(super) fn apply_diff(
413        &mut self,
414        metrics: &Metrics,
415        diff: StateDiff<T>,
416    ) -> Result<(), String> {
417        // Deconstruct diff so we get a compile failure if new fields are added.
418        let StateDiff {
419            applier_version: diff_applier_version,
420            seqno_from: diff_seqno_from,
421            seqno_to: diff_seqno_to,
422            walltime_ms: diff_walltime_ms,
423            latest_rollup_key: _,
424            rollups: diff_rollups,
425            active_rollup: diff_active_rollup,
426            active_gc: diff_active_gc,
427            hostname: diff_hostname,
428            last_gc_req: diff_last_gc_req,
429            leased_readers: diff_leased_readers,
430            critical_readers: diff_critical_readers,
431            writers: diff_writers,
432            schemas: diff_schemas,
433            since: diff_since,
434            legacy_batches: diff_legacy_batches,
435            hollow_batches: diff_hollow_batches,
436            spine_batches: diff_spine_batches,
437            merges: diff_merges,
438        } = diff;
439        if self.seqno == diff_seqno_to {
440            return Ok(());
441        }
442        if self.seqno != diff_seqno_from {
443            return Err(format!(
444                "could not apply diff {} -> {} to state {}",
445                diff_seqno_from, diff_seqno_to, self.seqno
446            ));
447        }
448        self.seqno = diff_seqno_to;
449        self.applier_version = diff_applier_version;
450        self.walltime_ms = diff_walltime_ms;
451        force_apply_diffs_single(
452            &self.shard_id,
453            diff_seqno_to,
454            "hostname",
455            diff_hostname,
456            &mut self.hostname,
457            metrics,
458        )?;
459
460        // Deconstruct collections so we get a compile failure if new fields are
461        // added.
462        let StateCollections {
463            last_gc_req,
464            rollups,
465            active_rollup,
466            active_gc,
467            leased_readers,
468            critical_readers,
469            writers,
470            schemas,
471            trace,
472        } = &mut self.collections;
473
474        apply_diffs_map("rollups", diff_rollups, rollups)?;
475        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
476        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
477        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
478        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
479        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
480        apply_diffs_map("writers", diff_writers, writers)?;
481        apply_diffs_map("schemas", diff_schemas, schemas)?;
482
483        let structure_unchanged = diff_hollow_batches.is_empty()
484            && diff_spine_batches.is_empty()
485            && diff_merges.is_empty();
486        let spine_unchanged =
487            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;
488
489        if spine_unchanged {
490            return Ok(());
491        }
492
493        let mut flat = if trace.roundtrip_structure {
494            metrics.state.apply_spine_flattened.inc();
495            let mut flat = trace.flatten();
496            apply_diffs_single("since", diff_since, &mut flat.since)?;
497            apply_diffs_map(
498                "legacy_batches",
499                diff_legacy_batches
500                    .into_iter()
501                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
502                        key: Arc::new(key),
503                        val,
504                    }),
505                &mut flat.legacy_batches,
506            )?;
507            Some(flat)
508        } else {
509            for x in diff_since {
510                match x.val {
511                    Update(from, to) => {
512                        if trace.since() != &from {
513                            return Err(format!(
514                                "since update didn't match: {:?} vs {:?}",
515                                self.collections.trace.since(),
516                                &from
517                            ));
518                        }
519                        trace.downgrade_since(&to);
520                    }
521                    Insert(_) => return Err("cannot insert since field".to_string()),
522                    Delete(_) => return Err("cannot delete since field".to_string()),
523                }
524            }
525            if !diff_legacy_batches.is_empty() {
526                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
527                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
528            }
529            None
530        };
531
532        if !structure_unchanged {
533            let flat = flat.get_or_insert_with(|| trace.flatten());
534            apply_diffs_map(
535                "hollow_batches",
536                diff_hollow_batches,
537                &mut flat.hollow_batches,
538            )?;
539            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
540            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
541        }
542
543        if let Some(flat) = flat {
544            *trace = Trace::unflatten(flat)?;
545        }
546
547        // There's various sanity checks that this method could run (e.g. since,
548        // upper, seqno_since, etc don't regress or that diff.latest_rollup ==
549        // state.rollups.last()), are they a good idea? On one hand, I like
550        // sanity checks, other the other, one of the goals here is to keep
551        // apply logic as straightforward and unchanging as possible.
552        Ok(())
553    }
554}
555
556fn diff_field_single<T: PartialEq + Clone>(
557    from: &T,
558    to: &T,
559    diffs: &mut Vec<StateFieldDiff<(), T>>,
560) {
561    // This could use the `diff_field_sorted_iter(once(from), once(to), diffs)`
562    // general impl, but we just do the obvious thing.
563    if from != to {
564        diffs.push(StateFieldDiff {
565            key: (),
566            val: Update(from.clone(), to.clone()),
567        })
568    }
569}
570
571fn apply_diffs_single_option<X: PartialEq + Debug>(
572    name: &str,
573    diffs: Vec<StateFieldDiff<(), X>>,
574    single: &mut Option<X>,
575) -> Result<(), String> {
576    for diff in diffs {
577        apply_diff_single_option(name, diff, single)?;
578    }
579    Ok(())
580}
581
582fn apply_diff_single_option<X: PartialEq + Debug>(
583    name: &str,
584    diff: StateFieldDiff<(), X>,
585    single: &mut Option<X>,
586) -> Result<(), String> {
587    match diff.val {
588        Update(from, to) => {
589            if single.as_ref() != Some(&from) {
590                return Err(format!(
591                    "{} update didn't match: {:?} vs {:?}",
592                    name, single, &from
593                ));
594            }
595            *single = Some(to)
596        }
597        Insert(to) => {
598            if single.is_some() {
599                return Err(format!("{} insert found existing value", name));
600            }
601            *single = Some(to)
602        }
603        Delete(from) => {
604            if single.as_ref() != Some(&from) {
605                return Err(format!(
606                    "{} delete didn't match: {:?} vs {:?}",
607                    name, single, &from
608                ));
609            }
610            *single = None
611        }
612    }
613    Ok(())
614}
615
616fn apply_diffs_single<X: PartialEq + Debug>(
617    name: &str,
618    diffs: Vec<StateFieldDiff<(), X>>,
619    single: &mut X,
620) -> Result<(), String> {
621    for diff in diffs {
622        apply_diff_single(name, diff, single)?;
623    }
624    Ok(())
625}
626
627fn apply_diff_single<X: PartialEq + Debug>(
628    name: &str,
629    diff: StateFieldDiff<(), X>,
630    single: &mut X,
631) -> Result<(), String> {
632    match diff.val {
633        Update(from, to) => {
634            if single != &from {
635                return Err(format!(
636                    "{} update didn't match: {:?} vs {:?}",
637                    name, single, &from
638                ));
639            }
640            *single = to
641        }
642        Insert(_) => return Err(format!("cannot insert {} field", name)),
643        Delete(_) => return Err(format!("cannot delete {} field", name)),
644    }
645    Ok(())
646}
647
648// A hack to force apply a diff, making `single` equal to
649// the Update `to` value, ignoring a mismatch on `from`.
650// Used to migrate forward after writing down incorrect
651// diffs.
652//
653// TODO: delete this once `hostname` has zero mismatches
654fn force_apply_diffs_single<X: PartialEq + Debug>(
655    shard_id: &ShardId,
656    seqno: SeqNo,
657    name: &str,
658    diffs: Vec<StateFieldDiff<(), X>>,
659    single: &mut X,
660    metrics: &Metrics,
661) -> Result<(), String> {
662    for diff in diffs {
663        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
664    }
665    Ok(())
666}
667
668fn force_apply_diff_single<X: PartialEq + Debug>(
669    shard_id: &ShardId,
670    seqno: SeqNo,
671    name: &str,
672    diff: StateFieldDiff<(), X>,
673    single: &mut X,
674    metrics: &Metrics,
675) -> Result<(), String> {
676    match diff.val {
677        Update(from, to) => {
678            if single != &from {
679                debug!(
680                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
681                    name, single, &from, &to, shard_id, seqno
682                );
683                metrics.state.force_apply_hostname.inc();
684            }
685            *single = to
686        }
687        Insert(_) => return Err(format!("cannot insert {} field", name)),
688        Delete(_) => return Err(format!("cannot delete {} field", name)),
689    }
690    Ok(())
691}
692
693fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
694where
695    K: Ord + Clone + 'a,
696    V: PartialEq + Clone + 'a,
697    IF: IntoIterator<Item = (&'a K, &'a V)>,
698    IT: IntoIterator<Item = (&'a K, &'a V)>,
699{
700    let (mut from, mut to) = (from.into_iter(), to.into_iter());
701    let (mut f, mut t) = (from.next(), to.next());
702    loop {
703        match (f, t) {
704            (None, None) => break,
705            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
706                Ordering::Less => {
707                    diffs.push(StateFieldDiff {
708                        key: fk.clone(),
709                        val: Delete(fv.clone()),
710                    });
711                    let f_next = from.next();
712                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
713                    f = f_next;
714                }
715                Ordering::Greater => {
716                    diffs.push(StateFieldDiff {
717                        key: tk.clone(),
718                        val: Insert(tv.clone()),
719                    });
720                    let t_next = to.next();
721                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
722                    t = t_next;
723                }
724                Ordering::Equal => {
725                    // TODO: regression test for this if, I missed it in the
726                    // original impl :)
727                    if fv != tv {
728                        diffs.push(StateFieldDiff {
729                            key: fk.clone(),
730                            val: Update(fv.clone(), tv.clone()),
731                        });
732                    }
733                    let f_next = from.next();
734                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
735                    f = f_next;
736                    let t_next = to.next();
737                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
738                    t = t_next;
739                }
740            },
741            (None, Some((tk, tv))) => {
742                diffs.push(StateFieldDiff {
743                    key: tk.clone(),
744                    val: Insert(tv.clone()),
745                });
746                let t_next = to.next();
747                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
748                t = t_next;
749            }
750            (Some((fk, fv)), None) => {
751                diffs.push(StateFieldDiff {
752                    key: fk.clone(),
753                    val: Delete(fv.clone()),
754                });
755                let f_next = from.next();
756                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
757                f = f_next;
758            }
759        }
760    }
761}
762
763fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
764    name: &str,
765    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
766    map: &mut BTreeMap<K, V>,
767) -> Result<(), String> {
768    for diff in diffs {
769        apply_diff_map(name, diff, map)?;
770    }
771    Ok(())
772}
773
774// This might leave state in an invalid (umm) state when returning an error. The
775// caller ultimately ends up panic'ing on error, but if that changes, we might
776// want to revisit this.
777fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
778    name: &str,
779    diff: StateFieldDiff<K, V>,
780    map: &mut BTreeMap<K, V>,
781) -> Result<(), String> {
782    match diff.val {
783        Insert(to) => {
784            let prev = map.insert(diff.key, to);
785            if prev != None {
786                return Err(format!("{} insert found existing value: {:?}", name, prev));
787            }
788        }
789        Update(from, to) => {
790            let prev = map.insert(diff.key, to);
791            if prev.as_ref() != Some(&from) {
792                return Err(format!(
793                    "{} update didn't match: {:?} vs {:?}",
794                    name,
795                    prev,
796                    Some(from),
797                ));
798            }
799        }
800        Delete(from) => {
801            let prev = map.remove(&diff.key);
802            if prev.as_ref() != Some(&from) {
803                return Err(format!(
804                    "{} delete didn't match: {:?} vs {:?}",
805                    name,
806                    prev,
807                    Some(from),
808                ));
809            }
810        }
811    };
812    Ok(())
813}
814
815// This might leave state in an invalid (umm) state when returning an error. The
816// caller ultimately ends up panic'ing on error, but if that changes, we might
817// want to revisit this.
818fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
819    metrics: &Metrics,
820    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
821    trace: &mut Trace<T>,
822) -> Result<(), String> {
823    // Another special case: sniff out a newly inserted batch (one whose lower
824    // lines up with the current upper) and handle that now. Then fall through
825    // to the rest of the handling on whatever is left.
826    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
827        // Ignore merge_reqs because whichever process generated this diff is
828        // assigned the work.
829        let () = trace.push_batch_no_merge_reqs(insert);
830        // If this insert was the only thing in diffs, then return now instead
831        // of falling through to the "no diffs" case in the match so we can inc
832        // the apply_spine_fast_path metric.
833        if diffs.is_empty() {
834            metrics.state.apply_spine_fast_path.inc();
835            return Ok(());
836        }
837    }
838
839    match &diffs[..] {
840        // Fast-path: no diffs.
841        [] => return Ok(()),
842
843        // Fast-path: batch insert with both new and most recent batch empty.
844        // Spine will happily merge these empty batches together without a call
845        // out to compaction.
846        [
847            StateFieldDiff {
848                key: del,
849                val: StateFieldValDiff::Delete(()),
850            },
851            StateFieldDiff {
852                key: ins,
853                val: StateFieldValDiff::Insert(()),
854            },
855        ] => {
856            if del.is_empty()
857                && ins.is_empty()
858                && del.desc.lower() == ins.desc.lower()
859                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
860            {
861                // Ignore merge_reqs because whichever process generated this diff is
862                // assigned the work.
863                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
864                    del.desc.upper().clone(),
865                    ins.desc.upper().clone(),
866                    // `keys.len() == 0` for both `del` and `ins` means we
867                    // don't have to think about what the compaction
868                    // frontier is for these batches (nothing in them, so nothing could have been compacted.
869                    Antichain::from_elem(T::minimum()),
870                )));
871                metrics.state.apply_spine_fast_path.inc();
872                return Ok(());
873            }
874        }
875        // Fall-through
876        _ => {}
877    }
878
879    // Fast-path: compaction
880    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
881        let res = FueledMergeRes {
882            output,
883            input: CompactionInput::Legacy,
884            new_active_compaction: None,
885        };
886        // We can't predict how spine will arrange the batches when it's
887        // hydrated. This means that something that is maintaining a Spine
888        // starting at some seqno may not exactly match something else
889        // maintaining the same spine starting at a different seqno. (Plus,
890        // maybe these aren't even on the same version of the code and we've
891        // changed the spine logic.) Because apply_merge_res is strict,
892        // we're not _guaranteed_ that we can apply a compaction response
893        // that was generated elsewhere. Most of the time we can, though, so
894        // count the good ones and fall back to the slow path below when we
895        // can't.
896        if trace.apply_merge_res_unchecked(&res).applied() {
897            // Maybe return the replaced batches from apply_merge_res and verify
898            // that they match _inputs?
899            metrics.state.apply_spine_fast_path.inc();
900            return Ok(());
901        }
902
903        // Otherwise, try our lenient application of a compaction result.
904        let mut batches = Vec::new();
905        trace.map_batches(|b| batches.push(b.clone()));
906
907        match apply_compaction_lenient(metrics, batches, &res.output) {
908            Ok(batches) => {
909                let mut new_trace = Trace::default();
910                new_trace.roundtrip_structure = trace.roundtrip_structure;
911                new_trace.downgrade_since(trace.since());
912                for batch in batches {
913                    // Ignore merge_reqs because whichever process generated
914                    // this diff is assigned the work.
915                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
916                }
917                *trace = new_trace;
918                metrics.state.apply_spine_slow_path_lenient.inc();
919                return Ok(());
920            }
921            Err(err) => {
922                return Err(format!(
923                    "lenient compaction result apply unexpectedly failed: {}",
924                    err
925                ));
926            }
927        }
928    }
929
930    // Something complicated is going on, so reconstruct the Trace from scratch.
931    metrics.state.apply_spine_slow_path.inc();
932    debug!(
933        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
934        diffs, trace
935    );
936
937    let batches = {
938        let mut batches = BTreeMap::new();
939        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
940        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
941    };
942
943    let batches = match batches {
944        Ok(batches) => batches,
945        Err(err) => {
946            metrics
947                .state
948                .apply_spine_slow_path_with_reconstruction
949                .inc();
950            debug!(
951                "apply_diffs_spines could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
952                err, diffs, trace
953            );
954            // if we couldn't apply our diffs directly to our trace's batches, we can
955            // try one more trick: reconstruct a new spine with our existing batches,
956            // in an attempt to create different merges than we currently have. then,
957            // we can try to apply our diffs on top of these new (potentially) merged
958            // batches.
959            let mut reconstructed_spine = Trace::default();
960            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
961            trace.map_batches(|b| {
962                // Ignore merge_reqs because whichever process generated this
963                // diff is assigned the work.
964                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
965            });
966
967            let mut batches = BTreeMap::new();
968            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
969            apply_diffs_map("spine", diffs, &mut batches)?;
970            batches
971        }
972    };
973
974    let mut new_trace = Trace::default();
975    new_trace.roundtrip_structure = trace.roundtrip_structure;
976    new_trace.downgrade_since(trace.since());
977    for (batch, ()) in batches {
978        // Ignore merge_reqs because whichever process generated this diff is
979        // assigned the work.
980        let () = new_trace.push_batch_no_merge_reqs(batch);
981    }
982    *trace = new_trace;
983    Ok(())
984}
985
986fn sniff_insert<T: Timestamp + Lattice>(
987    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
988    upper: &Antichain<T>,
989) -> Option<HollowBatch<T>> {
990    for idx in 0..diffs.len() {
991        match &diffs[idx] {
992            StateFieldDiff {
993                key,
994                val: StateFieldValDiff::Insert(()),
995            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
996            _ => continue,
997        }
998    }
999    None
1000}
1001
1002// TODO: Instead of trying to sniff out a compaction from diffs, should we just
1003// be explicit?
1004fn sniff_compaction<'a, T: Timestamp + Lattice>(
1005    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
1006) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
1007    // Compaction always produces exactly one output batch (with possibly many
1008    // parts, but we get one Insert for the whole batch.
1009    let mut inserts = diffs.iter().flat_map(|x| match x.val {
1010        StateFieldValDiff::Insert(()) => Some(&x.key),
1011        _ => None,
1012    });
1013    let compaction_output = match inserts.next() {
1014        Some(x) => x,
1015        None => return None,
1016    };
1017    if let Some(_) = inserts.next() {
1018        return None;
1019    }
1020
1021    // Grab all deletes and sanity check that there are no updates.
1022    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
1023    for diff in diffs.iter() {
1024        match diff.val {
1025            StateFieldValDiff::Delete(()) => {
1026                compaction_inputs.push(&diff.key);
1027            }
1028            StateFieldValDiff::Insert(()) => {}
1029            StateFieldValDiff::Update((), ()) => {
1030                // Fall through to let the general case create the error
1031                // message.
1032                return None;
1033            }
1034        }
1035    }
1036
1037    Some((compaction_inputs, compaction_output.clone()))
1038}
1039
1040/// Apply a compaction diff that doesn't exactly line up with the set of
1041/// HollowBatches.
1042///
1043/// Because of the way Spine internally optimizes only _some_ empty batches
1044/// (immediately merges them in), we can end up in a situation where a
1045/// compaction res applied on another copy of state, but when we replay all of
1046/// the state diffs against a new Spine locally, it merges empty batches
1047/// differently in-mem and we can't exactly apply the compaction diff. Example:
1048///
1049/// - compact: [1,2),[2,3) -> [1,3)
1050/// - this spine: [0,2),[2,3) (0,1 is empty)
1051///
1052/// Ideally, we'd figure out a way to avoid this, but nothing immediately comes
1053/// to mind. In the meantime, force the application (otherwise the shard is
1054/// stuck and we can't do anything with it) by manually splitting the empty
1055/// batch back out. For the example above:
1056///
1057/// - [0,1),[1,3) (0,1 is empty)
1058///
1059/// This can only happen when the batch needing to be split is empty, so error
1060/// out if it isn't because that means something unexpected is going on.
1061fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
1062    metrics: &Metrics,
1063    mut trace: Vec<HollowBatch<T>>,
1064    replacement: &'a HollowBatch<T>,
1065) -> Result<Vec<HollowBatch<T>>, String> {
1066    let mut overlapping_batches = Vec::new();
1067    trace.retain(|b| {
1068        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
1069        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
1070        let overlaps_replacement = !(before_replacement || after_replacement);
1071        if overlaps_replacement {
1072            overlapping_batches.push(b.clone());
1073            false
1074        } else {
1075            true
1076        }
1077    });
1078
1079    {
1080        let first_overlapping_batch = match overlapping_batches.first() {
1081            Some(x) => x,
1082            None => return Err("replacement didn't overlap any batches".into()),
1083        };
1084        if PartialOrder::less_than(
1085            first_overlapping_batch.desc.lower(),
1086            replacement.desc.lower(),
1087        ) {
1088            if first_overlapping_batch.len > 0 {
1089                return Err(format!(
1090                    "overlapping batch was unexpectedly non-empty: {:?}",
1091                    first_overlapping_batch
1092                ));
1093            }
1094            let desc = Description::new(
1095                first_overlapping_batch.desc.lower().clone(),
1096                replacement.desc.lower().clone(),
1097                first_overlapping_batch.desc.since().clone(),
1098            );
1099            trace.push(HollowBatch::empty(desc));
1100            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
1101        }
1102    }
1103
1104    {
1105        let last_overlapping_batch = match overlapping_batches.last() {
1106            Some(x) => x,
1107            None => return Err("replacement didn't overlap any batches".into()),
1108        };
1109        if PartialOrder::less_than(
1110            replacement.desc.upper(),
1111            last_overlapping_batch.desc.upper(),
1112        ) {
1113            if last_overlapping_batch.len > 0 {
1114                return Err(format!(
1115                    "overlapping batch was unexpectedly non-empty: {:?}",
1116                    last_overlapping_batch
1117                ));
1118            }
1119            let desc = Description::new(
1120                replacement.desc.upper().clone(),
1121                last_overlapping_batch.desc.upper().clone(),
1122                last_overlapping_batch.desc.since().clone(),
1123            );
1124            trace.push(HollowBatch::empty(desc));
1125            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
1126        }
1127    }
1128    trace.push(replacement.clone());
1129
1130    // We just inserted stuff at the end, so re-sort them into place.
1131    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));
1132
1133    // This impl is a touch complex, so sanity check our work.
1134    let mut expected_lower = &Antichain::from_elem(T::minimum());
1135    for b in trace.iter() {
1136        if b.desc.lower() != expected_lower {
1137            return Err(format!(
1138                "lower {:?} did not match expected {:?}: {:?}",
1139                b.desc.lower(),
1140                expected_lower,
1141                trace
1142            ));
1143        }
1144        expected_lower = b.desc.upper();
1145    }
1146    Ok(trace)
1147}
1148
1149/// A type that facilitates the proto encoding of a [`ProtoStateFieldDiffs`]
1150///
1151/// [`ProtoStateFieldDiffs`] is a columnar encoding of [`StateFieldDiff`]s, see
1152/// its doc comment for more info. The underlying buffer for a [`ProtoStateFieldDiffs`]
1153/// is a [`Bytes`] struct, which is an immutable, shared, reference counted,
1154/// buffer of data. Using a [`Bytes`] struct is a very efficient way to manage data
1155/// becuase multiple [`Bytes`] can reference different parts of the same underlying
1156/// portion of memory. See its doc comment for more info.
1157///
1158/// A [`ProtoStateFieldDiffsWriter`] maintains a mutable, unique, data buffer, i.e.
1159/// a [`BytesMut`], which we use when encoding a [`StateFieldDiff`]. And when
1160/// finished encoding, we convert it into a [`ProtoStateFieldDiffs`] by "freezing" the
1161/// underlying buffer, converting it into a [`Bytes`] struct, so it can be shared.
1162///
1163/// [`Bytes`]: bytes::Bytes
1164#[derive(Debug)]
1165pub struct ProtoStateFieldDiffsWriter {
1166    data_buf: BytesMut,
1167    proto: ProtoStateFieldDiffs,
1168}
1169
1170impl ProtoStateFieldDiffsWriter {
1171    /// Record a [`ProtoStateField`] for our columnar encoding.
1172    pub fn push_field(&mut self, field: ProtoStateField) {
1173        self.proto.fields.push(i32::from(field));
1174    }
1175
1176    /// Record a [`ProtoStateFieldDiffType`] for our columnar encoding.
1177    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
1178        self.proto.diff_types.push(i32::from(diff_type));
1179    }
1180
1181    /// Encode a message for our columnar encoding.
1182    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
1183        let len_before = self.data_buf.len();
1184        self.data_buf.reserve(msg.encoded_len());
1185
1186        // Note: we use `encode_raw` as opposed to `encode` because all `encode` does is
1187        // check to make sure there's enough bytes in the buffer to fit our message
1188        // which we know there are because we just reserved the space. When benchmarking
1189        // `encode_raw` does offer a slight performance improvement over `encode`.
1190        msg.encode_raw(&mut self.data_buf);
1191
1192        // Record exactly how many bytes were written.
1193        let written_len = self.data_buf.len() - len_before;
1194        self.proto.data_lens.push(u64::cast_from(written_len));
1195    }
1196
1197    pub fn into_proto(self) -> ProtoStateFieldDiffs {
1198        let ProtoStateFieldDiffsWriter {
1199            data_buf,
1200            mut proto,
1201        } = self;
1202
1203        // Assert we didn't write into the proto's data_bytes field
1204        assert!(proto.data_bytes.is_empty());
1205
1206        // Move our buffer into the proto
1207        let data_bytes = data_buf.freeze();
1208        proto.data_bytes = data_bytes;
1209
1210        proto
1211    }
1212}
1213
1214impl ProtoStateFieldDiffs {
1215    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
1216        // Create a new buffer which we'll encode data into.
1217        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());
1218
1219        // Take our existing data, and copy it into our buffer.
1220        let existing_data = std::mem::take(&mut self.data_bytes);
1221        data_buf.extend(existing_data);
1222
1223        ProtoStateFieldDiffsWriter {
1224            data_buf,
1225            proto: self,
1226        }
1227    }
1228
1229    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
1230        let len = self.fields.len();
1231        assert_eq!(self.diff_types.len(), len);
1232
1233        ProtoStateFieldDiffsIter {
1234            len,
1235            diff_idx: 0,
1236            data_idx: 0,
1237            data_offset: 0,
1238            diffs: self,
1239        }
1240    }
1241
1242    pub fn validate(&self) -> Result<(), String> {
1243        if self.fields.len() != self.diff_types.len() {
1244            return Err(format!(
1245                "fields {} and diff_types {} lengths disagree",
1246                self.fields.len(),
1247                self.diff_types.len()
1248            ));
1249        }
1250
1251        let mut expected_data_slices = 0;
1252        for diff_type in self.diff_types.iter() {
1253            // We expect one for the key.
1254            expected_data_slices += 1;
1255            // And 1 or 2 for val depending on the diff type.
1256            match ProtoStateFieldDiffType::try_from(*diff_type) {
1257                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
1258                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
1259                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
1260                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
1261            }
1262        }
1263        if expected_data_slices != self.data_lens.len() {
1264            return Err(format!(
1265                "expected {} data slices got {}",
1266                expected_data_slices,
1267                self.data_lens.len()
1268            ));
1269        }
1270
1271        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
1272        if expected_data_bytes != self.data_bytes.len() {
1273            return Err(format!(
1274                "expected {} data bytes got {}",
1275                expected_data_bytes,
1276                self.data_bytes.len()
1277            ));
1278        }
1279
1280        Ok(())
1281    }
1282}
1283
1284#[derive(Debug)]
1285pub struct ProtoStateFieldDiff<'a> {
1286    pub key: &'a [u8],
1287    pub diff_type: ProtoStateFieldDiffType,
1288    pub from: &'a [u8],
1289    pub to: &'a [u8],
1290}
1291
1292pub struct ProtoStateFieldDiffsIter<'a> {
1293    len: usize,
1294    diff_idx: usize,
1295    data_idx: usize,
1296    data_offset: usize,
1297    diffs: &'a ProtoStateFieldDiffs,
1298}
1299
1300impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
1301    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;
1302
1303    fn next(&mut self) -> Option<Self::Item> {
1304        if self.diff_idx >= self.len {
1305            return None;
1306        }
1307        let mut next_data = || {
1308            let start = self.data_offset;
1309            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
1310            let data = &self.diffs.data_bytes[start..end];
1311            self.data_idx += 1;
1312            self.data_offset = end;
1313            data
1314        };
1315        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
1316            Ok(x) => x,
1317            Err(_) => {
1318                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1319                    "ProtoStateField({})",
1320                    self.diffs.fields[self.diff_idx]
1321                ))));
1322            }
1323        };
1324        let diff_type =
1325            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
1326                Ok(x) => x,
1327                Err(_) => {
1328                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1329                        "ProtoStateFieldDiffType({})",
1330                        self.diffs.diff_types[self.diff_idx]
1331                    ))));
1332                }
1333            };
1334        let key = next_data();
1335        let (from, to): (&[u8], &[u8]) = match diff_type {
1336            ProtoStateFieldDiffType::Insert => (&[], next_data()),
1337            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
1338            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
1339        };
1340        let diff = ProtoStateFieldDiff {
1341            key,
1342            diff_type,
1343            from,
1344            to,
1345        };
1346        self.diff_idx += 1;
1347        Some(Ok((field, diff)))
1348    }
1349}
1350
1351#[cfg(test)]
1352mod tests {
1353    use semver::Version;
1354    use std::ops::ControlFlow::Continue;
1355
1356    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
1357    use mz_ore::metrics::MetricsRegistry;
1358
1359    use crate::ShardId;
1360    use crate::internal::state::TypedState;
1361
1362    use super::*;
1363
1364    /// Model a situation where a "leader" is constantly making changes to its state, and a "follower"
1365    /// is applying those changes as diffs.
1366    #[mz_ore::test]
1367    #[cfg_attr(miri, ignore)] // too slow
1368    fn test_state_sync() {
1369        use proptest::prelude::*;
1370
1371        #[derive(Debug, Clone)]
1372        enum Action {
1373            /// Append a (non)empty batch to the shard that covers the given length of time.
1374            Append { empty: bool, time_delta: u64 },
1375            /// Apply the Nth compaction request we've received to the shard state.
1376            Compact { req: usize },
1377        }
1378
1379        let action_gen: BoxedStrategy<Action> = {
1380            prop::strategy::Union::new([
1381                (any::<bool>(), 1u64..10u64)
1382                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
1383                    .boxed(),
1384                (0usize..10usize)
1385                    .prop_map(|req| Action::Compact { req })
1386                    .boxed(),
1387            ])
1388            .boxed()
1389        };
1390
1391        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
1392            let version = Version::new(0, 100, 0);
1393            let writer_key = WriterKey::Version(version.to_string());
1394            let id = ShardId::new();
1395            let hostname = "computer";
1396            let typed: TypedState<String, (), u64, i64> =
1397                TypedState::new(version, id, hostname.to_string(), 0);
1398            let mut leader = typed.state;
1399
1400            let seqno = SeqNo::minimum();
1401            let mut lower = 0u64;
1402            let mut merge_reqs = vec![];
1403
1404            leader.collections.rollups.insert(
1405                seqno,
1406                HollowRollup {
1407                    key: PartialRollupKey::new(seqno, &RollupId::new()),
1408                    encoded_size_bytes: None,
1409                },
1410            );
1411            leader.collections.trace.roundtrip_structure = false;
            let mut follower = leader.clone();

            for (action, roundtrip_structure) in actions {
                // Apply the given action and the new roundtrip_structure setting, then take a diff.
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
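                            // Pick one of the outstanding merge requests (clamping the
                            // index into range) and apply a synthetic compaction result
                            // whose output simply concatenates the inputs' parts.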
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes {
                                    output,
                                    input: CompactionInput::Legacy,
                                    new_active_compaction: None,
                                });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
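                // Bump the seqno by hand so the diff below spans from the old
                // seqno to the new one.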
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                // Validate that the diff applies to both the previous state (also checked in
                // debug asserts) and our follower that's only synchronized via diffs.
                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");

                // TODO: once spine structure is roundtripped through diffs, assert that the follower
                // has the same batches etc. as the leader does.
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }

    // Regression test for the apply_diffs_spine special case that sniffs out an
    // insert, applies it, and then lets the remaining diffs (if any) fall
    // through to the rest of the code. See database-issues#4431.
    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
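        // Shorthand: an otherwise-empty HollowBatch with the given bounds, a
        // since of 0, and the given logical length.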
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        // The bug handled here is essentially a set of batches that look like
        // the pattern matched by `apply_lenient` _plus_ an insert. In
        // apply_diffs_spine, we use `sniff_insert` to steal the insert out of
        // the diffs and fall back to the rest of the logic to handle the
        // remaining diffs.
        //
        // Concretely, something like (the numbers are truncated versions of the
        // actual bug posted in the issue):
        // - spine: [0][7094664]0, [7094664][7185234]100
        // - diffs: [0][6805359]0 del, [6805359][7083793]0 del, [0][7083793]0 ins,
        //   [7185234][7185859]20 ins
        //
        // Sniffing out the insert lets us handle the [7185234,7185859) insert
        // directly, and then apply_lenient handles splitting up [0,7094664) so
        // we can apply the [0,6805359)+[6805359,7083793)->[0,7083793) swap.

        let batches_before = vec![hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        // Ideally the first batch would be split into [0][7083793] and
        // [7083793][7094664] here because `apply_lenient` splits it out, but
        // when `apply_lenient` reconstructs the trace, Spine happens to
        // (deterministically) collapse them back together. The main value of
        // this test is that the `apply_diffs_spine` call below doesn't return
        // an Err, so don't worry too much about this; it's just a sanity check.
        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

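        // Build a fresh state and load the pre-existing spine batches into its
        // trace before applying the diffs above.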
        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn apply_lenient() {
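        // Each tuple below is (lower, upper, since, len), standing in for an
        // otherwise-empty HollowBatch with those bounds.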
        #[track_caller]
        fn testcase(
            replacement: (u64, u64, u64, usize),
            spine: &[(u64, u64, u64, usize)],
            expected: Result<&[(u64, u64, u64, usize)], &str>,
        ) {
            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
                let (lower, upper, since, len) = x;
                let desc = Description::new(
                    Antichain::from_elem(*lower),
                    Antichain::from_elem(*upper),
                    Antichain::from_elem(*since),
                );
                HollowBatch::new_run(desc, Vec::new(), *len)
            }
            let replacement = batch(&replacement);
            let batches = spine.iter().map(batch).collect::<Vec<_>>();

            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
            let expected = match expected {
                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
                Err(err) => Err(err.to_owned()),
            };
            assert_eq!(actual, expected);
        }

        // Exact swap of N batches
        testcase(
            (0, 3, 0, 100),
            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
            Ok(&[(0, 3, 0, 100)]),
        );

        // Swap out the middle of a batch
        testcase(
            (1, 2, 0, 100),
            &[(0, 3, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
        );

        // Split batch at replacement lower
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Err: split batch at replacement lower not empty
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 1), (3, 4, 0, 0)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Split batch at replacement lower (untouched batch before the split one)
        testcase(
            (2, 4, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Split batch at replacement lower (since is preserved)
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 200, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
        );

        // Split batch at replacement upper
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
        );

        // Err: split batch at replacement upper not empty
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 1)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Split batch at replacement upper (untouched batch after the split one)
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
        );

        // Split batch at replacement upper (since is preserved)
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 200, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
        );

        // Split batch at replacement lower and upper
        testcase(
            (2, 6, 0, 100),
            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
        );

        // Replacement doesn't overlap (after)
        testcase(
            (2, 3, 0, 100),
            &[(0, 1, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );

        // Replacement doesn't overlap (before, though this would never happen in practice)
        testcase(
            (2, 3, 0, 100),
            &[(4, 5, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );
    }
}