mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt, soft_panic_or_log};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize, Serializer};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46    LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47    ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48    ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49    ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50    ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51    ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52    ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53    RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54    proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
65/// A key and value `Schema` of data written to a batch or shard.
66#[derive(Debug)]
67pub struct Schemas<K: Codec, V: Codec> {
68    /// Id under which this schema is registered in the shard's schema registry,
69    /// if any.
70    pub id: Option<SchemaId>,
71    /// Key `Schema`.
72    pub key: Arc<K::Schema>,
73    /// Value `Schema`.
74    pub val: Arc<V::Schema>,
75}
76
77impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
78    fn clone(&self) -> Self {
79        Self {
80            id: self.id,
81            key: Arc::clone(&self.key),
82            val: Arc::clone(&self.val),
83        }
84    }
85}
86
87/// A proto that is decoded on use.
88///
89/// Because of the way prost works, decoding a large protobuf may result in a
90/// number of very short lived allocations in our RustType/ProtoType decode path
91/// (e.g. this happens for a repeated embedded message). Not every use of
92/// persist State needs every transitive bit of it to be decoded, so we opt
93/// certain parts of it (initially stats) to be decoded on use.
94///
95/// This has the dual benefit of only paying for the short-lived allocs when
96/// necessary and also allowing decoding to be gated by a feature flag. The
97/// tradeoffs are that we might decode more than once and that we have to handle
98/// invalid proto errors in more places.
99///
100/// Mechanically, this is accomplished by making the field a proto `bytes` types
101/// instead of `ProtoFoo`. These bytes then contain the serialization of
102/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
103/// compatible change.
104///
105/// > Embedded messages are compatible with bytes if the bytes contain an
106/// > encoded version of the message.
107///
108/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
109#[derive(Clone, Serialize, Deserialize)]
110pub struct LazyProto<T> {
111    buf: Bytes,
112    _phantom: PhantomData<fn() -> T>,
113}
114
115impl<T: Message + Default> Debug for LazyProto<T> {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        match self.decode() {
118            Ok(proto) => Debug::fmt(&proto, f),
119            Err(err) => f
120                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
121                .field("err", &err)
122                .finish(),
123        }
124    }
125}
126
127impl<T> PartialEq for LazyProto<T> {
128    fn eq(&self, other: &Self) -> bool {
129        self.cmp(other) == Ordering::Equal
130    }
131}
132
133impl<T> Eq for LazyProto<T> {}
134
135impl<T> PartialOrd for LazyProto<T> {
136    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
137        Some(self.cmp(other))
138    }
139}
140
141impl<T> Ord for LazyProto<T> {
142    fn cmp(&self, other: &Self) -> Ordering {
143        let LazyProto {
144            buf: self_buf,
145            _phantom: _,
146        } = self;
147        let LazyProto {
148            buf: other_buf,
149            _phantom: _,
150        } = other;
151        self_buf.cmp(other_buf)
152    }
153}
154
155impl<T> Hash for LazyProto<T> {
156    fn hash<H: Hasher>(&self, state: &mut H) {
157        let LazyProto { buf, _phantom } = self;
158        buf.hash(state);
159    }
160}
161
162impl<T: Message + Default> From<&T> for LazyProto<T> {
163    fn from(value: &T) -> Self {
164        let buf = Bytes::from(value.encode_to_vec());
165        LazyProto {
166            buf,
167            _phantom: PhantomData,
168        }
169    }
170}
171
172impl<T: Message + Default> LazyProto<T> {
173    pub fn decode(&self) -> Result<T, prost::DecodeError> {
174        T::decode(&*self.buf)
175    }
176
177    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
178        Ok(T::decode(&*self.buf)?.into_rust()?)
179    }
180}
181
182impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
183    fn into_proto(&self) -> Bytes {
184        self.buf.clone()
185    }
186
187    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
188        Ok(Self {
189            buf,
190            _phantom: PhantomData,
191        })
192    }
193}
194
195/// Our Proto implementation, Prost, cannot handle unrecognized fields. This means that unexpected
196/// data will be dropped at deserialization time, which means that we can't reliably roundtrip data
197/// from future versions of the code, which causes trouble during upgrades and at other times.
198///
199/// This type works around the issue by defining an unstructured metadata map. Keys are expected to
200/// be well-known strings defined in the code; values are bytes, expected to be encoded protobuf.
201/// (The association between the two is lightly enforced with the affiliated [MetadataKey] type.)
202/// It's safe to add new metadata keys in new versions, since even unrecognized keys can be losslessly
203/// roundtripped. However, if the metadata is not safe for the old version to ignore -- perhaps it
204/// needs to be kept in sync with some other part of the struct -- you will need to use a more
205/// heavyweight migration for it.
206#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
207pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);
208
/// A typed key for a metadata map: a field name paired with the Rust type `V`
/// and Proto message type `P` of the value stored under that name.
///
/// It is an error to reuse a key name, or to change the type associated with
/// an existing name. It is polite to keep names short, since they are
/// serialized alongside every struct.
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
    /// The string the value is stored under.
    name: &'static str,
    /// Carries the value types at compile time only.
    type_: PhantomData<(V, P)>,
}
219
220impl<V, P> MetadataKey<V, P> {
221    #[allow(unused)]
222    pub(crate) const fn new(name: &'static str) -> Self {
223        MetadataKey {
224            name,
225            type_: PhantomData,
226        }
227    }
228}
229
230impl serde::Serialize for MetadataMap {
231    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232    where
233        S: Serializer,
234    {
235        serializer.collect_map(self.0.iter())
236    }
237}
238
239impl MetadataMap {
240    /// Returns true iff no metadata keys have been set.
241    pub fn is_empty(&self) -> bool {
242        self.0.is_empty()
243    }
244
245    /// Serialize and insert a new key into the map, replacing any existing value for the key.
246    #[allow(unused)]
247    pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
248        self.0.insert(
249            String::from(key.name),
250            Bytes::from(value.into_proto_owned().encode_to_vec()),
251        );
252    }
253
254    /// Deserialize a key from the map, if it is present.
255    #[allow(unused)]
256    pub fn get<V: RustType<P>, P: prost::Message + Default>(
257        &self,
258        key: MetadataKey<V, P>,
259    ) -> Option<V> {
260        let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
261            Ok(decoded) => decoded,
262            Err(err) => {
263                // This should be impossible unless one of the MetadataKey invariants are broken.
264                soft_panic_or_log!(
265                    "error when decoding {key}; was it redefined? {err}",
266                    key = key.name
267                );
268                return None;
269            }
270        };
271
272        match proto.into_rust() {
273            Ok(proto) => Some(proto),
274            Err(err) => {
275                // This should be impossible unless one of the MetadataKey invariants are broken.
276                soft_panic_or_log!(
277                    "error when decoding {key}; was it redefined? {err}",
278                    key = key.name
279                );
280                None
281            }
282        }
283    }
284}
285impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
286    fn into_proto(&self) -> BTreeMap<String, Bytes> {
287        self.0.clone()
288    }
289    fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
290        Ok(MetadataMap(proto))
291    }
292}
293
294pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
295    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
296        Some(x) => x,
297        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
298    };
299    let uuid = Uuid::parse_str(uuid_encoded)
300        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
301    Ok(*uuid.as_bytes())
302}
303
304pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &Version) {
305    if !cfg::code_can_read_data(code_version, data_version) {
306        // We can't catch halts, so panic in test, so we can get unit test
307        // coverage.
308        if cfg!(test) {
309            panic!("code at version {code_version} cannot read data with version {data_version}");
310        } else {
311            halt!("code at version {code_version} cannot read data with version {data_version}");
312        }
313    }
314}
315
316impl RustType<String> for LeasedReaderId {
317    fn into_proto(&self) -> String {
318        self.to_string()
319    }
320
321    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
322        match proto.parse() {
323            Ok(x) => Ok(x),
324            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
325        }
326    }
327}
328
329impl RustType<String> for CriticalReaderId {
330    fn into_proto(&self) -> String {
331        self.to_string()
332    }
333
334    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
335        match proto.parse() {
336            Ok(x) => Ok(x),
337            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
338        }
339    }
340}
341
342impl RustType<String> for WriterId {
343    fn into_proto(&self) -> String {
344        self.to_string()
345    }
346
347    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
348        match proto.parse() {
349            Ok(x) => Ok(x),
350            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
351        }
352    }
353}
354
355impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
356    fn into_proto(&self) -> ProtoEncodedSchemas {
357        ProtoEncodedSchemas {
358            key: self.key.clone(),
359            key_data_type: self.key_data_type.clone(),
360            val: self.val.clone(),
361            val_data_type: self.val_data_type.clone(),
362        }
363    }
364
365    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
366        Ok(EncodedSchemas {
367            key: proto.key,
368            key_data_type: proto.key_data_type,
369            val: proto.val,
370            val_data_type: proto.val_data_type,
371        })
372    }
373}
374
375impl RustType<String> for IdempotencyToken {
376    fn into_proto(&self) -> String {
377        self.to_string()
378    }
379
380    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
381        match proto.parse() {
382            Ok(x) => Ok(x),
383            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
384        }
385    }
386}
387
388impl RustType<String> for PartialBatchKey {
389    fn into_proto(&self) -> String {
390        self.0.clone()
391    }
392
393    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
394        Ok(PartialBatchKey(proto))
395    }
396}
397
398impl RustType<String> for PartialRollupKey {
399    fn into_proto(&self) -> String {
400        self.0.clone()
401    }
402
403    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
404        Ok(PartialRollupKey(proto))
405    }
406}
407
408impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
409    pub fn encode<B>(&self, buf: &mut B)
410    where
411        B: bytes::BufMut,
412    {
413        self.into_proto()
414            .encode(buf)
415            .expect("no required fields means no initialization errors");
416    }
417
418    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
419        let proto = ProtoStateDiff::decode(buf)
420            // We received a State that we couldn't decode. This could happen if
421            // persist messes up backward/forward compatibility, if the durable
422            // data was corrupted, or if operations messes up deployment. In any
423            // case, fail loudly.
424            .expect("internal error: invalid encoded state");
425        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
426        assert_code_can_read_data(build_version, &diff.applier_version);
427        diff
428    }
429}
430
431impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
432    fn into_proto(&self) -> ProtoStateDiff {
433        // Deconstruct self so we get a compile failure if new fields are added.
434        let StateDiff {
435            applier_version,
436            seqno_from,
437            seqno_to,
438            walltime_ms,
439            latest_rollup_key,
440            rollups,
441            active_rollup,
442            active_gc,
443            hostname,
444            last_gc_req,
445            leased_readers,
446            critical_readers,
447            writers,
448            schemas,
449            since,
450            legacy_batches,
451            hollow_batches,
452            spine_batches,
453            merges,
454        } = self;
455
456        let proto = ProtoStateFieldDiffs::default();
457
458        // Create a writer so we can efficiently encode our data.
459        let mut writer = proto.into_writer();
460
461        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
462        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
463        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
464        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
465        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
466        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
467        field_diffs_into_proto(
468            ProtoStateField::CriticalReaders,
469            critical_readers,
470            &mut writer,
471        );
472        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
473        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
474        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
475        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
476        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
477        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
478        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);
479
480        // After encoding all of our data, convert back into the proto.
481        let field_diffs = writer.into_proto();
482
483        debug_assert_eq!(field_diffs.validate(), Ok(()));
484        ProtoStateDiff {
485            applier_version: applier_version.to_string(),
486            seqno_from: seqno_from.into_proto(),
487            seqno_to: seqno_to.into_proto(),
488            walltime_ms: walltime_ms.into_proto(),
489            latest_rollup_key: latest_rollup_key.into_proto(),
490            field_diffs: Some(field_diffs),
491        }
492    }
493
494    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
495        let applier_version = if proto.applier_version.is_empty() {
496            // Backward compatibility with versions of ProtoState before we set
497            // this field: if it's missing (empty), assume an infinitely old
498            // version.
499            semver::Version::new(0, 0, 0)
500        } else {
501            semver::Version::parse(&proto.applier_version).map_err(|err| {
502                TryFromProtoError::InvalidSemverVersion(format!(
503                    "invalid applier_version {}: {}",
504                    proto.applier_version, err
505                ))
506            })?
507        };
508        let mut state_diff = StateDiff::new(
509            applier_version,
510            proto.seqno_from.into_rust()?,
511            proto.seqno_to.into_rust()?,
512            proto.walltime_ms,
513            proto.latest_rollup_key.into_rust()?,
514        );
515        if let Some(field_diffs) = proto.field_diffs {
516            debug_assert_eq!(field_diffs.validate(), Ok(()));
517            for field_diff in field_diffs.iter() {
518                let (field, diff) = field_diff?;
519                match field {
520                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
521                        diff,
522                        &mut state_diff.hostname,
523                        |()| Ok(()),
524                        |v| v.into_rust(),
525                    )?,
526                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
527                        diff,
528                        &mut state_diff.last_gc_req,
529                        |()| Ok(()),
530                        |v| v.into_rust(),
531                    )?,
532                    ProtoStateField::ActiveGc => {
533                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
534                            diff,
535                            &mut state_diff.active_gc,
536                            |()| Ok(()),
537                            |v| v.into_rust(),
538                        )?
539                    }
540                    ProtoStateField::ActiveRollup => {
541                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
542                            diff,
543                            &mut state_diff.active_rollup,
544                            |()| Ok(()),
545                            |v| v.into_rust(),
546                        )?
547                    }
548                    ProtoStateField::Rollups => {
549                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
550                            diff,
551                            &mut state_diff.rollups,
552                            |k| k.into_rust(),
553                            |v| v.into_rust(),
554                        )?
555                    }
556                    // MIGRATION: We previously stored rollups as a `SeqNo ->
557                    // string Key` map, but now the value is a `struct
558                    // HollowRollup`.
559                    ProtoStateField::DeprecatedRollups => {
560                        field_diff_into_rust::<u64, String, _, _, _, _>(
561                            diff,
562                            &mut state_diff.rollups,
563                            |k| k.into_rust(),
564                            |v| {
565                                Ok(HollowRollup {
566                                    key: v.into_rust()?,
567                                    encoded_size_bytes: None,
568                                })
569                            },
570                        )?
571                    }
572                    ProtoStateField::LeasedReaders => {
573                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
574                            diff,
575                            &mut state_diff.leased_readers,
576                            |k| k.into_rust(),
577                            |v| v.into_rust(),
578                        )?
579                    }
580                    ProtoStateField::CriticalReaders => {
581                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
582                            diff,
583                            &mut state_diff.critical_readers,
584                            |k| k.into_rust(),
585                            |v| v.into_rust(),
586                        )?
587                    }
588                    ProtoStateField::Writers => {
589                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
590                            diff,
591                            &mut state_diff.writers,
592                            |k| k.into_rust(),
593                            |v| v.into_rust(),
594                        )?
595                    }
596                    ProtoStateField::Schemas => {
597                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
598                            diff,
599                            &mut state_diff.schemas,
600                            |k| k.into_rust(),
601                            |v| v.into_rust(),
602                        )?
603                    }
604                    ProtoStateField::Since => {
605                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
606                            diff,
607                            &mut state_diff.since,
608                            |()| Ok(()),
609                            |v| v.into_rust(),
610                        )?
611                    }
612                    ProtoStateField::LegacyBatches => {
613                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
614                            diff,
615                            &mut state_diff.legacy_batches,
616                            |k| k.into_rust(),
617                            |()| Ok(()),
618                        )?
619                    }
620                    ProtoStateField::HollowBatches => {
621                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
622                            diff,
623                            &mut state_diff.hollow_batches,
624                            |k| k.into_rust(),
625                            |v| v.into_rust(),
626                        )?
627                    }
628                    ProtoStateField::SpineBatches => {
629                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
630                            diff,
631                            &mut state_diff.spine_batches,
632                            |k| k.into_rust(),
633                            |v| v.into_rust(),
634                        )?
635                    }
636                    ProtoStateField::SpineMerges => {
637                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
638                            diff,
639                            &mut state_diff.merges,
640                            |k| k.into_rust(),
641                            |v| v.into_rust(),
642                        )?
643                    }
644                }
645            }
646        }
647        Ok(state_diff)
648    }
649}
650
651fn field_diffs_into_proto<K, KP, V, VP>(
652    field: ProtoStateField,
653    diffs: &[StateFieldDiff<K, V>],
654    writer: &mut ProtoStateFieldDiffsWriter,
655) where
656    KP: prost::Message,
657    K: RustType<KP>,
658    VP: prost::Message,
659    V: RustType<VP>,
660{
661    for diff in diffs.iter() {
662        field_diff_into_proto(field, diff, writer);
663    }
664}
665
666fn field_diff_into_proto<K, KP, V, VP>(
667    field: ProtoStateField,
668    diff: &StateFieldDiff<K, V>,
669    writer: &mut ProtoStateFieldDiffsWriter,
670) where
671    KP: prost::Message,
672    K: RustType<KP>,
673    VP: prost::Message,
674    V: RustType<VP>,
675{
676    writer.push_field(field);
677    writer.encode_proto(&diff.key.into_proto());
678    match &diff.val {
679        StateFieldValDiff::Insert(to) => {
680            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
681            writer.encode_proto(&to.into_proto());
682        }
683        StateFieldValDiff::Update(from, to) => {
684            writer.push_diff_type(ProtoStateFieldDiffType::Update);
685            writer.encode_proto(&from.into_proto());
686            writer.encode_proto(&to.into_proto());
687        }
688        StateFieldValDiff::Delete(from) => {
689            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
690            writer.encode_proto(&from.into_proto());
691        }
692    };
693}
694
695fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
696    proto: ProtoStateFieldDiff<'_>,
697    diffs: &mut Vec<StateFieldDiff<K, V>>,
698    k_fn: KFn,
699    v_fn: VFn,
700) -> Result<(), TryFromProtoError>
701where
702    KP: prost::Message + Default,
703    VP: prost::Message + Default,
704    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
705    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
706{
707    let val = match proto.diff_type {
708        ProtoStateFieldDiffType::Insert => {
709            let to = VP::decode(proto.to)
710                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
711            StateFieldValDiff::Insert(v_fn(to)?)
712        }
713        ProtoStateFieldDiffType::Update => {
714            let from = VP::decode(proto.from)
715                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
716            let to = VP::decode(proto.to)
717                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
718
719            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
720        }
721        ProtoStateFieldDiffType::Delete => {
722            let from = VP::decode(proto.from)
723                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
724            StateFieldValDiff::Delete(v_fn(from)?)
725        }
726    };
727    let key = KP::decode(proto.key)
728        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
729    diffs.push(StateFieldDiff {
730        key: k_fn(key)?,
731        val,
732    });
733    Ok(())
734}
735
736/// The decoded state of [ProtoRollup] for which we have not yet checked
737/// that codecs match the ones in durable state.
738#[derive(Debug)]
739#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
740pub struct UntypedState<T> {
741    pub(crate) key_codec: String,
742    pub(crate) val_codec: String,
743    pub(crate) ts_codec: String,
744    pub(crate) diff_codec: String,
745
746    // Important! This T has not been validated, so we can't expose anything in
747    // State that references T until one of the check methods have been called.
748    // Any field on State that doesn't reference T is fair game.
749    state: State<T>,
750}
751
752impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
753    pub fn seqno(&self) -> SeqNo {
754        self.state.seqno
755    }
756
757    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
758        &self.state.collections.rollups
759    }
760
761    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
762        self.state.latest_rollup()
763    }
764
765    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
766        &mut self,
767        cfg: &PersistConfig,
768        metrics: &Metrics,
769        diffs: I,
770    ) {
771        // The apply_encoded_diffs might panic if T is not correct. Making this
772        // a silent no-op is far too subtle for my taste, but it's not clear
773        // what else we could do instead.
774        if T::codec_name() != self.ts_codec {
775            return;
776        }
777        self.state.apply_encoded_diffs(cfg, metrics, diffs);
778    }
779
780    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
781        self,
782        shard_id: &ShardId,
783    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
784        // Also defensively check that the shard_id on the state we fetched
785        // matches the shard_id we were trying to fetch.
786        assert_eq!(shard_id, &self.state.shard_id);
787        if K::codec_name() != self.key_codec
788            || V::codec_name() != self.val_codec
789            || T::codec_name() != self.ts_codec
790            || D::codec_name() != self.diff_codec
791        {
792            return Err(Box::new(CodecMismatch {
793                requested: (
794                    K::codec_name(),
795                    V::codec_name(),
796                    T::codec_name(),
797                    D::codec_name(),
798                    None,
799                ),
800                actual: (
801                    self.key_codec,
802                    self.val_codec,
803                    self.ts_codec,
804                    self.diff_codec,
805                    None,
806                ),
807            }));
808        }
809        Ok(TypedState {
810            state: self.state,
811            _phantom: PhantomData,
812        })
813    }
814
815    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
816        // Also defensively check that the shard_id on the state we fetched
817        // matches the shard_id we were trying to fetch.
818        assert_eq!(shard_id, &self.state.shard_id);
819        if T::codec_name() != self.ts_codec {
820            return Err(CodecMismatchT {
821                requested: T::codec_name(),
822                actual: self.ts_codec,
823            });
824        }
825        Ok(self.state)
826    }
827
828    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
829        let proto = ProtoRollup::decode(buf)
830            // We received a State that we couldn't decode. This could happen if
831            // persist messes up backward/forward compatibility, if the durable
832            // data was corrupted, or if operations messes up deployment. In any
833            // case, fail loudly.
834            .expect("internal error: invalid encoded state");
835        let state = Rollup::from_proto(proto)
836            .expect("internal error: invalid encoded state")
837            .state;
838        assert_code_can_read_data(build_version, &state.state.collections.version);
839        state
840    }
841}
842
843impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
844where
845    K: Codec,
846    V: Codec,
847    T: Codec64,
848    D: Codec64,
849{
850    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
851        UntypedState {
852            key_codec: K::codec_name(),
853            val_codec: V::codec_name(),
854            ts_codec: T::codec_name(),
855            diff_codec: D::codec_name(),
856            state: typed_state.state,
857        }
858    }
859}
860
/// A struct that maps 1:1 with ProtoRollup.
///
/// Contains the state, and optionally the diffs covering the seqno range
/// `(state.latest_rollup().seqno, state.seqno]`.
///
/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
/// when deserializing rollups that were persisted before we started inlining diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup, together with its codec names.
    pub(crate) state: UntypedState<T>,
    /// The inlined diffs, if any were stored with the rollup.
    pub(crate) diffs: Option<InlinedDiffs>,
}
872
873impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
874    /// Creates a `StateRollup` from a state and diffs from its last rollup.
875    ///
876    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
877    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
878        let latest_rollup_seqno = *state.latest_rollup().0;
879        let mut verify_seqno = latest_rollup_seqno;
880        for diff in &diffs {
881            assert_eq!(verify_seqno.next(), diff.seqno);
882            verify_seqno = diff.seqno;
883        }
884        assert_eq!(verify_seqno, state.seqno());
885
886        let diffs = Some(InlinedDiffs::from(
887            latest_rollup_seqno.next(),
888            state.seqno().next(),
889            diffs,
890        ));
891
892        Self { state, diffs }
893    }
894
895    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
896        Self { state, diffs: None }
897    }
898
899    pub(crate) fn from_state_without_diffs(
900        state: State<T>,
901        key_codec: String,
902        val_codec: String,
903        ts_codec: String,
904        diff_codec: String,
905    ) -> Self {
906        Self::from_untyped_state_without_diffs(UntypedState {
907            key_codec,
908            val_codec,
909            ts_codec,
910            diff_codec,
911            state,
912        })
913    }
914}
915
/// A batch of encoded state diffs along with the seqno bounds they fall in.
///
/// The private `from` constructor asserts `lower <= diff.seqno < upper` for
/// every contained diff; values decoded from proto are not re-checked here.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower seqno bound of the diffs.
    pub(crate) lower: SeqNo,
    /// Exclusive upper seqno bound of the diffs.
    pub(crate) upper: SeqNo,
    /// The encoded diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
922
923impl InlinedDiffs {
924    pub(crate) fn description(&self) -> Description<SeqNo> {
925        Description::new(
926            Antichain::from_elem(self.lower),
927            Antichain::from_elem(self.upper),
928            Antichain::from_elem(SeqNo::minimum()),
929        )
930    }
931
932    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
933        for diff in &diffs {
934            assert!(diff.seqno >= lower);
935            assert!(diff.seqno < upper);
936        }
937        Self {
938            lower,
939            upper,
940            diffs,
941        }
942    }
943}
944
945impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
946    fn into_proto(&self) -> ProtoInlinedDiffs {
947        ProtoInlinedDiffs {
948            lower: self.lower.into_proto(),
949            upper: self.upper.into_proto(),
950            diffs: self.diffs.into_proto(),
951        }
952    }
953
954    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
955        Ok(Self {
956            lower: proto.lower.into_rust()?,
957            upper: proto.upper.into_rust()?,
958            diffs: proto.diffs.into_rust()?,
959        })
960    }
961}
962
963impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
964    fn into_proto(&self) -> ProtoRollup {
965        ProtoRollup {
966            applier_version: self.state.state.collections.version.to_string(),
967            shard_id: self.state.state.shard_id.into_proto(),
968            seqno: self.state.state.seqno.into_proto(),
969            walltime_ms: self.state.state.walltime_ms.into_proto(),
970            hostname: self.state.state.hostname.into_proto(),
971            key_codec: self.state.key_codec.into_proto(),
972            val_codec: self.state.val_codec.into_proto(),
973            ts_codec: T::codec_name(),
974            diff_codec: self.state.diff_codec.into_proto(),
975            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
976            active_rollup: self.state.state.collections.active_rollup.into_proto(),
977            active_gc: self.state.state.collections.active_gc.into_proto(),
978            rollups: self
979                .state
980                .state
981                .collections
982                .rollups
983                .iter()
984                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
985                .collect(),
986            deprecated_rollups: Default::default(),
987            leased_readers: self
988                .state
989                .state
990                .collections
991                .leased_readers
992                .iter()
993                .map(|(id, state)| (id.into_proto(), state.into_proto()))
994                .collect(),
995            critical_readers: self
996                .state
997                .state
998                .collections
999                .critical_readers
1000                .iter()
1001                .map(|(id, state)| (id.into_proto(), state.into_proto()))
1002                .collect(),
1003            writers: self
1004                .state
1005                .state
1006                .collections
1007                .writers
1008                .iter()
1009                .map(|(id, state)| (id.into_proto(), state.into_proto()))
1010                .collect(),
1011            schemas: self
1012                .state
1013                .state
1014                .collections
1015                .schemas
1016                .iter()
1017                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
1018                .collect(),
1019            trace: Some(self.state.state.collections.trace.into_proto()),
1020            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
1021        }
1022    }
1023
1024    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
1025        let applier_version = if x.applier_version.is_empty() {
1026            // Backward compatibility with versions of ProtoState before we set
1027            // this field: if it's missing (empty), assume an infinitely old
1028            // version.
1029            semver::Version::new(0, 0, 0)
1030        } else {
1031            semver::Version::parse(&x.applier_version).map_err(|err| {
1032                TryFromProtoError::InvalidSemverVersion(format!(
1033                    "invalid applier_version {}: {}",
1034                    x.applier_version, err
1035                ))
1036            })?
1037        };
1038
1039        let mut rollups = BTreeMap::new();
1040        for (seqno, rollup) in x.rollups {
1041            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
1042        }
1043        for (seqno, key) in x.deprecated_rollups {
1044            rollups.insert(
1045                seqno.into_rust()?,
1046                HollowRollup {
1047                    key: key.into_rust()?,
1048                    encoded_size_bytes: None,
1049                },
1050            );
1051        }
1052        let mut leased_readers = BTreeMap::new();
1053        for (id, state) in x.leased_readers {
1054            leased_readers.insert(id.into_rust()?, state.into_rust()?);
1055        }
1056        let mut critical_readers = BTreeMap::new();
1057        for (id, state) in x.critical_readers {
1058            critical_readers.insert(id.into_rust()?, state.into_rust()?);
1059        }
1060        let mut writers = BTreeMap::new();
1061        for (id, state) in x.writers {
1062            writers.insert(id.into_rust()?, state.into_rust()?);
1063        }
1064        let mut schemas = BTreeMap::new();
1065        for (id, x) in x.schemas {
1066            schemas.insert(id.into_rust()?, x.into_rust()?);
1067        }
1068        let active_rollup = x
1069            .active_rollup
1070            .map(|rollup| rollup.into_rust())
1071            .transpose()?;
1072        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
1073        let collections = StateCollections {
1074            version: applier_version.clone(),
1075            rollups,
1076            active_rollup,
1077            active_gc,
1078            last_gc_req: x.last_gc_req.into_rust()?,
1079            leased_readers,
1080            critical_readers,
1081            writers,
1082            schemas,
1083            trace: x.trace.into_rust_if_some("trace")?,
1084        };
1085        let state = State {
1086            shard_id: x.shard_id.into_rust()?,
1087            seqno: x.seqno.into_rust()?,
1088            walltime_ms: x.walltime_ms,
1089            hostname: x.hostname,
1090            collections,
1091        };
1092
1093        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
1094        if let Some(diffs) = &diffs {
1095            if diffs.lower != state.latest_rollup().0.next() {
1096                return Err(TryFromProtoError::InvalidPersistState(format!(
1097                    "diffs lower ({}) should match latest rollup's successor: ({})",
1098                    diffs.lower,
1099                    state.latest_rollup().0.next()
1100                )));
1101            }
1102            if diffs.upper != state.seqno.next() {
1103                return Err(TryFromProtoError::InvalidPersistState(format!(
1104                    "diffs upper ({}) should match state's successor: ({})",
1105                    diffs.lower,
1106                    state.seqno.next()
1107                )));
1108            }
1109        }
1110
1111        Ok(Rollup {
1112            state: UntypedState {
1113                state,
1114                key_codec: x.key_codec.into_rust()?,
1115                val_codec: x.val_codec.into_rust()?,
1116                ts_codec: x.ts_codec.into_rust()?,
1117                diff_codec: x.diff_codec.into_rust()?,
1118            },
1119            diffs,
1120        })
1121    }
1122}
1123
1124impl RustType<ProtoVersionedData> for VersionedData {
1125    fn into_proto(&self) -> ProtoVersionedData {
1126        ProtoVersionedData {
1127            seqno: self.seqno.into_proto(),
1128            data: Bytes::clone(&self.data),
1129        }
1130    }
1131
1132    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1133        Ok(Self {
1134            seqno: proto.seqno.into_rust()?,
1135            data: proto.data,
1136        })
1137    }
1138}
1139
1140impl RustType<ProtoSpineId> for SpineId {
1141    fn into_proto(&self) -> ProtoSpineId {
1142        ProtoSpineId {
1143            lo: self.0.into_proto(),
1144            hi: self.1.into_proto(),
1145        }
1146    }
1147
1148    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1149        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1150    }
1151}
1152
1153impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1154    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1155        let (id, batch) = entry;
1156        ProtoIdHollowBatch {
1157            id: Some(id.into_proto()),
1158            batch: Some(batch.into_proto()),
1159        }
1160    }
1161
1162    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1163        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1164        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1165        Ok((id, batch))
1166    }
1167}
1168
1169impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1170    fn into_proto(&self) -> ProtoSpineBatch {
1171        ProtoSpineBatch {
1172            desc: Some(self.desc.into_proto()),
1173            parts: self.parts.into_proto(),
1174            level: self.level.into_proto(),
1175            descs: self.descs.into_proto(),
1176        }
1177    }
1178
1179    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1180        let level = proto.level.into_rust()?;
1181        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1182        let parts = proto.parts.into_rust()?;
1183        let descs = proto.descs.into_rust()?;
1184        Ok(ThinSpineBatch {
1185            level,
1186            desc,
1187            parts,
1188            descs,
1189        })
1190    }
1191}
1192
1193impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1194    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1195        let (id, batch) = entry;
1196        ProtoIdSpineBatch {
1197            id: Some(id.into_proto()),
1198            batch: Some(batch.into_proto()),
1199        }
1200    }
1201
1202    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1203        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1204        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1205        Ok((id, batch))
1206    }
1207}
1208
1209impl RustType<ProtoCompaction> for ActiveCompaction {
1210    fn into_proto(&self) -> ProtoCompaction {
1211        ProtoCompaction {
1212            start_ms: self.start_ms,
1213        }
1214    }
1215
1216    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1217        Ok(Self {
1218            start_ms: proto.start_ms,
1219        })
1220    }
1221}
1222
1223impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1224    fn into_proto(&self) -> ProtoMerge {
1225        ProtoMerge {
1226            since: Some(self.since.into_proto()),
1227            remaining_work: self.remaining_work.into_proto(),
1228            active_compaction: self.active_compaction.into_proto(),
1229        }
1230    }
1231
1232    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1233        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1234        let remaining_work = proto.remaining_work.into_rust()?;
1235        let active_compaction = proto.active_compaction.into_rust()?;
1236        Ok(Self {
1237            since,
1238            remaining_work,
1239            active_compaction,
1240        })
1241    }
1242}
1243
1244impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1245    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1246        ProtoIdMerge {
1247            id: Some(id.into_proto()),
1248            merge: Some(merge.into_proto()),
1249        }
1250    }
1251
1252    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1253        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1254        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1255        Ok((id, merge))
1256    }
1257}
1258
1259impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1260    fn into_proto(&self) -> ProtoTrace {
1261        let since = self.since.into_proto();
1262        let legacy_batches = self
1263            .legacy_batches
1264            .iter()
1265            .map(|(b, _)| b.into_proto())
1266            .collect();
1267        let hollow_batches = self.hollow_batches.into_proto();
1268        let spine_batches = self.spine_batches.into_proto();
1269        let merges = self.merges.into_proto();
1270        ProtoTrace {
1271            since: Some(since),
1272            legacy_batches,
1273            hollow_batches,
1274            spine_batches,
1275            merges,
1276        }
1277    }
1278
1279    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1280        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1281        let legacy_batches = proto
1282            .legacy_batches
1283            .into_iter()
1284            .map(|b| b.into_rust().map(|b| (b, ())))
1285            .collect::<Result<_, _>>()?;
1286        let hollow_batches = proto.hollow_batches.into_rust()?;
1287        let spine_batches = proto.spine_batches.into_rust()?;
1288        let merges = proto.merges.into_rust()?;
1289        Ok(FlatTrace {
1290            since,
1291            legacy_batches,
1292            hollow_batches,
1293            spine_batches,
1294            merges,
1295        })
1296    }
1297}
1298
1299impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1300    fn into_proto(&self) -> ProtoTrace {
1301        self.flatten().into_proto()
1302    }
1303
1304    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1305        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1306    }
1307}
1308
1309impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1310    fn into_proto(&self) -> ProtoLeasedReaderState {
1311        ProtoLeasedReaderState {
1312            seqno: self.seqno.into_proto(),
1313            since: Some(self.since.into_proto()),
1314            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1315            lease_duration_ms: self.lease_duration_ms.into_proto(),
1316            debug: Some(self.debug.into_proto()),
1317        }
1318    }
1319
1320    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1321        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1322        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1323        // was missing and we need to fill in a default. This would ideally be
1324        // based on the actual value in PersistConfig, but it's only here for a
1325        // short time and this is way easier.
1326        if lease_duration_ms == 0 {
1327            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1328                .expect("lease duration as millis should fit within u64");
1329        }
1330        // MIGRATION: If debug is empty, then the proto field was missing and we
1331        // need to fill in a default.
1332        let debug = proto.debug.unwrap_or_default().into_rust()?;
1333        Ok(LeasedReaderState {
1334            seqno: proto.seqno.into_rust()?,
1335            since: proto
1336                .since
1337                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1338            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1339            lease_duration_ms,
1340            debug,
1341        })
1342    }
1343}
1344
1345impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1346    fn into_proto(&self) -> ProtoCriticalReaderState {
1347        ProtoCriticalReaderState {
1348            since: Some(self.since.into_proto()),
1349            opaque: i64::from_le_bytes(self.opaque.0),
1350            opaque_codec: self.opaque_codec.clone(),
1351            debug: Some(self.debug.into_proto()),
1352        }
1353    }
1354
1355    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1356        // MIGRATION: If debug is empty, then the proto field was missing and we
1357        // need to fill in a default.
1358        let debug = proto.debug.unwrap_or_default().into_rust()?;
1359        Ok(CriticalReaderState {
1360            since: proto
1361                .since
1362                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1363            opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1364            opaque_codec: proto.opaque_codec,
1365            debug,
1366        })
1367    }
1368}
1369
1370impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1371    fn into_proto(&self) -> ProtoWriterState {
1372        ProtoWriterState {
1373            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1374            lease_duration_ms: self.lease_duration_ms.into_proto(),
1375            most_recent_write_token: self.most_recent_write_token.into_proto(),
1376            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1377            debug: Some(self.debug.into_proto()),
1378        }
1379    }
1380
1381    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1382        // MIGRATION: We didn't originally have most_recent_write_token and
1383        // most_recent_write_upper. Pick values that aren't going to
1384        // accidentally match ones in incoming writes and confuse things. We
1385        // could instead use Option on WriterState but this keeps the backward
1386        // compatibility logic confined to one place.
1387        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1388            IdempotencyToken::SENTINEL
1389        } else {
1390            proto.most_recent_write_token.into_rust()?
1391        };
1392        let most_recent_write_upper = match proto.most_recent_write_upper {
1393            Some(x) => x.into_rust()?,
1394            None => Antichain::from_elem(T::minimum()),
1395        };
1396        // MIGRATION: If debug is empty, then the proto field was missing and we
1397        // need to fill in a default.
1398        let debug = proto.debug.unwrap_or_default().into_rust()?;
1399        Ok(WriterState {
1400            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1401            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1402            most_recent_write_token,
1403            most_recent_write_upper,
1404            debug,
1405        })
1406    }
1407}
1408
1409impl RustType<ProtoHandleDebugState> for HandleDebugState {
1410    fn into_proto(&self) -> ProtoHandleDebugState {
1411        ProtoHandleDebugState {
1412            hostname: self.hostname.into_proto(),
1413            purpose: self.purpose.into_proto(),
1414        }
1415    }
1416
1417    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1418        Ok(HandleDebugState {
1419            hostname: proto.hostname,
1420            purpose: proto.purpose,
1421        })
1422    }
1423}
1424
1425impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1426    fn into_proto(&self) -> ProtoHollowRun {
1427        ProtoHollowRun {
1428            parts: self.parts.into_proto(),
1429        }
1430    }
1431
1432    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1433        Ok(HollowRun {
1434            parts: proto.parts.into_rust()?,
1435        })
1436    }
1437}
1438
1439impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1440    fn into_proto(&self) -> ProtoHollowBatch {
1441        let mut run_meta = self.run_meta.into_proto();
1442        // For backwards compatibility reasons, don't keep default metadata in the proto.
1443        let run_meta_default = RunMeta::default().into_proto();
1444        while run_meta.last() == Some(&run_meta_default) {
1445            run_meta.pop();
1446        }
1447        ProtoHollowBatch {
1448            desc: Some(self.desc.into_proto()),
1449            parts: self.parts.into_proto(),
1450            len: self.len.into_proto(),
1451            runs: self.run_splits.into_proto(),
1452            run_meta,
1453            deprecated_keys: vec![],
1454        }
1455    }
1456
1457    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1458        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1459        // MIGRATION: We used to just have the keys instead of a more structured
1460        // part.
1461        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1462            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1463                key: PartialBatchKey(key),
1464                meta: Default::default(),
1465                encoded_size_bytes: 0,
1466                key_lower: vec![],
1467                structured_key_lower: None,
1468                stats: None,
1469                ts_rewrite: None,
1470                diffs_sum: None,
1471                format: None,
1472                schema_id: None,
1473                deprecated_schema_id: None,
1474            }))
1475        }));
1476        // We discard default metadatas from the proto above; re-add them here.
1477        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1478        let num_runs = if parts.is_empty() {
1479            0
1480        } else {
1481            run_splits.len() + 1
1482        };
1483        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1484        run_meta.resize(num_runs, RunMeta::default());
1485        Ok(HollowBatch {
1486            desc: proto.desc.into_rust_if_some("desc")?,
1487            parts,
1488            len: proto.len.into_rust()?,
1489            run_splits,
1490            run_meta,
1491        })
1492    }
1493}
1494
1495impl RustType<String> for RunId {
1496    fn into_proto(&self) -> String {
1497        self.to_string()
1498    }
1499
1500    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1501        RunId::from_str(&proto).map_err(|_| {
1502            TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1503        })
1504    }
1505}
1506
1507impl RustType<ProtoRunMeta> for RunMeta {
1508    fn into_proto(&self) -> ProtoRunMeta {
1509        let order = match self.order {
1510            None => ProtoRunOrder::Unknown,
1511            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1512            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1513            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1514        };
1515        ProtoRunMeta {
1516            order: order.into(),
1517            schema_id: self.schema.into_proto(),
1518            deprecated_schema_id: self.deprecated_schema.into_proto(),
1519            id: self.id.into_proto(),
1520            len: self.len.into_proto(),
1521            meta: self.meta.into_proto(),
1522        }
1523    }
1524
1525    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1526        let order = match ProtoRunOrder::try_from(proto.order)? {
1527            ProtoRunOrder::Unknown => None,
1528            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1529            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1530            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1531        };
1532        Ok(Self {
1533            order,
1534            schema: proto.schema_id.into_rust()?,
1535            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1536            id: proto.id.into_rust()?,
1537            len: proto.len.into_rust()?,
1538            meta: proto.meta.into_rust()?,
1539        })
1540    }
1541}
1542
1543impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1544    fn into_proto(&self) -> ProtoHollowBatchPart {
1545        match self {
1546            RunPart::Single(part) => part.into_proto(),
1547            RunPart::Many(runs) => runs.into_proto(),
1548        }
1549    }
1550
1551    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1552        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1553            RunPart::Many(proto.into_rust()?)
1554        } else {
1555            RunPart::Single(proto.into_rust()?)
1556        };
1557        Ok(run_part)
1558    }
1559}
1560
1561impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1562    fn into_proto(&self) -> ProtoHollowBatchPart {
1563        let part = ProtoHollowBatchPart {
1564            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1565                key: self.key.into_proto(),
1566                max_part_bytes: self.max_part_bytes.into_proto(),
1567            })),
1568            encoded_size_bytes: self.hollow_bytes.into_proto(),
1569            key_lower: Bytes::copy_from_slice(&self.key_lower),
1570            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1571            key_stats: None,
1572            ts_rewrite: None,
1573            format: None,
1574            schema_id: None,
1575            structured_key_lower: self.structured_key_lower.into_proto(),
1576            deprecated_schema_id: None,
1577            metadata: BTreeMap::default(),
1578        };
1579        part
1580    }
1581
1582    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1583        let run_proto = match proto.kind {
1584            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1585            _ => Err(TryFromProtoError::UnknownEnumVariant(
1586                "ProtoHollowBatchPart::kind".to_string(),
1587            ))?,
1588        };
1589        Ok(Self {
1590            key: run_proto.key.into_rust()?,
1591            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1592            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1593            key_lower: proto.key_lower.to_vec(),
1594            structured_key_lower: proto.structured_key_lower.into_rust()?,
1595            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1596            _phantom_data: Default::default(),
1597        })
1598    }
1599}
1600
1601impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1602    fn into_proto(&self) -> ProtoHollowBatchPart {
1603        match self {
1604            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1605                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1606                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1607                key_lower: Bytes::copy_from_slice(&x.key_lower),
1608                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1609                key_stats: x.stats.into_proto(),
1610                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1611                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1612                format: x.format.map(|f| f.into_proto()),
1613                schema_id: x.schema_id.into_proto(),
1614                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1615                metadata: BTreeMap::default(),
1616            },
1617            BatchPart::Inline {
1618                updates,
1619                ts_rewrite,
1620                schema_id,
1621                deprecated_schema_id,
1622            } => ProtoHollowBatchPart {
1623                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1624                encoded_size_bytes: 0,
1625                key_lower: Bytes::new(),
1626                structured_key_lower: None,
1627                key_stats: None,
1628                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1629                diffs_sum: None,
1630                format: None,
1631                schema_id: schema_id.into_proto(),
1632                deprecated_schema_id: deprecated_schema_id.into_proto(),
1633                metadata: BTreeMap::default(),
1634            },
1635        }
1636    }
1637
1638    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1639        let ts_rewrite = match proto.ts_rewrite {
1640            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1641            None => None,
1642        };
1643        let schema_id = proto.schema_id.into_rust()?;
1644        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1645        match proto.kind {
1646            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1647                Ok(BatchPart::Hollow(HollowBatchPart {
1648                    key: key.into_rust()?,
1649                    meta: proto.metadata.into_rust()?,
1650                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1651                    key_lower: proto.key_lower.into(),
1652                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1653                    stats: proto.key_stats.into_rust()?,
1654                    ts_rewrite,
1655                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1656                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1657                    schema_id,
1658                    deprecated_schema_id,
1659                }))
1660            }
1661            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1662                assert_eq!(proto.encoded_size_bytes, 0);
1663                assert_eq!(proto.key_lower.len(), 0);
1664                assert_none!(proto.key_stats);
1665                assert_none!(proto.diffs_sum);
1666                let updates = LazyInlineBatchPart(x.into_rust()?);
1667                Ok(BatchPart::Inline {
1668                    updates,
1669                    ts_rewrite,
1670                    schema_id,
1671                    deprecated_schema_id,
1672                })
1673            }
1674            _ => Err(TryFromProtoError::unknown_enum_variant(
1675                "ProtoHollowBatchPart::kind",
1676            )),
1677        }
1678    }
1679}
1680
1681impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1682    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1683        match self {
1684            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1685            BatchColumnarFormat::Both(version) => {
1686                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1687            }
1688            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1689        }
1690    }
1691
1692    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1693        let format = match proto {
1694            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1695            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1696                BatchColumnarFormat::Both(version.cast_into())
1697            }
1698            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1699        };
1700        Ok(format)
1701    }
1702}
1703
/// Aggregate statistics about data contained in a part.
///
/// These are "lazy" in the sense that we don't decode them (or even validate
/// the encoded version) until they're used.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    // The still-encoded `ProtoStructStats`; `decode` turns it into the `key`
    // field of `PartStats`.
    key: LazyProto<ProtoStructStats>,
}
1712
1713impl Debug for LazyPartStats {
1714    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1715        f.debug_tuple("LazyPartStats")
1716            .field(&self.decode())
1717            .finish()
1718    }
1719}
1720
1721impl LazyPartStats {
1722    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1723        let PartStats { key } = x;
1724        let mut proto_stats = ProtoStructStats::from_rust(key);
1725        map_proto(&mut proto_stats);
1726        LazyPartStats {
1727            key: LazyProto::from(&proto_stats),
1728        }
1729    }
1730    /// Decodes and returns PartStats from the encoded representation.
1731    ///
1732    /// This does not cache the returned value, it decodes each time it's
1733    /// called.
1734    pub fn decode(&self) -> PartStats {
1735        let key = self.key.decode().expect("valid proto");
1736        PartStats {
1737            key: key.into_rust().expect("valid stats"),
1738        }
1739    }
1740}
1741
1742impl RustType<Bytes> for LazyPartStats {
1743    fn into_proto(&self) -> Bytes {
1744        let LazyPartStats { key } = self;
1745        key.into_proto()
1746    }
1747
1748    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1749        Ok(LazyPartStats {
1750            key: proto.into_rust()?,
1751        })
1752    }
1753}
1754
/// Test-only strategy yielding arbitrary stats that are always wrapped in `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Some)
}
1759
1760#[allow(unused_parens)]
1761impl Arbitrary for LazyPartStats {
1762    type Parameters = ();
1763    type Strategy =
1764        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1765
1766    fn arbitrary_with(_: ()) -> Self::Strategy {
1767        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1768            LazyPartStats::encode(&x, |_| {})
1769        })
1770    }
1771}
1772
1773impl ProtoInlineBatchPart {
1774    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1775        lgbytes: &ColumnarMetrics,
1776        proto: Self,
1777    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1778        let updates = proto
1779            .updates
1780            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1781        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1782
1783        Ok(BlobTraceBatchPart {
1784            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1785            index: proto.index.into_rust()?,
1786            updates,
1787        })
1788    }
1789}
1790
/// A batch part stored inlined in State.
///
/// Wraps the still-encoded `ProtoInlineBatchPart`; the proto is only decoded
/// when `decode` or the `Serialize` impl is called.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1794
1795impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1796    fn from(value: &ProtoInlineBatchPart) -> Self {
1797        LazyInlineBatchPart(value.into())
1798    }
1799}
1800
1801impl Serialize for LazyInlineBatchPart {
1802    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1803        // NB: This serialize impl is only used for QA and debugging, so emit a
1804        // truncated version.
1805        let proto = self.0.decode().expect("valid proto");
1806        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1807        let () = s.serialize_field("desc", &proto.desc)?;
1808        let () = s.serialize_field("index", &proto.index)?;
1809        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1810        s.end()
1811    }
1812}
1813
1814impl LazyInlineBatchPart {
1815    pub(crate) fn encoded_size_bytes(&self) -> usize {
1816        self.0.buf.len()
1817    }
1818
1819    /// Decodes and returns a BlobTraceBatchPart from the encoded
1820    /// representation.
1821    ///
1822    /// This does not cache the returned value, it decodes each time it's
1823    /// called.
1824    pub fn decode<T: Timestamp + Codec64>(
1825        &self,
1826        lgbytes: &ColumnarMetrics,
1827    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1828        let proto = self.0.decode().expect("valid proto");
1829        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1830    }
1831}
1832
1833impl RustType<Bytes> for LazyInlineBatchPart {
1834    fn into_proto(&self) -> Bytes {
1835        self.0.into_proto()
1836    }
1837
1838    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1839        Ok(LazyInlineBatchPart(proto.into_rust()?))
1840    }
1841}
1842
1843impl RustType<ProtoHollowRollup> for HollowRollup {
1844    fn into_proto(&self) -> ProtoHollowRollup {
1845        ProtoHollowRollup {
1846            key: self.key.into_proto(),
1847            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1848        }
1849    }
1850
1851    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1852        Ok(HollowRollup {
1853            key: proto.key.into_rust()?,
1854            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1855        })
1856    }
1857}
1858
1859impl RustType<ProtoActiveRollup> for ActiveRollup {
1860    fn into_proto(&self) -> ProtoActiveRollup {
1861        ProtoActiveRollup {
1862            start_ms: self.start_ms,
1863            seqno: self.seqno.into_proto(),
1864        }
1865    }
1866
1867    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1868        Ok(ActiveRollup {
1869            start_ms: proto.start_ms,
1870            seqno: proto.seqno.into_rust()?,
1871        })
1872    }
1873}
1874
1875impl RustType<ProtoActiveGc> for ActiveGc {
1876    fn into_proto(&self) -> ProtoActiveGc {
1877        ProtoActiveGc {
1878            start_ms: self.start_ms,
1879            seqno: self.seqno.into_proto(),
1880        }
1881    }
1882
1883    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1884        Ok(ActiveGc {
1885            start_ms: proto.start_ms,
1886            seqno: proto.seqno.into_rust()?,
1887        })
1888    }
1889}
1890
1891impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1892    fn into_proto(&self) -> ProtoU64Description {
1893        ProtoU64Description {
1894            lower: Some(self.lower().into_proto()),
1895            upper: Some(self.upper().into_proto()),
1896            since: Some(self.since().into_proto()),
1897        }
1898    }
1899
1900    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1901        Ok(Description::new(
1902            proto.lower.into_rust_if_some("lower")?,
1903            proto.upper.into_rust_if_some("upper")?,
1904            proto.since.into_rust_if_some("since")?,
1905        ))
1906    }
1907}
1908
1909impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1910    fn into_proto(&self) -> ProtoU64Antichain {
1911        ProtoU64Antichain {
1912            elements: self
1913                .elements()
1914                .iter()
1915                .map(|x| i64::from_le_bytes(T::encode(x)))
1916                .collect(),
1917        }
1918    }
1919
1920    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1921        let elements = proto
1922            .elements
1923            .iter()
1924            .map(|x| T::decode(x.to_le_bytes()))
1925            .collect::<Vec<_>>();
1926        Ok(Antichain::from(elements))
1927    }
1928}
1929
1930#[cfg(test)]
1931mod tests {
1932    use bytes::Bytes;
1933    use mz_build_info::DUMMY_BUILD_INFO;
1934    use mz_dyncfg::ConfigUpdates;
1935    use mz_ore::assert_err;
1936    use mz_persist::location::SeqNo;
1937    use proptest::prelude::*;
1938
1939    use crate::ShardId;
1940    use crate::internal::paths::PartialRollupKey;
1941    use crate::internal::state::tests::any_state;
1942    use crate::internal::state::{BatchPart, HandleDebugState};
1943    use crate::internal::state_diff::StateDiff;
1944    use crate::tests::new_test_client_cache;
1945
1946    use super::*;
1947
1948    #[mz_ore::test]
1949    fn metadata_map() {
1950        const COUNT: MetadataKey<u64> = MetadataKey::new("count");
1951
1952        let mut map = MetadataMap::default();
1953        map.set(COUNT, 100);
1954        let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
1955        assert_eq!(map.get(COUNT), Some(100));
1956
1957        const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
1958            MetadataKey::new("antichain");
1959        assert_none!(map.get(ANTICHAIN));
1960
1961        map.set(ANTICHAIN, Antichain::from_elem(30));
1962        let map = MetadataMap::from_proto(map.into_proto()).unwrap();
1963        assert_eq!(map.get(COUNT), Some(100));
1964        assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
1965    }
1966
1967    #[mz_ore::test]
1968    fn applier_version_state() {
1969        let v1 = semver::Version::new(1, 0, 0);
1970        let v2 = semver::Version::new(2, 0, 0);
1971        let v3 = semver::Version::new(3, 0, 0);
1972
1973        // Code version v2 evaluates and writes out some State.
1974        let shard_id = ShardId::new();
1975        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1976        let rollup =
1977            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1978        let mut buf = Vec::new();
1979        rollup.encode(&mut buf).expect("serializable");
1980        let bytes = Bytes::from(buf);
1981
1982        // We can read it back using persist code v2 and v3.
1983        assert_eq!(
1984            UntypedState::<u64>::decode(&v2, bytes.clone())
1985                .check_codecs(&shard_id)
1986                .as_ref(),
1987            Ok(&state)
1988        );
1989        assert_eq!(
1990            UntypedState::<u64>::decode(&v3, bytes.clone())
1991                .check_codecs(&shard_id)
1992                .as_ref(),
1993            Ok(&state)
1994        );
1995
1996        // But we can't read it back using v1 because v1 might corrupt it by
1997        // losing or misinterpreting something written out by a future version
1998        // of code.
1999        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
2000        let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
2001        assert_err!(v1_res);
2002    }
2003
2004    #[mz_ore::test]
2005    fn applier_version_state_diff() {
2006        let v1 = semver::Version::new(1, 0, 0);
2007        let v2 = semver::Version::new(2, 0, 0);
2008        let v3 = semver::Version::new(3, 0, 0);
2009
2010        // Code version v2 evaluates and writes out some State.
2011        let diff = StateDiff::<u64>::new(
2012            v2.clone(),
2013            SeqNo(0),
2014            SeqNo(1),
2015            2,
2016            PartialRollupKey("rollup".into()),
2017        );
2018        let mut buf = Vec::new();
2019        diff.encode(&mut buf);
2020        let bytes = Bytes::from(buf);
2021
2022        // We can read it back using persist code v2 and v3.
2023        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
2024        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
2025
2026        // But we can't read it back using v1 because v1 might corrupt it by
2027        // losing or misinterpreting something written out by a future version
2028        // of code.
2029        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
2030        let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
2031        assert_err!(v1_res);
2032    }
2033
2034    #[mz_ore::test]
2035    fn hollow_batch_migration_keys() {
2036        let x = HollowBatch::new_run(
2037            Description::new(
2038                Antichain::from_elem(1u64),
2039                Antichain::from_elem(2u64),
2040                Antichain::from_elem(3u64),
2041            ),
2042            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2043                key: PartialBatchKey("a".into()),
2044                meta: Default::default(),
2045                encoded_size_bytes: 5,
2046                key_lower: vec![],
2047                structured_key_lower: None,
2048                stats: None,
2049                ts_rewrite: None,
2050                diffs_sum: None,
2051                format: None,
2052                schema_id: None,
2053                deprecated_schema_id: None,
2054            }))],
2055            4,
2056        );
2057        let mut old = x.into_proto();
2058        // Old ProtoHollowBatch had keys instead of parts.
2059        old.deprecated_keys = vec!["b".into()];
2060        // We don't expect to see a ProtoHollowBatch with keys _and_ parts, only
2061        // one or the other, but we have a defined output, so may as well test
2062        // it.
2063        let mut expected = x;
2064        // We fill in 0 for encoded_size_bytes when we migrate from keys. This
2065        // will violate bounded memory usage compaction during the transition
2066        // (short-term issue), but that's better than creating unnecessary runs
2067        // (longer-term issue).
2068        expected
2069            .parts
2070            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2071                key: PartialBatchKey("b".into()),
2072                meta: Default::default(),
2073                encoded_size_bytes: 0,
2074                key_lower: vec![],
2075                structured_key_lower: None,
2076                stats: None,
2077                ts_rewrite: None,
2078                diffs_sum: None,
2079                format: None,
2080                schema_id: None,
2081                deprecated_schema_id: None,
2082            })));
2083        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
2084    }
2085
2086    #[mz_ore::test]
2087    fn reader_state_migration_lease_duration() {
2088        let x = LeasedReaderState {
2089            seqno: SeqNo(1),
2090            since: Antichain::from_elem(2u64),
2091            last_heartbeat_timestamp_ms: 3,
2092            debug: HandleDebugState {
2093                hostname: "host".to_owned(),
2094                purpose: "purpose".to_owned(),
2095            },
2096            // Old ProtoReaderState had no lease_duration_ms field
2097            lease_duration_ms: 0,
2098        };
2099        let old = x.into_proto();
2100        let mut expected = x;
2101        // We fill in DEFAULT_READ_LEASE_DURATION for lease_duration_ms when we
2102        // migrate from unset.
2103        expected.lease_duration_ms =
2104            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
2105        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
2106    }
2107
2108    #[mz_ore::test]
2109    fn writer_state_migration_most_recent_write() {
2110        let proto = ProtoWriterState {
2111            last_heartbeat_timestamp_ms: 1,
2112            lease_duration_ms: 2,
2113            // Old ProtoWriterState had no most_recent_write_token or
2114            // most_recent_write_upper.
2115            most_recent_write_token: "".into(),
2116            most_recent_write_upper: None,
2117            debug: Some(ProtoHandleDebugState {
2118                hostname: "host".to_owned(),
2119                purpose: "purpose".to_owned(),
2120            }),
2121        };
2122        let expected = WriterState {
2123            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
2124            lease_duration_ms: proto.lease_duration_ms,
2125            most_recent_write_token: IdempotencyToken::SENTINEL,
2126            most_recent_write_upper: Antichain::from_elem(0),
2127            debug: HandleDebugState {
2128                hostname: "host".to_owned(),
2129                purpose: "purpose".to_owned(),
2130            },
2131        };
2132        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2133    }
2134
2135    #[mz_ore::test]
2136    fn state_migration_rollups() {
2137        let r1 = HollowRollup {
2138            key: PartialRollupKey("foo".to_owned()),
2139            encoded_size_bytes: None,
2140        };
2141        let r2 = HollowRollup {
2142            key: PartialRollupKey("bar".to_owned()),
2143            encoded_size_bytes: Some(2),
2144        };
2145        let shard_id = ShardId::new();
2146        let mut state = TypedState::<(), (), u64, i64>::new(
2147            DUMMY_BUILD_INFO.semver_version(),
2148            shard_id,
2149            "host".to_owned(),
2150            0,
2151        );
2152        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2153        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2154
2155        // Manually add the old rollup encoding.
2156        proto.deprecated_rollups.insert(1, r1.key.0.clone());
2157
2158        let state: Rollup<u64> = proto.into_rust().unwrap();
2159        let state = state.state;
2160        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2161        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2162        assert_eq!(
2163            state
2164                .state
2165                .collections
2166                .rollups
2167                .into_iter()
2168                .collect::<Vec<_>>(),
2169            expected
2170        );
2171    }
2172
    /// Checks the deprecated rollup encoding in state diffs.
    ///
    /// First, a diff proto holding rollup entries in both the current and the
    /// deprecated field decodes into one list of rollup diffs. Second, a
    /// rollup delete in a diff applies to a state whose rollup was loaded from
    /// the deprecated field.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        // r1: an insert that is written below using the old encoding.
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r1 = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        // r2: an insert using the current encoding.
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r2 = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        // r3: a delete using the current encoding.
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };
        let r3 = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };
        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2.clone());
        diff.rollups.push(r3.clone());
        let mut diff_proto = diff.into_proto();

        // Take the encoded field diffs out of the proto so that more entries
        // can be appended to them.
        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Manually add the old rollup encoding.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        // The `take` above left the field empty; put the extended diffs back.
        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        // The deprecated entry (r1) is expected after the current ones.
        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2, r3, r1]
        );

        // Also make sure that a rollup delete in a diff applies cleanly to a
        // state that had it in the deprecated field.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        // The state starts at SeqNo(4), the first of the two seqnos the diff
        // was created with.
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let state: Rollup<u64> = rollup.into_rust().unwrap();
        let state = state.state;
        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        // r3 was deleted by the diff, leaving only the two inserted rollups.
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }
2266
2267    #[mz_ore::test]
2268    #[cfg_attr(miri, ignore)] // too slow
2269    fn state_proto_roundtrip() {
2270        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2271            let before = UntypedState {
2272                key_codec: <() as Codec>::codec_name(),
2273                val_codec: <() as Codec>::codec_name(),
2274                ts_codec: <T as Codec64>::codec_name(),
2275                diff_codec: <i64 as Codec64>::codec_name(),
2276                state,
2277            };
2278            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2279            let after: Rollup<T> = proto.into_rust().unwrap();
2280            let after = after.state;
2281            assert_eq!(before, after);
2282        }
2283
2284        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2285    }
2286
    /// Table of (code version, data version) pairs and whether
    /// `cfg::code_can_write_data` accepts them.
    #[mz_ore::test]
    fn check_data_versions() {
        // Parses both versions and asserts that `cfg::code_can_write_data`
        // returns true for `Ok(())` and false for `Err(())`. `#[track_caller]`
        // makes a failure point at the offending `testcase(..)` line.
        #[track_caller]
        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            // NOTE(review): presumably needed for the `then_some` call below;
            // confirm against the workspace clippy configuration.
            #[allow(clippy::disallowed_methods)]
            let actual = cfg::code_can_write_data(&code, &data)
                .then_some(())
                .ok_or(());
            assert_eq!(actual, expected, "data at {data} read by code {code}");
        }

        // Code at a `-dev` pre-release.
        testcase("0.160.0-dev", "0.160.0-dev", Ok(()));
        testcase("0.160.0-dev", "0.160.0", Err(()));
        // Note: Probably useful to let tests use two arbitrary shas on main, at
        // the very least for things like git bisect.
        testcase("0.160.0-dev", "0.161.0-dev", Err(()));
        testcase("0.160.0-dev", "0.161.0", Err(()));
        testcase("0.160.0-dev", "0.162.0-dev", Err(()));
        testcase("0.160.0-dev", "0.162.0", Err(()));
        testcase("0.160.0-dev", "0.163.0-dev", Err(()));

        // Released code against data at the same or an older version.
        testcase("0.160.0", "0.158.0-dev", Ok(()));
        testcase("0.160.0", "0.158.0", Ok(()));
        testcase("0.160.0", "0.159.0-dev", Ok(()));
        testcase("0.160.0", "0.159.0", Ok(()));
        testcase("0.160.0", "0.160.0-dev", Ok(()));
        testcase("0.160.0", "0.160.0", Ok(()));

        // Released code against data at a newer version.
        testcase("0.160.0", "0.161.0-dev", Err(()));
        testcase("0.160.0", "0.161.0", Err(()));
        testcase("0.160.0", "0.161.1", Err(()));
        testcase("0.160.0", "0.161.1000000", Err(()));
        testcase("0.160.0", "0.162.0-dev", Err(()));
        testcase("0.160.0", "0.162.0", Err(()));
        testcase("0.160.0", "0.163.0-dev", Err(()));

        // Patch versions.
        testcase("0.160.1", "0.159.0", Ok(()));
        testcase("0.160.1", "0.160.0", Ok(()));
        testcase("0.160.1", "0.161.0", Err(()));
        testcase("0.160.1", "0.161.1", Err(()));
        testcase("0.160.1", "0.161.100", Err(()));
        testcase("0.160.0", "0.160.1", Err(()));

        // Between the 0.x versions and major version 26.
        testcase("0.160.1", "26.0.0", Err(()));
        testcase("26.0.0", "0.160.1", Ok(()));
        testcase("26.2.0", "0.160.1", Ok(()));
        testcase("26.200.200", "0.160.1", Ok(()));

        // Major versions 27 and 28.
        testcase("27.0.0", "0.160.1", Err(()));
        testcase("27.0.0", "0.16000.1", Err(()));
        testcase("27.0.0", "26.0.1", Ok(()));
        testcase("27.1000.100", "26.0.1", Ok(()));
        testcase("28.0.0", "26.0.1", Err(()));
        testcase("28.0.0", "26.1000.1", Err(()));
        testcase("28.0.0", "27.0.0", Ok(()));
    }
2345}