1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53    RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54    proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
/// A bundle of the key schema and value schema for a `(K, V)` codec pair,
/// plus an optional [`SchemaId`].
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Identifier associated with this pair of schemas, or `None` when there is none.
    // NOTE(review): presumably the id assigned when the schemas were registered — confirm.
    pub id: Option<SchemaId>,
    /// Shared handle to the schema of the key codec `K`.
    pub key: Arc<K::Schema>,
    /// Shared handle to the schema of the value codec `V`.
    pub val: Arc<V::Schema>,
}
78
79impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
80    fn clone(&self) -> Self {
81        Self {
82            id: self.id,
83            key: Arc::clone(&self.key),
84            val: Arc::clone(&self.val),
85        }
86    }
87}
88
/// The encoded bytes of a protobuf message of type `T`, held in serialized
/// form and decoded only on request (see `decode` / `decode_to`).
///
/// `T` appears only inside `PhantomData<fn() -> T>`; no value of type `T` is
/// stored in this struct.
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The serialized message bytes.
    buf: Bytes,
    /// Marker tying the buffer to the message type `T`.
    _phantom: PhantomData<fn() -> T>,
}
116
117impl<T: Message + Default> Debug for LazyProto<T> {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        match self.decode() {
120            Ok(proto) => Debug::fmt(&proto, f),
121            Err(err) => f
122                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
123                .field("err", &err)
124                .finish(),
125        }
126    }
127}
128
129impl<T> PartialEq for LazyProto<T> {
130    fn eq(&self, other: &Self) -> bool {
131        self.cmp(other) == Ordering::Equal
132    }
133}
134
/// Equality is total here: `PartialEq` is defined through `Ord::cmp`, which
/// compares the raw buffers.
impl<T> Eq for LazyProto<T> {}
136
137impl<T> PartialOrd for LazyProto<T> {
138    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139        Some(self.cmp(other))
140    }
141}
142
143impl<T> Ord for LazyProto<T> {
144    fn cmp(&self, other: &Self) -> Ordering {
145        let LazyProto {
146            buf: self_buf,
147            _phantom: _,
148        } = self;
149        let LazyProto {
150            buf: other_buf,
151            _phantom: _,
152        } = other;
153        self_buf.cmp(other_buf)
154    }
155}
156
157impl<T> Hash for LazyProto<T> {
158    fn hash<H: Hasher>(&self, state: &mut H) {
159        let LazyProto { buf, _phantom } = self;
160        buf.hash(state);
161    }
162}
163
164impl<T: Message + Default> From<&T> for LazyProto<T> {
165    fn from(value: &T) -> Self {
166        let buf = Bytes::from(value.encode_to_vec());
167        LazyProto {
168            buf,
169            _phantom: PhantomData,
170        }
171    }
172}
173
174impl<T: Message + Default> LazyProto<T> {
175    pub fn decode(&self) -> Result<T, prost::DecodeError> {
176        T::decode(&*self.buf)
177    }
178
179    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
180        Ok(T::decode(&*self.buf)?.into_rust()?)
181    }
182}
183
184impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
185    fn into_proto(&self) -> Bytes {
186        self.buf.clone()
187    }
188
189    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
190        Ok(Self {
191            buf,
192            _phantom: PhantomData,
193        })
194    }
195}
196
197pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
198    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
199        Some(x) => x,
200        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
201    };
202    let uuid = Uuid::parse_str(uuid_encoded)
203        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
204    Ok(*uuid.as_bytes())
205}
206
207pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
208    if let Err(msg) = cfg::check_data_version(code_version, data_version) {
209        if cfg!(test) {
212            panic!("{msg}");
213        } else {
214            halt!("{msg}");
215        }
216    }
217}
218
219impl RustType<String> for LeasedReaderId {
220    fn into_proto(&self) -> String {
221        self.to_string()
222    }
223
224    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
225        match proto.parse() {
226            Ok(x) => Ok(x),
227            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
228        }
229    }
230}
231
232impl RustType<String> for CriticalReaderId {
233    fn into_proto(&self) -> String {
234        self.to_string()
235    }
236
237    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
238        match proto.parse() {
239            Ok(x) => Ok(x),
240            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
241        }
242    }
243}
244
245impl RustType<String> for WriterId {
246    fn into_proto(&self) -> String {
247        self.to_string()
248    }
249
250    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
251        match proto.parse() {
252            Ok(x) => Ok(x),
253            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
254        }
255    }
256}
257
258impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
259    fn into_proto(&self) -> ProtoEncodedSchemas {
260        ProtoEncodedSchemas {
261            key: self.key.clone(),
262            key_data_type: self.key_data_type.clone(),
263            val: self.val.clone(),
264            val_data_type: self.val_data_type.clone(),
265        }
266    }
267
268    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
269        Ok(EncodedSchemas {
270            key: proto.key,
271            key_data_type: proto.key_data_type,
272            val: proto.val,
273            val_data_type: proto.val_data_type,
274        })
275    }
276}
277
278impl RustType<String> for IdempotencyToken {
279    fn into_proto(&self) -> String {
280        self.to_string()
281    }
282
283    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
284        match proto.parse() {
285            Ok(x) => Ok(x),
286            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
287        }
288    }
289}
290
291impl RustType<String> for PartialBatchKey {
292    fn into_proto(&self) -> String {
293        self.0.clone()
294    }
295
296    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297        Ok(PartialBatchKey(proto))
298    }
299}
300
301impl RustType<String> for PartialRollupKey {
302    fn into_proto(&self) -> String {
303        self.0.clone()
304    }
305
306    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
307        Ok(PartialRollupKey(proto))
308    }
309}
310
311impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
312    pub fn encode<B>(&self, buf: &mut B)
313    where
314        B: bytes::BufMut,
315    {
316        self.into_proto()
317            .encode(buf)
318            .expect("no required fields means no initialization errors");
319    }
320
321    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
322        let proto = ProtoStateDiff::decode(buf)
323            .expect("internal error: invalid encoded state");
328        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
329        check_data_version(build_version, &diff.applier_version);
330        diff
331    }
332}
333
/// Conversion between `StateDiff` and its wire form `ProtoStateDiff`.
///
/// The per-field diffs are not stored as separate proto fields. They are all
/// written into a single `ProtoStateFieldDiffs`, each entry tagged with the
/// `ProtoStateField` it belongs to.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes the scalar header fields directly and appends every per-field
    /// diff to one `ProtoStateFieldDiffs` through its writer.
    fn into_proto(&self) -> ProtoStateDiff {
        // Exhaustive destructuring (no `..`): adding a field to `StateDiff`
        // without handling it here is a compile error.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // All field diffs are appended through this writer and converted back
        // into a proto once everything has been written.
        let mut writer = proto.into_writer();

        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        let field_diffs = writer.into_proto();

        // Sanity-check the freshly written diffs (debug builds only).
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Rebuilds a `StateDiff` from its proto form.
    ///
    /// # Errors
    ///
    /// Returns an error if `applier_version` is non-empty but not valid semver,
    /// if any header field fails to convert, or if any individual field diff
    /// fails to decode or convert.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty version string is mapped to 0.0.0 rather than rejected.
        // NOTE(review): presumably for data written before this field was
        // populated — confirm.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` leaves every per-field diff list as
        // `StateDiff::new` created it.
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // Each arm names the proto key/value types used to decode the
                // entry and the `state_diff` list it is appended to.
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // The deprecated form carries only the rollup key as a
                    // string; it lands in the same `rollups` list with no
                    // recorded encoded size.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself and have a
                    // unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
553
554fn field_diffs_into_proto<K, KP, V, VP>(
555    field: ProtoStateField,
556    diffs: &[StateFieldDiff<K, V>],
557    writer: &mut ProtoStateFieldDiffsWriter,
558) where
559    KP: prost::Message,
560    K: RustType<KP>,
561    VP: prost::Message,
562    V: RustType<VP>,
563{
564    for diff in diffs.iter() {
565        field_diff_into_proto(field, diff, writer);
566    }
567}
568
569fn field_diff_into_proto<K, KP, V, VP>(
570    field: ProtoStateField,
571    diff: &StateFieldDiff<K, V>,
572    writer: &mut ProtoStateFieldDiffsWriter,
573) where
574    KP: prost::Message,
575    K: RustType<KP>,
576    VP: prost::Message,
577    V: RustType<VP>,
578{
579    writer.push_field(field);
580    writer.encode_proto(&diff.key.into_proto());
581    match &diff.val {
582        StateFieldValDiff::Insert(to) => {
583            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
584            writer.encode_proto(&to.into_proto());
585        }
586        StateFieldValDiff::Update(from, to) => {
587            writer.push_diff_type(ProtoStateFieldDiffType::Update);
588            writer.encode_proto(&from.into_proto());
589            writer.encode_proto(&to.into_proto());
590        }
591        StateFieldValDiff::Delete(from) => {
592            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
593            writer.encode_proto(&from.into_proto());
594        }
595    };
596}
597
598fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
599    proto: ProtoStateFieldDiff<'_>,
600    diffs: &mut Vec<StateFieldDiff<K, V>>,
601    k_fn: KFn,
602    v_fn: VFn,
603) -> Result<(), TryFromProtoError>
604where
605    KP: prost::Message + Default,
606    VP: prost::Message + Default,
607    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
608    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
609{
610    let val = match proto.diff_type {
611        ProtoStateFieldDiffType::Insert => {
612            let to = VP::decode(proto.to)
613                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
614            StateFieldValDiff::Insert(v_fn(to)?)
615        }
616        ProtoStateFieldDiffType::Update => {
617            let from = VP::decode(proto.from)
618                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619            let to = VP::decode(proto.to)
620                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
621
622            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
623        }
624        ProtoStateFieldDiffType::Delete => {
625            let from = VP::decode(proto.from)
626                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
627            StateFieldValDiff::Delete(v_fn(from)?)
628        }
629    };
630    let key = KP::decode(proto.key)
631        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
632    diffs.push(StateFieldDiff {
633        key: k_fn(key)?,
634        val,
635    });
636    Ok(())
637}
638
/// A `State<T>` together with the names of the four codecs (key, value,
/// timestamp, diff) it was recorded with.
///
/// The `state` field is private; the `check_codecs` / `check_ts_codec` methods
/// hand it out only after comparing the recorded codec names against the
/// requested types.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Name of the key codec.
    pub(crate) key_codec: String,
    /// Name of the value codec.
    pub(crate) val_codec: String,
    /// Name of the timestamp codec.
    pub(crate) ts_codec: String,
    /// Name of the diff codec.
    pub(crate) diff_codec: String,

    /// The wrapped state.
    state: State<T>,
}
654
655impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
656    pub fn seqno(&self) -> SeqNo {
657        self.state.seqno
658    }
659
660    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
661        &self.state.collections.rollups
662    }
663
664    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
665        self.state.latest_rollup()
666    }
667
668    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
669        &mut self,
670        cfg: &PersistConfig,
671        metrics: &Metrics,
672        diffs: I,
673    ) {
674        if T::codec_name() != self.ts_codec {
678            return;
679        }
680        self.state.apply_encoded_diffs(cfg, metrics, diffs);
681    }
682
683    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
684        self,
685        shard_id: &ShardId,
686    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
687        assert_eq!(shard_id, &self.state.shard_id);
690        if K::codec_name() != self.key_codec
691            || V::codec_name() != self.val_codec
692            || T::codec_name() != self.ts_codec
693            || D::codec_name() != self.diff_codec
694        {
695            return Err(Box::new(CodecMismatch {
696                requested: (
697                    K::codec_name(),
698                    V::codec_name(),
699                    T::codec_name(),
700                    D::codec_name(),
701                    None,
702                ),
703                actual: (
704                    self.key_codec,
705                    self.val_codec,
706                    self.ts_codec,
707                    self.diff_codec,
708                    None,
709                ),
710            }));
711        }
712        Ok(TypedState {
713            state: self.state,
714            _phantom: PhantomData,
715        })
716    }
717
718    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
719        assert_eq!(shard_id, &self.state.shard_id);
722        if T::codec_name() != self.ts_codec {
723            return Err(CodecMismatchT {
724                requested: T::codec_name(),
725                actual: self.ts_codec,
726            });
727        }
728        Ok(self.state)
729    }
730
731    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
732        let proto = ProtoRollup::decode(buf)
733            .expect("internal error: invalid encoded state");
738        let state = Rollup::from_proto(proto)
739            .expect("internal error: invalid encoded state")
740            .state;
741        check_data_version(build_version, &state.state.applier_version);
742        state
743    }
744}
745
746impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
747where
748    K: Codec,
749    V: Codec,
750    T: Codec64,
751    D: Codec64,
752{
753    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
754        UntypedState {
755            key_codec: K::codec_name(),
756            val_codec: V::codec_name(),
757            ts_codec: T::codec_name(),
758            diff_codec: D::codec_name(),
759            state: typed_state.state,
760        }
761    }
762}
763
/// An `UntypedState` optionally bundled with a run of inlined diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup.
    pub(crate) state: UntypedState<T>,
    /// Diffs inlined alongside the state, if any. When built via `Rollup::from`
    /// these cover the seqnos after the state's latest rollup up to and
    /// including the state's own seqno.
    pub(crate) diffs: Option<InlinedDiffs>,
}
775
776impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
777    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
781        let latest_rollup_seqno = *state.latest_rollup().0;
782        let mut verify_seqno = latest_rollup_seqno;
783        for diff in &diffs {
784            assert_eq!(verify_seqno.next(), diff.seqno);
785            verify_seqno = diff.seqno;
786        }
787        assert_eq!(verify_seqno, state.seqno());
788
789        let diffs = Some(InlinedDiffs::from(
790            latest_rollup_seqno.next(),
791            state.seqno().next(),
792            diffs,
793        ));
794
795        Self { state, diffs }
796    }
797
798    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
799        Self { state, diffs: None }
800    }
801
802    pub(crate) fn from_state_without_diffs(
803        state: State<T>,
804        key_codec: String,
805        val_codec: String,
806        ts_codec: String,
807        diff_codec: String,
808    ) -> Self {
809        Self::from_untyped_state_without_diffs(UntypedState {
810            key_codec,
811            val_codec,
812            ts_codec,
813            diff_codec,
814            state,
815        })
816    }
817}
818
/// A list of encoded diffs together with the seqno range they fall in.
///
/// The constructor asserts that every diff's seqno lies in `[lower, upper)`.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos of `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos of `diffs`.
    pub(crate) upper: SeqNo,
    /// The diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
825
826impl InlinedDiffs {
827    pub(crate) fn description(&self) -> Description<SeqNo> {
828        Description::new(
829            Antichain::from_elem(self.lower),
830            Antichain::from_elem(self.upper),
831            Antichain::from_elem(SeqNo::minimum()),
832        )
833    }
834
835    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
836        for diff in &diffs {
837            assert!(diff.seqno >= lower);
838            assert!(diff.seqno < upper);
839        }
840        Self {
841            lower,
842            upper,
843            diffs,
844        }
845    }
846}
847
848impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
849    fn into_proto(&self) -> ProtoInlinedDiffs {
850        ProtoInlinedDiffs {
851            lower: self.lower.into_proto(),
852            upper: self.upper.into_proto(),
853            diffs: self.diffs.into_proto(),
854        }
855    }
856
857    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
858        Ok(Self {
859            lower: proto.lower.into_rust()?,
860            upper: proto.upper.into_rust()?,
861            diffs: proto.diffs.into_rust()?,
862        })
863    }
864}
865
866impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
867    fn into_proto(&self) -> ProtoRollup {
868        ProtoRollup {
869            applier_version: self.state.state.applier_version.to_string(),
870            shard_id: self.state.state.shard_id.into_proto(),
871            seqno: self.state.state.seqno.into_proto(),
872            walltime_ms: self.state.state.walltime_ms.into_proto(),
873            hostname: self.state.state.hostname.into_proto(),
874            key_codec: self.state.key_codec.into_proto(),
875            val_codec: self.state.val_codec.into_proto(),
876            ts_codec: T::codec_name(),
877            diff_codec: self.state.diff_codec.into_proto(),
878            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
879            active_rollup: self.state.state.collections.active_rollup.into_proto(),
880            active_gc: self.state.state.collections.active_gc.into_proto(),
881            rollups: self
882                .state
883                .state
884                .collections
885                .rollups
886                .iter()
887                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
888                .collect(),
889            deprecated_rollups: Default::default(),
890            leased_readers: self
891                .state
892                .state
893                .collections
894                .leased_readers
895                .iter()
896                .map(|(id, state)| (id.into_proto(), state.into_proto()))
897                .collect(),
898            critical_readers: self
899                .state
900                .state
901                .collections
902                .critical_readers
903                .iter()
904                .map(|(id, state)| (id.into_proto(), state.into_proto()))
905                .collect(),
906            writers: self
907                .state
908                .state
909                .collections
910                .writers
911                .iter()
912                .map(|(id, state)| (id.into_proto(), state.into_proto()))
913                .collect(),
914            schemas: self
915                .state
916                .state
917                .collections
918                .schemas
919                .iter()
920                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
921                .collect(),
922            trace: Some(self.state.state.collections.trace.into_proto()),
923            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
924        }
925    }
926
927    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
928        let applier_version = if x.applier_version.is_empty() {
929            semver::Version::new(0, 0, 0)
933        } else {
934            semver::Version::parse(&x.applier_version).map_err(|err| {
935                TryFromProtoError::InvalidSemverVersion(format!(
936                    "invalid applier_version {}: {}",
937                    x.applier_version, err
938                ))
939            })?
940        };
941
942        let mut rollups = BTreeMap::new();
943        for (seqno, rollup) in x.rollups {
944            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
945        }
946        for (seqno, key) in x.deprecated_rollups {
947            rollups.insert(
948                seqno.into_rust()?,
949                HollowRollup {
950                    key: key.into_rust()?,
951                    encoded_size_bytes: None,
952                },
953            );
954        }
955        let mut leased_readers = BTreeMap::new();
956        for (id, state) in x.leased_readers {
957            leased_readers.insert(id.into_rust()?, state.into_rust()?);
958        }
959        let mut critical_readers = BTreeMap::new();
960        for (id, state) in x.critical_readers {
961            critical_readers.insert(id.into_rust()?, state.into_rust()?);
962        }
963        let mut writers = BTreeMap::new();
964        for (id, state) in x.writers {
965            writers.insert(id.into_rust()?, state.into_rust()?);
966        }
967        let mut schemas = BTreeMap::new();
968        for (id, x) in x.schemas {
969            schemas.insert(id.into_rust()?, x.into_rust()?);
970        }
971        let active_rollup = x
972            .active_rollup
973            .map(|rollup| rollup.into_rust())
974            .transpose()?;
975        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
976        let collections = StateCollections {
977            rollups,
978            active_rollup,
979            active_gc,
980            last_gc_req: x.last_gc_req.into_rust()?,
981            leased_readers,
982            critical_readers,
983            writers,
984            schemas,
985            trace: x.trace.into_rust_if_some("trace")?,
986        };
987        let state = State {
988            applier_version,
989            shard_id: x.shard_id.into_rust()?,
990            seqno: x.seqno.into_rust()?,
991            walltime_ms: x.walltime_ms,
992            hostname: x.hostname,
993            collections,
994        };
995
996        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
997        if let Some(diffs) = &diffs {
998            if diffs.lower != state.latest_rollup().0.next() {
999                return Err(TryFromProtoError::InvalidPersistState(format!(
1000                    "diffs lower ({}) should match latest rollup's successor: ({})",
1001                    diffs.lower,
1002                    state.latest_rollup().0.next()
1003                )));
1004            }
1005            if diffs.upper != state.seqno.next() {
1006                return Err(TryFromProtoError::InvalidPersistState(format!(
1007                    "diffs upper ({}) should match state's successor: ({})",
1008                    diffs.lower,
1009                    state.seqno.next()
1010                )));
1011            }
1012        }
1013
1014        Ok(Rollup {
1015            state: UntypedState {
1016                state,
1017                key_codec: x.key_codec.into_rust()?,
1018                val_codec: x.val_codec.into_rust()?,
1019                ts_codec: x.ts_codec.into_rust()?,
1020                diff_codec: x.diff_codec.into_rust()?,
1021            },
1022            diffs,
1023        })
1024    }
1025}
1026
1027impl RustType<ProtoVersionedData> for VersionedData {
1028    fn into_proto(&self) -> ProtoVersionedData {
1029        ProtoVersionedData {
1030            seqno: self.seqno.into_proto(),
1031            data: Bytes::clone(&self.data),
1032        }
1033    }
1034
1035    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1036        Ok(Self {
1037            seqno: proto.seqno.into_rust()?,
1038            data: proto.data,
1039        })
1040    }
1041}
1042
1043impl RustType<ProtoSpineId> for SpineId {
1044    fn into_proto(&self) -> ProtoSpineId {
1045        ProtoSpineId {
1046            lo: self.0.into_proto(),
1047            hi: self.1.into_proto(),
1048        }
1049    }
1050
1051    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1052        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1053    }
1054}
1055
1056impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1057    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1058        let (id, batch) = entry;
1059        ProtoIdHollowBatch {
1060            id: Some(id.into_proto()),
1061            batch: Some(batch.into_proto()),
1062        }
1063    }
1064
1065    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1066        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1067        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1068        Ok((id, batch))
1069    }
1070}
1071
1072impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1073    fn into_proto(&self) -> ProtoSpineBatch {
1074        ProtoSpineBatch {
1075            desc: Some(self.desc.into_proto()),
1076            parts: self.parts.into_proto(),
1077            level: self.level.into_proto(),
1078            descs: self.descs.into_proto(),
1079        }
1080    }
1081
1082    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1083        let level = proto.level.into_rust()?;
1084        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1085        let parts = proto.parts.into_rust()?;
1086        let descs = proto.descs.into_rust()?;
1087        Ok(ThinSpineBatch {
1088            level,
1089            desc,
1090            parts,
1091            descs,
1092        })
1093    }
1094}
1095
1096impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1097    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1098        let (id, batch) = entry;
1099        ProtoIdSpineBatch {
1100            id: Some(id.into_proto()),
1101            batch: Some(batch.into_proto()),
1102        }
1103    }
1104
1105    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1106        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1107        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1108        Ok((id, batch))
1109    }
1110}
1111
1112impl RustType<ProtoCompaction> for ActiveCompaction {
1113    fn into_proto(&self) -> ProtoCompaction {
1114        ProtoCompaction {
1115            start_ms: self.start_ms,
1116        }
1117    }
1118
1119    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1120        Ok(Self {
1121            start_ms: proto.start_ms,
1122        })
1123    }
1124}
1125
1126impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1127    fn into_proto(&self) -> ProtoMerge {
1128        ProtoMerge {
1129            since: Some(self.since.into_proto()),
1130            remaining_work: self.remaining_work.into_proto(),
1131            active_compaction: self.active_compaction.into_proto(),
1132        }
1133    }
1134
1135    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1136        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1137        let remaining_work = proto.remaining_work.into_rust()?;
1138        let active_compaction = proto.active_compaction.into_rust()?;
1139        Ok(Self {
1140            since,
1141            remaining_work,
1142            active_compaction,
1143        })
1144    }
1145}
1146
1147impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1148    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1149        ProtoIdMerge {
1150            id: Some(id.into_proto()),
1151            merge: Some(merge.into_proto()),
1152        }
1153    }
1154
1155    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1156        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1157        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1158        Ok((id, merge))
1159    }
1160}
1161
1162impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1163    fn into_proto(&self) -> ProtoTrace {
1164        let since = self.since.into_proto();
1165        let legacy_batches = self
1166            .legacy_batches
1167            .iter()
1168            .map(|(b, _)| b.into_proto())
1169            .collect();
1170        let hollow_batches = self.hollow_batches.into_proto();
1171        let spine_batches = self.spine_batches.into_proto();
1172        let merges = self.merges.into_proto();
1173        ProtoTrace {
1174            since: Some(since),
1175            legacy_batches,
1176            hollow_batches,
1177            spine_batches,
1178            merges,
1179        }
1180    }
1181
1182    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1183        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1184        let legacy_batches = proto
1185            .legacy_batches
1186            .into_iter()
1187            .map(|b| b.into_rust().map(|b| (b, ())))
1188            .collect::<Result<_, _>>()?;
1189        let hollow_batches = proto.hollow_batches.into_rust()?;
1190        let spine_batches = proto.spine_batches.into_rust()?;
1191        let merges = proto.merges.into_rust()?;
1192        Ok(FlatTrace {
1193            since,
1194            legacy_batches,
1195            hollow_batches,
1196            spine_batches,
1197            merges,
1198        })
1199    }
1200}
1201
1202impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1203    fn into_proto(&self) -> ProtoTrace {
1204        self.flatten().into_proto()
1205    }
1206
1207    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1208        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1209    }
1210}
1211
1212impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1213    fn into_proto(&self) -> ProtoLeasedReaderState {
1214        ProtoLeasedReaderState {
1215            seqno: self.seqno.into_proto(),
1216            since: Some(self.since.into_proto()),
1217            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1218            lease_duration_ms: self.lease_duration_ms.into_proto(),
1219            debug: Some(self.debug.into_proto()),
1220        }
1221    }
1222
1223    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1224        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1225        if lease_duration_ms == 0 {
1230            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1231                .expect("lease duration as millis should fit within u64");
1232        }
1233        let debug = proto.debug.unwrap_or_default().into_rust()?;
1236        Ok(LeasedReaderState {
1237            seqno: proto.seqno.into_rust()?,
1238            since: proto
1239                .since
1240                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1241            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1242            lease_duration_ms,
1243            debug,
1244        })
1245    }
1246}
1247
1248impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1249    fn into_proto(&self) -> ProtoCriticalReaderState {
1250        ProtoCriticalReaderState {
1251            since: Some(self.since.into_proto()),
1252            opaque: i64::from_le_bytes(self.opaque.0),
1253            opaque_codec: self.opaque_codec.clone(),
1254            debug: Some(self.debug.into_proto()),
1255        }
1256    }
1257
1258    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1259        let debug = proto.debug.unwrap_or_default().into_rust()?;
1262        Ok(CriticalReaderState {
1263            since: proto
1264                .since
1265                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1266            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1267            opaque_codec: proto.opaque_codec,
1268            debug,
1269        })
1270    }
1271}
1272
1273impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1274    fn into_proto(&self) -> ProtoWriterState {
1275        ProtoWriterState {
1276            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1277            lease_duration_ms: self.lease_duration_ms.into_proto(),
1278            most_recent_write_token: self.most_recent_write_token.into_proto(),
1279            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1280            debug: Some(self.debug.into_proto()),
1281        }
1282    }
1283
1284    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1285        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1291            IdempotencyToken::SENTINEL
1292        } else {
1293            proto.most_recent_write_token.into_rust()?
1294        };
1295        let most_recent_write_upper = match proto.most_recent_write_upper {
1296            Some(x) => x.into_rust()?,
1297            None => Antichain::from_elem(T::minimum()),
1298        };
1299        let debug = proto.debug.unwrap_or_default().into_rust()?;
1302        Ok(WriterState {
1303            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1304            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1305            most_recent_write_token,
1306            most_recent_write_upper,
1307            debug,
1308        })
1309    }
1310}
1311
1312impl RustType<ProtoHandleDebugState> for HandleDebugState {
1313    fn into_proto(&self) -> ProtoHandleDebugState {
1314        ProtoHandleDebugState {
1315            hostname: self.hostname.into_proto(),
1316            purpose: self.purpose.into_proto(),
1317        }
1318    }
1319
1320    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1321        Ok(HandleDebugState {
1322            hostname: proto.hostname,
1323            purpose: proto.purpose,
1324        })
1325    }
1326}
1327
1328impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1329    fn into_proto(&self) -> ProtoHollowRun {
1330        ProtoHollowRun {
1331            parts: self.parts.into_proto(),
1332        }
1333    }
1334
1335    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1336        Ok(HollowRun {
1337            parts: proto.parts.into_rust()?,
1338        })
1339    }
1340}
1341
1342impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1343    fn into_proto(&self) -> ProtoHollowBatch {
1344        let mut run_meta = self.run_meta.into_proto();
1345        let run_meta_default = RunMeta::default().into_proto();
1347        while run_meta.last() == Some(&run_meta_default) {
1348            run_meta.pop();
1349        }
1350        ProtoHollowBatch {
1351            desc: Some(self.desc.into_proto()),
1352            parts: self.parts.into_proto(),
1353            len: self.len.into_proto(),
1354            runs: self.run_splits.into_proto(),
1355            run_meta,
1356            deprecated_keys: vec![],
1357        }
1358    }
1359
1360    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1361        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1362        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1365            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1366                key: PartialBatchKey(key),
1367                encoded_size_bytes: 0,
1368                key_lower: vec![],
1369                structured_key_lower: None,
1370                stats: None,
1371                ts_rewrite: None,
1372                diffs_sum: None,
1373                format: None,
1374                schema_id: None,
1375                deprecated_schema_id: None,
1376            }))
1377        }));
1378        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1380        let num_runs = if parts.is_empty() {
1381            0
1382        } else {
1383            run_splits.len() + 1
1384        };
1385        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1386        run_meta.resize(num_runs, RunMeta::default());
1387        Ok(HollowBatch {
1388            desc: proto.desc.into_rust_if_some("desc")?,
1389            parts,
1390            len: proto.len.into_rust()?,
1391            run_splits,
1392            run_meta,
1393        })
1394    }
1395}
1396
1397impl RustType<String> for RunId {
1398    fn into_proto(&self) -> String {
1399        self.to_string()
1400    }
1401
1402    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1403        RunId::from_str(&proto).map_err(|_| {
1404            TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1405        })
1406    }
1407}
1408
1409impl RustType<ProtoRunMeta> for RunMeta {
1410    fn into_proto(&self) -> ProtoRunMeta {
1411        let order = match self.order {
1412            None => ProtoRunOrder::Unknown,
1413            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1414            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1415            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1416        };
1417        ProtoRunMeta {
1418            order: order.into(),
1419            schema_id: self.schema.into_proto(),
1420            deprecated_schema_id: self.deprecated_schema.into_proto(),
1421            id: self.id.into_proto(),
1422            len: self.len.into_proto(),
1423        }
1424    }
1425
1426    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1427        let order = match ProtoRunOrder::try_from(proto.order)? {
1428            ProtoRunOrder::Unknown => None,
1429            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1430            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1431            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1432        };
1433        Ok(Self {
1434            order,
1435            schema: proto.schema_id.into_rust()?,
1436            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1437            id: proto.id.into_rust()?,
1438            len: proto.len.into_rust()?,
1439        })
1440    }
1441}
1442
1443impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1444    fn into_proto(&self) -> ProtoHollowBatchPart {
1445        match self {
1446            RunPart::Single(part) => part.into_proto(),
1447            RunPart::Many(runs) => runs.into_proto(),
1448        }
1449    }
1450
1451    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1452        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1453            RunPart::Many(proto.into_rust()?)
1454        } else {
1455            RunPart::Single(proto.into_rust()?)
1456        };
1457        Ok(run_part)
1458    }
1459}
1460
1461impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1462    fn into_proto(&self) -> ProtoHollowBatchPart {
1463        let part = ProtoHollowBatchPart {
1464            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1465                key: self.key.into_proto(),
1466                max_part_bytes: self.max_part_bytes.into_proto(),
1467            })),
1468            encoded_size_bytes: self.hollow_bytes.into_proto(),
1469            key_lower: Bytes::copy_from_slice(&self.key_lower),
1470            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1471            key_stats: None,
1472            ts_rewrite: None,
1473            format: None,
1474            schema_id: None,
1475            structured_key_lower: self.structured_key_lower.into_proto(),
1476            deprecated_schema_id: None,
1477        };
1478        part
1479    }
1480
1481    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1482        let run_proto = match proto.kind {
1483            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1484            _ => Err(TryFromProtoError::UnknownEnumVariant(
1485                "ProtoHollowBatchPart::kind".to_string(),
1486            ))?,
1487        };
1488        Ok(Self {
1489            key: run_proto.key.into_rust()?,
1490            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1491            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1492            key_lower: proto.key_lower.to_vec(),
1493            structured_key_lower: proto.structured_key_lower.into_rust()?,
1494            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1495            _phantom_data: Default::default(),
1496        })
1497    }
1498}
1499
1500impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1501    fn into_proto(&self) -> ProtoHollowBatchPart {
1502        match self {
1503            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1504                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1505                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1506                key_lower: Bytes::copy_from_slice(&x.key_lower),
1507                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1508                key_stats: x.stats.into_proto(),
1509                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1510                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1511                format: x.format.map(|f| f.into_proto()),
1512                schema_id: x.schema_id.into_proto(),
1513                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1514            },
1515            BatchPart::Inline {
1516                updates,
1517                ts_rewrite,
1518                schema_id,
1519                deprecated_schema_id,
1520            } => ProtoHollowBatchPart {
1521                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1522                encoded_size_bytes: 0,
1523                key_lower: Bytes::new(),
1524                structured_key_lower: None,
1525                key_stats: None,
1526                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1527                diffs_sum: None,
1528                format: None,
1529                schema_id: schema_id.into_proto(),
1530                deprecated_schema_id: deprecated_schema_id.into_proto(),
1531            },
1532        }
1533    }
1534
1535    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1536        let ts_rewrite = match proto.ts_rewrite {
1537            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1538            None => None,
1539        };
1540        let schema_id = proto.schema_id.into_rust()?;
1541        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1542        match proto.kind {
1543            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1544                Ok(BatchPart::Hollow(HollowBatchPart {
1545                    key: key.into_rust()?,
1546                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1547                    key_lower: proto.key_lower.into(),
1548                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1549                    stats: proto.key_stats.into_rust()?,
1550                    ts_rewrite,
1551                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1552                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1553                    schema_id,
1554                    deprecated_schema_id,
1555                }))
1556            }
1557            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1558                assert_eq!(proto.encoded_size_bytes, 0);
1559                assert_eq!(proto.key_lower.len(), 0);
1560                assert_none!(proto.key_stats);
1561                assert_none!(proto.diffs_sum);
1562                let updates = LazyInlineBatchPart(x.into_rust()?);
1563                Ok(BatchPart::Inline {
1564                    updates,
1565                    ts_rewrite,
1566                    schema_id,
1567                    deprecated_schema_id,
1568                })
1569            }
1570            _ => Err(TryFromProtoError::unknown_enum_variant(
1571                "ProtoHollowBatchPart::kind",
1572            )),
1573        }
1574    }
1575}
1576
1577impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1578    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1579        match self {
1580            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1581            BatchColumnarFormat::Both(version) => {
1582                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1583            }
1584            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1585        }
1586    }
1587
1588    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1589        let format = match proto {
1590            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1591            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1592                BatchColumnarFormat::Both(version.cast_into())
1593            }
1594            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1595        };
1596        Ok(format)
1597    }
1598}
1599
/// Part statistics held as a `LazyProto<ProtoStructStats>`.
///
/// Build one from a `PartStats` with `encode` and turn it back into a
/// `PartStats` with `decode`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    // The key stats, kept in `LazyProto` form until `decode` is called.
    key: LazyProto<ProtoStructStats>,
}
1608
1609impl Debug for LazyPartStats {
1610    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1611        f.debug_tuple("LazyPartStats")
1612            .field(&self.decode())
1613            .finish()
1614    }
1615}
1616
1617impl LazyPartStats {
1618    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1619        let PartStats { key } = x;
1620        let mut proto_stats = ProtoStructStats::from_rust(key);
1621        map_proto(&mut proto_stats);
1622        LazyPartStats {
1623            key: LazyProto::from(&proto_stats),
1624        }
1625    }
1626    pub fn decode(&self) -> PartStats {
1631        let key = self.key.decode().expect("valid proto");
1632        PartStats {
1633            key: key.into_rust().expect("valid stats"),
1634        }
1635    }
1636}
1637
1638impl RustType<Bytes> for LazyPartStats {
1639    fn into_proto(&self) -> Bytes {
1640        let LazyPartStats { key } = self;
1641        key.into_proto()
1642    }
1643
1644    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1645        Ok(LazyPartStats {
1646            key: proto.into_rust()?,
1647        })
1648    }
1649}
1650
/// Test-only strategy that yields arbitrary [`LazyPartStats`] wrapped in
/// `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    Strategy::prop_map(stats, Some)
}
1655
1656#[allow(unused_parens)]
1657impl Arbitrary for LazyPartStats {
1658    type Parameters = ();
1659    type Strategy =
1660        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1661
1662    fn arbitrary_with(_: ()) -> Self::Strategy {
1663        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1664            LazyPartStats::encode(&x, |_| {})
1665        })
1666    }
1667}
1668
1669impl ProtoInlineBatchPart {
1670    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1671        lgbytes: &ColumnarMetrics,
1672        proto: Self,
1673    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1674        let updates = proto
1675            .updates
1676            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1677        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1678
1679        Ok(BlobTraceBatchPart {
1680            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1681            index: proto.index.into_rust()?,
1682            updates,
1683        })
1684    }
1685}
1686
/// An inline batch part held as a `LazyProto<ProtoInlineBatchPart>`.
///
/// Use `decode` to turn it into a `BlobTraceBatchPart`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1690
1691impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1692    fn from(value: &ProtoInlineBatchPart) -> Self {
1693        LazyInlineBatchPart(value.into())
1694    }
1695}
1696
1697impl Serialize for LazyInlineBatchPart {
1698    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1699        let proto = self.0.decode().expect("valid proto");
1702        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1703        let () = s.serialize_field("desc", &proto.desc)?;
1704        let () = s.serialize_field("index", &proto.index)?;
1705        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1706        s.end()
1707    }
1708}
1709
1710impl LazyInlineBatchPart {
1711    pub(crate) fn encoded_size_bytes(&self) -> usize {
1712        self.0.buf.len()
1713    }
1714
1715    pub fn decode<T: Timestamp + Codec64>(
1721        &self,
1722        lgbytes: &ColumnarMetrics,
1723    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1724        let proto = self.0.decode().expect("valid proto");
1725        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1726    }
1727}
1728
1729impl RustType<Bytes> for LazyInlineBatchPart {
1730    fn into_proto(&self) -> Bytes {
1731        self.0.into_proto()
1732    }
1733
1734    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1735        Ok(LazyInlineBatchPart(proto.into_rust()?))
1736    }
1737}
1738
1739impl RustType<ProtoHollowRollup> for HollowRollup {
1740    fn into_proto(&self) -> ProtoHollowRollup {
1741        ProtoHollowRollup {
1742            key: self.key.into_proto(),
1743            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1744        }
1745    }
1746
1747    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1748        Ok(HollowRollup {
1749            key: proto.key.into_rust()?,
1750            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1751        })
1752    }
1753}
1754
1755impl RustType<ProtoActiveRollup> for ActiveRollup {
1756    fn into_proto(&self) -> ProtoActiveRollup {
1757        ProtoActiveRollup {
1758            start_ms: self.start_ms,
1759            seqno: self.seqno.into_proto(),
1760        }
1761    }
1762
1763    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1764        Ok(ActiveRollup {
1765            start_ms: proto.start_ms,
1766            seqno: proto.seqno.into_rust()?,
1767        })
1768    }
1769}
1770
1771impl RustType<ProtoActiveGc> for ActiveGc {
1772    fn into_proto(&self) -> ProtoActiveGc {
1773        ProtoActiveGc {
1774            start_ms: self.start_ms,
1775            seqno: self.seqno.into_proto(),
1776        }
1777    }
1778
1779    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1780        Ok(ActiveGc {
1781            start_ms: proto.start_ms,
1782            seqno: proto.seqno.into_rust()?,
1783        })
1784    }
1785}
1786
1787impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1788    fn into_proto(&self) -> ProtoU64Description {
1789        ProtoU64Description {
1790            lower: Some(self.lower().into_proto()),
1791            upper: Some(self.upper().into_proto()),
1792            since: Some(self.since().into_proto()),
1793        }
1794    }
1795
1796    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1797        Ok(Description::new(
1798            proto.lower.into_rust_if_some("lower")?,
1799            proto.upper.into_rust_if_some("upper")?,
1800            proto.since.into_rust_if_some("since")?,
1801        ))
1802    }
1803}
1804
1805impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1806    fn into_proto(&self) -> ProtoU64Antichain {
1807        ProtoU64Antichain {
1808            elements: self
1809                .elements()
1810                .iter()
1811                .map(|x| i64::from_le_bytes(T::encode(x)))
1812                .collect(),
1813        }
1814    }
1815
1816    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1817        let elements = proto
1818            .elements
1819            .iter()
1820            .map(|x| T::decode(x.to_le_bytes()))
1821            .collect::<Vec<_>>();
1822        Ok(Antichain::from(elements))
1823    }
1824}
1825
1826#[cfg(test)]
1827mod tests {
1828    use bytes::Bytes;
1829    use mz_build_info::DUMMY_BUILD_INFO;
1830    use mz_dyncfg::ConfigUpdates;
1831    use mz_ore::assert_err;
1832    use mz_persist::location::SeqNo;
1833    use proptest::prelude::*;
1834
1835    use crate::ShardId;
1836    use crate::internal::paths::PartialRollupKey;
1837    use crate::internal::state::tests::any_state;
1838    use crate::internal::state::{BatchPart, HandleDebugState};
1839    use crate::internal::state_diff::StateDiff;
1840    use crate::tests::new_test_client_cache;
1841
1842    use super::*;
1843
1844    #[mz_ore::test]
1845    fn applier_version_state() {
1846        let v1 = semver::Version::new(1, 0, 0);
1847        let v2 = semver::Version::new(2, 0, 0);
1848        let v3 = semver::Version::new(3, 0, 0);
1849
1850        let shard_id = ShardId::new();
1852        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1853        let rollup =
1854            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1855        let mut buf = Vec::new();
1856        rollup.encode(&mut buf).expect("serializable");
1857        let bytes = Bytes::from(buf);
1858
1859        assert_eq!(
1861            UntypedState::<u64>::decode(&v2, bytes.clone())
1862                .check_codecs(&shard_id)
1863                .as_ref(),
1864            Ok(&state)
1865        );
1866        assert_eq!(
1867            UntypedState::<u64>::decode(&v3, bytes.clone())
1868                .check_codecs(&shard_id)
1869                .as_ref(),
1870            Ok(&state)
1871        );
1872
1873        #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1878        assert_err!(v1_res);
1879    }
1880
1881    #[mz_ore::test]
1882    fn applier_version_state_diff() {
1883        let v1 = semver::Version::new(1, 0, 0);
1884        let v2 = semver::Version::new(2, 0, 0);
1885        let v3 = semver::Version::new(3, 0, 0);
1886
1887        let diff = StateDiff::<u64>::new(
1889            v2.clone(),
1890            SeqNo(0),
1891            SeqNo(1),
1892            2,
1893            PartialRollupKey("rollup".into()),
1894        );
1895        let mut buf = Vec::new();
1896        diff.encode(&mut buf);
1897        let bytes = Bytes::from(buf);
1898
1899        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1901        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1902
1903        #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1908        assert_err!(v1_res);
1909    }
1910
1911    #[mz_ore::test]
1912    fn hollow_batch_migration_keys() {
1913        let x = HollowBatch::new_run(
1914            Description::new(
1915                Antichain::from_elem(1u64),
1916                Antichain::from_elem(2u64),
1917                Antichain::from_elem(3u64),
1918            ),
1919            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1920                key: PartialBatchKey("a".into()),
1921                encoded_size_bytes: 5,
1922                key_lower: vec![],
1923                structured_key_lower: None,
1924                stats: None,
1925                ts_rewrite: None,
1926                diffs_sum: None,
1927                format: None,
1928                schema_id: None,
1929                deprecated_schema_id: None,
1930            }))],
1931            4,
1932        );
1933        let mut old = x.into_proto();
1934        old.deprecated_keys = vec!["b".into()];
1936        let mut expected = x;
1940        expected
1945            .parts
1946            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1947                key: PartialBatchKey("b".into()),
1948                encoded_size_bytes: 0,
1949                key_lower: vec![],
1950                structured_key_lower: None,
1951                stats: None,
1952                ts_rewrite: None,
1953                diffs_sum: None,
1954                format: None,
1955                schema_id: None,
1956                deprecated_schema_id: None,
1957            })));
1958        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1959    }
1960
1961    #[mz_ore::test]
1962    fn reader_state_migration_lease_duration() {
1963        let x = LeasedReaderState {
1964            seqno: SeqNo(1),
1965            since: Antichain::from_elem(2u64),
1966            last_heartbeat_timestamp_ms: 3,
1967            debug: HandleDebugState {
1968                hostname: "host".to_owned(),
1969                purpose: "purpose".to_owned(),
1970            },
1971            lease_duration_ms: 0,
1973        };
1974        let old = x.into_proto();
1975        let mut expected = x;
1976        expected.lease_duration_ms =
1979            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1980        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1981    }
1982
1983    #[mz_ore::test]
1984    fn writer_state_migration_most_recent_write() {
1985        let proto = ProtoWriterState {
1986            last_heartbeat_timestamp_ms: 1,
1987            lease_duration_ms: 2,
1988            most_recent_write_token: "".into(),
1991            most_recent_write_upper: None,
1992            debug: Some(ProtoHandleDebugState {
1993                hostname: "host".to_owned(),
1994                purpose: "purpose".to_owned(),
1995            }),
1996        };
1997        let expected = WriterState {
1998            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1999            lease_duration_ms: proto.lease_duration_ms,
2000            most_recent_write_token: IdempotencyToken::SENTINEL,
2001            most_recent_write_upper: Antichain::from_elem(0),
2002            debug: HandleDebugState {
2003                hostname: "host".to_owned(),
2004                purpose: "purpose".to_owned(),
2005            },
2006        };
2007        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2008    }
2009
2010    #[mz_ore::test]
2011    fn state_migration_rollups() {
2012        let r1 = HollowRollup {
2013            key: PartialRollupKey("foo".to_owned()),
2014            encoded_size_bytes: None,
2015        };
2016        let r2 = HollowRollup {
2017            key: PartialRollupKey("bar".to_owned()),
2018            encoded_size_bytes: Some(2),
2019        };
2020        let shard_id = ShardId::new();
2021        let mut state = TypedState::<(), (), u64, i64>::new(
2022            DUMMY_BUILD_INFO.semver_version(),
2023            shard_id,
2024            "host".to_owned(),
2025            0,
2026        );
2027        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2028        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2029
2030        proto.deprecated_rollups.insert(1, r1.key.0.clone());
2032
2033        let state: Rollup<u64> = proto.into_rust().unwrap();
2034        let state = state.state;
2035        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2036        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2037        assert_eq!(
2038            state
2039                .state
2040                .collections
2041                .rollups
2042                .into_iter()
2043                .collect::<Vec<_>>(),
2044            expected
2045        );
2046    }
2047
2048    #[mz_persist_proc::test(tokio::test)]
2049    #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2051        let r1_rollup = HollowRollup {
2052            key: PartialRollupKey("foo".to_owned()),
2053            encoded_size_bytes: None,
2054        };
2055        let r1 = StateFieldDiff {
2056            key: SeqNo(1),
2057            val: StateFieldValDiff::Insert(r1_rollup.clone()),
2058        };
2059        let r2_rollup = HollowRollup {
2060            key: PartialRollupKey("bar".to_owned()),
2061            encoded_size_bytes: Some(2),
2062        };
2063        let r2 = StateFieldDiff {
2064            key: SeqNo(2),
2065            val: StateFieldValDiff::Insert(r2_rollup.clone()),
2066        };
2067        let r3_rollup = HollowRollup {
2068            key: PartialRollupKey("baz".to_owned()),
2069            encoded_size_bytes: None,
2070        };
2071        let r3 = StateFieldDiff {
2072            key: SeqNo(3),
2073            val: StateFieldValDiff::Delete(r3_rollup.clone()),
2074        };
2075        let mut diff = StateDiff::<u64>::new(
2076            DUMMY_BUILD_INFO.semver_version(),
2077            SeqNo(4),
2078            SeqNo(5),
2079            0,
2080            PartialRollupKey("ignored".to_owned()),
2081        );
2082        diff.rollups.push(r2.clone());
2083        diff.rollups.push(r3.clone());
2084        let mut diff_proto = diff.into_proto();
2085
2086        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2087        let mut field_diffs_writer = field_diffs.into_writer();
2088
2089        field_diffs_into_proto(
2091            ProtoStateField::DeprecatedRollups,
2092            &[StateFieldDiff {
2093                key: r1.key,
2094                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2095            }],
2096            &mut field_diffs_writer,
2097        );
2098
2099        assert_none!(diff_proto.field_diffs);
2100        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2101
2102        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2103        assert_eq!(
2104            diff.rollups.into_iter().collect::<Vec<_>>(),
2105            vec![r2, r3, r1]
2106        );
2107
2108        let shard_id = ShardId::new();
2111        let mut state = TypedState::<(), (), u64, i64>::new(
2112            DUMMY_BUILD_INFO.semver_version(),
2113            shard_id,
2114            "host".to_owned(),
2115            0,
2116        );
2117        state.state.seqno = SeqNo(4);
2118        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2119        rollup
2120            .deprecated_rollups
2121            .insert(3, r3_rollup.key.into_proto());
2122        let state: Rollup<u64> = rollup.into_rust().unwrap();
2123        let state = state.state;
2124        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2125        let cache = new_test_client_cache(&dyncfgs);
2126        let encoded_diff = VersionedData {
2127            seqno: SeqNo(5),
2128            data: diff_proto.encode_to_vec().into(),
2129        };
2130        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2131        assert_eq!(
2132            state
2133                .state
2134                .collections
2135                .rollups
2136                .into_iter()
2137                .collect::<Vec<_>>(),
2138            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2139        );
2140    }
2141
2142    #[mz_ore::test]
2143    #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2145        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2146            let before = UntypedState {
2147                key_codec: <() as Codec>::codec_name(),
2148                val_codec: <() as Codec>::codec_name(),
2149                ts_codec: <T as Codec64>::codec_name(),
2150                diff_codec: <i64 as Codec64>::codec_name(),
2151                state,
2152            };
2153            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2154            let after: Rollup<T> = proto.into_rust().unwrap();
2155            let after = after.state;
2156            assert_eq!(before, after);
2157        }
2158
2159        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2160    }
2161
2162    #[mz_ore::test]
2163    fn check_data_versions_with_self_managed_versions() {
2164        #[track_caller]
2165        fn testcase(
2166            code: &str,
2167            data: &str,
2168            self_managed_versions: &[Version],
2169            expected: Result<(), ()>,
2170        ) {
2171            let code = Version::parse(code).unwrap();
2172            let data = Version::parse(data).unwrap();
2173            let actual = cfg::check_data_version_with_self_managed_versions(
2174                &code,
2175                &data,
2176                self_managed_versions,
2177            )
2178            .map_err(|_| ());
2179            assert_eq!(actual, expected);
2180        }
2181
2182        let none = [];
2183        let one = [Version::new(0, 130, 0)];
2184        let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2185        let three = [
2186            Version::new(0, 130, 0),
2187            Version::new(0, 140, 0),
2188            Version::new(0, 150, 0),
2189        ];
2190
2191        testcase("0.130.0", "0.128.0", &none, Ok(()));
2192        testcase("0.130.0", "0.129.0", &none, Ok(()));
2193        testcase("0.130.0", "0.130.0", &none, Ok(()));
2194        testcase("0.130.0", "0.130.1", &none, Ok(()));
2195        testcase("0.130.1", "0.130.0", &none, Ok(()));
2196        testcase("0.130.0", "0.131.0", &none, Ok(()));
2197        testcase("0.130.0", "0.132.0", &none, Err(()));
2198
2199        testcase("0.129.0", "0.127.0", &none, Ok(()));
2200        testcase("0.129.0", "0.128.0", &none, Ok(()));
2201        testcase("0.129.0", "0.129.0", &none, Ok(()));
2202        testcase("0.129.0", "0.129.1", &none, Ok(()));
2203        testcase("0.129.1", "0.129.0", &none, Ok(()));
2204        testcase("0.129.0", "0.130.0", &none, Ok(()));
2205        testcase("0.129.0", "0.131.0", &none, Err(()));
2206
2207        testcase("0.130.0", "0.128.0", &one, Ok(()));
2208        testcase("0.130.0", "0.129.0", &one, Ok(()));
2209        testcase("0.130.0", "0.130.0", &one, Ok(()));
2210        testcase("0.130.0", "0.130.1", &one, Ok(()));
2211        testcase("0.130.1", "0.130.0", &one, Ok(()));
2212        testcase("0.130.0", "0.131.0", &one, Ok(()));
2213        testcase("0.130.0", "0.132.0", &one, Ok(()));
2214
2215        testcase("0.129.0", "0.127.0", &one, Ok(()));
2216        testcase("0.129.0", "0.128.0", &one, Ok(()));
2217        testcase("0.129.0", "0.129.0", &one, Ok(()));
2218        testcase("0.129.0", "0.129.1", &one, Ok(()));
2219        testcase("0.129.1", "0.129.0", &one, Ok(()));
2220        testcase("0.129.0", "0.130.0", &one, Ok(()));
2221        testcase("0.129.0", "0.131.0", &one, Err(()));
2222
2223        testcase("0.131.0", "0.129.0", &one, Ok(()));
2224        testcase("0.131.0", "0.130.0", &one, Ok(()));
2225        testcase("0.131.0", "0.131.0", &one, Ok(()));
2226        testcase("0.131.0", "0.131.1", &one, Ok(()));
2227        testcase("0.131.1", "0.131.0", &one, Ok(()));
2228        testcase("0.131.0", "0.132.0", &one, Ok(()));
2229        testcase("0.131.0", "0.133.0", &one, Err(()));
2230
2231        testcase("0.130.0", "0.128.0", &two, Ok(()));
2232        testcase("0.130.0", "0.129.0", &two, Ok(()));
2233        testcase("0.130.0", "0.130.0", &two, Ok(()));
2234        testcase("0.130.0", "0.130.1", &two, Ok(()));
2235        testcase("0.130.1", "0.130.0", &two, Ok(()));
2236        testcase("0.130.0", "0.131.0", &two, Ok(()));
2237        testcase("0.130.0", "0.132.0", &two, Ok(()));
2238        testcase("0.130.0", "0.135.0", &two, Ok(()));
2239        testcase("0.130.0", "0.138.0", &two, Ok(()));
2240        testcase("0.130.0", "0.139.0", &two, Ok(()));
2241        testcase("0.130.0", "0.140.0", &two, Ok(()));
2242        testcase("0.130.9", "0.140.0", &two, Ok(()));
2243        testcase("0.130.0", "0.140.1", &two, Ok(()));
2244        testcase("0.130.3", "0.140.1", &two, Ok(()));
2245        testcase("0.130.3", "0.140.9", &two, Ok(()));
2246        testcase("0.130.0", "0.141.0", &two, Err(()));
2247        testcase("0.129.0", "0.133.0", &two, Err(()));
2248        testcase("0.129.0", "0.140.0", &two, Err(()));
2249        testcase("0.131.0", "0.133.0", &two, Err(()));
2250        testcase("0.131.0", "0.140.0", &two, Err(()));
2251
2252        testcase("0.130.0", "0.128.0", &three, Ok(()));
2253        testcase("0.130.0", "0.129.0", &three, Ok(()));
2254        testcase("0.130.0", "0.130.0", &three, Ok(()));
2255        testcase("0.130.0", "0.130.1", &three, Ok(()));
2256        testcase("0.130.1", "0.130.0", &three, Ok(()));
2257        testcase("0.130.0", "0.131.0", &three, Ok(()));
2258        testcase("0.130.0", "0.132.0", &three, Ok(()));
2259        testcase("0.130.0", "0.135.0", &three, Ok(()));
2260        testcase("0.130.0", "0.138.0", &three, Ok(()));
2261        testcase("0.130.0", "0.139.0", &three, Ok(()));
2262        testcase("0.130.0", "0.140.0", &three, Ok(()));
2263        testcase("0.130.9", "0.140.0", &three, Ok(()));
2264        testcase("0.130.0", "0.140.1", &three, Ok(()));
2265        testcase("0.130.3", "0.140.1", &three, Ok(()));
2266        testcase("0.130.3", "0.140.9", &three, Ok(()));
2267        testcase("0.130.0", "0.141.0", &three, Err(()));
2268        testcase("0.129.0", "0.133.0", &three, Err(()));
2269        testcase("0.129.0", "0.140.0", &three, Err(()));
2270        testcase("0.131.0", "0.133.0", &three, Err(()));
2271        testcase("0.131.0", "0.140.0", &three, Err(()));
2272        testcase("0.130.0", "0.150.0", &three, Err(()));
2273
2274        testcase("0.140.0", "0.138.0", &three, Ok(()));
2275        testcase("0.140.0", "0.139.0", &three, Ok(()));
2276        testcase("0.140.0", "0.140.0", &three, Ok(()));
2277        testcase("0.140.0", "0.140.1", &three, Ok(()));
2278        testcase("0.140.1", "0.140.0", &three, Ok(()));
2279        testcase("0.140.0", "0.141.0", &three, Ok(()));
2280        testcase("0.140.0", "0.142.0", &three, Ok(()));
2281        testcase("0.140.0", "0.145.0", &three, Ok(()));
2282        testcase("0.140.0", "0.148.0", &three, Ok(()));
2283        testcase("0.140.0", "0.149.0", &three, Ok(()));
2284        testcase("0.140.0", "0.150.0", &three, Ok(()));
2285        testcase("0.140.9", "0.150.0", &three, Ok(()));
2286        testcase("0.140.0", "0.150.1", &three, Ok(()));
2287        testcase("0.140.3", "0.150.1", &three, Ok(()));
2288        testcase("0.140.3", "0.150.9", &three, Ok(()));
2289        testcase("0.140.0", "0.151.0", &three, Err(()));
2290        testcase("0.139.0", "0.143.0", &three, Err(()));
2291        testcase("0.139.0", "0.150.0", &three, Err(()));
2292        testcase("0.141.0", "0.143.0", &three, Err(()));
2293        testcase("0.141.0", "0.150.0", &three, Err(()));
2294
2295        testcase("0.150.0", "0.148.0", &three, Ok(()));
2296        testcase("0.150.0", "0.149.0", &three, Ok(()));
2297        testcase("0.150.0", "0.150.0", &three, Ok(()));
2298        testcase("0.150.0", "0.150.1", &three, Ok(()));
2299        testcase("0.150.1", "0.150.0", &three, Ok(()));
2300        testcase("0.150.0", "0.151.0", &three, Ok(()));
2301        testcase("0.150.0", "0.152.0", &three, Ok(()));
2302        testcase("0.150.0", "0.155.0", &three, Ok(()));
2303        testcase("0.150.0", "0.158.0", &three, Ok(()));
2304        testcase("0.150.0", "0.159.0", &three, Ok(()));
2305        testcase("0.150.0", "0.160.0", &three, Ok(()));
2306        testcase("0.150.9", "0.160.0", &three, Ok(()));
2307        testcase("0.150.0", "0.160.1", &three, Ok(()));
2308        testcase("0.150.3", "0.160.1", &three, Ok(()));
2309        testcase("0.150.3", "0.160.9", &three, Ok(()));
2310        testcase("0.150.0", "0.161.0", &three, Ok(()));
2311        testcase("0.149.0", "0.153.0", &three, Err(()));
2312        testcase("0.149.0", "0.160.0", &three, Err(()));
2313        testcase("0.151.0", "0.153.0", &three, Err(()));
2314        testcase("0.151.0", "0.160.0", &three, Err(()));
2315    }
2316
2317    #[mz_ore::test]
2318    fn check_data_versions() {
2319        #[track_caller]
2320        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2321            let code = Version::parse(code).unwrap();
2322            let data = Version::parse(data).unwrap();
2323            #[allow(clippy::disallowed_methods)]
2324            let actual =
2325                std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2326            assert_eq!(actual, expected);
2327        }
2328
2329        testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2330        testcase("0.10.0-dev", "0.10.0", Ok(()));
2331        testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2334        testcase("0.10.0-dev", "0.11.0", Ok(()));
2335        testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2336        testcase("0.10.0-dev", "0.12.0", Err(()));
2337        testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2338
2339        testcase("0.10.0", "0.8.0-dev", Ok(()));
2340        testcase("0.10.0", "0.8.0", Ok(()));
2341        testcase("0.10.0", "0.9.0-dev", Ok(()));
2342        testcase("0.10.0", "0.9.0", Ok(()));
2343        testcase("0.10.0", "0.10.0-dev", Ok(()));
2344        testcase("0.10.0", "0.10.0", Ok(()));
2345        testcase("0.10.0", "0.11.0-dev", Ok(()));
2348        testcase("0.10.0", "0.11.0", Ok(()));
2349        testcase("0.10.0", "0.11.1", Ok(()));
2350        testcase("0.10.0", "0.11.1000000", Ok(()));
2351        testcase("0.10.0", "0.12.0-dev", Err(()));
2352        testcase("0.10.0", "0.12.0", Err(()));
2353        testcase("0.10.0", "0.13.0-dev", Err(()));
2354
2355        testcase("0.10.1", "0.9.0", Ok(()));
2356        testcase("0.10.1", "0.10.0", Ok(()));
2357        testcase("0.10.1", "0.11.0", Ok(()));
2358        testcase("0.10.1", "0.11.1", Ok(()));
2359        testcase("0.10.1", "0.11.100", Ok(()));
2360
2361        testcase("0.10.0", "0.10.1", Ok(()));
2367    }
2368}