mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::sync::Arc;
16
17use bytes::{Buf, Bytes};
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::trace::Description;
20use mz_ore::cast::CastInto;
21use mz_ore::{assert_none, halt};
22use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
23use mz_persist::location::{SeqNo, VersionedData};
24use mz_persist::metrics::ColumnarMetrics;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::stats::{PartStats, ProtoStructStats};
27use mz_persist_types::{Codec, Codec64};
28use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
29use proptest::prelude::Arbitrary;
30use proptest::strategy::Strategy;
31use prost::Message;
32use semver::Version;
33use serde::ser::SerializeStruct;
34use serde::{Deserialize, Serialize};
35use timely::progress::{Antichain, Timestamp};
36use uuid::Uuid;
37
38use crate::critical::CriticalReaderId;
39use crate::error::{CodecMismatch, CodecMismatchT};
40use crate::internal::metrics::Metrics;
41use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
42use crate::internal::state::{
43    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
44    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
45    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
46    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
47    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
48    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
49    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
50    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
51    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
52    RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
53    proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
64#[derive(Debug)]
65pub struct Schemas<K: Codec, V: Codec> {
66    // TODO: Remove the Option once this finishes rolling out and all shards
67    // have a registered schema.
68    pub id: Option<SchemaId>,
69    pub key: Arc<K::Schema>,
70    pub val: Arc<V::Schema>,
71}
72
73impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
74    fn clone(&self) -> Self {
75        Self {
76            id: self.id,
77            key: Arc::clone(&self.key),
78            val: Arc::clone(&self.val),
79        }
80    }
81}
82
83/// A proto that is decoded on use.
84///
85/// Because of the way prost works, decoding a large protobuf may result in a
86/// number of very short lived allocations in our RustType/ProtoType decode path
87/// (e.g. this happens for a repeated embedded message). Not every use of
88/// persist State needs every transitive bit of it to be decoded, so we opt
89/// certain parts of it (initially stats) to be decoded on use.
90///
91/// This has the dual benefit of only paying for the short-lived allocs when
92/// necessary and also allowing decoding to be gated by a feature flag. The
93/// tradeoffs are that we might decode more than once and that we have to handle
94/// invalid proto errors in more places.
95///
96/// Mechanically, this is accomplished by making the field a proto `bytes` types
97/// instead of `ProtoFoo`. These bytes then contain the serialization of
98/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
99/// compatible change.
100///
101/// > Embedded messages are compatible with bytes if the bytes contain an
102/// > encoded version of the message.
103///
104/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
105#[derive(Clone, Serialize, Deserialize)]
106pub struct LazyProto<T> {
107    buf: Bytes,
108    _phantom: PhantomData<fn() -> T>,
109}
110
111impl<T: Message + Default> Debug for LazyProto<T> {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        match self.decode() {
114            Ok(proto) => Debug::fmt(&proto, f),
115            Err(err) => f
116                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
117                .field("err", &err)
118                .finish(),
119        }
120    }
121}
122
123impl<T> PartialEq for LazyProto<T> {
124    fn eq(&self, other: &Self) -> bool {
125        self.cmp(other) == Ordering::Equal
126    }
127}
128
129impl<T> Eq for LazyProto<T> {}
130
131impl<T> PartialOrd for LazyProto<T> {
132    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
133        Some(self.cmp(other))
134    }
135}
136
137impl<T> Ord for LazyProto<T> {
138    fn cmp(&self, other: &Self) -> Ordering {
139        let LazyProto {
140            buf: self_buf,
141            _phantom: _,
142        } = self;
143        let LazyProto {
144            buf: other_buf,
145            _phantom: _,
146        } = other;
147        self_buf.cmp(other_buf)
148    }
149}
150
151impl<T> Hash for LazyProto<T> {
152    fn hash<H: Hasher>(&self, state: &mut H) {
153        let LazyProto { buf, _phantom } = self;
154        buf.hash(state);
155    }
156}
157
158impl<T: Message + Default> From<&T> for LazyProto<T> {
159    fn from(value: &T) -> Self {
160        let buf = Bytes::from(value.encode_to_vec());
161        LazyProto {
162            buf,
163            _phantom: PhantomData,
164        }
165    }
166}
167
168impl<T: Message + Default> LazyProto<T> {
169    pub fn decode(&self) -> Result<T, prost::DecodeError> {
170        T::decode(&*self.buf)
171    }
172}
173
174impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
175    fn into_proto(&self) -> Bytes {
176        self.buf.clone()
177    }
178
179    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
180        Ok(Self {
181            buf,
182            _phantom: PhantomData,
183        })
184    }
185}
186
187pub(crate) fn parse_id(id_prefix: char, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
188    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
189        Some(x) => x,
190        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
191    };
192    let uuid = Uuid::parse_str(uuid_encoded)
193        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
194    Ok(*uuid.as_bytes())
195}
196
197pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
198    if let Err(msg) = cfg::check_data_version(code_version, data_version) {
199        // We can't catch halts, so panic in test, so we can get unit test
200        // coverage.
201        if cfg!(test) {
202            panic!("{msg}");
203        } else {
204            halt!("{msg}");
205        }
206    }
207}
208
209impl RustType<String> for LeasedReaderId {
210    fn into_proto(&self) -> String {
211        self.to_string()
212    }
213
214    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
215        match proto.parse() {
216            Ok(x) => Ok(x),
217            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
218        }
219    }
220}
221
222impl RustType<String> for CriticalReaderId {
223    fn into_proto(&self) -> String {
224        self.to_string()
225    }
226
227    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
228        match proto.parse() {
229            Ok(x) => Ok(x),
230            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
231        }
232    }
233}
234
235impl RustType<String> for WriterId {
236    fn into_proto(&self) -> String {
237        self.to_string()
238    }
239
240    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
241        match proto.parse() {
242            Ok(x) => Ok(x),
243            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
244        }
245    }
246}
247
248impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
249    fn into_proto(&self) -> ProtoEncodedSchemas {
250        ProtoEncodedSchemas {
251            key: self.key.clone(),
252            key_data_type: self.key_data_type.clone(),
253            val: self.val.clone(),
254            val_data_type: self.val_data_type.clone(),
255        }
256    }
257
258    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
259        Ok(EncodedSchemas {
260            key: proto.key,
261            key_data_type: proto.key_data_type,
262            val: proto.val,
263            val_data_type: proto.val_data_type,
264        })
265    }
266}
267
268impl RustType<String> for IdempotencyToken {
269    fn into_proto(&self) -> String {
270        self.to_string()
271    }
272
273    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
274        match proto.parse() {
275            Ok(x) => Ok(x),
276            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
277        }
278    }
279}
280
281impl RustType<String> for PartialBatchKey {
282    fn into_proto(&self) -> String {
283        self.0.clone()
284    }
285
286    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
287        Ok(PartialBatchKey(proto))
288    }
289}
290
291impl RustType<String> for PartialRollupKey {
292    fn into_proto(&self) -> String {
293        self.0.clone()
294    }
295
296    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297        Ok(PartialRollupKey(proto))
298    }
299}
300
301impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
302    pub fn encode<B>(&self, buf: &mut B)
303    where
304        B: bytes::BufMut,
305    {
306        self.into_proto()
307            .encode(buf)
308            .expect("no required fields means no initialization errors");
309    }
310
311    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
312        let proto = ProtoStateDiff::decode(buf)
313            // We received a State that we couldn't decode. This could happen if
314            // persist messes up backward/forward compatibility, if the durable
315            // data was corrupted, or if operations messes up deployment. In any
316            // case, fail loudly.
317            .expect("internal error: invalid encoded state");
318        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
319        check_data_version(build_version, &diff.applier_version);
320        diff
321    }
322}
323
/// Conversion between [StateDiff] and its durable [ProtoStateDiff] form.
///
/// All per-field diffs share a single `ProtoStateFieldDiffs` message. Encoding
/// appends them through a [ProtoStateFieldDiffsWriter], and decoding iterates
/// over that message and dispatches on each entry's [ProtoStateField] tag.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes this diff, writing every field's diffs into one
    /// `ProtoStateFieldDiffs`.
    fn into_proto(&self) -> ProtoStateDiff {
        // Deconstruct self so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // Create a writer so we can efficiently encode our data.
        let mut writer = proto.into_writer();

        // NOTE(review): the writer presumably appends in call order, so reordering
        // these calls would change the encoded bytes. Confirm before touching.
        // `DeprecatedRollups` is intentionally never written. It is only read,
        // for migration.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        // After encoding all of our data, convert back into the proto.
        let field_diffs = writer.into_proto();

        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Decodes a [ProtoStateDiff].
    ///
    /// An empty `applier_version` is treated as `0.0.0`. A missing
    /// `field_diffs` leaves the diff exactly as `StateDiff::new` built it.
    /// Errors come from an unparseable version, a bad seqno/rollup key, or any
    /// individual field diff that fails to decode.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        let applier_version = if proto.applier_version.is_empty() {
            // Backward compatibility with versions of ProtoState before we set
            // this field: if it's missing (empty), assume an infinitely old
            // version.
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            // Each entry carries its field tag. Route it into the matching
            // vector of `state_diff`, naming the key/value proto types that
            // field was encoded with.
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                match field {
                    // Singleton fields use `()` as their key.
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // MIGRATION: We previously stored rollups as a `SeqNo ->
                    // string Key` map, but now the value is a `struct
                    // HollowRollup`.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself, with a `()` value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
543
544fn field_diffs_into_proto<K, KP, V, VP>(
545    field: ProtoStateField,
546    diffs: &[StateFieldDiff<K, V>],
547    writer: &mut ProtoStateFieldDiffsWriter,
548) where
549    KP: prost::Message,
550    K: RustType<KP>,
551    VP: prost::Message,
552    V: RustType<VP>,
553{
554    for diff in diffs.iter() {
555        field_diff_into_proto(field, diff, writer);
556    }
557}
558
559fn field_diff_into_proto<K, KP, V, VP>(
560    field: ProtoStateField,
561    diff: &StateFieldDiff<K, V>,
562    writer: &mut ProtoStateFieldDiffsWriter,
563) where
564    KP: prost::Message,
565    K: RustType<KP>,
566    VP: prost::Message,
567    V: RustType<VP>,
568{
569    writer.push_field(field);
570    writer.encode_proto(&diff.key.into_proto());
571    match &diff.val {
572        StateFieldValDiff::Insert(to) => {
573            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
574            writer.encode_proto(&to.into_proto());
575        }
576        StateFieldValDiff::Update(from, to) => {
577            writer.push_diff_type(ProtoStateFieldDiffType::Update);
578            writer.encode_proto(&from.into_proto());
579            writer.encode_proto(&to.into_proto());
580        }
581        StateFieldValDiff::Delete(from) => {
582            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
583            writer.encode_proto(&from.into_proto());
584        }
585    };
586}
587
588fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
589    proto: ProtoStateFieldDiff<'_>,
590    diffs: &mut Vec<StateFieldDiff<K, V>>,
591    k_fn: KFn,
592    v_fn: VFn,
593) -> Result<(), TryFromProtoError>
594where
595    KP: prost::Message + Default,
596    VP: prost::Message + Default,
597    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
598    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
599{
600    let val = match proto.diff_type {
601        ProtoStateFieldDiffType::Insert => {
602            let to = VP::decode(proto.to)
603                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
604            StateFieldValDiff::Insert(v_fn(to)?)
605        }
606        ProtoStateFieldDiffType::Update => {
607            let from = VP::decode(proto.from)
608                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
609            let to = VP::decode(proto.to)
610                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
611
612            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
613        }
614        ProtoStateFieldDiffType::Delete => {
615            let from = VP::decode(proto.from)
616                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
617            StateFieldValDiff::Delete(v_fn(from)?)
618        }
619    };
620    let key = KP::decode(proto.key)
621        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
622    diffs.push(StateFieldDiff {
623        key: k_fn(key)?,
624        val,
625    });
626    Ok(())
627}
628
629/// The decoded state of [ProtoRollup] for which we have not yet checked
630/// that codecs match the ones in durable state.
631#[derive(Debug)]
632#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
633pub struct UntypedState<T> {
634    pub(crate) key_codec: String,
635    pub(crate) val_codec: String,
636    pub(crate) ts_codec: String,
637    pub(crate) diff_codec: String,
638
639    // Important! This T has not been validated, so we can't expose anything in
640    // State that references T until one of the check methods have been called.
641    // Any field on State that doesn't reference T is fair game.
642    state: State<T>,
643}
644
645impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
646    pub fn seqno(&self) -> SeqNo {
647        self.state.seqno
648    }
649
650    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
651        &self.state.collections.rollups
652    }
653
654    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
655        self.state.latest_rollup()
656    }
657
658    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
659        &mut self,
660        cfg: &PersistConfig,
661        metrics: &Metrics,
662        diffs: I,
663    ) {
664        // The apply_encoded_diffs might panic if T is not correct. Making this
665        // a silent no-op is far too subtle for my taste, but it's not clear
666        // what else we could do instead.
667        if T::codec_name() != self.ts_codec {
668            return;
669        }
670        self.state.apply_encoded_diffs(cfg, metrics, diffs);
671    }
672
673    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
674        self,
675        shard_id: &ShardId,
676    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
677        // Also defensively check that the shard_id on the state we fetched
678        // matches the shard_id we were trying to fetch.
679        assert_eq!(shard_id, &self.state.shard_id);
680        if K::codec_name() != self.key_codec
681            || V::codec_name() != self.val_codec
682            || T::codec_name() != self.ts_codec
683            || D::codec_name() != self.diff_codec
684        {
685            return Err(Box::new(CodecMismatch {
686                requested: (
687                    K::codec_name(),
688                    V::codec_name(),
689                    T::codec_name(),
690                    D::codec_name(),
691                    None,
692                ),
693                actual: (
694                    self.key_codec,
695                    self.val_codec,
696                    self.ts_codec,
697                    self.diff_codec,
698                    None,
699                ),
700            }));
701        }
702        Ok(TypedState {
703            state: self.state,
704            _phantom: PhantomData,
705        })
706    }
707
708    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
709        // Also defensively check that the shard_id on the state we fetched
710        // matches the shard_id we were trying to fetch.
711        assert_eq!(shard_id, &self.state.shard_id);
712        if T::codec_name() != self.ts_codec {
713            return Err(CodecMismatchT {
714                requested: T::codec_name(),
715                actual: self.ts_codec,
716            });
717        }
718        Ok(self.state)
719    }
720
721    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
722        let proto = ProtoRollup::decode(buf)
723            // We received a State that we couldn't decode. This could happen if
724            // persist messes up backward/forward compatibility, if the durable
725            // data was corrupted, or if operations messes up deployment. In any
726            // case, fail loudly.
727            .expect("internal error: invalid encoded state");
728        let state = Rollup::from_proto(proto)
729            .expect("internal error: invalid encoded state")
730            .state;
731        check_data_version(build_version, &state.state.applier_version);
732        state
733    }
734}
735
736impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
737where
738    K: Codec,
739    V: Codec,
740    T: Codec64,
741    D: Codec64,
742{
743    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
744        UntypedState {
745            key_codec: K::codec_name(),
746            val_codec: V::codec_name(),
747            ts_codec: T::codec_name(),
748            diff_codec: D::codec_name(),
749            state: typed_state.state,
750        }
751    }
752}
753
754/// A struct that maps 1:1 with ProtoRollup.
755///
756/// Contains State, and optionally the diffs between (state.latest_rollup.seqno, state.seqno]
757///
758/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
759/// when deserializing rollups that were persisted before we started inlining diffs.
760#[derive(Debug)]
761pub struct Rollup<T> {
762    pub(crate) state: UntypedState<T>,
763    pub(crate) diffs: Option<InlinedDiffs>,
764}
765
766impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
767    /// Creates a `StateRollup` from a state and diffs from its last rollup.
768    ///
769    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
770    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
771        let latest_rollup_seqno = *state.latest_rollup().0;
772        let mut verify_seqno = latest_rollup_seqno;
773        for diff in &diffs {
774            assert_eq!(verify_seqno.next(), diff.seqno);
775            verify_seqno = diff.seqno;
776        }
777        assert_eq!(verify_seqno, state.seqno());
778
779        let diffs = Some(InlinedDiffs::from(
780            latest_rollup_seqno.next(),
781            state.seqno().next(),
782            diffs,
783        ));
784
785        Self { state, diffs }
786    }
787
788    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
789        Self { state, diffs: None }
790    }
791
792    pub(crate) fn from_state_without_diffs(
793        state: State<T>,
794        key_codec: String,
795        val_codec: String,
796        ts_codec: String,
797        diff_codec: String,
798    ) -> Self {
799        Self::from_untyped_state_without_diffs(UntypedState {
800            key_codec,
801            val_codec,
802            ts_codec,
803            diff_codec,
804            state,
805        })
806    }
807}
808
809#[derive(Debug)]
810pub(crate) struct InlinedDiffs {
811    pub(crate) lower: SeqNo,
812    pub(crate) upper: SeqNo,
813    pub(crate) diffs: Vec<VersionedData>,
814}
815
816impl InlinedDiffs {
817    pub(crate) fn description(&self) -> Description<SeqNo> {
818        Description::new(
819            Antichain::from_elem(self.lower),
820            Antichain::from_elem(self.upper),
821            Antichain::from_elem(SeqNo::minimum()),
822        )
823    }
824
825    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
826        for diff in &diffs {
827            assert!(diff.seqno >= lower);
828            assert!(diff.seqno < upper);
829        }
830        Self {
831            lower,
832            upper,
833            diffs,
834        }
835    }
836}
837
838impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
839    fn into_proto(&self) -> ProtoInlinedDiffs {
840        ProtoInlinedDiffs {
841            lower: self.lower.into_proto(),
842            upper: self.upper.into_proto(),
843            diffs: self.diffs.into_proto(),
844        }
845    }
846
847    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
848        Ok(Self {
849            lower: proto.lower.into_rust()?,
850            upper: proto.upper.into_rust()?,
851            diffs: proto.diffs.into_rust()?,
852        })
853    }
854}
855
856impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
857    fn into_proto(&self) -> ProtoRollup {
858        ProtoRollup {
859            applier_version: self.state.state.applier_version.to_string(),
860            shard_id: self.state.state.shard_id.into_proto(),
861            seqno: self.state.state.seqno.into_proto(),
862            walltime_ms: self.state.state.walltime_ms.into_proto(),
863            hostname: self.state.state.hostname.into_proto(),
864            key_codec: self.state.key_codec.into_proto(),
865            val_codec: self.state.val_codec.into_proto(),
866            ts_codec: T::codec_name(),
867            diff_codec: self.state.diff_codec.into_proto(),
868            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
869            active_rollup: self.state.state.collections.active_rollup.into_proto(),
870            active_gc: self.state.state.collections.active_gc.into_proto(),
871            rollups: self
872                .state
873                .state
874                .collections
875                .rollups
876                .iter()
877                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
878                .collect(),
879            deprecated_rollups: Default::default(),
880            leased_readers: self
881                .state
882                .state
883                .collections
884                .leased_readers
885                .iter()
886                .map(|(id, state)| (id.into_proto(), state.into_proto()))
887                .collect(),
888            critical_readers: self
889                .state
890                .state
891                .collections
892                .critical_readers
893                .iter()
894                .map(|(id, state)| (id.into_proto(), state.into_proto()))
895                .collect(),
896            writers: self
897                .state
898                .state
899                .collections
900                .writers
901                .iter()
902                .map(|(id, state)| (id.into_proto(), state.into_proto()))
903                .collect(),
904            schemas: self
905                .state
906                .state
907                .collections
908                .schemas
909                .iter()
910                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
911                .collect(),
912            trace: Some(self.state.state.collections.trace.into_proto()),
913            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
914        }
915    }
916
917    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
918        let applier_version = if x.applier_version.is_empty() {
919            // Backward compatibility with versions of ProtoState before we set
920            // this field: if it's missing (empty), assume an infinitely old
921            // version.
922            semver::Version::new(0, 0, 0)
923        } else {
924            semver::Version::parse(&x.applier_version).map_err(|err| {
925                TryFromProtoError::InvalidSemverVersion(format!(
926                    "invalid applier_version {}: {}",
927                    x.applier_version, err
928                ))
929            })?
930        };
931
932        let mut rollups = BTreeMap::new();
933        for (seqno, rollup) in x.rollups {
934            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
935        }
936        for (seqno, key) in x.deprecated_rollups {
937            rollups.insert(
938                seqno.into_rust()?,
939                HollowRollup {
940                    key: key.into_rust()?,
941                    encoded_size_bytes: None,
942                },
943            );
944        }
945        let mut leased_readers = BTreeMap::new();
946        for (id, state) in x.leased_readers {
947            leased_readers.insert(id.into_rust()?, state.into_rust()?);
948        }
949        let mut critical_readers = BTreeMap::new();
950        for (id, state) in x.critical_readers {
951            critical_readers.insert(id.into_rust()?, state.into_rust()?);
952        }
953        let mut writers = BTreeMap::new();
954        for (id, state) in x.writers {
955            writers.insert(id.into_rust()?, state.into_rust()?);
956        }
957        let mut schemas = BTreeMap::new();
958        for (id, x) in x.schemas {
959            schemas.insert(id.into_rust()?, x.into_rust()?);
960        }
961        let active_rollup = x
962            .active_rollup
963            .map(|rollup| rollup.into_rust())
964            .transpose()?;
965        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
966        let collections = StateCollections {
967            rollups,
968            active_rollup,
969            active_gc,
970            last_gc_req: x.last_gc_req.into_rust()?,
971            leased_readers,
972            critical_readers,
973            writers,
974            schemas,
975            trace: x.trace.into_rust_if_some("trace")?,
976        };
977        let state = State {
978            applier_version,
979            shard_id: x.shard_id.into_rust()?,
980            seqno: x.seqno.into_rust()?,
981            walltime_ms: x.walltime_ms,
982            hostname: x.hostname,
983            collections,
984        };
985
986        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
987        if let Some(diffs) = &diffs {
988            if diffs.lower != state.latest_rollup().0.next() {
989                return Err(TryFromProtoError::InvalidPersistState(format!(
990                    "diffs lower ({}) should match latest rollup's successor: ({})",
991                    diffs.lower,
992                    state.latest_rollup().0.next()
993                )));
994            }
995            if diffs.upper != state.seqno.next() {
996                return Err(TryFromProtoError::InvalidPersistState(format!(
997                    "diffs upper ({}) should match state's successor: ({})",
998                    diffs.lower,
999                    state.seqno.next()
1000                )));
1001            }
1002        }
1003
1004        Ok(Rollup {
1005            state: UntypedState {
1006                state,
1007                key_codec: x.key_codec.into_rust()?,
1008                val_codec: x.val_codec.into_rust()?,
1009                ts_codec: x.ts_codec.into_rust()?,
1010                diff_codec: x.diff_codec.into_rust()?,
1011            },
1012            diffs,
1013        })
1014    }
1015}
1016
1017impl RustType<ProtoVersionedData> for VersionedData {
1018    fn into_proto(&self) -> ProtoVersionedData {
1019        ProtoVersionedData {
1020            seqno: self.seqno.into_proto(),
1021            data: Bytes::clone(&self.data),
1022        }
1023    }
1024
1025    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1026        Ok(Self {
1027            seqno: proto.seqno.into_rust()?,
1028            data: proto.data,
1029        })
1030    }
1031}
1032
1033impl RustType<ProtoSpineId> for SpineId {
1034    fn into_proto(&self) -> ProtoSpineId {
1035        ProtoSpineId {
1036            lo: self.0.into_proto(),
1037            hi: self.1.into_proto(),
1038        }
1039    }
1040
1041    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1042        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1043    }
1044}
1045
1046impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1047    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1048        let (id, batch) = entry;
1049        ProtoIdHollowBatch {
1050            id: Some(id.into_proto()),
1051            batch: Some(batch.into_proto()),
1052        }
1053    }
1054
1055    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1056        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1057        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1058        Ok((id, batch))
1059    }
1060}
1061
1062impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1063    fn into_proto(&self) -> ProtoSpineBatch {
1064        ProtoSpineBatch {
1065            desc: Some(self.desc.into_proto()),
1066            parts: self.parts.into_proto(),
1067            level: self.level.into_proto(),
1068            descs: self.descs.into_proto(),
1069        }
1070    }
1071
1072    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1073        let level = proto.level.into_rust()?;
1074        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1075        let parts = proto.parts.into_rust()?;
1076        let descs = proto.descs.into_rust()?;
1077        Ok(ThinSpineBatch {
1078            level,
1079            desc,
1080            parts,
1081            descs,
1082        })
1083    }
1084}
1085
1086impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1087    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1088        let (id, batch) = entry;
1089        ProtoIdSpineBatch {
1090            id: Some(id.into_proto()),
1091            batch: Some(batch.into_proto()),
1092        }
1093    }
1094
1095    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1096        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1097        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1098        Ok((id, batch))
1099    }
1100}
1101
1102impl RustType<ProtoCompaction> for ActiveCompaction {
1103    fn into_proto(&self) -> ProtoCompaction {
1104        ProtoCompaction {
1105            start_ms: self.start_ms,
1106        }
1107    }
1108
1109    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1110        Ok(Self {
1111            start_ms: proto.start_ms,
1112        })
1113    }
1114}
1115
1116impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1117    fn into_proto(&self) -> ProtoMerge {
1118        ProtoMerge {
1119            since: Some(self.since.into_proto()),
1120            remaining_work: self.remaining_work.into_proto(),
1121            active_compaction: self.active_compaction.into_proto(),
1122        }
1123    }
1124
1125    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1126        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1127        let remaining_work = proto.remaining_work.into_rust()?;
1128        let active_compaction = proto.active_compaction.into_rust()?;
1129        Ok(Self {
1130            since,
1131            remaining_work,
1132            active_compaction,
1133        })
1134    }
1135}
1136
1137impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1138    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1139        ProtoIdMerge {
1140            id: Some(id.into_proto()),
1141            merge: Some(merge.into_proto()),
1142        }
1143    }
1144
1145    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1146        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1147        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1148        Ok((id, merge))
1149    }
1150}
1151
1152impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1153    fn into_proto(&self) -> ProtoTrace {
1154        let since = self.since.into_proto();
1155        let legacy_batches = self
1156            .legacy_batches
1157            .iter()
1158            .map(|(b, _)| b.into_proto())
1159            .collect();
1160        let hollow_batches = self.hollow_batches.into_proto();
1161        let spine_batches = self.spine_batches.into_proto();
1162        let merges = self.merges.into_proto();
1163        ProtoTrace {
1164            since: Some(since),
1165            legacy_batches,
1166            hollow_batches,
1167            spine_batches,
1168            merges,
1169        }
1170    }
1171
1172    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1173        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1174        let legacy_batches = proto
1175            .legacy_batches
1176            .into_iter()
1177            .map(|b| b.into_rust().map(|b| (b, ())))
1178            .collect::<Result<_, _>>()?;
1179        let hollow_batches = proto.hollow_batches.into_rust()?;
1180        let spine_batches = proto.spine_batches.into_rust()?;
1181        let merges = proto.merges.into_rust()?;
1182        Ok(FlatTrace {
1183            since,
1184            legacy_batches,
1185            hollow_batches,
1186            spine_batches,
1187            merges,
1188        })
1189    }
1190}
1191
1192impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1193    fn into_proto(&self) -> ProtoTrace {
1194        self.flatten().into_proto()
1195    }
1196
1197    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1198        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1199    }
1200}
1201
1202impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1203    fn into_proto(&self) -> ProtoLeasedReaderState {
1204        ProtoLeasedReaderState {
1205            seqno: self.seqno.into_proto(),
1206            since: Some(self.since.into_proto()),
1207            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1208            lease_duration_ms: self.lease_duration_ms.into_proto(),
1209            debug: Some(self.debug.into_proto()),
1210        }
1211    }
1212
1213    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1214        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1215        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1216        // was missing and we need to fill in a default. This would ideally be
1217        // based on the actual value in PersistConfig, but it's only here for a
1218        // short time and this is way easier.
1219        if lease_duration_ms == 0 {
1220            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1221                .expect("lease duration as millis should fit within u64");
1222        }
1223        // MIGRATION: If debug is empty, then the proto field was missing and we
1224        // need to fill in a default.
1225        let debug = proto.debug.unwrap_or_default().into_rust()?;
1226        Ok(LeasedReaderState {
1227            seqno: proto.seqno.into_rust()?,
1228            since: proto
1229                .since
1230                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1231            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1232            lease_duration_ms,
1233            debug,
1234        })
1235    }
1236}
1237
1238impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1239    fn into_proto(&self) -> ProtoCriticalReaderState {
1240        ProtoCriticalReaderState {
1241            since: Some(self.since.into_proto()),
1242            opaque: i64::from_le_bytes(self.opaque.0),
1243            opaque_codec: self.opaque_codec.clone(),
1244            debug: Some(self.debug.into_proto()),
1245        }
1246    }
1247
1248    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1249        // MIGRATION: If debug is empty, then the proto field was missing and we
1250        // need to fill in a default.
1251        let debug = proto.debug.unwrap_or_default().into_rust()?;
1252        Ok(CriticalReaderState {
1253            since: proto
1254                .since
1255                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1256            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1257            opaque_codec: proto.opaque_codec,
1258            debug,
1259        })
1260    }
1261}
1262
1263impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1264    fn into_proto(&self) -> ProtoWriterState {
1265        ProtoWriterState {
1266            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1267            lease_duration_ms: self.lease_duration_ms.into_proto(),
1268            most_recent_write_token: self.most_recent_write_token.into_proto(),
1269            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1270            debug: Some(self.debug.into_proto()),
1271        }
1272    }
1273
1274    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1275        // MIGRATION: We didn't originally have most_recent_write_token and
1276        // most_recent_write_upper. Pick values that aren't going to
1277        // accidentally match ones in incoming writes and confuse things. We
1278        // could instead use Option on WriterState but this keeps the backward
1279        // compatibility logic confined to one place.
1280        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1281            IdempotencyToken::SENTINEL
1282        } else {
1283            proto.most_recent_write_token.into_rust()?
1284        };
1285        let most_recent_write_upper = match proto.most_recent_write_upper {
1286            Some(x) => x.into_rust()?,
1287            None => Antichain::from_elem(T::minimum()),
1288        };
1289        // MIGRATION: If debug is empty, then the proto field was missing and we
1290        // need to fill in a default.
1291        let debug = proto.debug.unwrap_or_default().into_rust()?;
1292        Ok(WriterState {
1293            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1294            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1295            most_recent_write_token,
1296            most_recent_write_upper,
1297            debug,
1298        })
1299    }
1300}
1301
1302impl RustType<ProtoHandleDebugState> for HandleDebugState {
1303    fn into_proto(&self) -> ProtoHandleDebugState {
1304        ProtoHandleDebugState {
1305            hostname: self.hostname.into_proto(),
1306            purpose: self.purpose.into_proto(),
1307        }
1308    }
1309
1310    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1311        Ok(HandleDebugState {
1312            hostname: proto.hostname,
1313            purpose: proto.purpose,
1314        })
1315    }
1316}
1317
1318impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1319    fn into_proto(&self) -> ProtoHollowRun {
1320        ProtoHollowRun {
1321            parts: self.parts.into_proto(),
1322        }
1323    }
1324
1325    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1326        Ok(HollowRun {
1327            parts: proto.parts.into_rust()?,
1328        })
1329    }
1330}
1331
1332impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1333    fn into_proto(&self) -> ProtoHollowBatch {
1334        let mut run_meta = self.run_meta.into_proto();
1335        // For backwards compatibility reasons, don't keep default metadata in the proto.
1336        let run_meta_default = RunMeta::default().into_proto();
1337        while run_meta.last() == Some(&run_meta_default) {
1338            run_meta.pop();
1339        }
1340        ProtoHollowBatch {
1341            desc: Some(self.desc.into_proto()),
1342            parts: self.parts.into_proto(),
1343            len: self.len.into_proto(),
1344            runs: self.run_splits.into_proto(),
1345            run_meta,
1346            deprecated_keys: vec![],
1347        }
1348    }
1349
1350    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1351        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1352        // MIGRATION: We used to just have the keys instead of a more structured
1353        // part.
1354        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1355            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1356                key: PartialBatchKey(key),
1357                encoded_size_bytes: 0,
1358                key_lower: vec![],
1359                structured_key_lower: None,
1360                stats: None,
1361                ts_rewrite: None,
1362                diffs_sum: None,
1363                format: None,
1364                schema_id: None,
1365                deprecated_schema_id: None,
1366            }))
1367        }));
1368        // We discard default metadatas from the proto above; re-add them here.
1369        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1370        let num_runs = if parts.is_empty() {
1371            0
1372        } else {
1373            run_splits.len() + 1
1374        };
1375        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1376        run_meta.resize(num_runs, RunMeta::default());
1377        Ok(HollowBatch {
1378            desc: proto.desc.into_rust_if_some("desc")?,
1379            parts,
1380            len: proto.len.into_rust()?,
1381            run_splits,
1382            run_meta,
1383        })
1384    }
1385}
1386
1387impl RustType<ProtoRunMeta> for RunMeta {
1388    fn into_proto(&self) -> ProtoRunMeta {
1389        let order = match self.order {
1390            None => ProtoRunOrder::Unknown,
1391            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1392            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1393            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1394        };
1395        ProtoRunMeta {
1396            order: order.into(),
1397            schema_id: self.schema.into_proto(),
1398            deprecated_schema_id: self.deprecated_schema.into_proto(),
1399        }
1400    }
1401
1402    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1403        let order = match ProtoRunOrder::try_from(proto.order)? {
1404            ProtoRunOrder::Unknown => None,
1405            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1406            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1407            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1408        };
1409        Ok(Self {
1410            order,
1411            schema: proto.schema_id.into_rust()?,
1412            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1413        })
1414    }
1415}
1416
1417impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1418    fn into_proto(&self) -> ProtoHollowBatchPart {
1419        match self {
1420            RunPart::Single(part) => part.into_proto(),
1421            RunPart::Many(runs) => runs.into_proto(),
1422        }
1423    }
1424
1425    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1426        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1427            RunPart::Many(proto.into_rust()?)
1428        } else {
1429            RunPart::Single(proto.into_rust()?)
1430        };
1431        Ok(run_part)
1432    }
1433}
1434
1435impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1436    fn into_proto(&self) -> ProtoHollowBatchPart {
1437        let part = ProtoHollowBatchPart {
1438            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1439                key: self.key.into_proto(),
1440                max_part_bytes: self.max_part_bytes.into_proto(),
1441            })),
1442            encoded_size_bytes: self.hollow_bytes.into_proto(),
1443            key_lower: Bytes::copy_from_slice(&self.key_lower),
1444            diffs_sum: None,
1445            key_stats: None,
1446            ts_rewrite: None,
1447            format: None,
1448            schema_id: None,
1449            structured_key_lower: self.structured_key_lower.into_proto(),
1450            deprecated_schema_id: None,
1451        };
1452        part
1453    }
1454
1455    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1456        let run_proto = match proto.kind {
1457            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1458            _ => Err(TryFromProtoError::UnknownEnumVariant(
1459                "ProtoHollowBatchPart::kind".to_string(),
1460            ))?,
1461        };
1462        Ok(Self {
1463            key: run_proto.key.into_rust()?,
1464            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1465            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1466            key_lower: proto.key_lower.to_vec(),
1467            structured_key_lower: proto.structured_key_lower.into_rust()?,
1468            _phantom_data: Default::default(),
1469        })
1470    }
1471}
1472
1473impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1474    fn into_proto(&self) -> ProtoHollowBatchPart {
1475        match self {
1476            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1477                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1478                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1479                key_lower: Bytes::copy_from_slice(&x.key_lower),
1480                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1481                key_stats: x.stats.into_proto(),
1482                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1483                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1484                format: x.format.map(|f| f.into_proto()),
1485                schema_id: x.schema_id.into_proto(),
1486                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1487            },
1488            BatchPart::Inline {
1489                updates,
1490                ts_rewrite,
1491                schema_id,
1492                deprecated_schema_id,
1493            } => ProtoHollowBatchPart {
1494                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1495                encoded_size_bytes: 0,
1496                key_lower: Bytes::new(),
1497                structured_key_lower: None,
1498                key_stats: None,
1499                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1500                diffs_sum: None,
1501                format: None,
1502                schema_id: schema_id.into_proto(),
1503                deprecated_schema_id: deprecated_schema_id.into_proto(),
1504            },
1505        }
1506    }
1507
1508    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1509        let ts_rewrite = match proto.ts_rewrite {
1510            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1511            None => None,
1512        };
1513        let schema_id = proto.schema_id.into_rust()?;
1514        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1515        match proto.kind {
1516            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1517                Ok(BatchPart::Hollow(HollowBatchPart {
1518                    key: key.into_rust()?,
1519                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1520                    key_lower: proto.key_lower.into(),
1521                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1522                    stats: proto.key_stats.into_rust()?,
1523                    ts_rewrite,
1524                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1525                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1526                    schema_id,
1527                    deprecated_schema_id,
1528                }))
1529            }
1530            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1531                assert_eq!(proto.encoded_size_bytes, 0);
1532                assert_eq!(proto.key_lower.len(), 0);
1533                assert_none!(proto.key_stats);
1534                assert_none!(proto.diffs_sum);
1535                let updates = LazyInlineBatchPart(x.into_rust()?);
1536                Ok(BatchPart::Inline {
1537                    updates,
1538                    ts_rewrite,
1539                    schema_id,
1540                    deprecated_schema_id,
1541                })
1542            }
1543            _ => Err(TryFromProtoError::unknown_enum_variant(
1544                "ProtoHollowBatchPart::kind",
1545            )),
1546        }
1547    }
1548}
1549
1550impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1551    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1552        match self {
1553            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1554            BatchColumnarFormat::Both(version) => {
1555                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1556            }
1557            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1558        }
1559    }
1560
1561    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1562        let format = match proto {
1563            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1564            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1565                BatchColumnarFormat::Both(version.cast_into())
1566            }
1567            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1568        };
1569        Ok(format)
1570    }
1571}
1572
/// Aggregate statistics about data contained in a part.
///
/// These are "lazy" in the sense that we don't decode them (or even validate
/// the encoded version) until they're used.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    /// The still-encoded `ProtoStructStats` for the `key` field of
    /// `PartStats`. `LazyPartStats::decode` turns it back into `PartStats`.
    key: LazyProto<ProtoStructStats>,
}
1581
1582impl LazyPartStats {
1583    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1584        let PartStats { key } = x;
1585        let mut proto_stats = ProtoStructStats::from_rust(key);
1586        map_proto(&mut proto_stats);
1587        LazyPartStats {
1588            key: LazyProto::from(&proto_stats),
1589        }
1590    }
1591    /// Decodes and returns PartStats from the encoded representation.
1592    ///
1593    /// This does not cache the returned value, it decodes each time it's
1594    /// called.
1595    pub fn decode(&self) -> PartStats {
1596        let key = self.key.decode().expect("valid proto");
1597        PartStats {
1598            key: key.into_rust().expect("valid stats"),
1599        }
1600    }
1601}
1602
1603impl RustType<Bytes> for LazyPartStats {
1604    fn into_proto(&self) -> Bytes {
1605        let LazyPartStats { key } = self;
1606        key.into_proto()
1607    }
1608
1609    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1610        Ok(LazyPartStats {
1611            key: proto.into_rust()?,
1612        })
1613    }
1614}
1615
/// Proptest strategy that always yields `Some` arbitrary [`LazyPartStats`]
/// (never `None`).
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    LazyPartStats::arbitrary().prop_map(Some)
}
1620
1621#[allow(unused_parens)]
1622impl Arbitrary for LazyPartStats {
1623    type Parameters = ();
1624    type Strategy =
1625        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1626
1627    fn arbitrary_with(_: ()) -> Self::Strategy {
1628        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1629            LazyPartStats::encode(&x, |_| {})
1630        })
1631    }
1632}
1633
1634impl ProtoInlineBatchPart {
1635    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1636        lgbytes: &ColumnarMetrics,
1637        proto: Self,
1638    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1639        let updates = proto
1640            .updates
1641            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1642        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1643
1644        Ok(BlobTraceBatchPart {
1645            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1646            index: proto.index.into_rust()?,
1647            updates,
1648        })
1649    }
1650}
1651
/// A batch part stored inlined in State.
///
/// The wrapped `ProtoInlineBatchPart` is only decoded when
/// `LazyInlineBatchPart::decode` is called, and it is decoded again on every
/// such call.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1655
1656impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1657    fn from(value: &ProtoInlineBatchPart) -> Self {
1658        LazyInlineBatchPart(value.into())
1659    }
1660}
1661
1662impl Serialize for LazyInlineBatchPart {
1663    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1664        // NB: This serialize impl is only used for QA and debugging, so emit a
1665        // truncated version.
1666        let proto = self.0.decode().expect("valid proto");
1667        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1668        let () = s.serialize_field("desc", &proto.desc)?;
1669        let () = s.serialize_field("index", &proto.index)?;
1670        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1671        s.end()
1672    }
1673}
1674
1675impl LazyInlineBatchPart {
1676    pub(crate) fn encoded_size_bytes(&self) -> usize {
1677        self.0.buf.len()
1678    }
1679
1680    /// Decodes and returns a BlobTraceBatchPart from the encoded
1681    /// representation.
1682    ///
1683    /// This does not cache the returned value, it decodes each time it's
1684    /// called.
1685    pub fn decode<T: Timestamp + Codec64>(
1686        &self,
1687        lgbytes: &ColumnarMetrics,
1688    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1689        let proto = self.0.decode().expect("valid proto");
1690        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1691    }
1692}
1693
1694impl RustType<Bytes> for LazyInlineBatchPart {
1695    fn into_proto(&self) -> Bytes {
1696        self.0.into_proto()
1697    }
1698
1699    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1700        Ok(LazyInlineBatchPart(proto.into_rust()?))
1701    }
1702}
1703
1704impl RustType<ProtoHollowRollup> for HollowRollup {
1705    fn into_proto(&self) -> ProtoHollowRollup {
1706        ProtoHollowRollup {
1707            key: self.key.into_proto(),
1708            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1709        }
1710    }
1711
1712    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1713        Ok(HollowRollup {
1714            key: proto.key.into_rust()?,
1715            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1716        })
1717    }
1718}
1719
1720impl RustType<ProtoActiveRollup> for ActiveRollup {
1721    fn into_proto(&self) -> ProtoActiveRollup {
1722        ProtoActiveRollup {
1723            start_ms: self.start_ms,
1724            seqno: self.seqno.into_proto(),
1725        }
1726    }
1727
1728    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1729        Ok(ActiveRollup {
1730            start_ms: proto.start_ms,
1731            seqno: proto.seqno.into_rust()?,
1732        })
1733    }
1734}
1735
1736impl RustType<ProtoActiveGc> for ActiveGc {
1737    fn into_proto(&self) -> ProtoActiveGc {
1738        ProtoActiveGc {
1739            start_ms: self.start_ms,
1740            seqno: self.seqno.into_proto(),
1741        }
1742    }
1743
1744    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1745        Ok(ActiveGc {
1746            start_ms: proto.start_ms,
1747            seqno: proto.seqno.into_rust()?,
1748        })
1749    }
1750}
1751
1752impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1753    fn into_proto(&self) -> ProtoU64Description {
1754        ProtoU64Description {
1755            lower: Some(self.lower().into_proto()),
1756            upper: Some(self.upper().into_proto()),
1757            since: Some(self.since().into_proto()),
1758        }
1759    }
1760
1761    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1762        Ok(Description::new(
1763            proto.lower.into_rust_if_some("lower")?,
1764            proto.upper.into_rust_if_some("upper")?,
1765            proto.since.into_rust_if_some("since")?,
1766        ))
1767    }
1768}
1769
1770impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1771    fn into_proto(&self) -> ProtoU64Antichain {
1772        ProtoU64Antichain {
1773            elements: self
1774                .elements()
1775                .iter()
1776                .map(|x| i64::from_le_bytes(T::encode(x)))
1777                .collect(),
1778        }
1779    }
1780
1781    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1782        let elements = proto
1783            .elements
1784            .iter()
1785            .map(|x| T::decode(x.to_le_bytes()))
1786            .collect::<Vec<_>>();
1787        Ok(Antichain::from(elements))
1788    }
1789}
1790
// Tests for the encodings in this file: decode-time version checks,
// migrations from deprecated proto fields, a State proto round-trip, and the
// data-version compatibility tables.
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_err;
    use mz_persist::location::SeqNo;
    use proptest::prelude::*;

    use crate::ShardId;
    use crate::internal::paths::PartialRollupKey;
    use crate::internal::state::tests::any_state;
    use crate::internal::state::{BatchPart, HandleDebugState};
    use crate::internal::state_diff::StateDiff;
    use crate::tests::new_test_client_cache;

    use super::*;

    // State encoded by code version v2 decodes under v2 and v3. Decoding it
    // under the older v1 panics.
    #[mz_ore::test]
    fn applier_version_state() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out some State.
        let shard_id = ShardId::new();
        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
        let rollup =
            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
        let mut buf = Vec::new();
        rollup.encode(&mut buf).expect("serializable");
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(
            UntypedState::<u64>::decode(&v2, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );
        assert_eq!(
            UntypedState::<u64>::decode(&v3, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
        assert_err!(v1_res);
    }

    // Same version gating as `applier_version_state`, but for a StateDiff.
    #[mz_ore::test]
    fn applier_version_state_diff() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out some State.
        let diff = StateDiff::<u64>::new(
            v2.clone(),
            SeqNo(0),
            SeqNo(1),
            2,
            PartialRollupKey("rollup".into()),
        );
        let mut buf = Vec::new();
        diff.encode(&mut buf);
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
        assert_err!(v1_res);
    }

    // Each entry of the deprecated `deprecated_keys` field of a
    // ProtoHollowBatch decodes as an extra hollow part with
    // encoded_size_bytes 0, appended after the existing parts.
    #[mz_ore::test]
    fn hollow_batch_migration_keys() {
        let x = HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(1u64),
                Antichain::from_elem(2u64),
                Antichain::from_elem(3u64),
            ),
            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("a".into()),
                encoded_size_bytes: 5,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            }))],
            4,
        );
        let mut old = x.into_proto();
        // Old ProtoHollowBatch had keys instead of parts.
        old.deprecated_keys = vec!["b".into()];
        // We don't expect to see a ProtoHollowBatch with keys _and_ parts, only
        // one or the other, but we have a defined output, so may as well test
        // it.
        let mut expected = x;
        // We fill in 0 for encoded_size_bytes when we migrate from keys. This
        // will violate bounded memory usage compaction during the transition
        // (short-term issue), but that's better than creating unnecessary runs
        // (longer-term issue).
        expected
            .parts
            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("b".into()),
                encoded_size_bytes: 0,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            })));
        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
    }

    // A reader state encoded with lease_duration_ms 0 (unset) decodes with
    // the READER_LEASE_DURATION default filled in.
    #[mz_ore::test]
    fn reader_state_migration_lease_duration() {
        let x = LeasedReaderState {
            seqno: SeqNo(1),
            since: Antichain::from_elem(2u64),
            last_heartbeat_timestamp_ms: 3,
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
            // Old ProtoReaderState had no lease_duration_ms field
            lease_duration_ms: 0,
        };
        let old = x.into_proto();
        let mut expected = x;
        // We fill in the READER_LEASE_DURATION default for lease_duration_ms
        // when we migrate from unset.
        expected.lease_duration_ms =
            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
    }

    // A ProtoWriterState with an empty most_recent_write_token and no
    // most_recent_write_upper decodes as IdempotencyToken::SENTINEL and
    // Antichain::from_elem(0).
    #[mz_ore::test]
    fn writer_state_migration_most_recent_write() {
        let proto = ProtoWriterState {
            last_heartbeat_timestamp_ms: 1,
            lease_duration_ms: 2,
            // Old ProtoWriterState had no most_recent_write_token or
            // most_recent_write_upper.
            most_recent_write_token: "".into(),
            most_recent_write_upper: None,
            debug: Some(ProtoHandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            }),
        };
        let expected = WriterState {
            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
            lease_duration_ms: proto.lease_duration_ms,
            most_recent_write_token: IdempotencyToken::SENTINEL,
            most_recent_write_upper: Antichain::from_elem(0),
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
        };
        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
    }

    // Rollups stored in the deprecated `deprecated_rollups` map (key only)
    // are merged with those in the current field when a Rollup is decoded.
    // The migrated entries get encoded_size_bytes None.
    #[mz_ore::test]
    fn state_migration_rollups() {
        let r1 = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r2 = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();

        // Manually add the old rollup encoding.
        proto.deprecated_rollups.insert(1, r1.key.0.clone());

        let state: Rollup<u64> = proto.into_rust().unwrap();
        let state = state.state;
        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            expected
        );
    }

    // StateDiff counterpart of `state_migration_rollups`. A DeprecatedRollups
    // field diff is decoded alongside the current rollup diffs. A rollup
    // Delete in a diff also applies cleanly to a state whose rollup came from
    // the deprecated field.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r1 = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r2 = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };
        let r3 = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };
        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2.clone());
        diff.rollups.push(r3.clone());
        let mut diff_proto = diff.into_proto();

        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Manually add the old rollup encoding.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2, r3, r1]
        );

        // Also make sure that a rollup delete in a diff applies cleanly to a
        // state that had it in the deprecated field.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let state: Rollup<u64> = rollup.into_rust().unwrap();
        let state = state.state;
        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }

    // Property test: arbitrary States survive a Rollup proto round-trip
    // unchanged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn state_proto_roundtrip() {
        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
            let before = UntypedState {
                key_codec: <() as Codec>::codec_name(),
                val_codec: <() as Codec>::codec_name(),
                ts_codec: <T as Codec64>::codec_name(),
                diff_codec: <i64 as Codec64>::codec_name(),
                state,
            };
            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
            let after: Rollup<T> = proto.into_rust().unwrap();
            let after = after.state;
            assert_eq!(before, after);
        }

        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
    }

    // Table of (code version, data version, LTS versions) cases for
    // `cfg::check_data_version_with_lts_versions`. With no LTS versions, data
    // may be at most one minor version ahead of code. When the code version
    // is a listed LTS version, the cases also accept data up to (and
    // including patches of) the next listed LTS version.
    #[mz_ore::test]
    fn check_data_versions_with_lts_versions() {
        #[track_caller]
        fn testcase(code: &str, data: &str, lts_versions: &[Version], expected: Result<(), ()>) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            let actual = cfg::check_data_version_with_lts_versions(&code, &data, lts_versions)
                .map_err(|_| ());
            assert_eq!(actual, expected);
        }

        let none = [];
        let one = [Version::new(0, 130, 0)];
        let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
        let three = [
            Version::new(0, 130, 0),
            Version::new(0, 140, 0),
            Version::new(0, 150, 0),
        ];

        testcase("0.130.0", "0.128.0", &none, Ok(()));
        testcase("0.130.0", "0.129.0", &none, Ok(()));
        testcase("0.130.0", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.130.1", &none, Ok(()));
        testcase("0.130.1", "0.130.0", &none, Ok(()));
        testcase("0.130.0", "0.131.0", &none, Ok(()));
        testcase("0.130.0", "0.132.0", &none, Err(()));

        testcase("0.129.0", "0.127.0", &none, Ok(()));
        testcase("0.129.0", "0.128.0", &none, Ok(()));
        testcase("0.129.0", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.129.1", &none, Ok(()));
        testcase("0.129.1", "0.129.0", &none, Ok(()));
        testcase("0.129.0", "0.130.0", &none, Ok(()));
        testcase("0.129.0", "0.131.0", &none, Err(()));

        testcase("0.130.0", "0.128.0", &one, Ok(()));
        testcase("0.130.0", "0.129.0", &one, Ok(()));
        testcase("0.130.0", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.130.1", &one, Ok(()));
        testcase("0.130.1", "0.130.0", &one, Ok(()));
        testcase("0.130.0", "0.131.0", &one, Ok(()));
        testcase("0.130.0", "0.132.0", &one, Ok(()));

        testcase("0.129.0", "0.127.0", &one, Ok(()));
        testcase("0.129.0", "0.128.0", &one, Ok(()));
        testcase("0.129.0", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.129.1", &one, Ok(()));
        testcase("0.129.1", "0.129.0", &one, Ok(()));
        testcase("0.129.0", "0.130.0", &one, Ok(()));
        testcase("0.129.0", "0.131.0", &one, Err(()));

        testcase("0.131.0", "0.129.0", &one, Ok(()));
        testcase("0.131.0", "0.130.0", &one, Ok(()));
        testcase("0.131.0", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.131.1", &one, Ok(()));
        testcase("0.131.1", "0.131.0", &one, Ok(()));
        testcase("0.131.0", "0.132.0", &one, Ok(()));
        testcase("0.131.0", "0.133.0", &one, Err(()));

        testcase("0.130.0", "0.128.0", &two, Ok(()));
        testcase("0.130.0", "0.129.0", &two, Ok(()));
        testcase("0.130.0", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.130.1", &two, Ok(()));
        testcase("0.130.1", "0.130.0", &two, Ok(()));
        testcase("0.130.0", "0.131.0", &two, Ok(()));
        testcase("0.130.0", "0.132.0", &two, Ok(()));
        testcase("0.130.0", "0.135.0", &two, Ok(()));
        testcase("0.130.0", "0.138.0", &two, Ok(()));
        testcase("0.130.0", "0.139.0", &two, Ok(()));
        testcase("0.130.0", "0.140.0", &two, Ok(()));
        testcase("0.130.9", "0.140.0", &two, Ok(()));
        testcase("0.130.0", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.1", &two, Ok(()));
        testcase("0.130.3", "0.140.9", &two, Ok(()));
        testcase("0.130.0", "0.141.0", &two, Err(()));
        testcase("0.129.0", "0.133.0", &two, Err(()));
        testcase("0.129.0", "0.140.0", &two, Err(()));
        testcase("0.131.0", "0.133.0", &two, Err(()));
        testcase("0.131.0", "0.140.0", &two, Err(()));

        testcase("0.130.0", "0.128.0", &three, Ok(()));
        testcase("0.130.0", "0.129.0", &three, Ok(()));
        testcase("0.130.0", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.130.1", &three, Ok(()));
        testcase("0.130.1", "0.130.0", &three, Ok(()));
        testcase("0.130.0", "0.131.0", &three, Ok(()));
        testcase("0.130.0", "0.132.0", &three, Ok(()));
        testcase("0.130.0", "0.135.0", &three, Ok(()));
        testcase("0.130.0", "0.138.0", &three, Ok(()));
        testcase("0.130.0", "0.139.0", &three, Ok(()));
        testcase("0.130.0", "0.140.0", &three, Ok(()));
        testcase("0.130.9", "0.140.0", &three, Ok(()));
        testcase("0.130.0", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.1", &three, Ok(()));
        testcase("0.130.3", "0.140.9", &three, Ok(()));
        testcase("0.130.0", "0.141.0", &three, Err(()));
        testcase("0.129.0", "0.133.0", &three, Err(()));
        testcase("0.129.0", "0.140.0", &three, Err(()));
        testcase("0.131.0", "0.133.0", &three, Err(()));
        testcase("0.131.0", "0.140.0", &three, Err(()));
        testcase("0.130.0", "0.150.0", &three, Err(()));

        testcase("0.140.0", "0.138.0", &three, Ok(()));
        testcase("0.140.0", "0.139.0", &three, Ok(()));
        testcase("0.140.0", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.140.1", &three, Ok(()));
        testcase("0.140.1", "0.140.0", &three, Ok(()));
        testcase("0.140.0", "0.141.0", &three, Ok(()));
        testcase("0.140.0", "0.142.0", &three, Ok(()));
        testcase("0.140.0", "0.145.0", &three, Ok(()));
        testcase("0.140.0", "0.148.0", &three, Ok(()));
        testcase("0.140.0", "0.149.0", &three, Ok(()));
        testcase("0.140.0", "0.150.0", &three, Ok(()));
        testcase("0.140.9", "0.150.0", &three, Ok(()));
        testcase("0.140.0", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.1", &three, Ok(()));
        testcase("0.140.3", "0.150.9", &three, Ok(()));
        testcase("0.140.0", "0.151.0", &three, Err(()));
        testcase("0.139.0", "0.143.0", &three, Err(()));
        testcase("0.139.0", "0.150.0", &three, Err(()));
        testcase("0.141.0", "0.143.0", &three, Err(()));
        testcase("0.141.0", "0.150.0", &three, Err(()));

        testcase("0.150.0", "0.148.0", &three, Ok(()));
        testcase("0.150.0", "0.149.0", &three, Ok(()));
        testcase("0.150.0", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.150.1", &three, Ok(()));
        testcase("0.150.1", "0.150.0", &three, Ok(()));
        testcase("0.150.0", "0.151.0", &three, Ok(()));
        testcase("0.150.0", "0.152.0", &three, Ok(()));
        testcase("0.150.0", "0.155.0", &three, Ok(()));
        testcase("0.150.0", "0.158.0", &three, Ok(()));
        testcase("0.150.0", "0.159.0", &three, Ok(()));
        testcase("0.150.0", "0.160.0", &three, Ok(()));
        testcase("0.150.9", "0.160.0", &three, Ok(()));
        testcase("0.150.0", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.1", &three, Ok(()));
        testcase("0.150.3", "0.160.9", &three, Ok(()));
        testcase("0.150.0", "0.161.0", &three, Ok(()));
        testcase("0.149.0", "0.153.0", &three, Err(()));
        testcase("0.149.0", "0.160.0", &three, Err(()));
        testcase("0.151.0", "0.153.0", &three, Err(()));
        testcase("0.151.0", "0.160.0", &three, Err(()));
    }

    // Table of (code version, data version) cases for `check_data_version`.
    // `Err(())` here means the call panicked (caught via catch_unwind).
    #[mz_ore::test]
    fn check_data_versions() {
        #[track_caller]
        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            #[allow(clippy::disallowed_methods)]
            let actual =
                std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
            assert_eq!(actual, expected);
        }

        testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.10.0", Ok(()));
        // Note: Probably useful to let tests use two arbitrary shas on main, at
        // the very least for things like git bisect.
        testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
        testcase("0.10.0-dev", "0.11.0", Ok(()));
        testcase("0.10.0-dev", "0.12.0-dev", Err(()));
        testcase("0.10.0-dev", "0.12.0", Err(()));
        testcase("0.10.0-dev", "0.13.0-dev", Err(()));

        testcase("0.10.0", "0.8.0-dev", Ok(()));
        testcase("0.10.0", "0.8.0", Ok(()));
        testcase("0.10.0", "0.9.0-dev", Ok(()));
        testcase("0.10.0", "0.9.0", Ok(()));
        testcase("0.10.0", "0.10.0-dev", Ok(()));
        testcase("0.10.0", "0.10.0", Ok(()));
        // Note: This is what it would look like to run a version of the catalog
        // upgrade checker built from main.
        testcase("0.10.0", "0.11.0-dev", Ok(()));
        testcase("0.10.0", "0.11.0", Ok(()));
        testcase("0.10.0", "0.11.1", Ok(()));
        testcase("0.10.0", "0.11.1000000", Ok(()));
        testcase("0.10.0", "0.12.0-dev", Err(()));
        testcase("0.10.0", "0.12.0", Err(()));
        testcase("0.10.0", "0.13.0-dev", Err(()));

        testcase("0.10.1", "0.9.0", Ok(()));
        testcase("0.10.1", "0.10.0", Ok(()));
        testcase("0.10.1", "0.11.0", Ok(()));
        testcase("0.10.1", "0.11.1", Ok(()));
        testcase("0.10.1", "0.11.100", Ok(()));

        // This is probably a bad idea (seems as if we've downgraded from
        // running v0.10.1 to v0.10.0, an earlier patch version of the same
        // minor version), but not much we can do, given the `state_version =
        // max(code_version, prev_state_version)` logic we need to prevent
        // rolling back an arbitrary number of versions.
        testcase("0.10.0", "0.10.1", Ok(()));
    }
}