Skip to main content

mz_persist_client/internal/
encoding.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt, soft_panic_or_log};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize, Serializer};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::{CriticalReaderId, Opaque};
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44    ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45    HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46    LeasedReaderState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction, ProtoCriticalReaderState,
47    ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch, ProtoHollowBatchPart,
48    ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch, ProtoIdMerge,
49    ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs, ProtoLeasedReaderState, ProtoMerge,
50    ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch, ProtoSpineId, ProtoStateDiff,
51    ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, ProtoTrace, ProtoU64Antichain,
52    ProtoU64Description, ProtoVersionedData, ProtoWriterState, RunId, RunMeta, RunOrder, RunPart,
53    State, StateCollections, TypedState, WriterState, proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56    ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59    ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
/// A key and value `Schema` of data written to a batch or shard.
///
/// Both schemas are held behind `Arc`s, so cloning a `Schemas` only bumps
/// reference counts rather than copying the schemas themselves.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Id under which this schema is registered in the shard's schema registry,
    /// if any.
    pub id: Option<SchemaId>,
    /// Key `Schema` (`K::Schema`).
    pub key: Arc<K::Schema>,
    /// Value `Schema` (`V::Schema`).
    pub val: Arc<V::Schema>,
}
75
76impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
77    fn clone(&self) -> Self {
78        Self {
79            id: self.id,
80            key: Arc::clone(&self.key),
81            val: Arc::clone(&self.val),
82        }
83    }
84}
85
/// A proto that is decoded on use.
///
/// Because of the way prost works, decoding a large protobuf may result in a
/// number of very short-lived allocations in our RustType/ProtoType decode path
/// (e.g. this happens for a repeated embedded message). Not every use of
/// persist State needs every transitive bit of it to be decoded, so we opt
/// certain parts of it (initially stats) to be decoded on use.
///
/// This has the dual benefit of only paying for the short-lived allocs when
/// necessary and also allowing decoding to be gated by a feature flag. The
/// tradeoffs are that we might decode more than once and that we have to handle
/// invalid proto errors in more places.
///
/// Mechanically, this is accomplished by making the field a proto `bytes` type
/// instead of `ProtoFoo`. These bytes then contain the serialization of
/// ProtoFoo. NB: Swapping between the two is actually a forward and backward
/// compatible change.
///
/// > Embedded messages are compatible with bytes if the bytes contain an
/// > encoded version of the message.
///
/// (See <https://protobuf.dev/programming-guides/proto3/#updating>)
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded `T`. It is not validated when wrapped, so decode errors only
    /// surface when the value is decoded.
    buf: Bytes,
    // `fn() -> T` ties the type to `T` without storing one. Function pointers
    // are always `Send + Sync`, so those auto traits don't depend on `T`.
    _phantom: PhantomData<fn() -> T>,
}
113
114impl<T: Message + Default + Debug> Debug for LazyProto<T> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        match self.decode() {
117            Ok(proto) => Debug::fmt(&proto, f),
118            Err(err) => f
119                .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
120                .field("err", &err)
121                .finish(),
122        }
123    }
124}
125
126impl<T> PartialEq for LazyProto<T> {
127    fn eq(&self, other: &Self) -> bool {
128        self.cmp(other) == Ordering::Equal
129    }
130}
131
132impl<T> Eq for LazyProto<T> {}
133
134impl<T> PartialOrd for LazyProto<T> {
135    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
136        Some(self.cmp(other))
137    }
138}
139
140impl<T> Ord for LazyProto<T> {
141    fn cmp(&self, other: &Self) -> Ordering {
142        let LazyProto {
143            buf: self_buf,
144            _phantom: _,
145        } = self;
146        let LazyProto {
147            buf: other_buf,
148            _phantom: _,
149        } = other;
150        self_buf.cmp(other_buf)
151    }
152}
153
154impl<T> Hash for LazyProto<T> {
155    fn hash<H: Hasher>(&self, state: &mut H) {
156        let LazyProto { buf, _phantom } = self;
157        buf.hash(state);
158    }
159}
160
161impl<T: Message + Default> From<&T> for LazyProto<T> {
162    fn from(value: &T) -> Self {
163        let buf = Bytes::from(value.encode_to_vec());
164        LazyProto {
165            buf,
166            _phantom: PhantomData,
167        }
168    }
169}
170
171impl<T: Message + Default> LazyProto<T> {
172    pub fn decode(&self) -> Result<T, prost::DecodeError> {
173        T::decode(&*self.buf)
174    }
175
176    pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
177        Ok(T::decode(&*self.buf)?.into_rust()?)
178    }
179}
180
181impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
182    fn into_proto(&self) -> Bytes {
183        self.buf.clone()
184    }
185
186    fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
187        Ok(Self {
188            buf,
189            _phantom: PhantomData,
190        })
191    }
192}
193
/// Our Proto implementation, Prost, cannot handle unrecognized fields. This means that unexpected
/// data will be dropped at deserialization time, which means that we can't reliably roundtrip data
/// from future versions of the code, which causes trouble during upgrades and at other times.
///
/// This type works around the issue by defining an unstructured metadata map. Keys are expected to
/// be well-known strings defined in the code; values are bytes, expected to be encoded protobuf.
/// (The association between the two is lightly enforced with the companion [MetadataKey] type.)
/// It's safe to add new metadata keys in new versions, since even unrecognized keys can be losslessly
/// roundtripped. However, if the metadata is not safe for the old version to ignore -- perhaps it
/// needs to be kept in sync with some other part of the struct -- you will need to use a more
/// heavyweight migration for it.
///
/// Entries live in a `BTreeMap`, so they iterate in sorted key order.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);
207
/// Associates a key name with the Rust type `V` and the Proto message type `P` stored under it,
/// for typed lookup in a [MetadataMap].
///
/// It is an error to reuse key names, or to change the type associated with a particular name.
/// It is polite to choose short names, since they get serialized alongside every struct.
// NOTE(review): `unused` is presumably allowed because no keys are defined yet -- confirm.
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
    /// The map key under which the encoded `P` is stored.
    name: &'static str,
    /// Records `V` and `P` at the type level without storing values of them.
    type_: PhantomData<(V, P)>,
}
218
219impl<V, P> MetadataKey<V, P> {
220    #[allow(unused)]
221    pub(crate) const fn new(name: &'static str) -> Self {
222        MetadataKey {
223            name,
224            type_: PhantomData,
225        }
226    }
227}
228
229impl serde::Serialize for MetadataMap {
230    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
231    where
232        S: Serializer,
233    {
234        serializer.collect_map(self.0.iter())
235    }
236}
237
238impl MetadataMap {
239    /// Returns true iff no metadata keys have been set.
240    pub fn is_empty(&self) -> bool {
241        self.0.is_empty()
242    }
243
244    /// Serialize and insert a new key into the map, replacing any existing value for the key.
245    #[allow(unused)]
246    pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
247        self.0.insert(
248            String::from(key.name),
249            Bytes::from(value.into_proto_owned().encode_to_vec()),
250        );
251    }
252
253    /// Deserialize a key from the map, if it is present.
254    #[allow(unused)]
255    pub fn get<V: RustType<P>, P: prost::Message + Default>(
256        &self,
257        key: MetadataKey<V, P>,
258    ) -> Option<V> {
259        let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
260            Ok(decoded) => decoded,
261            Err(err) => {
262                // This should be impossible unless one of the MetadataKey invariants are broken.
263                soft_panic_or_log!(
264                    "error when decoding {key}; was it redefined? {err}",
265                    key = key.name
266                );
267                return None;
268            }
269        };
270
271        match proto.into_rust() {
272            Ok(proto) => Some(proto),
273            Err(err) => {
274                // This should be impossible unless one of the MetadataKey invariants are broken.
275                soft_panic_or_log!(
276                    "error when decoding {key}; was it redefined? {err}",
277                    key = key.name
278                );
279                None
280            }
281        }
282    }
283}
284impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
285    fn into_proto(&self) -> BTreeMap<String, Bytes> {
286        self.0.clone()
287    }
288    fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
289        Ok(MetadataMap(proto))
290    }
291}
292
293pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
294    let uuid_encoded = match encoded.strip_prefix(id_prefix) {
295        Some(x) => x,
296        None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
297    };
298    let uuid = Uuid::parse_str(uuid_encoded)
299        .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
300    Ok(*uuid.as_bytes())
301}
302
303pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &Version) {
304    if !cfg::code_can_read_data(code_version, data_version) {
305        // We can't catch halts, so panic in test, so we can get unit test
306        // coverage.
307        if cfg!(test) {
308            panic!("code at version {code_version} cannot read data with version {data_version}");
309        } else {
310            halt!("code at version {code_version} cannot read data with version {data_version}");
311        }
312    }
313}
314
315impl RustType<String> for LeasedReaderId {
316    fn into_proto(&self) -> String {
317        self.to_string()
318    }
319
320    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
321        match proto.parse() {
322            Ok(x) => Ok(x),
323            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
324        }
325    }
326}
327
328impl RustType<String> for CriticalReaderId {
329    fn into_proto(&self) -> String {
330        self.to_string()
331    }
332
333    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
334        match proto.parse() {
335            Ok(x) => Ok(x),
336            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
337        }
338    }
339}
340
341impl RustType<String> for WriterId {
342    fn into_proto(&self) -> String {
343        self.to_string()
344    }
345
346    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
347        match proto.parse() {
348            Ok(x) => Ok(x),
349            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
350        }
351    }
352}
353
354impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
355    fn into_proto(&self) -> ProtoEncodedSchemas {
356        ProtoEncodedSchemas {
357            key: self.key.clone(),
358            key_data_type: self.key_data_type.clone(),
359            val: self.val.clone(),
360            val_data_type: self.val_data_type.clone(),
361        }
362    }
363
364    fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
365        Ok(EncodedSchemas {
366            key: proto.key,
367            key_data_type: proto.key_data_type,
368            val: proto.val,
369            val_data_type: proto.val_data_type,
370        })
371    }
372}
373
374impl RustType<String> for IdempotencyToken {
375    fn into_proto(&self) -> String {
376        self.to_string()
377    }
378
379    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
380        match proto.parse() {
381            Ok(x) => Ok(x),
382            Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
383        }
384    }
385}
386
387impl RustType<String> for PartialBatchKey {
388    fn into_proto(&self) -> String {
389        self.0.clone()
390    }
391
392    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
393        Ok(PartialBatchKey(proto))
394    }
395}
396
397impl RustType<String> for PartialRollupKey {
398    fn into_proto(&self) -> String {
399        self.0.clone()
400    }
401
402    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
403        Ok(PartialRollupKey(proto))
404    }
405}
406
407impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
408    pub fn encode<B>(&self, buf: &mut B)
409    where
410        B: bytes::BufMut,
411    {
412        self.into_proto()
413            .encode(buf)
414            .expect("no required fields means no initialization errors");
415    }
416
417    pub fn decode(build_version: &Version, buf: Bytes) -> Self {
418        let proto = ProtoStateDiff::decode(buf)
419            // We received a State that we couldn't decode. This could happen if
420            // persist messes up backward/forward compatibility, if the durable
421            // data was corrupted, or if operations messes up deployment. In any
422            // case, fail loudly.
423            .expect("internal error: invalid encoded state");
424        let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
425        assert_code_can_read_data(build_version, &diff.applier_version);
426        diff
427    }
428}
429
/// Conversion between a [StateDiff] and its [ProtoStateDiff] wire form.
///
/// The per-field diffs are not stored as individual proto messages. They are
/// packed into a single [ProtoStateFieldDiffs] through its writer, and read
/// back by iterating that structure.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes this diff. Every field's diffs are written into one
    /// [ProtoStateFieldDiffs] in the fixed field order below.
    fn into_proto(&self) -> ProtoStateDiff {
        // Deconstruct self so we get a compile failure if new fields are added.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // Create a writer so we can efficiently encode our data.
        let mut writer = proto.into_writer();

        // One call per field; each appends that field's diffs, tagged with the
        // field, in order. Note that `DeprecatedRollups` is never written here;
        // it is only accepted when decoding.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        // After encoding all of our data, convert back into the proto.
        let field_diffs = writer.into_proto();

        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Decodes a [ProtoStateDiff].
    ///
    /// An empty `applier_version` is read as `0.0.0`. A non-empty but
    /// unparseable one is an `InvalidSemverVersion` error. If `field_diffs` is
    /// absent, the diff is returned exactly as built by `StateDiff::new`
    /// (presumably with no field changes -- not visible here). An error from
    /// any individual field diff is propagated.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        let applier_version = if proto.applier_version.is_empty() {
            // Backward compatibility with versions of ProtoState before we set
            // this field: if it's missing (empty), assume an infinitely old
            // version.
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // Each arm decodes the raw key/value bytes as the proto types
                // named in the turbofish (`()` means no key or no value) and
                // appends the result to the matching `StateDiff` field.
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // MIGRATION: We previously stored rollups as a `SeqNo ->
                    // string Key` map, but now the value is a `struct
                    // HollowRollup`.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    // The legacy encoding carried no size.
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself; there is no value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
649
650fn field_diffs_into_proto<K, KP, V, VP>(
651    field: ProtoStateField,
652    diffs: &[StateFieldDiff<K, V>],
653    writer: &mut ProtoStateFieldDiffsWriter,
654) where
655    KP: prost::Message,
656    K: RustType<KP>,
657    VP: prost::Message,
658    V: RustType<VP>,
659{
660    for diff in diffs.iter() {
661        field_diff_into_proto(field, diff, writer);
662    }
663}
664
665fn field_diff_into_proto<K, KP, V, VP>(
666    field: ProtoStateField,
667    diff: &StateFieldDiff<K, V>,
668    writer: &mut ProtoStateFieldDiffsWriter,
669) where
670    KP: prost::Message,
671    K: RustType<KP>,
672    VP: prost::Message,
673    V: RustType<VP>,
674{
675    writer.push_field(field);
676    writer.encode_proto(&diff.key.into_proto());
677    match &diff.val {
678        StateFieldValDiff::Insert(to) => {
679            writer.push_diff_type(ProtoStateFieldDiffType::Insert);
680            writer.encode_proto(&to.into_proto());
681        }
682        StateFieldValDiff::Update(from, to) => {
683            writer.push_diff_type(ProtoStateFieldDiffType::Update);
684            writer.encode_proto(&from.into_proto());
685            writer.encode_proto(&to.into_proto());
686        }
687        StateFieldValDiff::Delete(from) => {
688            writer.push_diff_type(ProtoStateFieldDiffType::Delete);
689            writer.encode_proto(&from.into_proto());
690        }
691    };
692}
693
694fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
695    proto: ProtoStateFieldDiff<'_>,
696    diffs: &mut Vec<StateFieldDiff<K, V>>,
697    k_fn: KFn,
698    v_fn: VFn,
699) -> Result<(), TryFromProtoError>
700where
701    KP: prost::Message + Default,
702    VP: prost::Message + Default,
703    KFn: Fn(KP) -> Result<K, TryFromProtoError>,
704    VFn: Fn(VP) -> Result<V, TryFromProtoError>,
705{
706    let val = match proto.diff_type {
707        ProtoStateFieldDiffType::Insert => {
708            let to = VP::decode(proto.to)
709                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
710            StateFieldValDiff::Insert(v_fn(to)?)
711        }
712        ProtoStateFieldDiffType::Update => {
713            let from = VP::decode(proto.from)
714                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
715            let to = VP::decode(proto.to)
716                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
717
718            StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
719        }
720        ProtoStateFieldDiffType::Delete => {
721            let from = VP::decode(proto.from)
722                .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
723            StateFieldValDiff::Delete(v_fn(from)?)
724        }
725    };
726    let key = KP::decode(proto.key)
727        .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
728    diffs.push(StateFieldDiff {
729        key: k_fn(key)?,
730        val,
731    });
732    Ok(())
733}
734
/// The decoded state of [ProtoRollup] for which we have not yet checked
/// that codecs match the ones in durable state.
///
/// Call [UntypedState::check_codecs] (or `check_ts_codec`) to obtain state
/// whose types have been verified.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    // Names of the codecs the durable state was recorded with. The check
    // methods compare these against the requested types' codec names.
    pub(crate) key_codec: String,
    pub(crate) val_codec: String,
    pub(crate) ts_codec: String,
    pub(crate) diff_codec: String,

    // Important! This T has not been validated, so we can't expose anything in
    // State that references T until one of the check methods has been called.
    // Any field on State that doesn't reference T is fair game.
    state: State<T>,
}
750
751impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
752    pub fn seqno(&self) -> SeqNo {
753        self.state.seqno
754    }
755
756    pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
757        &self.state.collections.rollups
758    }
759
760    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
761        self.state.latest_rollup()
762    }
763
764    pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
765        &mut self,
766        cfg: &PersistConfig,
767        metrics: &Metrics,
768        diffs: I,
769    ) {
770        // The apply_encoded_diffs might panic if T is not correct. Making this
771        // a silent no-op is far too subtle for my taste, but it's not clear
772        // what else we could do instead.
773        if T::codec_name() != self.ts_codec {
774            return;
775        }
776        self.state.apply_encoded_diffs(cfg, metrics, diffs);
777    }
778
779    pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
780        self,
781        shard_id: &ShardId,
782    ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
783        // Also defensively check that the shard_id on the state we fetched
784        // matches the shard_id we were trying to fetch.
785        assert_eq!(shard_id, &self.state.shard_id);
786        if K::codec_name() != self.key_codec
787            || V::codec_name() != self.val_codec
788            || T::codec_name() != self.ts_codec
789            || D::codec_name() != self.diff_codec
790        {
791            return Err(Box::new(CodecMismatch {
792                requested: (
793                    K::codec_name(),
794                    V::codec_name(),
795                    T::codec_name(),
796                    D::codec_name(),
797                    None,
798                ),
799                actual: (
800                    self.key_codec,
801                    self.val_codec,
802                    self.ts_codec,
803                    self.diff_codec,
804                    None,
805                ),
806            }));
807        }
808        Ok(TypedState {
809            state: self.state,
810            _phantom: PhantomData,
811        })
812    }
813
814    pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
815        // Also defensively check that the shard_id on the state we fetched
816        // matches the shard_id we were trying to fetch.
817        assert_eq!(shard_id, &self.state.shard_id);
818        if T::codec_name() != self.ts_codec {
819            return Err(CodecMismatchT {
820                requested: T::codec_name(),
821                actual: self.ts_codec,
822            });
823        }
824        Ok(self.state)
825    }
826
827    pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
828        let proto = ProtoRollup::decode(buf)
829            // We received a State that we couldn't decode. This could happen if
830            // persist messes up backward/forward compatibility, if the durable
831            // data was corrupted, or if operations messes up deployment. In any
832            // case, fail loudly.
833            .expect("internal error: invalid encoded state");
834        let state = Rollup::from_proto(proto)
835            .expect("internal error: invalid encoded state")
836            .state;
837        assert_code_can_read_data(build_version, &state.state.collections.version);
838        state
839    }
840}
841
842impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
843where
844    K: Codec,
845    V: Codec,
846    T: Codec64,
847    D: Codec64,
848{
849    fn from(typed_state: TypedState<K, V, T, D>) -> Self {
850        UntypedState {
851            key_codec: K::codec_name(),
852            val_codec: V::codec_name(),
853            ts_codec: T::codec_name(),
854            diff_codec: D::codec_name(),
855            state: typed_state.state,
856        }
857    }
858}
859
860/// A struct that maps 1:1 with ProtoRollup.
861///
862/// Contains State, and optionally the diffs between (state.latest_rollup.seqno, state.seqno]
863///
864/// `diffs` is always expected to be `Some` when writing new rollups, but may be `None`
865/// when deserializing rollups that were persisted before we started inlining diffs.
866#[derive(Debug)]
867pub struct Rollup<T> {
868    pub(crate) state: UntypedState<T>,
869    pub(crate) diffs: Option<InlinedDiffs>,
870}
871
872impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
873    /// Creates a `StateRollup` from a state and diffs from its last rollup.
874    ///
875    /// The diffs must span the seqno range `(state.last_rollup().seqno, state.seqno]`.
876    pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
877        let latest_rollup_seqno = *state.latest_rollup().0;
878        let mut verify_seqno = latest_rollup_seqno;
879        for diff in &diffs {
880            assert_eq!(verify_seqno.next(), diff.seqno);
881            verify_seqno = diff.seqno;
882        }
883        assert_eq!(verify_seqno, state.seqno());
884
885        let diffs = Some(InlinedDiffs::from(
886            latest_rollup_seqno.next(),
887            state.seqno().next(),
888            diffs,
889        ));
890
891        Self { state, diffs }
892    }
893
894    pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
895        Self { state, diffs: None }
896    }
897
898    pub(crate) fn from_state_without_diffs(
899        state: State<T>,
900        key_codec: String,
901        val_codec: String,
902        ts_codec: String,
903        diff_codec: String,
904    ) -> Self {
905        Self::from_untyped_state_without_diffs(UntypedState {
906            key_codec,
907            val_codec,
908            ts_codec,
909            diff_codec,
910            state,
911        })
912    }
913}
914
915#[derive(Debug)]
916pub(crate) struct InlinedDiffs {
917    pub(crate) lower: SeqNo,
918    pub(crate) upper: SeqNo,
919    pub(crate) diffs: Vec<VersionedData>,
920}
921
922impl InlinedDiffs {
923    pub(crate) fn description(&self) -> Description<SeqNo> {
924        Description::new(
925            Antichain::from_elem(self.lower),
926            Antichain::from_elem(self.upper),
927            Antichain::from_elem(SeqNo::minimum()),
928        )
929    }
930
931    fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
932        for diff in &diffs {
933            assert!(diff.seqno >= lower);
934            assert!(diff.seqno < upper);
935        }
936        Self {
937            lower,
938            upper,
939            diffs,
940        }
941    }
942}
943
944impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
945    fn into_proto(&self) -> ProtoInlinedDiffs {
946        ProtoInlinedDiffs {
947            lower: self.lower.into_proto(),
948            upper: self.upper.into_proto(),
949            diffs: self.diffs.into_proto(),
950        }
951    }
952
953    fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
954        Ok(Self {
955            lower: proto.lower.into_rust()?,
956            upper: proto.upper.into_rust()?,
957            diffs: proto.diffs.into_rust()?,
958        })
959    }
960}
961
962impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
963    fn into_proto(&self) -> ProtoRollup {
964        ProtoRollup {
965            applier_version: self.state.state.collections.version.to_string(),
966            shard_id: self.state.state.shard_id.into_proto(),
967            seqno: self.state.state.seqno.into_proto(),
968            walltime_ms: self.state.state.walltime_ms.into_proto(),
969            hostname: self.state.state.hostname.into_proto(),
970            key_codec: self.state.key_codec.into_proto(),
971            val_codec: self.state.val_codec.into_proto(),
972            ts_codec: T::codec_name(),
973            diff_codec: self.state.diff_codec.into_proto(),
974            last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
975            active_rollup: self.state.state.collections.active_rollup.into_proto(),
976            active_gc: self.state.state.collections.active_gc.into_proto(),
977            rollups: self
978                .state
979                .state
980                .collections
981                .rollups
982                .iter()
983                .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
984                .collect(),
985            deprecated_rollups: Default::default(),
986            leased_readers: self
987                .state
988                .state
989                .collections
990                .leased_readers
991                .iter()
992                .map(|(id, state)| (id.into_proto(), state.into_proto()))
993                .collect(),
994            critical_readers: self
995                .state
996                .state
997                .collections
998                .critical_readers
999                .iter()
1000                .map(|(id, state)| (id.into_proto(), state.into_proto()))
1001                .collect(),
1002            writers: self
1003                .state
1004                .state
1005                .collections
1006                .writers
1007                .iter()
1008                .map(|(id, state)| (id.into_proto(), state.into_proto()))
1009                .collect(),
1010            schemas: self
1011                .state
1012                .state
1013                .collections
1014                .schemas
1015                .iter()
1016                .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
1017                .collect(),
1018            trace: Some(self.state.state.collections.trace.into_proto()),
1019            diffs: self.diffs.as_ref().map(|x| x.into_proto()),
1020        }
1021    }
1022
1023    fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
1024        let applier_version = if x.applier_version.is_empty() {
1025            // Backward compatibility with versions of ProtoState before we set
1026            // this field: if it's missing (empty), assume an infinitely old
1027            // version.
1028            semver::Version::new(0, 0, 0)
1029        } else {
1030            semver::Version::parse(&x.applier_version).map_err(|err| {
1031                TryFromProtoError::InvalidSemverVersion(format!(
1032                    "invalid applier_version {}: {}",
1033                    x.applier_version, err
1034                ))
1035            })?
1036        };
1037
1038        let mut rollups = BTreeMap::new();
1039        for (seqno, rollup) in x.rollups {
1040            rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
1041        }
1042        for (seqno, key) in x.deprecated_rollups {
1043            rollups.insert(
1044                seqno.into_rust()?,
1045                HollowRollup {
1046                    key: key.into_rust()?,
1047                    encoded_size_bytes: None,
1048                },
1049            );
1050        }
1051        let mut leased_readers = BTreeMap::new();
1052        for (id, state) in x.leased_readers {
1053            leased_readers.insert(id.into_rust()?, state.into_rust()?);
1054        }
1055        let mut critical_readers = BTreeMap::new();
1056        for (id, state) in x.critical_readers {
1057            critical_readers.insert(id.into_rust()?, state.into_rust()?);
1058        }
1059        let mut writers = BTreeMap::new();
1060        for (id, state) in x.writers {
1061            writers.insert(id.into_rust()?, state.into_rust()?);
1062        }
1063        let mut schemas = BTreeMap::new();
1064        for (id, x) in x.schemas {
1065            schemas.insert(id.into_rust()?, x.into_rust()?);
1066        }
1067        let active_rollup = x
1068            .active_rollup
1069            .map(|rollup| rollup.into_rust())
1070            .transpose()?;
1071        let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
1072        let collections = StateCollections {
1073            version: applier_version.clone(),
1074            rollups,
1075            active_rollup,
1076            active_gc,
1077            last_gc_req: x.last_gc_req.into_rust()?,
1078            leased_readers,
1079            critical_readers,
1080            writers,
1081            schemas,
1082            trace: x.trace.into_rust_if_some("trace")?,
1083        };
1084        let state = State {
1085            shard_id: x.shard_id.into_rust()?,
1086            seqno: x.seqno.into_rust()?,
1087            walltime_ms: x.walltime_ms,
1088            hostname: x.hostname,
1089            collections,
1090        };
1091
1092        let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
1093        if let Some(diffs) = &diffs {
1094            if diffs.lower != state.latest_rollup().0.next() {
1095                return Err(TryFromProtoError::InvalidPersistState(format!(
1096                    "diffs lower ({}) should match latest rollup's successor: ({})",
1097                    diffs.lower,
1098                    state.latest_rollup().0.next()
1099                )));
1100            }
1101            if diffs.upper != state.seqno.next() {
1102                return Err(TryFromProtoError::InvalidPersistState(format!(
1103                    "diffs upper ({}) should match state's successor: ({})",
1104                    diffs.lower,
1105                    state.seqno.next()
1106                )));
1107            }
1108        }
1109
1110        Ok(Rollup {
1111            state: UntypedState {
1112                state,
1113                key_codec: x.key_codec.into_rust()?,
1114                val_codec: x.val_codec.into_rust()?,
1115                ts_codec: x.ts_codec.into_rust()?,
1116                diff_codec: x.diff_codec.into_rust()?,
1117            },
1118            diffs,
1119        })
1120    }
1121}
1122
1123impl RustType<ProtoVersionedData> for VersionedData {
1124    fn into_proto(&self) -> ProtoVersionedData {
1125        ProtoVersionedData {
1126            seqno: self.seqno.into_proto(),
1127            data: Bytes::clone(&self.data),
1128        }
1129    }
1130
1131    fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1132        Ok(Self {
1133            seqno: proto.seqno.into_rust()?,
1134            data: proto.data,
1135        })
1136    }
1137}
1138
1139impl RustType<ProtoSpineId> for SpineId {
1140    fn into_proto(&self) -> ProtoSpineId {
1141        ProtoSpineId {
1142            lo: self.0.into_proto(),
1143            hi: self.1.into_proto(),
1144        }
1145    }
1146
1147    fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1148        Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1149    }
1150}
1151
1152impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1153    fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1154        let (id, batch) = entry;
1155        ProtoIdHollowBatch {
1156            id: Some(id.into_proto()),
1157            batch: Some(batch.into_proto()),
1158        }
1159    }
1160
1161    fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1162        let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1163        let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1164        Ok((id, batch))
1165    }
1166}
1167
1168impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1169    fn into_proto(&self) -> ProtoSpineBatch {
1170        ProtoSpineBatch {
1171            desc: Some(self.desc.into_proto()),
1172            parts: self.parts.into_proto(),
1173            level: self.level.into_proto(),
1174            descs: self.descs.into_proto(),
1175        }
1176    }
1177
1178    fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1179        let level = proto.level.into_rust()?;
1180        let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1181        let parts = proto.parts.into_rust()?;
1182        let descs = proto.descs.into_rust()?;
1183        Ok(ThinSpineBatch {
1184            level,
1185            desc,
1186            parts,
1187            descs,
1188        })
1189    }
1190}
1191
1192impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1193    fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1194        let (id, batch) = entry;
1195        ProtoIdSpineBatch {
1196            id: Some(id.into_proto()),
1197            batch: Some(batch.into_proto()),
1198        }
1199    }
1200
1201    fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1202        let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1203        let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1204        Ok((id, batch))
1205    }
1206}
1207
1208impl RustType<ProtoCompaction> for ActiveCompaction {
1209    fn into_proto(&self) -> ProtoCompaction {
1210        ProtoCompaction {
1211            start_ms: self.start_ms,
1212        }
1213    }
1214
1215    fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1216        Ok(Self {
1217            start_ms: proto.start_ms,
1218        })
1219    }
1220}
1221
1222impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1223    fn into_proto(&self) -> ProtoMerge {
1224        ProtoMerge {
1225            since: Some(self.since.into_proto()),
1226            remaining_work: self.remaining_work.into_proto(),
1227            active_compaction: self.active_compaction.into_proto(),
1228        }
1229    }
1230
1231    fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1232        let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1233        let remaining_work = proto.remaining_work.into_rust()?;
1234        let active_compaction = proto.active_compaction.into_rust()?;
1235        Ok(Self {
1236            since,
1237            remaining_work,
1238            active_compaction,
1239        })
1240    }
1241}
1242
1243impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1244    fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1245        ProtoIdMerge {
1246            id: Some(id.into_proto()),
1247            merge: Some(merge.into_proto()),
1248        }
1249    }
1250
1251    fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1252        let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1253        let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1254        Ok((id, merge))
1255    }
1256}
1257
1258impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1259    fn into_proto(&self) -> ProtoTrace {
1260        let since = self.since.into_proto();
1261        let legacy_batches = self
1262            .legacy_batches
1263            .iter()
1264            .map(|(b, _)| b.into_proto())
1265            .collect();
1266        let hollow_batches = self.hollow_batches.into_proto();
1267        let spine_batches = self.spine_batches.into_proto();
1268        let merges = self.merges.into_proto();
1269        ProtoTrace {
1270            since: Some(since),
1271            legacy_batches,
1272            hollow_batches,
1273            spine_batches,
1274            merges,
1275        }
1276    }
1277
1278    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1279        let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1280        let legacy_batches = proto
1281            .legacy_batches
1282            .into_iter()
1283            .map(|b| b.into_rust().map(|b| (b, ())))
1284            .collect::<Result<_, _>>()?;
1285        let hollow_batches = proto.hollow_batches.into_rust()?;
1286        let spine_batches = proto.spine_batches.into_rust()?;
1287        let merges = proto.merges.into_rust()?;
1288        Ok(FlatTrace {
1289            since,
1290            legacy_batches,
1291            hollow_batches,
1292            spine_batches,
1293            merges,
1294        })
1295    }
1296}
1297
1298impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1299    fn into_proto(&self) -> ProtoTrace {
1300        self.flatten().into_proto()
1301    }
1302
1303    fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1304        Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1305    }
1306}
1307
1308impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1309    fn into_proto(&self) -> ProtoLeasedReaderState {
1310        ProtoLeasedReaderState {
1311            seqno: self.seqno.into_proto(),
1312            since: Some(self.since.into_proto()),
1313            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1314            lease_duration_ms: self.lease_duration_ms.into_proto(),
1315            debug: Some(self.debug.into_proto()),
1316        }
1317    }
1318
1319    fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1320        let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1321        // MIGRATION: If the lease_duration_ms is empty, then the proto field
1322        // was missing and we need to fill in a default. This would ideally be
1323        // based on the actual value in PersistConfig, but it's only here for a
1324        // short time and this is way easier.
1325        if lease_duration_ms == 0 {
1326            lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1327                .expect("lease duration as millis should fit within u64");
1328        }
1329        // MIGRATION: If debug is empty, then the proto field was missing and we
1330        // need to fill in a default.
1331        let debug = proto.debug.unwrap_or_default().into_rust()?;
1332        Ok(LeasedReaderState {
1333            seqno: proto.seqno.into_rust()?,
1334            since: proto
1335                .since
1336                .into_rust_if_some("ProtoLeasedReaderState::since")?,
1337            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1338            lease_duration_ms,
1339            debug,
1340        })
1341    }
1342}
1343
1344impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1345    fn into_proto(&self) -> ProtoCriticalReaderState {
1346        ProtoCriticalReaderState {
1347            since: Some(self.since.into_proto()),
1348            opaque: i64::from_le_bytes(self.opaque.1),
1349            opaque_codec: self.opaque.0.clone(),
1350            debug: Some(self.debug.into_proto()),
1351        }
1352    }
1353
1354    fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1355        // MIGRATION: If debug is empty, then the proto field was missing and we
1356        // need to fill in a default.
1357        let debug = proto.debug.unwrap_or_default().into_rust()?;
1358        Ok(CriticalReaderState {
1359            since: proto
1360                .since
1361                .into_rust_if_some("ProtoCriticalReaderState::since")?,
1362            opaque: Opaque(proto.opaque_codec, i64::to_le_bytes(proto.opaque)),
1363            debug,
1364        })
1365    }
1366}
1367
1368impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1369    fn into_proto(&self) -> ProtoWriterState {
1370        ProtoWriterState {
1371            last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1372            lease_duration_ms: self.lease_duration_ms.into_proto(),
1373            most_recent_write_token: self.most_recent_write_token.into_proto(),
1374            most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1375            debug: Some(self.debug.into_proto()),
1376        }
1377    }
1378
1379    fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1380        // MIGRATION: We didn't originally have most_recent_write_token and
1381        // most_recent_write_upper. Pick values that aren't going to
1382        // accidentally match ones in incoming writes and confuse things. We
1383        // could instead use Option on WriterState but this keeps the backward
1384        // compatibility logic confined to one place.
1385        let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1386            IdempotencyToken::SENTINEL
1387        } else {
1388            proto.most_recent_write_token.into_rust()?
1389        };
1390        let most_recent_write_upper = match proto.most_recent_write_upper {
1391            Some(x) => x.into_rust()?,
1392            None => Antichain::from_elem(T::minimum()),
1393        };
1394        // MIGRATION: If debug is empty, then the proto field was missing and we
1395        // need to fill in a default.
1396        let debug = proto.debug.unwrap_or_default().into_rust()?;
1397        Ok(WriterState {
1398            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1399            lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1400            most_recent_write_token,
1401            most_recent_write_upper,
1402            debug,
1403        })
1404    }
1405}
1406
1407impl RustType<ProtoHandleDebugState> for HandleDebugState {
1408    fn into_proto(&self) -> ProtoHandleDebugState {
1409        ProtoHandleDebugState {
1410            hostname: self.hostname.into_proto(),
1411            purpose: self.purpose.into_proto(),
1412        }
1413    }
1414
1415    fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1416        Ok(HandleDebugState {
1417            hostname: proto.hostname,
1418            purpose: proto.purpose,
1419        })
1420    }
1421}
1422
1423impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1424    fn into_proto(&self) -> ProtoHollowRun {
1425        ProtoHollowRun {
1426            parts: self.parts.into_proto(),
1427        }
1428    }
1429
1430    fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1431        Ok(HollowRun {
1432            parts: proto.parts.into_rust()?,
1433        })
1434    }
1435}
1436
1437impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1438    fn into_proto(&self) -> ProtoHollowBatch {
1439        let mut run_meta = self.run_meta.into_proto();
1440        // For backwards compatibility reasons, don't keep default metadata in the proto.
1441        let run_meta_default = RunMeta::default().into_proto();
1442        while run_meta.last() == Some(&run_meta_default) {
1443            run_meta.pop();
1444        }
1445        ProtoHollowBatch {
1446            desc: Some(self.desc.into_proto()),
1447            parts: self.parts.into_proto(),
1448            len: self.len.into_proto(),
1449            runs: self.run_splits.into_proto(),
1450            run_meta,
1451            deprecated_keys: vec![],
1452        }
1453    }
1454
1455    fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1456        let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1457        // MIGRATION: We used to just have the keys instead of a more structured
1458        // part.
1459        parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1460            RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1461                key: PartialBatchKey(key),
1462                meta: Default::default(),
1463                encoded_size_bytes: 0,
1464                key_lower: vec![],
1465                structured_key_lower: None,
1466                stats: None,
1467                ts_rewrite: None,
1468                diffs_sum: None,
1469                format: None,
1470                schema_id: None,
1471                deprecated_schema_id: None,
1472            }))
1473        }));
1474        // We discard default metadatas from the proto above; re-add them here.
1475        let run_splits: Vec<usize> = proto.runs.into_rust()?;
1476        let num_runs = if parts.is_empty() {
1477            0
1478        } else {
1479            run_splits.len() + 1
1480        };
1481        let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1482        run_meta.resize(num_runs, RunMeta::default());
1483        Ok(HollowBatch {
1484            desc: proto.desc.into_rust_if_some("desc")?,
1485            parts,
1486            len: proto.len.into_rust()?,
1487            run_splits,
1488            run_meta,
1489        })
1490    }
1491}
1492
1493impl RustType<String> for RunId {
1494    fn into_proto(&self) -> String {
1495        self.to_string()
1496    }
1497
1498    fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1499        RunId::from_str(&proto).map_err(|_| {
1500            TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1501        })
1502    }
1503}
1504
1505impl RustType<ProtoRunMeta> for RunMeta {
1506    fn into_proto(&self) -> ProtoRunMeta {
1507        let order = match self.order {
1508            None => ProtoRunOrder::Unknown,
1509            Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1510            Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1511            Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1512        };
1513        ProtoRunMeta {
1514            order: order.into(),
1515            schema_id: self.schema.into_proto(),
1516            deprecated_schema_id: self.deprecated_schema.into_proto(),
1517            id: self.id.into_proto(),
1518            len: self.len.into_proto(),
1519            meta: self.meta.into_proto(),
1520        }
1521    }
1522
1523    fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1524        let order = match ProtoRunOrder::try_from(proto.order)? {
1525            ProtoRunOrder::Unknown => None,
1526            ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1527            ProtoRunOrder::Codec => Some(RunOrder::Codec),
1528            ProtoRunOrder::Structured => Some(RunOrder::Structured),
1529        };
1530        Ok(Self {
1531            order,
1532            schema: proto.schema_id.into_rust()?,
1533            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1534            id: proto.id.into_rust()?,
1535            len: proto.len.into_rust()?,
1536            meta: proto.meta.into_rust()?,
1537        })
1538    }
1539}
1540
1541impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1542    fn into_proto(&self) -> ProtoHollowBatchPart {
1543        match self {
1544            RunPart::Single(part) => part.into_proto(),
1545            RunPart::Many(runs) => runs.into_proto(),
1546        }
1547    }
1548
1549    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1550        let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1551            RunPart::Many(proto.into_rust()?)
1552        } else {
1553            RunPart::Single(proto.into_rust()?)
1554        };
1555        Ok(run_part)
1556    }
1557}
1558
1559impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1560    fn into_proto(&self) -> ProtoHollowBatchPart {
1561        let part = ProtoHollowBatchPart {
1562            kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1563                key: self.key.into_proto(),
1564                max_part_bytes: self.max_part_bytes.into_proto(),
1565            })),
1566            encoded_size_bytes: self.hollow_bytes.into_proto(),
1567            key_lower: Bytes::copy_from_slice(&self.key_lower),
1568            diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1569            key_stats: None,
1570            ts_rewrite: None,
1571            format: None,
1572            schema_id: None,
1573            structured_key_lower: self.structured_key_lower.into_proto(),
1574            deprecated_schema_id: None,
1575            metadata: BTreeMap::default(),
1576        };
1577        part
1578    }
1579
1580    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1581        let run_proto = match proto.kind {
1582            Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1583            _ => Err(TryFromProtoError::UnknownEnumVariant(
1584                "ProtoHollowBatchPart::kind".to_string(),
1585            ))?,
1586        };
1587        Ok(Self {
1588            key: run_proto.key.into_rust()?,
1589            hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1590            max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1591            key_lower: proto.key_lower.to_vec(),
1592            structured_key_lower: proto.structured_key_lower.into_rust()?,
1593            diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1594            _phantom_data: Default::default(),
1595        })
1596    }
1597}
1598
1599impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1600    fn into_proto(&self) -> ProtoHollowBatchPart {
1601        match self {
1602            BatchPart::Hollow(x) => ProtoHollowBatchPart {
1603                kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1604                encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1605                key_lower: Bytes::copy_from_slice(&x.key_lower),
1606                structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1607                key_stats: x.stats.into_proto(),
1608                ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1609                diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1610                format: x.format.map(|f| f.into_proto()),
1611                schema_id: x.schema_id.into_proto(),
1612                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1613                metadata: BTreeMap::default(),
1614            },
1615            BatchPart::Inline {
1616                updates,
1617                ts_rewrite,
1618                schema_id,
1619                deprecated_schema_id,
1620            } => ProtoHollowBatchPart {
1621                kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1622                encoded_size_bytes: 0,
1623                key_lower: Bytes::new(),
1624                structured_key_lower: None,
1625                key_stats: None,
1626                ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1627                diffs_sum: None,
1628                format: None,
1629                schema_id: schema_id.into_proto(),
1630                deprecated_schema_id: deprecated_schema_id.into_proto(),
1631                metadata: BTreeMap::default(),
1632            },
1633        }
1634    }
1635
1636    fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1637        let ts_rewrite = match proto.ts_rewrite {
1638            Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1639            None => None,
1640        };
1641        let schema_id = proto.schema_id.into_rust()?;
1642        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1643        match proto.kind {
1644            Some(proto_hollow_batch_part::Kind::Key(key)) => {
1645                Ok(BatchPart::Hollow(HollowBatchPart {
1646                    key: key.into_rust()?,
1647                    meta: proto.metadata.into_rust()?,
1648                    encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1649                    key_lower: proto.key_lower.into(),
1650                    structured_key_lower: proto.structured_key_lower.into_rust()?,
1651                    stats: proto.key_stats.into_rust()?,
1652                    ts_rewrite,
1653                    diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1654                    format: proto.format.map(|f| f.into_rust()).transpose()?,
1655                    schema_id,
1656                    deprecated_schema_id,
1657                }))
1658            }
1659            Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1660                assert_eq!(proto.encoded_size_bytes, 0);
1661                assert_eq!(proto.key_lower.len(), 0);
1662                assert_none!(proto.key_stats);
1663                assert_none!(proto.diffs_sum);
1664                let updates = LazyInlineBatchPart(x.into_rust()?);
1665                Ok(BatchPart::Inline {
1666                    updates,
1667                    ts_rewrite,
1668                    schema_id,
1669                    deprecated_schema_id,
1670                })
1671            }
1672            _ => Err(TryFromProtoError::unknown_enum_variant(
1673                "ProtoHollowBatchPart::kind",
1674            )),
1675        }
1676    }
1677}
1678
1679impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1680    fn into_proto(&self) -> proto_hollow_batch_part::Format {
1681        match self {
1682            BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1683            BatchColumnarFormat::Both(version) => {
1684                proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1685            }
1686            BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1687        }
1688    }
1689
1690    fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1691        let format = match proto {
1692            proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1693            proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1694                BatchColumnarFormat::Both(version.cast_into())
1695            }
1696            proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1697        };
1698        Ok(format)
1699    }
1700}
1701
1702/// Aggregate statistics about data contained in a part.
1703///
1704/// These are "lazy" in the sense that we don't decode them (or even validate
1705/// the encoded version) until they're used.
1706#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1707pub struct LazyPartStats {
1708    key: LazyProto<ProtoStructStats>,
1709}
1710
1711impl Debug for LazyPartStats {
1712    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1713        f.debug_tuple("LazyPartStats")
1714            .field(&self.decode())
1715            .finish()
1716    }
1717}
1718
1719impl LazyPartStats {
1720    pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1721        let PartStats { key } = x;
1722        let mut proto_stats = ProtoStructStats::from_rust(key);
1723        map_proto(&mut proto_stats);
1724        LazyPartStats {
1725            key: LazyProto::from(&proto_stats),
1726        }
1727    }
1728    /// Decodes and returns PartStats from the encoded representation.
1729    ///
1730    /// This does not cache the returned value, it decodes each time it's
1731    /// called.
1732    pub fn decode(&self) -> PartStats {
1733        let key = self.key.decode().expect("valid proto");
1734        PartStats {
1735            key: key.into_rust().expect("valid stats"),
1736        }
1737    }
1738}
1739
1740impl RustType<Bytes> for LazyPartStats {
1741    fn into_proto(&self) -> Bytes {
1742        let LazyPartStats { key } = self;
1743        key.into_proto()
1744    }
1745
1746    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1747        Ok(LazyPartStats {
1748            key: proto.into_rust()?,
1749        })
1750    }
1751}
1752
/// Proptest strategy that always yields `Some` arbitrary [`LazyPartStats`].
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Option::Some)
}
1757
1758#[allow(unused_parens)]
1759impl Arbitrary for LazyPartStats {
1760    type Parameters = ();
1761    type Strategy =
1762        proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1763
1764    fn arbitrary_with(_: ()) -> Self::Strategy {
1765        Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1766            LazyPartStats::encode(&x, |_| {})
1767        })
1768    }
1769}
1770
1771impl ProtoInlineBatchPart {
1772    pub(crate) fn into_rust<T: Timestamp + Codec64>(
1773        lgbytes: &ColumnarMetrics,
1774        proto: Self,
1775    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1776        let updates = proto
1777            .updates
1778            .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1779        let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1780
1781        Ok(BlobTraceBatchPart {
1782            desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1783            index: proto.index.into_rust()?,
1784            updates,
1785        })
1786    }
1787}
1788
1789/// A batch part stored inlined in State.
1790#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1791pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1792
1793impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1794    fn from(value: &ProtoInlineBatchPart) -> Self {
1795        LazyInlineBatchPart(value.into())
1796    }
1797}
1798
1799impl Serialize for LazyInlineBatchPart {
1800    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1801        // NB: This serialize impl is only used for QA and debugging, so emit a
1802        // truncated version.
1803        let proto = self.0.decode().expect("valid proto");
1804        let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1805        let () = s.serialize_field("desc", &proto.desc)?;
1806        let () = s.serialize_field("index", &proto.index)?;
1807        let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1808        s.end()
1809    }
1810}
1811
1812impl LazyInlineBatchPart {
1813    pub(crate) fn encoded_size_bytes(&self) -> usize {
1814        self.0.buf.len()
1815    }
1816
1817    /// Decodes and returns a BlobTraceBatchPart from the encoded
1818    /// representation.
1819    ///
1820    /// This does not cache the returned value, it decodes each time it's
1821    /// called.
1822    pub fn decode<T: Timestamp + Codec64>(
1823        &self,
1824        lgbytes: &ColumnarMetrics,
1825    ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1826        let proto = self.0.decode().expect("valid proto");
1827        ProtoInlineBatchPart::into_rust(lgbytes, proto)
1828    }
1829}
1830
1831impl RustType<Bytes> for LazyInlineBatchPart {
1832    fn into_proto(&self) -> Bytes {
1833        self.0.into_proto()
1834    }
1835
1836    fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1837        Ok(LazyInlineBatchPart(proto.into_rust()?))
1838    }
1839}
1840
1841impl RustType<ProtoHollowRollup> for HollowRollup {
1842    fn into_proto(&self) -> ProtoHollowRollup {
1843        ProtoHollowRollup {
1844            key: self.key.into_proto(),
1845            encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1846        }
1847    }
1848
1849    fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1850        Ok(HollowRollup {
1851            key: proto.key.into_rust()?,
1852            encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1853        })
1854    }
1855}
1856
1857impl RustType<ProtoActiveRollup> for ActiveRollup {
1858    fn into_proto(&self) -> ProtoActiveRollup {
1859        ProtoActiveRollup {
1860            start_ms: self.start_ms,
1861            seqno: self.seqno.into_proto(),
1862        }
1863    }
1864
1865    fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1866        Ok(ActiveRollup {
1867            start_ms: proto.start_ms,
1868            seqno: proto.seqno.into_rust()?,
1869        })
1870    }
1871}
1872
1873impl RustType<ProtoActiveGc> for ActiveGc {
1874    fn into_proto(&self) -> ProtoActiveGc {
1875        ProtoActiveGc {
1876            start_ms: self.start_ms,
1877            seqno: self.seqno.into_proto(),
1878        }
1879    }
1880
1881    fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1882        Ok(ActiveGc {
1883            start_ms: proto.start_ms,
1884            seqno: proto.seqno.into_rust()?,
1885        })
1886    }
1887}
1888
1889impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1890    fn into_proto(&self) -> ProtoU64Description {
1891        ProtoU64Description {
1892            lower: Some(self.lower().into_proto()),
1893            upper: Some(self.upper().into_proto()),
1894            since: Some(self.since().into_proto()),
1895        }
1896    }
1897
1898    fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1899        Ok(Description::new(
1900            proto.lower.into_rust_if_some("lower")?,
1901            proto.upper.into_rust_if_some("upper")?,
1902            proto.since.into_rust_if_some("since")?,
1903        ))
1904    }
1905}
1906
1907impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1908    fn into_proto(&self) -> ProtoU64Antichain {
1909        ProtoU64Antichain {
1910            elements: self
1911                .elements()
1912                .iter()
1913                .map(|x| i64::from_le_bytes(T::encode(x)))
1914                .collect(),
1915        }
1916    }
1917
1918    fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1919        let elements = proto
1920            .elements
1921            .iter()
1922            .map(|x| T::decode(x.to_le_bytes()))
1923            .collect::<Vec<_>>();
1924        Ok(Antichain::from(elements))
1925    }
1926}
1927
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::assert_err;
    use mz_persist::location::SeqNo;
    use proptest::prelude::*;

    use crate::ShardId;
    use crate::internal::paths::PartialRollupKey;
    use crate::internal::state::tests::any_state;
    use crate::internal::state::{BatchPart, HandleDebugState};
    use crate::internal::state_diff::StateDiff;
    use crate::tests::new_test_client_cache;

    use super::*;

    /// Round-trips a `MetadataMap` through its proto form. Checks that typed
    /// keys (a plain `u64`, and an `Antichain<u64>` encoded as
    /// `ProtoU64Antichain`) survive, and that an unset key reads back as `None`.
    #[mz_ore::test]
    fn metadata_map() {
        const COUNT: MetadataKey<u64> = MetadataKey::new("count");

        let mut map = MetadataMap::default();
        map.set(COUNT, 100);
        let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
        assert_eq!(map.get(COUNT), Some(100));

        const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
            MetadataKey::new("antichain");
        assert_none!(map.get(ANTICHAIN));

        map.set(ANTICHAIN, Antichain::from_elem(30));
        let map = MetadataMap::from_proto(map.into_proto()).unwrap();
        assert_eq!(map.get(COUNT), Some(100));
        assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
    }

    /// Checks that an encoded rollup of State can be decoded by the same or a
    /// newer code version, and that decoding it with an older version panics.
    #[mz_ore::test]
    fn applier_version_state() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out some State.
        let shard_id = ShardId::new();
        let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
        let rollup =
            Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
        let mut buf = Vec::new();
        rollup.encode(&mut buf).expect("serializable");
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(
            UntypedState::<u64>::decode(&v2, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );
        assert_eq!(
            UntypedState::<u64>::decode(&v3, bytes.clone())
                .check_codecs(&shard_id)
                .as_ref(),
            Ok(&state)
        );

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
        assert_err!(v1_res);
    }

    /// Checks that an encoded `StateDiff` decodes identically under the same or
    /// a newer code version, and that decoding it with an older version panics.
    #[mz_ore::test]
    fn applier_version_state_diff() {
        let v1 = semver::Version::new(1, 0, 0);
        let v2 = semver::Version::new(2, 0, 0);
        let v3 = semver::Version::new(3, 0, 0);

        // Code version v2 evaluates and writes out a StateDiff.
        let diff = StateDiff::<u64>::new(
            v2.clone(),
            SeqNo(0),
            SeqNo(1),
            2,
            PartialRollupKey("rollup".into()),
        );
        let mut buf = Vec::new();
        diff.encode(&mut buf);
        let bytes = Bytes::from(buf);

        // We can read it back using persist code v2 and v3.
        assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
        assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);

        // But we can't read it back using v1 because v1 might corrupt it by
        // losing or misinterpreting something written out by a future version
        // of code.
        #[allow(clippy::disallowed_methods)] // not using enhanced panic handler in tests
        let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
        assert_err!(v1_res);
    }

    /// Checks that keys in the legacy `deprecated_keys` field of a
    /// `ProtoHollowBatch` are migrated into hollow parts on decode. The
    /// migrated parts are appended after any existing parts.
    #[mz_ore::test]
    fn hollow_batch_migration_keys() {
        let x = HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(1u64),
                Antichain::from_elem(2u64),
                Antichain::from_elem(3u64),
            ),
            vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("a".into()),
                meta: Default::default(),
                encoded_size_bytes: 5,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            }))],
            4,
        );
        let mut old = x.into_proto();
        // Old ProtoHollowBatch had keys instead of parts.
        old.deprecated_keys = vec!["b".into()];
        // We don't expect to see a ProtoHollowBatch with keys _and_ parts, only
        // one or the other, but the output is well defined, so we may as well
        // test it.
        let mut expected = x;
        // We fill in 0 for encoded_size_bytes when we migrate from keys. This
        // can break the memory bound of bounded-memory compaction during the
        // transition (a short-term issue), but that's better than creating
        // unnecessary runs (a longer-term issue).
        expected
            .parts
            .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                key: PartialBatchKey("b".into()),
                meta: Default::default(),
                encoded_size_bytes: 0,
                key_lower: vec![],
                structured_key_lower: None,
                stats: None,
                ts_rewrite: None,
                diffs_sum: None,
                format: None,
                schema_id: None,
                deprecated_schema_id: None,
            })));
        assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
    }

    /// Checks that a reader state encoded with `lease_duration_ms == 0` (as old
    /// protos were) decodes with the default of `READER_LEASE_DURATION`.
    #[mz_ore::test]
    fn reader_state_migration_lease_duration() {
        let x = LeasedReaderState {
            seqno: SeqNo(1),
            since: Antichain::from_elem(2u64),
            last_heartbeat_timestamp_ms: 3,
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
            // Old ProtoReaderState had no lease_duration_ms field
            lease_duration_ms: 0,
        };
        let old = x.into_proto();
        let mut expected = x;
        // When migrating from an unset value, we fill in the default of
        // READER_LEASE_DURATION for lease_duration_ms.
        expected.lease_duration_ms =
            u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
        assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
    }

    /// Checks that a `ProtoWriterState` without a most recent write token or
    /// upper decodes with `IdempotencyToken::SENTINEL` and an upper of `{0}`.
    #[mz_ore::test]
    fn writer_state_migration_most_recent_write() {
        let proto = ProtoWriterState {
            last_heartbeat_timestamp_ms: 1,
            lease_duration_ms: 2,
            // Old ProtoWriterState had no most_recent_write_token or
            // most_recent_write_upper.
            most_recent_write_token: "".into(),
            most_recent_write_upper: None,
            debug: Some(ProtoHandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            }),
        };
        let expected = WriterState {
            last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
            lease_duration_ms: proto.lease_duration_ms,
            most_recent_write_token: IdempotencyToken::SENTINEL,
            most_recent_write_upper: Antichain::from_elem(0),
            debug: HandleDebugState {
                hostname: "host".to_owned(),
                purpose: "purpose".to_owned(),
            },
        };
        assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
    }

    /// Checks that rollups stored in the legacy `deprecated_rollups` field of a
    /// rollup proto are merged with the current `rollups` on decode.
    #[mz_ore::test]
    fn state_migration_rollups() {
        let r1 = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r2 = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.collections.rollups.insert(SeqNo(2), r2.clone());
        let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();

        // Manually add the old rollup encoding.
        proto.deprecated_rollups.insert(1, r1.key.0.clone());

        let state: Rollup<u64> = proto.into_rust().unwrap();
        let state = state.state;
        let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            expected
        );
    }

    /// Checks that rollup diffs encoded under the legacy `DeprecatedRollups`
    /// field are merged into `StateDiff::rollups`. Also checks that deleting a
    /// rollup that was stored in the legacy state field applies cleanly.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r1 = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r2 = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };
        let r3 = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };
        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2.clone());
        diff.rollups.push(r3.clone());
        let mut diff_proto = diff.into_proto();

        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Manually add the old rollup encoding.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        // The legacy-encoded insert (r1) is decoded after the current ones.
        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2, r3, r1]
        );

        // Also make sure that a rollup delete in a diff applies cleanly to a
        // state that had it in the deprecated field.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let state: Rollup<u64> = rollup.into_rust().unwrap();
        let state = state.state;
        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }

    /// Property test: encoding an arbitrary State as a rollup proto and
    /// decoding it back yields an equal `UntypedState`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn state_proto_roundtrip() {
        fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
            let before = UntypedState {
                key_codec: <() as Codec>::codec_name(),
                val_codec: <() as Codec>::codec_name(),
                ts_codec: <T as Codec64>::codec_name(),
                diff_codec: <i64 as Codec64>::codec_name(),
                state,
            };
            let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
            let after: Rollup<T> = proto.into_rust().unwrap();
            let after = after.state;
            assert_eq!(before, after);
        }

        proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
    }

    /// Table-driven test for `cfg::code_can_write_data`. `Ok(())` means code at
    /// version `code` may write data last written at version `data`.
    #[mz_ore::test]
    fn check_data_versions() {
        #[track_caller]
        fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
            let code = Version::parse(code).unwrap();
            let data = Version::parse(data).unwrap();
            #[allow(clippy::disallowed_methods)]
            let actual = cfg::code_can_write_data(&code, &data)
                .then_some(())
                .ok_or(());
            assert_eq!(actual, expected, "data at {data} read by code {code}");
        }

        // A dev build can write data from its own version only.
        testcase("0.160.0-dev", "0.160.0-dev", Ok(()));
        testcase("0.160.0-dev", "0.160.0", Err(()));
        // Note: It would probably be useful to let tests use two arbitrary shas
        // on main, at the very least for things like git bisect, but currently
        // that is rejected.
        testcase("0.160.0-dev", "0.161.0-dev", Err(()));
        testcase("0.160.0-dev", "0.161.0", Err(()));
        testcase("0.160.0-dev", "0.162.0-dev", Err(()));
        testcase("0.160.0-dev", "0.162.0", Err(()));
        testcase("0.160.0-dev", "0.163.0-dev", Err(()));

        // A release can write data from older versions (including dev builds)...
        testcase("0.160.0", "0.158.0-dev", Ok(()));
        testcase("0.160.0", "0.158.0", Ok(()));
        testcase("0.160.0", "0.159.0-dev", Ok(()));
        testcase("0.160.0", "0.159.0", Ok(()));
        testcase("0.160.0", "0.160.0-dev", Ok(()));
        testcase("0.160.0", "0.160.0", Ok(()));

        // ...but not from any newer version.
        testcase("0.160.0", "0.161.0-dev", Err(()));
        testcase("0.160.0", "0.161.0", Err(()));
        testcase("0.160.0", "0.161.1", Err(()));
        testcase("0.160.0", "0.161.1000000", Err(()));
        testcase("0.160.0", "0.162.0-dev", Err(()));
        testcase("0.160.0", "0.162.0", Err(()));
        testcase("0.160.0", "0.163.0-dev", Err(()));

        // Patch versions follow the same rule: a newer patch is rejected.
        testcase("0.160.1", "0.159.0", Ok(()));
        testcase("0.160.1", "0.160.0", Ok(()));
        testcase("0.160.1", "0.161.0", Err(()));
        testcase("0.160.1", "0.161.1", Err(()));
        testcase("0.160.1", "0.161.100", Err(()));
        testcase("0.160.0", "0.160.1", Err(()));

        // Major 26 follows the 0.x line and can write its data.
        testcase("0.160.1", "26.0.0", Err(()));
        testcase("26.0.0", "0.160.1", Ok(()));
        testcase("26.2.0", "0.160.1", Ok(()));
        testcase("26.200.200", "0.160.1", Ok(()));

        // From there on, code can write data from its own or the immediately
        // preceding major version only.
        testcase("27.0.0", "0.160.1", Err(()));
        testcase("27.0.0", "0.16000.1", Err(()));
        testcase("27.0.0", "26.0.1", Ok(()));
        testcase("27.1000.100", "26.0.1", Ok(()));
        testcase("28.0.0", "26.0.1", Err(()));
        testcase("28.0.0", "26.1000.1", Err(()));
        testcase("28.0.0", "27.0.0", Ok(()));
    }
}