1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53 RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54 proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
/// The key and value schemas used with a pair of codecs `K` and `V`, plus an
/// optional id identifying that pair.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Id of this schema pair, if one is known.
    pub id: Option<SchemaId>,
    /// Schema for the key codec `K`, shared behind an `Arc`.
    pub key: Arc<K::Schema>,
    /// Schema for the value codec `V`, shared behind an `Arc`.
    pub val: Arc<V::Schema>,
}
76
77impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
78 fn clone(&self) -> Self {
79 Self {
80 id: self.id,
81 key: Arc::clone(&self.key),
82 val: Arc::clone(&self.val),
83 }
84 }
85}
86
/// A proto message of type `T` that is kept in its encoded form and only
/// decoded on demand (see `decode` / `decode_to`).
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded bytes of the message.
    buf: Bytes,
    /// Records the message type without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
114
115impl<T: Message + Default> Debug for LazyProto<T> {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self.decode() {
118 Ok(proto) => Debug::fmt(&proto, f),
119 Err(err) => f
120 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
121 .field("err", &err)
122 .finish(),
123 }
124 }
125}
126
127impl<T> PartialEq for LazyProto<T> {
128 fn eq(&self, other: &Self) -> bool {
129 self.cmp(other) == Ordering::Equal
130 }
131}
132
// Equality is defined through `Ord::cmp`, which is a total order, so the
// marker trait `Eq` holds.
impl<T> Eq for LazyProto<T> {}
134
135impl<T> PartialOrd for LazyProto<T> {
136 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
137 Some(self.cmp(other))
138 }
139}
140
141impl<T> Ord for LazyProto<T> {
142 fn cmp(&self, other: &Self) -> Ordering {
143 let LazyProto {
144 buf: self_buf,
145 _phantom: _,
146 } = self;
147 let LazyProto {
148 buf: other_buf,
149 _phantom: _,
150 } = other;
151 self_buf.cmp(other_buf)
152 }
153}
154
155impl<T> Hash for LazyProto<T> {
156 fn hash<H: Hasher>(&self, state: &mut H) {
157 let LazyProto { buf, _phantom } = self;
158 buf.hash(state);
159 }
160}
161
162impl<T: Message + Default> From<&T> for LazyProto<T> {
163 fn from(value: &T) -> Self {
164 let buf = Bytes::from(value.encode_to_vec());
165 LazyProto {
166 buf,
167 _phantom: PhantomData,
168 }
169 }
170}
171
172impl<T: Message + Default> LazyProto<T> {
173 pub fn decode(&self) -> Result<T, prost::DecodeError> {
174 T::decode(&*self.buf)
175 }
176
177 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
178 Ok(T::decode(&*self.buf)?.into_rust()?)
179 }
180}
181
182impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
183 fn into_proto(&self) -> Bytes {
184 self.buf.clone()
185 }
186
187 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
188 Ok(Self {
189 buf,
190 _phantom: PhantomData,
191 })
192 }
193}
194
195pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
196 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
197 Some(x) => x,
198 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
199 };
200 let uuid = Uuid::parse_str(uuid_encoded)
201 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
202 Ok(*uuid.as_bytes())
203}
204
205pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
206 if let Err(msg) = cfg::check_data_version(code_version, data_version) {
207 if cfg!(test) {
210 panic!("{msg}");
211 } else {
212 halt!("{msg}");
213 }
214 }
215}
216
217impl RustType<String> for LeasedReaderId {
218 fn into_proto(&self) -> String {
219 self.to_string()
220 }
221
222 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
223 match proto.parse() {
224 Ok(x) => Ok(x),
225 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
226 }
227 }
228}
229
230impl RustType<String> for CriticalReaderId {
231 fn into_proto(&self) -> String {
232 self.to_string()
233 }
234
235 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
236 match proto.parse() {
237 Ok(x) => Ok(x),
238 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
239 }
240 }
241}
242
243impl RustType<String> for WriterId {
244 fn into_proto(&self) -> String {
245 self.to_string()
246 }
247
248 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
249 match proto.parse() {
250 Ok(x) => Ok(x),
251 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
252 }
253 }
254}
255
256impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
257 fn into_proto(&self) -> ProtoEncodedSchemas {
258 ProtoEncodedSchemas {
259 key: self.key.clone(),
260 key_data_type: self.key_data_type.clone(),
261 val: self.val.clone(),
262 val_data_type: self.val_data_type.clone(),
263 }
264 }
265
266 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
267 Ok(EncodedSchemas {
268 key: proto.key,
269 key_data_type: proto.key_data_type,
270 val: proto.val,
271 val_data_type: proto.val_data_type,
272 })
273 }
274}
275
276impl RustType<String> for IdempotencyToken {
277 fn into_proto(&self) -> String {
278 self.to_string()
279 }
280
281 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
282 match proto.parse() {
283 Ok(x) => Ok(x),
284 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
285 }
286 }
287}
288
289impl RustType<String> for PartialBatchKey {
290 fn into_proto(&self) -> String {
291 self.0.clone()
292 }
293
294 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
295 Ok(PartialBatchKey(proto))
296 }
297}
298
299impl RustType<String> for PartialRollupKey {
300 fn into_proto(&self) -> String {
301 self.0.clone()
302 }
303
304 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
305 Ok(PartialRollupKey(proto))
306 }
307}
308
309impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
310 pub fn encode<B>(&self, buf: &mut B)
311 where
312 B: bytes::BufMut,
313 {
314 self.into_proto()
315 .encode(buf)
316 .expect("no required fields means no initialization errors");
317 }
318
319 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
320 let proto = ProtoStateDiff::decode(buf)
321 .expect("internal error: invalid encoded state");
326 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
327 check_data_version(build_version, &diff.applier_version);
328 diff
329 }
330}
331
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes this diff as a `ProtoStateDiff`.
    ///
    /// The scalar header fields are copied directly; every per-field diff list
    /// is written through a single `ProtoStateFieldDiffsWriter`, tagged with
    /// the `ProtoStateField` it belongs to.
    fn into_proto(&self) -> ProtoStateDiff {
        // Destructure without `..` so that adding a field to `StateDiff`
        // fails to compile here until it is handled below.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        let mut writer = proto.into_writer();

        // One call per field; each appends that field's diffs to the writer.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        let field_diffs = writer.into_proto();

        // Sanity check in debug builds only: what we just wrote must validate.
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Rebuilds a `StateDiff` from its proto form.
    ///
    /// # Errors
    ///
    /// Fails if `applier_version` is non-empty but not valid semver, or if any
    /// header field or per-field diff cannot be decoded/converted.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty version string is mapped to 0.0.0.
        // NOTE(review): presumably this covers data written before the field
        // existed; confirm.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` leaves every per-field list as `new` built it.
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // The turbofish on each call names the proto key and value
                // types for that field; `()` marks a field with no key (or no
                // value).
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // Deprecated encoding: the value is just the rollup key
                    // string, so no encoded size is available. These land in
                    // the same `rollups` list as the current encoding.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches carry the batch in the key position and
                    // have a unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
551
552fn field_diffs_into_proto<K, KP, V, VP>(
553 field: ProtoStateField,
554 diffs: &[StateFieldDiff<K, V>],
555 writer: &mut ProtoStateFieldDiffsWriter,
556) where
557 KP: prost::Message,
558 K: RustType<KP>,
559 VP: prost::Message,
560 V: RustType<VP>,
561{
562 for diff in diffs.iter() {
563 field_diff_into_proto(field, diff, writer);
564 }
565}
566
567fn field_diff_into_proto<K, KP, V, VP>(
568 field: ProtoStateField,
569 diff: &StateFieldDiff<K, V>,
570 writer: &mut ProtoStateFieldDiffsWriter,
571) where
572 KP: prost::Message,
573 K: RustType<KP>,
574 VP: prost::Message,
575 V: RustType<VP>,
576{
577 writer.push_field(field);
578 writer.encode_proto(&diff.key.into_proto());
579 match &diff.val {
580 StateFieldValDiff::Insert(to) => {
581 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
582 writer.encode_proto(&to.into_proto());
583 }
584 StateFieldValDiff::Update(from, to) => {
585 writer.push_diff_type(ProtoStateFieldDiffType::Update);
586 writer.encode_proto(&from.into_proto());
587 writer.encode_proto(&to.into_proto());
588 }
589 StateFieldValDiff::Delete(from) => {
590 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
591 writer.encode_proto(&from.into_proto());
592 }
593 };
594}
595
596fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
597 proto: ProtoStateFieldDiff<'_>,
598 diffs: &mut Vec<StateFieldDiff<K, V>>,
599 k_fn: KFn,
600 v_fn: VFn,
601) -> Result<(), TryFromProtoError>
602where
603 KP: prost::Message + Default,
604 VP: prost::Message + Default,
605 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
606 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
607{
608 let val = match proto.diff_type {
609 ProtoStateFieldDiffType::Insert => {
610 let to = VP::decode(proto.to)
611 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
612 StateFieldValDiff::Insert(v_fn(to)?)
613 }
614 ProtoStateFieldDiffType::Update => {
615 let from = VP::decode(proto.from)
616 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
617 let to = VP::decode(proto.to)
618 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619
620 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
621 }
622 ProtoStateFieldDiffType::Delete => {
623 let from = VP::decode(proto.from)
624 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
625 StateFieldValDiff::Delete(v_fn(from)?)
626 }
627 };
628 let key = KP::decode(proto.key)
629 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
630 diffs.push(StateFieldDiff {
631 key: k_fn(key)?,
632 val,
633 });
634 Ok(())
635}
636
/// A `State<T>` together with the names of the key, value, timestamp and diff
/// codecs it is associated with, but without the `K`/`V`/`D` type parameters.
///
/// `Clone` and `PartialEq` are only derived for tests and debug builds.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Name of the key codec.
    pub(crate) key_codec: String,
    /// Name of the value codec.
    pub(crate) val_codec: String,
    /// Name of the timestamp codec; may differ from `T::codec_name()`.
    pub(crate) ts_codec: String,
    /// Name of the diff codec.
    pub(crate) diff_codec: String,

    // Private: callers obtain the inner state through `check_codecs` or
    // `check_ts_codec`, which compare codec names first.
    state: State<T>,
}
652
653impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
654 pub fn seqno(&self) -> SeqNo {
655 self.state.seqno
656 }
657
658 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
659 &self.state.collections.rollups
660 }
661
662 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
663 self.state.latest_rollup()
664 }
665
666 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
667 &mut self,
668 cfg: &PersistConfig,
669 metrics: &Metrics,
670 diffs: I,
671 ) {
672 if T::codec_name() != self.ts_codec {
676 return;
677 }
678 self.state.apply_encoded_diffs(cfg, metrics, diffs);
679 }
680
681 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
682 self,
683 shard_id: &ShardId,
684 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
685 assert_eq!(shard_id, &self.state.shard_id);
688 if K::codec_name() != self.key_codec
689 || V::codec_name() != self.val_codec
690 || T::codec_name() != self.ts_codec
691 || D::codec_name() != self.diff_codec
692 {
693 return Err(Box::new(CodecMismatch {
694 requested: (
695 K::codec_name(),
696 V::codec_name(),
697 T::codec_name(),
698 D::codec_name(),
699 None,
700 ),
701 actual: (
702 self.key_codec,
703 self.val_codec,
704 self.ts_codec,
705 self.diff_codec,
706 None,
707 ),
708 }));
709 }
710 Ok(TypedState {
711 state: self.state,
712 _phantom: PhantomData,
713 })
714 }
715
716 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
717 assert_eq!(shard_id, &self.state.shard_id);
720 if T::codec_name() != self.ts_codec {
721 return Err(CodecMismatchT {
722 requested: T::codec_name(),
723 actual: self.ts_codec,
724 });
725 }
726 Ok(self.state)
727 }
728
729 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
730 let proto = ProtoRollup::decode(buf)
731 .expect("internal error: invalid encoded state");
736 let state = Rollup::from_proto(proto)
737 .expect("internal error: invalid encoded state")
738 .state;
739 check_data_version(build_version, &state.state.applier_version);
740 state
741 }
742}
743
744impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
745where
746 K: Codec,
747 V: Codec,
748 T: Codec64,
749 D: Codec64,
750{
751 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
752 UntypedState {
753 key_codec: K::codec_name(),
754 val_codec: V::codec_name(),
755 ts_codec: T::codec_name(),
756 diff_codec: D::codec_name(),
757 state: typed_state.state,
758 }
759 }
760}
761
/// An `UntypedState` bundled with, optionally, a run of encoded diffs inlined
/// alongside it. This is the in-memory form of `ProtoRollup`.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup.
    pub(crate) state: UntypedState<T>,
    /// Inlined diffs, or `None` if the rollup was built without any.
    pub(crate) diffs: Option<InlinedDiffs>,
}
773
774impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
775 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
779 let latest_rollup_seqno = *state.latest_rollup().0;
780 let mut verify_seqno = latest_rollup_seqno;
781 for diff in &diffs {
782 assert_eq!(verify_seqno.next(), diff.seqno);
783 verify_seqno = diff.seqno;
784 }
785 assert_eq!(verify_seqno, state.seqno());
786
787 let diffs = Some(InlinedDiffs::from(
788 latest_rollup_seqno.next(),
789 state.seqno().next(),
790 diffs,
791 ));
792
793 Self { state, diffs }
794 }
795
796 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
797 Self { state, diffs: None }
798 }
799
800 pub(crate) fn from_state_without_diffs(
801 state: State<T>,
802 key_codec: String,
803 val_codec: String,
804 ts_codec: String,
805 diff_codec: String,
806 ) -> Self {
807 Self::from_untyped_state_without_diffs(UntypedState {
808 key_codec,
809 val_codec,
810 ts_codec,
811 diff_codec,
812 state,
813 })
814 }
815}
816
/// A run of encoded diffs together with the half-open `SeqNo` range
/// `[lower, upper)` that every diff in it falls within.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the `SeqNo` of each diff.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the `SeqNo` of each diff.
    pub(crate) upper: SeqNo,
    /// The encoded diffs.
    pub(crate) diffs: Vec<VersionedData>,
}
823
824impl InlinedDiffs {
825 pub(crate) fn description(&self) -> Description<SeqNo> {
826 Description::new(
827 Antichain::from_elem(self.lower),
828 Antichain::from_elem(self.upper),
829 Antichain::from_elem(SeqNo::minimum()),
830 )
831 }
832
833 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
834 for diff in &diffs {
835 assert!(diff.seqno >= lower);
836 assert!(diff.seqno < upper);
837 }
838 Self {
839 lower,
840 upper,
841 diffs,
842 }
843 }
844}
845
846impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
847 fn into_proto(&self) -> ProtoInlinedDiffs {
848 ProtoInlinedDiffs {
849 lower: self.lower.into_proto(),
850 upper: self.upper.into_proto(),
851 diffs: self.diffs.into_proto(),
852 }
853 }
854
855 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
856 Ok(Self {
857 lower: proto.lower.into_rust()?,
858 upper: proto.upper.into_rust()?,
859 diffs: proto.diffs.into_rust()?,
860 })
861 }
862}
863
864impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
865 fn into_proto(&self) -> ProtoRollup {
866 ProtoRollup {
867 applier_version: self.state.state.applier_version.to_string(),
868 shard_id: self.state.state.shard_id.into_proto(),
869 seqno: self.state.state.seqno.into_proto(),
870 walltime_ms: self.state.state.walltime_ms.into_proto(),
871 hostname: self.state.state.hostname.into_proto(),
872 key_codec: self.state.key_codec.into_proto(),
873 val_codec: self.state.val_codec.into_proto(),
874 ts_codec: T::codec_name(),
875 diff_codec: self.state.diff_codec.into_proto(),
876 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
877 active_rollup: self.state.state.collections.active_rollup.into_proto(),
878 active_gc: self.state.state.collections.active_gc.into_proto(),
879 rollups: self
880 .state
881 .state
882 .collections
883 .rollups
884 .iter()
885 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
886 .collect(),
887 deprecated_rollups: Default::default(),
888 leased_readers: self
889 .state
890 .state
891 .collections
892 .leased_readers
893 .iter()
894 .map(|(id, state)| (id.into_proto(), state.into_proto()))
895 .collect(),
896 critical_readers: self
897 .state
898 .state
899 .collections
900 .critical_readers
901 .iter()
902 .map(|(id, state)| (id.into_proto(), state.into_proto()))
903 .collect(),
904 writers: self
905 .state
906 .state
907 .collections
908 .writers
909 .iter()
910 .map(|(id, state)| (id.into_proto(), state.into_proto()))
911 .collect(),
912 schemas: self
913 .state
914 .state
915 .collections
916 .schemas
917 .iter()
918 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
919 .collect(),
920 trace: Some(self.state.state.collections.trace.into_proto()),
921 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
922 }
923 }
924
925 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
926 let applier_version = if x.applier_version.is_empty() {
927 semver::Version::new(0, 0, 0)
931 } else {
932 semver::Version::parse(&x.applier_version).map_err(|err| {
933 TryFromProtoError::InvalidSemverVersion(format!(
934 "invalid applier_version {}: {}",
935 x.applier_version, err
936 ))
937 })?
938 };
939
940 let mut rollups = BTreeMap::new();
941 for (seqno, rollup) in x.rollups {
942 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
943 }
944 for (seqno, key) in x.deprecated_rollups {
945 rollups.insert(
946 seqno.into_rust()?,
947 HollowRollup {
948 key: key.into_rust()?,
949 encoded_size_bytes: None,
950 },
951 );
952 }
953 let mut leased_readers = BTreeMap::new();
954 for (id, state) in x.leased_readers {
955 leased_readers.insert(id.into_rust()?, state.into_rust()?);
956 }
957 let mut critical_readers = BTreeMap::new();
958 for (id, state) in x.critical_readers {
959 critical_readers.insert(id.into_rust()?, state.into_rust()?);
960 }
961 let mut writers = BTreeMap::new();
962 for (id, state) in x.writers {
963 writers.insert(id.into_rust()?, state.into_rust()?);
964 }
965 let mut schemas = BTreeMap::new();
966 for (id, x) in x.schemas {
967 schemas.insert(id.into_rust()?, x.into_rust()?);
968 }
969 let active_rollup = x
970 .active_rollup
971 .map(|rollup| rollup.into_rust())
972 .transpose()?;
973 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
974 let collections = StateCollections {
975 rollups,
976 active_rollup,
977 active_gc,
978 last_gc_req: x.last_gc_req.into_rust()?,
979 leased_readers,
980 critical_readers,
981 writers,
982 schemas,
983 trace: x.trace.into_rust_if_some("trace")?,
984 };
985 let state = State {
986 applier_version,
987 shard_id: x.shard_id.into_rust()?,
988 seqno: x.seqno.into_rust()?,
989 walltime_ms: x.walltime_ms,
990 hostname: x.hostname,
991 collections,
992 };
993
994 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
995 if let Some(diffs) = &diffs {
996 if diffs.lower != state.latest_rollup().0.next() {
997 return Err(TryFromProtoError::InvalidPersistState(format!(
998 "diffs lower ({}) should match latest rollup's successor: ({})",
999 diffs.lower,
1000 state.latest_rollup().0.next()
1001 )));
1002 }
1003 if diffs.upper != state.seqno.next() {
1004 return Err(TryFromProtoError::InvalidPersistState(format!(
1005 "diffs upper ({}) should match state's successor: ({})",
1006 diffs.lower,
1007 state.seqno.next()
1008 )));
1009 }
1010 }
1011
1012 Ok(Rollup {
1013 state: UntypedState {
1014 state,
1015 key_codec: x.key_codec.into_rust()?,
1016 val_codec: x.val_codec.into_rust()?,
1017 ts_codec: x.ts_codec.into_rust()?,
1018 diff_codec: x.diff_codec.into_rust()?,
1019 },
1020 diffs,
1021 })
1022 }
1023}
1024
1025impl RustType<ProtoVersionedData> for VersionedData {
1026 fn into_proto(&self) -> ProtoVersionedData {
1027 ProtoVersionedData {
1028 seqno: self.seqno.into_proto(),
1029 data: Bytes::clone(&self.data),
1030 }
1031 }
1032
1033 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1034 Ok(Self {
1035 seqno: proto.seqno.into_rust()?,
1036 data: proto.data,
1037 })
1038 }
1039}
1040
1041impl RustType<ProtoSpineId> for SpineId {
1042 fn into_proto(&self) -> ProtoSpineId {
1043 ProtoSpineId {
1044 lo: self.0.into_proto(),
1045 hi: self.1.into_proto(),
1046 }
1047 }
1048
1049 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1050 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1051 }
1052}
1053
1054impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1055 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1056 let (id, batch) = entry;
1057 ProtoIdHollowBatch {
1058 id: Some(id.into_proto()),
1059 batch: Some(batch.into_proto()),
1060 }
1061 }
1062
1063 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1064 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1065 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1066 Ok((id, batch))
1067 }
1068}
1069
1070impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1071 fn into_proto(&self) -> ProtoSpineBatch {
1072 ProtoSpineBatch {
1073 desc: Some(self.desc.into_proto()),
1074 parts: self.parts.into_proto(),
1075 level: self.level.into_proto(),
1076 descs: self.descs.into_proto(),
1077 }
1078 }
1079
1080 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1081 let level = proto.level.into_rust()?;
1082 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1083 let parts = proto.parts.into_rust()?;
1084 let descs = proto.descs.into_rust()?;
1085 Ok(ThinSpineBatch {
1086 level,
1087 desc,
1088 parts,
1089 descs,
1090 })
1091 }
1092}
1093
1094impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1095 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1096 let (id, batch) = entry;
1097 ProtoIdSpineBatch {
1098 id: Some(id.into_proto()),
1099 batch: Some(batch.into_proto()),
1100 }
1101 }
1102
1103 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1104 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1105 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1106 Ok((id, batch))
1107 }
1108}
1109
1110impl RustType<ProtoCompaction> for ActiveCompaction {
1111 fn into_proto(&self) -> ProtoCompaction {
1112 ProtoCompaction {
1113 start_ms: self.start_ms,
1114 }
1115 }
1116
1117 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1118 Ok(Self {
1119 start_ms: proto.start_ms,
1120 })
1121 }
1122}
1123
1124impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1125 fn into_proto(&self) -> ProtoMerge {
1126 ProtoMerge {
1127 since: Some(self.since.into_proto()),
1128 remaining_work: self.remaining_work.into_proto(),
1129 active_compaction: self.active_compaction.into_proto(),
1130 }
1131 }
1132
1133 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1134 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1135 let remaining_work = proto.remaining_work.into_rust()?;
1136 let active_compaction = proto.active_compaction.into_rust()?;
1137 Ok(Self {
1138 since,
1139 remaining_work,
1140 active_compaction,
1141 })
1142 }
1143}
1144
1145impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1146 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1147 ProtoIdMerge {
1148 id: Some(id.into_proto()),
1149 merge: Some(merge.into_proto()),
1150 }
1151 }
1152
1153 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1154 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1155 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1156 Ok((id, merge))
1157 }
1158}
1159
1160impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1161 fn into_proto(&self) -> ProtoTrace {
1162 let since = self.since.into_proto();
1163 let legacy_batches = self
1164 .legacy_batches
1165 .iter()
1166 .map(|(b, _)| b.into_proto())
1167 .collect();
1168 let hollow_batches = self.hollow_batches.into_proto();
1169 let spine_batches = self.spine_batches.into_proto();
1170 let merges = self.merges.into_proto();
1171 ProtoTrace {
1172 since: Some(since),
1173 legacy_batches,
1174 hollow_batches,
1175 spine_batches,
1176 merges,
1177 }
1178 }
1179
1180 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1181 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1182 let legacy_batches = proto
1183 .legacy_batches
1184 .into_iter()
1185 .map(|b| b.into_rust().map(|b| (b, ())))
1186 .collect::<Result<_, _>>()?;
1187 let hollow_batches = proto.hollow_batches.into_rust()?;
1188 let spine_batches = proto.spine_batches.into_rust()?;
1189 let merges = proto.merges.into_rust()?;
1190 Ok(FlatTrace {
1191 since,
1192 legacy_batches,
1193 hollow_batches,
1194 spine_batches,
1195 merges,
1196 })
1197 }
1198}
1199
1200impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1201 fn into_proto(&self) -> ProtoTrace {
1202 self.flatten().into_proto()
1203 }
1204
1205 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1206 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1207 }
1208}
1209
1210impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1211 fn into_proto(&self) -> ProtoLeasedReaderState {
1212 ProtoLeasedReaderState {
1213 seqno: self.seqno.into_proto(),
1214 since: Some(self.since.into_proto()),
1215 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1216 lease_duration_ms: self.lease_duration_ms.into_proto(),
1217 debug: Some(self.debug.into_proto()),
1218 }
1219 }
1220
1221 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1222 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1223 if lease_duration_ms == 0 {
1228 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1229 .expect("lease duration as millis should fit within u64");
1230 }
1231 let debug = proto.debug.unwrap_or_default().into_rust()?;
1234 Ok(LeasedReaderState {
1235 seqno: proto.seqno.into_rust()?,
1236 since: proto
1237 .since
1238 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1239 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1240 lease_duration_ms,
1241 debug,
1242 })
1243 }
1244}
1245
1246impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1247 fn into_proto(&self) -> ProtoCriticalReaderState {
1248 ProtoCriticalReaderState {
1249 since: Some(self.since.into_proto()),
1250 opaque: i64::from_le_bytes(self.opaque.0),
1251 opaque_codec: self.opaque_codec.clone(),
1252 debug: Some(self.debug.into_proto()),
1253 }
1254 }
1255
1256 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1257 let debug = proto.debug.unwrap_or_default().into_rust()?;
1260 Ok(CriticalReaderState {
1261 since: proto
1262 .since
1263 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1264 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1265 opaque_codec: proto.opaque_codec,
1266 debug,
1267 })
1268 }
1269}
1270
1271impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1272 fn into_proto(&self) -> ProtoWriterState {
1273 ProtoWriterState {
1274 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1275 lease_duration_ms: self.lease_duration_ms.into_proto(),
1276 most_recent_write_token: self.most_recent_write_token.into_proto(),
1277 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1278 debug: Some(self.debug.into_proto()),
1279 }
1280 }
1281
1282 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1283 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1289 IdempotencyToken::SENTINEL
1290 } else {
1291 proto.most_recent_write_token.into_rust()?
1292 };
1293 let most_recent_write_upper = match proto.most_recent_write_upper {
1294 Some(x) => x.into_rust()?,
1295 None => Antichain::from_elem(T::minimum()),
1296 };
1297 let debug = proto.debug.unwrap_or_default().into_rust()?;
1300 Ok(WriterState {
1301 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1302 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1303 most_recent_write_token,
1304 most_recent_write_upper,
1305 debug,
1306 })
1307 }
1308}
1309
1310impl RustType<ProtoHandleDebugState> for HandleDebugState {
1311 fn into_proto(&self) -> ProtoHandleDebugState {
1312 ProtoHandleDebugState {
1313 hostname: self.hostname.into_proto(),
1314 purpose: self.purpose.into_proto(),
1315 }
1316 }
1317
1318 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1319 Ok(HandleDebugState {
1320 hostname: proto.hostname,
1321 purpose: proto.purpose,
1322 })
1323 }
1324}
1325
1326impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1327 fn into_proto(&self) -> ProtoHollowRun {
1328 ProtoHollowRun {
1329 parts: self.parts.into_proto(),
1330 }
1331 }
1332
1333 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1334 Ok(HollowRun {
1335 parts: proto.parts.into_rust()?,
1336 })
1337 }
1338}
1339
1340impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1341 fn into_proto(&self) -> ProtoHollowBatch {
1342 let mut run_meta = self.run_meta.into_proto();
1343 let run_meta_default = RunMeta::default().into_proto();
1345 while run_meta.last() == Some(&run_meta_default) {
1346 run_meta.pop();
1347 }
1348 ProtoHollowBatch {
1349 desc: Some(self.desc.into_proto()),
1350 parts: self.parts.into_proto(),
1351 len: self.len.into_proto(),
1352 runs: self.run_splits.into_proto(),
1353 run_meta,
1354 deprecated_keys: vec![],
1355 }
1356 }
1357
1358 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1359 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1360 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1363 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1364 key: PartialBatchKey(key),
1365 encoded_size_bytes: 0,
1366 key_lower: vec![],
1367 structured_key_lower: None,
1368 stats: None,
1369 ts_rewrite: None,
1370 diffs_sum: None,
1371 format: None,
1372 schema_id: None,
1373 deprecated_schema_id: None,
1374 }))
1375 }));
1376 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1378 let num_runs = if parts.is_empty() {
1379 0
1380 } else {
1381 run_splits.len() + 1
1382 };
1383 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1384 run_meta.resize(num_runs, RunMeta::default());
1385 Ok(HollowBatch {
1386 desc: proto.desc.into_rust_if_some("desc")?,
1387 parts,
1388 len: proto.len.into_rust()?,
1389 run_splits,
1390 run_meta,
1391 })
1392 }
1393}
1394
1395impl RustType<String> for RunId {
1396 fn into_proto(&self) -> String {
1397 self.to_string()
1398 }
1399
1400 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1401 RunId::from_str(&proto).map_err(|_| {
1402 TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1403 })
1404 }
1405}
1406
1407impl RustType<ProtoRunMeta> for RunMeta {
1408 fn into_proto(&self) -> ProtoRunMeta {
1409 let order = match self.order {
1410 None => ProtoRunOrder::Unknown,
1411 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1412 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1413 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1414 };
1415 ProtoRunMeta {
1416 order: order.into(),
1417 schema_id: self.schema.into_proto(),
1418 deprecated_schema_id: self.deprecated_schema.into_proto(),
1419 id: self.id.into_proto(),
1420 len: self.len.into_proto(),
1421 }
1422 }
1423
1424 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1425 let order = match ProtoRunOrder::try_from(proto.order)? {
1426 ProtoRunOrder::Unknown => None,
1427 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1428 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1429 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1430 };
1431 Ok(Self {
1432 order,
1433 schema: proto.schema_id.into_rust()?,
1434 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1435 id: proto.id.into_rust()?,
1436 len: proto.len.into_rust()?,
1437 })
1438 }
1439}
1440
1441impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1442 fn into_proto(&self) -> ProtoHollowBatchPart {
1443 match self {
1444 RunPart::Single(part) => part.into_proto(),
1445 RunPart::Many(runs) => runs.into_proto(),
1446 }
1447 }
1448
1449 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1450 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1451 RunPart::Many(proto.into_rust()?)
1452 } else {
1453 RunPart::Single(proto.into_rust()?)
1454 };
1455 Ok(run_part)
1456 }
1457}
1458
1459impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1460 fn into_proto(&self) -> ProtoHollowBatchPart {
1461 let part = ProtoHollowBatchPart {
1462 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1463 key: self.key.into_proto(),
1464 max_part_bytes: self.max_part_bytes.into_proto(),
1465 })),
1466 encoded_size_bytes: self.hollow_bytes.into_proto(),
1467 key_lower: Bytes::copy_from_slice(&self.key_lower),
1468 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1469 key_stats: None,
1470 ts_rewrite: None,
1471 format: None,
1472 schema_id: None,
1473 structured_key_lower: self.structured_key_lower.into_proto(),
1474 deprecated_schema_id: None,
1475 };
1476 part
1477 }
1478
1479 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1480 let run_proto = match proto.kind {
1481 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1482 _ => Err(TryFromProtoError::UnknownEnumVariant(
1483 "ProtoHollowBatchPart::kind".to_string(),
1484 ))?,
1485 };
1486 Ok(Self {
1487 key: run_proto.key.into_rust()?,
1488 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1489 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1490 key_lower: proto.key_lower.to_vec(),
1491 structured_key_lower: proto.structured_key_lower.into_rust()?,
1492 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1493 _phantom_data: Default::default(),
1494 })
1495 }
1496}
1497
1498impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1499 fn into_proto(&self) -> ProtoHollowBatchPart {
1500 match self {
1501 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1502 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1503 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1504 key_lower: Bytes::copy_from_slice(&x.key_lower),
1505 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1506 key_stats: x.stats.into_proto(),
1507 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1508 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1509 format: x.format.map(|f| f.into_proto()),
1510 schema_id: x.schema_id.into_proto(),
1511 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1512 },
1513 BatchPart::Inline {
1514 updates,
1515 ts_rewrite,
1516 schema_id,
1517 deprecated_schema_id,
1518 } => ProtoHollowBatchPart {
1519 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1520 encoded_size_bytes: 0,
1521 key_lower: Bytes::new(),
1522 structured_key_lower: None,
1523 key_stats: None,
1524 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1525 diffs_sum: None,
1526 format: None,
1527 schema_id: schema_id.into_proto(),
1528 deprecated_schema_id: deprecated_schema_id.into_proto(),
1529 },
1530 }
1531 }
1532
1533 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1534 let ts_rewrite = match proto.ts_rewrite {
1535 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1536 None => None,
1537 };
1538 let schema_id = proto.schema_id.into_rust()?;
1539 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1540 match proto.kind {
1541 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1542 Ok(BatchPart::Hollow(HollowBatchPart {
1543 key: key.into_rust()?,
1544 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1545 key_lower: proto.key_lower.into(),
1546 structured_key_lower: proto.structured_key_lower.into_rust()?,
1547 stats: proto.key_stats.into_rust()?,
1548 ts_rewrite,
1549 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1550 format: proto.format.map(|f| f.into_rust()).transpose()?,
1551 schema_id,
1552 deprecated_schema_id,
1553 }))
1554 }
1555 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1556 assert_eq!(proto.encoded_size_bytes, 0);
1557 assert_eq!(proto.key_lower.len(), 0);
1558 assert_none!(proto.key_stats);
1559 assert_none!(proto.diffs_sum);
1560 let updates = LazyInlineBatchPart(x.into_rust()?);
1561 Ok(BatchPart::Inline {
1562 updates,
1563 ts_rewrite,
1564 schema_id,
1565 deprecated_schema_id,
1566 })
1567 }
1568 _ => Err(TryFromProtoError::unknown_enum_variant(
1569 "ProtoHollowBatchPart::kind",
1570 )),
1571 }
1572 }
1573}
1574
1575impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1576 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1577 match self {
1578 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1579 BatchColumnarFormat::Both(version) => {
1580 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1581 }
1582 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1583 }
1584 }
1585
1586 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1587 let format = match proto {
1588 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1589 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1590 BatchColumnarFormat::Both(version.cast_into())
1591 }
1592 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1593 };
1594 Ok(format)
1595 }
1596}
1597
/// Part statistics kept in their encoded protobuf form.
///
/// The stats are stored as a [`LazyProto`] over [`ProtoStructStats`] and only
/// turned into a [`PartStats`] when [`LazyPartStats::decode`] is called.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    // Encoded stats for the key; decoded on demand.
    key: LazyProto<ProtoStructStats>,
}
1606
1607impl Debug for LazyPartStats {
1608 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1609 f.debug_tuple("LazyPartStats")
1610 .field(&self.decode())
1611 .finish()
1612 }
1613}
1614
1615impl LazyPartStats {
1616 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1617 let PartStats { key } = x;
1618 let mut proto_stats = ProtoStructStats::from_rust(key);
1619 map_proto(&mut proto_stats);
1620 LazyPartStats {
1621 key: LazyProto::from(&proto_stats),
1622 }
1623 }
1624 pub fn decode(&self) -> PartStats {
1629 let key = self.key.decode().expect("valid proto");
1630 PartStats {
1631 key: key.into_rust().expect("valid stats"),
1632 }
1633 }
1634}
1635
1636impl RustType<Bytes> for LazyPartStats {
1637 fn into_proto(&self) -> Bytes {
1638 let LazyPartStats { key } = self;
1639 key.into_proto()
1640 }
1641
1642 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1643 Ok(LazyPartStats {
1644 key: proto.into_rust()?,
1645 })
1646 }
1647}
1648
/// Test-only strategy yielding arbitrary stats that are always wrapped in `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let any_stats = proptest::prelude::any::<LazyPartStats>();
    Strategy::prop_map(any_stats, Some)
}
1653
1654#[allow(unused_parens)]
1655impl Arbitrary for LazyPartStats {
1656 type Parameters = ();
1657 type Strategy =
1658 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1659
1660 fn arbitrary_with(_: ()) -> Self::Strategy {
1661 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1662 LazyPartStats::encode(&x, |_| {})
1663 })
1664 }
1665}
1666
1667impl ProtoInlineBatchPart {
1668 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1669 lgbytes: &ColumnarMetrics,
1670 proto: Self,
1671 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1672 let updates = proto
1673 .updates
1674 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1675 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1676
1677 Ok(BlobTraceBatchPart {
1678 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1679 index: proto.index.into_rust()?,
1680 updates,
1681 })
1682 }
1683}
1684
/// An inline batch part kept in its encoded protobuf form.
///
/// Wraps a [`LazyProto`] over [`ProtoInlineBatchPart`]; the contents are only
/// decoded on demand via [`LazyInlineBatchPart::decode`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1688
1689impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1690 fn from(value: &ProtoInlineBatchPart) -> Self {
1691 LazyInlineBatchPart(value.into())
1692 }
1693}
1694
1695impl Serialize for LazyInlineBatchPart {
1696 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1697 let proto = self.0.decode().expect("valid proto");
1700 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1701 let () = s.serialize_field("desc", &proto.desc)?;
1702 let () = s.serialize_field("index", &proto.index)?;
1703 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1704 s.end()
1705 }
1706}
1707
1708impl LazyInlineBatchPart {
1709 pub(crate) fn encoded_size_bytes(&self) -> usize {
1710 self.0.buf.len()
1711 }
1712
1713 pub fn decode<T: Timestamp + Codec64>(
1719 &self,
1720 lgbytes: &ColumnarMetrics,
1721 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1722 let proto = self.0.decode().expect("valid proto");
1723 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1724 }
1725}
1726
1727impl RustType<Bytes> for LazyInlineBatchPart {
1728 fn into_proto(&self) -> Bytes {
1729 self.0.into_proto()
1730 }
1731
1732 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1733 Ok(LazyInlineBatchPart(proto.into_rust()?))
1734 }
1735}
1736
1737impl RustType<ProtoHollowRollup> for HollowRollup {
1738 fn into_proto(&self) -> ProtoHollowRollup {
1739 ProtoHollowRollup {
1740 key: self.key.into_proto(),
1741 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1742 }
1743 }
1744
1745 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1746 Ok(HollowRollup {
1747 key: proto.key.into_rust()?,
1748 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1749 })
1750 }
1751}
1752
1753impl RustType<ProtoActiveRollup> for ActiveRollup {
1754 fn into_proto(&self) -> ProtoActiveRollup {
1755 ProtoActiveRollup {
1756 start_ms: self.start_ms,
1757 seqno: self.seqno.into_proto(),
1758 }
1759 }
1760
1761 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1762 Ok(ActiveRollup {
1763 start_ms: proto.start_ms,
1764 seqno: proto.seqno.into_rust()?,
1765 })
1766 }
1767}
1768
1769impl RustType<ProtoActiveGc> for ActiveGc {
1770 fn into_proto(&self) -> ProtoActiveGc {
1771 ProtoActiveGc {
1772 start_ms: self.start_ms,
1773 seqno: self.seqno.into_proto(),
1774 }
1775 }
1776
1777 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1778 Ok(ActiveGc {
1779 start_ms: proto.start_ms,
1780 seqno: proto.seqno.into_rust()?,
1781 })
1782 }
1783}
1784
1785impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1786 fn into_proto(&self) -> ProtoU64Description {
1787 ProtoU64Description {
1788 lower: Some(self.lower().into_proto()),
1789 upper: Some(self.upper().into_proto()),
1790 since: Some(self.since().into_proto()),
1791 }
1792 }
1793
1794 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1795 Ok(Description::new(
1796 proto.lower.into_rust_if_some("lower")?,
1797 proto.upper.into_rust_if_some("upper")?,
1798 proto.since.into_rust_if_some("since")?,
1799 ))
1800 }
1801}
1802
1803impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1804 fn into_proto(&self) -> ProtoU64Antichain {
1805 ProtoU64Antichain {
1806 elements: self
1807 .elements()
1808 .iter()
1809 .map(|x| i64::from_le_bytes(T::encode(x)))
1810 .collect(),
1811 }
1812 }
1813
1814 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1815 let elements = proto
1816 .elements
1817 .iter()
1818 .map(|x| T::decode(x.to_le_bytes()))
1819 .collect::<Vec<_>>();
1820 Ok(Antichain::from(elements))
1821 }
1822}
1823
1824#[cfg(test)]
1825mod tests {
1826 use bytes::Bytes;
1827 use mz_build_info::DUMMY_BUILD_INFO;
1828 use mz_dyncfg::ConfigUpdates;
1829 use mz_ore::assert_err;
1830 use mz_persist::location::SeqNo;
1831 use proptest::prelude::*;
1832
1833 use crate::ShardId;
1834 use crate::internal::paths::PartialRollupKey;
1835 use crate::internal::state::tests::any_state;
1836 use crate::internal::state::{BatchPart, HandleDebugState};
1837 use crate::internal::state_diff::StateDiff;
1838 use crate::tests::new_test_client_cache;
1839
1840 use super::*;
1841
1842 #[mz_ore::test]
1843 fn applier_version_state() {
1844 let v1 = semver::Version::new(1, 0, 0);
1845 let v2 = semver::Version::new(2, 0, 0);
1846 let v3 = semver::Version::new(3, 0, 0);
1847
1848 let shard_id = ShardId::new();
1850 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1851 let rollup =
1852 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1853 let mut buf = Vec::new();
1854 rollup.encode(&mut buf).expect("serializable");
1855 let bytes = Bytes::from(buf);
1856
1857 assert_eq!(
1859 UntypedState::<u64>::decode(&v2, bytes.clone())
1860 .check_codecs(&shard_id)
1861 .as_ref(),
1862 Ok(&state)
1863 );
1864 assert_eq!(
1865 UntypedState::<u64>::decode(&v3, bytes.clone())
1866 .check_codecs(&shard_id)
1867 .as_ref(),
1868 Ok(&state)
1869 );
1870
1871 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1876 assert_err!(v1_res);
1877 }
1878
1879 #[mz_ore::test]
1880 fn applier_version_state_diff() {
1881 let v1 = semver::Version::new(1, 0, 0);
1882 let v2 = semver::Version::new(2, 0, 0);
1883 let v3 = semver::Version::new(3, 0, 0);
1884
1885 let diff = StateDiff::<u64>::new(
1887 v2.clone(),
1888 SeqNo(0),
1889 SeqNo(1),
1890 2,
1891 PartialRollupKey("rollup".into()),
1892 );
1893 let mut buf = Vec::new();
1894 diff.encode(&mut buf);
1895 let bytes = Bytes::from(buf);
1896
1897 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1899 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1900
1901 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1906 assert_err!(v1_res);
1907 }
1908
1909 #[mz_ore::test]
1910 fn hollow_batch_migration_keys() {
1911 let x = HollowBatch::new_run(
1912 Description::new(
1913 Antichain::from_elem(1u64),
1914 Antichain::from_elem(2u64),
1915 Antichain::from_elem(3u64),
1916 ),
1917 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1918 key: PartialBatchKey("a".into()),
1919 encoded_size_bytes: 5,
1920 key_lower: vec![],
1921 structured_key_lower: None,
1922 stats: None,
1923 ts_rewrite: None,
1924 diffs_sum: None,
1925 format: None,
1926 schema_id: None,
1927 deprecated_schema_id: None,
1928 }))],
1929 4,
1930 );
1931 let mut old = x.into_proto();
1932 old.deprecated_keys = vec!["b".into()];
1934 let mut expected = x;
1938 expected
1943 .parts
1944 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1945 key: PartialBatchKey("b".into()),
1946 encoded_size_bytes: 0,
1947 key_lower: vec![],
1948 structured_key_lower: None,
1949 stats: None,
1950 ts_rewrite: None,
1951 diffs_sum: None,
1952 format: None,
1953 schema_id: None,
1954 deprecated_schema_id: None,
1955 })));
1956 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1957 }
1958
1959 #[mz_ore::test]
1960 fn reader_state_migration_lease_duration() {
1961 let x = LeasedReaderState {
1962 seqno: SeqNo(1),
1963 since: Antichain::from_elem(2u64),
1964 last_heartbeat_timestamp_ms: 3,
1965 debug: HandleDebugState {
1966 hostname: "host".to_owned(),
1967 purpose: "purpose".to_owned(),
1968 },
1969 lease_duration_ms: 0,
1971 };
1972 let old = x.into_proto();
1973 let mut expected = x;
1974 expected.lease_duration_ms =
1977 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1978 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1979 }
1980
1981 #[mz_ore::test]
1982 fn writer_state_migration_most_recent_write() {
1983 let proto = ProtoWriterState {
1984 last_heartbeat_timestamp_ms: 1,
1985 lease_duration_ms: 2,
1986 most_recent_write_token: "".into(),
1989 most_recent_write_upper: None,
1990 debug: Some(ProtoHandleDebugState {
1991 hostname: "host".to_owned(),
1992 purpose: "purpose".to_owned(),
1993 }),
1994 };
1995 let expected = WriterState {
1996 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1997 lease_duration_ms: proto.lease_duration_ms,
1998 most_recent_write_token: IdempotencyToken::SENTINEL,
1999 most_recent_write_upper: Antichain::from_elem(0),
2000 debug: HandleDebugState {
2001 hostname: "host".to_owned(),
2002 purpose: "purpose".to_owned(),
2003 },
2004 };
2005 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2006 }
2007
2008 #[mz_ore::test]
2009 fn state_migration_rollups() {
2010 let r1 = HollowRollup {
2011 key: PartialRollupKey("foo".to_owned()),
2012 encoded_size_bytes: None,
2013 };
2014 let r2 = HollowRollup {
2015 key: PartialRollupKey("bar".to_owned()),
2016 encoded_size_bytes: Some(2),
2017 };
2018 let shard_id = ShardId::new();
2019 let mut state = TypedState::<(), (), u64, i64>::new(
2020 DUMMY_BUILD_INFO.semver_version(),
2021 shard_id,
2022 "host".to_owned(),
2023 0,
2024 );
2025 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2026 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2027
2028 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2030
2031 let state: Rollup<u64> = proto.into_rust().unwrap();
2032 let state = state.state;
2033 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2034 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2035 assert_eq!(
2036 state
2037 .state
2038 .collections
2039 .rollups
2040 .into_iter()
2041 .collect::<Vec<_>>(),
2042 expected
2043 );
2044 }
2045
2046 #[mz_persist_proc::test(tokio::test)]
2047 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2049 let r1_rollup = HollowRollup {
2050 key: PartialRollupKey("foo".to_owned()),
2051 encoded_size_bytes: None,
2052 };
2053 let r1 = StateFieldDiff {
2054 key: SeqNo(1),
2055 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2056 };
2057 let r2_rollup = HollowRollup {
2058 key: PartialRollupKey("bar".to_owned()),
2059 encoded_size_bytes: Some(2),
2060 };
2061 let r2 = StateFieldDiff {
2062 key: SeqNo(2),
2063 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2064 };
2065 let r3_rollup = HollowRollup {
2066 key: PartialRollupKey("baz".to_owned()),
2067 encoded_size_bytes: None,
2068 };
2069 let r3 = StateFieldDiff {
2070 key: SeqNo(3),
2071 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2072 };
2073 let mut diff = StateDiff::<u64>::new(
2074 DUMMY_BUILD_INFO.semver_version(),
2075 SeqNo(4),
2076 SeqNo(5),
2077 0,
2078 PartialRollupKey("ignored".to_owned()),
2079 );
2080 diff.rollups.push(r2.clone());
2081 diff.rollups.push(r3.clone());
2082 let mut diff_proto = diff.into_proto();
2083
2084 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2085 let mut field_diffs_writer = field_diffs.into_writer();
2086
2087 field_diffs_into_proto(
2089 ProtoStateField::DeprecatedRollups,
2090 &[StateFieldDiff {
2091 key: r1.key,
2092 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2093 }],
2094 &mut field_diffs_writer,
2095 );
2096
2097 assert_none!(diff_proto.field_diffs);
2098 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2099
2100 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2101 assert_eq!(
2102 diff.rollups.into_iter().collect::<Vec<_>>(),
2103 vec![r2, r3, r1]
2104 );
2105
2106 let shard_id = ShardId::new();
2109 let mut state = TypedState::<(), (), u64, i64>::new(
2110 DUMMY_BUILD_INFO.semver_version(),
2111 shard_id,
2112 "host".to_owned(),
2113 0,
2114 );
2115 state.state.seqno = SeqNo(4);
2116 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2117 rollup
2118 .deprecated_rollups
2119 .insert(3, r3_rollup.key.into_proto());
2120 let state: Rollup<u64> = rollup.into_rust().unwrap();
2121 let state = state.state;
2122 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2123 let cache = new_test_client_cache(&dyncfgs);
2124 let encoded_diff = VersionedData {
2125 seqno: SeqNo(5),
2126 data: diff_proto.encode_to_vec().into(),
2127 };
2128 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2129 assert_eq!(
2130 state
2131 .state
2132 .collections
2133 .rollups
2134 .into_iter()
2135 .collect::<Vec<_>>(),
2136 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2137 );
2138 }
2139
2140 #[mz_ore::test]
2141 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2143 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2144 let before = UntypedState {
2145 key_codec: <() as Codec>::codec_name(),
2146 val_codec: <() as Codec>::codec_name(),
2147 ts_codec: <T as Codec64>::codec_name(),
2148 diff_codec: <i64 as Codec64>::codec_name(),
2149 state,
2150 };
2151 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2152 let after: Rollup<T> = proto.into_rust().unwrap();
2153 let after = after.state;
2154 assert_eq!(before, after);
2155 }
2156
2157 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2158 }
2159
2160 #[mz_ore::test]
2161 fn check_data_versions_with_self_managed_versions() {
2162 #[track_caller]
2163 fn testcase(
2164 code: &str,
2165 data: &str,
2166 self_managed_versions: &[Version],
2167 expected: Result<(), ()>,
2168 ) {
2169 let code = Version::parse(code).unwrap();
2170 let data = Version::parse(data).unwrap();
2171 let actual = cfg::check_data_version_with_self_managed_versions(
2172 &code,
2173 &data,
2174 self_managed_versions,
2175 )
2176 .map_err(|_| ());
2177 assert_eq!(actual, expected);
2178 }
2179
2180 let none = [];
2181 let one = [Version::new(0, 130, 0)];
2182 let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2183 let three = [
2184 Version::new(0, 130, 0),
2185 Version::new(0, 140, 0),
2186 Version::new(0, 150, 0),
2187 ];
2188
2189 testcase("0.130.0", "0.128.0", &none, Ok(()));
2190 testcase("0.130.0", "0.129.0", &none, Ok(()));
2191 testcase("0.130.0", "0.130.0", &none, Ok(()));
2192 testcase("0.130.0", "0.130.1", &none, Ok(()));
2193 testcase("0.130.1", "0.130.0", &none, Ok(()));
2194 testcase("0.130.0", "0.131.0", &none, Ok(()));
2195 testcase("0.130.0", "0.132.0", &none, Err(()));
2196
2197 testcase("0.129.0", "0.127.0", &none, Ok(()));
2198 testcase("0.129.0", "0.128.0", &none, Ok(()));
2199 testcase("0.129.0", "0.129.0", &none, Ok(()));
2200 testcase("0.129.0", "0.129.1", &none, Ok(()));
2201 testcase("0.129.1", "0.129.0", &none, Ok(()));
2202 testcase("0.129.0", "0.130.0", &none, Ok(()));
2203 testcase("0.129.0", "0.131.0", &none, Err(()));
2204
2205 testcase("0.130.0", "0.128.0", &one, Ok(()));
2206 testcase("0.130.0", "0.129.0", &one, Ok(()));
2207 testcase("0.130.0", "0.130.0", &one, Ok(()));
2208 testcase("0.130.0", "0.130.1", &one, Ok(()));
2209 testcase("0.130.1", "0.130.0", &one, Ok(()));
2210 testcase("0.130.0", "0.131.0", &one, Ok(()));
2211 testcase("0.130.0", "0.132.0", &one, Ok(()));
2212
2213 testcase("0.129.0", "0.127.0", &one, Ok(()));
2214 testcase("0.129.0", "0.128.0", &one, Ok(()));
2215 testcase("0.129.0", "0.129.0", &one, Ok(()));
2216 testcase("0.129.0", "0.129.1", &one, Ok(()));
2217 testcase("0.129.1", "0.129.0", &one, Ok(()));
2218 testcase("0.129.0", "0.130.0", &one, Ok(()));
2219 testcase("0.129.0", "0.131.0", &one, Err(()));
2220
2221 testcase("0.131.0", "0.129.0", &one, Ok(()));
2222 testcase("0.131.0", "0.130.0", &one, Ok(()));
2223 testcase("0.131.0", "0.131.0", &one, Ok(()));
2224 testcase("0.131.0", "0.131.1", &one, Ok(()));
2225 testcase("0.131.1", "0.131.0", &one, Ok(()));
2226 testcase("0.131.0", "0.132.0", &one, Ok(()));
2227 testcase("0.131.0", "0.133.0", &one, Err(()));
2228
2229 testcase("0.130.0", "0.128.0", &two, Ok(()));
2230 testcase("0.130.0", "0.129.0", &two, Ok(()));
2231 testcase("0.130.0", "0.130.0", &two, Ok(()));
2232 testcase("0.130.0", "0.130.1", &two, Ok(()));
2233 testcase("0.130.1", "0.130.0", &two, Ok(()));
2234 testcase("0.130.0", "0.131.0", &two, Ok(()));
2235 testcase("0.130.0", "0.132.0", &two, Ok(()));
2236 testcase("0.130.0", "0.135.0", &two, Ok(()));
2237 testcase("0.130.0", "0.138.0", &two, Ok(()));
2238 testcase("0.130.0", "0.139.0", &two, Ok(()));
2239 testcase("0.130.0", "0.140.0", &two, Ok(()));
2240 testcase("0.130.9", "0.140.0", &two, Ok(()));
2241 testcase("0.130.0", "0.140.1", &two, Ok(()));
2242 testcase("0.130.3", "0.140.1", &two, Ok(()));
2243 testcase("0.130.3", "0.140.9", &two, Ok(()));
2244 testcase("0.130.0", "0.141.0", &two, Err(()));
2245 testcase("0.129.0", "0.133.0", &two, Err(()));
2246 testcase("0.129.0", "0.140.0", &two, Err(()));
2247 testcase("0.131.0", "0.133.0", &two, Err(()));
2248 testcase("0.131.0", "0.140.0", &two, Err(()));
2249
2250 testcase("0.130.0", "0.128.0", &three, Ok(()));
2251 testcase("0.130.0", "0.129.0", &three, Ok(()));
2252 testcase("0.130.0", "0.130.0", &three, Ok(()));
2253 testcase("0.130.0", "0.130.1", &three, Ok(()));
2254 testcase("0.130.1", "0.130.0", &three, Ok(()));
2255 testcase("0.130.0", "0.131.0", &three, Ok(()));
2256 testcase("0.130.0", "0.132.0", &three, Ok(()));
2257 testcase("0.130.0", "0.135.0", &three, Ok(()));
2258 testcase("0.130.0", "0.138.0", &three, Ok(()));
2259 testcase("0.130.0", "0.139.0", &three, Ok(()));
2260 testcase("0.130.0", "0.140.0", &three, Ok(()));
2261 testcase("0.130.9", "0.140.0", &three, Ok(()));
2262 testcase("0.130.0", "0.140.1", &three, Ok(()));
2263 testcase("0.130.3", "0.140.1", &three, Ok(()));
2264 testcase("0.130.3", "0.140.9", &three, Ok(()));
2265 testcase("0.130.0", "0.141.0", &three, Err(()));
2266 testcase("0.129.0", "0.133.0", &three, Err(()));
2267 testcase("0.129.0", "0.140.0", &three, Err(()));
2268 testcase("0.131.0", "0.133.0", &three, Err(()));
2269 testcase("0.131.0", "0.140.0", &three, Err(()));
2270 testcase("0.130.0", "0.150.0", &three, Err(()));
2271
2272 testcase("0.140.0", "0.138.0", &three, Ok(()));
2273 testcase("0.140.0", "0.139.0", &three, Ok(()));
2274 testcase("0.140.0", "0.140.0", &three, Ok(()));
2275 testcase("0.140.0", "0.140.1", &three, Ok(()));
2276 testcase("0.140.1", "0.140.0", &three, Ok(()));
2277 testcase("0.140.0", "0.141.0", &three, Ok(()));
2278 testcase("0.140.0", "0.142.0", &three, Ok(()));
2279 testcase("0.140.0", "0.145.0", &three, Ok(()));
2280 testcase("0.140.0", "0.148.0", &three, Ok(()));
2281 testcase("0.140.0", "0.149.0", &three, Ok(()));
2282 testcase("0.140.0", "0.150.0", &three, Ok(()));
2283 testcase("0.140.9", "0.150.0", &three, Ok(()));
2284 testcase("0.140.0", "0.150.1", &three, Ok(()));
2285 testcase("0.140.3", "0.150.1", &three, Ok(()));
2286 testcase("0.140.3", "0.150.9", &three, Ok(()));
2287 testcase("0.140.0", "0.151.0", &three, Err(()));
2288 testcase("0.139.0", "0.143.0", &three, Err(()));
2289 testcase("0.139.0", "0.150.0", &three, Err(()));
2290 testcase("0.141.0", "0.143.0", &three, Err(()));
2291 testcase("0.141.0", "0.150.0", &three, Err(()));
2292
2293 testcase("0.150.0", "0.148.0", &three, Ok(()));
2294 testcase("0.150.0", "0.149.0", &three, Ok(()));
2295 testcase("0.150.0", "0.150.0", &three, Ok(()));
2296 testcase("0.150.0", "0.150.1", &three, Ok(()));
2297 testcase("0.150.1", "0.150.0", &three, Ok(()));
2298 testcase("0.150.0", "0.151.0", &three, Ok(()));
2299 testcase("0.150.0", "0.152.0", &three, Ok(()));
2300 testcase("0.150.0", "0.155.0", &three, Ok(()));
2301 testcase("0.150.0", "0.158.0", &three, Ok(()));
2302 testcase("0.150.0", "0.159.0", &three, Ok(()));
2303 testcase("0.150.0", "0.160.0", &three, Ok(()));
2304 testcase("0.150.9", "0.160.0", &three, Ok(()));
2305 testcase("0.150.0", "0.160.1", &three, Ok(()));
2306 testcase("0.150.3", "0.160.1", &three, Ok(()));
2307 testcase("0.150.3", "0.160.9", &three, Ok(()));
2308 testcase("0.150.0", "0.161.0", &three, Ok(()));
2309 testcase("0.149.0", "0.153.0", &three, Err(()));
2310 testcase("0.149.0", "0.160.0", &three, Err(()));
2311 testcase("0.151.0", "0.153.0", &three, Err(()));
2312 testcase("0.151.0", "0.160.0", &three, Err(()));
2313 }
2314
2315 #[mz_ore::test]
2316 fn check_data_versions() {
2317 #[track_caller]
2318 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2319 let code = Version::parse(code).unwrap();
2320 let data = Version::parse(data).unwrap();
2321 #[allow(clippy::disallowed_methods)]
2322 let actual =
2323 std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2324 assert_eq!(actual, expected);
2325 }
2326
2327 testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2328 testcase("0.10.0-dev", "0.10.0", Ok(()));
2329 testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2332 testcase("0.10.0-dev", "0.11.0", Ok(()));
2333 testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2334 testcase("0.10.0-dev", "0.12.0", Err(()));
2335 testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2336
2337 testcase("0.10.0", "0.8.0-dev", Ok(()));
2338 testcase("0.10.0", "0.8.0", Ok(()));
2339 testcase("0.10.0", "0.9.0-dev", Ok(()));
2340 testcase("0.10.0", "0.9.0", Ok(()));
2341 testcase("0.10.0", "0.10.0-dev", Ok(()));
2342 testcase("0.10.0", "0.10.0", Ok(()));
2343 testcase("0.10.0", "0.11.0-dev", Ok(()));
2346 testcase("0.10.0", "0.11.0", Ok(()));
2347 testcase("0.10.0", "0.11.1", Ok(()));
2348 testcase("0.10.0", "0.11.1000000", Ok(()));
2349 testcase("0.10.0", "0.12.0-dev", Err(()));
2350 testcase("0.10.0", "0.12.0", Err(()));
2351 testcase("0.10.0", "0.13.0-dev", Err(()));
2352
2353 testcase("0.10.1", "0.9.0", Ok(()));
2354 testcase("0.10.1", "0.10.0", Ok(()));
2355 testcase("0.10.1", "0.11.0", Ok(()));
2356 testcase("0.10.1", "0.11.1", Ok(()));
2357 testcase("0.10.1", "0.11.100", Ok(()));
2358
2359 testcase("0.10.0", "0.10.1", Ok(()));
2365 }
2366}