mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53    RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54    proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
/// A key and value `Schema` of data written to a batch or shard.
///
/// Both schemas are held behind an [`Arc`], so cloning a `Schemas` shares the
/// underlying `K::Schema` / `V::Schema` values instead of copying them.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Id under which this schema is registered in the shard's schema registry,
    /// if any.
    pub id: Option<SchemaId>,
    /// Key `Schema`.
    pub key: Arc<K::Schema>,
    /// Value `Schema`.
    pub val: Arc<V::Schema>,
}
76
77impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
78    fn clone(&self) -> Self {
79        Self {
80            id: self.id,
81            key: Arc::clone(&self.key),
82            val: Arc::clone(&self.val),
83        }
84    }
85}
86
/// A proto that is decoded on use.
///
/// Because of the way prost works, decoding a large protobuf may result in a
/// number of very short lived allocations in our RustType/ProtoType decode path
/// (e.g. this happens for a repeated embedded message). Not every use of
/// persist State needs every transitive bit of it to be decoded, so we opt
/// certain parts of it (initially stats) to be decoded on use.
///
/// This has the dual benefit of only paying for the short-lived allocs when
/// necessary and also allowing decoding to be gated by a feature flag. The
/// tradeoffs are that we might decode more than once and that we have to handle
/// invalid proto errors in more places.
///
/// Mechanically, this is accomplished by making the field a proto `bytes` type
/// instead of `ProtoFoo`. These bytes then contain the serialization of
/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
/// compatible change.
///
/// > Embedded messages are compatible with bytes if the bytes contain an
/// > encoded version of the message.
///
/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The still-encoded bytes of a `T`. Nothing checks that they are a valid
    /// encoding until `decode`/`decode_to` is called.
    buf: Bytes,
    /// Records the message type `T` without storing a value of it.
    _phantom: PhantomData<fn() -> T>,
}
114
115impl<T: Message + Default> Debug for LazyProto<T> {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self.decode() {
118            Ok(proto) => Debug::fmt(&proto, f),
119            Err(err) => f
120                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
121                .field("err", &err)
122                .finish(),
123        }
124    }
125}
126
127impl<T> PartialEq for LazyProto<T> {
128    fn eq(&self, other: &Self) -> bool {
129        self.cmp(other) == Ordering::Equal
130    }
131}
132
133impl<T> Eq for LazyProto<T> {}
134
135impl<T> PartialOrd for LazyProto<T> {
136    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
137        Some(self.cmp(other))
138    }
139}
140
141impl<T> Ord for LazyProto<T> {
142    fn cmp(&self, other: &Self) -> Ordering {
143        let LazyProto {
144            buf: self_buf,
145            _phantom: _,
146        } = self;
147        let LazyProto {
148            buf: other_buf,
149            _phantom: _,
150        } = other;
151        self_buf.cmp(other_buf)
152    }
153}
154
155impl<T> Hash for LazyProto<T> {
156    fn hash<H: Hasher>(&self, state: &mut H) {
157        let LazyProto { buf, _phantom } = self;
158        buf.hash(state);
159    }
160}
161
162impl<T: Message + Default> From<&T> for LazyProto<T> {
163    fn from(value: &T) -> Self {
164        let buf = Bytes::from(value.encode_to_vec());
165        LazyProto {
166            buf,
167            _phantom: PhantomData,
168        }
169    }
170}
171
172impl<T: Message + Default> LazyProto<T> {
173    pub fn decode(&self) -> Result<T, prost::DecodeError> {
174        T::decode(&*self.buf)
175    }
176
177    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
178        Ok(T::decode(&*self.buf)?.into_rust()?)
179    }
180}
181
182impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
183    fn into_proto(&self) -> Bytes {
184        self.buf.clone()
185    }
186
187    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
188        Ok(Self {
189            buf,
190            _phantom: PhantomData,
191        })
192    }
193}
194
195pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
196    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
197        Some(x) => x,
198        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
199    };
200    let uuid = Uuid::parse_str(uuid_encoded)
201        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
202    Ok(*uuid.as_bytes())
203}
204
205pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
206    if let Err(msg) = cfg::check_data_version(code_version, data_version) {
207        // We can't catch halts, so panic in test, so we can get unit test
208        // coverage.
209        if cfg!(test) {
210            panic!("{msg}");
211        } else {
212            halt!("{msg}");
213        }
214    }
215}
216
217impl RustType<String> for LeasedReaderId {
218    fn into_proto(&self) -> String {
219        self.to_string()
220    }
221
222    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
223        match proto.parse() {
224            Ok(x) => Ok(x),
225            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
226        }
227    }
228}
229
230impl RustType<String> for CriticalReaderId {
231    fn into_proto(&self) -> String {
232        self.to_string()
233    }
234
235    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
236        match proto.parse() {
237            Ok(x) => Ok(x),
238            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
239        }
240    }
241}
242
243impl RustType<String> for WriterId {
244    fn into_proto(&self) -> String {
245        self.to_string()
246    }
247
248    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
249        match proto.parse() {
250            Ok(x) => Ok(x),
251            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
252        }
253    }
254}
255
256impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
257    fn into_proto(&self) -> ProtoEncodedSchemas {
258        ProtoEncodedSchemas {
259            key: self.key.clone(),
260            key_data_type: self.key_data_type.clone(),
261            val: self.val.clone(),
262            val_data_type: self.val_data_type.clone(),
263        }
264    }
265
266    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
267        Ok(EncodedSchemas {
268            key: proto.key,
269            key_data_type: proto.key_data_type,
270            val: proto.val,
271            val_data_type: proto.val_data_type,
272        })
273    }
274}
275
276impl RustType<String> for IdempotencyToken {
277    fn into_proto(&self) -> String {
278        self.to_string()
279    }
280
281    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
282        match proto.parse() {
283            Ok(x) => Ok(x),
284            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
285        }
286    }
287}
288
289impl RustType<String> for PartialBatchKey {
290    fn into_proto(&self) -> String {
291        self.0.clone()
292    }
293
294    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
295        Ok(PartialBatchKey(proto))
296    }
297}
298
299impl RustType<String> for PartialRollupKey {
300    fn into_proto(&self) -> String {
301        self.0.clone()
302    }
303
304    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
305        Ok(PartialRollupKey(proto))
306    }
307}
308
309impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
310    pub fn encode<B>(&self, buf: &mut B)
311    where
312        B: bytes::BufMut,
313    {
314        self.into_proto()
315            .encode(buf)
316            .expect("no required fields means no initialization errors");
317    }
318
319    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
320        let proto = ProtoStateDiff::decode(buf)
321            // We received a State that we couldn't decode. This could happen if
322            // persist messes up backward/forward compatibility, if the durable
323            // data was corrupted, or if operations messes up deployment. In any
324            // case, fail loudly.
325            .expect("internal error: invalid encoded state");
326        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
327        check_data_version(build_version, &diff.applier_version);
328        diff
329    }
330}
331
// Conversion between the in-memory `StateDiff` and its wire form
// `ProtoStateDiff`. The scalar header fields map one-to-one; all per-field
// diff lists are packed into a single `ProtoStateFieldDiffs`.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes this diff.
    ///
    /// Every per-field diff list is pushed through one
    /// `ProtoStateFieldDiffsWriter`, tagged with its `ProtoStateField`, in the
    /// order of the calls below. Debug builds additionally assert that the
    /// resulting `ProtoStateFieldDiffs` validates.
    fn into_proto(&self) -> ProtoStateDiff {
        // Deconstruct self so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // Create a writer so we can efficiently encode our data.
        let mut writer = proto.into_writer();

        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        // After encoding all of our data, convert back into the proto.
        let field_diffs = writer.into_proto();

        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Decodes a diff.
    ///
    /// An empty `applier_version` is read as `0.0.0`. If `field_diffs` is
    /// absent, the result carries only the header fields passed to
    /// `StateDiff::new`. Otherwise each encoded field diff is decoded and
    /// appended to the list that matches its `ProtoStateField` tag.
    ///
    /// # Errors
    ///
    /// Fails if `applier_version` is non-empty but not valid semver, or if any
    /// header field or individual field diff fails to decode or convert.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        let applier_version = if proto.applier_version.is_empty() {
            // Backward compatibility with versions of ProtoState before we set
            // this field: if it's missing (empty), assume an infinitely old
            // version.
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            // The first two turbofish arguments on each call below name the
            // proto types of the key and value for that field; `()` is used
            // where a field has no key (or, for legacy batches, no value).
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // MIGRATION: We previously stored rollups as a `SeqNo ->
                    // string Key` map, but now the value is a `struct
                    // HollowRollup`.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                // The old format carried no size, so it is
                                // left unset.
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
551
552fn field_diffs_into_proto<K, KP, V, VP>(
553    field: ProtoStateField,
554    diffs: &[StateFieldDiff<K, V>],
555    writer: &mut ProtoStateFieldDiffsWriter,
556) where
557    KP: prost::Message,
558    K: RustType<KP>,
559    VP: prost::Message,
560    V: RustType<VP>,
561{
562    for diff in diffs.iter() {
563        field_diff_into_proto(field, diff, writer);
564    }
565}
566
567fn field_diff_into_proto<K, KP, V, VP>(
568    field: ProtoStateField,
569    diff: &StateFieldDiff<K, V>,
570    writer: &mut ProtoStateFieldDiffsWriter,
571) where
572    KP: prost::Message,
573    K: RustType<KP>,
574    VP: prost::Message,
575    V: RustType<VP>,
576{
577    writer.push_field(field);
578    writer.encode_proto(&diff.key.into_proto());
579    match &diff.val {
580        StateFieldValDiff::Insert(to) => {
581            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
582            writer.encode_proto(&to.into_proto());
583        }
584        StateFieldValDiff::Update(from, to) => {
585            writer.push_diff_type(ProtoStateFieldDiffType::Update);
586            writer.encode_proto(&from.into_proto());
587            writer.encode_proto(&to.into_proto());
588        }
589        StateFieldValDiff::Delete(from) => {
590            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
591            writer.encode_proto(&from.into_proto());
592        }
593    };
594}
595
596fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
597    proto: ProtoStateFieldDiff<'_>,
598    diffs: &mut Vec<StateFieldDiff<K, V>>,
599    k_fn: KFn,
600    v_fn: VFn,
601) -> Result<(), TryFromProtoError>
602where
603    KP: prost::Message + Default,
604    VP: prost::Message + Default,
605    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
606    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
607{
608    let val = match proto.diff_type {
609        ProtoStateFieldDiffType::Insert => {
610            let to = VP::decode(proto.to)
611                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
612            StateFieldValDiff::Insert(v_fn(to)?)
613        }
614        ProtoStateFieldDiffType::Update => {
615            let from = VP::decode(proto.from)
616                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
617            let to = VP::decode(proto.to)
618                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619
620            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
621        }
622        ProtoStateFieldDiffType::Delete => {
623            let from = VP::decode(proto.from)
624                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
625            StateFieldValDiff::Delete(v_fn(from)?)
626        }
627    };
628    let key = KP::decode(proto.key)
629        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
630    diffs.push(StateFieldDiff {
631        key: k_fn(key)?,
632        val,
633    });
634    Ok(())
635}
636
/// The decoded state of [ProtoRollup] for which we have not yet checked
/// that codecs match the ones in durable state.
///
/// The four `*_codec` strings record the codec names the state was written
/// with. `check_codecs` / `check_ts_codec` compare them against the codec
/// names of the requested Rust types before handing out the inner state.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Codec name recorded for the key type.
    pub(crate) key_codec: String,
    /// Codec name recorded for the value type.
    pub(crate) val_codec: String,
    /// Codec name recorded for the timestamp type.
    pub(crate) ts_codec: String,
    /// Codec name recorded for the diff type.
    pub(crate) diff_codec: String,

    // Important! This T has not been validated, so we can't expose anything in
    // State that references T until one of the check methods have been called.
    // Any field on State that doesn't reference T is fair game.
    state: State<T>,
}
652
653impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
654    pub fn seqno(&self) -> SeqNo {
655        self.state.seqno
656    }
657
658    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
659        &self.state.collections.rollups
660    }
661
662    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
663        self.state.latest_rollup()
664    }
665
666    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
667        &mut self,
668        cfg: &PersistConfig,
669        metrics: &Metrics,
670        diffs: I,
671    ) {
672        // The apply_encoded_diffs might panic if T is not correct. Making this
673        // a silent no-op is far too subtle for my taste, but it's not clear
674        // what else we could do instead.
675        if T::codec_name() != self.ts_codec {
676            return;
677        }
678        self.state.apply_encoded_diffs(cfg, metrics, diffs);
679    }
680
681    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
682        self,
683        shard_id: &ShardId,
684    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
685        // Also defensively check that the shard_id on the state we fetched
686        // matches the shard_id we were trying to fetch.
687        assert_eq!(shard_id, &self.state.shard_id);
688        if K::codec_name() != self.key_codec
689            || V::codec_name() != self.val_codec
690            || T::codec_name() != self.ts_codec
691            || D::codec_name() != self.diff_codec
692        {
693            return Err(Box::new(CodecMismatch {
694                requested: (
695                    K::codec_name(),
696                    V::codec_name(),
697                    T::codec_name(),
698                    D::codec_name(),
699                    None,
700                ),
701                actual: (
702                    self.key_codec,
703                    self.val_codec,
704                    self.ts_codec,
705                    self.diff_codec,
706                    None,
707                ),
708            }));
709        }
710        Ok(TypedState {
711            state: self.state,
712            _phantom: PhantomData,
713        })
714    }
715
716    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
717        // Also defensively check that the shard_id on the state we fetched
718        // matches the shard_id we were trying to fetch.
719        assert_eq!(shard_id, &self.state.shard_id);
720        if T::codec_name() != self.ts_codec {
721            return Err(CodecMismatchT {
722                requested: T::codec_name(),
723                actual: self.ts_codec,
724            });
725        }
726        Ok(self.state)
727    }
728
729    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
730        let proto = ProtoRollup::decode(buf)
731            // We received a State that we couldn't decode. This could happen if
732            // persist messes up backward/forward compatibility, if the durable
733            // data was corrupted, or if operations messes up deployment. In any
734            // case, fail loudly.
735            .expect("internal error: invalid encoded state");
736        let state = Rollup::from_proto(proto)
737            .expect("internal error: invalid encoded state")
738            .state;
739        check_data_version(build_version, &state.state.applier_version);
740        state
741    }
742}
743
744impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
745where
746    K: Codec,
747    V: Codec,
748    T: Codec64,
749    D: Codec64,
750{
751    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
752        UntypedState {
753            key_codec: K::codec_name(),
754            val_codec: V::codec_name(),
755            ts_codec: T::codec_name(),
756            diff_codec: D::codec_name(),
757            state: typed_state.state,
758        }
759    }
760}
761
/// A struct that maps 1:1 with ProtoRollup.
///
/// Contains State, and optionally the diffs between (state.latest_rollup.seqno, state.seqno]
///
/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
/// when deserializing rollups that were persisted before we started inlining diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup; its codecs are not yet checked.
    pub(crate) state: UntypedState<T>,
    /// The inlined diffs, if any (see the type-level docs for when this is
    /// `None`).
    pub(crate) diffs: Option<InlinedDiffs>,
}
773
774impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
775    /// Creates a `StateRollup` from a state and diffs from its last rollup.
776    ///
777    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
778    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
779        let latest_rollup_seqno = *state.latest_rollup().0;
780        let mut verify_seqno = latest_rollup_seqno;
781        for diff in &diffs {
782            assert_eq!(verify_seqno.next(), diff.seqno);
783            verify_seqno = diff.seqno;
784        }
785        assert_eq!(verify_seqno, state.seqno());
786
787        let diffs = Some(InlinedDiffs::from(
788            latest_rollup_seqno.next(),
789            state.seqno().next(),
790            diffs,
791        ));
792
793        Self { state, diffs }
794    }
795
796    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
797        Self { state, diffs: None }
798    }
799
800    pub(crate) fn from_state_without_diffs(
801        state: State<T>,
802        key_codec: String,
803        val_codec: String,
804        ts_codec: String,
805        diff_codec: String,
806    ) -> Self {
807        Self::from_untyped_state_without_diffs(UntypedState {
808            key_codec,
809            val_codec,
810            ts_codec,
811            diff_codec,
812            state,
813        })
814    }
815}
816
/// A run of encoded state diffs together with the seqno range
/// `[lower, upper)` that they fall in.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos of `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos of `diffs`.
    pub(crate) upper: SeqNo,
    /// The encoded diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
823
824impl InlinedDiffs {
825    pub(crate) fn description(&self) -> Description<SeqNo> {
826        Description::new(
827            Antichain::from_elem(self.lower),
828            Antichain::from_elem(self.upper),
829            Antichain::from_elem(SeqNo::minimum()),
830        )
831    }
832
833    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
834        for diff in &diffs {
835            assert!(diff.seqno >= lower);
836            assert!(diff.seqno < upper);
837        }
838        Self {
839            lower,
840            upper,
841            diffs,
842        }
843    }
844}
845
846impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
847    fn into_proto(&self) -> ProtoInlinedDiffs {
848        ProtoInlinedDiffs {
849            lower: self.lower.into_proto(),
850            upper: self.upper.into_proto(),
851            diffs: self.diffs.into_proto(),
852        }
853    }
854
855    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
856        Ok(Self {
857            lower: proto.lower.into_rust()?,
858            upper: proto.upper.into_rust()?,
859            diffs: proto.diffs.into_rust()?,
860        })
861    }
862}
863
864impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
865    fn into_proto(&self) -> ProtoRollup {
866        ProtoRollup {
867            applier_version: self.state.state.applier_version.to_string(),
868            shard_id: self.state.state.shard_id.into_proto(),
869            seqno: self.state.state.seqno.into_proto(),
870            walltime_ms: self.state.state.walltime_ms.into_proto(),
871            hostname: self.state.state.hostname.into_proto(),
872            key_codec: self.state.key_codec.into_proto(),
873            val_codec: self.state.val_codec.into_proto(),
874            ts_codec: T::codec_name(),
875            diff_codec: self.state.diff_codec.into_proto(),
876            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
877            active_rollup: self.state.state.collections.active_rollup.into_proto(),
878            active_gc: self.state.state.collections.active_gc.into_proto(),
879            rollups: self
880                .state
881                .state
882                .collections
883                .rollups
884                .iter()
885                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
886                .collect(),
887            deprecated_rollups: Default::default(),
888            leased_readers: self
889                .state
890                .state
891                .collections
892                .leased_readers
893                .iter()
894                .map(|(id, state)| (id.into_proto(), state.into_proto()))
895                .collect(),
896            critical_readers: self
897                .state
898                .state
899                .collections
900                .critical_readers
901                .iter()
902                .map(|(id, state)| (id.into_proto(), state.into_proto()))
903                .collect(),
904            writers: self
905                .state
906                .state
907                .collections
908                .writers
909                .iter()
910                .map(|(id, state)| (id.into_proto(), state.into_proto()))
911                .collect(),
912            schemas: self
913                .state
914                .state
915                .collections
916                .schemas
917                .iter()
918                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
919                .collect(),
920            trace: Some(self.state.state.collections.trace.into_proto()),
921            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
922        }
923    }
924
925    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
926        let applier_version = if x.applier_version.is_empty() {
927            // Backward compatibility with versions of ProtoState before we set
928            // this field: if it's missing (empty), assume an infinitely old
929            // version.
930            semver::Version::new(0, 0, 0)
931        } else {
932            semver::Version::parse(&x.applier_version).map_err(|err| {
933                TryFromProtoError::InvalidSemverVersion(format!(
934                    "invalid applier_version {}: {}",
935                    x.applier_version, err
936                ))
937            })?
938        };
939
940        let mut rollups = BTreeMap::new();
941        for (seqno, rollup) in x.rollups {
942            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
943        }
944        for (seqno, key) in x.deprecated_rollups {
945            rollups.insert(
946                seqno.into_rust()?,
947                HollowRollup {
948                    key: key.into_rust()?,
949                    encoded_size_bytes: None,
950                },
951            );
952        }
953        let mut leased_readers = BTreeMap::new();
954        for (id, state) in x.leased_readers {
955            leased_readers.insert(id.into_rust()?, state.into_rust()?);
956        }
957        let mut critical_readers = BTreeMap::new();
958        for (id, state) in x.critical_readers {
959            critical_readers.insert(id.into_rust()?, state.into_rust()?);
960        }
961        let mut writers = BTreeMap::new();
962        for (id, state) in x.writers {
963            writers.insert(id.into_rust()?, state.into_rust()?);
964        }
965        let mut schemas = BTreeMap::new();
966        for (id, x) in x.schemas {
967            schemas.insert(id.into_rust()?, x.into_rust()?);
968        }
969        let active_rollup = x
970            .active_rollup
971            .map(|rollup| rollup.into_rust())
972            .transpose()?;
973        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
974        let collections = StateCollections {
975            rollups,
976            active_rollup,
977            active_gc,
978            last_gc_req: x.last_gc_req.into_rust()?,
979            leased_readers,
980            critical_readers,
981            writers,
982            schemas,
983            trace: x.trace.into_rust_if_some("trace")?,
984        };
985        let state = State {
986            applier_version,
987            shard_id: x.shard_id.into_rust()?,
988            seqno: x.seqno.into_rust()?,
989            walltime_ms: x.walltime_ms,
990            hostname: x.hostname,
991            collections,
992        };
993
994        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
995        if let Some(diffs) = &diffs {
996            if diffs.lower != state.latest_rollup().0.next() {
997                return Err(TryFromProtoError::InvalidPersistState(format!(
998                    "diffs lower ({}) should match latest rollup's successor: ({})",
999                    diffs.lower,
1000                    state.latest_rollup().0.next()
1001                )));
1002            }
1003            if diffs.upper != state.seqno.next() {
1004                return Err(TryFromProtoError::InvalidPersistState(format!(
1005                    "diffs upper ({}) should match state's successor: ({})",
1006                    diffs.lower,
1007                    state.seqno.next()
1008                )));
1009            }
1010        }
1011
1012        Ok(Rollup {
1013            state: UntypedState {
1014                state,
1015                key_codec: x.key_codec.into_rust()?,
1016                val_codec: x.val_codec.into_rust()?,
1017                ts_codec: x.ts_codec.into_rust()?,
1018                diff_codec: x.diff_codec.into_rust()?,
1019            },
1020            diffs,
1021        })
1022    }
1023}
1024
1025impl RustType<ProtoVersionedData> for VersionedData {
1026    fn into_proto(&self) -> ProtoVersionedData {
1027        ProtoVersionedData {
1028            seqno: self.seqno.into_proto(),
1029            data: Bytes::clone(&self.data),
1030        }
1031    }
1032
1033    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1034        Ok(Self {
1035            seqno: proto.seqno.into_rust()?,
1036            data: proto.data,
1037        })
1038    }
1039}
1040
1041impl RustType<ProtoSpineId> for SpineId {
1042    fn into_proto(&self) -> ProtoSpineId {
1043        ProtoSpineId {
1044            lo: self.0.into_proto(),
1045            hi: self.1.into_proto(),
1046        }
1047    }
1048
1049    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1050        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1051    }
1052}
1053
1054impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1055    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1056        let (id, batch) = entry;
1057        ProtoIdHollowBatch {
1058            id: Some(id.into_proto()),
1059            batch: Some(batch.into_proto()),
1060        }
1061    }
1062
1063    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1064        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1065        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1066        Ok((id, batch))
1067    }
1068}
1069
1070impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1071    fn into_proto(&self) -> ProtoSpineBatch {
1072        ProtoSpineBatch {
1073            desc: Some(self.desc.into_proto()),
1074            parts: self.parts.into_proto(),
1075            level: self.level.into_proto(),
1076            descs: self.descs.into_proto(),
1077        }
1078    }
1079
1080    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1081        let level = proto.level.into_rust()?;
1082        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1083        let parts = proto.parts.into_rust()?;
1084        let descs = proto.descs.into_rust()?;
1085        Ok(ThinSpineBatch {
1086            level,
1087            desc,
1088            parts,
1089            descs,
1090        })
1091    }
1092}
1093
1094impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1095    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1096        let (id, batch) = entry;
1097        ProtoIdSpineBatch {
1098            id: Some(id.into_proto()),
1099            batch: Some(batch.into_proto()),
1100        }
1101    }
1102
1103    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1104        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1105        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1106        Ok((id, batch))
1107    }
1108}
1109
1110impl RustType<ProtoCompaction> for ActiveCompaction {
1111    fn into_proto(&self) -> ProtoCompaction {
1112        ProtoCompaction {
1113            start_ms: self.start_ms,
1114        }
1115    }
1116
1117    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1118        Ok(Self {
1119            start_ms: proto.start_ms,
1120        })
1121    }
1122}
1123
1124impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1125    fn into_proto(&self) -> ProtoMerge {
1126        ProtoMerge {
1127            since: Some(self.since.into_proto()),
1128            remaining_work: self.remaining_work.into_proto(),
1129            active_compaction: self.active_compaction.into_proto(),
1130        }
1131    }
1132
1133    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1134        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1135        let remaining_work = proto.remaining_work.into_rust()?;
1136        let active_compaction = proto.active_compaction.into_rust()?;
1137        Ok(Self {
1138            since,
1139            remaining_work,
1140            active_compaction,
1141        })
1142    }
1143}
1144
1145impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1146    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1147        ProtoIdMerge {
1148            id: Some(id.into_proto()),
1149            merge: Some(merge.into_proto()),
1150        }
1151    }
1152
1153    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1154        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1155        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1156        Ok((id, merge))
1157    }
1158}
1159
1160impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1161    fn into_proto(&self) -> ProtoTrace {
1162        let since = self.since.into_proto();
1163        let legacy_batches = self
1164            .legacy_batches
1165            .iter()
1166            .map(|(b, _)| b.into_proto())
1167            .collect();
1168        let hollow_batches = self.hollow_batches.into_proto();
1169        let spine_batches = self.spine_batches.into_proto();
1170        let merges = self.merges.into_proto();
1171        ProtoTrace {
1172            since: Some(since),
1173            legacy_batches,
1174            hollow_batches,
1175            spine_batches,
1176            merges,
1177        }
1178    }
1179
1180    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1181        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1182        let legacy_batches = proto
1183            .legacy_batches
1184            .into_iter()
1185            .map(|b| b.into_rust().map(|b| (b, ())))
1186            .collect::<Result<_, _>>()?;
1187        let hollow_batches = proto.hollow_batches.into_rust()?;
1188        let spine_batches = proto.spine_batches.into_rust()?;
1189        let merges = proto.merges.into_rust()?;
1190        Ok(FlatTrace {
1191            since,
1192            legacy_batches,
1193            hollow_batches,
1194            spine_batches,
1195            merges,
1196        })
1197    }
1198}
1199
1200impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1201    fn into_proto(&self) -> ProtoTrace {
1202        self.flatten().into_proto()
1203    }
1204
1205    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1206        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1207    }
1208}
1209
1210impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1211    fn into_proto(&self) -> ProtoLeasedReaderState {
1212        ProtoLeasedReaderState {
1213            seqno: self.seqno.into_proto(),
1214            since: Some(self.since.into_proto()),
1215            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1216            lease_duration_ms: self.lease_duration_ms.into_proto(),
1217            debug: Some(self.debug.into_proto()),
1218        }
1219    }
1220
1221    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1222        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1223        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1224        // was missing and we need to fill in a default. This would ideally be
1225        // based on the actual value in PersistConfig, but it's only here for a
1226        // short time and this is way easier.
1227        if lease_duration_ms == 0 {
1228            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1229                .expect("lease duration as millis should fit within u64");
1230        }
1231        // MIGRATION: If debug is empty, then the proto field was missing and we
1232        // need to fill in a default.
1233        let debug = proto.debug.unwrap_or_default().into_rust()?;
1234        Ok(LeasedReaderState {
1235            seqno: proto.seqno.into_rust()?,
1236            since: proto
1237                .since
1238                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1239            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1240            lease_duration_ms,
1241            debug,
1242        })
1243    }
1244}
1245
1246impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1247    fn into_proto(&self) -> ProtoCriticalReaderState {
1248        ProtoCriticalReaderState {
1249            since: Some(self.since.into_proto()),
1250            opaque: i64::from_le_bytes(self.opaque.0),
1251            opaque_codec: self.opaque_codec.clone(),
1252            debug: Some(self.debug.into_proto()),
1253        }
1254    }
1255
1256    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1257        // MIGRATION: If debug is empty, then the proto field was missing and we
1258        // need to fill in a default.
1259        let debug = proto.debug.unwrap_or_default().into_rust()?;
1260        Ok(CriticalReaderState {
1261            since: proto
1262                .since
1263                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1264            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1265            opaque_codec: proto.opaque_codec,
1266            debug,
1267        })
1268    }
1269}
1270
1271impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1272    fn into_proto(&self) -> ProtoWriterState {
1273        ProtoWriterState {
1274            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1275            lease_duration_ms: self.lease_duration_ms.into_proto(),
1276            most_recent_write_token: self.most_recent_write_token.into_proto(),
1277            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1278            debug: Some(self.debug.into_proto()),
1279        }
1280    }
1281
1282    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1283        // MIGRATION: We didn't originally have most_recent_write_token and
1284        // most_recent_write_upper. Pick values that aren't going to
1285        // accidentally match ones in incoming writes and confuse things. We
1286        // could instead use Option on WriterState but this keeps the backward
1287        // compatibility logic confined to one place.
1288        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1289            IdempotencyToken::SENTINEL
1290        } else {
1291            proto.most_recent_write_token.into_rust()?
1292        };
1293        let most_recent_write_upper = match proto.most_recent_write_upper {
1294            Some(x) => x.into_rust()?,
1295            None => Antichain::from_elem(T::minimum()),
1296        };
1297        // MIGRATION: If debug is empty, then the proto field was missing and we
1298        // need to fill in a default.
1299        let debug = proto.debug.unwrap_or_default().into_rust()?;
1300        Ok(WriterState {
1301            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1302            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1303            most_recent_write_token,
1304            most_recent_write_upper,
1305            debug,
1306        })
1307    }
1308}
1309
1310impl RustType<ProtoHandleDebugState> for HandleDebugState {
1311    fn into_proto(&self) -> ProtoHandleDebugState {
1312        ProtoHandleDebugState {
1313            hostname: self.hostname.into_proto(),
1314            purpose: self.purpose.into_proto(),
1315        }
1316    }
1317
1318    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1319        Ok(HandleDebugState {
1320            hostname: proto.hostname,
1321            purpose: proto.purpose,
1322        })
1323    }
1324}
1325
1326impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1327    fn into_proto(&self) -> ProtoHollowRun {
1328        ProtoHollowRun {
1329            parts: self.parts.into_proto(),
1330        }
1331    }
1332
1333    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1334        Ok(HollowRun {
1335            parts: proto.parts.into_rust()?,
1336        })
1337    }
1338}
1339
1340impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1341    fn into_proto(&self) -> ProtoHollowBatch {
1342        let mut run_meta = self.run_meta.into_proto();
1343        // For backwards compatibility reasons, don't keep default metadata in the proto.
1344        let run_meta_default = RunMeta::default().into_proto();
1345        while run_meta.last() == Some(&run_meta_default) {
1346            run_meta.pop();
1347        }
1348        ProtoHollowBatch {
1349            desc: Some(self.desc.into_proto()),
1350            parts: self.parts.into_proto(),
1351            len: self.len.into_proto(),
1352            runs: self.run_splits.into_proto(),
1353            run_meta,
1354            deprecated_keys: vec![],
1355        }
1356    }
1357
1358    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1359        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1360        // MIGRATION: We used to just have the keys instead of a more structured
1361        // part.
1362        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1363            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1364                key: PartialBatchKey(key),
1365                encoded_size_bytes: 0,
1366                key_lower: vec![],
1367                structured_key_lower: None,
1368                stats: None,
1369                ts_rewrite: None,
1370                diffs_sum: None,
1371                format: None,
1372                schema_id: None,
1373                deprecated_schema_id: None,
1374            }))
1375        }));
1376        // We discard default metadatas from the proto above; re-add them here.
1377        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1378        let num_runs = if parts.is_empty() {
1379            0
1380        } else {
1381            run_splits.len() + 1
1382        };
1383        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1384        run_meta.resize(num_runs, RunMeta::default());
1385        Ok(HollowBatch {
1386            desc: proto.desc.into_rust_if_some("desc")?,
1387            parts,
1388            len: proto.len.into_rust()?,
1389            run_splits,
1390            run_meta,
1391        })
1392    }
1393}
1394
1395impl RustType<String> for RunId {
1396    fn into_proto(&self) -> String {
1397        self.to_string()
1398    }
1399
1400    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1401        RunId::from_str(&proto).map_err(|_| {
1402            TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1403        })
1404    }
1405}
1406
1407impl RustType<ProtoRunMeta> for RunMeta {
1408    fn into_proto(&self) -> ProtoRunMeta {
1409        let order = match self.order {
1410            None => ProtoRunOrder::Unknown,
1411            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1412            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1413            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1414        };
1415        ProtoRunMeta {
1416            order: order.into(),
1417            schema_id: self.schema.into_proto(),
1418            deprecated_schema_id: self.deprecated_schema.into_proto(),
1419            id: self.id.into_proto(),
1420            len: self.len.into_proto(),
1421        }
1422    }
1423
1424    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1425        let order = match ProtoRunOrder::try_from(proto.order)? {
1426            ProtoRunOrder::Unknown => None,
1427            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1428            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1429            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1430        };
1431        Ok(Self {
1432            order,
1433            schema: proto.schema_id.into_rust()?,
1434            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1435            id: proto.id.into_rust()?,
1436            len: proto.len.into_rust()?,
1437        })
1438    }
1439}
1440
1441impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1442    fn into_proto(&self) -> ProtoHollowBatchPart {
1443        match self {
1444            RunPart::Single(part) => part.into_proto(),
1445            RunPart::Many(runs) => runs.into_proto(),
1446        }
1447    }
1448
1449    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1450        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1451            RunPart::Many(proto.into_rust()?)
1452        } else {
1453            RunPart::Single(proto.into_rust()?)
1454        };
1455        Ok(run_part)
1456    }
1457}
1458
1459impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1460    fn into_proto(&self) -> ProtoHollowBatchPart {
1461        let part = ProtoHollowBatchPart {
1462            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1463                key: self.key.into_proto(),
1464                max_part_bytes: self.max_part_bytes.into_proto(),
1465            })),
1466            encoded_size_bytes: self.hollow_bytes.into_proto(),
1467            key_lower: Bytes::copy_from_slice(&self.key_lower),
1468            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1469            key_stats: None,
1470            ts_rewrite: None,
1471            format: None,
1472            schema_id: None,
1473            structured_key_lower: self.structured_key_lower.into_proto(),
1474            deprecated_schema_id: None,
1475        };
1476        part
1477    }
1478
1479    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1480        let run_proto = match proto.kind {
1481            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1482            _ => Err(TryFromProtoError::UnknownEnumVariant(
1483                "ProtoHollowBatchPart::kind".to_string(),
1484            ))?,
1485        };
1486        Ok(Self {
1487            key: run_proto.key.into_rust()?,
1488            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1489            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1490            key_lower: proto.key_lower.to_vec(),
1491            structured_key_lower: proto.structured_key_lower.into_rust()?,
1492            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1493            _phantom_data: Default::default(),
1494        })
1495    }
1496}
1497
1498impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1499    fn into_proto(&self) -> ProtoHollowBatchPart {
1500        match self {
1501            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1502                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1503                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1504                key_lower: Bytes::copy_from_slice(&x.key_lower),
1505                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1506                key_stats: x.stats.into_proto(),
1507                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1508                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1509                format: x.format.map(|f| f.into_proto()),
1510                schema_id: x.schema_id.into_proto(),
1511                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1512            },
1513            BatchPart::Inline {
1514                updates,
1515                ts_rewrite,
1516                schema_id,
1517                deprecated_schema_id,
1518            } => ProtoHollowBatchPart {
1519                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1520                encoded_size_bytes: 0,
1521                key_lower: Bytes::new(),
1522                structured_key_lower: None,
1523                key_stats: None,
1524                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1525                diffs_sum: None,
1526                format: None,
1527                schema_id: schema_id.into_proto(),
1528                deprecated_schema_id: deprecated_schema_id.into_proto(),
1529            },
1530        }
1531    }
1532
1533    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1534        let ts_rewrite = match proto.ts_rewrite {
1535            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1536            None => None,
1537        };
1538        let schema_id = proto.schema_id.into_rust()?;
1539        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1540        match proto.kind {
1541            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1542                Ok(BatchPart::Hollow(HollowBatchPart {
1543                    key: key.into_rust()?,
1544                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1545                    key_lower: proto.key_lower.into(),
1546                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1547                    stats: proto.key_stats.into_rust()?,
1548                    ts_rewrite,
1549                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1550                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1551                    schema_id,
1552                    deprecated_schema_id,
1553                }))
1554            }
1555            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1556                assert_eq!(proto.encoded_size_bytes, 0);
1557                assert_eq!(proto.key_lower.len(), 0);
1558                assert_none!(proto.key_stats);
1559                assert_none!(proto.diffs_sum);
1560                let updates = LazyInlineBatchPart(x.into_rust()?);
1561                Ok(BatchPart::Inline {
1562                    updates,
1563                    ts_rewrite,
1564                    schema_id,
1565                    deprecated_schema_id,
1566                })
1567            }
1568            _ => Err(TryFromProtoError::unknown_enum_variant(
1569                "ProtoHollowBatchPart::kind",
1570            )),
1571        }
1572    }
1573}
1574
1575impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1576    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1577        match self {
1578            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1579            BatchColumnarFormat::Both(version) => {
1580                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1581            }
1582            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1583        }
1584    }
1585
1586    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1587        let format = match proto {
1588            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1589            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1590                BatchColumnarFormat::Both(version.cast_into())
1591            }
1592            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1593        };
1594        Ok(format)
1595    }
1596}
1597
1598/// Aggregate statistics about data contained in a part.
1599///
1600/// These are "lazy" in the sense that we don't decode them (or even validate
1601/// the encoded version) until they're used.
1602#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1603pub struct LazyPartStats {
1604    key: LazyProto<ProtoStructStats>,
1605}
1606
1607impl Debug for LazyPartStats {
1608    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1609        f.debug_tuple("LazyPartStats")
1610            .field(&self.decode())
1611            .finish()
1612    }
1613}
1614
1615impl LazyPartStats {
1616    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1617        let PartStats { key } = x;
1618        let mut proto_stats = ProtoStructStats::from_rust(key);
1619        map_proto(&mut proto_stats);
1620        LazyPartStats {
1621            key: LazyProto::from(&proto_stats),
1622        }
1623    }
1624    /// Decodes and returns PartStats from the encoded representation.
1625    ///
1626    /// This does not cache the returned value, it decodes each time it's
1627    /// called.
1628    pub fn decode(&self) -> PartStats {
1629        let key = self.key.decode().expect("valid proto");
1630        PartStats {
1631            key: key.into_rust().expect("valid stats"),
1632        }
1633    }
1634}
1635
1636impl RustType<Bytes> for LazyPartStats {
1637    fn into_proto(&self) -> Bytes {
1638        let LazyPartStats { key } = self;
1639        key.into_proto()
1640    }
1641
1642    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1643        Ok(LazyPartStats {
1644            key: proto.into_rust()?,
1645        })
1646    }
1647}
1648
/// Test-only strategy that yields an arbitrary `LazyPartStats`, always
/// wrapped in `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Some)
}
1653
1654#[allow(unused_parens)]
1655impl Arbitrary for LazyPartStats {
1656    type Parameters = ();
1657    type Strategy =
1658        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1659
1660    fn arbitrary_with(_: ()) -> Self::Strategy {
1661        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1662            LazyPartStats::encode(&x, |_| {})
1663        })
1664    }
1665}
1666
1667impl ProtoInlineBatchPart {
1668    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1669        lgbytes: &ColumnarMetrics,
1670        proto: Self,
1671    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1672        let updates = proto
1673            .updates
1674            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1675        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1676
1677        Ok(BlobTraceBatchPart {
1678            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1679            index: proto.index.into_rust()?,
1680            updates,
1681        })
1682    }
1683}
1684
1685/// A batch part stored inlined in State.
1686#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1687pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1688
1689impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1690    fn from(value: &ProtoInlineBatchPart) -> Self {
1691        LazyInlineBatchPart(value.into())
1692    }
1693}
1694
1695impl Serialize for LazyInlineBatchPart {
1696    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1697        // NB: This serialize impl is only used for QA and debugging, so emit a
1698        // truncated version.
1699        let proto = self.0.decode().expect("valid proto");
1700        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1701        let () = s.serialize_field("desc", &proto.desc)?;
1702        let () = s.serialize_field("index", &proto.index)?;
1703        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1704        s.end()
1705    }
1706}
1707
1708impl LazyInlineBatchPart {
1709    pub(crate) fn encoded_size_bytes(&self) -> usize {
1710        self.0.buf.len()
1711    }
1712
1713    /// Decodes and returns a BlobTraceBatchPart from the encoded
1714    /// representation.
1715    ///
1716    /// This does not cache the returned value, it decodes each time it's
1717    /// called.
1718    pub fn decode<T: Timestamp + Codec64>(
1719        &self,
1720        lgbytes: &ColumnarMetrics,
1721    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1722        let proto = self.0.decode().expect("valid proto");
1723        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1724    }
1725}
1726
1727impl RustType<Bytes> for LazyInlineBatchPart {
1728    fn into_proto(&self) -> Bytes {
1729        self.0.into_proto()
1730    }
1731
1732    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1733        Ok(LazyInlineBatchPart(proto.into_rust()?))
1734    }
1735}
1736
1737impl RustType<ProtoHollowRollup> for HollowRollup {
1738    fn into_proto(&self) -> ProtoHollowRollup {
1739        ProtoHollowRollup {
1740            key: self.key.into_proto(),
1741            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1742        }
1743    }
1744
1745    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1746        Ok(HollowRollup {
1747            key: proto.key.into_rust()?,
1748            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1749        })
1750    }
1751}
1752
1753impl RustType<ProtoActiveRollup> for ActiveRollup {
1754    fn into_proto(&self) -> ProtoActiveRollup {
1755        ProtoActiveRollup {
1756            start_ms: self.start_ms,
1757            seqno: self.seqno.into_proto(),
1758        }
1759    }
1760
1761    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1762        Ok(ActiveRollup {
1763            start_ms: proto.start_ms,
1764            seqno: proto.seqno.into_rust()?,
1765        })
1766    }
1767}
1768
1769impl RustType<ProtoActiveGc> for ActiveGc {
1770    fn into_proto(&self) -> ProtoActiveGc {
1771        ProtoActiveGc {
1772            start_ms: self.start_ms,
1773            seqno: self.seqno.into_proto(),
1774        }
1775    }
1776
1777    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1778        Ok(ActiveGc {
1779            start_ms: proto.start_ms,
1780            seqno: proto.seqno.into_rust()?,
1781        })
1782    }
1783}
1784
1785impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1786    fn into_proto(&self) -> ProtoU64Description {
1787        ProtoU64Description {
1788            lower: Some(self.lower().into_proto()),
1789            upper: Some(self.upper().into_proto()),
1790            since: Some(self.since().into_proto()),
1791        }
1792    }
1793
1794    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1795        Ok(Description::new(
1796            proto.lower.into_rust_if_some("lower")?,
1797            proto.upper.into_rust_if_some("upper")?,
1798            proto.since.into_rust_if_some("since")?,
1799        ))
1800    }
1801}
1802
1803impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1804    fn into_proto(&self) -> ProtoU64Antichain {
1805        ProtoU64Antichain {
1806            elements: self
1807                .elements()
1808                .iter()
1809                .map(|x| i64::from_le_bytes(T::encode(x)))
1810                .collect(),
1811        }
1812    }
1813
1814    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1815        let elements = proto
1816            .elements
1817            .iter()
1818            .map(|x| T::decode(x.to_le_bytes()))
1819            .collect::<Vec<_>>();
1820        Ok(Antichain::from(elements))
1821    }
1822}
1823
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_err;
    use mz_persist::location::SeqNo;
    use proptest::prelude::*;

    use crate::ShardId;
    use crate::internal::paths::PartialRollupKey;
    use crate::internal::state::tests::any_state;
    use crate::internal::state::{BatchPart, HandleDebugState};
    use crate::internal::state_diff::StateDiff;
    use crate::tests::new_test_client_cache;

    use super::*;

    /// State written at one version decodes at that version and a later one,
    /// while decoding at an earlier version panics.
    #[mz_ore::test]
    fn applier_version_state() {
        let older = semver::Version::new(1, 0, 0);
        let writer = semver::Version::new(2, 0, 0);
        let newer = semver::Version::new(3, 0, 0);

        // Some State is evaluated and written out by code at the `writer`
        // version.
        let shard_id = ShardId::new();
        let state =
            TypedState::<(), (), u64, i64>::new(writer.clone(), shard_id, "".to_owned(), 0);
        let rollup =
            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
        let mut encoded = Vec::new();
        rollup.encode(&mut encoded).expect("serializable");
        let bytes = Bytes::from(encoded);

        // Reading it back works at the writer's version and at a newer one.
        for reader in [&writer, &newer] {
            assert_eq!(
                UntypedState::<u64>::decode(reader, bytes.clone())
                    .check_codecs(&shard_id)
                    .as_ref(),
                Ok(&state)
            );
        }

        // Reading it back at the older version must not succeed: that code
        // could corrupt the state by dropping or misreading something a
        // future version of the code wrote.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let stale_read =
            std::panic::catch_unwind(|| UntypedState::<u64>::decode(&older, bytes.clone()));
        assert_err!(stale_read);
    }

    /// Same as `applier_version_state`, but for an encoded `StateDiff`.
    #[mz_ore::test]
    fn applier_version_state_diff() {
        let older = semver::Version::new(1, 0, 0);
        let writer = semver::Version::new(2, 0, 0);
        let newer = semver::Version::new(3, 0, 0);

        // A diff is evaluated and written out by code at the `writer` version.
        let diff = StateDiff::<u64>::new(
            writer.clone(),
            SeqNo(0),
            SeqNo(1),
            2,
            PartialRollupKey("rollup".into()),
        );
        let mut encoded = Vec::new();
        diff.encode(&mut encoded);
        let bytes = Bytes::from(encoded);

        // Reading it back works at the writer's version and at a newer one.
        for reader in [&writer, &newer] {
            assert_eq!(StateDiff::decode(reader, bytes.clone()), diff);
        }

        // Reading it back at the older version must not succeed: that code
        // could corrupt the diff by dropping or misreading something a future
        // version of the code wrote.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let stale_read = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&older, bytes));
        assert_err!(stale_read);
    }

    /// Deprecated keys on a `ProtoHollowBatch` are migrated into parts that
    /// are appended after the existing parts.
    #[mz_ore::test]
    fn hollow_batch_migration_keys() {
        // Builds a hollow part with the given key and size and every optional
        // field left unset.
        let hollow_part = |key: &str, encoded_size_bytes| {
            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey(key.into()),
                encoded_size_bytes,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            }))
        };

        let batch = HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(1u64),
                Antichain::from_elem(2u64),
                Antichain::from_elem(3u64),
            ),
            vec![hollow_part("a", 5)],
            4,
        );

        // The old ProtoHollowBatch carried keys rather than parts. A proto
        // holding keys _and_ parts is not something we expect to encounter
        // (only one or the other), but the output for it is well defined, so
        // it is covered here too.
        let mut old = batch.into_proto();
        old.deprecated_keys = vec!["b".into()];

        // Parts migrated from keys get an encoded_size_bytes of 0. During the
        // transition this violates bounded memory usage compaction (a
        // short-term issue), which is preferable to creating unnecessary runs
        // (a longer-term issue).
        let mut expected = batch;
        expected.parts.push(hollow_part("b", 0));

        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
    }

    /// An unset (zero) `lease_duration_ms` is replaced by the default reader
    /// lease duration on decode.
    #[mz_ore::test]
    fn reader_state_migration_lease_duration() {
        let unset = LeasedReaderState {
            seqno: SeqNo(1),
            since: Antichain::from_elem(2u64),
            last_heartbeat_timestamp_ms: 3,
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
            // The old ProtoReaderState did not have a lease_duration_ms field.
            lease_duration_ms: 0,
        };
        let old = unset.into_proto();

        // Migrating from unset fills lease_duration_ms with the default of
        // READER_LEASE_DURATION.
        let expected = LeasedReaderState {
            lease_duration_ms: u64::try_from(READER_LEASE_DURATION.default().as_millis())
                .unwrap(),
            ..unset
        };
        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
    }

    /// An unset write token and upper decode to the sentinel token and an
    /// upper at 0.
    #[mz_ore::test]
    fn writer_state_migration_most_recent_write() {
        let debug_proto = ProtoHandleDebugState {
            hostname: "host".to_owned(),
            purpose: "purpose".to_owned(),
        };
        let proto = ProtoWriterState {
            last_heartbeat_timestamp_ms: 1,
            lease_duration_ms: 2,
            // The old ProtoWriterState had neither most_recent_write_token
            // nor most_recent_write_upper.
            most_recent_write_token: "".into(),
            most_recent_write_upper: None,
            debug: Some(debug_proto),
        };

        let expected_debug = HandleDebugState {
            hostname: "host".to_owned(),
            purpose: "purpose".to_owned(),
        };
        let expected = WriterState {
            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
            lease_duration_ms: proto.lease_duration_ms,
            most_recent_write_token: IdempotencyToken::SENTINEL,
            most_recent_write_upper: Antichain::from_elem(0),
            debug: expected_debug,
        };
        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
    }

    /// Rollups stored in the deprecated proto field are merged with the
    /// current ones when a rollup is decoded.
    #[mz_ore::test]
    fn state_migration_rollups() {
        let legacy_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let current_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state
            .state
            .collections
            .rollups
            .insert(SeqNo(2), current_rollup.clone());
        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();

        // Add the old rollup encoding by hand.
        proto
            .deprecated_rollups
            .insert(1, legacy_rollup.key.0.clone());

        let decoded: Rollup<u64> = proto.into_rust().unwrap();
        let typed = decoded
            .state
            .check_codecs::<(), (), i64>(&shard_id)
            .unwrap();
        let expected = vec![(SeqNo(1), legacy_rollup), (SeqNo(2), current_rollup)];
        assert_eq!(
            typed
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            expected
        );
    }

    /// Deprecated rollup diffs are decoded alongside the current ones, and a
    /// rollup delete applies to state whose rollup came from the deprecated
    /// field.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };

        let r1_diff = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        let r2_diff = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        let r3_diff = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };

        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2_diff.clone());
        diff.rollups.push(r3_diff.clone());
        let mut diff_proto = diff.into_proto();

        let field_diffs = diff_proto.field_diffs.take().unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Add the old rollup encoding by hand.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1_diff.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        let decoded_diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            decoded_diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2_diff, r3_diff, r1_diff]
        );

        // Additionally check that a rollup delete carried by a diff applies
        // cleanly to a state that held that rollup in the deprecated field.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let decoded_rollup: Rollup<u64> = rollup.into_rust().unwrap();
        let mut state = decoded_rollup
            .state
            .check_codecs::<(), (), i64>(&shard_id)
            .unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }

    /// Arbitrary states survive a trip through the rollup proto unchanged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn state_proto_roundtrip() {
        fn testcase<T>(state: State<T>)
        where
            T: Timestamp + Lattice + Codec64,
        {
            let before = UntypedState {
                key_codec: <() as Codec>::codec_name(),
                val_codec: <() as Codec>::codec_name(),
                ts_codec: <T as Codec64>::codec_name(),
                diff_codec: <i64 as Codec64>::codec_name(),
                state,
            };
            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
            let roundtripped: Rollup<T> = proto.into_rust().unwrap();
            assert_eq!(before, roundtripped.state);
        }

        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
    }

    /// Pins the accept/reject outcome of
    /// `cfg::check_data_version_with_self_managed_versions` for a matrix of
    /// code versions, data versions, and self-managed version lists.
    #[mz_ore::test]
    fn check_data_versions_with_self_managed_versions() {
        // Each call site is one row of the matrix; `#[track_caller]` makes a
        // failing assertion point at the row rather than at this helper.
        #[track_caller]
        fn testcase(
            code: &str,
            data: &str,
            self_managed_versions: &[Version],
            expected: Result<(), ()>,
        ) {
            let code_version = Version::parse(code).unwrap();
            let data_version = Version::parse(data).unwrap();
            let outcome = cfg::check_data_version_with_self_managed_versions(
                &code_version,
                &data_version,
                self_managed_versions,
            );
            assert_eq!(outcome.map_err(|_| ()), expected);
        }

        let none = [];
        let one = [Version::new(0, 130, 0)];
        let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
        let three = [
            Version::new(0, 130, 0),
            Version::new(0, 140, 0),
            Version::new(0, 150, 0),
        ];

        // No self-managed versions, code at 0.130.x.
        testcase("0.130.0", "0.128.0", &none, Ok(()));
        testcase("0.130.0", "0.129.0", &none, Ok(()));
        testcase("0.130.0", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.130.1", &none, Ok(()));
        testcase("0.130.1", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.131.0", &none, Ok(()));
        testcase("0.130.0", "0.132.0", &none, Err(()));

        // No self-managed versions, code at 0.129.x.
        testcase("0.129.0", "0.127.0", &none, Ok(()));
        testcase("0.129.0", "0.128.0", &none, Ok(()));
        testcase("0.129.0", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.129.1", &none, Ok(()));
        testcase("0.129.1", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.130.0", &none, Ok(()));
        testcase("0.129.0", "0.131.0", &none, Err(()));

        // One self-managed version (0.130.0), code at 0.130.x.
        testcase("0.130.0", "0.128.0", &one, Ok(()));
        testcase("0.130.0", "0.129.0", &one, Ok(()));
        testcase("0.130.0", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.130.1", &one, Ok(()));
        testcase("0.130.1", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.131.0", &one, Ok(()));
        testcase("0.130.0", "0.132.0", &one, Ok(()));

        // One self-managed version (0.130.0), code at 0.129.x.
        testcase("0.129.0", "0.127.0", &one, Ok(()));
        testcase("0.129.0", "0.128.0", &one, Ok(()));
        testcase("0.129.0", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.129.1", &one, Ok(()));
        testcase("0.129.1", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.130.0", &one, Ok(()));
        testcase("0.129.0", "0.131.0", &one, Err(()));

        // One self-managed version (0.130.0), code at 0.131.x.
        testcase("0.131.0", "0.129.0", &one, Ok(()));
        testcase("0.131.0", "0.130.0", &one, Ok(()));
        testcase("0.131.0", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.131.1", &one, Ok(()));
        testcase("0.131.1", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.132.0", &one, Ok(()));
        testcase("0.131.0", "0.133.0", &one, Err(()));

        // Two self-managed versions (0.130.0, 0.140.0).
        testcase("0.130.0", "0.128.0", &two, Ok(()));
        testcase("0.130.0", "0.129.0", &two, Ok(()));
        testcase("0.130.0", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.130.1", &two, Ok(()));
        testcase("0.130.1", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.131.0", &two, Ok(()));
        testcase("0.130.0", "0.132.0", &two, Ok(()));
        testcase("0.130.0", "0.135.0", &two, Ok(()));
        testcase("0.130.0", "0.138.0", &two, Ok(()));
        testcase("0.130.0", "0.139.0", &two, Ok(()));
        testcase("0.130.0", "0.140.0", &two, Ok(()));
        testcase("0.130.9", "0.140.0", &two, Ok(()));
        testcase("0.130.0", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.9", &two, Ok(()));
        testcase("0.130.0", "0.141.0", &two, Err(()));
        testcase("0.129.0", "0.133.0", &two, Err(()));
        testcase("0.129.0", "0.140.0", &two, Err(()));
        testcase("0.131.0", "0.133.0", &two, Err(()));
        testcase("0.131.0", "0.140.0", &two, Err(()));

        // Three self-managed versions (0.130.0, 0.140.0, 0.150.0), code
        // around 0.130.x.
        testcase("0.130.0", "0.128.0", &three, Ok(()));
        testcase("0.130.0", "0.129.0", &three, Ok(()));
        testcase("0.130.0", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.130.1", &three, Ok(()));
        testcase("0.130.1", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.131.0", &three, Ok(()));
        testcase("0.130.0", "0.132.0", &three, Ok(()));
        testcase("0.130.0", "0.135.0", &three, Ok(()));
        testcase("0.130.0", "0.138.0", &three, Ok(()));
        testcase("0.130.0", "0.139.0", &three, Ok(()));
        testcase("0.130.0", "0.140.0", &three, Ok(()));
        testcase("0.130.9", "0.140.0", &three, Ok(()));
        testcase("0.130.0", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.9", &three, Ok(()));
        testcase("0.130.0", "0.141.0", &three, Err(()));
        testcase("0.129.0", "0.133.0", &three, Err(()));
        testcase("0.129.0", "0.140.0", &three, Err(()));
        testcase("0.131.0", "0.133.0", &three, Err(()));
        testcase("0.131.0", "0.140.0", &three, Err(()));
        testcase("0.130.0", "0.150.0", &three, Err(()));

        // Three self-managed versions, code around 0.140.x.
        testcase("0.140.0", "0.138.0", &three, Ok(()));
        testcase("0.140.0", "0.139.0", &three, Ok(()));
        testcase("0.140.0", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.140.1", &three, Ok(()));
        testcase("0.140.1", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.141.0", &three, Ok(()));
        testcase("0.140.0", "0.142.0", &three, Ok(()));
        testcase("0.140.0", "0.145.0", &three, Ok(()));
        testcase("0.140.0", "0.148.0", &three, Ok(()));
        testcase("0.140.0", "0.149.0", &three, Ok(()));
        testcase("0.140.0", "0.150.0", &three, Ok(()));
        testcase("0.140.9", "0.150.0", &three, Ok(()));
        testcase("0.140.0", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.9", &three, Ok(()));
        testcase("0.140.0", "0.151.0", &three, Err(()));
        testcase("0.139.0", "0.143.0", &three, Err(()));
        testcase("0.139.0", "0.150.0", &three, Err(()));
        testcase("0.141.0", "0.143.0", &three, Err(()));
        testcase("0.141.0", "0.150.0", &three, Err(()));

        // Three self-managed versions, code around 0.150.x.
        testcase("0.150.0", "0.148.0", &three, Ok(()));
        testcase("0.150.0", "0.149.0", &three, Ok(()));
        testcase("0.150.0", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.150.1", &three, Ok(()));
        testcase("0.150.1", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.151.0", &three, Ok(()));
        testcase("0.150.0", "0.152.0", &three, Ok(()));
        testcase("0.150.0", "0.155.0", &three, Ok(()));
        testcase("0.150.0", "0.158.0", &three, Ok(()));
        testcase("0.150.0", "0.159.0", &three, Ok(()));
        testcase("0.150.0", "0.160.0", &three, Ok(()));
        testcase("0.150.9", "0.160.0", &three, Ok(()));
        testcase("0.150.0", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.9", &three, Ok(()));
        testcase("0.150.0", "0.161.0", &three, Ok(()));
        testcase("0.149.0", "0.153.0", &three, Err(()));
        testcase("0.149.0", "0.160.0", &three, Err(()));
        testcase("0.151.0", "0.153.0", &three, Err(()));
        testcase("0.151.0", "0.160.0", &three, Err(()));
    }

    /// Pins which code/data version pairs `check_data_version` accepts; a
    /// rejection surfaces as a panic, which is caught and mapped to `Err`.
    #[mz_ore::test]
    fn check_data_versions() {
        #[track_caller]
        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
            let code_version = Version::parse(code).unwrap();
            let data_version = Version::parse(data).unwrap();
            #[allow(clippy::disallowed_methods)]
            let outcome =
                std::panic::catch_unwind(|| check_data_version(&code_version, &data_version))
                    .map_err(|_| ());
            assert_eq!(outcome, expected);
        }

        testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.10.0", Ok(()));
        // Note: Letting tests mix two arbitrary shas on main is probably
        // useful, if only for things like git bisect.
        testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.11.0", Ok(()));
        testcase("0.10.0-dev", "0.12.0-dev", Err(()));
        testcase("0.10.0-dev", "0.12.0", Err(()));
        testcase("0.10.0-dev", "0.13.0-dev", Err(()));

        testcase("0.10.0", "0.8.0-dev", Ok(()));
        testcase("0.10.0", "0.8.0", Ok(()));
        testcase("0.10.0", "0.9.0-dev", Ok(()));
        testcase("0.10.0", "0.9.0", Ok(()));
        testcase("0.10.0", "0.10.0-dev", Ok(()));
        testcase("0.10.0", "0.10.0", Ok(()));
        // Note: This is the shape of running a catalog upgrade checker that
        // was built from main.
        testcase("0.10.0", "0.11.0-dev", Ok(()));
        testcase("0.10.0", "0.11.0", Ok(()));
        testcase("0.10.0", "0.11.1", Ok(()));
        testcase("0.10.0", "0.11.1000000", Ok(()));
        testcase("0.10.0", "0.12.0-dev", Err(()));
        testcase("0.10.0", "0.12.0", Err(()));
        testcase("0.10.0", "0.13.0-dev", Err(()));

        testcase("0.10.1", "0.9.0", Ok(()));
        testcase("0.10.1", "0.10.0", Ok(()));
        testcase("0.10.1", "0.11.0", Ok(()));
        testcase("0.10.1", "0.11.1", Ok(()));
        testcase("0.10.1", "0.11.100", Ok(()));

        // Probably a bad idea: this looks like a downgrade from running
        // v0.10.1 to v0.10.0, an earlier patch version of the same minor
        // version. There is not much to be done about it, though, given the
        // `state_version = max(code_version, prev_state_version)` logic that
        // is needed to prevent rolling back an arbitrary number of versions.
        testcase("0.10.0", "0.10.1", Ok(()));
    }
}