mz_persist_client/internal/
state_diff.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::sync::Arc;
14
15use bytes::{Bytes, BytesMut};
16use differential_dataflow::lattice::Lattice;
17use differential_dataflow::trace::Description;
18use mz_ore::assert_none;
19use mz_ore::cast::CastFrom;
20use mz_persist::location::{SeqNo, VersionedData};
21use mz_persist_types::Codec64;
22use mz_persist_types::schema::SchemaId;
23use mz_proto::TryFromProtoError;
24use timely::PartialOrder;
25use timely::progress::{Antichain, Timestamp};
26use tracing::debug;
27
28use crate::critical::CriticalReaderId;
29use crate::internal::paths::PartialRollupKey;
30use crate::internal::state::{
31    CriticalReaderState, EncodedSchemas, HollowBatch, HollowBlobRef, HollowRollup,
32    LeasedReaderState, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, RunPart,
33    State, StateCollections, WriterState,
34};
35use crate::internal::trace::CompactionInput;
36use crate::internal::trace::{FueledMergeRes, SpineId, ThinMerge, ThinSpineBatch, Trace};
37use crate::read::LeasedReaderId;
38use crate::write::WriterId;
39use crate::{Metrics, PersistConfig, ShardId};
40
41use StateFieldValDiff::*;
42
43use super::state::{ActiveGc, ActiveRollup};
44
/// How a single value of a diffed state field changed.
///
/// The payloads are the values involved in the change: the value that was
/// added, the value before and after an update (in that order), or the value
/// that was removed.
#[derive(Clone, Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum StateFieldValDiff<V> {
    /// A value was added.
    Insert(V),
    /// A value was replaced: `Update(before, after)`.
    Update(V, V),
    /// A value was removed.
    Delete(V),
}
52
/// One keyed change to a state field: which entry changed (`key`) and how it
/// changed (`val`).
///
/// Fields that hold a single value rather than a map use `()` as the key.
#[derive(Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateFieldDiff<K, V> {
    /// Identifies the entry that changed.
    pub key: K,
    /// The change made to that entry.
    pub val: StateFieldValDiff<V>,
}
59
60impl<K: Debug, V: Debug> std::fmt::Debug for StateFieldDiff<K, V> {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        f.debug_struct("StateFieldDiff")
63            // In the cases we've seen in the wild, it's been more useful to
64            // have the val printed first.
65            .field("val", &self.val)
66            .field("key", &self.key)
67            .finish()
68    }
69}
70
/// The set of changes that moves a shard's `State` from one seqno to another.
///
/// The first five fields form a header describing the target state; every
/// remaining field is a list of per-entry changes for the corresponding piece
/// of state.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct StateDiff<T> {
    /// Version recorded on the target state; applying the diff overwrites the
    /// state's collections version with it.
    pub(crate) applier_version: semver::Version,
    /// Seqno of the state this diff applies to.
    pub(crate) seqno_from: SeqNo,
    /// Seqno of the state after this diff is applied.
    pub(crate) seqno_to: SeqNo,
    /// Walltime of the target state.
    pub(crate) walltime_ms: u64,
    /// Key of the target state's latest rollup. Not consulted when applying
    /// the diff.
    pub(crate) latest_rollup_key: PartialRollupKey,
    pub(crate) rollups: Vec<StateFieldDiff<SeqNo, HollowRollup>>,
    pub(crate) active_rollup: Vec<StateFieldDiff<(), ActiveRollup>>,
    pub(crate) active_gc: Vec<StateFieldDiff<(), ActiveGc>>,
    pub(crate) hostname: Vec<StateFieldDiff<(), String>>,
    pub(crate) last_gc_req: Vec<StateFieldDiff<(), SeqNo>>,
    pub(crate) leased_readers: Vec<StateFieldDiff<LeasedReaderId, LeasedReaderState<T>>>,
    pub(crate) critical_readers: Vec<StateFieldDiff<CriticalReaderId, CriticalReaderState<T>>>,
    pub(crate) writers: Vec<StateFieldDiff<WriterId, WriterState<T>>>,
    pub(crate) schemas: Vec<StateFieldDiff<SchemaId, EncodedSchemas>>,
    pub(crate) since: Vec<StateFieldDiff<(), Antichain<T>>>,
    // Legacy batch diffs are keyed by the batch itself and carry no value.
    pub(crate) legacy_batches: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
    // The remaining three fields describe the flattened trace structure.
    pub(crate) hollow_batches: Vec<StateFieldDiff<SpineId, Arc<HollowBatch<T>>>>,
    pub(crate) spine_batches: Vec<StateFieldDiff<SpineId, ThinSpineBatch<T>>>,
    pub(crate) merges: Vec<StateFieldDiff<SpineId, ThinMerge<T>>>,
}
94
95impl<T: Timestamp + Codec64> StateDiff<T> {
96    pub fn new(
97        applier_version: semver::Version,
98        seqno_from: SeqNo,
99        seqno_to: SeqNo,
100        walltime_ms: u64,
101        latest_rollup_key: PartialRollupKey,
102    ) -> Self {
103        StateDiff {
104            applier_version,
105            seqno_from,
106            seqno_to,
107            walltime_ms,
108            latest_rollup_key,
109            rollups: Vec::default(),
110            active_rollup: Vec::default(),
111            active_gc: Vec::default(),
112            hostname: Vec::default(),
113            last_gc_req: Vec::default(),
114            leased_readers: Vec::default(),
115            critical_readers: Vec::default(),
116            writers: Vec::default(),
117            schemas: Vec::default(),
118            since: Vec::default(),
119            legacy_batches: Vec::default(),
120            hollow_batches: Vec::default(),
121            spine_batches: Vec::default(),
122            merges: Vec::default(),
123        }
124    }
125
126    pub fn referenced_batches(&self) -> impl Iterator<Item = StateFieldValDiff<&HollowBatch<T>>> {
127        let legacy_batches = self
128            .legacy_batches
129            .iter()
130            .filter_map(|diff| match diff.val {
131                Insert(()) => Some(Insert(&diff.key)),
132                Update((), ()) => None, // Ignoring a noop diff.
133                Delete(()) => Some(Delete(&diff.key)),
134            });
135        let hollow_batches = self.hollow_batches.iter().map(|diff| match &diff.val {
136            Insert(batch) => Insert(&**batch),
137            Update(before, after) => Update(&**before, &**after),
138            Delete(batch) => Delete(&**batch),
139        });
140        legacy_batches.chain(hollow_batches)
141    }
142}
143
144impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
145    pub fn from_diff(from: &State<T>, to: &State<T>) -> Self {
146        // Deconstruct from and to so we get a compile failure if new
147        // fields are added.
148        let State {
149            shard_id: from_shard_id,
150            seqno: from_seqno,
151            hostname: from_hostname,
152            walltime_ms: _, // Intentionally unused
153            collections:
154                StateCollections {
155                    version: _,
156                    last_gc_req: from_last_gc_req,
157                    rollups: from_rollups,
158                    active_rollup: from_active_rollup,
159                    active_gc: from_active_gc,
160                    leased_readers: from_leased_readers,
161                    critical_readers: from_critical_readers,
162                    writers: from_writers,
163                    schemas: from_schemas,
164                    trace: from_trace,
165                },
166        } = from;
167        let State {
168            shard_id: to_shard_id,
169            seqno: to_seqno,
170            walltime_ms: to_walltime_ms,
171            hostname: to_hostname,
172            collections:
173                StateCollections {
174                    version: to_applier_version,
175                    last_gc_req: to_last_gc_req,
176                    rollups: to_rollups,
177                    active_rollup: to_active_rollup,
178                    active_gc: to_active_gc,
179                    leased_readers: to_leased_readers,
180                    critical_readers: to_critical_readers,
181                    writers: to_writers,
182                    schemas: to_schemas,
183                    trace: to_trace,
184                },
185        } = to;
186        assert_eq!(from_shard_id, to_shard_id);
187
188        let (_, latest_rollup) = to.latest_rollup();
189        let mut diffs = Self::new(
190            to_applier_version.clone(),
191            *from_seqno,
192            *to_seqno,
193            *to_walltime_ms,
194            latest_rollup.key.clone(),
195        );
196        diff_field_single(from_hostname, to_hostname, &mut diffs.hostname);
197        diff_field_single(from_last_gc_req, to_last_gc_req, &mut diffs.last_gc_req);
198        diff_field_sorted_iter(
199            from_active_rollup.iter().map(|r| (&(), r)),
200            to_active_rollup.iter().map(|r| (&(), r)),
201            &mut diffs.active_rollup,
202        );
203        diff_field_sorted_iter(
204            from_active_gc.iter().map(|g| (&(), g)),
205            to_active_gc.iter().map(|g| (&(), g)),
206            &mut diffs.active_gc,
207        );
208        diff_field_sorted_iter(from_rollups.iter(), to_rollups, &mut diffs.rollups);
209        diff_field_sorted_iter(
210            from_leased_readers.iter(),
211            to_leased_readers,
212            &mut diffs.leased_readers,
213        );
214        diff_field_sorted_iter(
215            from_critical_readers.iter(),
216            to_critical_readers,
217            &mut diffs.critical_readers,
218        );
219        diff_field_sorted_iter(from_writers.iter(), to_writers, &mut diffs.writers);
220        diff_field_sorted_iter(from_schemas.iter(), to_schemas, &mut diffs.schemas);
221        diff_field_single(from_trace.since(), to_trace.since(), &mut diffs.since);
222
223        let from_flat = from_trace.flatten();
224        let to_flat = to_trace.flatten();
225        diff_field_sorted_iter(
226            from_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
227            to_flat.legacy_batches.iter().map(|(k, v)| (&**k, v)),
228            &mut diffs.legacy_batches,
229        );
230        diff_field_sorted_iter(
231            from_flat.hollow_batches.iter(),
232            to_flat.hollow_batches.iter(),
233            &mut diffs.hollow_batches,
234        );
235        diff_field_sorted_iter(
236            from_flat.spine_batches.iter(),
237            to_flat.spine_batches.iter(),
238            &mut diffs.spine_batches,
239        );
240        diff_field_sorted_iter(
241            from_flat.merges.iter(),
242            to_flat.merges.iter(),
243            &mut diffs.merges,
244        );
245        diffs
246    }
247
248    pub(crate) fn blob_inserts(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
249        let batches = self
250            .referenced_batches()
251            .filter_map(|spine_diff| match spine_diff {
252                Insert(b) | Update(_, b) => Some(HollowBlobRef::Batch(b)),
253                Delete(_) => None, // No-op
254            });
255        let rollups = self
256            .rollups
257            .iter()
258            .filter_map(|rollups_diff| match &rollups_diff.val {
259                StateFieldValDiff::Insert(x) | StateFieldValDiff::Update(_, x) => {
260                    Some(HollowBlobRef::Rollup(x))
261                }
262                StateFieldValDiff::Delete(_) => None, // No-op
263            });
264        batches.chain(rollups)
265    }
266
267    pub(crate) fn part_deletes(&self) -> impl Iterator<Item = &RunPart<T>> {
268        // With the introduction of incremental compaction, we
269        // need to be more careful about what we consider "deleted".
270        // If there is a HollowBatch that we replace 2 out of the 4 runs of,
271        // we need to ensure that we only delete the runs that are actually
272        // no longer referenced.
273        let removed = self
274            .referenced_batches()
275            .filter_map(|spine_diff| match spine_diff {
276                Insert(_) => None,
277                Update(a, _) | Delete(a) => Some(a.parts.iter().collect::<Vec<_>>()),
278            });
279
280        let added: std::collections::BTreeSet<_> = self
281            .referenced_batches()
282            .filter_map(|spine_diff| match spine_diff {
283                Insert(a) | Update(_, a) => Some(a.parts.iter().collect::<Vec<_>>()),
284                Delete(_) => None,
285            })
286            .flatten()
287            .collect();
288
289        removed
290            .into_iter()
291            .flat_map(|x| x)
292            .filter(move |part| !added.contains(part))
293    }
294
295    pub(crate) fn rollup_deletes(&self) -> impl Iterator<Item = &HollowRollup> {
296        self.rollups
297            .iter()
298            .filter_map(|rollups_diff| match &rollups_diff.val {
299                Insert(_) => None,
300                Update(a, _) | Delete(a) => Some(a),
301            })
302    }
303
304    #[cfg(any(test, debug_assertions))]
305    #[allow(dead_code)]
306    pub fn validate_roundtrip<K, V, D>(
307        metrics: &Metrics,
308        from_state: &crate::internal::state::TypedState<K, V, T, D>,
309        diff: &Self,
310        to_state: &crate::internal::state::TypedState<K, V, T, D>,
311    ) -> Result<(), String>
312    where
313        K: mz_persist_types::Codec + std::fmt::Debug,
314        V: mz_persist_types::Codec + std::fmt::Debug,
315        D: differential_dataflow::difference::Monoid + Codec64,
316    {
317        use mz_proto::RustType;
318        use prost::Message;
319
320        use crate::internal::state::ProtoStateDiff;
321
322        let mut roundtrip_state = from_state.clone(from_state.hostname.clone());
323        roundtrip_state.apply_diff(metrics, diff.clone())?;
324
325        if &roundtrip_state != to_state {
326            // The weird spacing in this format string is so they all line up
327            // when printed out.
328            return Err(format!(
329                "state didn't roundtrip\n  from_state {:?}\n  to_state   {:?}\n  rt_state   {:?}\n  diff       {:?}\n",
330                from_state, to_state, roundtrip_state, diff
331            ));
332        }
333
334        let encoded_diff = diff.into_proto().encode_to_vec();
335        let roundtrip_diff = Self::from_proto(
336            ProtoStateDiff::decode(encoded_diff.as_slice()).map_err(|err| err.to_string())?,
337        )
338        .map_err(|err| err.to_string())?;
339
340        if &roundtrip_diff != diff {
341            // The weird spacing in this format string is so they all line up
342            // when printed out.
343            return Err(format!(
344                "diff didn't roundtrip\n  diff    {:?}\n  rt_diff {:?}",
345                diff, roundtrip_diff
346            ));
347        }
348
349        Ok(())
350    }
351}
352
353impl<T: Timestamp + Lattice + Codec64> State<T> {
354    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
355        &mut self,
356        cfg: &PersistConfig,
357        metrics: &Metrics,
358        diffs: I,
359    ) {
360        let mut state_seqno = self.seqno;
361        let diffs = diffs.into_iter().filter_map(move |x| {
362            if x.seqno != state_seqno.next() {
363                // No-op.
364                return None;
365            }
366            let data = x.data.clone();
367            let diff = metrics
368                .codecs
369                .state_diff
370                // Note: `x.data` is a `Bytes`, so cloning just increments a ref count
371                .decode(|| StateDiff::decode(&cfg.build_version, x.data.clone()));
372            assert_eq!(diff.seqno_from, state_seqno);
373            state_seqno = diff.seqno_to;
374            Some((diff, data))
375        });
376        self.apply_diffs(metrics, diffs);
377    }
378}
379
380impl<T: Timestamp + Lattice + Codec64> State<T> {
381    pub fn apply_diffs<I: IntoIterator<Item = (StateDiff<T>, Bytes)>>(
382        &mut self,
383        metrics: &Metrics,
384        diffs: I,
385    ) {
386        for (diff, data) in diffs {
387            // TODO: This could special-case batch apply for diffs where it's
388            // more efficient (in particular, spine batches that hit the slow
389            // path).
390            match self.apply_diff(metrics, diff) {
391                Ok(()) => {}
392                Err(err) => {
393                    // Having the full diff in the error message is critical for debugging any
394                    // issues that may arise from diff application. We pass along the original
395                    // Bytes it decoded from just so we can decode in this error path, while
396                    // avoiding any extraneous clones in the expected Ok path.
397                    // FIXME: this passes the state version but the method requires the build version.
398                    let diff = StateDiff::<T>::decode(&self.collections.version, data);
399                    panic!(
400                        "state diff should apply cleanly: {} diff {:?} state {:?}",
401                        err, diff, self
402                    )
403                }
404            }
405        }
406    }
407
    /// Applies a single diff to this state.
    ///
    /// Applying a diff whose `seqno_to` equals the current seqno is a no-op.
    /// Otherwise the diff's `seqno_from` must equal the current seqno.
    ///
    /// # Errors
    ///
    /// Returns a description of the problem if the seqnos do not line up or
    /// if any individual field diff does not match the current value of the
    /// field it targets. The state is modified as the diff is applied, so it
    /// may be left partially updated when an error is returned.
    //
    // Intentionally not even pub(crate) because all callers should use
    // [Self::apply_diffs].
    pub(super) fn apply_diff(
        &mut self,
        metrics: &Metrics,
        diff: StateDiff<T>,
    ) -> Result<(), String> {
        // Deconstruct diff so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version: diff_applier_version,
            seqno_from: diff_seqno_from,
            seqno_to: diff_seqno_to,
            walltime_ms: diff_walltime_ms,
            latest_rollup_key: _,
            rollups: diff_rollups,
            active_rollup: diff_active_rollup,
            active_gc: diff_active_gc,
            hostname: diff_hostname,
            last_gc_req: diff_last_gc_req,
            leased_readers: diff_leased_readers,
            critical_readers: diff_critical_readers,
            writers: diff_writers,
            schemas: diff_schemas,
            since: diff_since,
            legacy_batches: diff_legacy_batches,
            hollow_batches: diff_hollow_batches,
            spine_batches: diff_spine_batches,
            merges: diff_merges,
        } = diff;
        // The state is already at the diff's target seqno: nothing to do.
        if self.seqno == diff_seqno_to {
            return Ok(());
        }
        // The diff must start exactly where the state currently is.
        if self.seqno != diff_seqno_from {
            return Err(format!(
                "could not apply diff {} -> {} to state {}",
                diff_seqno_from, diff_seqno_to, self.seqno
            ));
        }
        self.seqno = diff_seqno_to;
        self.walltime_ms = diff_walltime_ms;
        // Hostname is force-applied: a mismatch on the diff's `from` value is
        // tolerated rather than reported as an error.
        force_apply_diffs_single(
            &self.shard_id,
            diff_seqno_to,
            "hostname",
            diff_hostname,
            &mut self.hostname,
            metrics,
        )?;

        // Deconstruct collections so we get a compile failure if new fields are
        // added.
        let StateCollections {
            version,
            last_gc_req,
            rollups,
            active_rollup,
            active_gc,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            trace,
        } = &mut self.collections;

        *version = diff_applier_version;
        apply_diffs_map("rollups", diff_rollups, rollups)?;
        apply_diffs_single("last_gc_req", diff_last_gc_req, last_gc_req)?;
        apply_diffs_single_option("active_rollup", diff_active_rollup, active_rollup)?;
        apply_diffs_single_option("active_gc", diff_active_gc, active_gc)?;
        apply_diffs_map("leased_readers", diff_leased_readers, leased_readers)?;
        apply_diffs_map("critical_readers", diff_critical_readers, critical_readers)?;
        apply_diffs_map("writers", diff_writers, writers)?;
        apply_diffs_map("schemas", diff_schemas, schemas)?;

        // Everything below concerns the trace. Work out up front whether the
        // diff touches it at all, and if so whether it touches the flattened
        // structure (hollow batches, spine batches, merges).
        let structure_unchanged = diff_hollow_batches.is_empty()
            && diff_spine_batches.is_empty()
            && diff_merges.is_empty();
        let spine_unchanged =
            diff_since.is_empty() && diff_legacy_batches.is_empty() && structure_unchanged;

        if spine_unchanged {
            return Ok(());
        }

        // `flat` is `Some` when the trace has been flattened and therefore
        // has to be rebuilt from its flattened form at the end.
        let mut flat = if trace.roundtrip_structure {
            // Apply the since and legacy batch diffs to the flattened trace.
            metrics.state.apply_spine_flattened.inc();
            let mut flat = trace.flatten();
            apply_diffs_single("since", diff_since, &mut flat.since)?;
            apply_diffs_map(
                "legacy_batches",
                diff_legacy_batches
                    .into_iter()
                    .map(|StateFieldDiff { key, val }| StateFieldDiff {
                        key: Arc::new(key),
                        val,
                    }),
                &mut flat.legacy_batches,
            )?;
            Some(flat)
        } else {
            // Apply the since and legacy batch diffs directly to the trace.
            for x in diff_since {
                match x.val {
                    Update(from, to) => {
                        if trace.since() != &from {
                            return Err(format!(
                                "since update didn't match: {:?} vs {:?}",
                                self.collections.trace.since(),
                                &from
                            ));
                        }
                        trace.downgrade_since(&to);
                    }
                    Insert(_) => return Err("cannot insert since field".to_string()),
                    Delete(_) => return Err("cannot delete since field".to_string()),
                }
            }
            if !diff_legacy_batches.is_empty() {
                apply_diffs_spine(metrics, diff_legacy_batches, trace)?;
                debug_assert_eq!(trace.validate(), Ok(()), "{:?}", trace);
            }
            None
        };

        // Structural diffs are always applied to the flattened form, so
        // flatten now if that has not happened yet.
        if !structure_unchanged {
            let flat = flat.get_or_insert_with(|| trace.flatten());
            apply_diffs_map(
                "hollow_batches",
                diff_hollow_batches,
                &mut flat.hollow_batches,
            )?;
            apply_diffs_map("spine_batches", diff_spine_batches, &mut flat.spine_batches)?;
            apply_diffs_map("merges", diff_merges, &mut flat.merges)?;
        }

        // Rebuild the trace from its flattened form if one was produced.
        if let Some(flat) = flat {
            *trace = Trace::unflatten(flat)?;
        }

        // There's various sanity checks that this method could run (e.g. since,
        // upper, seqno_since, etc don't regress or that diff.latest_rollup ==
        // state.rollups.last()), are they a good idea? On one hand, I like
        // sanity checks, on the other, one of the goals here is to keep
        // apply logic as straightforward and unchanging as possible.
        Ok(())
    }
553}
554
555fn diff_field_single<T: PartialEq + Clone>(
556    from: &T,
557    to: &T,
558    diffs: &mut Vec<StateFieldDiff<(), T>>,
559) {
560    // This could use the `diff_field_sorted_iter(once(from), once(to), diffs)`
561    // general impl, but we just do the obvious thing.
562    if from != to {
563        diffs.push(StateFieldDiff {
564            key: (),
565            val: Update(from.clone(), to.clone()),
566        })
567    }
568}
569
570fn apply_diffs_single_option<X: PartialEq + Debug>(
571    name: &str,
572    diffs: Vec<StateFieldDiff<(), X>>,
573    single: &mut Option<X>,
574) -> Result<(), String> {
575    for diff in diffs {
576        apply_diff_single_option(name, diff, single)?;
577    }
578    Ok(())
579}
580
581fn apply_diff_single_option<X: PartialEq + Debug>(
582    name: &str,
583    diff: StateFieldDiff<(), X>,
584    single: &mut Option<X>,
585) -> Result<(), String> {
586    match diff.val {
587        Update(from, to) => {
588            if single.as_ref() != Some(&from) {
589                return Err(format!(
590                    "{} update didn't match: {:?} vs {:?}",
591                    name, single, &from
592                ));
593            }
594            *single = Some(to)
595        }
596        Insert(to) => {
597            if single.is_some() {
598                return Err(format!("{} insert found existing value", name));
599            }
600            *single = Some(to)
601        }
602        Delete(from) => {
603            if single.as_ref() != Some(&from) {
604                return Err(format!(
605                    "{} delete didn't match: {:?} vs {:?}",
606                    name, single, &from
607                ));
608            }
609            *single = None
610        }
611    }
612    Ok(())
613}
614
615fn apply_diffs_single<X: PartialEq + Debug>(
616    name: &str,
617    diffs: Vec<StateFieldDiff<(), X>>,
618    single: &mut X,
619) -> Result<(), String> {
620    for diff in diffs {
621        apply_diff_single(name, diff, single)?;
622    }
623    Ok(())
624}
625
626fn apply_diff_single<X: PartialEq + Debug>(
627    name: &str,
628    diff: StateFieldDiff<(), X>,
629    single: &mut X,
630) -> Result<(), String> {
631    match diff.val {
632        Update(from, to) => {
633            if single != &from {
634                return Err(format!(
635                    "{} update didn't match: {:?} vs {:?}",
636                    name, single, &from
637                ));
638            }
639            *single = to
640        }
641        Insert(_) => return Err(format!("cannot insert {} field", name)),
642        Delete(_) => return Err(format!("cannot delete {} field", name)),
643    }
644    Ok(())
645}
646
647// A hack to force apply a diff, making `single` equal to
648// the Update `to` value, ignoring a mismatch on `from`.
649// Used to migrate forward after writing down incorrect
650// diffs.
651//
652// TODO: delete this once `hostname` has zero mismatches
653fn force_apply_diffs_single<X: PartialEq + Debug>(
654    shard_id: &ShardId,
655    seqno: SeqNo,
656    name: &str,
657    diffs: Vec<StateFieldDiff<(), X>>,
658    single: &mut X,
659    metrics: &Metrics,
660) -> Result<(), String> {
661    for diff in diffs {
662        force_apply_diff_single(shard_id, seqno, name, diff, single, metrics)?;
663    }
664    Ok(())
665}
666
667fn force_apply_diff_single<X: PartialEq + Debug>(
668    shard_id: &ShardId,
669    seqno: SeqNo,
670    name: &str,
671    diff: StateFieldDiff<(), X>,
672    single: &mut X,
673    metrics: &Metrics,
674) -> Result<(), String> {
675    match diff.val {
676        Update(from, to) => {
677            if single != &from {
678                debug!(
679                    "{}: update didn't match: {:?} vs {:?}, continuing to force apply diff to {:?} for shard {} and seqno {}",
680                    name, single, &from, &to, shard_id, seqno
681                );
682                metrics.state.force_apply_hostname.inc();
683            }
684            *single = to
685        }
686        Insert(_) => return Err(format!("cannot insert {} field", name)),
687        Delete(_) => return Err(format!("cannot delete {} field", name)),
688    }
689    Ok(())
690}
691
692fn diff_field_sorted_iter<'a, K, V, IF, IT>(from: IF, to: IT, diffs: &mut Vec<StateFieldDiff<K, V>>)
693where
694    K: Ord + Clone + 'a,
695    V: PartialEq + Clone + 'a,
696    IF: IntoIterator<Item = (&'a K, &'a V)>,
697    IT: IntoIterator<Item = (&'a K, &'a V)>,
698{
699    let (mut from, mut to) = (from.into_iter(), to.into_iter());
700    let (mut f, mut t) = (from.next(), to.next());
701    loop {
702        match (f, t) {
703            (None, None) => break,
704            (Some((fk, fv)), Some((tk, tv))) => match fk.cmp(tk) {
705                Ordering::Less => {
706                    diffs.push(StateFieldDiff {
707                        key: fk.clone(),
708                        val: Delete(fv.clone()),
709                    });
710                    let f_next = from.next();
711                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
712                    f = f_next;
713                }
714                Ordering::Greater => {
715                    diffs.push(StateFieldDiff {
716                        key: tk.clone(),
717                        val: Insert(tv.clone()),
718                    });
719                    let t_next = to.next();
720                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
721                    t = t_next;
722                }
723                Ordering::Equal => {
724                    // TODO: regression test for this if, I missed it in the
725                    // original impl :)
726                    if fv != tv {
727                        diffs.push(StateFieldDiff {
728                            key: fk.clone(),
729                            val: Update(fv.clone(), tv.clone()),
730                        });
731                    }
732                    let f_next = from.next();
733                    debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
734                    f = f_next;
735                    let t_next = to.next();
736                    debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
737                    t = t_next;
738                }
739            },
740            (None, Some((tk, tv))) => {
741                diffs.push(StateFieldDiff {
742                    key: tk.clone(),
743                    val: Insert(tv.clone()),
744                });
745                let t_next = to.next();
746                debug_assert!(t_next.as_ref().map_or(true, |(tk_next, _)| tk_next > &tk));
747                t = t_next;
748            }
749            (Some((fk, fv)), None) => {
750                diffs.push(StateFieldDiff {
751                    key: fk.clone(),
752                    val: Delete(fv.clone()),
753                });
754                let f_next = from.next();
755                debug_assert!(f_next.as_ref().map_or(true, |(fk_next, _)| fk_next > &fk));
756                f = f_next;
757            }
758        }
759    }
760}
761
762fn apply_diffs_map<K: Ord, V: PartialEq + Debug>(
763    name: &str,
764    diffs: impl IntoIterator<Item = StateFieldDiff<K, V>>,
765    map: &mut BTreeMap<K, V>,
766) -> Result<(), String> {
767    for diff in diffs {
768        apply_diff_map(name, diff, map)?;
769    }
770    Ok(())
771}
772
773// This might leave state in an invalid (umm) state when returning an error. The
774// caller ultimately ends up panic'ing on error, but if that changes, we might
775// want to revisit this.
776fn apply_diff_map<K: Ord, V: PartialEq + Debug>(
777    name: &str,
778    diff: StateFieldDiff<K, V>,
779    map: &mut BTreeMap<K, V>,
780) -> Result<(), String> {
781    match diff.val {
782        Insert(to) => {
783            let prev = map.insert(diff.key, to);
784            if prev != None {
785                return Err(format!("{} insert found existing value: {:?}", name, prev));
786            }
787        }
788        Update(from, to) => {
789            let prev = map.insert(diff.key, to);
790            if prev.as_ref() != Some(&from) {
791                return Err(format!(
792                    "{} update didn't match: {:?} vs {:?}",
793                    name,
794                    prev,
795                    Some(from),
796                ));
797            }
798        }
799        Delete(from) => {
800            let prev = map.remove(&diff.key);
801            if prev.as_ref() != Some(&from) {
802                return Err(format!(
803                    "{} delete didn't match: {:?} vs {:?}",
804                    name,
805                    prev,
806                    Some(from),
807                ));
808            }
809        }
810    };
811    Ok(())
812}
813
814// This might leave state in an invalid (umm) state when returning an error. The
815// caller ultimately ends up panic'ing on error, but if that changes, we might
816// want to revisit this.
817fn apply_diffs_spine<T: Timestamp + Lattice + Codec64>(
818    metrics: &Metrics,
819    mut diffs: Vec<StateFieldDiff<HollowBatch<T>, ()>>,
820    trace: &mut Trace<T>,
821) -> Result<(), String> {
822    // Another special case: sniff out a newly inserted batch (one whose lower
823    // lines up with the current upper) and handle that now. Then fall through
824    // to the rest of the handling on whatever is left.
825    if let Some(insert) = sniff_insert(&mut diffs, trace.upper()) {
826        // Ignore merge_reqs because whichever process generated this diff is
827        // assigned the work.
828        let () = trace.push_batch_no_merge_reqs(insert);
829        // If this insert was the only thing in diffs, then return now instead
830        // of falling through to the "no diffs" case in the match so we can inc
831        // the apply_spine_fast_path metric.
832        if diffs.is_empty() {
833            metrics.state.apply_spine_fast_path.inc();
834            return Ok(());
835        }
836    }
837
838    match &diffs[..] {
839        // Fast-path: no diffs.
840        [] => return Ok(()),
841
842        // Fast-path: batch insert with both new and most recent batch empty.
843        // Spine will happily merge these empty batches together without a call
844        // out to compaction.
845        [
846            StateFieldDiff {
847                key: del,
848                val: StateFieldValDiff::Delete(()),
849            },
850            StateFieldDiff {
851                key: ins,
852                val: StateFieldValDiff::Insert(()),
853            },
854        ] => {
855            if del.is_empty()
856                && ins.is_empty()
857                && del.desc.lower() == ins.desc.lower()
858                && PartialOrder::less_than(del.desc.upper(), ins.desc.upper())
859            {
860                // Ignore merge_reqs because whichever process generated this diff is
861                // assigned the work.
862                let () = trace.push_batch_no_merge_reqs(HollowBatch::empty(Description::new(
863                    del.desc.upper().clone(),
864                    ins.desc.upper().clone(),
865                    // `keys.len() == 0` for both `del` and `ins` means we
866                    // don't have to think about what the compaction
867                    // frontier is for these batches (nothing in them, so nothing could have been compacted.
868                    Antichain::from_elem(T::minimum()),
869                )));
870                metrics.state.apply_spine_fast_path.inc();
871                return Ok(());
872            }
873        }
874        // Fall-through
875        _ => {}
876    }
877
878    // Fast-path: compaction
879    if let Some((_inputs, output)) = sniff_compaction(&diffs) {
880        let res = FueledMergeRes {
881            output,
882            input: CompactionInput::Legacy,
883            new_active_compaction: None,
884        };
885        // We can't predict how spine will arrange the batches when it's
886        // hydrated. This means that something that is maintaining a Spine
887        // starting at some seqno may not exactly match something else
888        // maintaining the same spine starting at a different seqno. (Plus,
889        // maybe these aren't even on the same version of the code and we've
890        // changed the spine logic.) Because apply_merge_res is strict,
891        // we're not _guaranteed_ that we can apply a compaction response
892        // that was generated elsewhere. Most of the time we can, though, so
893        // count the good ones and fall back to the slow path below when we
894        // can't.
895        if trace.apply_merge_res_unchecked(&res).applied() {
896            // Maybe return the replaced batches from apply_merge_res and verify
897            // that they match _inputs?
898            metrics.state.apply_spine_fast_path.inc();
899            return Ok(());
900        }
901
902        // Otherwise, try our lenient application of a compaction result.
903        let mut batches = Vec::new();
904        trace.map_batches(|b| batches.push(b.clone()));
905
906        match apply_compaction_lenient(metrics, batches, &res.output) {
907            Ok(batches) => {
908                let mut new_trace = Trace::default();
909                new_trace.roundtrip_structure = trace.roundtrip_structure;
910                new_trace.downgrade_since(trace.since());
911                for batch in batches {
912                    // Ignore merge_reqs because whichever process generated
913                    // this diff is assigned the work.
914                    let () = new_trace.push_batch_no_merge_reqs(batch.clone());
915                }
916                *trace = new_trace;
917                metrics.state.apply_spine_slow_path_lenient.inc();
918                return Ok(());
919            }
920            Err(err) => {
921                return Err(format!(
922                    "lenient compaction result apply unexpectedly failed: {}",
923                    err
924                ));
925            }
926        }
927    }
928
929    // Something complicated is going on, so reconstruct the Trace from scratch.
930    metrics.state.apply_spine_slow_path.inc();
931    debug!(
932        "apply_diffs_spine didn't hit a fast-path diffs={:?} trace={:?}",
933        diffs, trace
934    );
935
936    let batches = {
937        let mut batches = BTreeMap::new();
938        trace.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
939        apply_diffs_map("spine", diffs.clone(), &mut batches).map(|_ok| batches)
940    };
941
942    let batches = match batches {
943        Ok(batches) => batches,
944        Err(err) => {
945            metrics
946                .state
947                .apply_spine_slow_path_with_reconstruction
948                .inc();
949            debug!(
950                "apply_diffs_spines could not apply diffs directly to existing trace batches: {}. diffs={:?} trace={:?}",
951                err, diffs, trace
952            );
953            // if we couldn't apply our diffs directly to our trace's batches, we can
954            // try one more trick: reconstruct a new spine with our existing batches,
955            // in an attempt to create different merges than we currently have. then,
956            // we can try to apply our diffs on top of these new (potentially) merged
957            // batches.
958            let mut reconstructed_spine = Trace::default();
959            reconstructed_spine.roundtrip_structure = trace.roundtrip_structure;
960            trace.map_batches(|b| {
961                // Ignore merge_reqs because whichever process generated this
962                // diff is assigned the work.
963                let () = reconstructed_spine.push_batch_no_merge_reqs(b.clone());
964            });
965
966            let mut batches = BTreeMap::new();
967            reconstructed_spine.map_batches(|b| assert_none!(batches.insert(b.clone(), ())));
968            apply_diffs_map("spine", diffs, &mut batches)?;
969            batches
970        }
971    };
972
973    let mut new_trace = Trace::default();
974    new_trace.roundtrip_structure = trace.roundtrip_structure;
975    new_trace.downgrade_since(trace.since());
976    for (batch, ()) in batches {
977        // Ignore merge_reqs because whichever process generated this diff is
978        // assigned the work.
979        let () = new_trace.push_batch_no_merge_reqs(batch);
980    }
981    *trace = new_trace;
982    Ok(())
983}
984
985fn sniff_insert<T: Timestamp + Lattice>(
986    diffs: &mut Vec<StateFieldDiff<HollowBatch<T>, ()>>,
987    upper: &Antichain<T>,
988) -> Option<HollowBatch<T>> {
989    for idx in 0..diffs.len() {
990        match &diffs[idx] {
991            StateFieldDiff {
992                key,
993                val: StateFieldValDiff::Insert(()),
994            } if key.desc.lower() == upper => return Some(diffs.remove(idx).key),
995            _ => continue,
996        }
997    }
998    None
999}
1000
1001// TODO: Instead of trying to sniff out a compaction from diffs, should we just
1002// be explicit?
1003fn sniff_compaction<'a, T: Timestamp + Lattice>(
1004    diffs: &'a [StateFieldDiff<HollowBatch<T>, ()>],
1005) -> Option<(Vec<&'a HollowBatch<T>>, HollowBatch<T>)> {
1006    // Compaction always produces exactly one output batch (with possibly many
1007    // parts, but we get one Insert for the whole batch.
1008    let mut inserts = diffs.iter().flat_map(|x| match x.val {
1009        StateFieldValDiff::Insert(()) => Some(&x.key),
1010        _ => None,
1011    });
1012    let compaction_output = match inserts.next() {
1013        Some(x) => x,
1014        None => return None,
1015    };
1016    if let Some(_) = inserts.next() {
1017        return None;
1018    }
1019
1020    // Grab all deletes and sanity check that there are no updates.
1021    let mut compaction_inputs = Vec::with_capacity(diffs.len() - 1);
1022    for diff in diffs.iter() {
1023        match diff.val {
1024            StateFieldValDiff::Delete(()) => {
1025                compaction_inputs.push(&diff.key);
1026            }
1027            StateFieldValDiff::Insert(()) => {}
1028            StateFieldValDiff::Update((), ()) => {
1029                // Fall through to let the general case create the error
1030                // message.
1031                return None;
1032            }
1033        }
1034    }
1035
1036    Some((compaction_inputs, compaction_output.clone()))
1037}
1038
1039/// Apply a compaction diff that doesn't exactly line up with the set of
1040/// HollowBatches.
1041///
1042/// Because of the way Spine internally optimizes only _some_ empty batches
1043/// (immediately merges them in), we can end up in a situation where a
1044/// compaction res applied on another copy of state, but when we replay all of
1045/// the state diffs against a new Spine locally, it merges empty batches
1046/// differently in-mem and we can't exactly apply the compaction diff. Example:
1047///
1048/// - compact: [1,2),[2,3) -> [1,3)
1049/// - this spine: [0,2),[2,3) (0,1 is empty)
1050///
1051/// Ideally, we'd figure out a way to avoid this, but nothing immediately comes
1052/// to mind. In the meantime, force the application (otherwise the shard is
1053/// stuck and we can't do anything with it) by manually splitting the empty
1054/// batch back out. For the example above:
1055///
1056/// - [0,1),[1,3) (0,1 is empty)
1057///
1058/// This can only happen when the batch needing to be split is empty, so error
1059/// out if it isn't because that means something unexpected is going on.
1060fn apply_compaction_lenient<'a, T: Timestamp + Lattice>(
1061    metrics: &Metrics,
1062    mut trace: Vec<HollowBatch<T>>,
1063    replacement: &'a HollowBatch<T>,
1064) -> Result<Vec<HollowBatch<T>>, String> {
1065    let mut overlapping_batches = Vec::new();
1066    trace.retain(|b| {
1067        let before_replacement = PartialOrder::less_equal(b.desc.upper(), replacement.desc.lower());
1068        let after_replacement = PartialOrder::less_equal(replacement.desc.upper(), b.desc.lower());
1069        let overlaps_replacement = !(before_replacement || after_replacement);
1070        if overlaps_replacement {
1071            overlapping_batches.push(b.clone());
1072            false
1073        } else {
1074            true
1075        }
1076    });
1077
1078    {
1079        let first_overlapping_batch = match overlapping_batches.first() {
1080            Some(x) => x,
1081            None => return Err("replacement didn't overlap any batches".into()),
1082        };
1083        if PartialOrder::less_than(
1084            first_overlapping_batch.desc.lower(),
1085            replacement.desc.lower(),
1086        ) {
1087            if first_overlapping_batch.len > 0 {
1088                return Err(format!(
1089                    "overlapping batch was unexpectedly non-empty: {:?}",
1090                    first_overlapping_batch
1091                ));
1092            }
1093            let desc = Description::new(
1094                first_overlapping_batch.desc.lower().clone(),
1095                replacement.desc.lower().clone(),
1096                first_overlapping_batch.desc.since().clone(),
1097            );
1098            trace.push(HollowBatch::empty(desc));
1099            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
1100        }
1101    }
1102
1103    {
1104        let last_overlapping_batch = match overlapping_batches.last() {
1105            Some(x) => x,
1106            None => return Err("replacement didn't overlap any batches".into()),
1107        };
1108        if PartialOrder::less_than(
1109            replacement.desc.upper(),
1110            last_overlapping_batch.desc.upper(),
1111        ) {
1112            if last_overlapping_batch.len > 0 {
1113                return Err(format!(
1114                    "overlapping batch was unexpectedly non-empty: {:?}",
1115                    last_overlapping_batch
1116                ));
1117            }
1118            let desc = Description::new(
1119                replacement.desc.upper().clone(),
1120                last_overlapping_batch.desc.upper().clone(),
1121                last_overlapping_batch.desc.since().clone(),
1122            );
1123            trace.push(HollowBatch::empty(desc));
1124            metrics.state.apply_spine_slow_path_lenient_adjustment.inc();
1125        }
1126    }
1127    trace.push(replacement.clone());
1128
1129    // We just inserted stuff at the end, so re-sort them into place.
1130    trace.sort_by(|a, b| a.desc.lower().elements().cmp(b.desc.lower().elements()));
1131
1132    // This impl is a touch complex, so sanity check our work.
1133    let mut expected_lower = &Antichain::from_elem(T::minimum());
1134    for b in trace.iter() {
1135        if b.desc.lower() != expected_lower {
1136            return Err(format!(
1137                "lower {:?} did not match expected {:?}: {:?}",
1138                b.desc.lower(),
1139                expected_lower,
1140                trace
1141            ));
1142        }
1143        expected_lower = b.desc.upper();
1144    }
1145    Ok(trace)
1146}
1147
/// A type that facilitates the proto encoding of a [`ProtoStateFieldDiffs`].
///
/// [`ProtoStateFieldDiffs`] is a columnar encoding of [`StateFieldDiff`]s, see
/// its doc comment for more info. The underlying buffer for a [`ProtoStateFieldDiffs`]
/// is a [`Bytes`] struct, which is an immutable, shared, reference counted,
/// buffer of data. Using a [`Bytes`] struct is a very efficient way to manage data
/// because multiple [`Bytes`] can reference different parts of the same underlying
/// portion of memory. See its doc comment for more info.
///
/// A [`ProtoStateFieldDiffsWriter`] maintains a mutable, unique, data buffer, i.e.
/// a [`BytesMut`], which we use when encoding a [`StateFieldDiff`]. And when
/// finished encoding, we convert it into a [`ProtoStateFieldDiffs`] by "freezing" the
/// underlying buffer, converting it into a [`Bytes`] struct, so it can be shared.
///
/// [`Bytes`]: bytes::Bytes
#[derive(Debug)]
pub struct ProtoStateFieldDiffsWriter {
    /// Buffer that encoded messages are appended to; frozen into the proto's
    /// `data_bytes` when the writer is converted back into a proto.
    data_buf: BytesMut,
    /// The proto being built. Its `data_bytes` field is expected to stay empty
    /// until the writer is converted back into a proto.
    proto: ProtoStateFieldDiffs,
}
1168
1169impl ProtoStateFieldDiffsWriter {
1170    /// Record a [`ProtoStateField`] for our columnar encoding.
1171    pub fn push_field(&mut self, field: ProtoStateField) {
1172        self.proto.fields.push(i32::from(field));
1173    }
1174
1175    /// Record a [`ProtoStateFieldDiffType`] for our columnar encoding.
1176    pub fn push_diff_type(&mut self, diff_type: ProtoStateFieldDiffType) {
1177        self.proto.diff_types.push(i32::from(diff_type));
1178    }
1179
1180    /// Encode a message for our columnar encoding.
1181    pub fn encode_proto<M: prost::Message>(&mut self, msg: &M) {
1182        let len_before = self.data_buf.len();
1183        self.data_buf.reserve(msg.encoded_len());
1184
1185        // Note: we use `encode_raw` as opposed to `encode` because all `encode` does is
1186        // check to make sure there's enough bytes in the buffer to fit our message
1187        // which we know there are because we just reserved the space. When benchmarking
1188        // `encode_raw` does offer a slight performance improvement over `encode`.
1189        msg.encode_raw(&mut self.data_buf);
1190
1191        // Record exactly how many bytes were written.
1192        let written_len = self.data_buf.len() - len_before;
1193        self.proto.data_lens.push(u64::cast_from(written_len));
1194    }
1195
1196    pub fn into_proto(self) -> ProtoStateFieldDiffs {
1197        let ProtoStateFieldDiffsWriter {
1198            data_buf,
1199            mut proto,
1200        } = self;
1201
1202        // Assert we didn't write into the proto's data_bytes field
1203        assert!(proto.data_bytes.is_empty());
1204
1205        // Move our buffer into the proto
1206        let data_bytes = data_buf.freeze();
1207        proto.data_bytes = data_bytes;
1208
1209        proto
1210    }
1211}
1212
1213impl ProtoStateFieldDiffs {
1214    pub fn into_writer(mut self) -> ProtoStateFieldDiffsWriter {
1215        // Create a new buffer which we'll encode data into.
1216        let mut data_buf = BytesMut::with_capacity(self.data_bytes.len());
1217
1218        // Take our existing data, and copy it into our buffer.
1219        let existing_data = std::mem::take(&mut self.data_bytes);
1220        data_buf.extend(existing_data);
1221
1222        ProtoStateFieldDiffsWriter {
1223            data_buf,
1224            proto: self,
1225        }
1226    }
1227
1228    pub fn iter<'a>(&'a self) -> ProtoStateFieldDiffsIter<'a> {
1229        let len = self.fields.len();
1230        assert_eq!(self.diff_types.len(), len);
1231
1232        ProtoStateFieldDiffsIter {
1233            len,
1234            diff_idx: 0,
1235            data_idx: 0,
1236            data_offset: 0,
1237            diffs: self,
1238        }
1239    }
1240
1241    pub fn validate(&self) -> Result<(), String> {
1242        if self.fields.len() != self.diff_types.len() {
1243            return Err(format!(
1244                "fields {} and diff_types {} lengths disagree",
1245                self.fields.len(),
1246                self.diff_types.len()
1247            ));
1248        }
1249
1250        let mut expected_data_slices = 0;
1251        for diff_type in self.diff_types.iter() {
1252            // We expect one for the key.
1253            expected_data_slices += 1;
1254            // And 1 or 2 for val depending on the diff type.
1255            match ProtoStateFieldDiffType::try_from(*diff_type) {
1256                Ok(ProtoStateFieldDiffType::Insert) => expected_data_slices += 1,
1257                Ok(ProtoStateFieldDiffType::Update) => expected_data_slices += 2,
1258                Ok(ProtoStateFieldDiffType::Delete) => expected_data_slices += 1,
1259                Err(_) => return Err(format!("unknown diff_type {}", diff_type)),
1260            }
1261        }
1262        if expected_data_slices != self.data_lens.len() {
1263            return Err(format!(
1264                "expected {} data slices got {}",
1265                expected_data_slices,
1266                self.data_lens.len()
1267            ));
1268        }
1269
1270        let expected_data_bytes = usize::cast_from(self.data_lens.iter().copied().sum::<u64>());
1271        if expected_data_bytes != self.data_bytes.len() {
1272            return Err(format!(
1273                "expected {} data bytes got {}",
1274                expected_data_bytes,
1275                self.data_bytes.len()
1276            ));
1277        }
1278
1279        Ok(())
1280    }
1281}
1282
/// A borrowed view of a single diff within a [`ProtoStateFieldDiffs`], as
/// yielded by [`ProtoStateFieldDiffsIter`].
#[derive(Debug)]
pub struct ProtoStateFieldDiff<'a> {
    /// The encoded bytes of the diff's key.
    pub key: &'a [u8],
    /// Whether this diff is an insert, an update, or a delete.
    pub diff_type: ProtoStateFieldDiffType,
    /// The encoded bytes of the previous value. The iterator sets this to an
    /// empty slice for inserts.
    pub from: &'a [u8],
    /// The encoded bytes of the new value. The iterator sets this to an empty
    /// slice for deletes.
    pub to: &'a [u8],
}
1290
/// An iterator over the diffs stored in the columnar encoding of a
/// [`ProtoStateFieldDiffs`].
pub struct ProtoStateFieldDiffsIter<'a> {
    /// Total number of diffs, i.e. the length of the `fields` column.
    len: usize,
    /// Index of the next diff to yield.
    diff_idx: usize,
    /// Index into `data_lens` of the next data slice to read.
    data_idx: usize,
    /// Byte offset into `data_bytes` at which the next data slice starts.
    data_offset: usize,
    /// The proto being iterated over.
    diffs: &'a ProtoStateFieldDiffs,
}
1298
1299impl<'a> Iterator for ProtoStateFieldDiffsIter<'a> {
1300    type Item = Result<(ProtoStateField, ProtoStateFieldDiff<'a>), TryFromProtoError>;
1301
1302    fn next(&mut self) -> Option<Self::Item> {
1303        if self.diff_idx >= self.len {
1304            return None;
1305        }
1306        let mut next_data = || {
1307            let start = self.data_offset;
1308            let end = start + usize::cast_from(self.diffs.data_lens[self.data_idx]);
1309            let data = &self.diffs.data_bytes[start..end];
1310            self.data_idx += 1;
1311            self.data_offset = end;
1312            data
1313        };
1314        let field = match ProtoStateField::try_from(self.diffs.fields[self.diff_idx]) {
1315            Ok(x) => x,
1316            Err(_) => {
1317                return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1318                    "ProtoStateField({})",
1319                    self.diffs.fields[self.diff_idx]
1320                ))));
1321            }
1322        };
1323        let diff_type =
1324            match ProtoStateFieldDiffType::try_from(self.diffs.diff_types[self.diff_idx]) {
1325                Ok(x) => x,
1326                Err(_) => {
1327                    return Some(Err(TryFromProtoError::unknown_enum_variant(format!(
1328                        "ProtoStateFieldDiffType({})",
1329                        self.diffs.diff_types[self.diff_idx]
1330                    ))));
1331                }
1332            };
1333        let key = next_data();
1334        let (from, to): (&[u8], &[u8]) = match diff_type {
1335            ProtoStateFieldDiffType::Insert => (&[], next_data()),
1336            ProtoStateFieldDiffType::Update => (next_data(), next_data()),
1337            ProtoStateFieldDiffType::Delete => (next_data(), &[]),
1338        };
1339        let diff = ProtoStateFieldDiff {
1340            key,
1341            diff_type,
1342            from,
1343            to,
1344        };
1345        self.diff_idx += 1;
1346        Some(Ok((field, diff)))
1347    }
1348}
1349
1350#[cfg(test)]
1351mod tests {
1352    use semver::Version;
1353    use std::ops::ControlFlow::Continue;
1354
1355    use crate::internal::paths::{PartId, PartialBatchKey, RollupId, WriterKey};
1356    use mz_ore::metrics::MetricsRegistry;
1357
1358    use crate::ShardId;
1359    use crate::internal::state::TypedState;
1360
1361    use super::*;
1362
    /// Model a situation where a "leader" is constantly making changes to its state, and a "follower"
    /// is applying those changes as diffs.
    ///
    /// Each generated action is applied to the leader, a diff is taken between the leader's
    /// state before and after, and that diff must apply cleanly both to a copy of the
    /// leader's previous state and to the follower.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn test_state_sync() {
        use proptest::prelude::*;

        #[derive(Debug, Clone)]
        enum Action {
            /// Append a (non)empty batch to the shard that covers the given length of time.
            Append { empty: bool, time_delta: u64 },
            /// Apply the Nth compaction request we've received to the shard state.
            Compact { req: usize },
        }

        // Strategy that generates either an Append or a Compact action.
        let action_gen: BoxedStrategy<Action> = {
            prop::strategy::Union::new([
                (any::<bool>(), 1u64..10u64)
                    .prop_map(|(empty, time_delta)| Action::Append { empty, time_delta })
                    .boxed(),
                (0usize..10usize)
                    .prop_map(|req| Action::Compact { req })
                    .boxed(),
            ])
            .boxed()
        };

        // Runs one generated scenario. Each action is paired with the value of
        // `roundtrip_structure` to set on the leader's trace after applying it.
        fn run(actions: Vec<(Action, bool)>, metrics: &Metrics) {
            let version = Version::new(0, 100, 0);
            let writer_key = WriterKey::Version(version.to_string());
            let id = ShardId::new();
            let hostname = "computer";
            let typed: TypedState<String, (), u64, i64> =
                TypedState::new(version, id, hostname.to_string(), 0);
            let mut leader = typed.state;

            let seqno = SeqNo::minimum();
            // Lower bound of the next batch to append.
            let mut lower = 0u64;
            // Merge requests handed back by the leader's trace that haven't
            // been "compacted" yet.
            let mut merge_reqs = vec![];

            leader.collections.rollups.insert(
                seqno,
                HollowRollup {
                    key: PartialRollupKey::new(seqno, &RollupId::new()),
                    encoded_size_bytes: None,
                },
            );
            leader.collections.trace.roundtrip_structure = false;
            let mut follower = leader.clone();

            for (action, roundtrip_structure) in actions {
                // Apply the given action and the new roundtrip_structure setting and take a diff.
                let mut old_leader = leader.clone();
                match action {
                    Action::Append { empty, time_delta } => {
                        let upper = lower + time_delta;
                        // Non-empty batches get a single part key; empty ones get none.
                        let key = if empty {
                            None
                        } else {
                            let id = PartId::new();
                            Some(PartialBatchKey::new(&writer_key, &id))
                        };

                        let keys = key.as_ref().map(|k| k.0.as_str());
                        let reqs = leader.collections.trace.push_batch(
                            crate::internal::state::tests::hollow(
                                lower,
                                upper,
                                keys.as_slice(),
                                if empty { 0 } else { 1 },
                            ),
                        );
                        merge_reqs.extend(reqs);
                        lower = upper;
                    }
                    Action::Compact { req } => {
                        if !merge_reqs.is_empty() {
                            // Clamp the generated index to the requests we actually have.
                            let req = merge_reqs.remove(req.min(merge_reqs.len() - 1));
                            // Build the compaction output by concatenating the
                            // parts and summing the lengths of the inputs.
                            let len = req.inputs.iter().map(|p| p.batch.len).sum();
                            let parts = req
                                .inputs
                                .into_iter()
                                .flat_map(|p| p.batch.parts.clone())
                                .collect();
                            let output = HollowBatch::new_run(req.desc, parts, len);
                            leader
                                .collections
                                .trace
                                .apply_merge_res_unchecked(&FueledMergeRes {
                                    output,
                                    input: CompactionInput::Legacy,
                                    new_active_compaction: None,
                                });
                        }
                    }
                }
                leader.collections.trace.roundtrip_structure = roundtrip_structure;
                leader.seqno.0 += 1;
                let diff = StateDiff::from_diff(&old_leader, &leader);

                // Validate that the diff applies to both the previous state (also checked in
                // debug asserts) and our follower that's only synchronized via diffs.
                old_leader
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the old version of the leader state");
                follower
                    .apply_diff(metrics, diff.clone())
                    .expect("diff applies to the synced version of the follower state");

                // TODO: once spine structure is roundtripped through diffs, assert that the follower
                // has the same batches etc. as the leader does.
            }
        }

        let config = PersistConfig::new_for_tests();
        let metrics_registry = MetricsRegistry::new();
        let metrics: Metrics = Metrics::new(&config, &metrics_registry);

        // Run scenarios of 1 to 19 (action, roundtrip_structure) pairs.
        proptest!(|(actions in prop::collection::vec((action_gen, any::<bool>()), 1..20))| {
            run(actions, &metrics)
        })
    }
1485
    // Regression test for the apply_diffs_spine special case that sniffs out an
    // insert, applies it, and then lets the remaining diffs (if any) fall
    // through to the rest of the code. See database-issues#4431.
    #[mz_ore::test]
    fn regression_15493_sniff_insert() {
        // Builds a part-less batch covering [lower, upper) with since 0 and
        // the given len.
        fn hb(lower: u64, upper: u64, len: usize) -> HollowBatch<u64> {
            HollowBatch::new_run(
                Description::new(
                    Antichain::from_elem(lower),
                    Antichain::from_elem(upper),
                    Antichain::from_elem(0),
                ),
                Vec::new(),
                len,
            )
        }

        // The bug handled here is essentially a set of batches that look like
        // the pattern handled by `apply_compaction_lenient` _plus_ an insert.
        // In apply_diffs_spine, we use `sniff_insert` to steal the insert out
        // of the diffs and fall back to the rest of the logic to handle the
        // remaining diffs.
        //
        // Concretely, something like (the numbers are truncated versions of the
        // actual bug posted in the issue):
        // - spine: [0][7094664]0, [7094664][7185234]100
        // - diffs: [0][6805359]0 del, [6805359][7083793]0 del, [0][7083793]0 ins,
        //   [7185234][7185859]20 ins
        //
        // Where this allows us to handle the [7185234,7185859) and then
        // `apply_compaction_lenient` handles splitting up [0,7094664) so we can
        // apply the [0,6805359)+[6805359,7083793)->[0,7083793) swap.

        let batches_before = [hb(0, 7094664, 0), hb(7094664, 7185234, 100)];

        let diffs = vec![
            StateFieldDiff {
                key: hb(0, 6805359, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(6805359, 7083793, 0),
                val: StateFieldValDiff::Delete(()),
            },
            StateFieldDiff {
                key: hb(0, 7083793, 0),
                val: StateFieldValDiff::Insert(()),
            },
            StateFieldDiff {
                key: hb(7185234, 7185859, 20),
                val: StateFieldValDiff::Insert(()),
            },
        ];

        // Ideally this first batch would be [0][7083793], [7083793,7094664]
        // here because `apply_compaction_lenient` splits it out, but when the
        // trace is reconstructed afterwards, Spine happens to
        // (deterministically) collapse them back together. The main value of
        // this test is that the `apply_diffs_spine` call below doesn't return
        // an Err, so don't worry too much about this, it's just a sanity check.
        let batches_after = vec![
            hb(0, 7094664, 0),
            hb(7094664, 7185234, 100),
            hb(7185234, 7185859, 20),
        ];

        // Build a state whose trace contains `batches_before`.
        let cfg = PersistConfig::new_for_tests();
        let state = TypedState::<(), (), u64, i64>::new(
            cfg.build_version.clone(),
            ShardId::new(),
            cfg.hostname.clone(),
            (cfg.now)(),
        );
        let state = state.clone_apply(&cfg, &mut |_seqno, _cfg, state| {
            for b in batches_before.iter() {
                let _merge_reqs = state.trace.push_batch(b.clone());
            }
            Continue::<(), ()>(())
        });
        let mut state = match state {
            Continue((_, x)) => x,
            _ => unreachable!(),
        };

        // The diffs must apply without error...
        let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
        assert_eq!(
            apply_diffs_spine(&metrics, diffs, &mut state.collections.trace),
            Ok(())
        );

        // ...and leave the trace with the expected batches.
        let mut actual = Vec::new();
        state
            .collections
            .trace
            .map_batches(|b| actual.push(b.clone()));
        assert_eq!(actual, batches_after);
    }
1583
1584    #[mz_ore::test]
1585    #[cfg_attr(miri, ignore)] // too slow
1586    fn apply_lenient() {
1587        #[track_caller]
1588        fn testcase(
1589            replacement: (u64, u64, u64, usize),
1590            spine: &[(u64, u64, u64, usize)],
1591            expected: Result<&[(u64, u64, u64, usize)], &str>,
1592        ) {
1593            fn batch(x: &(u64, u64, u64, usize)) -> HollowBatch<u64> {
1594                let (lower, upper, since, len) = x;
1595                let desc = Description::new(
1596                    Antichain::from_elem(*lower),
1597                    Antichain::from_elem(*upper),
1598                    Antichain::from_elem(*since),
1599                );
1600                HollowBatch::new_run(desc, Vec::new(), *len)
1601            }
1602            let replacement = batch(&replacement);
1603            let batches = spine.iter().map(batch).collect::<Vec<_>>();
1604
1605            let metrics = Metrics::new(&PersistConfig::new_for_tests(), &MetricsRegistry::new());
1606            let actual = apply_compaction_lenient(&metrics, batches, &replacement);
1607            let expected = match expected {
1608                Ok(batches) => Ok(batches.iter().map(batch).collect::<Vec<_>>()),
1609                Err(err) => Err(err.to_owned()),
1610            };
1611            assert_eq!(actual, expected);
1612        }
1613
1614        // Exact swap of N batches
1615        testcase(
1616            (0, 3, 0, 100),
1617            &[(0, 1, 0, 0), (1, 2, 0, 0), (2, 3, 0, 0)],
1618            Ok(&[(0, 3, 0, 100)]),
1619        );
1620
1621        // Swap out the middle of a batch
1622        testcase(
1623            (1, 2, 0, 100),
1624            &[(0, 3, 0, 0)],
1625            Ok(&[(0, 1, 0, 0), (1, 2, 0, 100), (2, 3, 0, 0)]),
1626        );
1627
1628        // Split batch at replacement lower
1629        testcase(
1630            (2, 4, 0, 100),
1631            &[(0, 3, 0, 0), (3, 4, 0, 0)],
1632            Ok(&[(0, 2, 0, 0), (2, 4, 0, 100)]),
1633        );
1634
1635        // Err: split batch at replacement lower not empty
1636        testcase(
1637            (2, 4, 0, 100),
1638            &[(0, 3, 0, 1), (3, 4, 0, 0)],
1639            Err(
1640                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([0], [3], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
1641            ),
1642        );
1643
1644        // Split batch at replacement lower (untouched batch before the split one)
1645        testcase(
1646            (2, 4, 0, 100),
1647            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
1648            Ok(&[(0, 1, 0, 0), (1, 2, 0, 0), (2, 4, 0, 100)]),
1649        );
1650
1651        // Split batch at replacement lower (since is preserved)
1652        testcase(
1653            (2, 4, 0, 100),
1654            &[(0, 3, 200, 0), (3, 4, 0, 0)],
1655            Ok(&[(0, 2, 200, 0), (2, 4, 0, 100)]),
1656        );
1657
1658        // Split batch at replacement upper
1659        testcase(
1660            (0, 2, 0, 100),
1661            &[(0, 1, 0, 0), (1, 4, 0, 0)],
1662            Ok(&[(0, 2, 0, 100), (2, 4, 0, 0)]),
1663        );
1664
1665        // Err: split batch at replacement upper not empty
1666        testcase(
1667            (0, 2, 0, 100),
1668            &[(0, 1, 0, 0), (1, 4, 0, 1)],
1669            Err(
1670                "overlapping batch was unexpectedly non-empty: HollowBatch { desc: ([1], [4], [0]), parts: [], len: 1, runs: [], run_meta: [] }",
1671            ),
1672        );
1673
1674        // Split batch at replacement upper (untouched batch after the split one)
1675        testcase(
1676            (0, 2, 0, 100),
1677            &[(0, 1, 0, 0), (1, 3, 0, 0), (3, 4, 0, 0)],
1678            Ok(&[(0, 2, 0, 100), (2, 3, 0, 0), (3, 4, 0, 0)]),
1679        );
1680
1681        // Split batch at replacement upper (since is preserved)
1682        testcase(
1683            (0, 2, 0, 100),
1684            &[(0, 1, 0, 0), (1, 4, 200, 0)],
1685            Ok(&[(0, 2, 0, 100), (2, 4, 200, 0)]),
1686        );
1687
1688        // Split batch at replacement lower and upper
1689        testcase(
1690            (2, 6, 0, 100),
1691            &[(0, 3, 0, 0), (3, 5, 0, 0), (5, 8, 0, 0)],
1692            Ok(&[(0, 2, 0, 0), (2, 6, 0, 100), (6, 8, 0, 0)]),
1693        );
1694
1695        // Replacement doesn't overlap (after)
1696        testcase(
1697            (2, 3, 0, 100),
1698            &[(0, 1, 0, 0)],
1699            Err("replacement didn't overlap any batches"),
1700        );
1701
1702        // Replacement doesn't overlap (before, though this would never happen in practice)
1703        testcase(
1704            (2, 3, 0, 100),
1705            &[(4, 5, 0, 0)],
1706            Err("replacement didn't overlap any batches"),
1707        );
1708    }
1709}