mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53    RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54    proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
65/// A key and value `Schema` of data written to a batch or shard.
66#[derive(Debug)]
67pub struct Schemas<K: Codec, V: Codec> {
68    // TODO: Remove the Option once this finishes rolling out and all shards
69    // have a registered schema.
70    /// Id under which this schema is registered in the shard's schema registry,
71    /// if any.
72    pub id: Option<SchemaId>,
73    /// Key `Schema`.
74    pub key: Arc<K::Schema>,
75    /// Value `Schema`.
76    pub val: Arc<V::Schema>,
77}
78
79impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
80    fn clone(&self) -> Self {
81        Self {
82            id: self.id,
83            key: Arc::clone(&self.key),
84            val: Arc::clone(&self.val),
85        }
86    }
87}
88
89/// A proto that is decoded on use.
90///
91/// Because of the way prost works, decoding a large protobuf may result in a
92/// number of very short lived allocations in our RustType/ProtoType decode path
93/// (e.g. this happens for a repeated embedded message). Not every use of
94/// persist State needs every transitive bit of it to be decoded, so we opt
95/// certain parts of it (initially stats) to be decoded on use.
96///
97/// This has the dual benefit of only paying for the short-lived allocs when
98/// necessary and also allowing decoding to be gated by a feature flag. The
99/// tradeoffs are that we might decode more than once and that we have to handle
100/// invalid proto errors in more places.
101///
102/// Mechanically, this is accomplished by making the field a proto `bytes` types
103/// instead of `ProtoFoo`. These bytes then contain the serialization of
104/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
105/// compatible change.
106///
107/// > Embedded messages are compatible with bytes if the bytes contain an
108/// > encoded version of the message.
109///
110/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
111#[derive(Clone, Serialize, Deserialize)]
112pub struct LazyProto<T> {
113    buf: Bytes,
114    _phantom: PhantomData<fn() -> T>,
115}
116
117impl<T: Message + Default> Debug for LazyProto<T> {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        match self.decode() {
120            Ok(proto) => Debug::fmt(&proto, f),
121            Err(err) => f
122                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
123                .field("err", &err)
124                .finish(),
125        }
126    }
127}
128
129impl<T> PartialEq for LazyProto<T> {
130    fn eq(&self, other: &Self) -> bool {
131        self.cmp(other) == Ordering::Equal
132    }
133}
134
135impl<T> Eq for LazyProto<T> {}
136
137impl<T> PartialOrd for LazyProto<T> {
138    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139        Some(self.cmp(other))
140    }
141}
142
143impl<T> Ord for LazyProto<T> {
144    fn cmp(&self, other: &Self) -> Ordering {
145        let LazyProto {
146            buf: self_buf,
147            _phantom: _,
148        } = self;
149        let LazyProto {
150            buf: other_buf,
151            _phantom: _,
152        } = other;
153        self_buf.cmp(other_buf)
154    }
155}
156
157impl<T> Hash for LazyProto<T> {
158    fn hash<H: Hasher>(&self, state: &mut H) {
159        let LazyProto { buf, _phantom } = self;
160        buf.hash(state);
161    }
162}
163
164impl<T: Message + Default> From<&T> for LazyProto<T> {
165    fn from(value: &T) -> Self {
166        let buf = Bytes::from(value.encode_to_vec());
167        LazyProto {
168            buf,
169            _phantom: PhantomData,
170        }
171    }
172}
173
174impl<T: Message + Default> LazyProto<T> {
175    pub fn decode(&self) -> Result<T, prost::DecodeError> {
176        T::decode(&*self.buf)
177    }
178
179    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
180        Ok(T::decode(&*self.buf)?.into_rust()?)
181    }
182}
183
184impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
185    fn into_proto(&self) -> Bytes {
186        self.buf.clone()
187    }
188
189    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
190        Ok(Self {
191            buf,
192            _phantom: PhantomData,
193        })
194    }
195}
196
197pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
198    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
199        Some(x) => x,
200        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
201    };
202    let uuid = Uuid::parse_str(uuid_encoded)
203        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
204    Ok(*uuid.as_bytes())
205}
206
207pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
208    if let Err(msg) = cfg::check_data_version(code_version, data_version) {
209        // We can't catch halts, so panic in test, so we can get unit test
210        // coverage.
211        if cfg!(test) {
212            panic!("{msg}");
213        } else {
214            halt!("{msg}");
215        }
216    }
217}
218
219impl RustType<String> for LeasedReaderId {
220    fn into_proto(&self) -> String {
221        self.to_string()
222    }
223
224    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
225        match proto.parse() {
226            Ok(x) => Ok(x),
227            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
228        }
229    }
230}
231
232impl RustType<String> for CriticalReaderId {
233    fn into_proto(&self) -> String {
234        self.to_string()
235    }
236
237    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
238        match proto.parse() {
239            Ok(x) => Ok(x),
240            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
241        }
242    }
243}
244
245impl RustType<String> for WriterId {
246    fn into_proto(&self) -> String {
247        self.to_string()
248    }
249
250    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
251        match proto.parse() {
252            Ok(x) => Ok(x),
253            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
254        }
255    }
256}
257
258impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
259    fn into_proto(&self) -> ProtoEncodedSchemas {
260        ProtoEncodedSchemas {
261            key: self.key.clone(),
262            key_data_type: self.key_data_type.clone(),
263            val: self.val.clone(),
264            val_data_type: self.val_data_type.clone(),
265        }
266    }
267
268    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
269        Ok(EncodedSchemas {
270            key: proto.key,
271            key_data_type: proto.key_data_type,
272            val: proto.val,
273            val_data_type: proto.val_data_type,
274        })
275    }
276}
277
278impl RustType<String> for IdempotencyToken {
279    fn into_proto(&self) -> String {
280        self.to_string()
281    }
282
283    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
284        match proto.parse() {
285            Ok(x) => Ok(x),
286            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
287        }
288    }
289}
290
291impl RustType<String> for PartialBatchKey {
292    fn into_proto(&self) -> String {
293        self.0.clone()
294    }
295
296    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297        Ok(PartialBatchKey(proto))
298    }
299}
300
301impl RustType<String> for PartialRollupKey {
302    fn into_proto(&self) -> String {
303        self.0.clone()
304    }
305
306    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
307        Ok(PartialRollupKey(proto))
308    }
309}
310
311impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
312    pub fn encode<B>(&self, buf: &mut B)
313    where
314        B: bytes::BufMut,
315    {
316        self.into_proto()
317            .encode(buf)
318            .expect("no required fields means no initialization errors");
319    }
320
321    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
322        let proto = ProtoStateDiff::decode(buf)
323            // We received a State that we couldn't decode. This could happen if
324            // persist messes up backward/forward compatibility, if the durable
325            // data was corrupted, or if operations messes up deployment. In any
326            // case, fail loudly.
327            .expect("internal error: invalid encoded state");
328        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
329        check_data_version(build_version, &diff.applier_version);
330        diff
331    }
332}
333
334impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
335    fn into_proto(&self) -> ProtoStateDiff {
336        // Deconstruct self so we get a compile failure if new fields are added.
337        let StateDiff {
338            applier_version,
339            seqno_from,
340            seqno_to,
341            walltime_ms,
342            latest_rollup_key,
343            rollups,
344            active_rollup,
345            active_gc,
346            hostname,
347            last_gc_req,
348            leased_readers,
349            critical_readers,
350            writers,
351            schemas,
352            since,
353            legacy_batches,
354            hollow_batches,
355            spine_batches,
356            merges,
357        } = self;
358
359        let proto = ProtoStateFieldDiffs::default();
360
361        // Create a writer so we can efficiently encode our data.
362        let mut writer = proto.into_writer();
363
364        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
365        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
366        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
367        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
368        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
369        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
370        field_diffs_into_proto(
371            ProtoStateField::CriticalReaders,
372            critical_readers,
373            &mut writer,
374        );
375        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
376        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
377        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
378        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
379        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
380        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
381        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);
382
383        // After encoding all of our data, convert back into the proto.
384        let field_diffs = writer.into_proto();
385
386        debug_assert_eq!(field_diffs.validate(), Ok(()));
387        ProtoStateDiff {
388            applier_version: applier_version.to_string(),
389            seqno_from: seqno_from.into_proto(),
390            seqno_to: seqno_to.into_proto(),
391            walltime_ms: walltime_ms.into_proto(),
392            latest_rollup_key: latest_rollup_key.into_proto(),
393            field_diffs: Some(field_diffs),
394        }
395    }
396
397    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
398        let applier_version = if proto.applier_version.is_empty() {
399            // Backward compatibility with versions of ProtoState before we set
400            // this field: if it's missing (empty), assume an infinitely old
401            // version.
402            semver::Version::new(0, 0, 0)
403        } else {
404            semver::Version::parse(&proto.applier_version).map_err(|err| {
405                TryFromProtoError::InvalidSemverVersion(format!(
406                    "invalid applier_version {}: {}",
407                    proto.applier_version, err
408                ))
409            })?
410        };
411        let mut state_diff = StateDiff::new(
412            applier_version,
413            proto.seqno_from.into_rust()?,
414            proto.seqno_to.into_rust()?,
415            proto.walltime_ms,
416            proto.latest_rollup_key.into_rust()?,
417        );
418        if let Some(field_diffs) = proto.field_diffs {
419            debug_assert_eq!(field_diffs.validate(), Ok(()));
420            for field_diff in field_diffs.iter() {
421                let (field, diff) = field_diff?;
422                match field {
423                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
424                        diff,
425                        &mut state_diff.hostname,
426                        |()| Ok(()),
427                        |v| v.into_rust(),
428                    )?,
429                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
430                        diff,
431                        &mut state_diff.last_gc_req,
432                        |()| Ok(()),
433                        |v| v.into_rust(),
434                    )?,
435                    ProtoStateField::ActiveGc => {
436                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
437                            diff,
438                            &mut state_diff.active_gc,
439                            |()| Ok(()),
440                            |v| v.into_rust(),
441                        )?
442                    }
443                    ProtoStateField::ActiveRollup => {
444                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
445                            diff,
446                            &mut state_diff.active_rollup,
447                            |()| Ok(()),
448                            |v| v.into_rust(),
449                        )?
450                    }
451                    ProtoStateField::Rollups => {
452                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
453                            diff,
454                            &mut state_diff.rollups,
455                            |k| k.into_rust(),
456                            |v| v.into_rust(),
457                        )?
458                    }
459                    // MIGRATION: We previously stored rollups as a `SeqNo ->
460                    // string Key` map, but now the value is a `struct
461                    // HollowRollup`.
462                    ProtoStateField::DeprecatedRollups => {
463                        field_diff_into_rust::<u64, String, _, _, _, _>(
464                            diff,
465                            &mut state_diff.rollups,
466                            |k| k.into_rust(),
467                            |v| {
468                                Ok(HollowRollup {
469                                    key: v.into_rust()?,
470                                    encoded_size_bytes: None,
471                                })
472                            },
473                        )?
474                    }
475                    ProtoStateField::LeasedReaders => {
476                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
477                            diff,
478                            &mut state_diff.leased_readers,
479                            |k| k.into_rust(),
480                            |v| v.into_rust(),
481                        )?
482                    }
483                    ProtoStateField::CriticalReaders => {
484                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
485                            diff,
486                            &mut state_diff.critical_readers,
487                            |k| k.into_rust(),
488                            |v| v.into_rust(),
489                        )?
490                    }
491                    ProtoStateField::Writers => {
492                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
493                            diff,
494                            &mut state_diff.writers,
495                            |k| k.into_rust(),
496                            |v| v.into_rust(),
497                        )?
498                    }
499                    ProtoStateField::Schemas => {
500                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
501                            diff,
502                            &mut state_diff.schemas,
503                            |k| k.into_rust(),
504                            |v| v.into_rust(),
505                        )?
506                    }
507                    ProtoStateField::Since => {
508                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
509                            diff,
510                            &mut state_diff.since,
511                            |()| Ok(()),
512                            |v| v.into_rust(),
513                        )?
514                    }
515                    ProtoStateField::LegacyBatches => {
516                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
517                            diff,
518                            &mut state_diff.legacy_batches,
519                            |k| k.into_rust(),
520                            |()| Ok(()),
521                        )?
522                    }
523                    ProtoStateField::HollowBatches => {
524                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
525                            diff,
526                            &mut state_diff.hollow_batches,
527                            |k| k.into_rust(),
528                            |v| v.into_rust(),
529                        )?
530                    }
531                    ProtoStateField::SpineBatches => {
532                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
533                            diff,
534                            &mut state_diff.spine_batches,
535                            |k| k.into_rust(),
536                            |v| v.into_rust(),
537                        )?
538                    }
539                    ProtoStateField::SpineMerges => {
540                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
541                            diff,
542                            &mut state_diff.merges,
543                            |k| k.into_rust(),
544                            |v| v.into_rust(),
545                        )?
546                    }
547                }
548            }
549        }
550        Ok(state_diff)
551    }
552}
553
554fn field_diffs_into_proto<K, KP, V, VP>(
555    field: ProtoStateField,
556    diffs: &[StateFieldDiff<K, V>],
557    writer: &mut ProtoStateFieldDiffsWriter,
558) where
559    KP: prost::Message,
560    K: RustType<KP>,
561    VP: prost::Message,
562    V: RustType<VP>,
563{
564    for diff in diffs.iter() {
565        field_diff_into_proto(field, diff, writer);
566    }
567}
568
569fn field_diff_into_proto<K, KP, V, VP>(
570    field: ProtoStateField,
571    diff: &StateFieldDiff<K, V>,
572    writer: &mut ProtoStateFieldDiffsWriter,
573) where
574    KP: prost::Message,
575    K: RustType<KP>,
576    VP: prost::Message,
577    V: RustType<VP>,
578{
579    writer.push_field(field);
580    writer.encode_proto(&diff.key.into_proto());
581    match &diff.val {
582        StateFieldValDiff::Insert(to) => {
583            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
584            writer.encode_proto(&to.into_proto());
585        }
586        StateFieldValDiff::Update(from, to) => {
587            writer.push_diff_type(ProtoStateFieldDiffType::Update);
588            writer.encode_proto(&from.into_proto());
589            writer.encode_proto(&to.into_proto());
590        }
591        StateFieldValDiff::Delete(from) => {
592            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
593            writer.encode_proto(&from.into_proto());
594        }
595    };
596}
597
598fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
599    proto: ProtoStateFieldDiff<'_>,
600    diffs: &mut Vec<StateFieldDiff<K, V>>,
601    k_fn: KFn,
602    v_fn: VFn,
603) -> Result<(), TryFromProtoError>
604where
605    KP: prost::Message + Default,
606    VP: prost::Message + Default,
607    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
608    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
609{
610    let val = match proto.diff_type {
611        ProtoStateFieldDiffType::Insert => {
612            let to = VP::decode(proto.to)
613                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
614            StateFieldValDiff::Insert(v_fn(to)?)
615        }
616        ProtoStateFieldDiffType::Update => {
617            let from = VP::decode(proto.from)
618                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619            let to = VP::decode(proto.to)
620                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
621
622            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
623        }
624        ProtoStateFieldDiffType::Delete => {
625            let from = VP::decode(proto.from)
626                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
627            StateFieldValDiff::Delete(v_fn(from)?)
628        }
629    };
630    let key = KP::decode(proto.key)
631        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
632    diffs.push(StateFieldDiff {
633        key: k_fn(key)?,
634        val,
635    });
636    Ok(())
637}
638
639/// The decoded state of [ProtoRollup] for which we have not yet checked
640/// that codecs match the ones in durable state.
641#[derive(Debug)]
642#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
643pub struct UntypedState<T> {
644    pub(crate) key_codec: String,
645    pub(crate) val_codec: String,
646    pub(crate) ts_codec: String,
647    pub(crate) diff_codec: String,
648
649    // Important! This T has not been validated, so we can't expose anything in
650    // State that references T until one of the check methods have been called.
651    // Any field on State that doesn't reference T is fair game.
652    state: State<T>,
653}
654
655impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
656    pub fn seqno(&self) -> SeqNo {
657        self.state.seqno
658    }
659
660    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
661        &self.state.collections.rollups
662    }
663
664    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
665        self.state.latest_rollup()
666    }
667
668    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
669        &mut self,
670        cfg: &PersistConfig,
671        metrics: &Metrics,
672        diffs: I,
673    ) {
674        // The apply_encoded_diffs might panic if T is not correct. Making this
675        // a silent no-op is far too subtle for my taste, but it's not clear
676        // what else we could do instead.
677        if T::codec_name() != self.ts_codec {
678            return;
679        }
680        self.state.apply_encoded_diffs(cfg, metrics, diffs);
681    }
682
683    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
684        self,
685        shard_id: &ShardId,
686    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
687        // Also defensively check that the shard_id on the state we fetched
688        // matches the shard_id we were trying to fetch.
689        assert_eq!(shard_id, &self.state.shard_id);
690        if K::codec_name() != self.key_codec
691            || V::codec_name() != self.val_codec
692            || T::codec_name() != self.ts_codec
693            || D::codec_name() != self.diff_codec
694        {
695            return Err(Box::new(CodecMismatch {
696                requested: (
697                    K::codec_name(),
698                    V::codec_name(),
699                    T::codec_name(),
700                    D::codec_name(),
701                    None,
702                ),
703                actual: (
704                    self.key_codec,
705                    self.val_codec,
706                    self.ts_codec,
707                    self.diff_codec,
708                    None,
709                ),
710            }));
711        }
712        Ok(TypedState {
713            state: self.state,
714            _phantom: PhantomData,
715        })
716    }
717
718    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
719        // Also defensively check that the shard_id on the state we fetched
720        // matches the shard_id we were trying to fetch.
721        assert_eq!(shard_id, &self.state.shard_id);
722        if T::codec_name() != self.ts_codec {
723            return Err(CodecMismatchT {
724                requested: T::codec_name(),
725                actual: self.ts_codec,
726            });
727        }
728        Ok(self.state)
729    }
730
731    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
732        let proto = ProtoRollup::decode(buf)
733            // We received a State that we couldn't decode. This could happen if
734            // persist messes up backward/forward compatibility, if the durable
735            // data was corrupted, or if operations messes up deployment. In any
736            // case, fail loudly.
737            .expect("internal error: invalid encoded state");
738        let state = Rollup::from_proto(proto)
739            .expect("internal error: invalid encoded state")
740            .state;
741        check_data_version(build_version, &state.state.applier_version);
742        state
743    }
744}
745
746impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
747where
748    K: Codec,
749    V: Codec,
750    T: Codec64,
751    D: Codec64,
752{
753    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
754        UntypedState {
755            key_codec: K::codec_name(),
756            val_codec: V::codec_name(),
757            ts_codec: T::codec_name(),
758            diff_codec: D::codec_name(),
759            state: typed_state.state,
760        }
761    }
762}
763
764/// A struct that maps 1:1 with ProtoRollup.
765///
766/// Contains State, and optionally the diffs between (state.latest_rollup.seqno, state.seqno]
767///
768/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
769/// when deserializing rollups that were persisted before we started inlining diffs.
770#[derive(Debug)]
771pub struct Rollup<T> {
772    pub(crate) state: UntypedState<T>,
773    pub(crate) diffs: Option<InlinedDiffs>,
774}
775
776impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
777    /// Creates a `StateRollup` from a state and diffs from its last rollup.
778    ///
779    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
780    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
781        let latest_rollup_seqno = *state.latest_rollup().0;
782        let mut verify_seqno = latest_rollup_seqno;
783        for diff in &diffs {
784            assert_eq!(verify_seqno.next(), diff.seqno);
785            verify_seqno = diff.seqno;
786        }
787        assert_eq!(verify_seqno, state.seqno());
788
789        let diffs = Some(InlinedDiffs::from(
790            latest_rollup_seqno.next(),
791            state.seqno().next(),
792            diffs,
793        ));
794
795        Self { state, diffs }
796    }
797
798    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
799        Self { state, diffs: None }
800    }
801
802    pub(crate) fn from_state_without_diffs(
803        state: State<T>,
804        key_codec: String,
805        val_codec: String,
806        ts_codec: String,
807        diff_codec: String,
808    ) -> Self {
809        Self::from_untyped_state_without_diffs(UntypedState {
810            key_codec,
811            val_codec,
812            ts_codec,
813            diff_codec,
814            state,
815        })
816    }
817}
818
819#[derive(Debug)]
820pub(crate) struct InlinedDiffs {
821    pub(crate) lower: SeqNo,
822    pub(crate) upper: SeqNo,
823    pub(crate) diffs: Vec<VersionedData>,
824}
825
826impl InlinedDiffs {
827    pub(crate) fn description(&self) -> Description<SeqNo> {
828        Description::new(
829            Antichain::from_elem(self.lower),
830            Antichain::from_elem(self.upper),
831            Antichain::from_elem(SeqNo::minimum()),
832        )
833    }
834
835    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
836        for diff in &diffs {
837            assert!(diff.seqno >= lower);
838            assert!(diff.seqno < upper);
839        }
840        Self {
841            lower,
842            upper,
843            diffs,
844        }
845    }
846}
847
848impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
849    fn into_proto(&self) -> ProtoInlinedDiffs {
850        ProtoInlinedDiffs {
851            lower: self.lower.into_proto(),
852            upper: self.upper.into_proto(),
853            diffs: self.diffs.into_proto(),
854        }
855    }
856
857    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
858        Ok(Self {
859            lower: proto.lower.into_rust()?,
860            upper: proto.upper.into_rust()?,
861            diffs: proto.diffs.into_rust()?,
862        })
863    }
864}
865
866impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
867    fn into_proto(&self) -> ProtoRollup {
868        ProtoRollup {
869            applier_version: self.state.state.applier_version.to_string(),
870            shard_id: self.state.state.shard_id.into_proto(),
871            seqno: self.state.state.seqno.into_proto(),
872            walltime_ms: self.state.state.walltime_ms.into_proto(),
873            hostname: self.state.state.hostname.into_proto(),
874            key_codec: self.state.key_codec.into_proto(),
875            val_codec: self.state.val_codec.into_proto(),
876            ts_codec: T::codec_name(),
877            diff_codec: self.state.diff_codec.into_proto(),
878            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
879            active_rollup: self.state.state.collections.active_rollup.into_proto(),
880            active_gc: self.state.state.collections.active_gc.into_proto(),
881            rollups: self
882                .state
883                .state
884                .collections
885                .rollups
886                .iter()
887                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
888                .collect(),
889            deprecated_rollups: Default::default(),
890            leased_readers: self
891                .state
892                .state
893                .collections
894                .leased_readers
895                .iter()
896                .map(|(id, state)| (id.into_proto(), state.into_proto()))
897                .collect(),
898            critical_readers: self
899                .state
900                .state
901                .collections
902                .critical_readers
903                .iter()
904                .map(|(id, state)| (id.into_proto(), state.into_proto()))
905                .collect(),
906            writers: self
907                .state
908                .state
909                .collections
910                .writers
911                .iter()
912                .map(|(id, state)| (id.into_proto(), state.into_proto()))
913                .collect(),
914            schemas: self
915                .state
916                .state
917                .collections
918                .schemas
919                .iter()
920                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
921                .collect(),
922            trace: Some(self.state.state.collections.trace.into_proto()),
923            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
924        }
925    }
926
927    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
928        let applier_version = if x.applier_version.is_empty() {
929            // Backward compatibility with versions of ProtoState before we set
930            // this field: if it's missing (empty), assume an infinitely old
931            // version.
932            semver::Version::new(0, 0, 0)
933        } else {
934            semver::Version::parse(&x.applier_version).map_err(|err| {
935                TryFromProtoError::InvalidSemverVersion(format!(
936                    "invalid applier_version {}: {}",
937                    x.applier_version, err
938                ))
939            })?
940        };
941
942        let mut rollups = BTreeMap::new();
943        for (seqno, rollup) in x.rollups {
944            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
945        }
946        for (seqno, key) in x.deprecated_rollups {
947            rollups.insert(
948                seqno.into_rust()?,
949                HollowRollup {
950                    key: key.into_rust()?,
951                    encoded_size_bytes: None,
952                },
953            );
954        }
955        let mut leased_readers = BTreeMap::new();
956        for (id, state) in x.leased_readers {
957            leased_readers.insert(id.into_rust()?, state.into_rust()?);
958        }
959        let mut critical_readers = BTreeMap::new();
960        for (id, state) in x.critical_readers {
961            critical_readers.insert(id.into_rust()?, state.into_rust()?);
962        }
963        let mut writers = BTreeMap::new();
964        for (id, state) in x.writers {
965            writers.insert(id.into_rust()?, state.into_rust()?);
966        }
967        let mut schemas = BTreeMap::new();
968        for (id, x) in x.schemas {
969            schemas.insert(id.into_rust()?, x.into_rust()?);
970        }
971        let active_rollup = x
972            .active_rollup
973            .map(|rollup| rollup.into_rust())
974            .transpose()?;
975        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
976        let collections = StateCollections {
977            rollups,
978            active_rollup,
979            active_gc,
980            last_gc_req: x.last_gc_req.into_rust()?,
981            leased_readers,
982            critical_readers,
983            writers,
984            schemas,
985            trace: x.trace.into_rust_if_some("trace")?,
986        };
987        let state = State {
988            applier_version,
989            shard_id: x.shard_id.into_rust()?,
990            seqno: x.seqno.into_rust()?,
991            walltime_ms: x.walltime_ms,
992            hostname: x.hostname,
993            collections,
994        };
995
996        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
997        if let Some(diffs) = &diffs {
998            if diffs.lower != state.latest_rollup().0.next() {
999                return Err(TryFromProtoError::InvalidPersistState(format!(
1000                    "diffs lower ({}) should match latest rollup's successor: ({})",
1001                    diffs.lower,
1002                    state.latest_rollup().0.next()
1003                )));
1004            }
1005            if diffs.upper != state.seqno.next() {
1006                return Err(TryFromProtoError::InvalidPersistState(format!(
1007                    "diffs upper ({}) should match state's successor: ({})",
1008                    diffs.lower,
1009                    state.seqno.next()
1010                )));
1011            }
1012        }
1013
1014        Ok(Rollup {
1015            state: UntypedState {
1016                state,
1017                key_codec: x.key_codec.into_rust()?,
1018                val_codec: x.val_codec.into_rust()?,
1019                ts_codec: x.ts_codec.into_rust()?,
1020                diff_codec: x.diff_codec.into_rust()?,
1021            },
1022            diffs,
1023        })
1024    }
1025}
1026
1027impl RustType<ProtoVersionedData> for VersionedData {
1028    fn into_proto(&self) -> ProtoVersionedData {
1029        ProtoVersionedData {
1030            seqno: self.seqno.into_proto(),
1031            data: Bytes::clone(&self.data),
1032        }
1033    }
1034
1035    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1036        Ok(Self {
1037            seqno: proto.seqno.into_rust()?,
1038            data: proto.data,
1039        })
1040    }
1041}
1042
1043impl RustType<ProtoSpineId> for SpineId {
1044    fn into_proto(&self) -> ProtoSpineId {
1045        ProtoSpineId {
1046            lo: self.0.into_proto(),
1047            hi: self.1.into_proto(),
1048        }
1049    }
1050
1051    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1052        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1053    }
1054}
1055
1056impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1057    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1058        let (id, batch) = entry;
1059        ProtoIdHollowBatch {
1060            id: Some(id.into_proto()),
1061            batch: Some(batch.into_proto()),
1062        }
1063    }
1064
1065    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1066        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1067        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1068        Ok((id, batch))
1069    }
1070}
1071
1072impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1073    fn into_proto(&self) -> ProtoSpineBatch {
1074        ProtoSpineBatch {
1075            desc: Some(self.desc.into_proto()),
1076            parts: self.parts.into_proto(),
1077            level: self.level.into_proto(),
1078            descs: self.descs.into_proto(),
1079        }
1080    }
1081
1082    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1083        let level = proto.level.into_rust()?;
1084        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1085        let parts = proto.parts.into_rust()?;
1086        let descs = proto.descs.into_rust()?;
1087        Ok(ThinSpineBatch {
1088            level,
1089            desc,
1090            parts,
1091            descs,
1092        })
1093    }
1094}
1095
1096impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1097    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1098        let (id, batch) = entry;
1099        ProtoIdSpineBatch {
1100            id: Some(id.into_proto()),
1101            batch: Some(batch.into_proto()),
1102        }
1103    }
1104
1105    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1106        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1107        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1108        Ok((id, batch))
1109    }
1110}
1111
1112impl RustType<ProtoCompaction> for ActiveCompaction {
1113    fn into_proto(&self) -> ProtoCompaction {
1114        ProtoCompaction {
1115            start_ms: self.start_ms,
1116        }
1117    }
1118
1119    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1120        Ok(Self {
1121            start_ms: proto.start_ms,
1122        })
1123    }
1124}
1125
1126impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1127    fn into_proto(&self) -> ProtoMerge {
1128        ProtoMerge {
1129            since: Some(self.since.into_proto()),
1130            remaining_work: self.remaining_work.into_proto(),
1131            active_compaction: self.active_compaction.into_proto(),
1132        }
1133    }
1134
1135    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1136        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1137        let remaining_work = proto.remaining_work.into_rust()?;
1138        let active_compaction = proto.active_compaction.into_rust()?;
1139        Ok(Self {
1140            since,
1141            remaining_work,
1142            active_compaction,
1143        })
1144    }
1145}
1146
1147impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1148    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1149        ProtoIdMerge {
1150            id: Some(id.into_proto()),
1151            merge: Some(merge.into_proto()),
1152        }
1153    }
1154
1155    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1156        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1157        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1158        Ok((id, merge))
1159    }
1160}
1161
1162impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1163    fn into_proto(&self) -> ProtoTrace {
1164        let since = self.since.into_proto();
1165        let legacy_batches = self
1166            .legacy_batches
1167            .iter()
1168            .map(|(b, _)| b.into_proto())
1169            .collect();
1170        let hollow_batches = self.hollow_batches.into_proto();
1171        let spine_batches = self.spine_batches.into_proto();
1172        let merges = self.merges.into_proto();
1173        ProtoTrace {
1174            since: Some(since),
1175            legacy_batches,
1176            hollow_batches,
1177            spine_batches,
1178            merges,
1179        }
1180    }
1181
1182    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1183        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1184        let legacy_batches = proto
1185            .legacy_batches
1186            .into_iter()
1187            .map(|b| b.into_rust().map(|b| (b, ())))
1188            .collect::<Result<_, _>>()?;
1189        let hollow_batches = proto.hollow_batches.into_rust()?;
1190        let spine_batches = proto.spine_batches.into_rust()?;
1191        let merges = proto.merges.into_rust()?;
1192        Ok(FlatTrace {
1193            since,
1194            legacy_batches,
1195            hollow_batches,
1196            spine_batches,
1197            merges,
1198        })
1199    }
1200}
1201
1202impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1203    fn into_proto(&self) -> ProtoTrace {
1204        self.flatten().into_proto()
1205    }
1206
1207    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1208        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1209    }
1210}
1211
1212impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1213    fn into_proto(&self) -> ProtoLeasedReaderState {
1214        ProtoLeasedReaderState {
1215            seqno: self.seqno.into_proto(),
1216            since: Some(self.since.into_proto()),
1217            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1218            lease_duration_ms: self.lease_duration_ms.into_proto(),
1219            debug: Some(self.debug.into_proto()),
1220        }
1221    }
1222
1223    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1224        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1225        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1226        // was missing and we need to fill in a default. This would ideally be
1227        // based on the actual value in PersistConfig, but it's only here for a
1228        // short time and this is way easier.
1229        if lease_duration_ms == 0 {
1230            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1231                .expect("lease duration as millis should fit within u64");
1232        }
1233        // MIGRATION: If debug is empty, then the proto field was missing and we
1234        // need to fill in a default.
1235        let debug = proto.debug.unwrap_or_default().into_rust()?;
1236        Ok(LeasedReaderState {
1237            seqno: proto.seqno.into_rust()?,
1238            since: proto
1239                .since
1240                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1241            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1242            lease_duration_ms,
1243            debug,
1244        })
1245    }
1246}
1247
1248impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1249    fn into_proto(&self) -> ProtoCriticalReaderState {
1250        ProtoCriticalReaderState {
1251            since: Some(self.since.into_proto()),
1252            opaque: i64::from_le_bytes(self.opaque.0),
1253            opaque_codec: self.opaque_codec.clone(),
1254            debug: Some(self.debug.into_proto()),
1255        }
1256    }
1257
1258    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1259        // MIGRATION: If debug is empty, then the proto field was missing and we
1260        // need to fill in a default.
1261        let debug = proto.debug.unwrap_or_default().into_rust()?;
1262        Ok(CriticalReaderState {
1263            since: proto
1264                .since
1265                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1266            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1267            opaque_codec: proto.opaque_codec,
1268            debug,
1269        })
1270    }
1271}
1272
1273impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1274    fn into_proto(&self) -> ProtoWriterState {
1275        ProtoWriterState {
1276            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1277            lease_duration_ms: self.lease_duration_ms.into_proto(),
1278            most_recent_write_token: self.most_recent_write_token.into_proto(),
1279            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1280            debug: Some(self.debug.into_proto()),
1281        }
1282    }
1283
1284    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1285        // MIGRATION: We didn't originally have most_recent_write_token and
1286        // most_recent_write_upper. Pick values that aren't going to
1287        // accidentally match ones in incoming writes and confuse things. We
1288        // could instead use Option on WriterState but this keeps the backward
1289        // compatibility logic confined to one place.
1290        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1291            IdempotencyToken::SENTINEL
1292        } else {
1293            proto.most_recent_write_token.into_rust()?
1294        };
1295        let most_recent_write_upper = match proto.most_recent_write_upper {
1296            Some(x) => x.into_rust()?,
1297            None => Antichain::from_elem(T::minimum()),
1298        };
1299        // MIGRATION: If debug is empty, then the proto field was missing and we
1300        // need to fill in a default.
1301        let debug = proto.debug.unwrap_or_default().into_rust()?;
1302        Ok(WriterState {
1303            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1304            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1305            most_recent_write_token,
1306            most_recent_write_upper,
1307            debug,
1308        })
1309    }
1310}
1311
1312impl RustType<ProtoHandleDebugState> for HandleDebugState {
1313    fn into_proto(&self) -> ProtoHandleDebugState {
1314        ProtoHandleDebugState {
1315            hostname: self.hostname.into_proto(),
1316            purpose: self.purpose.into_proto(),
1317        }
1318    }
1319
1320    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1321        Ok(HandleDebugState {
1322            hostname: proto.hostname,
1323            purpose: proto.purpose,
1324        })
1325    }
1326}
1327
1328impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1329    fn into_proto(&self) -> ProtoHollowRun {
1330        ProtoHollowRun {
1331            parts: self.parts.into_proto(),
1332        }
1333    }
1334
1335    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1336        Ok(HollowRun {
1337            parts: proto.parts.into_rust()?,
1338        })
1339    }
1340}
1341
1342impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1343    fn into_proto(&self) -> ProtoHollowBatch {
1344        let mut run_meta = self.run_meta.into_proto();
1345        // For backwards compatibility reasons, don't keep default metadata in the proto.
1346        let run_meta_default = RunMeta::default().into_proto();
1347        while run_meta.last() == Some(&run_meta_default) {
1348            run_meta.pop();
1349        }
1350        ProtoHollowBatch {
1351            desc: Some(self.desc.into_proto()),
1352            parts: self.parts.into_proto(),
1353            len: self.len.into_proto(),
1354            runs: self.run_splits.into_proto(),
1355            run_meta,
1356            deprecated_keys: vec![],
1357        }
1358    }
1359
1360    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1361        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1362        // MIGRATION: We used to just have the keys instead of a more structured
1363        // part.
1364        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1365            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1366                key: PartialBatchKey(key),
1367                encoded_size_bytes: 0,
1368                key_lower: vec![],
1369                structured_key_lower: None,
1370                stats: None,
1371                ts_rewrite: None,
1372                diffs_sum: None,
1373                format: None,
1374                schema_id: None,
1375                deprecated_schema_id: None,
1376            }))
1377        }));
1378        // We discard default metadatas from the proto above; re-add them here.
1379        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1380        let num_runs = if parts.is_empty() {
1381            0
1382        } else {
1383            run_splits.len() + 1
1384        };
1385        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1386        run_meta.resize(num_runs, RunMeta::default());
1387        Ok(HollowBatch {
1388            desc: proto.desc.into_rust_if_some("desc")?,
1389            parts,
1390            len: proto.len.into_rust()?,
1391            run_splits,
1392            run_meta,
1393        })
1394    }
1395}
1396
1397impl RustType<String> for RunId {
1398    fn into_proto(&self) -> String {
1399        self.to_string()
1400    }
1401
1402    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1403        RunId::from_str(&proto).map_err(|_| {
1404            TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1405        })
1406    }
1407}
1408
1409impl RustType<ProtoRunMeta> for RunMeta {
1410    fn into_proto(&self) -> ProtoRunMeta {
1411        let order = match self.order {
1412            None => ProtoRunOrder::Unknown,
1413            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1414            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1415            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1416        };
1417        ProtoRunMeta {
1418            order: order.into(),
1419            schema_id: self.schema.into_proto(),
1420            deprecated_schema_id: self.deprecated_schema.into_proto(),
1421            id: self.id.into_proto(),
1422            len: self.len.into_proto(),
1423        }
1424    }
1425
1426    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1427        let order = match ProtoRunOrder::try_from(proto.order)? {
1428            ProtoRunOrder::Unknown => None,
1429            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1430            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1431            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1432        };
1433        Ok(Self {
1434            order,
1435            schema: proto.schema_id.into_rust()?,
1436            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1437            id: proto.id.into_rust()?,
1438            len: proto.len.into_rust()?,
1439        })
1440    }
1441}
1442
1443impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1444    fn into_proto(&self) -> ProtoHollowBatchPart {
1445        match self {
1446            RunPart::Single(part) => part.into_proto(),
1447            RunPart::Many(runs) => runs.into_proto(),
1448        }
1449    }
1450
1451    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1452        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1453            RunPart::Many(proto.into_rust()?)
1454        } else {
1455            RunPart::Single(proto.into_rust()?)
1456        };
1457        Ok(run_part)
1458    }
1459}
1460
1461impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1462    fn into_proto(&self) -> ProtoHollowBatchPart {
1463        let part = ProtoHollowBatchPart {
1464            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1465                key: self.key.into_proto(),
1466                max_part_bytes: self.max_part_bytes.into_proto(),
1467            })),
1468            encoded_size_bytes: self.hollow_bytes.into_proto(),
1469            key_lower: Bytes::copy_from_slice(&self.key_lower),
1470            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1471            key_stats: None,
1472            ts_rewrite: None,
1473            format: None,
1474            schema_id: None,
1475            structured_key_lower: self.structured_key_lower.into_proto(),
1476            deprecated_schema_id: None,
1477        };
1478        part
1479    }
1480
1481    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1482        let run_proto = match proto.kind {
1483            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1484            _ => Err(TryFromProtoError::UnknownEnumVariant(
1485                "ProtoHollowBatchPart::kind".to_string(),
1486            ))?,
1487        };
1488        Ok(Self {
1489            key: run_proto.key.into_rust()?,
1490            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1491            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1492            key_lower: proto.key_lower.to_vec(),
1493            structured_key_lower: proto.structured_key_lower.into_rust()?,
1494            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1495            _phantom_data: Default::default(),
1496        })
1497    }
1498}
1499
1500impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1501    fn into_proto(&self) -> ProtoHollowBatchPart {
1502        match self {
1503            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1504                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1505                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1506                key_lower: Bytes::copy_from_slice(&x.key_lower),
1507                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1508                key_stats: x.stats.into_proto(),
1509                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1510                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1511                format: x.format.map(|f| f.into_proto()),
1512                schema_id: x.schema_id.into_proto(),
1513                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1514            },
1515            BatchPart::Inline {
1516                updates,
1517                ts_rewrite,
1518                schema_id,
1519                deprecated_schema_id,
1520            } => ProtoHollowBatchPart {
1521                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1522                encoded_size_bytes: 0,
1523                key_lower: Bytes::new(),
1524                structured_key_lower: None,
1525                key_stats: None,
1526                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1527                diffs_sum: None,
1528                format: None,
1529                schema_id: schema_id.into_proto(),
1530                deprecated_schema_id: deprecated_schema_id.into_proto(),
1531            },
1532        }
1533    }
1534
1535    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1536        let ts_rewrite = match proto.ts_rewrite {
1537            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1538            None => None,
1539        };
1540        let schema_id = proto.schema_id.into_rust()?;
1541        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1542        match proto.kind {
1543            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1544                Ok(BatchPart::Hollow(HollowBatchPart {
1545                    key: key.into_rust()?,
1546                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1547                    key_lower: proto.key_lower.into(),
1548                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1549                    stats: proto.key_stats.into_rust()?,
1550                    ts_rewrite,
1551                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1552                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1553                    schema_id,
1554                    deprecated_schema_id,
1555                }))
1556            }
1557            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1558                assert_eq!(proto.encoded_size_bytes, 0);
1559                assert_eq!(proto.key_lower.len(), 0);
1560                assert_none!(proto.key_stats);
1561                assert_none!(proto.diffs_sum);
1562                let updates = LazyInlineBatchPart(x.into_rust()?);
1563                Ok(BatchPart::Inline {
1564                    updates,
1565                    ts_rewrite,
1566                    schema_id,
1567                    deprecated_schema_id,
1568                })
1569            }
1570            _ => Err(TryFromProtoError::unknown_enum_variant(
1571                "ProtoHollowBatchPart::kind",
1572            )),
1573        }
1574    }
1575}
1576
1577impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1578    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1579        match self {
1580            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1581            BatchColumnarFormat::Both(version) => {
1582                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1583            }
1584            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1585        }
1586    }
1587
1588    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1589        let format = match proto {
1590            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1591            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1592                BatchColumnarFormat::Both(version.cast_into())
1593            }
1594            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1595        };
1596        Ok(format)
1597    }
1598}
1599
/// Aggregate statistics about data contained in a part.
///
/// These are "lazy" in the sense that we don't decode them (or even validate
/// the encoded version) until they're used.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    /// The still-encoded `ProtoStructStats` for `PartStats::key`.
    key: LazyProto<ProtoStructStats>,
}
1608
1609impl LazyPartStats {
1610    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1611        let PartStats { key } = x;
1612        let mut proto_stats = ProtoStructStats::from_rust(key);
1613        map_proto(&mut proto_stats);
1614        LazyPartStats {
1615            key: LazyProto::from(&proto_stats),
1616        }
1617    }
1618    /// Decodes and returns PartStats from the encoded representation.
1619    ///
1620    /// This does not cache the returned value, it decodes each time it's
1621    /// called.
1622    pub fn decode(&self) -> PartStats {
1623        let key = self.key.decode().expect("valid proto");
1624        PartStats {
1625            key: key.into_rust().expect("valid stats"),
1626        }
1627    }
1628}
1629
1630impl RustType<Bytes> for LazyPartStats {
1631    fn into_proto(&self) -> Bytes {
1632        let LazyPartStats { key } = self;
1633        key.into_proto()
1634    }
1635
1636    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1637        Ok(LazyPartStats {
1638            key: proto.into_rust()?,
1639        })
1640    }
1641}
1642
/// A proptest strategy that always yields `Some` arbitrary [`LazyPartStats`].
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Some)
}
1647
1648#[allow(unused_parens)]
1649impl Arbitrary for LazyPartStats {
1650    type Parameters = ();
1651    type Strategy =
1652        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1653
1654    fn arbitrary_with(_: ()) -> Self::Strategy {
1655        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1656            LazyPartStats::encode(&x, |_| {})
1657        })
1658    }
1659}
1660
1661impl ProtoInlineBatchPart {
1662    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1663        lgbytes: &ColumnarMetrics,
1664        proto: Self,
1665    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1666        let updates = proto
1667            .updates
1668            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1669        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1670
1671        Ok(BlobTraceBatchPart {
1672            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1673            index: proto.index.into_rust()?,
1674            updates,
1675        })
1676    }
1677}
1678
/// A batch part stored inlined in State.
///
/// The wrapped [`ProtoInlineBatchPart`] is held in encoded form and decoded on
/// demand (see [`LazyInlineBatchPart::decode`]).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1682
1683impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1684    fn from(value: &ProtoInlineBatchPart) -> Self {
1685        LazyInlineBatchPart(value.into())
1686    }
1687}
1688
1689impl Serialize for LazyInlineBatchPart {
1690    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1691        // NB: This serialize impl is only used for QA and debugging, so emit a
1692        // truncated version.
1693        let proto = self.0.decode().expect("valid proto");
1694        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1695        let () = s.serialize_field("desc", &proto.desc)?;
1696        let () = s.serialize_field("index", &proto.index)?;
1697        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1698        s.end()
1699    }
1700}
1701
1702impl LazyInlineBatchPart {
1703    pub(crate) fn encoded_size_bytes(&self) -> usize {
1704        self.0.buf.len()
1705    }
1706
1707    /// Decodes and returns a BlobTraceBatchPart from the encoded
1708    /// representation.
1709    ///
1710    /// This does not cache the returned value, it decodes each time it's
1711    /// called.
1712    pub fn decode<T: Timestamp + Codec64>(
1713        &self,
1714        lgbytes: &ColumnarMetrics,
1715    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1716        let proto = self.0.decode().expect("valid proto");
1717        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1718    }
1719}
1720
1721impl RustType<Bytes> for LazyInlineBatchPart {
1722    fn into_proto(&self) -> Bytes {
1723        self.0.into_proto()
1724    }
1725
1726    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1727        Ok(LazyInlineBatchPart(proto.into_rust()?))
1728    }
1729}
1730
1731impl RustType<ProtoHollowRollup> for HollowRollup {
1732    fn into_proto(&self) -> ProtoHollowRollup {
1733        ProtoHollowRollup {
1734            key: self.key.into_proto(),
1735            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1736        }
1737    }
1738
1739    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1740        Ok(HollowRollup {
1741            key: proto.key.into_rust()?,
1742            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1743        })
1744    }
1745}
1746
1747impl RustType<ProtoActiveRollup> for ActiveRollup {
1748    fn into_proto(&self) -> ProtoActiveRollup {
1749        ProtoActiveRollup {
1750            start_ms: self.start_ms,
1751            seqno: self.seqno.into_proto(),
1752        }
1753    }
1754
1755    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1756        Ok(ActiveRollup {
1757            start_ms: proto.start_ms,
1758            seqno: proto.seqno.into_rust()?,
1759        })
1760    }
1761}
1762
1763impl RustType<ProtoActiveGc> for ActiveGc {
1764    fn into_proto(&self) -> ProtoActiveGc {
1765        ProtoActiveGc {
1766            start_ms: self.start_ms,
1767            seqno: self.seqno.into_proto(),
1768        }
1769    }
1770
1771    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1772        Ok(ActiveGc {
1773            start_ms: proto.start_ms,
1774            seqno: proto.seqno.into_rust()?,
1775        })
1776    }
1777}
1778
1779impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1780    fn into_proto(&self) -> ProtoU64Description {
1781        ProtoU64Description {
1782            lower: Some(self.lower().into_proto()),
1783            upper: Some(self.upper().into_proto()),
1784            since: Some(self.since().into_proto()),
1785        }
1786    }
1787
1788    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1789        Ok(Description::new(
1790            proto.lower.into_rust_if_some("lower")?,
1791            proto.upper.into_rust_if_some("upper")?,
1792            proto.since.into_rust_if_some("since")?,
1793        ))
1794    }
1795}
1796
1797impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1798    fn into_proto(&self) -> ProtoU64Antichain {
1799        ProtoU64Antichain {
1800            elements: self
1801                .elements()
1802                .iter()
1803                .map(|x| i64::from_le_bytes(T::encode(x)))
1804                .collect(),
1805        }
1806    }
1807
1808    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1809        let elements = proto
1810            .elements
1811            .iter()
1812            .map(|x| T::decode(x.to_le_bytes()))
1813            .collect::<Vec<_>>();
1814        Ok(Antichain::from(elements))
1815    }
1816}
1817
// Unit tests for the proto encodings in this file: version gating on decode,
// migrations from deprecated proto fields, proto round-trips, and the data
// version compatibility checks.
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_err;
    use mz_persist::location::SeqNo;
    use proptest::prelude::*;

    use crate::ShardId;
    use crate::internal::paths::PartialRollupKey;
    use crate::internal::state::tests::any_state;
    use crate::internal::state::{BatchPart, HandleDebugState};
    use crate::internal::state_diff::StateDiff;
    use crate::tests::new_test_client_cache;

    use super::*;

    // State encoded (as a Rollup) by code version v2 must decode under v2
    // and v3, and decoding it under the older v1 must panic.
    #[mz_ore::test]
    fn applier_version_state() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out some State.
        let shard_id = ShardId::new();
        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
        let rollup =
            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
        let mut buf = Vec::new();
        rollup.encode(&mut buf).expect("serializable");
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(
            UntypedState::<u64>::decode(&v2, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );
        assert_eq!(
            UntypedState::<u64>::decode(&v3, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
        assert_err!(v1_res);
    }

    // The same version gating as `applier_version_state`, but for an encoded
    // StateDiff.
    #[mz_ore::test]
    fn applier_version_state_diff() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out some State.
        let diff = StateDiff::<u64>::new(
            v2.clone(),
            SeqNo(0),
            SeqNo(1),
            2,
            PartialRollupKey("rollup".into()),
        );
        let mut buf = Vec::new();
        diff.encode(&mut buf);
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
        assert_err!(v1_res);
    }

    // A ProtoHollowBatch carrying the deprecated `deprecated_keys` field
    // decodes with each such key appended, after the existing parts, as a
    // hollow part with `encoded_size_bytes: 0`.
    #[mz_ore::test]
    fn hollow_batch_migration_keys() {
        let x = HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(1u64),
                Antichain::from_elem(2u64),
                Antichain::from_elem(3u64),
            ),
            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("a".into()),
                encoded_size_bytes: 5,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            }))],
            4,
        );
        let mut old = x.into_proto();
        // Old ProtoHollowBatch had keys instead of parts.
        old.deprecated_keys = vec!["b".into()];
        // We don't expect to see a ProtoHollowBatch with keys _and_ parts, only
        // one or the other, but we have a defined output, so may as well test
        // it.
        let mut expected = x;
        // We fill in 0 for encoded_size_bytes when we migrate from keys. This
        // will violate bounded memory usage compaction during the transition
        // (short-term issue), but that's better than creating unnecessary runs
        // (longer-term issue).
        expected
            .parts
            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("b".into()),
                encoded_size_bytes: 0,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            })));
        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
    }

    // A leased reader encoded with `lease_duration_ms: 0` decodes with the
    // default READER_LEASE_DURATION filled in.
    #[mz_ore::test]
    fn reader_state_migration_lease_duration() {
        let x = LeasedReaderState {
            seqno: SeqNo(1),
            since: Antichain::from_elem(2u64),
            last_heartbeat_timestamp_ms: 3,
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
            // Old ProtoReaderState had no lease_duration_ms field
            lease_duration_ms: 0,
        };
        let old = x.into_proto();
        let mut expected = x;
        // We fill in DEFAULT_READ_LEASE_DURATION for lease_duration_ms when we
        // migrate from unset.
        expected.lease_duration_ms =
            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
    }

    // A ProtoWriterState with an empty write token and no write upper decodes
    // to IdempotencyToken::SENTINEL and an upper of `Antichain::from_elem(0)`.
    #[mz_ore::test]
    fn writer_state_migration_most_recent_write() {
        let proto = ProtoWriterState {
            last_heartbeat_timestamp_ms: 1,
            lease_duration_ms: 2,
            // Old ProtoWriterState had no most_recent_write_token or
            // most_recent_write_upper.
            most_recent_write_token: "".into(),
            most_recent_write_upper: None,
            debug: Some(ProtoHandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            }),
        };
        let expected = WriterState {
            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
            lease_duration_ms: proto.lease_duration_ms,
            most_recent_write_token: IdempotencyToken::SENTINEL,
            most_recent_write_upper: Antichain::from_elem(0),
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
        };
        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
    }

    // Rollups stored in the deprecated `deprecated_rollups` map (key only)
    // are merged with the current rollups on decode, with
    // `encoded_size_bytes: None` for the migrated entries.
    #[mz_ore::test]
    fn state_migration_rollups() {
        let r1 = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r2 = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();

        // Manually add the old rollup encoding.
        proto.deprecated_rollups.insert(1, r1.key.0.clone());

        let state: Rollup<u64> = proto.into_rust().unwrap();
        let state = state.state;
        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            expected
        );
    }

    // Rollup field diffs written under ProtoStateField::DeprecatedRollups are
    // decoded into `diff.rollups` after the current-format diffs. Separately,
    // a current-format rollup delete applies cleanly to a state whose rollup
    // was stored in the deprecated field.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r1 = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r2 = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };
        let r3 = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };
        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2.clone());
        diff.rollups.push(r3.clone());
        let mut diff_proto = diff.into_proto();

        // Take the encoded field diffs out so more can be appended to them.
        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Manually add the old rollup encoding.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2, r3, r1]
        );

        // Also make sure that a rollup delete in a diff applies cleanly to a
        // state that had it in the deprecated field.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let state: Rollup<u64> = rollup.into_rust().unwrap();
        let state = state.state;
        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        // r3 (only in the deprecated field) is deleted; r1 is inserted from
        // the deprecated diff encoding.
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }

    // Property test: arbitrary states survive a round-trip through the Rollup
    // proto encoding unchanged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn state_proto_roundtrip() {
        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
            let before = UntypedState {
                key_codec: <() as Codec>::codec_name(),
                val_codec: <() as Codec>::codec_name(),
                ts_codec: <T as Codec64>::codec_name(),
                diff_codec: <i64 as Codec64>::codec_name(),
                state,
            };
            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
            let after: Rollup<T> = proto.into_rust().unwrap();
            let after = after.state;
            assert_eq!(before, after);
        }

        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
    }

    // Table-driven cases for cfg::check_data_version_with_self_managed_versions.
    // `Ok(())` means the (code, data) pair was accepted and `Err(())` means it
    // was rejected.
    #[mz_ore::test]
    fn check_data_versions_with_self_managed_versions() {
        #[track_caller]
        fn testcase(
            code: &str,
            data: &str,
            self_managed_versions: &[Version],
            expected: Result<(), ()>,
        ) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            let actual = cfg::check_data_version_with_self_managed_versions(
                &code,
                &data,
                self_managed_versions,
            )
            .map_err(|_| ());
            assert_eq!(actual, expected);
        }

        let none = [];
        let one = [Version::new(0, 130, 0)];
        let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
        let three = [
            Version::new(0, 130, 0),
            Version::new(0, 140, 0),
            Version::new(0, 150, 0),
        ];

        // No self-managed versions: data may be at most one minor version
        // ahead of code (e.g. 0.130 code accepts 0.131 data but not 0.132).
        testcase("0.130.0", "0.128.0", &none, Ok(()));
        testcase("0.130.0", "0.129.0", &none, Ok(()));
        testcase("0.130.0", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.130.1", &none, Ok(()));
        testcase("0.130.1", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.131.0", &none, Ok(()));
        testcase("0.130.0", "0.132.0", &none, Err(()));

        testcase("0.129.0", "0.127.0", &none, Ok(()));
        testcase("0.129.0", "0.128.0", &none, Ok(()));
        testcase("0.129.0", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.129.1", &none, Ok(()));
        testcase("0.129.1", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.130.0", &none, Ok(()));
        testcase("0.129.0", "0.131.0", &none, Err(()));

        // One self-managed version (0.130): code at 0.130 accepts data more
        // than one minor version ahead (0.132 passes), while 0.129 and 0.131
        // code keep the one-minor-version limit.
        testcase("0.130.0", "0.128.0", &one, Ok(()));
        testcase("0.130.0", "0.129.0", &one, Ok(()));
        testcase("0.130.0", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.130.1", &one, Ok(()));
        testcase("0.130.1", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.131.0", &one, Ok(()));
        testcase("0.130.0", "0.132.0", &one, Ok(()));

        testcase("0.129.0", "0.127.0", &one, Ok(()));
        testcase("0.129.0", "0.128.0", &one, Ok(()));
        testcase("0.129.0", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.129.1", &one, Ok(()));
        testcase("0.129.1", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.130.0", &one, Ok(()));
        testcase("0.129.0", "0.131.0", &one, Err(()));

        testcase("0.131.0", "0.129.0", &one, Ok(()));
        testcase("0.131.0", "0.130.0", &one, Ok(()));
        testcase("0.131.0", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.131.1", &one, Ok(()));
        testcase("0.131.1", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.132.0", &one, Ok(()));
        testcase("0.131.0", "0.133.0", &one, Err(()));

        // Two self-managed versions: code at 0.130 accepts data up through
        // any 0.140.x patch but not 0.141; other code versions cannot jump
        // that far.
        testcase("0.130.0", "0.128.0", &two, Ok(()));
        testcase("0.130.0", "0.129.0", &two, Ok(()));
        testcase("0.130.0", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.130.1", &two, Ok(()));
        testcase("0.130.1", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.131.0", &two, Ok(()));
        testcase("0.130.0", "0.132.0", &two, Ok(()));
        testcase("0.130.0", "0.135.0", &two, Ok(()));
        testcase("0.130.0", "0.138.0", &two, Ok(()));
        testcase("0.130.0", "0.139.0", &two, Ok(()));
        testcase("0.130.0", "0.140.0", &two, Ok(()));
        testcase("0.130.9", "0.140.0", &two, Ok(()));
        testcase("0.130.0", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.9", &two, Ok(()));
        testcase("0.130.0", "0.141.0", &two, Err(()));
        testcase("0.129.0", "0.133.0", &two, Err(()));
        testcase("0.129.0", "0.140.0", &two, Err(()));
        testcase("0.131.0", "0.133.0", &two, Err(()));
        testcase("0.131.0", "0.140.0", &two, Err(()));

        // Three self-managed versions: 0.130 code reaches 0.140.x and 0.140
        // code reaches 0.150.x. Code at the last one (0.150) accepts every
        // newer data version exercised below, including 0.161.0.
        testcase("0.130.0", "0.128.0", &three, Ok(()));
        testcase("0.130.0", "0.129.0", &three, Ok(()));
        testcase("0.130.0", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.130.1", &three, Ok(()));
        testcase("0.130.1", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.131.0", &three, Ok(()));
        testcase("0.130.0", "0.132.0", &three, Ok(()));
        testcase("0.130.0", "0.135.0", &three, Ok(()));
        testcase("0.130.0", "0.138.0", &three, Ok(()));
        testcase("0.130.0", "0.139.0", &three, Ok(()));
        testcase("0.130.0", "0.140.0", &three, Ok(()));
        testcase("0.130.9", "0.140.0", &three, Ok(()));
        testcase("0.130.0", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.9", &three, Ok(()));
        testcase("0.130.0", "0.141.0", &three, Err(()));
        testcase("0.129.0", "0.133.0", &three, Err(()));
        testcase("0.129.0", "0.140.0", &three, Err(()));
        testcase("0.131.0", "0.133.0", &three, Err(()));
        testcase("0.131.0", "0.140.0", &three, Err(()));
        testcase("0.130.0", "0.150.0", &three, Err(()));

        testcase("0.140.0", "0.138.0", &three, Ok(()));
        testcase("0.140.0", "0.139.0", &three, Ok(()));
        testcase("0.140.0", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.140.1", &three, Ok(()));
        testcase("0.140.1", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.141.0", &three, Ok(()));
        testcase("0.140.0", "0.142.0", &three, Ok(()));
        testcase("0.140.0", "0.145.0", &three, Ok(()));
        testcase("0.140.0", "0.148.0", &three, Ok(()));
        testcase("0.140.0", "0.149.0", &three, Ok(()));
        testcase("0.140.0", "0.150.0", &three, Ok(()));
        testcase("0.140.9", "0.150.0", &three, Ok(()));
        testcase("0.140.0", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.9", &three, Ok(()));
        testcase("0.140.0", "0.151.0", &three, Err(()));
        testcase("0.139.0", "0.143.0", &three, Err(()));
        testcase("0.139.0", "0.150.0", &three, Err(()));
        testcase("0.141.0", "0.143.0", &three, Err(()));
        testcase("0.141.0", "0.150.0", &three, Err(()));

        testcase("0.150.0", "0.148.0", &three, Ok(()));
        testcase("0.150.0", "0.149.0", &three, Ok(()));
        testcase("0.150.0", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.150.1", &three, Ok(()));
        testcase("0.150.1", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.151.0", &three, Ok(()));
        testcase("0.150.0", "0.152.0", &three, Ok(()));
        testcase("0.150.0", "0.155.0", &three, Ok(()));
        testcase("0.150.0", "0.158.0", &three, Ok(()));
        testcase("0.150.0", "0.159.0", &three, Ok(()));
        testcase("0.150.0", "0.160.0", &three, Ok(()));
        testcase("0.150.9", "0.160.0", &three, Ok(()));
        testcase("0.150.0", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.9", &three, Ok(()));
        testcase("0.150.0", "0.161.0", &three, Ok(()));
        testcase("0.149.0", "0.153.0", &three, Err(()));
        testcase("0.149.0", "0.160.0", &three, Err(()));
        testcase("0.151.0", "0.153.0", &three, Err(()));
        testcase("0.151.0", "0.160.0", &three, Err(()));
    }

    // Table-driven cases for check_data_version. That function signals
    // incompatibility by panicking, so `catch_unwind` maps a panic to
    // `Err(())`.
    #[mz_ore::test]
    fn check_data_versions() {
        #[track_caller]
        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            #[allow(clippy::disallowed_methods)]
            let actual =
                std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
            assert_eq!(actual, expected);
        }

        testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.10.0", Ok(()));
        // Note: Probably useful to let tests use two arbitrary shas on main, at
        // the very least for things like git bisect.
        testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.11.0", Ok(()));
        testcase("0.10.0-dev", "0.12.0-dev", Err(()));
        testcase("0.10.0-dev", "0.12.0", Err(()));
        testcase("0.10.0-dev", "0.13.0-dev", Err(()));

        testcase("0.10.0", "0.8.0-dev", Ok(()));
        testcase("0.10.0", "0.8.0", Ok(()));
        testcase("0.10.0", "0.9.0-dev", Ok(()));
        testcase("0.10.0", "0.9.0", Ok(()));
        testcase("0.10.0", "0.10.0-dev", Ok(()));
        testcase("0.10.0", "0.10.0", Ok(()));
        // Note: This is what it would look like to run a version of the catalog
        // upgrade checker built from main.
        testcase("0.10.0", "0.11.0-dev", Ok(()));
        testcase("0.10.0", "0.11.0", Ok(()));
        testcase("0.10.0", "0.11.1", Ok(()));
        testcase("0.10.0", "0.11.1000000", Ok(()));
        testcase("0.10.0", "0.12.0-dev", Err(()));
        testcase("0.10.0", "0.12.0", Err(()));
        testcase("0.10.0", "0.13.0-dev", Err(()));

        testcase("0.10.1", "0.9.0", Ok(()));
        testcase("0.10.1", "0.10.0", Ok(()));
        testcase("0.10.1", "0.11.0", Ok(()));
        testcase("0.10.1", "0.11.1", Ok(()));
        testcase("0.10.1", "0.11.100", Ok(()));

        // This is probably a bad idea (seems as if we've downgraded from
        // running v0.10.1 to v0.10.0, an earlier patch version of the same
        // minor version), but not much we can do, given the `state_version =
        // max(code_version, prev_state_version)` logic we need to prevent
        // rolling back an arbitrary number of versions.
        testcase("0.10.0", "0.10.1", Ok(()));
    }
}