mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::sync::Arc;
16
17use bytes::{Buf, Bytes};
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::trace::Description;
20use mz_ore::cast::CastInto;
21use mz_ore::{assert_none, halt};
22use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
23use mz_persist::location::{SeqNo, VersionedData};
24use mz_persist::metrics::ColumnarMetrics;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::stats::{PartStats, ProtoStructStats};
27use mz_persist_types::{Codec, Codec64};
28use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
29use proptest::prelude::Arbitrary;
30use proptest::strategy::Strategy;
31use prost::Message;
32use semver::Version;
33use serde::ser::SerializeStruct;
34use serde::{Deserialize, Serialize};
35use timely::progress::{Antichain, Timestamp};
36use uuid::Uuid;
37
38use crate::critical::CriticalReaderId;
39use crate::error::{CodecMismatch, CodecMismatchT};
40use crate::internal::metrics::Metrics;
41use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
42use crate::internal::state::{
43    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
44    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
45    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
46    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
47    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
48    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
49    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
50    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
51    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
52    RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
53    proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
/// A key and value `Schema` of data written to a batch or shard.
///
/// Both schemas are held behind an [`Arc`], so the struct can be cloned
/// without copying the schemas themselves.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    // TODO: Remove the Option once this finishes rolling out and all shards
    // have a registered schema.
    /// Id under which this schema is registered in the shard's schema registry,
    /// if any.
    pub id: Option<SchemaId>,
    /// Key `Schema`.
    pub key: Arc<K::Schema>,
    /// Value `Schema`.
    pub val: Arc<V::Schema>,
}
77
78impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
79    fn clone(&self) -> Self {
80        Self {
81            id: self.id,
82            key: Arc::clone(&self.key),
83            val: Arc::clone(&self.val),
84        }
85    }
86}
87
/// A proto that is decoded on use.
///
/// Because of the way prost works, decoding a large protobuf may result in a
/// number of very short lived allocations in our RustType/ProtoType decode path
/// (e.g. this happens for a repeated embedded message). Not every use of
/// persist State needs every transitive bit of it to be decoded, so we opt
/// certain parts of it (initially stats) to be decoded on use.
///
/// This has the dual benefit of only paying for the short-lived allocs when
/// necessary and also allowing decoding to be gated by a feature flag. The
/// tradeoffs are that we might decode more than once and that we have to handle
/// invalid proto errors in more places.
///
/// Mechanically, this is accomplished by making the field a proto `bytes` type
/// instead of `ProtoFoo`. These bytes then contain the serialization of
/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
/// compatible change.
///
/// > Embedded messages are compatible with bytes if the bytes contain an
/// > encoded version of the message.
///
/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The still-encoded bytes of a `T`.
    buf: Bytes,
    /// Records which message type `buf` encodes without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
115
116impl<T: Message + Default> Debug for LazyProto<T> {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self.decode() {
119            Ok(proto) => Debug::fmt(&proto, f),
120            Err(err) => f
121                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
122                .field("err", &err)
123                .finish(),
124        }
125    }
126}
127
128impl<T> PartialEq for LazyProto<T> {
129    fn eq(&self, other: &Self) -> bool {
130        self.cmp(other) == Ordering::Equal
131    }
132}
133
// Marker impl: equality is delegated to the total order over the encoded bytes.
impl<T> Eq for LazyProto<T> {}
135
136impl<T> PartialOrd for LazyProto<T> {
137    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
138        Some(self.cmp(other))
139    }
140}
141
142impl<T> Ord for LazyProto<T> {
143    fn cmp(&self, other: &Self) -> Ordering {
144        let LazyProto {
145            buf: self_buf,
146            _phantom: _,
147        } = self;
148        let LazyProto {
149            buf: other_buf,
150            _phantom: _,
151        } = other;
152        self_buf.cmp(other_buf)
153    }
154}
155
156impl<T> Hash for LazyProto<T> {
157    fn hash<H: Hasher>(&self, state: &mut H) {
158        let LazyProto { buf, _phantom } = self;
159        buf.hash(state);
160    }
161}
162
163impl<T: Message + Default> From<&T> for LazyProto<T> {
164    fn from(value: &T) -> Self {
165        let buf = Bytes::from(value.encode_to_vec());
166        LazyProto {
167            buf,
168            _phantom: PhantomData,
169        }
170    }
171}
172
173impl<T: Message + Default> LazyProto<T> {
174    pub fn decode(&self) -> Result<T, prost::DecodeError> {
175        T::decode(&*self.buf)
176    }
177
178    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
179        Ok(T::decode(&*self.buf)?.into_rust()?)
180    }
181}
182
183impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
184    fn into_proto(&self) -> Bytes {
185        self.buf.clone()
186    }
187
188    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
189        Ok(Self {
190            buf,
191            _phantom: PhantomData,
192        })
193    }
194}
195
196pub(crate) fn parse_id(id_prefix: char, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
197    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
198        Some(x) => x,
199        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
200    };
201    let uuid = Uuid::parse_str(uuid_encoded)
202        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
203    Ok(*uuid.as_bytes())
204}
205
206pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
207    if let Err(msg) = cfg::check_data_version(code_version, data_version) {
208        // We can't catch halts, so panic in test, so we can get unit test
209        // coverage.
210        if cfg!(test) {
211            panic!("{msg}");
212        } else {
213            halt!("{msg}");
214        }
215    }
216}
217
218impl RustType<String> for LeasedReaderId {
219    fn into_proto(&self) -> String {
220        self.to_string()
221    }
222
223    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
224        match proto.parse() {
225            Ok(x) => Ok(x),
226            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
227        }
228    }
229}
230
231impl RustType<String> for CriticalReaderId {
232    fn into_proto(&self) -> String {
233        self.to_string()
234    }
235
236    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
237        match proto.parse() {
238            Ok(x) => Ok(x),
239            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
240        }
241    }
242}
243
244impl RustType<String> for WriterId {
245    fn into_proto(&self) -> String {
246        self.to_string()
247    }
248
249    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
250        match proto.parse() {
251            Ok(x) => Ok(x),
252            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
253        }
254    }
255}
256
257impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
258    fn into_proto(&self) -> ProtoEncodedSchemas {
259        ProtoEncodedSchemas {
260            key: self.key.clone(),
261            key_data_type: self.key_data_type.clone(),
262            val: self.val.clone(),
263            val_data_type: self.val_data_type.clone(),
264        }
265    }
266
267    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
268        Ok(EncodedSchemas {
269            key: proto.key,
270            key_data_type: proto.key_data_type,
271            val: proto.val,
272            val_data_type: proto.val_data_type,
273        })
274    }
275}
276
277impl RustType<String> for IdempotencyToken {
278    fn into_proto(&self) -> String {
279        self.to_string()
280    }
281
282    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
283        match proto.parse() {
284            Ok(x) => Ok(x),
285            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
286        }
287    }
288}
289
290impl RustType<String> for PartialBatchKey {
291    fn into_proto(&self) -> String {
292        self.0.clone()
293    }
294
295    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
296        Ok(PartialBatchKey(proto))
297    }
298}
299
300impl RustType<String> for PartialRollupKey {
301    fn into_proto(&self) -> String {
302        self.0.clone()
303    }
304
305    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
306        Ok(PartialRollupKey(proto))
307    }
308}
309
310impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
311    pub fn encode<B>(&self, buf: &mut B)
312    where
313        B: bytes::BufMut,
314    {
315        self.into_proto()
316            .encode(buf)
317            .expect("no required fields means no initialization errors");
318    }
319
320    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
321        let proto = ProtoStateDiff::decode(buf)
322            // We received a State that we couldn't decode. This could happen if
323            // persist messes up backward/forward compatibility, if the durable
324            // data was corrupted, or if operations messes up deployment. In any
325            // case, fail loudly.
326            .expect("internal error: invalid encoded state");
327        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
328        check_data_version(build_version, &diff.applier_version);
329        diff
330    }
331}
332
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Converts this diff into its proto form.
    ///
    /// The scalar header fields (`applier_version`, seqnos, `walltime_ms`,
    /// `latest_rollup_key`) are converted directly. Every per-field diff list
    /// is appended to a single `ProtoStateFieldDiffs` through a writer, in the
    /// fixed order of the calls below.
    fn into_proto(&self) -> ProtoStateDiff {
        // Deconstruct self so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // Create a writer so we can efficiently encode our data.
        let mut writer = proto.into_writer();

        // NB: `ProtoStateField::DeprecatedRollups` is never written here; it is
        // only handled when decoding.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        // After encoding all of our data, convert back into the proto.
        let field_diffs = writer.into_proto();

        // Only checked in builds with debug assertions enabled.
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Rebuilds a `StateDiff` from its proto form.
    ///
    /// An empty `applier_version` is read as version `0.0.0`. Each encoded
    /// field diff is dispatched on its `ProtoStateField` tag, decoded with the
    /// key/value proto types for that field, and appended to the matching list
    /// on the result. A missing `field_diffs` yields a diff with no per-field
    /// entries.
    ///
    /// Returns an error if the version string, any header field, or any field
    /// diff fails to parse or convert.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        let applier_version = if proto.applier_version.is_empty() {
            // Backward compatibility with versions of ProtoState before we set
            // this field: if it's missing (empty), assume an infinitely old
            // version.
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        if let Some(field_diffs) = proto.field_diffs {
            // Only checked in builds with debug assertions enabled.
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // The first two turbofish arguments pick the proto types used
                // to decode the key and value bytes of this field's diffs; a
                // `()` stands in where the field has no key (or no value).
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // MIGRATION: We previously stored rollups as a `SeqNo ->
                    // string Key` map, but now the value is a `struct
                    // HollowRollup`.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                // The old format carried only the key, so the
                                // encoded size is left unset.
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
552
553fn field_diffs_into_proto<K, KP, V, VP>(
554    field: ProtoStateField,
555    diffs: &[StateFieldDiff<K, V>],
556    writer: &mut ProtoStateFieldDiffsWriter,
557) where
558    KP: prost::Message,
559    K: RustType<KP>,
560    VP: prost::Message,
561    V: RustType<VP>,
562{
563    for diff in diffs.iter() {
564        field_diff_into_proto(field, diff, writer);
565    }
566}
567
568fn field_diff_into_proto<K, KP, V, VP>(
569    field: ProtoStateField,
570    diff: &StateFieldDiff<K, V>,
571    writer: &mut ProtoStateFieldDiffsWriter,
572) where
573    KP: prost::Message,
574    K: RustType<KP>,
575    VP: prost::Message,
576    V: RustType<VP>,
577{
578    writer.push_field(field);
579    writer.encode_proto(&diff.key.into_proto());
580    match &diff.val {
581        StateFieldValDiff::Insert(to) => {
582            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
583            writer.encode_proto(&to.into_proto());
584        }
585        StateFieldValDiff::Update(from, to) => {
586            writer.push_diff_type(ProtoStateFieldDiffType::Update);
587            writer.encode_proto(&from.into_proto());
588            writer.encode_proto(&to.into_proto());
589        }
590        StateFieldValDiff::Delete(from) => {
591            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
592            writer.encode_proto(&from.into_proto());
593        }
594    };
595}
596
597fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
598    proto: ProtoStateFieldDiff<'_>,
599    diffs: &mut Vec<StateFieldDiff<K, V>>,
600    k_fn: KFn,
601    v_fn: VFn,
602) -> Result<(), TryFromProtoError>
603where
604    KP: prost::Message + Default,
605    VP: prost::Message + Default,
606    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
607    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
608{
609    let val = match proto.diff_type {
610        ProtoStateFieldDiffType::Insert => {
611            let to = VP::decode(proto.to)
612                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
613            StateFieldValDiff::Insert(v_fn(to)?)
614        }
615        ProtoStateFieldDiffType::Update => {
616            let from = VP::decode(proto.from)
617                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
618            let to = VP::decode(proto.to)
619                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
620
621            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
622        }
623        ProtoStateFieldDiffType::Delete => {
624            let from = VP::decode(proto.from)
625                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
626            StateFieldValDiff::Delete(v_fn(from)?)
627        }
628    };
629    let key = KP::decode(proto.key)
630        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
631    diffs.push(StateFieldDiff {
632        key: k_fn(key)?,
633        val,
634    });
635    Ok(())
636}
637
/// The decoded state of [ProtoRollup] for which we have not yet checked
/// that codecs match the ones in durable state.
///
/// The four `*_codec` fields hold codec names as strings; they are compared
/// against `codec_name()` of the requested types before the inner state is
/// handed out.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Codec name recorded for the key type.
    pub(crate) key_codec: String,
    /// Codec name recorded for the value type.
    pub(crate) val_codec: String,
    /// Codec name recorded for the timestamp type.
    pub(crate) ts_codec: String,
    /// Codec name recorded for the diff type.
    pub(crate) diff_codec: String,

    // Important! This T has not been validated, so we can't expose anything in
    // State that references T until one of the check methods have been called.
    // Any field on State that doesn't reference T is fair game.
    state: State<T>,
}
653
654impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
655    pub fn seqno(&self) -> SeqNo {
656        self.state.seqno
657    }
658
659    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
660        &self.state.collections.rollups
661    }
662
663    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
664        self.state.latest_rollup()
665    }
666
667    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
668        &mut self,
669        cfg: &PersistConfig,
670        metrics: &Metrics,
671        diffs: I,
672    ) {
673        // The apply_encoded_diffs might panic if T is not correct. Making this
674        // a silent no-op is far too subtle for my taste, but it's not clear
675        // what else we could do instead.
676        if T::codec_name() != self.ts_codec {
677            return;
678        }
679        self.state.apply_encoded_diffs(cfg, metrics, diffs);
680    }
681
682    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
683        self,
684        shard_id: &ShardId,
685    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
686        // Also defensively check that the shard_id on the state we fetched
687        // matches the shard_id we were trying to fetch.
688        assert_eq!(shard_id, &self.state.shard_id);
689        if K::codec_name() != self.key_codec
690            || V::codec_name() != self.val_codec
691            || T::codec_name() != self.ts_codec
692            || D::codec_name() != self.diff_codec
693        {
694            return Err(Box::new(CodecMismatch {
695                requested: (
696                    K::codec_name(),
697                    V::codec_name(),
698                    T::codec_name(),
699                    D::codec_name(),
700                    None,
701                ),
702                actual: (
703                    self.key_codec,
704                    self.val_codec,
705                    self.ts_codec,
706                    self.diff_codec,
707                    None,
708                ),
709            }));
710        }
711        Ok(TypedState {
712            state: self.state,
713            _phantom: PhantomData,
714        })
715    }
716
717    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
718        // Also defensively check that the shard_id on the state we fetched
719        // matches the shard_id we were trying to fetch.
720        assert_eq!(shard_id, &self.state.shard_id);
721        if T::codec_name() != self.ts_codec {
722            return Err(CodecMismatchT {
723                requested: T::codec_name(),
724                actual: self.ts_codec,
725            });
726        }
727        Ok(self.state)
728    }
729
730    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
731        let proto = ProtoRollup::decode(buf)
732            // We received a State that we couldn't decode. This could happen if
733            // persist messes up backward/forward compatibility, if the durable
734            // data was corrupted, or if operations messes up deployment. In any
735            // case, fail loudly.
736            .expect("internal error: invalid encoded state");
737        let state = Rollup::from_proto(proto)
738            .expect("internal error: invalid encoded state")
739            .state;
740        check_data_version(build_version, &state.state.applier_version);
741        state
742    }
743}
744
745impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
746where
747    K: Codec,
748    V: Codec,
749    T: Codec64,
750    D: Codec64,
751{
752    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
753        UntypedState {
754            key_codec: K::codec_name(),
755            val_codec: V::codec_name(),
756            ts_codec: T::codec_name(),
757            diff_codec: D::codec_name(),
758            state: typed_state.state,
759        }
760    }
761}
762
/// A struct that maps 1:1 with ProtoRollup.
///
/// Contains State, and optionally the diffs between (state.latest_rollup.seqno, state.seqno]
///
/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
/// when deserializing rollups that were persisted before we started inlining diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup, with codecs not yet checked.
    pub(crate) state: UntypedState<T>,
    /// The diffs inlined alongside the state, if any.
    pub(crate) diffs: Option<InlinedDiffs>,
}
774
775impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
776    /// Creates a `StateRollup` from a state and diffs from its last rollup.
777    ///
778    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
779    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
780        let latest_rollup_seqno = *state.latest_rollup().0;
781        let mut verify_seqno = latest_rollup_seqno;
782        for diff in &diffs {
783            assert_eq!(verify_seqno.next(), diff.seqno);
784            verify_seqno = diff.seqno;
785        }
786        assert_eq!(verify_seqno, state.seqno());
787
788        let diffs = Some(InlinedDiffs::from(
789            latest_rollup_seqno.next(),
790            state.seqno().next(),
791            diffs,
792        ));
793
794        Self { state, diffs }
795    }
796
797    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
798        Self { state, diffs: None }
799    }
800
801    pub(crate) fn from_state_without_diffs(
802        state: State<T>,
803        key_codec: String,
804        val_codec: String,
805        ts_codec: String,
806        diff_codec: String,
807    ) -> Self {
808        Self::from_untyped_state_without_diffs(UntypedState {
809            key_codec,
810            val_codec,
811            ts_codec,
812            diff_codec,
813            state,
814        })
815    }
816}
817
/// A run of encoded state diffs together with the seqno bounds `[lower, upper)`
/// they fall within.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos of `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos of `diffs`.
    pub(crate) upper: SeqNo,
    /// The encoded diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
824
825impl InlinedDiffs {
826    pub(crate) fn description(&self) -> Description<SeqNo> {
827        Description::new(
828            Antichain::from_elem(self.lower),
829            Antichain::from_elem(self.upper),
830            Antichain::from_elem(SeqNo::minimum()),
831        )
832    }
833
834    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
835        for diff in &diffs {
836            assert!(diff.seqno >= lower);
837            assert!(diff.seqno < upper);
838        }
839        Self {
840            lower,
841            upper,
842            diffs,
843        }
844    }
845}
846
847impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
848    fn into_proto(&self) -> ProtoInlinedDiffs {
849        ProtoInlinedDiffs {
850            lower: self.lower.into_proto(),
851            upper: self.upper.into_proto(),
852            diffs: self.diffs.into_proto(),
853        }
854    }
855
856    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
857        Ok(Self {
858            lower: proto.lower.into_rust()?,
859            upper: proto.upper.into_rust()?,
860            diffs: proto.diffs.into_rust()?,
861        })
862    }
863}
864
865impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
866    fn into_proto(&self) -> ProtoRollup {
867        ProtoRollup {
868            applier_version: self.state.state.applier_version.to_string(),
869            shard_id: self.state.state.shard_id.into_proto(),
870            seqno: self.state.state.seqno.into_proto(),
871            walltime_ms: self.state.state.walltime_ms.into_proto(),
872            hostname: self.state.state.hostname.into_proto(),
873            key_codec: self.state.key_codec.into_proto(),
874            val_codec: self.state.val_codec.into_proto(),
875            ts_codec: T::codec_name(),
876            diff_codec: self.state.diff_codec.into_proto(),
877            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
878            active_rollup: self.state.state.collections.active_rollup.into_proto(),
879            active_gc: self.state.state.collections.active_gc.into_proto(),
880            rollups: self
881                .state
882                .state
883                .collections
884                .rollups
885                .iter()
886                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
887                .collect(),
888            deprecated_rollups: Default::default(),
889            leased_readers: self
890                .state
891                .state
892                .collections
893                .leased_readers
894                .iter()
895                .map(|(id, state)| (id.into_proto(), state.into_proto()))
896                .collect(),
897            critical_readers: self
898                .state
899                .state
900                .collections
901                .critical_readers
902                .iter()
903                .map(|(id, state)| (id.into_proto(), state.into_proto()))
904                .collect(),
905            writers: self
906                .state
907                .state
908                .collections
909                .writers
910                .iter()
911                .map(|(id, state)| (id.into_proto(), state.into_proto()))
912                .collect(),
913            schemas: self
914                .state
915                .state
916                .collections
917                .schemas
918                .iter()
919                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
920                .collect(),
921            trace: Some(self.state.state.collections.trace.into_proto()),
922            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
923        }
924    }
925
926    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
927        let applier_version = if x.applier_version.is_empty() {
928            // Backward compatibility with versions of ProtoState before we set
929            // this field: if it's missing (empty), assume an infinitely old
930            // version.
931            semver::Version::new(0, 0, 0)
932        } else {
933            semver::Version::parse(&x.applier_version).map_err(|err| {
934                TryFromProtoError::InvalidSemverVersion(format!(
935                    "invalid applier_version {}: {}",
936                    x.applier_version, err
937                ))
938            })?
939        };
940
941        let mut rollups = BTreeMap::new();
942        for (seqno, rollup) in x.rollups {
943            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
944        }
945        for (seqno, key) in x.deprecated_rollups {
946            rollups.insert(
947                seqno.into_rust()?,
948                HollowRollup {
949                    key: key.into_rust()?,
950                    encoded_size_bytes: None,
951                },
952            );
953        }
954        let mut leased_readers = BTreeMap::new();
955        for (id, state) in x.leased_readers {
956            leased_readers.insert(id.into_rust()?, state.into_rust()?);
957        }
958        let mut critical_readers = BTreeMap::new();
959        for (id, state) in x.critical_readers {
960            critical_readers.insert(id.into_rust()?, state.into_rust()?);
961        }
962        let mut writers = BTreeMap::new();
963        for (id, state) in x.writers {
964            writers.insert(id.into_rust()?, state.into_rust()?);
965        }
966        let mut schemas = BTreeMap::new();
967        for (id, x) in x.schemas {
968            schemas.insert(id.into_rust()?, x.into_rust()?);
969        }
970        let active_rollup = x
971            .active_rollup
972            .map(|rollup| rollup.into_rust())
973            .transpose()?;
974        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
975        let collections = StateCollections {
976            rollups,
977            active_rollup,
978            active_gc,
979            last_gc_req: x.last_gc_req.into_rust()?,
980            leased_readers,
981            critical_readers,
982            writers,
983            schemas,
984            trace: x.trace.into_rust_if_some("trace")?,
985        };
986        let state = State {
987            applier_version,
988            shard_id: x.shard_id.into_rust()?,
989            seqno: x.seqno.into_rust()?,
990            walltime_ms: x.walltime_ms,
991            hostname: x.hostname,
992            collections,
993        };
994
995        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
996        if let Some(diffs) = &diffs {
997            if diffs.lower != state.latest_rollup().0.next() {
998                return Err(TryFromProtoError::InvalidPersistState(format!(
999                    "diffs lower ({}) should match latest rollup's successor: ({})",
1000                    diffs.lower,
1001                    state.latest_rollup().0.next()
1002                )));
1003            }
1004            if diffs.upper != state.seqno.next() {
1005                return Err(TryFromProtoError::InvalidPersistState(format!(
1006                    "diffs upper ({}) should match state's successor: ({})",
1007                    diffs.lower,
1008                    state.seqno.next()
1009                )));
1010            }
1011        }
1012
1013        Ok(Rollup {
1014            state: UntypedState {
1015                state,
1016                key_codec: x.key_codec.into_rust()?,
1017                val_codec: x.val_codec.into_rust()?,
1018                ts_codec: x.ts_codec.into_rust()?,
1019                diff_codec: x.diff_codec.into_rust()?,
1020            },
1021            diffs,
1022        })
1023    }
1024}
1025
1026impl RustType<ProtoVersionedData> for VersionedData {
1027    fn into_proto(&self) -> ProtoVersionedData {
1028        ProtoVersionedData {
1029            seqno: self.seqno.into_proto(),
1030            data: Bytes::clone(&self.data),
1031        }
1032    }
1033
1034    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1035        Ok(Self {
1036            seqno: proto.seqno.into_rust()?,
1037            data: proto.data,
1038        })
1039    }
1040}
1041
1042impl RustType<ProtoSpineId> for SpineId {
1043    fn into_proto(&self) -> ProtoSpineId {
1044        ProtoSpineId {
1045            lo: self.0.into_proto(),
1046            hi: self.1.into_proto(),
1047        }
1048    }
1049
1050    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1051        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1052    }
1053}
1054
1055impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1056    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1057        let (id, batch) = entry;
1058        ProtoIdHollowBatch {
1059            id: Some(id.into_proto()),
1060            batch: Some(batch.into_proto()),
1061        }
1062    }
1063
1064    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1065        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1066        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1067        Ok((id, batch))
1068    }
1069}
1070
1071impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1072    fn into_proto(&self) -> ProtoSpineBatch {
1073        ProtoSpineBatch {
1074            desc: Some(self.desc.into_proto()),
1075            parts: self.parts.into_proto(),
1076            level: self.level.into_proto(),
1077            descs: self.descs.into_proto(),
1078        }
1079    }
1080
1081    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1082        let level = proto.level.into_rust()?;
1083        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1084        let parts = proto.parts.into_rust()?;
1085        let descs = proto.descs.into_rust()?;
1086        Ok(ThinSpineBatch {
1087            level,
1088            desc,
1089            parts,
1090            descs,
1091        })
1092    }
1093}
1094
1095impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1096    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1097        let (id, batch) = entry;
1098        ProtoIdSpineBatch {
1099            id: Some(id.into_proto()),
1100            batch: Some(batch.into_proto()),
1101        }
1102    }
1103
1104    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1105        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1106        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1107        Ok((id, batch))
1108    }
1109}
1110
1111impl RustType<ProtoCompaction> for ActiveCompaction {
1112    fn into_proto(&self) -> ProtoCompaction {
1113        ProtoCompaction {
1114            start_ms: self.start_ms,
1115        }
1116    }
1117
1118    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1119        Ok(Self {
1120            start_ms: proto.start_ms,
1121        })
1122    }
1123}
1124
1125impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1126    fn into_proto(&self) -> ProtoMerge {
1127        ProtoMerge {
1128            since: Some(self.since.into_proto()),
1129            remaining_work: self.remaining_work.into_proto(),
1130            active_compaction: self.active_compaction.into_proto(),
1131        }
1132    }
1133
1134    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1135        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1136        let remaining_work = proto.remaining_work.into_rust()?;
1137        let active_compaction = proto.active_compaction.into_rust()?;
1138        Ok(Self {
1139            since,
1140            remaining_work,
1141            active_compaction,
1142        })
1143    }
1144}
1145
1146impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1147    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1148        ProtoIdMerge {
1149            id: Some(id.into_proto()),
1150            merge: Some(merge.into_proto()),
1151        }
1152    }
1153
1154    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1155        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1156        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1157        Ok((id, merge))
1158    }
1159}
1160
1161impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1162    fn into_proto(&self) -> ProtoTrace {
1163        let since = self.since.into_proto();
1164        let legacy_batches = self
1165            .legacy_batches
1166            .iter()
1167            .map(|(b, _)| b.into_proto())
1168            .collect();
1169        let hollow_batches = self.hollow_batches.into_proto();
1170        let spine_batches = self.spine_batches.into_proto();
1171        let merges = self.merges.into_proto();
1172        ProtoTrace {
1173            since: Some(since),
1174            legacy_batches,
1175            hollow_batches,
1176            spine_batches,
1177            merges,
1178        }
1179    }
1180
1181    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1182        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1183        let legacy_batches = proto
1184            .legacy_batches
1185            .into_iter()
1186            .map(|b| b.into_rust().map(|b| (b, ())))
1187            .collect::<Result<_, _>>()?;
1188        let hollow_batches = proto.hollow_batches.into_rust()?;
1189        let spine_batches = proto.spine_batches.into_rust()?;
1190        let merges = proto.merges.into_rust()?;
1191        Ok(FlatTrace {
1192            since,
1193            legacy_batches,
1194            hollow_batches,
1195            spine_batches,
1196            merges,
1197        })
1198    }
1199}
1200
1201impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1202    fn into_proto(&self) -> ProtoTrace {
1203        self.flatten().into_proto()
1204    }
1205
1206    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1207        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1208    }
1209}
1210
1211impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1212    fn into_proto(&self) -> ProtoLeasedReaderState {
1213        ProtoLeasedReaderState {
1214            seqno: self.seqno.into_proto(),
1215            since: Some(self.since.into_proto()),
1216            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1217            lease_duration_ms: self.lease_duration_ms.into_proto(),
1218            debug: Some(self.debug.into_proto()),
1219        }
1220    }
1221
1222    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1223        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1224        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1225        // was missing and we need to fill in a default. This would ideally be
1226        // based on the actual value in PersistConfig, but it's only here for a
1227        // short time and this is way easier.
1228        if lease_duration_ms == 0 {
1229            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1230                .expect("lease duration as millis should fit within u64");
1231        }
1232        // MIGRATION: If debug is empty, then the proto field was missing and we
1233        // need to fill in a default.
1234        let debug = proto.debug.unwrap_or_default().into_rust()?;
1235        Ok(LeasedReaderState {
1236            seqno: proto.seqno.into_rust()?,
1237            since: proto
1238                .since
1239                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1240            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1241            lease_duration_ms,
1242            debug,
1243        })
1244    }
1245}
1246
1247impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1248    fn into_proto(&self) -> ProtoCriticalReaderState {
1249        ProtoCriticalReaderState {
1250            since: Some(self.since.into_proto()),
1251            opaque: i64::from_le_bytes(self.opaque.0),
1252            opaque_codec: self.opaque_codec.clone(),
1253            debug: Some(self.debug.into_proto()),
1254        }
1255    }
1256
1257    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1258        // MIGRATION: If debug is empty, then the proto field was missing and we
1259        // need to fill in a default.
1260        let debug = proto.debug.unwrap_or_default().into_rust()?;
1261        Ok(CriticalReaderState {
1262            since: proto
1263                .since
1264                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1265            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1266            opaque_codec: proto.opaque_codec,
1267            debug,
1268        })
1269    }
1270}
1271
1272impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1273    fn into_proto(&self) -> ProtoWriterState {
1274        ProtoWriterState {
1275            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1276            lease_duration_ms: self.lease_duration_ms.into_proto(),
1277            most_recent_write_token: self.most_recent_write_token.into_proto(),
1278            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1279            debug: Some(self.debug.into_proto()),
1280        }
1281    }
1282
1283    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1284        // MIGRATION: We didn't originally have most_recent_write_token and
1285        // most_recent_write_upper. Pick values that aren't going to
1286        // accidentally match ones in incoming writes and confuse things. We
1287        // could instead use Option on WriterState but this keeps the backward
1288        // compatibility logic confined to one place.
1289        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1290            IdempotencyToken::SENTINEL
1291        } else {
1292            proto.most_recent_write_token.into_rust()?
1293        };
1294        let most_recent_write_upper = match proto.most_recent_write_upper {
1295            Some(x) => x.into_rust()?,
1296            None => Antichain::from_elem(T::minimum()),
1297        };
1298        // MIGRATION: If debug is empty, then the proto field was missing and we
1299        // need to fill in a default.
1300        let debug = proto.debug.unwrap_or_default().into_rust()?;
1301        Ok(WriterState {
1302            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1303            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1304            most_recent_write_token,
1305            most_recent_write_upper,
1306            debug,
1307        })
1308    }
1309}
1310
1311impl RustType<ProtoHandleDebugState> for HandleDebugState {
1312    fn into_proto(&self) -> ProtoHandleDebugState {
1313        ProtoHandleDebugState {
1314            hostname: self.hostname.into_proto(),
1315            purpose: self.purpose.into_proto(),
1316        }
1317    }
1318
1319    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1320        Ok(HandleDebugState {
1321            hostname: proto.hostname,
1322            purpose: proto.purpose,
1323        })
1324    }
1325}
1326
1327impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1328    fn into_proto(&self) -> ProtoHollowRun {
1329        ProtoHollowRun {
1330            parts: self.parts.into_proto(),
1331        }
1332    }
1333
1334    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1335        Ok(HollowRun {
1336            parts: proto.parts.into_rust()?,
1337        })
1338    }
1339}
1340
1341impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1342    fn into_proto(&self) -> ProtoHollowBatch {
1343        let mut run_meta = self.run_meta.into_proto();
1344        // For backwards compatibility reasons, don't keep default metadata in the proto.
1345        let run_meta_default = RunMeta::default().into_proto();
1346        while run_meta.last() == Some(&run_meta_default) {
1347            run_meta.pop();
1348        }
1349        ProtoHollowBatch {
1350            desc: Some(self.desc.into_proto()),
1351            parts: self.parts.into_proto(),
1352            len: self.len.into_proto(),
1353            runs: self.run_splits.into_proto(),
1354            run_meta,
1355            deprecated_keys: vec![],
1356        }
1357    }
1358
1359    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1360        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1361        // MIGRATION: We used to just have the keys instead of a more structured
1362        // part.
1363        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1364            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1365                key: PartialBatchKey(key),
1366                encoded_size_bytes: 0,
1367                key_lower: vec![],
1368                structured_key_lower: None,
1369                stats: None,
1370                ts_rewrite: None,
1371                diffs_sum: None,
1372                format: None,
1373                schema_id: None,
1374                deprecated_schema_id: None,
1375            }))
1376        }));
1377        // We discard default metadatas from the proto above; re-add them here.
1378        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1379        let num_runs = if parts.is_empty() {
1380            0
1381        } else {
1382            run_splits.len() + 1
1383        };
1384        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1385        run_meta.resize(num_runs, RunMeta::default());
1386        Ok(HollowBatch {
1387            desc: proto.desc.into_rust_if_some("desc")?,
1388            parts,
1389            len: proto.len.into_rust()?,
1390            run_splits,
1391            run_meta,
1392        })
1393    }
1394}
1395
1396impl RustType<ProtoRunMeta> for RunMeta {
1397    fn into_proto(&self) -> ProtoRunMeta {
1398        let order = match self.order {
1399            None => ProtoRunOrder::Unknown,
1400            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1401            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1402            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1403        };
1404        ProtoRunMeta {
1405            order: order.into(),
1406            schema_id: self.schema.into_proto(),
1407            deprecated_schema_id: self.deprecated_schema.into_proto(),
1408        }
1409    }
1410
1411    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1412        let order = match ProtoRunOrder::try_from(proto.order)? {
1413            ProtoRunOrder::Unknown => None,
1414            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1415            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1416            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1417        };
1418        Ok(Self {
1419            order,
1420            schema: proto.schema_id.into_rust()?,
1421            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1422        })
1423    }
1424}
1425
1426impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1427    fn into_proto(&self) -> ProtoHollowBatchPart {
1428        match self {
1429            RunPart::Single(part) => part.into_proto(),
1430            RunPart::Many(runs) => runs.into_proto(),
1431        }
1432    }
1433
1434    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1435        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1436            RunPart::Many(proto.into_rust()?)
1437        } else {
1438            RunPart::Single(proto.into_rust()?)
1439        };
1440        Ok(run_part)
1441    }
1442}
1443
1444impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1445    fn into_proto(&self) -> ProtoHollowBatchPart {
1446        let part = ProtoHollowBatchPart {
1447            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1448                key: self.key.into_proto(),
1449                max_part_bytes: self.max_part_bytes.into_proto(),
1450            })),
1451            encoded_size_bytes: self.hollow_bytes.into_proto(),
1452            key_lower: Bytes::copy_from_slice(&self.key_lower),
1453            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1454            key_stats: None,
1455            ts_rewrite: None,
1456            format: None,
1457            schema_id: None,
1458            structured_key_lower: self.structured_key_lower.into_proto(),
1459            deprecated_schema_id: None,
1460        };
1461        part
1462    }
1463
1464    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1465        let run_proto = match proto.kind {
1466            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1467            _ => Err(TryFromProtoError::UnknownEnumVariant(
1468                "ProtoHollowBatchPart::kind".to_string(),
1469            ))?,
1470        };
1471        Ok(Self {
1472            key: run_proto.key.into_rust()?,
1473            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1474            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1475            key_lower: proto.key_lower.to_vec(),
1476            structured_key_lower: proto.structured_key_lower.into_rust()?,
1477            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1478            _phantom_data: Default::default(),
1479        })
1480    }
1481}
1482
1483impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1484    fn into_proto(&self) -> ProtoHollowBatchPart {
1485        match self {
1486            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1487                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1488                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1489                key_lower: Bytes::copy_from_slice(&x.key_lower),
1490                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1491                key_stats: x.stats.into_proto(),
1492                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1493                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1494                format: x.format.map(|f| f.into_proto()),
1495                schema_id: x.schema_id.into_proto(),
1496                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1497            },
1498            BatchPart::Inline {
1499                updates,
1500                ts_rewrite,
1501                schema_id,
1502                deprecated_schema_id,
1503            } => ProtoHollowBatchPart {
1504                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1505                encoded_size_bytes: 0,
1506                key_lower: Bytes::new(),
1507                structured_key_lower: None,
1508                key_stats: None,
1509                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1510                diffs_sum: None,
1511                format: None,
1512                schema_id: schema_id.into_proto(),
1513                deprecated_schema_id: deprecated_schema_id.into_proto(),
1514            },
1515        }
1516    }
1517
1518    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1519        let ts_rewrite = match proto.ts_rewrite {
1520            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1521            None => None,
1522        };
1523        let schema_id = proto.schema_id.into_rust()?;
1524        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1525        match proto.kind {
1526            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1527                Ok(BatchPart::Hollow(HollowBatchPart {
1528                    key: key.into_rust()?,
1529                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1530                    key_lower: proto.key_lower.into(),
1531                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1532                    stats: proto.key_stats.into_rust()?,
1533                    ts_rewrite,
1534                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1535                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1536                    schema_id,
1537                    deprecated_schema_id,
1538                }))
1539            }
1540            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1541                assert_eq!(proto.encoded_size_bytes, 0);
1542                assert_eq!(proto.key_lower.len(), 0);
1543                assert_none!(proto.key_stats);
1544                assert_none!(proto.diffs_sum);
1545                let updates = LazyInlineBatchPart(x.into_rust()?);
1546                Ok(BatchPart::Inline {
1547                    updates,
1548                    ts_rewrite,
1549                    schema_id,
1550                    deprecated_schema_id,
1551                })
1552            }
1553            _ => Err(TryFromProtoError::unknown_enum_variant(
1554                "ProtoHollowBatchPart::kind",
1555            )),
1556        }
1557    }
1558}
1559
1560impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1561    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1562        match self {
1563            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1564            BatchColumnarFormat::Both(version) => {
1565                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1566            }
1567            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1568        }
1569    }
1570
1571    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1572        let format = match proto {
1573            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1574            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1575                BatchColumnarFormat::Both(version.cast_into())
1576            }
1577            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1578        };
1579        Ok(format)
1580    }
1581}
1582
1583/// Aggregate statistics about data contained in a part.
1584///
1585/// These are "lazy" in the sense that we don't decode them (or even validate
1586/// the encoded version) until they're used.
1587#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1588pub struct LazyPartStats {
1589    key: LazyProto<ProtoStructStats>,
1590}
1591
1592impl LazyPartStats {
1593    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1594        let PartStats { key } = x;
1595        let mut proto_stats = ProtoStructStats::from_rust(key);
1596        map_proto(&mut proto_stats);
1597        LazyPartStats {
1598            key: LazyProto::from(&proto_stats),
1599        }
1600    }
1601    /// Decodes and returns PartStats from the encoded representation.
1602    ///
1603    /// This does not cache the returned value, it decodes each time it's
1604    /// called.
1605    pub fn decode(&self) -> PartStats {
1606        let key = self.key.decode().expect("valid proto");
1607        PartStats {
1608            key: key.into_rust().expect("valid stats"),
1609        }
1610    }
1611}
1612
1613impl RustType<Bytes> for LazyPartStats {
1614    fn into_proto(&self) -> Bytes {
1615        let LazyPartStats { key } = self;
1616        key.into_proto()
1617    }
1618
1619    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1620        Ok(LazyPartStats {
1621            key: proto.into_rust()?,
1622        })
1623    }
1624}
1625
/// Proptest strategy producing arbitrary [`LazyPartStats`], always wrapped in
/// `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let any_stats = proptest::prelude::any::<LazyPartStats>();
    any_stats.prop_map(Some)
}
1630
1631#[allow(unused_parens)]
1632impl Arbitrary for LazyPartStats {
1633    type Parameters = ();
1634    type Strategy =
1635        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1636
1637    fn arbitrary_with(_: ()) -> Self::Strategy {
1638        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1639            LazyPartStats::encode(&x, |_| {})
1640        })
1641    }
1642}
1643
1644impl ProtoInlineBatchPart {
1645    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1646        lgbytes: &ColumnarMetrics,
1647        proto: Self,
1648    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1649        let updates = proto
1650            .updates
1651            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1652        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1653
1654        Ok(BlobTraceBatchPart {
1655            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1656            index: proto.index.into_rust()?,
1657            updates,
1658        })
1659    }
1660}
1661
1662/// A batch part stored inlined in State.
1663#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1664pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1665
1666impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1667    fn from(value: &ProtoInlineBatchPart) -> Self {
1668        LazyInlineBatchPart(value.into())
1669    }
1670}
1671
1672impl Serialize for LazyInlineBatchPart {
1673    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1674        // NB: This serialize impl is only used for QA and debugging, so emit a
1675        // truncated version.
1676        let proto = self.0.decode().expect("valid proto");
1677        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1678        let () = s.serialize_field("desc", &proto.desc)?;
1679        let () = s.serialize_field("index", &proto.index)?;
1680        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1681        s.end()
1682    }
1683}
1684
1685impl LazyInlineBatchPart {
1686    pub(crate) fn encoded_size_bytes(&self) -> usize {
1687        self.0.buf.len()
1688    }
1689
1690    /// Decodes and returns a BlobTraceBatchPart from the encoded
1691    /// representation.
1692    ///
1693    /// This does not cache the returned value, it decodes each time it's
1694    /// called.
1695    pub fn decode<T: Timestamp + Codec64>(
1696        &self,
1697        lgbytes: &ColumnarMetrics,
1698    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1699        let proto = self.0.decode().expect("valid proto");
1700        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1701    }
1702}
1703
1704impl RustType<Bytes> for LazyInlineBatchPart {
1705    fn into_proto(&self) -> Bytes {
1706        self.0.into_proto()
1707    }
1708
1709    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1710        Ok(LazyInlineBatchPart(proto.into_rust()?))
1711    }
1712}
1713
1714impl RustType<ProtoHollowRollup> for HollowRollup {
1715    fn into_proto(&self) -> ProtoHollowRollup {
1716        ProtoHollowRollup {
1717            key: self.key.into_proto(),
1718            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1719        }
1720    }
1721
1722    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1723        Ok(HollowRollup {
1724            key: proto.key.into_rust()?,
1725            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1726        })
1727    }
1728}
1729
1730impl RustType<ProtoActiveRollup> for ActiveRollup {
1731    fn into_proto(&self) -> ProtoActiveRollup {
1732        ProtoActiveRollup {
1733            start_ms: self.start_ms,
1734            seqno: self.seqno.into_proto(),
1735        }
1736    }
1737
1738    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1739        Ok(ActiveRollup {
1740            start_ms: proto.start_ms,
1741            seqno: proto.seqno.into_rust()?,
1742        })
1743    }
1744}
1745
1746impl RustType<ProtoActiveGc> for ActiveGc {
1747    fn into_proto(&self) -> ProtoActiveGc {
1748        ProtoActiveGc {
1749            start_ms: self.start_ms,
1750            seqno: self.seqno.into_proto(),
1751        }
1752    }
1753
1754    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1755        Ok(ActiveGc {
1756            start_ms: proto.start_ms,
1757            seqno: proto.seqno.into_rust()?,
1758        })
1759    }
1760}
1761
1762impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1763    fn into_proto(&self) -> ProtoU64Description {
1764        ProtoU64Description {
1765            lower: Some(self.lower().into_proto()),
1766            upper: Some(self.upper().into_proto()),
1767            since: Some(self.since().into_proto()),
1768        }
1769    }
1770
1771    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1772        Ok(Description::new(
1773            proto.lower.into_rust_if_some("lower")?,
1774            proto.upper.into_rust_if_some("upper")?,
1775            proto.since.into_rust_if_some("since")?,
1776        ))
1777    }
1778}
1779
1780impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1781    fn into_proto(&self) -> ProtoU64Antichain {
1782        ProtoU64Antichain {
1783            elements: self
1784                .elements()
1785                .iter()
1786                .map(|x| i64::from_le_bytes(T::encode(x)))
1787                .collect(),
1788        }
1789    }
1790
1791    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1792        let elements = proto
1793            .elements
1794            .iter()
1795            .map(|x| T::decode(x.to_le_bytes()))
1796            .collect::<Vec<_>>();
1797        Ok(Antichain::from(elements))
1798    }
1799}
1800
1801#[cfg(test)]
1802mod tests {
1803    use bytes::Bytes;
1804    use mz_build_info::DUMMY_BUILD_INFO;
1805    use mz_dyncfg::ConfigUpdates;
1806    use mz_ore::assert_err;
1807    use mz_persist::location::SeqNo;
1808    use proptest::prelude::*;
1809
1810    use crate::ShardId;
1811    use crate::internal::paths::PartialRollupKey;
1812    use crate::internal::state::tests::any_state;
1813    use crate::internal::state::{BatchPart, HandleDebugState};
1814    use crate::internal::state_diff::StateDiff;
1815    use crate::tests::new_test_client_cache;
1816
1817    use super::*;
1818
1819    #[mz_ore::test]
1820    fn applier_version_state() {
1821        let v1 = semver::Version::new(1, 0, 0);
1822        let v2 = semver::Version::new(2, 0, 0);
1823        let v3 = semver::Version::new(3, 0, 0);
1824
1825        // Code version v2 evaluates and writes out some State.
1826        let shard_id = ShardId::new();
1827        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1828        let rollup =
1829            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1830        let mut buf = Vec::new();
1831        rollup.encode(&mut buf).expect("serializable");
1832        let bytes = Bytes::from(buf);
1833
1834        // We can read it back using persist code v2 and v3.
1835        assert_eq!(
1836            UntypedState::<u64>::decode(&v2, bytes.clone())
1837                .check_codecs(&shard_id)
1838                .as_ref(),
1839            Ok(&state)
1840        );
1841        assert_eq!(
1842            UntypedState::<u64>::decode(&v3, bytes.clone())
1843                .check_codecs(&shard_id)
1844                .as_ref(),
1845            Ok(&state)
1846        );
1847
1848        // But we can't read it back using v1 because v1 might corrupt it by
1849        // losing or misinterpreting something written out by a future version
1850        // of code.
1851        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
1852        let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1853        assert_err!(v1_res);
1854    }
1855
1856    #[mz_ore::test]
1857    fn applier_version_state_diff() {
1858        let v1 = semver::Version::new(1, 0, 0);
1859        let v2 = semver::Version::new(2, 0, 0);
1860        let v3 = semver::Version::new(3, 0, 0);
1861
1862        // Code version v2 evaluates and writes out some State.
1863        let diff = StateDiff::<u64>::new(
1864            v2.clone(),
1865            SeqNo(0),
1866            SeqNo(1),
1867            2,
1868            PartialRollupKey("rollup".into()),
1869        );
1870        let mut buf = Vec::new();
1871        diff.encode(&mut buf);
1872        let bytes = Bytes::from(buf);
1873
1874        // We can read it back using persist code v2 and v3.
1875        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1876        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1877
1878        // But we can't read it back using v1 because v1 might corrupt it by
1879        // losing or misinterpreting something written out by a future version
1880        // of code.
1881        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
1882        let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1883        assert_err!(v1_res);
1884    }
1885
1886    #[mz_ore::test]
1887    fn hollow_batch_migration_keys() {
1888        let x = HollowBatch::new_run(
1889            Description::new(
1890                Antichain::from_elem(1u64),
1891                Antichain::from_elem(2u64),
1892                Antichain::from_elem(3u64),
1893            ),
1894            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1895                key: PartialBatchKey("a".into()),
1896                encoded_size_bytes: 5,
1897                key_lower: vec![],
1898                structured_key_lower: None,
1899                stats: None,
1900                ts_rewrite: None,
1901                diffs_sum: None,
1902                format: None,
1903                schema_id: None,
1904                deprecated_schema_id: None,
1905            }))],
1906            4,
1907        );
1908        let mut old = x.into_proto();
1909        // Old ProtoHollowBatch had keys instead of parts.
1910        old.deprecated_keys = vec!["b".into()];
1911        // We don't expect to see a ProtoHollowBatch with keys _and_ parts, only
1912        // one or the other, but we have a defined output, so may as well test
1913        // it.
1914        let mut expected = x;
1915        // We fill in 0 for encoded_size_bytes when we migrate from keys. This
1916        // will violate bounded memory usage compaction during the transition
1917        // (short-term issue), but that's better than creating unnecessary runs
1918        // (longer-term issue).
1919        expected
1920            .parts
1921            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1922                key: PartialBatchKey("b".into()),
1923                encoded_size_bytes: 0,
1924                key_lower: vec![],
1925                structured_key_lower: None,
1926                stats: None,
1927                ts_rewrite: None,
1928                diffs_sum: None,
1929                format: None,
1930                schema_id: None,
1931                deprecated_schema_id: None,
1932            })));
1933        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1934    }
1935
1936    #[mz_ore::test]
1937    fn reader_state_migration_lease_duration() {
1938        let x = LeasedReaderState {
1939            seqno: SeqNo(1),
1940            since: Antichain::from_elem(2u64),
1941            last_heartbeat_timestamp_ms: 3,
1942            debug: HandleDebugState {
1943                hostname: "host".to_owned(),
1944                purpose: "purpose".to_owned(),
1945            },
1946            // Old ProtoReaderState had no lease_duration_ms field
1947            lease_duration_ms: 0,
1948        };
1949        let old = x.into_proto();
1950        let mut expected = x;
1951        // We fill in DEFAULT_READ_LEASE_DURATION for lease_duration_ms when we
1952        // migrate from unset.
1953        expected.lease_duration_ms =
1954            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1955        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1956    }
1957
1958    #[mz_ore::test]
1959    fn writer_state_migration_most_recent_write() {
1960        let proto = ProtoWriterState {
1961            last_heartbeat_timestamp_ms: 1,
1962            lease_duration_ms: 2,
1963            // Old ProtoWriterState had no most_recent_write_token or
1964            // most_recent_write_upper.
1965            most_recent_write_token: "".into(),
1966            most_recent_write_upper: None,
1967            debug: Some(ProtoHandleDebugState {
1968                hostname: "host".to_owned(),
1969                purpose: "purpose".to_owned(),
1970            }),
1971        };
1972        let expected = WriterState {
1973            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1974            lease_duration_ms: proto.lease_duration_ms,
1975            most_recent_write_token: IdempotencyToken::SENTINEL,
1976            most_recent_write_upper: Antichain::from_elem(0),
1977            debug: HandleDebugState {
1978                hostname: "host".to_owned(),
1979                purpose: "purpose".to_owned(),
1980            },
1981        };
1982        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
1983    }
1984
1985    #[mz_ore::test]
1986    fn state_migration_rollups() {
1987        let r1 = HollowRollup {
1988            key: PartialRollupKey("foo".to_owned()),
1989            encoded_size_bytes: None,
1990        };
1991        let r2 = HollowRollup {
1992            key: PartialRollupKey("bar".to_owned()),
1993            encoded_size_bytes: Some(2),
1994        };
1995        let shard_id = ShardId::new();
1996        let mut state = TypedState::<(), (), u64, i64>::new(
1997            DUMMY_BUILD_INFO.semver_version(),
1998            shard_id,
1999            "host".to_owned(),
2000            0,
2001        );
2002        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2003        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2004
2005        // Manually add the old rollup encoding.
2006        proto.deprecated_rollups.insert(1, r1.key.0.clone());
2007
2008        let state: Rollup<u64> = proto.into_rust().unwrap();
2009        let state = state.state;
2010        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2011        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2012        assert_eq!(
2013            state
2014                .state
2015                .collections
2016                .rollups
2017                .into_iter()
2018                .collect::<Vec<_>>(),
2019            expected
2020        );
2021    }
2022
2023    #[mz_persist_proc::test(tokio::test)]
2024    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
2025    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2026        let r1_rollup = HollowRollup {
2027            key: PartialRollupKey("foo".to_owned()),
2028            encoded_size_bytes: None,
2029        };
2030        let r1 = StateFieldDiff {
2031            key: SeqNo(1),
2032            val: StateFieldValDiff::Insert(r1_rollup.clone()),
2033        };
2034        let r2_rollup = HollowRollup {
2035            key: PartialRollupKey("bar".to_owned()),
2036            encoded_size_bytes: Some(2),
2037        };
2038        let r2 = StateFieldDiff {
2039            key: SeqNo(2),
2040            val: StateFieldValDiff::Insert(r2_rollup.clone()),
2041        };
2042        let r3_rollup = HollowRollup {
2043            key: PartialRollupKey("baz".to_owned()),
2044            encoded_size_bytes: None,
2045        };
2046        let r3 = StateFieldDiff {
2047            key: SeqNo(3),
2048            val: StateFieldValDiff::Delete(r3_rollup.clone()),
2049        };
2050        let mut diff = StateDiff::<u64>::new(
2051            DUMMY_BUILD_INFO.semver_version(),
2052            SeqNo(4),
2053            SeqNo(5),
2054            0,
2055            PartialRollupKey("ignored".to_owned()),
2056        );
2057        diff.rollups.push(r2.clone());
2058        diff.rollups.push(r3.clone());
2059        let mut diff_proto = diff.into_proto();
2060
2061        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2062        let mut field_diffs_writer = field_diffs.into_writer();
2063
2064        // Manually add the old rollup encoding.
2065        field_diffs_into_proto(
2066            ProtoStateField::DeprecatedRollups,
2067            &[StateFieldDiff {
2068                key: r1.key,
2069                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2070            }],
2071            &mut field_diffs_writer,
2072        );
2073
2074        assert_none!(diff_proto.field_diffs);
2075        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2076
2077        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2078        assert_eq!(
2079            diff.rollups.into_iter().collect::<Vec<_>>(),
2080            vec![r2, r3, r1]
2081        );
2082
2083        // Also make sure that a rollup delete in a diff applies cleanly to a
2084        // state that had it in the deprecated field.
2085        let shard_id = ShardId::new();
2086        let mut state = TypedState::<(), (), u64, i64>::new(
2087            DUMMY_BUILD_INFO.semver_version(),
2088            shard_id,
2089            "host".to_owned(),
2090            0,
2091        );
2092        state.state.seqno = SeqNo(4);
2093        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2094        rollup
2095            .deprecated_rollups
2096            .insert(3, r3_rollup.key.into_proto());
2097        let state: Rollup<u64> = rollup.into_rust().unwrap();
2098        let state = state.state;
2099        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2100        let cache = new_test_client_cache(&dyncfgs);
2101        let encoded_diff = VersionedData {
2102            seqno: SeqNo(5),
2103            data: diff_proto.encode_to_vec().into(),
2104        };
2105        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2106        assert_eq!(
2107            state
2108                .state
2109                .collections
2110                .rollups
2111                .into_iter()
2112                .collect::<Vec<_>>(),
2113            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2114        );
2115    }
2116
2117    #[mz_ore::test]
2118    #[cfg_attr(miri, ignore)] // too slow
2119    fn state_proto_roundtrip() {
2120        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2121            let before = UntypedState {
2122                key_codec: <() as Codec>::codec_name(),
2123                val_codec: <() as Codec>::codec_name(),
2124                ts_codec: <T as Codec64>::codec_name(),
2125                diff_codec: <i64 as Codec64>::codec_name(),
2126                state,
2127            };
2128            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2129            let after: Rollup<T> = proto.into_rust().unwrap();
2130            let after = after.state;
2131            assert_eq!(before, after);
2132        }
2133
2134        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2135    }
2136
2137    #[mz_ore::test]
2138    fn check_data_versions_with_self_managed_versions() {
2139        #[track_caller]
2140        fn testcase(
2141            code: &str,
2142            data: &str,
2143            self_managed_versions: &[Version],
2144            expected: Result<(), ()>,
2145        ) {
2146            let code = Version::parse(code).unwrap();
2147            let data = Version::parse(data).unwrap();
2148            let actual = cfg::check_data_version_with_self_managed_versions(
2149                &code,
2150                &data,
2151                self_managed_versions,
2152            )
2153            .map_err(|_| ());
2154            assert_eq!(actual, expected);
2155        }
2156
2157        let none = [];
2158        let one = [Version::new(0, 130, 0)];
2159        let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2160        let three = [
2161            Version::new(0, 130, 0),
2162            Version::new(0, 140, 0),
2163            Version::new(0, 150, 0),
2164        ];
2165
2166        testcase("0.130.0", "0.128.0", &none, Ok(()));
2167        testcase("0.130.0", "0.129.0", &none, Ok(()));
2168        testcase("0.130.0", "0.130.0", &none, Ok(()));
2169        testcase("0.130.0", "0.130.1", &none, Ok(()));
2170        testcase("0.130.1", "0.130.0", &none, Ok(()));
2171        testcase("0.130.0", "0.131.0", &none, Ok(()));
2172        testcase("0.130.0", "0.132.0", &none, Err(()));
2173
2174        testcase("0.129.0", "0.127.0", &none, Ok(()));
2175        testcase("0.129.0", "0.128.0", &none, Ok(()));
2176        testcase("0.129.0", "0.129.0", &none, Ok(()));
2177        testcase("0.129.0", "0.129.1", &none, Ok(()));
2178        testcase("0.129.1", "0.129.0", &none, Ok(()));
2179        testcase("0.129.0", "0.130.0", &none, Ok(()));
2180        testcase("0.129.0", "0.131.0", &none, Err(()));
2181
2182        testcase("0.130.0", "0.128.0", &one, Ok(()));
2183        testcase("0.130.0", "0.129.0", &one, Ok(()));
2184        testcase("0.130.0", "0.130.0", &one, Ok(()));
2185        testcase("0.130.0", "0.130.1", &one, Ok(()));
2186        testcase("0.130.1", "0.130.0", &one, Ok(()));
2187        testcase("0.130.0", "0.131.0", &one, Ok(()));
2188        testcase("0.130.0", "0.132.0", &one, Ok(()));
2189
2190        testcase("0.129.0", "0.127.0", &one, Ok(()));
2191        testcase("0.129.0", "0.128.0", &one, Ok(()));
2192        testcase("0.129.0", "0.129.0", &one, Ok(()));
2193        testcase("0.129.0", "0.129.1", &one, Ok(()));
2194        testcase("0.129.1", "0.129.0", &one, Ok(()));
2195        testcase("0.129.0", "0.130.0", &one, Ok(()));
2196        testcase("0.129.0", "0.131.0", &one, Err(()));
2197
2198        testcase("0.131.0", "0.129.0", &one, Ok(()));
2199        testcase("0.131.0", "0.130.0", &one, Ok(()));
2200        testcase("0.131.0", "0.131.0", &one, Ok(()));
2201        testcase("0.131.0", "0.131.1", &one, Ok(()));
2202        testcase("0.131.1", "0.131.0", &one, Ok(()));
2203        testcase("0.131.0", "0.132.0", &one, Ok(()));
2204        testcase("0.131.0", "0.133.0", &one, Err(()));
2205
2206        testcase("0.130.0", "0.128.0", &two, Ok(()));
2207        testcase("0.130.0", "0.129.0", &two, Ok(()));
2208        testcase("0.130.0", "0.130.0", &two, Ok(()));
2209        testcase("0.130.0", "0.130.1", &two, Ok(()));
2210        testcase("0.130.1", "0.130.0", &two, Ok(()));
2211        testcase("0.130.0", "0.131.0", &two, Ok(()));
2212        testcase("0.130.0", "0.132.0", &two, Ok(()));
2213        testcase("0.130.0", "0.135.0", &two, Ok(()));
2214        testcase("0.130.0", "0.138.0", &two, Ok(()));
2215        testcase("0.130.0", "0.139.0", &two, Ok(()));
2216        testcase("0.130.0", "0.140.0", &two, Ok(()));
2217        testcase("0.130.9", "0.140.0", &two, Ok(()));
2218        testcase("0.130.0", "0.140.1", &two, Ok(()));
2219        testcase("0.130.3", "0.140.1", &two, Ok(()));
2220        testcase("0.130.3", "0.140.9", &two, Ok(()));
2221        testcase("0.130.0", "0.141.0", &two, Err(()));
2222        testcase("0.129.0", "0.133.0", &two, Err(()));
2223        testcase("0.129.0", "0.140.0", &two, Err(()));
2224        testcase("0.131.0", "0.133.0", &two, Err(()));
2225        testcase("0.131.0", "0.140.0", &two, Err(()));
2226
2227        testcase("0.130.0", "0.128.0", &three, Ok(()));
2228        testcase("0.130.0", "0.129.0", &three, Ok(()));
2229        testcase("0.130.0", "0.130.0", &three, Ok(()));
2230        testcase("0.130.0", "0.130.1", &three, Ok(()));
2231        testcase("0.130.1", "0.130.0", &three, Ok(()));
2232        testcase("0.130.0", "0.131.0", &three, Ok(()));
2233        testcase("0.130.0", "0.132.0", &three, Ok(()));
2234        testcase("0.130.0", "0.135.0", &three, Ok(()));
2235        testcase("0.130.0", "0.138.0", &three, Ok(()));
2236        testcase("0.130.0", "0.139.0", &three, Ok(()));
2237        testcase("0.130.0", "0.140.0", &three, Ok(()));
2238        testcase("0.130.9", "0.140.0", &three, Ok(()));
2239        testcase("0.130.0", "0.140.1", &three, Ok(()));
2240        testcase("0.130.3", "0.140.1", &three, Ok(()));
2241        testcase("0.130.3", "0.140.9", &three, Ok(()));
2242        testcase("0.130.0", "0.141.0", &three, Err(()));
2243        testcase("0.129.0", "0.133.0", &three, Err(()));
2244        testcase("0.129.0", "0.140.0", &three, Err(()));
2245        testcase("0.131.0", "0.133.0", &three, Err(()));
2246        testcase("0.131.0", "0.140.0", &three, Err(()));
2247        testcase("0.130.0", "0.150.0", &three, Err(()));
2248
2249        testcase("0.140.0", "0.138.0", &three, Ok(()));
2250        testcase("0.140.0", "0.139.0", &three, Ok(()));
2251        testcase("0.140.0", "0.140.0", &three, Ok(()));
2252        testcase("0.140.0", "0.140.1", &three, Ok(()));
2253        testcase("0.140.1", "0.140.0", &three, Ok(()));
2254        testcase("0.140.0", "0.141.0", &three, Ok(()));
2255        testcase("0.140.0", "0.142.0", &three, Ok(()));
2256        testcase("0.140.0", "0.145.0", &three, Ok(()));
2257        testcase("0.140.0", "0.148.0", &three, Ok(()));
2258        testcase("0.140.0", "0.149.0", &three, Ok(()));
2259        testcase("0.140.0", "0.150.0", &three, Ok(()));
2260        testcase("0.140.9", "0.150.0", &three, Ok(()));
2261        testcase("0.140.0", "0.150.1", &three, Ok(()));
2262        testcase("0.140.3", "0.150.1", &three, Ok(()));
2263        testcase("0.140.3", "0.150.9", &three, Ok(()));
2264        testcase("0.140.0", "0.151.0", &three, Err(()));
2265        testcase("0.139.0", "0.143.0", &three, Err(()));
2266        testcase("0.139.0", "0.150.0", &three, Err(()));
2267        testcase("0.141.0", "0.143.0", &three, Err(()));
2268        testcase("0.141.0", "0.150.0", &three, Err(()));
2269
2270        testcase("0.150.0", "0.148.0", &three, Ok(()));
2271        testcase("0.150.0", "0.149.0", &three, Ok(()));
2272        testcase("0.150.0", "0.150.0", &three, Ok(()));
2273        testcase("0.150.0", "0.150.1", &three, Ok(()));
2274        testcase("0.150.1", "0.150.0", &three, Ok(()));
2275        testcase("0.150.0", "0.151.0", &three, Ok(()));
2276        testcase("0.150.0", "0.152.0", &three, Ok(()));
2277        testcase("0.150.0", "0.155.0", &three, Ok(()));
2278        testcase("0.150.0", "0.158.0", &three, Ok(()));
2279        testcase("0.150.0", "0.159.0", &three, Ok(()));
2280        testcase("0.150.0", "0.160.0", &three, Ok(()));
2281        testcase("0.150.9", "0.160.0", &three, Ok(()));
2282        testcase("0.150.0", "0.160.1", &three, Ok(()));
2283        testcase("0.150.3", "0.160.1", &three, Ok(()));
2284        testcase("0.150.3", "0.160.9", &three, Ok(()));
2285        testcase("0.150.0", "0.161.0", &three, Ok(()));
2286        testcase("0.149.0", "0.153.0", &three, Err(()));
2287        testcase("0.149.0", "0.160.0", &three, Err(()));
2288        testcase("0.151.0", "0.153.0", &three, Err(()));
2289        testcase("0.151.0", "0.160.0", &three, Err(()));
2290    }
2291
2292    #[mz_ore::test]
2293    fn check_data_versions() {
2294        #[track_caller]
2295        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2296            let code = Version::parse(code).unwrap();
2297            let data = Version::parse(data).unwrap();
2298            #[allow(clippy::disallowed_methods)]
2299            let actual =
2300                std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2301            assert_eq!(actual, expected);
2302        }
2303
2304        testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2305        testcase("0.10.0-dev", "0.10.0", Ok(()));
2306        // Note: Probably useful to let tests use two arbitrary shas on main, at
2307        // the very least for things like git bisect.
2308        testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2309        testcase("0.10.0-dev", "0.11.0", Ok(()));
2310        testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2311        testcase("0.10.0-dev", "0.12.0", Err(()));
2312        testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2313
2314        testcase("0.10.0", "0.8.0-dev", Ok(()));
2315        testcase("0.10.0", "0.8.0", Ok(()));
2316        testcase("0.10.0", "0.9.0-dev", Ok(()));
2317        testcase("0.10.0", "0.9.0", Ok(()));
2318        testcase("0.10.0", "0.10.0-dev", Ok(()));
2319        testcase("0.10.0", "0.10.0", Ok(()));
2320        // Note: This is what it would look like to run a version of the catalog
2321        // upgrade checker built from main.
2322        testcase("0.10.0", "0.11.0-dev", Ok(()));
2323        testcase("0.10.0", "0.11.0", Ok(()));
2324        testcase("0.10.0", "0.11.1", Ok(()));
2325        testcase("0.10.0", "0.11.1000000", Ok(()));
2326        testcase("0.10.0", "0.12.0-dev", Err(()));
2327        testcase("0.10.0", "0.12.0", Err(()));
2328        testcase("0.10.0", "0.13.0-dev", Err(()));
2329
2330        testcase("0.10.1", "0.9.0", Ok(()));
2331        testcase("0.10.1", "0.10.0", Ok(()));
2332        testcase("0.10.1", "0.11.0", Ok(()));
2333        testcase("0.10.1", "0.11.1", Ok(()));
2334        testcase("0.10.1", "0.11.100", Ok(()));
2335
2336        // This is probably a bad idea (seems as if we've downgraded from
2337        // running v0.10.1 to v0.10.0, an earlier patch version of the same
2338        // minor version), but not much we can do, given the `state_version =
2339        // max(code_version, prev_state_version)` logic we need to prevent
2340        // rolling back an arbitrary number of versions.
2341        testcase("0.10.0", "0.10.1", Ok(()));
2342    }
2343}