mz_persist_client/internal/state_diff.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist_types::Codec64;
use mz_persist_types::schema::SchemaId;
use mz_proto::TryFromProtoError;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::critical::CriticalReaderId;
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
    State, StateCollections, WriterState,
};
use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
use crate::read::LeasedReaderId;
use crate::write::WriterId;
use crate::{Metrics, PersistConfig, ShardId};

use StateFieldValDiff::*;

use super::state::{ActiveGc, ActiveRollup};

#[derive(Clone, Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum StateFieldValDiff<V> {
    Insert(V),
    Update(V, V),
    Delete(V),
}

#[derive(Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateFieldDiff<K, V> {
    pub key: K,
    pub val: StateFieldValDiff<V>,
}

impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StateFieldDiff")
            // In the cases we've seen in the wild, it's been more useful to
            // have the val printed first.
            .field("val", &self.val)
            .field("key", &self.key)
            .finish()
    }
}

#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct StateDiff<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) seqno_from: SeqNo,
    pub(crate) seqno_to: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) latest_rollup_key: PartialRollupKey,
    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
}

impl<T: Timestamp + Codec64> StateDiff<T> {
    pub fn new(
        applier_version: semver::Version,
        seqno_from: SeqNo,
        seqno_to: SeqNo,
        walltime_ms: u64,
        latest_rollup_key: PartialRollupKey,
    ) -> Self {
        StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups: Vec::default(),
            active_rollup: Vec::default(),
            active_gc: Vec::default(),
            hostname: Vec::default(),
            last_gc_req: Vec::default(),
            leased_readers: Vec::default(),
            critical_readers: Vec::default(),
            writers: Vec::default(),
            schemas: Vec::default(),
            since: Vec::default(),
            legacy_batches: Vec::default(),
            hollow_batches: Vec::default(),
            spine_batches: Vec::default(),
            merges: Vec::default(),
        }
    }

    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
        let legacy_batches = self
            .legacy_batches
            .iter()
            .filter_map(|diff| match diff.val {
                Insert(()) => Some(Insert(&diff.key)),
                Update((), ()) => None, // Ignoring a noop diff.
                Delete(()) => Some(Delete(&diff.key)),
            });
        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
            Insert(batch) => Insert(&**batch),
            Update(before, after) => Update(&**before, &**after),
            Delete(batch) => Delete(&**batch),
        });
        legacy_batches.chain(hollow_batches)
    }
}

impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
        // Deconstruct from and to so we get a compile failure if new
        // fields are added.
        let State {
            applier_version: _,
            shard_id: from_shard_id,
            seqno: from_seqno,
            hostname: from_hostname,
            walltime_ms: _, // Intentionally unused
            collections:
                StateCollections {
                    last_gc_req: from_last_gc_req,
                    rollups: from_rollups,
                    active_rollup: from_active_rollup,
                    active_gc: from_active_gc,
                    leased_readers: from_leased_readers,
                    critical_readers: from_critical_readers,
                    writers: from_writers,
                    schemas: from_schemas,
                    trace: from_trace,
                },
        } = from;
        let State {
            applier_version: to_applier_version,
            shard_id: to_shard_id,
            seqno: to_seqno,
            walltime_ms: to_walltime_ms,
            hostname: to_hostname,
            collections:
                StateCollections {
                    last_gc_req: to_last_gc_req,
                    rollups: to_rollups,
                    active_rollup: to_active_rollup,
                    active_gc: to_active_gc,
                    leased_readers: to_leased_readers,
                    critical_readers: to_critical_readers,
                    writers: to_writers,
                    schemas: to_schemas,
                    trace: to_trace,
                },
        } = to;
        assert_eq!(from_shard_id, to_shard_id);

        let (_, latest_rollup) = to.latest_rollup();
        let mut diffs = Self::new(
            to_applier_version.clone(),
            *from_seqno,
            *to_seqno,
            *to_walltime_ms,
            latest_rollup.key.clone(),
        );
        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
        diff_field_sorted_iter(
            from_active_rollup.iter().map(|r| (&(), r)),
            to_active_rollup.iter().map(|r| (&(), r)),
            &mut diffs.active_rollup,
        );
        diff_field_sorted_iter(
            from_active_gc.iter().map(|g| (&(), g)),
            to_active_gc.iter().map(|g| (&(), g)),
            &mut diffs.active_gc,
        );
        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
        diff_field_sorted_iter(
            from_leased_readers.iter(),
            to_leased_readers,
            &mut diffs.leased_readers,
        );
        diff_field_sorted_iter(
            from_critical_readers.iter(),
            to_critical_readers,
            &mut diffs.critical_readers,
        );
        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);

        let from_flat = from_trace.flatten();
        let to_flat = to_trace.flatten();
        diff_field_sorted_iter(
            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
            &mut diffs.legacy_batches,
        );
        diff_field_sorted_iter(
            from_flat.hollow_batches.iter(),
            to_flat.hollow_batches.iter(),
            &mut diffs.hollow_batches,
        );
        diff_field_sorted_iter(
            from_flat.spine_batches.iter(),
            to_flat.spine_batches.iter(),
            &mut diffs.spine_batches,
        );
        diff_field_sorted_iter(
            from_flat.merges.iter(),
            to_flat.merges.iter(),
            &mut diffs.merges,
        );
        diffs
    }

    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
        let batches = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
                Delete(_) => None, // No-op
            });
        let rollups = self
            .rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
                    Some(HollowBlobRef::Rollup(x))
                }
                StateFieldValDiff::Delete(_) => None, // No-op
            });
        batches.chain(rollups)
    }

    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
        // With the introduction of incremental compaction, we need to be more
        // careful about what we consider "deleted". If a diff replaces, say,
        // 2 of the 4 runs in a HollowBatch, we must ensure that we only delete
        // the runs that are actually no longer referenced. (See the sketch
        // after this impl block.)
        let removed = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
            });

        let added: std::collections::BTreeSet<_> = self
            .referenced_batches()
            .filter_map(|spine_diff| match spine_diff {
                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
                Delete(_) => None,
            })
            .flatten()
            .collect();

        removed
            .into_iter()
            .flat_map(|x| x)
            .filter(move |part| !added.contains(part))
    }

    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
        self.rollups
            .iter()
            .filter_map(|rollups_diff| match &rollups_diff.val {
                Insert(_) => None,
                Update(a, _) | Delete(a) => Some(a),
            })
    }

    #[cfg(any(test, debug_assertions))]
    #[allow(dead_code)]
    pub fn validate_roundtrip<K, V, D>(
        metrics: &Metrics,
        from_state: &crate::internal::state::TypedState<K, V, T, D>,
        diff: &Self,
        to_state: &crate::internal::state::TypedState<K, V, T, D>,
    ) -> Result<(), String>
    where
        K: mz_persist_types::Codec + std::fmt::Debug,
        V: mz_persist_types::Codec + std::fmt::Debug,
        D: differential_dataflow::difference::Semigroup + Codec64,
    {
        use mz_proto::RustType;
        use prost::Message;

        use crate::internal::state::ProtoStateDiff;

        let mut roundtrip_state = from_state.clone(
            from_state.applier_version.clone(),
            from_state.hostname.clone(),
        );
        roundtrip_state.apply_diff(metrics, diff.clone())?;

        if &roundtrip_state != to_state {
            // The weird spacing in this format string is so they all line up
            // when printed out.
            return Err(format!(
                "state didn't roundtrip\n  from_state {:?}\n  to_state   {:?}\n  rt_state   {:?}\n  diff       {:?}\n",
                from_state, to_state, roundtrip_state, diff
            ));
        }

        let encoded_diff = diff.into_proto().encode_to_vec();
        let roundtrip_diff = Self::from_proto(
            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
        )
        .map_err(|err| err.to_string())?;

        if &roundtrip_diff != diff {
            // The weird spacing in this format string is so they all line up
            // when printed out.
            return Err(format!(
                "diff didn't roundtrip\n  diff    {:?}\n  rt_diff {:?}",
                diff, roundtrip_diff
            ));
        }

        Ok(())
    }
}
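
// A minimal, test-only sketch of the rule described in `part_deletes` above: a
// part is only a deletion candidate if it appears in a removed/replaced batch
// and is *not* still referenced by any inserted/updated batch. Plain strings
// stand in for `RunPart`s here; the names are purely illustrative.
#[cfg(test)]
#[mz_ore::test]
fn part_deletes_set_difference_sketch() {
    use std::collections::BTreeSet;

    // Parts referenced by batches on the "removed" side of the diff.
    let removed: BTreeSet<&str> = ["run0", "run1", "run2", "run3"].into_iter().collect();
    // Parts referenced by batches on the "added" side, e.g. an incremental
    // compaction that replaced only run0 and run1 while keeping run2 and run3.
    let added: BTreeSet<&str> = ["run2", "run3", "run4"].into_iter().collect();

    // Only parts that are no longer referenced anywhere may be deleted.
    let deletes: BTreeSet<&str> = removed.difference(&added).copied().collect();
    assert_eq!(deletes, ["run0", "run1"].into_iter().collect::<BTreeSet<_>>());
}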

impl<T: Timestamp + Lattice + Codec64> State<T> {
    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
        &mut self,
        cfg: &PersistConfig,
        metrics: &Metrics,
        diffs: I,
    ) {
        let mut state_seqno = self.seqno;
        let diffs = diffs.into_iter().filter_map(move |x| {
            if x.seqno != state_seqno.next() {
                // No-op.
                return None;
            }
            let data = x.data.clone();
            let diff = metrics
                .codecs
                .state_diff
                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
            assert_eq!(diff.seqno_from, state_seqno);
            state_seqno = diff.seqno_to;
            Some((diff, data))
        });
        self.apply_diffs(metrics, diffs);
    }
}

impl<T: Timestamp + Lattice + Codec64> State<T> {
    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
        &mut self,
        metrics: &Metrics,
        diffs: I,
    ) {
        for (diff, data) in diffs {
            // TODO: This could special-case batch apply for diffs where it's
            // more efficient (in particular, spine batches that hit the slow
            // path).
            match self.apply_diff(metrics, diff) {
                Ok(()) => {}
                Err(err) => {
                    // Having the full diff in the error message is critical for debugging any
                    // issues that may arise from diff application. We pass along the original
                    // Bytes it decoded from just so we can decode in this error path, while
                    // avoiding any extraneous clones in the expected Ok path.
                    let diff = StateDiff::<T>::decode(&self.applier_version, data);
                    panic!(
                        "state diff should apply cleanly: {} diff {:?} state {:?}",
                        err, diff, self
                    )
                }
            }
        }
    }

    // Intentionally not even pub(crate) because all callers should use
    // [Self::apply_diffs].
    pub(super) fn apply_diff(
        &mut self,
        metrics: &Metrics,
        diff: StateDiff<T>,
    ) -> Result<(), String> {
        // Deconstruct diff so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version: diff_applier_version,
            seqno_from: diff_seqno_from,
            seqno_to: diff_seqno_to,
            walltime_ms: diff_walltime_ms,
            latest_rollup_key: _,
            rollups: diff_rollups,
            active_rollup: diff_active_rollup,
            active_gc: diff_active_gc,
            hostname: diff_hostname,
            last_gc_req: diff_last_gc_req,
            leased_readers: diff_leased_readers,
            critical_readers: diff_critical_readers,
            writers: diff_writers,
            schemas: diff_schemas,
            since: diff_since,
            legacy_batches: diff_legacy_batches,
            hollow_batches: diff_hollow_batches,
            spine_batches: diff_spine_batches,
            merges: diff_merges,
        } = diff;
        if self.seqno == diff_seqno_to {
            return Ok(());
        }
        if self.seqno != diff_seqno_from {
            return Err(format!(
                "could not apply diff {} -> {} to state {}",
                diff_seqno_from, diff_seqno_to, self.seqno
            ));
        }
        self.seqno = diff_seqno_to;
        self.applier_version = diff_applier_version;
        self.walltime_ms = diff_walltime_ms;
        force_apply_diffs_single(
            &self.shard_id,
            diff_seqno_to,
            "hostname",
            diff_hostname,
            &mut self.hostname,
            metrics,
        )?;

        // Deconstruct collections so we get a compile failure if new fields are
        // added.
        let StateCollections {
            last_gc_req,
            rollups,
            active_rollup,
            active_gc,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            trace,
        } = &mut self.collections;

        apply_diffs_map("rollups", diff_rollups, rollups)?;
        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
        apply_diffs_map("writers", diff_writers, writers)?;
        apply_diffs_map("schemas", diff_schemas, schemas)?;

        let structure_unchanged = diff_hollow_batches.is_empty()
            && diff_spine_batches.is_empty()
            && diff_merges.is_empty();
        let spine_unchanged =
            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;

        if spine_unchanged {
            return Ok(());
        }

        let mut flat = if trace.roundtrip_structure {
            metrics.state.apply_spine_flattened.inc();
            let mut flat = trace.flatten();
            apply_diffs_single("since", diff_since, &mut flat.since)?;
            apply_diffs_map(
                "legacy_batches",
                diff_legacy_batches
                    .into_iter()
                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
                        key: Arc::new(key),
                        val,
                    }),
                &mut flat.legacy_batches,
            )?;
            Some(flat)
        } else {
            for x in diff_since {
                match x.val {
                    Update(from, to) => {
                        if trace.since() != &from {
                            return Err(format!(
                                "since update didn't match: {:?} vs {:?}",
                                self.collections.trace.since(),
                                &from
                            ));
                        }
                        trace.downgrade_since(&to);
                    }
                    Insert(_) => return Err("cannot insert since field".to_string()),
                    Delete(_) => return Err("cannot delete since field".to_string()),
                }
            }
            if !diff_legacy_batches.is_empty() {
                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
            }
            None
        };

        if !structure_unchanged {
            let flat = flat.get_or_insert_with(|| trace.flatten());
            apply_diffs_map(
                "hollow_batches",
                diff_hollow_batches,
                &mut flat.hollow_batches,
            )?;
            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
        }

        if let Some(flat) = flat {
            *trace = Trace::unflatten(flat)?;
        }

        // There are various sanity checks that this method could run (e.g.
        // that since, upper, seqno_since, etc. don't regress, or that
        // diff.latest_rollup == state.rollups.last()). Are they a good idea?
        // On one hand, I like sanity checks; on the other, one of the goals
        // here is to keep the apply logic as straightforward and unchanging
        // as possible.
        Ok(())
    }
}

fn diff_field_single<T: PartialEq + Clone>(
    from: &T,
    to: &T,
    diffs: &mut Vec<StateFieldDiff<(), T>>,
) {
    // This could use the `diff_field_sorted_iter(once(from), once(to), diffs)`
    // general impl, but we just do the obvious thing.
    if from != to {
        diffs.push(StateFieldDiff {
            key: (),
            val: Update(from.clone(), to.clone()),
        })
    }
}

fn apply_diffs_single_option<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut Option<X>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single_option(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single_option<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut Option<X>,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = Some(to)
        }
        Insert(to) => {
            if single.is_some() {
                return Err(format!("{} insert found existing value", name));
            }
            *single = Some(to)
        }
        Delete(from) => {
            if single.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = None
        }
    }
    Ok(())
}

fn apply_diffs_single<X: PartialEq + Debug>(
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_single(name, diff, single)?;
    }
    Ok(())
}

fn apply_diff_single<X: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name, single, &from
                ));
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

// A hack to force apply a diff, making `single` equal to
// the Update `to` value, ignoring a mismatch on `from`.
// Used to migrate forward after writing down incorrect
// diffs.
//
// TODO: delete this once `hostname` has zero mismatches
fn force_apply_diffs_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diffs: Vec<StateFieldDiff<(), X>>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    for diff in diffs {
        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
    }
    Ok(())
}

fn force_apply_diff_single<X: PartialEq + Debug>(
    shard_id: &ShardId,
    seqno: SeqNo,
    name: &str,
    diff: StateFieldDiff<(), X>,
    single: &mut X,
    metrics: &Metrics,
) -> Result<(), String> {
    match diff.val {
        Update(from, to) => {
            if single != &from {
                debug!(
                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
                    name, single, &from, &to, shard_id, seqno
                );
                metrics.state.force_apply_hostname.inc();
            }
            *single = to
        }
        Insert(_) => return Err(format!("cannot insert {} field", name)),
        Delete(_) => return Err(format!("cannot delete {} field", name)),
    }
    Ok(())
}

fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
where
    K: Ord + Clone + 'a,
    V: PartialEq + Clone + 'a,
    IF: IntoIterator<Item = (&'a K, &'a V)>,
    IT: IntoIterator<Item = (&'a K, &'a V)>,
{
    let (mut from, mut to) = (from.into_iter(), to.into_iter());
    let (mut f, mut t) = (from.next(), to.next());
    loop {
        match (f, t) {
            (None, None) => break,
            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
                Ordering::Less => {
                    diffs.push(StateFieldDiff {
                        key: fk.clone(),
                        val: Delete(fv.clone()),
                    });
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                }
                Ordering::Greater => {
                    diffs.push(StateFieldDiff {
                        key: tk.clone(),
                        val: Insert(tv.clone()),
                    });
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
                Ordering::Equal => {
                    // TODO: regression test for this if, I missed it in the
                    // original impl :)
                    if fv != tv {
                        diffs.push(StateFieldDiff {
                            key: fk.clone(),
                            val: Update(fv.clone(), tv.clone()),
                        });
                    }
                    let f_next = from.next();
                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                    f = f_next;
                    let t_next = to.next();
                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                    t = t_next;
                }
            },
            (None, Some((tk, tv))) => {
                diffs.push(StateFieldDiff {
                    key: tk.clone(),
                    val: Insert(tv.clone()),
                });
                let t_next = to.next();
                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
                t = t_next;
            }
            (Some((fk, fv)), None) => {
                diffs.push(StateFieldDiff {
                    key: fk.clone(),
                    val: Delete(fv.clone()),
                });
                let f_next = from.next();
                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
                f = f_next;
            }
        }
    }
}
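
// A small, test-only sketch of the diff/apply roundtrip these helpers
// implement: diff two sorted maps into Insert/Update/Delete entries, then
// apply those entries to a copy of the "from" map and check that it lands
// exactly on the "to" map. The keys and values are arbitrary illustrative
// data, not real state fields.
#[cfg(test)]
#[mz_ore::test]
fn diff_and_apply_map_sketch() {
    let from: BTreeMap<u64, String> = [(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]
        .into_iter()
        .collect();
    let to: BTreeMap<u64, String> = [(2, "b".to_owned()), (3, "c2".to_owned()), (4, "d".to_owned())]
        .into_iter()
        .collect();

    // Expect: Delete(1), Update(3: "c" -> "c2"), Insert(4). Key 2 is unchanged
    // and so produces no diff at all.
    let mut diffs = Vec::new();
    diff_field_sorted_iter(from.iter(), to.iter(), &mut diffs);
    assert_eq!(diffs.len(), 3);

    // Applying the diffs to `from` reproduces `to` exactly.
    let mut roundtrip = from.clone();
    apply_diffs_map("sketch", diffs, &mut roundtrip).expect("diffs apply cleanly");
    assert_eq!(roundtrip, to);
}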

fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    for diff in diffs {
        apply_diff_map(name, diff, map)?;
    }
    Ok(())
}

// This might leave state in an invalid (umm) state when returning an error. The
// caller ultimately ends up panic'ing on error, but if that changes, we might
// want to revisit this.
fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
    name: &str,
    diff: StateFieldDiff<K, V>,
    map: &mut BTreeMap<K, V>,
) -> Result<(), String> {
    match diff.val {
        Insert(to) => {
            let prev = map.insert(diff.key, to);
            if prev != None {
                return Err(format!("{} insert found existing value: {:?}", name, prev));
            }
        }
        Update(from, to) => {
            let prev = map.insert(diff.key, to);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} update didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
        Delete(from) => {
            let prev = map.remove(&diff.key);
            if prev.as_ref() != Some(&from) {
                return Err(format!(
                    "{} delete didn't match: {:?} vs {:?}",
                    name,
                    prev,
                    Some(from),
                ));
            }
        }
    };
    Ok(())
}

// This might leave state in an invalid (umm) state when returning an error. The
// caller ultimately ends up panic'ing on error, but if that changes, we might
// want to revisit this.
fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
    metrics: &Metrics,
    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    trace: &mut Trace<T>,
) -> Result<(), String> {
    // Another special case: sniff out a newly inserted batch (one whose lower
    // lines up with the current upper) and handle that now. Then fall through
    // to the rest of the handling on whatever is left.
    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
        // Ignore merge_reqs because whichever process generated this diff is
        // assigned the work.
        let () = trace.push_batch_no_merge_reqs(insert);
        // If this insert was the only thing in diffs, then return now instead
        // of falling through to the "no diffs" case in the match so we can inc
        // the apply_spine_fast_path metric.
        if diffs.is_empty() {
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }
    }

    match &diffs[..] {
        // Fast-path: no diffs.
        [] => return Ok(()),

        // Fast-path: batch insert with both new and most recent batch empty.
        // Spine will happily merge these empty batches together without a call
        // out to compaction.
        [
            StateFieldDiff {
                key: del,
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: ins,
                val: StateFieldValDiff::Insert(()),
            },
        ] => {
            if del.is_empty()
                && ins.is_empty()
                && del.desc.lower() == ins.desc.lower()
                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
            {
                // Ignore merge_reqs because whichever process generated this diff is
                // assigned the work.
                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
                    del.desc.upper().clone(),
                    ins.desc.upper().clone(),
                    // `keys.len() == 0` for both `del` and `ins` means we
                    // don't have to think about what the compaction frontier
                    // is for these batches (nothing in them, so nothing could
                    // have been compacted).
                    Antichain::from_elem(T::minimum()),
                )));
                metrics.state.apply_spine_fast_path.inc();
                return Ok(());
            }
        }
        // Fall-through
        _ => {}
    }

    // Fast-path: compaction
    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
        let res = FueledMergeRes { output };
        // We can't predict how spine will arrange the batches when it's
        // hydrated. This means that something that is maintaining a Spine
        // starting at some seqno may not exactly match something else
        // maintaining the same spine starting at a different seqno. (Plus,
        // maybe these aren't even on the same version of the code and we've
        // changed the spine logic.) Because apply_merge_res is strict,
        // we're not _guaranteed_ that we can apply a compaction response
        // that was generated elsewhere. Most of the time we can, though, so
        // count the good ones and fall back to the slow path below when we
        // can't.
        if trace.apply_merge_res_unchecked(&res).applied() {
            // Maybe return the replaced batches from apply_merge_res and verify
            // that they match _inputs?
            metrics.state.apply_spine_fast_path.inc();
            return Ok(());
        }

        // Otherwise, try our lenient application of a compaction result.
        let mut batches = Vec::new();
        trace.map_batches(|b| batches.push(b.clone()));

        match apply_compaction_lenient(metrics, batches, &res.output) {
            Ok(batches) => {
                let mut new_trace = Trace::default();
                new_trace.roundtrip_structure = trace.roundtrip_structure;
                new_trace.downgrade_since(trace.since());
                for batch in batches {
                    // Ignore merge_reqs because whichever process generated
                    // this diff is assigned the work.
                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
                }
                *trace = new_trace;
                metrics.state.apply_spine_slow_path_lenient.inc();
                return Ok(());
            }
            Err(err) => {
                return Err(format!(
                    "lenient compaction result apply unexpectedly failed: {}",
                    err
                ));
            }
        }
    }

    // Something complicated is going on, so reconstruct the Trace from scratch.
    metrics.state.apply_spine_slow_path.inc();
    debug!(
        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
        diffs, trace
    );

    let batches = {
        let mut batches = BTreeMap::new();
        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
    };

    let batches = match batches {
        Ok(batches) => batches,
        Err(err) => {
            metrics
                .state
                .apply_spine_slow_path_with_reconstruction
                .inc();
            debug!(
                "apply_diffs_spine could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
                err, diffs, trace
            );
            // if we couldn't apply our diffs directly to our trace's batches, we can
            // try one more trick: reconstruct a new spine with our existing batches,
            // in an attempt to create different merges than we currently have. then,
            // we can try to apply our diffs on top of these new (potentially) merged
            // batches.
            let mut reconstructed_spine = Trace::default();
            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
            trace.map_batches(|b| {
                // Ignore merge_reqs because whichever process generated this
                // diff is assigned the work.
                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
            });

            let mut batches = BTreeMap::new();
            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
            apply_diffs_map("spine", diffs, &mut batches)?;
            batches
        }
    };

    let mut new_trace = Trace::default();
    new_trace.roundtrip_structure = trace.roundtrip_structure;
    new_trace.downgrade_since(trace.since());
    for (batch, ()) in batches {
        // Ignore merge_reqs because whichever process generated this diff is
        // assigned the work.
        let () = new_trace.push_batch_no_merge_reqs(batch);
    }
    *trace = new_trace;
    Ok(())
}

fn sniff_insert<T: Timestamp + Lattice>(
    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    upper: &Antichain<T>,
) -> Option<HollowBatch<T>> {
    for idx in 0..diffs.len() {
        match &diffs[idx] {
            StateFieldDiff {
                key,
                val: StateFieldValDiff::Insert(()),
            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
            _ => continue,
        }
    }
    None
}
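
// A test-only sketch of the "newly appended batch" fast path that
// `sniff_insert` looks for: an Insert whose lower lines up with the trace's
// current upper. The timestamps and bounds here are arbitrary.
#[cfg(test)]
#[mz_ore::test]
fn sniff_insert_sketch() {
    let upper = Antichain::from_elem(5u64);
    // An (empty) batch appended directly at the current upper: [5, 10).
    let appended = HollowBatch::empty(Description::new(
        Antichain::from_elem(5u64),
        Antichain::from_elem(10u64),
        Antichain::from_elem(0u64),
    ));
    let mut diffs = vec![StateFieldDiff {
        key: appended,
        val: StateFieldValDiff::Insert(()),
    }];

    let sniffed = sniff_insert(&mut diffs, &upper).expect("insert at the upper is sniffed out");
    assert_eq!(sniffed.desc.lower(), &Antichain::from_elem(5u64));
    assert_eq!(sniffed.desc.upper(), &Antichain::from_elem(10u64));
    // The sniffed diff has been removed, leaving nothing for the general case.
    assert!(diffs.is_empty());
}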

// TODO: Instead of trying to sniff out a compaction from diffs, should we just
// be explicit?
fn sniff_compaction<'a, T: Timestamp + Lattice>(
    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
    // Compaction always produces exactly one output batch (with possibly many
    // parts), but we get one Insert for the whole batch.
    let mut inserts = diffs.iter().flat_map(|x| match x.val {
        StateFieldValDiff::Insert(()) => Some(&x.key),
        _ => None,
    });
    let compaction_output = match inserts.next() {
        Some(x) => x,
        None => return None,
    };
    if let Some(_) = inserts.next() {
        return None;
    }

    // Grab all deletes and sanity check that there are no updates.
    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
    for diff in diffs.iter() {
        match diff.val {
            StateFieldValDiff::Delete(()) => {
                compaction_inputs.push(&diff.key);
            }
            StateFieldValDiff::Insert(()) => {}
            StateFieldValDiff::Update((), ()) => {
                // Fall through to let the general case create the error
                // message.
                return None;
            }
        }
    }

    Some((compaction_inputs, compaction_output.clone()))
}

/// Apply a compaction diff that doesn't exactly line up with the set of
/// HollowBatches.
///
/// Because of the way Spine internally optimizes only _some_ empty batches
/// (immediately merges them in), we can end up in a situation where a
/// compaction res applied on another copy of state, but when we replay all of
/// the state diffs against a new Spine locally, it merges empty batches
/// differently in-mem and we can't exactly apply the compaction diff. Example:
///
/// - compact: [1,2),[2,3) -> [1,3)
/// - this spine: [0,2),[2,3) (0,1 is empty)
///
/// Ideally, we'd figure out a way to avoid this, but nothing immediately comes
/// to mind. In the meantime, force the application (otherwise the shard is
/// stuck and we can't do anything with it) by manually splitting the empty
/// batch back out. For the example above:
///
/// - [0,1),[1,3) (0,1 is empty)
///
/// This can only happen when the batch needing to be split is empty, so error
/// out if it isn't because that means something unexpected is going on.
fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
    metrics: &Metrics,
    mut trace: Vec<HollowBatch<T>>,
    replacement: &'a HollowBatch<T>,
) -> Result<Vec<HollowBatch<T>>, String> {
    let mut overlapping_batches = Vec::new();
    trace.retain(|b| {
        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
        let overlaps_replacement = !(before_replacement || after_replacement);
        if overlaps_replacement {
            overlapping_batches.push(b.clone());
            false
        } else {
            true
        }
    });

    {
        let first_overlapping_batch = match overlapping_batches.first() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            first_overlapping_batch.desc.lower(),
            replacement.desc.lower(),
        ) {
            if first_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    first_overlapping_batch
                ));
            }
            let desc = Description::new(
                first_overlapping_batch.desc.lower().clone(),
                replacement.desc.lower().clone(),
                first_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }

    {
        let last_overlapping_batch = match overlapping_batches.last() {
            Some(x) => x,
            None => return Err("replacement didn't overlap any batches".into()),
        };
        if PartialOrder::less_than(
            replacement.desc.upper(),
            last_overlapping_batch.desc.upper(),
        ) {
            if last_overlapping_batch.len > 0 {
                return Err(format!(
                    "overlapping batch was unexpectedly non-empty: {:?}",
                    last_overlapping_batch
                ));
            }
            let desc = Description::new(
                replacement.desc.upper().clone(),
                last_overlapping_batch.desc.upper().clone(),
                last_overlapping_batch.desc.since().clone(),
            );
            trace.push(HollowBatch::empty(desc));
            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
        }
    }
    trace.push(replacement.clone());

    // We just inserted stuff at the end, so re-sort them into place.
    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));

    // This impl is a touch complex, so sanity check our work.
    let mut expected_lower = &Antichain::from_elem(T::minimum());
    for b in trace.iter() {
        if b.desc.lower() != expected_lower {
            return Err(format!(
                "lower {:?} did not match expected {:?}: {:?}",
                b.desc.lower(),
                expected_lower,
                trace
            ));
        }
        expected_lower = b.desc.upper();
    }
    Ok(trace)
}
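
// A test-only sketch of the doc comment's example above: the replacement
// [1, 3) was compacted elsewhere, but this spine holds [0, 2), [2, 3) (with
// [0, 2) empty), so the lenient path splits out an empty [0, 1) filler before
// installing the replacement. All batches here are empty and the timestamps
// are purely illustrative.
#[cfg(test)]
#[mz_ore::test]
fn apply_compaction_lenient_sketch() {
    use mz_ore::metrics::MetricsRegistry;

    fn empty_batch(lower: u64, upper: u64) -> HollowBatch<u64> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        ))
    }

    let cfg = PersistConfig::new_for_tests();
    let registry = MetricsRegistry::new();
    let metrics = Metrics::new(&cfg, &registry);

    let trace = vec![empty_batch(0, 2), empty_batch(2, 3)];
    let replacement = empty_batch(1, 3);
    let batches =
        apply_compaction_lenient(&metrics, trace, &replacement).expect("lenient apply succeeds");

    // The empty prefix [0, 1) is split back out, followed by the replacement.
    let bounds: Vec<_> = batches
        .iter()
        .map(|b| (b.desc.lower().elements().to_vec(), b.desc.upper().elements().to_vec()))
        .collect();
    assert_eq!(bounds, vec![(vec![0u64], vec![1]), (vec![1], vec![3])]);
}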

/// A type that facilitates the proto encoding of a [`ProtoStateFieldDiffs`]
///
/// [`ProtoStateFieldDiffs`] is a columnar encoding of [`StateFieldDiff`]s, see
/// its doc comment for more info. The underlying buffer for a [`ProtoStateFieldDiffs`]
/// is a [`Bytes`] struct, which is an immutable, shared, reference counted,
/// buffer of data. Using a [`Bytes`] struct is a very efficient way to manage data
/// because multiple [`Bytes`] can reference different parts of the same underlying
/// portion of memory. See its doc comment for more info.
///
/// A [`ProtoStateFieldDiffsWriter`] maintains a mutable, unique, data buffer, i.e.
/// a [`BytesMut`], which we use when encoding a [`StateFieldDiff`]. And when
/// finished encoding, we convert it into a [`ProtoStateFieldDiffs`] by "freezing" the
/// underlying buffer, converting it into a [`Bytes`] struct, so it can be shared.
///
/// [`Bytes`]: bytes::Bytes
#[derive(Debug)]
pub struct ProtoStateFieldDiffsWriter {
    data_buf: BytesMut,
    proto: ProtoStateFieldDiffs,
}

impl ProtoStateFieldDiffsWriter {
    /// Record a [`ProtoStateField`] for our columnar encoding.
    pub fn push_field(&mut self, field: ProtoStateField) {
        self.proto.fields.push(i32::from(field));
    }

    /// Record a [`ProtoStateFieldDiffType`] for our columnar encoding.
    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
        self.proto.diff_types.push(i32::from(diff_type));
    }

    /// Encode a message for our columnar encoding.
    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
        let len_before = self.data_buf.len();
        self.data_buf.reserve(msg.encoded_len());

        // Note: we use `encode_raw` as opposed to `encode` because all `encode` does is
        // check to make sure there's enough bytes in the buffer to fit our message
        // which we know there are because we just reserved the space. When benchmarking
        // `encode_raw` does offer a slight performance improvement over `encode`.
        msg.encode_raw(&mut self.data_buf);

        // Record exactly how many bytes were written.
        let written_len = self.data_buf.len() - len_before;
        self.proto.data_lens.push(u64::cast_from(written_len));
    }

    pub fn into_proto(self) -> ProtoStateFieldDiffs {
        let ProtoStateFieldDiffsWriter {
            data_buf,
            mut proto,
        } = self;

        // Assert we didn't write into the proto's data_bytes field
        assert!(proto.data_bytes.is_empty());

        // Move our buffer into the proto
        let data_bytes = data_buf.freeze();
        proto.data_bytes = data_bytes;

        proto
    }
}
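
// A test-only sketch of the `BytesMut` -> `Bytes` handoff the doc comment
// above describes: data accumulates in a uniquely owned `BytesMut` and is then
// "frozen" into an immutable, reference-counted `Bytes` that can be sliced and
// shared without copying. The payload here is arbitrary.
#[cfg(test)]
#[mz_ore::test]
fn bytes_freeze_sketch() {
    let mut buf = BytesMut::with_capacity(16);
    buf.extend_from_slice(b"key");
    buf.extend_from_slice(b"value");

    // Freezing converts the unique buffer into shared, immutable `Bytes`.
    let frozen: Bytes = buf.freeze();
    // Slices of the frozen buffer reference the same underlying allocation.
    let key = frozen.slice(0..3);
    let val = frozen.slice(3..8);
    assert_eq!(&key[..], b"key");
    assert_eq!(&val[..], b"value");
}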
1208
1209impl ProtoStateFieldDiffs {
1210    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
1211        // Create a new buffer which we'll encode data into.
1212        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());
1213
1214        // Take our existing data, and copy it into our buffer.
1215        let existing_data = std::mem::take(&mut self.data_bytes);
1216        data_buf.extend(existing_data);
1217
1218        ProtoStateFieldDiffsWriter {
1219            data_buf,
1220            proto: self,
1221        }
1222    }
1223
1224    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
1225        let len = self.fields.len();
1226        assert_eq!(self.diff_types.len(), len);
1227
1228        ProtoStateFieldDiffsIter {
1229            len,
1230            diff_idx: 0,
1231            data_idx: 0,
1232            data_offset: 0,
1233            diffs: self,
1234        }
1235    }
1236
1237    pub fn validate(&self) -> Result<(), String> {
1238        if self.fields.len() != self.diff_types.len() {
1239            return Err(format!(
1240                "fields {} and diff_types {} lengths disagree",
1241                self.fields.len(),
1242                self.diff_types.len()
1243            ));
1244        }
1245
1246        let mut expected_data_slices = 0;
1247        for diff_type in self.diff_types.iter() {
1248            // We expect one for the key.
1249            expected_data_slices += 1;
1250            // And 1 or 2 for val depending on the diff type.
1251            match ProtoStateFieldDiffType::try_from(*diff_type) {
1252                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
1253                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
1254                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
1255                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
1256            }
1257        }
1258        if expected_data_slices != self.data_lens.len() {
1259            return Err(format!(
1260                "expected {} data slices got {}",
1261                expected_data_slices,
1262                self.data_lens.len()
1263            ));
1264        }
1265
1266        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
1267        if expected_data_bytes != self.data_bytes.len() {
1268            return Err(format!(
1269                "expected {} data bytes got {}",
1270                expected_data_bytes,
1271                self.data_bytes.len()
1272            ));
1273        }
1274
1275        Ok(())
1276    }
1277}
1278
1279#[derive(Debug)]
1280pub struct ProtoStateFieldDiff<'a> {
1281    pub key: &'a [u8],
1282    pub diff_type: ProtoStateFieldDiffType,
1283    pub from: &'a [u8],
1284    pub to: &'a [u8],
1285}
1286
1287pub struct ProtoStateFieldDiffsIter<'a> {
1288    len: usize,
1289    diff_idx: usize,
1290    data_idx: usize,
1291    data_offset: usize,
1292    diffs: &'a ProtoStateFieldDiffs,
1293}
1294
1295impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
1296    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;
1297
1298    fn next(&mut self) -> Option<Self::Item> {
1299        if self.diff_idx >= self.len {
1300            return None;
1301        }
1302        let mut next_data = || {
1303            let start = self.data_offset;
1304            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
1305            let data = &self.diffs.data_bytes[start..end];
1306            self.data_idx += 1;
1307            self.data_offset = end;
1308            data
1309        };
1310        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
1311            Ok(x) => x,
1312            Err(_) => {
1313                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1314                    "ProtoStateField({})",
1315                    self.diffs.fields[self.diff_idx]
1316                ))));
1317            }
1318        };
1319        let diff_type =
1320            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
1321                Ok(x) => x,
1322                Err(_) => {
1323                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1324                        "ProtoStateFieldDiffType({})",
1325                        self.diffs.diff_types[self.diff_idx]
1326                    ))));
1327                }
1328            };
1329        let key = next_data();
1330        let (from, to): (&[u8], &[u8]) = match diff_type {
1331            ProtoStateFieldDiffType::Insert => (&[], next_data()),
1332            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
1333            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
1334        };
1335        let diff = ProtoStateFieldDiff {
1336            key,
1337            diff_type,
1338            from,
1339            to,
1340        };
1341        self.diff_idx += 1;
1342        Some(Ok((field, diff)))
1343    }
1344}
1345
1346#[cfg(test)]
1347mod tests {
1348    use semver::Version;
1349    use std::ops::ControlFlow::Continue;
1350
1351    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
1352    use mz_ore::metrics::MetricsRegistry;
1353
1354    use crate::ShardId;
1355    use crate::internal::state::TypedState;
1356
1357    use super::*;
1358
1359    /// Model a situation where a "leader" is constantly making changes to its state, and a "follower"
1360    /// is applying those changes as diffs.
1361    #[mz_ore::test]
1362    #[cfg_attr(miri, ignore)] // too slow
1363    fn test_state_sync() {
1364        use proptest::prelude::*;
1365
1366        #[derive(Debug, Clone)]
1367        enum Action {
1368            /// Append a (non)empty batch to the shard that covers the given length of time.
1369            Append { empty: bool, time_delta: u64 },
1370            /// Apply the Nth compaction request we've received to the shard state.
1371            Compact { req: usize },
1372        }
1373
1374        let action_gen: BoxedStrategy<Action> = {
1375            prop::strategy::Union::new([
1376                (any::<bool>(), 1u64..10u64)
1377                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
1378                    .boxed(),
1379                (0usize..10usize)
1380                    .prop_map(|req| Action::Compact { req })
1381                    .boxed(),
1382            ])
1383            .boxed()
1384        };
1385
1386        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
1387            let version = Version::new(0, 100, 0);
1388            let writer_key = WriterKey::Version(version.to_string());
1389            let id = ShardId::new();
1390            let hostname = "computer";
1391            let typed: TypedState<String, (), u64, i64> =
1392                TypedState::new(version, id, hostname.to_string(), 0);
1393            let mut leader = typed.state;
1394
1395            let seqno = SeqNo::minimum();
1396            let mut lower = 0u64;
1397            let mut merge_reqs = vec![];
1398
1399            leader.collections.rollups.insert(
1400                seqno,
1401                HollowRollup {
1402                    key: PartialRollupKey::new(seqno, &RollupId::new()),
1403                    encoded_size_bytes: None,
1404                },
1405            );
1406            leader.collections.trace.roundtrip_structure = false;
1407            let mut follower = leader.clone();
1408
1409            for (action, roundtrip_structure) in actions {
1410                // Apply the given action and the new roundtrip_structure setting and take a diff.
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
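                            // Clamp the requested index so it always refers to an outstanding request.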
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes { output });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                // Validate that the diff applies to both the previous state (also checked in
                // debug asserts) and our follower that's only synchronized via diffs.
                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");

                // TODO: once spine structure is roundtripped through diffs, assert that the follower
                // has the same batches etc. as the leader does.
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }

    // Regression test for the apply_diffs_spine special case that sniffs out an
    // insert, applies it, and then lets the remaining diffs (if any) fall
    // through to the rest of the code. See database-issues#4431.
    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
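        // Helper: a hollow batch with no parts, the given bounds and len, and a since of 0.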
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        // The bug handled here is essentially a set of batches that look like
        // the pattern matched by `apply_lenient` _plus_ an insert. In
        // apply_diffs_spine, we use `sniff_insert` to steal the insert out of
        // the diffs and fall back to the rest of the logic to handle the
        // remaining diffs.
        //
        // Concretely, something like (the numbers are truncated versions of the
        // actual bug posted in the issue):
        // - spine: [0][7094664]0, [7094664][7185234]100
        // - diffs: [0][6805359]0 del, [6805359][7083793]0 del, [0][7083793]0 ins,
        //   [7185234][7185859]20 ins
        //
        // This lets us handle the [7185234,7185859) insert directly, and then
        // apply_lenient handles splitting up [0,7094664) so we can apply the
        // [0,6805359)+[6805359,7083793)->[0,7083793) swap.

        let batches_before = vec![hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        // Ideally this first batch would be [0][7083793] and [7083793][7094664]
        // here because `apply_lenient` splits it out, but when `apply_lenient`
        // reconstructs the trace, Spine happens to (deterministically) collapse
        // them back together. The main value of this test is that the
        // `apply_diffs_spine` call below doesn't return an Err, so don't worry
        // too much about the exact boundaries; this is just a sanity check.
        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

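        // Build a state whose trace contains `batches_before`, then apply the diffs to it.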
        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn apply_lenient() {
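        // Batches (both input and expected output) are described as (lower, upper, since, len).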
        #[track_caller]
        fn testcase(
            replacement: (u64, u64, u64, usize),
            spine: &[(u64, u64, u64, usize)],
            expected: Result<&[(u64, u64, u64, usize)], &str>,
        ) {
            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
                let (lower, upper, since, len) = x;
                let desc = Description::new(
                    Antichain::from_elem(*lower),
                    Antichain::from_elem(*upper),
                    Antichain::from_elem(*since),
                );
                HollowBatch::new_run(desc, Vec::new(), *len)
            }
            let replacement = batch(&replacement);
            let batches = spine.iter().map(batch).collect::<Vec<_>>();

            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
            let expected = match expected {
                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
                Err(err) => Err(err.to_owned()),
            };
            assert_eq!(actual, expected);
        }

        // Exact swap of N batches
        testcase(
            (0, 3, 0, 100),
            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
            Ok(&[(0, 3, 0, 100)]),
        );

        // Swap out the middle of a batch
        testcase(
            (1, 2, 0, 100),
            &[(0, 3, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
        );

        // Split batch at replacement lower
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Err: split batch at replacement lower not empty
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 0, 1), (3, 4, 0, 0)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Split batch at replacement lower (untouched batch before the split one)
        testcase(
            (2, 4, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
        );

        // Split batch at replacement lower (since is preserved)
        testcase(
            (2, 4, 0, 100),
            &[(0, 3, 200, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
        );

        // Split batch at replacement upper
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
        );

        // Err: split batch at replacement upper not empty
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 0, 1)],
            Err(
                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
            ),
        );

        // Split batch at replacement upper (untouched batch after the split one)
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
        );

        // Split batch at replacement upper (since is preserved)
        testcase(
            (0, 2, 0, 100),
            &[(0, 1, 0, 0), (1, 4, 200, 0)],
            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
        );

        // Split batch at replacement lower and upper
        testcase(
            (2, 6, 0, 100),
            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
        );

        // Replacement doesn't overlap (after)
        testcase(
            (2, 3, 0, 100),
            &[(0, 1, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );

        // Replacement doesn't overlap (before, though this would never happen in practice)
        testcase(
            (2, 3, 0, 100),
            &[(4, 5, 0, 0)],
            Err("replacement didn't overlap any batches"),
        );
    }
}