1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::sync::Arc;
16
17use bytes::{Buf, Bytes};
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::trace::Description;
20use mz_ore::cast::CastInto;
21use mz_ore::{assert_none, halt};
22use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
23use mz_persist::location::{SeqNo, VersionedData};
24use mz_persist::metrics::ColumnarMetrics;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::stats::{PartStats, ProtoStructStats};
27use mz_persist_types::{Codec, Codec64};
28use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
29use proptest::prelude::Arbitrary;
30use proptest::strategy::Strategy;
31use prost::Message;
32use semver::Version;
33use serde::ser::SerializeStruct;
34use serde::{Deserialize, Serialize};
35use timely::progress::{Antichain, Timestamp};
36use uuid::Uuid;
37
38use crate::critical::CriticalReaderId;
39use crate::error::{CodecMismatch, CodecMismatchT};
40use crate::internal::metrics::Metrics;
41use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
42use crate::internal::state::{
43 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
44 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
45 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
46 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
47 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
48 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
49 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
50 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
51 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
52 RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
53 proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
64#[derive(Debug)]
65pub struct Schemas<K: Codec, V: Codec> {
66 pub id: Option<SchemaId>,
69 pub key: Arc<K::Schema>,
70 pub val: Arc<V::Schema>,
71}
72
73impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
74 fn clone(&self) -> Self {
75 Self {
76 id: self.id,
77 key: Arc::clone(&self.key),
78 val: Arc::clone(&self.val),
79 }
80 }
81}
82
83#[derive(Clone, Serialize, Deserialize)]
106pub struct LazyProto<T> {
107 buf: Bytes,
108 _phantom: PhantomData<fn() -> T>,
109}
110
111impl<T: Message + Default> Debug for LazyProto<T> {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 match self.decode() {
114 Ok(proto) => Debug::fmt(&proto, f),
115 Err(err) => f
116 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
117 .field("err", &err)
118 .finish(),
119 }
120 }
121}
122
123impl<T> PartialEq for LazyProto<T> {
124 fn eq(&self, other: &Self) -> bool {
125 self.cmp(other) == Ordering::Equal
126 }
127}
128
129impl<T> Eq for LazyProto<T> {}
130
131impl<T> PartialOrd for LazyProto<T> {
132 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
133 Some(self.cmp(other))
134 }
135}
136
137impl<T> Ord for LazyProto<T> {
138 fn cmp(&self, other: &Self) -> Ordering {
139 let LazyProto {
140 buf: self_buf,
141 _phantom: _,
142 } = self;
143 let LazyProto {
144 buf: other_buf,
145 _phantom: _,
146 } = other;
147 self_buf.cmp(other_buf)
148 }
149}
150
151impl<T> Hash for LazyProto<T> {
152 fn hash<H: Hasher>(&self, state: &mut H) {
153 let LazyProto { buf, _phantom } = self;
154 buf.hash(state);
155 }
156}
157
158impl<T: Message + Default> From<&T> for LazyProto<T> {
159 fn from(value: &T) -> Self {
160 let buf = Bytes::from(value.encode_to_vec());
161 LazyProto {
162 buf,
163 _phantom: PhantomData,
164 }
165 }
166}
167
168impl<T: Message + Default> LazyProto<T> {
169 pub fn decode(&self) -> Result<T, prost::DecodeError> {
170 T::decode(&*self.buf)
171 }
172}
173
174impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
175 fn into_proto(&self) -> Bytes {
176 self.buf.clone()
177 }
178
179 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
180 Ok(Self {
181 buf,
182 _phantom: PhantomData,
183 })
184 }
185}
186
187pub(crate) fn parse_id(id_prefix: char, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
188 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
189 Some(x) => x,
190 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
191 };
192 let uuid = Uuid::parse_str(uuid_encoded)
193 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
194 Ok(*uuid.as_bytes())
195}
196
197pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
198 if let Err(msg) = cfg::check_data_version(code_version, data_version) {
199 if cfg!(test) {
202 panic!("{msg}");
203 } else {
204 halt!("{msg}");
205 }
206 }
207}
208
209impl RustType<String> for LeasedReaderId {
210 fn into_proto(&self) -> String {
211 self.to_string()
212 }
213
214 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
215 match proto.parse() {
216 Ok(x) => Ok(x),
217 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
218 }
219 }
220}
221
222impl RustType<String> for CriticalReaderId {
223 fn into_proto(&self) -> String {
224 self.to_string()
225 }
226
227 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
228 match proto.parse() {
229 Ok(x) => Ok(x),
230 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
231 }
232 }
233}
234
235impl RustType<String> for WriterId {
236 fn into_proto(&self) -> String {
237 self.to_string()
238 }
239
240 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
241 match proto.parse() {
242 Ok(x) => Ok(x),
243 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
244 }
245 }
246}
247
248impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
249 fn into_proto(&self) -> ProtoEncodedSchemas {
250 ProtoEncodedSchemas {
251 key: self.key.clone(),
252 key_data_type: self.key_data_type.clone(),
253 val: self.val.clone(),
254 val_data_type: self.val_data_type.clone(),
255 }
256 }
257
258 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
259 Ok(EncodedSchemas {
260 key: proto.key,
261 key_data_type: proto.key_data_type,
262 val: proto.val,
263 val_data_type: proto.val_data_type,
264 })
265 }
266}
267
268impl RustType<String> for IdempotencyToken {
269 fn into_proto(&self) -> String {
270 self.to_string()
271 }
272
273 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
274 match proto.parse() {
275 Ok(x) => Ok(x),
276 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
277 }
278 }
279}
280
281impl RustType<String> for PartialBatchKey {
282 fn into_proto(&self) -> String {
283 self.0.clone()
284 }
285
286 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
287 Ok(PartialBatchKey(proto))
288 }
289}
290
291impl RustType<String> for PartialRollupKey {
292 fn into_proto(&self) -> String {
293 self.0.clone()
294 }
295
296 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297 Ok(PartialRollupKey(proto))
298 }
299}
300
301impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
302 pub fn encode<B>(&self, buf: &mut B)
303 where
304 B: bytes::BufMut,
305 {
306 self.into_proto()
307 .encode(buf)
308 .expect("no required fields means no initialization errors");
309 }
310
311 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
312 let proto = ProtoStateDiff::decode(buf)
313 .expect("internal error: invalid encoded state");
318 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
319 check_data_version(build_version, &diff.applier_version);
320 diff
321 }
322}
323
324impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
325 fn into_proto(&self) -> ProtoStateDiff {
326 let StateDiff {
328 applier_version,
329 seqno_from,
330 seqno_to,
331 walltime_ms,
332 latest_rollup_key,
333 rollups,
334 active_rollup,
335 active_gc,
336 hostname,
337 last_gc_req,
338 leased_readers,
339 critical_readers,
340 writers,
341 schemas,
342 since,
343 legacy_batches,
344 hollow_batches,
345 spine_batches,
346 merges,
347 } = self;
348
349 let proto = ProtoStateFieldDiffs::default();
350
351 let mut writer = proto.into_writer();
353
354 field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
355 field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
356 field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
357 field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
358 field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
359 field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
360 field_diffs_into_proto(
361 ProtoStateField::CriticalReaders,
362 critical_readers,
363 &mut writer,
364 );
365 field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
366 field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
367 field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
368 field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
369 field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
370 field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
371 field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);
372
373 let field_diffs = writer.into_proto();
375
376 debug_assert_eq!(field_diffs.validate(), Ok(()));
377 ProtoStateDiff {
378 applier_version: applier_version.to_string(),
379 seqno_from: seqno_from.into_proto(),
380 seqno_to: seqno_to.into_proto(),
381 walltime_ms: walltime_ms.into_proto(),
382 latest_rollup_key: latest_rollup_key.into_proto(),
383 field_diffs: Some(field_diffs),
384 }
385 }
386
387 fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
388 let applier_version = if proto.applier_version.is_empty() {
389 semver::Version::new(0, 0, 0)
393 } else {
394 semver::Version::parse(&proto.applier_version).map_err(|err| {
395 TryFromProtoError::InvalidSemverVersion(format!(
396 "invalid applier_version {}: {}",
397 proto.applier_version, err
398 ))
399 })?
400 };
401 let mut state_diff = StateDiff::new(
402 applier_version,
403 proto.seqno_from.into_rust()?,
404 proto.seqno_to.into_rust()?,
405 proto.walltime_ms,
406 proto.latest_rollup_key.into_rust()?,
407 );
408 if let Some(field_diffs) = proto.field_diffs {
409 debug_assert_eq!(field_diffs.validate(), Ok(()));
410 for field_diff in field_diffs.iter() {
411 let (field, diff) = field_diff?;
412 match field {
413 ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
414 diff,
415 &mut state_diff.hostname,
416 |()| Ok(()),
417 |v| v.into_rust(),
418 )?,
419 ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
420 diff,
421 &mut state_diff.last_gc_req,
422 |()| Ok(()),
423 |v| v.into_rust(),
424 )?,
425 ProtoStateField::ActiveGc => {
426 field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
427 diff,
428 &mut state_diff.active_gc,
429 |()| Ok(()),
430 |v| v.into_rust(),
431 )?
432 }
433 ProtoStateField::ActiveRollup => {
434 field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
435 diff,
436 &mut state_diff.active_rollup,
437 |()| Ok(()),
438 |v| v.into_rust(),
439 )?
440 }
441 ProtoStateField::Rollups => {
442 field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
443 diff,
444 &mut state_diff.rollups,
445 |k| k.into_rust(),
446 |v| v.into_rust(),
447 )?
448 }
449 ProtoStateField::DeprecatedRollups => {
453 field_diff_into_rust::<u64, String, _, _, _, _>(
454 diff,
455 &mut state_diff.rollups,
456 |k| k.into_rust(),
457 |v| {
458 Ok(HollowRollup {
459 key: v.into_rust()?,
460 encoded_size_bytes: None,
461 })
462 },
463 )?
464 }
465 ProtoStateField::LeasedReaders => {
466 field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
467 diff,
468 &mut state_diff.leased_readers,
469 |k| k.into_rust(),
470 |v| v.into_rust(),
471 )?
472 }
473 ProtoStateField::CriticalReaders => {
474 field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
475 diff,
476 &mut state_diff.critical_readers,
477 |k| k.into_rust(),
478 |v| v.into_rust(),
479 )?
480 }
481 ProtoStateField::Writers => {
482 field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
483 diff,
484 &mut state_diff.writers,
485 |k| k.into_rust(),
486 |v| v.into_rust(),
487 )?
488 }
489 ProtoStateField::Schemas => {
490 field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
491 diff,
492 &mut state_diff.schemas,
493 |k| k.into_rust(),
494 |v| v.into_rust(),
495 )?
496 }
497 ProtoStateField::Since => {
498 field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
499 diff,
500 &mut state_diff.since,
501 |()| Ok(()),
502 |v| v.into_rust(),
503 )?
504 }
505 ProtoStateField::LegacyBatches => {
506 field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
507 diff,
508 &mut state_diff.legacy_batches,
509 |k| k.into_rust(),
510 |()| Ok(()),
511 )?
512 }
513 ProtoStateField::HollowBatches => {
514 field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
515 diff,
516 &mut state_diff.hollow_batches,
517 |k| k.into_rust(),
518 |v| v.into_rust(),
519 )?
520 }
521 ProtoStateField::SpineBatches => {
522 field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
523 diff,
524 &mut state_diff.spine_batches,
525 |k| k.into_rust(),
526 |v| v.into_rust(),
527 )?
528 }
529 ProtoStateField::SpineMerges => {
530 field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
531 diff,
532 &mut state_diff.merges,
533 |k| k.into_rust(),
534 |v| v.into_rust(),
535 )?
536 }
537 }
538 }
539 }
540 Ok(state_diff)
541 }
542}
543
544fn field_diffs_into_proto<K, KP, V, VP>(
545 field: ProtoStateField,
546 diffs: &[StateFieldDiff<K, V>],
547 writer: &mut ProtoStateFieldDiffsWriter,
548) where
549 KP: prost::Message,
550 K: RustType<KP>,
551 VP: prost::Message,
552 V: RustType<VP>,
553{
554 for diff in diffs.iter() {
555 field_diff_into_proto(field, diff, writer);
556 }
557}
558
559fn field_diff_into_proto<K, KP, V, VP>(
560 field: ProtoStateField,
561 diff: &StateFieldDiff<K, V>,
562 writer: &mut ProtoStateFieldDiffsWriter,
563) where
564 KP: prost::Message,
565 K: RustType<KP>,
566 VP: prost::Message,
567 V: RustType<VP>,
568{
569 writer.push_field(field);
570 writer.encode_proto(&diff.key.into_proto());
571 match &diff.val {
572 StateFieldValDiff::Insert(to) => {
573 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
574 writer.encode_proto(&to.into_proto());
575 }
576 StateFieldValDiff::Update(from, to) => {
577 writer.push_diff_type(ProtoStateFieldDiffType::Update);
578 writer.encode_proto(&from.into_proto());
579 writer.encode_proto(&to.into_proto());
580 }
581 StateFieldValDiff::Delete(from) => {
582 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
583 writer.encode_proto(&from.into_proto());
584 }
585 };
586}
587
588fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
589 proto: ProtoStateFieldDiff<'_>,
590 diffs: &mut Vec<StateFieldDiff<K, V>>,
591 k_fn: KFn,
592 v_fn: VFn,
593) -> Result<(), TryFromProtoError>
594where
595 KP: prost::Message + Default,
596 VP: prost::Message + Default,
597 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
598 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
599{
600 let val = match proto.diff_type {
601 ProtoStateFieldDiffType::Insert => {
602 let to = VP::decode(proto.to)
603 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
604 StateFieldValDiff::Insert(v_fn(to)?)
605 }
606 ProtoStateFieldDiffType::Update => {
607 let from = VP::decode(proto.from)
608 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
609 let to = VP::decode(proto.to)
610 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
611
612 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
613 }
614 ProtoStateFieldDiffType::Delete => {
615 let from = VP::decode(proto.from)
616 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
617 StateFieldValDiff::Delete(v_fn(from)?)
618 }
619 };
620 let key = KP::decode(proto.key)
621 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
622 diffs.push(StateFieldDiff {
623 key: k_fn(key)?,
624 val,
625 });
626 Ok(())
627}
628
629#[derive(Debug)]
632#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
633pub struct UntypedState<T> {
634 pub(crate) key_codec: String,
635 pub(crate) val_codec: String,
636 pub(crate) ts_codec: String,
637 pub(crate) diff_codec: String,
638
639 state: State<T>,
643}
644
645impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
646 pub fn seqno(&self) -> SeqNo {
647 self.state.seqno
648 }
649
650 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
651 &self.state.collections.rollups
652 }
653
654 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
655 self.state.latest_rollup()
656 }
657
658 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
659 &mut self,
660 cfg: &PersistConfig,
661 metrics: &Metrics,
662 diffs: I,
663 ) {
664 if T::codec_name() != self.ts_codec {
668 return;
669 }
670 self.state.apply_encoded_diffs(cfg, metrics, diffs);
671 }
672
673 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
674 self,
675 shard_id: &ShardId,
676 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
677 assert_eq!(shard_id, &self.state.shard_id);
680 if K::codec_name() != self.key_codec
681 || V::codec_name() != self.val_codec
682 || T::codec_name() != self.ts_codec
683 || D::codec_name() != self.diff_codec
684 {
685 return Err(Box::new(CodecMismatch {
686 requested: (
687 K::codec_name(),
688 V::codec_name(),
689 T::codec_name(),
690 D::codec_name(),
691 None,
692 ),
693 actual: (
694 self.key_codec,
695 self.val_codec,
696 self.ts_codec,
697 self.diff_codec,
698 None,
699 ),
700 }));
701 }
702 Ok(TypedState {
703 state: self.state,
704 _phantom: PhantomData,
705 })
706 }
707
708 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
709 assert_eq!(shard_id, &self.state.shard_id);
712 if T::codec_name() != self.ts_codec {
713 return Err(CodecMismatchT {
714 requested: T::codec_name(),
715 actual: self.ts_codec,
716 });
717 }
718 Ok(self.state)
719 }
720
721 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
722 let proto = ProtoRollup::decode(buf)
723 .expect("internal error: invalid encoded state");
728 let state = Rollup::from_proto(proto)
729 .expect("internal error: invalid encoded state")
730 .state;
731 check_data_version(build_version, &state.state.applier_version);
732 state
733 }
734}
735
736impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
737where
738 K: Codec,
739 V: Codec,
740 T: Codec64,
741 D: Codec64,
742{
743 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
744 UntypedState {
745 key_codec: K::codec_name(),
746 val_codec: V::codec_name(),
747 ts_codec: T::codec_name(),
748 diff_codec: D::codec_name(),
749 state: typed_state.state,
750 }
751 }
752}
753
754#[derive(Debug)]
761pub struct Rollup<T> {
762 pub(crate) state: UntypedState<T>,
763 pub(crate) diffs: Option<InlinedDiffs>,
764}
765
766impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
767 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
771 let latest_rollup_seqno = *state.latest_rollup().0;
772 let mut verify_seqno = latest_rollup_seqno;
773 for diff in &diffs {
774 assert_eq!(verify_seqno.next(), diff.seqno);
775 verify_seqno = diff.seqno;
776 }
777 assert_eq!(verify_seqno, state.seqno());
778
779 let diffs = Some(InlinedDiffs::from(
780 latest_rollup_seqno.next(),
781 state.seqno().next(),
782 diffs,
783 ));
784
785 Self { state, diffs }
786 }
787
788 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
789 Self { state, diffs: None }
790 }
791
792 pub(crate) fn from_state_without_diffs(
793 state: State<T>,
794 key_codec: String,
795 val_codec: String,
796 ts_codec: String,
797 diff_codec: String,
798 ) -> Self {
799 Self::from_untyped_state_without_diffs(UntypedState {
800 key_codec,
801 val_codec,
802 ts_codec,
803 diff_codec,
804 state,
805 })
806 }
807}
808
809#[derive(Debug)]
810pub(crate) struct InlinedDiffs {
811 pub(crate) lower: SeqNo,
812 pub(crate) upper: SeqNo,
813 pub(crate) diffs: Vec<VersionedData>,
814}
815
816impl InlinedDiffs {
817 pub(crate) fn description(&self) -> Description<SeqNo> {
818 Description::new(
819 Antichain::from_elem(self.lower),
820 Antichain::from_elem(self.upper),
821 Antichain::from_elem(SeqNo::minimum()),
822 )
823 }
824
825 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
826 for diff in &diffs {
827 assert!(diff.seqno >= lower);
828 assert!(diff.seqno < upper);
829 }
830 Self {
831 lower,
832 upper,
833 diffs,
834 }
835 }
836}
837
838impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
839 fn into_proto(&self) -> ProtoInlinedDiffs {
840 ProtoInlinedDiffs {
841 lower: self.lower.into_proto(),
842 upper: self.upper.into_proto(),
843 diffs: self.diffs.into_proto(),
844 }
845 }
846
847 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
848 Ok(Self {
849 lower: proto.lower.into_rust()?,
850 upper: proto.upper.into_rust()?,
851 diffs: proto.diffs.into_rust()?,
852 })
853 }
854}
855
856impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
857 fn into_proto(&self) -> ProtoRollup {
858 ProtoRollup {
859 applier_version: self.state.state.applier_version.to_string(),
860 shard_id: self.state.state.shard_id.into_proto(),
861 seqno: self.state.state.seqno.into_proto(),
862 walltime_ms: self.state.state.walltime_ms.into_proto(),
863 hostname: self.state.state.hostname.into_proto(),
864 key_codec: self.state.key_codec.into_proto(),
865 val_codec: self.state.val_codec.into_proto(),
866 ts_codec: T::codec_name(),
867 diff_codec: self.state.diff_codec.into_proto(),
868 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
869 active_rollup: self.state.state.collections.active_rollup.into_proto(),
870 active_gc: self.state.state.collections.active_gc.into_proto(),
871 rollups: self
872 .state
873 .state
874 .collections
875 .rollups
876 .iter()
877 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
878 .collect(),
879 deprecated_rollups: Default::default(),
880 leased_readers: self
881 .state
882 .state
883 .collections
884 .leased_readers
885 .iter()
886 .map(|(id, state)| (id.into_proto(), state.into_proto()))
887 .collect(),
888 critical_readers: self
889 .state
890 .state
891 .collections
892 .critical_readers
893 .iter()
894 .map(|(id, state)| (id.into_proto(), state.into_proto()))
895 .collect(),
896 writers: self
897 .state
898 .state
899 .collections
900 .writers
901 .iter()
902 .map(|(id, state)| (id.into_proto(), state.into_proto()))
903 .collect(),
904 schemas: self
905 .state
906 .state
907 .collections
908 .schemas
909 .iter()
910 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
911 .collect(),
912 trace: Some(self.state.state.collections.trace.into_proto()),
913 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
914 }
915 }
916
917 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
918 let applier_version = if x.applier_version.is_empty() {
919 semver::Version::new(0, 0, 0)
923 } else {
924 semver::Version::parse(&x.applier_version).map_err(|err| {
925 TryFromProtoError::InvalidSemverVersion(format!(
926 "invalid applier_version {}: {}",
927 x.applier_version, err
928 ))
929 })?
930 };
931
932 let mut rollups = BTreeMap::new();
933 for (seqno, rollup) in x.rollups {
934 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
935 }
936 for (seqno, key) in x.deprecated_rollups {
937 rollups.insert(
938 seqno.into_rust()?,
939 HollowRollup {
940 key: key.into_rust()?,
941 encoded_size_bytes: None,
942 },
943 );
944 }
945 let mut leased_readers = BTreeMap::new();
946 for (id, state) in x.leased_readers {
947 leased_readers.insert(id.into_rust()?, state.into_rust()?);
948 }
949 let mut critical_readers = BTreeMap::new();
950 for (id, state) in x.critical_readers {
951 critical_readers.insert(id.into_rust()?, state.into_rust()?);
952 }
953 let mut writers = BTreeMap::new();
954 for (id, state) in x.writers {
955 writers.insert(id.into_rust()?, state.into_rust()?);
956 }
957 let mut schemas = BTreeMap::new();
958 for (id, x) in x.schemas {
959 schemas.insert(id.into_rust()?, x.into_rust()?);
960 }
961 let active_rollup = x
962 .active_rollup
963 .map(|rollup| rollup.into_rust())
964 .transpose()?;
965 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
966 let collections = StateCollections {
967 rollups,
968 active_rollup,
969 active_gc,
970 last_gc_req: x.last_gc_req.into_rust()?,
971 leased_readers,
972 critical_readers,
973 writers,
974 schemas,
975 trace: x.trace.into_rust_if_some("trace")?,
976 };
977 let state = State {
978 applier_version,
979 shard_id: x.shard_id.into_rust()?,
980 seqno: x.seqno.into_rust()?,
981 walltime_ms: x.walltime_ms,
982 hostname: x.hostname,
983 collections,
984 };
985
986 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
987 if let Some(diffs) = &diffs {
988 if diffs.lower != state.latest_rollup().0.next() {
989 return Err(TryFromProtoError::InvalidPersistState(format!(
990 "diffs lower ({}) should match latest rollup's successor: ({})",
991 diffs.lower,
992 state.latest_rollup().0.next()
993 )));
994 }
995 if diffs.upper != state.seqno.next() {
996 return Err(TryFromProtoError::InvalidPersistState(format!(
997 "diffs upper ({}) should match state's successor: ({})",
998 diffs.lower,
999 state.seqno.next()
1000 )));
1001 }
1002 }
1003
1004 Ok(Rollup {
1005 state: UntypedState {
1006 state,
1007 key_codec: x.key_codec.into_rust()?,
1008 val_codec: x.val_codec.into_rust()?,
1009 ts_codec: x.ts_codec.into_rust()?,
1010 diff_codec: x.diff_codec.into_rust()?,
1011 },
1012 diffs,
1013 })
1014 }
1015}
1016
1017impl RustType<ProtoVersionedData> for VersionedData {
1018 fn into_proto(&self) -> ProtoVersionedData {
1019 ProtoVersionedData {
1020 seqno: self.seqno.into_proto(),
1021 data: Bytes::clone(&self.data),
1022 }
1023 }
1024
1025 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1026 Ok(Self {
1027 seqno: proto.seqno.into_rust()?,
1028 data: proto.data,
1029 })
1030 }
1031}
1032
1033impl RustType<ProtoSpineId> for SpineId {
1034 fn into_proto(&self) -> ProtoSpineId {
1035 ProtoSpineId {
1036 lo: self.0.into_proto(),
1037 hi: self.1.into_proto(),
1038 }
1039 }
1040
1041 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1042 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1043 }
1044}
1045
1046impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1047 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1048 let (id, batch) = entry;
1049 ProtoIdHollowBatch {
1050 id: Some(id.into_proto()),
1051 batch: Some(batch.into_proto()),
1052 }
1053 }
1054
1055 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1056 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1057 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1058 Ok((id, batch))
1059 }
1060}
1061
1062impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1063 fn into_proto(&self) -> ProtoSpineBatch {
1064 ProtoSpineBatch {
1065 desc: Some(self.desc.into_proto()),
1066 parts: self.parts.into_proto(),
1067 level: self.level.into_proto(),
1068 descs: self.descs.into_proto(),
1069 }
1070 }
1071
1072 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1073 let level = proto.level.into_rust()?;
1074 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1075 let parts = proto.parts.into_rust()?;
1076 let descs = proto.descs.into_rust()?;
1077 Ok(ThinSpineBatch {
1078 level,
1079 desc,
1080 parts,
1081 descs,
1082 })
1083 }
1084}
1085
1086impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1087 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1088 let (id, batch) = entry;
1089 ProtoIdSpineBatch {
1090 id: Some(id.into_proto()),
1091 batch: Some(batch.into_proto()),
1092 }
1093 }
1094
1095 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1096 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1097 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1098 Ok((id, batch))
1099 }
1100}
1101
1102impl RustType<ProtoCompaction> for ActiveCompaction {
1103 fn into_proto(&self) -> ProtoCompaction {
1104 ProtoCompaction {
1105 start_ms: self.start_ms,
1106 }
1107 }
1108
1109 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1110 Ok(Self {
1111 start_ms: proto.start_ms,
1112 })
1113 }
1114}
1115
1116impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1117 fn into_proto(&self) -> ProtoMerge {
1118 ProtoMerge {
1119 since: Some(self.since.into_proto()),
1120 remaining_work: self.remaining_work.into_proto(),
1121 active_compaction: self.active_compaction.into_proto(),
1122 }
1123 }
1124
1125 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1126 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1127 let remaining_work = proto.remaining_work.into_rust()?;
1128 let active_compaction = proto.active_compaction.into_rust()?;
1129 Ok(Self {
1130 since,
1131 remaining_work,
1132 active_compaction,
1133 })
1134 }
1135}
1136
1137impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1138 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1139 ProtoIdMerge {
1140 id: Some(id.into_proto()),
1141 merge: Some(merge.into_proto()),
1142 }
1143 }
1144
1145 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1146 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1147 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1148 Ok((id, merge))
1149 }
1150}
1151
1152impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1153 fn into_proto(&self) -> ProtoTrace {
1154 let since = self.since.into_proto();
1155 let legacy_batches = self
1156 .legacy_batches
1157 .iter()
1158 .map(|(b, _)| b.into_proto())
1159 .collect();
1160 let hollow_batches = self.hollow_batches.into_proto();
1161 let spine_batches = self.spine_batches.into_proto();
1162 let merges = self.merges.into_proto();
1163 ProtoTrace {
1164 since: Some(since),
1165 legacy_batches,
1166 hollow_batches,
1167 spine_batches,
1168 merges,
1169 }
1170 }
1171
1172 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1173 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1174 let legacy_batches = proto
1175 .legacy_batches
1176 .into_iter()
1177 .map(|b| b.into_rust().map(|b| (b, ())))
1178 .collect::<Result<_, _>>()?;
1179 let hollow_batches = proto.hollow_batches.into_rust()?;
1180 let spine_batches = proto.spine_batches.into_rust()?;
1181 let merges = proto.merges.into_rust()?;
1182 Ok(FlatTrace {
1183 since,
1184 legacy_batches,
1185 hollow_batches,
1186 spine_batches,
1187 merges,
1188 })
1189 }
1190}
1191
1192impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1193 fn into_proto(&self) -> ProtoTrace {
1194 self.flatten().into_proto()
1195 }
1196
1197 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1198 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1199 }
1200}
1201
1202impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1203 fn into_proto(&self) -> ProtoLeasedReaderState {
1204 ProtoLeasedReaderState {
1205 seqno: self.seqno.into_proto(),
1206 since: Some(self.since.into_proto()),
1207 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1208 lease_duration_ms: self.lease_duration_ms.into_proto(),
1209 debug: Some(self.debug.into_proto()),
1210 }
1211 }
1212
1213 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1214 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1215 if lease_duration_ms == 0 {
1220 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1221 .expect("lease duration as millis should fit within u64");
1222 }
1223 let debug = proto.debug.unwrap_or_default().into_rust()?;
1226 Ok(LeasedReaderState {
1227 seqno: proto.seqno.into_rust()?,
1228 since: proto
1229 .since
1230 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1231 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1232 lease_duration_ms,
1233 debug,
1234 })
1235 }
1236}
1237
1238impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1239 fn into_proto(&self) -> ProtoCriticalReaderState {
1240 ProtoCriticalReaderState {
1241 since: Some(self.since.into_proto()),
1242 opaque: i64::from_le_bytes(self.opaque.0),
1243 opaque_codec: self.opaque_codec.clone(),
1244 debug: Some(self.debug.into_proto()),
1245 }
1246 }
1247
1248 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1249 let debug = proto.debug.unwrap_or_default().into_rust()?;
1252 Ok(CriticalReaderState {
1253 since: proto
1254 .since
1255 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1256 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1257 opaque_codec: proto.opaque_codec,
1258 debug,
1259 })
1260 }
1261}
1262
1263impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1264 fn into_proto(&self) -> ProtoWriterState {
1265 ProtoWriterState {
1266 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1267 lease_duration_ms: self.lease_duration_ms.into_proto(),
1268 most_recent_write_token: self.most_recent_write_token.into_proto(),
1269 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1270 debug: Some(self.debug.into_proto()),
1271 }
1272 }
1273
1274 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1275 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1281 IdempotencyToken::SENTINEL
1282 } else {
1283 proto.most_recent_write_token.into_rust()?
1284 };
1285 let most_recent_write_upper = match proto.most_recent_write_upper {
1286 Some(x) => x.into_rust()?,
1287 None => Antichain::from_elem(T::minimum()),
1288 };
1289 let debug = proto.debug.unwrap_or_default().into_rust()?;
1292 Ok(WriterState {
1293 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1294 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1295 most_recent_write_token,
1296 most_recent_write_upper,
1297 debug,
1298 })
1299 }
1300}
1301
1302impl RustType<ProtoHandleDebugState> for HandleDebugState {
1303 fn into_proto(&self) -> ProtoHandleDebugState {
1304 ProtoHandleDebugState {
1305 hostname: self.hostname.into_proto(),
1306 purpose: self.purpose.into_proto(),
1307 }
1308 }
1309
1310 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1311 Ok(HandleDebugState {
1312 hostname: proto.hostname,
1313 purpose: proto.purpose,
1314 })
1315 }
1316}
1317
1318impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1319 fn into_proto(&self) -> ProtoHollowRun {
1320 ProtoHollowRun {
1321 parts: self.parts.into_proto(),
1322 }
1323 }
1324
1325 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1326 Ok(HollowRun {
1327 parts: proto.parts.into_rust()?,
1328 })
1329 }
1330}
1331
1332impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1333 fn into_proto(&self) -> ProtoHollowBatch {
1334 let mut run_meta = self.run_meta.into_proto();
1335 let run_meta_default = RunMeta::default().into_proto();
1337 while run_meta.last() == Some(&run_meta_default) {
1338 run_meta.pop();
1339 }
1340 ProtoHollowBatch {
1341 desc: Some(self.desc.into_proto()),
1342 parts: self.parts.into_proto(),
1343 len: self.len.into_proto(),
1344 runs: self.run_splits.into_proto(),
1345 run_meta,
1346 deprecated_keys: vec![],
1347 }
1348 }
1349
1350 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1351 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1352 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1355 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1356 key: PartialBatchKey(key),
1357 encoded_size_bytes: 0,
1358 key_lower: vec![],
1359 structured_key_lower: None,
1360 stats: None,
1361 ts_rewrite: None,
1362 diffs_sum: None,
1363 format: None,
1364 schema_id: None,
1365 deprecated_schema_id: None,
1366 }))
1367 }));
1368 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1370 let num_runs = if parts.is_empty() {
1371 0
1372 } else {
1373 run_splits.len() + 1
1374 };
1375 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1376 run_meta.resize(num_runs, RunMeta::default());
1377 Ok(HollowBatch {
1378 desc: proto.desc.into_rust_if_some("desc")?,
1379 parts,
1380 len: proto.len.into_rust()?,
1381 run_splits,
1382 run_meta,
1383 })
1384 }
1385}
1386
1387impl RustType<ProtoRunMeta> for RunMeta {
1388 fn into_proto(&self) -> ProtoRunMeta {
1389 let order = match self.order {
1390 None => ProtoRunOrder::Unknown,
1391 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1392 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1393 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1394 };
1395 ProtoRunMeta {
1396 order: order.into(),
1397 schema_id: self.schema.into_proto(),
1398 deprecated_schema_id: self.deprecated_schema.into_proto(),
1399 }
1400 }
1401
1402 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1403 let order = match ProtoRunOrder::try_from(proto.order)? {
1404 ProtoRunOrder::Unknown => None,
1405 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1406 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1407 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1408 };
1409 Ok(Self {
1410 order,
1411 schema: proto.schema_id.into_rust()?,
1412 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1413 })
1414 }
1415}
1416
1417impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1418 fn into_proto(&self) -> ProtoHollowBatchPart {
1419 match self {
1420 RunPart::Single(part) => part.into_proto(),
1421 RunPart::Many(runs) => runs.into_proto(),
1422 }
1423 }
1424
1425 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1426 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1427 RunPart::Many(proto.into_rust()?)
1428 } else {
1429 RunPart::Single(proto.into_rust()?)
1430 };
1431 Ok(run_part)
1432 }
1433}
1434
1435impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1436 fn into_proto(&self) -> ProtoHollowBatchPart {
1437 let part = ProtoHollowBatchPart {
1438 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1439 key: self.key.into_proto(),
1440 max_part_bytes: self.max_part_bytes.into_proto(),
1441 })),
1442 encoded_size_bytes: self.hollow_bytes.into_proto(),
1443 key_lower: Bytes::copy_from_slice(&self.key_lower),
1444 diffs_sum: None,
1445 key_stats: None,
1446 ts_rewrite: None,
1447 format: None,
1448 schema_id: None,
1449 structured_key_lower: self.structured_key_lower.into_proto(),
1450 deprecated_schema_id: None,
1451 };
1452 part
1453 }
1454
1455 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1456 let run_proto = match proto.kind {
1457 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1458 _ => Err(TryFromProtoError::UnknownEnumVariant(
1459 "ProtoHollowBatchPart::kind".to_string(),
1460 ))?,
1461 };
1462 Ok(Self {
1463 key: run_proto.key.into_rust()?,
1464 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1465 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1466 key_lower: proto.key_lower.to_vec(),
1467 structured_key_lower: proto.structured_key_lower.into_rust()?,
1468 _phantom_data: Default::default(),
1469 })
1470 }
1471}
1472
1473impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1474 fn into_proto(&self) -> ProtoHollowBatchPart {
1475 match self {
1476 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1477 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1478 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1479 key_lower: Bytes::copy_from_slice(&x.key_lower),
1480 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1481 key_stats: x.stats.into_proto(),
1482 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1483 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1484 format: x.format.map(|f| f.into_proto()),
1485 schema_id: x.schema_id.into_proto(),
1486 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1487 },
1488 BatchPart::Inline {
1489 updates,
1490 ts_rewrite,
1491 schema_id,
1492 deprecated_schema_id,
1493 } => ProtoHollowBatchPart {
1494 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1495 encoded_size_bytes: 0,
1496 key_lower: Bytes::new(),
1497 structured_key_lower: None,
1498 key_stats: None,
1499 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1500 diffs_sum: None,
1501 format: None,
1502 schema_id: schema_id.into_proto(),
1503 deprecated_schema_id: deprecated_schema_id.into_proto(),
1504 },
1505 }
1506 }
1507
1508 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1509 let ts_rewrite = match proto.ts_rewrite {
1510 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1511 None => None,
1512 };
1513 let schema_id = proto.schema_id.into_rust()?;
1514 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1515 match proto.kind {
1516 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1517 Ok(BatchPart::Hollow(HollowBatchPart {
1518 key: key.into_rust()?,
1519 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1520 key_lower: proto.key_lower.into(),
1521 structured_key_lower: proto.structured_key_lower.into_rust()?,
1522 stats: proto.key_stats.into_rust()?,
1523 ts_rewrite,
1524 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1525 format: proto.format.map(|f| f.into_rust()).transpose()?,
1526 schema_id,
1527 deprecated_schema_id,
1528 }))
1529 }
1530 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1531 assert_eq!(proto.encoded_size_bytes, 0);
1532 assert_eq!(proto.key_lower.len(), 0);
1533 assert_none!(proto.key_stats);
1534 assert_none!(proto.diffs_sum);
1535 let updates = LazyInlineBatchPart(x.into_rust()?);
1536 Ok(BatchPart::Inline {
1537 updates,
1538 ts_rewrite,
1539 schema_id,
1540 deprecated_schema_id,
1541 })
1542 }
1543 _ => Err(TryFromProtoError::unknown_enum_variant(
1544 "ProtoHollowBatchPart::kind",
1545 )),
1546 }
1547 }
1548}
1549
1550impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1551 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1552 match self {
1553 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1554 BatchColumnarFormat::Both(version) => {
1555 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1556 }
1557 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1558 }
1559 }
1560
1561 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1562 let format = match proto {
1563 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1564 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1565 BatchColumnarFormat::Both(version.cast_into())
1566 }
1567 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1568 };
1569 Ok(format)
1570 }
1571}
1572
1573#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1578pub struct LazyPartStats {
1579 key: LazyProto<ProtoStructStats>,
1580}
1581
1582impl LazyPartStats {
1583 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1584 let PartStats { key } = x;
1585 let mut proto_stats = ProtoStructStats::from_rust(key);
1586 map_proto(&mut proto_stats);
1587 LazyPartStats {
1588 key: LazyProto::from(&proto_stats),
1589 }
1590 }
1591 pub fn decode(&self) -> PartStats {
1596 let key = self.key.decode().expect("valid proto");
1597 PartStats {
1598 key: key.into_rust().expect("valid stats"),
1599 }
1600 }
1601}
1602
1603impl RustType<Bytes> for LazyPartStats {
1604 fn into_proto(&self) -> Bytes {
1605 let LazyPartStats { key } = self;
1606 key.into_proto()
1607 }
1608
1609 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1610 Ok(LazyPartStats {
1611 key: proto.into_rust()?,
1612 })
1613 }
1614}
1615
1616#[cfg(test)]
1617pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
1618 proptest::prelude::any::<LazyPartStats>().prop_map(Some)
1619}
1620
1621#[allow(unused_parens)]
1622impl Arbitrary for LazyPartStats {
1623 type Parameters = ();
1624 type Strategy =
1625 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1626
1627 fn arbitrary_with(_: ()) -> Self::Strategy {
1628 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1629 LazyPartStats::encode(&x, |_| {})
1630 })
1631 }
1632}
1633
1634impl ProtoInlineBatchPart {
1635 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1636 lgbytes: &ColumnarMetrics,
1637 proto: Self,
1638 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1639 let updates = proto
1640 .updates
1641 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1642 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1643
1644 Ok(BlobTraceBatchPart {
1645 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1646 index: proto.index.into_rust()?,
1647 updates,
1648 })
1649 }
1650}
1651
1652#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1654pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1655
1656impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1657 fn from(value: &ProtoInlineBatchPart) -> Self {
1658 LazyInlineBatchPart(value.into())
1659 }
1660}
1661
1662impl Serialize for LazyInlineBatchPart {
1663 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1664 let proto = self.0.decode().expect("valid proto");
1667 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1668 let () = s.serialize_field("desc", &proto.desc)?;
1669 let () = s.serialize_field("index", &proto.index)?;
1670 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1671 s.end()
1672 }
1673}
1674
1675impl LazyInlineBatchPart {
1676 pub(crate) fn encoded_size_bytes(&self) -> usize {
1677 self.0.buf.len()
1678 }
1679
1680 pub fn decode<T: Timestamp + Codec64>(
1686 &self,
1687 lgbytes: &ColumnarMetrics,
1688 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1689 let proto = self.0.decode().expect("valid proto");
1690 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1691 }
1692}
1693
1694impl RustType<Bytes> for LazyInlineBatchPart {
1695 fn into_proto(&self) -> Bytes {
1696 self.0.into_proto()
1697 }
1698
1699 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1700 Ok(LazyInlineBatchPart(proto.into_rust()?))
1701 }
1702}
1703
1704impl RustType<ProtoHollowRollup> for HollowRollup {
1705 fn into_proto(&self) -> ProtoHollowRollup {
1706 ProtoHollowRollup {
1707 key: self.key.into_proto(),
1708 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1709 }
1710 }
1711
1712 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1713 Ok(HollowRollup {
1714 key: proto.key.into_rust()?,
1715 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1716 })
1717 }
1718}
1719
1720impl RustType<ProtoActiveRollup> for ActiveRollup {
1721 fn into_proto(&self) -> ProtoActiveRollup {
1722 ProtoActiveRollup {
1723 start_ms: self.start_ms,
1724 seqno: self.seqno.into_proto(),
1725 }
1726 }
1727
1728 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1729 Ok(ActiveRollup {
1730 start_ms: proto.start_ms,
1731 seqno: proto.seqno.into_rust()?,
1732 })
1733 }
1734}
1735
1736impl RustType<ProtoActiveGc> for ActiveGc {
1737 fn into_proto(&self) -> ProtoActiveGc {
1738 ProtoActiveGc {
1739 start_ms: self.start_ms,
1740 seqno: self.seqno.into_proto(),
1741 }
1742 }
1743
1744 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1745 Ok(ActiveGc {
1746 start_ms: proto.start_ms,
1747 seqno: proto.seqno.into_rust()?,
1748 })
1749 }
1750}
1751
1752impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1753 fn into_proto(&self) -> ProtoU64Description {
1754 ProtoU64Description {
1755 lower: Some(self.lower().into_proto()),
1756 upper: Some(self.upper().into_proto()),
1757 since: Some(self.since().into_proto()),
1758 }
1759 }
1760
1761 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1762 Ok(Description::new(
1763 proto.lower.into_rust_if_some("lower")?,
1764 proto.upper.into_rust_if_some("upper")?,
1765 proto.since.into_rust_if_some("since")?,
1766 ))
1767 }
1768}
1769
1770impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1771 fn into_proto(&self) -> ProtoU64Antichain {
1772 ProtoU64Antichain {
1773 elements: self
1774 .elements()
1775 .iter()
1776 .map(|x| i64::from_le_bytes(T::encode(x)))
1777 .collect(),
1778 }
1779 }
1780
1781 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1782 let elements = proto
1783 .elements
1784 .iter()
1785 .map(|x| T::decode(x.to_le_bytes()))
1786 .collect::<Vec<_>>();
1787 Ok(Antichain::from(elements))
1788 }
1789}
1790
1791#[cfg(test)]
1792mod tests {
1793 use bytes::Bytes;
1794 use mz_build_info::DUMMY_BUILD_INFO;
1795 use mz_dyncfg::ConfigUpdates;
1796 use mz_ore::assert_err;
1797 use mz_persist::location::SeqNo;
1798 use proptest::prelude::*;
1799
1800 use crate::ShardId;
1801 use crate::internal::paths::PartialRollupKey;
1802 use crate::internal::state::tests::any_state;
1803 use crate::internal::state::{BatchPart, HandleDebugState};
1804 use crate::internal::state_diff::StateDiff;
1805 use crate::tests::new_test_client_cache;
1806
1807 use super::*;
1808
1809 #[mz_ore::test]
1810 fn applier_version_state() {
1811 let v1 = semver::Version::new(1, 0, 0);
1812 let v2 = semver::Version::new(2, 0, 0);
1813 let v3 = semver::Version::new(3, 0, 0);
1814
1815 let shard_id = ShardId::new();
1817 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1818 let rollup =
1819 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1820 let mut buf = Vec::new();
1821 rollup.encode(&mut buf).expect("serializable");
1822 let bytes = Bytes::from(buf);
1823
1824 assert_eq!(
1826 UntypedState::<u64>::decode(&v2, bytes.clone())
1827 .check_codecs(&shard_id)
1828 .as_ref(),
1829 Ok(&state)
1830 );
1831 assert_eq!(
1832 UntypedState::<u64>::decode(&v3, bytes.clone())
1833 .check_codecs(&shard_id)
1834 .as_ref(),
1835 Ok(&state)
1836 );
1837
1838 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1843 assert_err!(v1_res);
1844 }
1845
1846 #[mz_ore::test]
1847 fn applier_version_state_diff() {
1848 let v1 = semver::Version::new(1, 0, 0);
1849 let v2 = semver::Version::new(2, 0, 0);
1850 let v3 = semver::Version::new(3, 0, 0);
1851
1852 let diff = StateDiff::<u64>::new(
1854 v2.clone(),
1855 SeqNo(0),
1856 SeqNo(1),
1857 2,
1858 PartialRollupKey("rollup".into()),
1859 );
1860 let mut buf = Vec::new();
1861 diff.encode(&mut buf);
1862 let bytes = Bytes::from(buf);
1863
1864 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1866 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1867
1868 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1873 assert_err!(v1_res);
1874 }
1875
1876 #[mz_ore::test]
1877 fn hollow_batch_migration_keys() {
1878 let x = HollowBatch::new_run(
1879 Description::new(
1880 Antichain::from_elem(1u64),
1881 Antichain::from_elem(2u64),
1882 Antichain::from_elem(3u64),
1883 ),
1884 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1885 key: PartialBatchKey("a".into()),
1886 encoded_size_bytes: 5,
1887 key_lower: vec![],
1888 structured_key_lower: None,
1889 stats: None,
1890 ts_rewrite: None,
1891 diffs_sum: None,
1892 format: None,
1893 schema_id: None,
1894 deprecated_schema_id: None,
1895 }))],
1896 4,
1897 );
1898 let mut old = x.into_proto();
1899 old.deprecated_keys = vec!["b".into()];
1901 let mut expected = x;
1905 expected
1910 .parts
1911 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1912 key: PartialBatchKey("b".into()),
1913 encoded_size_bytes: 0,
1914 key_lower: vec![],
1915 structured_key_lower: None,
1916 stats: None,
1917 ts_rewrite: None,
1918 diffs_sum: None,
1919 format: None,
1920 schema_id: None,
1921 deprecated_schema_id: None,
1922 })));
1923 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1924 }
1925
1926 #[mz_ore::test]
1927 fn reader_state_migration_lease_duration() {
1928 let x = LeasedReaderState {
1929 seqno: SeqNo(1),
1930 since: Antichain::from_elem(2u64),
1931 last_heartbeat_timestamp_ms: 3,
1932 debug: HandleDebugState {
1933 hostname: "host".to_owned(),
1934 purpose: "purpose".to_owned(),
1935 },
1936 lease_duration_ms: 0,
1938 };
1939 let old = x.into_proto();
1940 let mut expected = x;
1941 expected.lease_duration_ms =
1944 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1945 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1946 }
1947
1948 #[mz_ore::test]
1949 fn writer_state_migration_most_recent_write() {
1950 let proto = ProtoWriterState {
1951 last_heartbeat_timestamp_ms: 1,
1952 lease_duration_ms: 2,
1953 most_recent_write_token: "".into(),
1956 most_recent_write_upper: None,
1957 debug: Some(ProtoHandleDebugState {
1958 hostname: "host".to_owned(),
1959 purpose: "purpose".to_owned(),
1960 }),
1961 };
1962 let expected = WriterState {
1963 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1964 lease_duration_ms: proto.lease_duration_ms,
1965 most_recent_write_token: IdempotencyToken::SENTINEL,
1966 most_recent_write_upper: Antichain::from_elem(0),
1967 debug: HandleDebugState {
1968 hostname: "host".to_owned(),
1969 purpose: "purpose".to_owned(),
1970 },
1971 };
1972 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
1973 }
1974
1975 #[mz_ore::test]
1976 fn state_migration_rollups() {
1977 let r1 = HollowRollup {
1978 key: PartialRollupKey("foo".to_owned()),
1979 encoded_size_bytes: None,
1980 };
1981 let r2 = HollowRollup {
1982 key: PartialRollupKey("bar".to_owned()),
1983 encoded_size_bytes: Some(2),
1984 };
1985 let shard_id = ShardId::new();
1986 let mut state = TypedState::<(), (), u64, i64>::new(
1987 DUMMY_BUILD_INFO.semver_version(),
1988 shard_id,
1989 "host".to_owned(),
1990 0,
1991 );
1992 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
1993 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
1994
1995 proto.deprecated_rollups.insert(1, r1.key.0.clone());
1997
1998 let state: Rollup<u64> = proto.into_rust().unwrap();
1999 let state = state.state;
2000 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2001 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2002 assert_eq!(
2003 state
2004 .state
2005 .collections
2006 .rollups
2007 .into_iter()
2008 .collect::<Vec<_>>(),
2009 expected
2010 );
2011 }
2012
2013 #[mz_persist_proc::test(tokio::test)]
2014 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2016 let r1_rollup = HollowRollup {
2017 key: PartialRollupKey("foo".to_owned()),
2018 encoded_size_bytes: None,
2019 };
2020 let r1 = StateFieldDiff {
2021 key: SeqNo(1),
2022 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2023 };
2024 let r2_rollup = HollowRollup {
2025 key: PartialRollupKey("bar".to_owned()),
2026 encoded_size_bytes: Some(2),
2027 };
2028 let r2 = StateFieldDiff {
2029 key: SeqNo(2),
2030 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2031 };
2032 let r3_rollup = HollowRollup {
2033 key: PartialRollupKey("baz".to_owned()),
2034 encoded_size_bytes: None,
2035 };
2036 let r3 = StateFieldDiff {
2037 key: SeqNo(3),
2038 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2039 };
2040 let mut diff = StateDiff::<u64>::new(
2041 DUMMY_BUILD_INFO.semver_version(),
2042 SeqNo(4),
2043 SeqNo(5),
2044 0,
2045 PartialRollupKey("ignored".to_owned()),
2046 );
2047 diff.rollups.push(r2.clone());
2048 diff.rollups.push(r3.clone());
2049 let mut diff_proto = diff.into_proto();
2050
2051 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2052 let mut field_diffs_writer = field_diffs.into_writer();
2053
2054 field_diffs_into_proto(
2056 ProtoStateField::DeprecatedRollups,
2057 &[StateFieldDiff {
2058 key: r1.key,
2059 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2060 }],
2061 &mut field_diffs_writer,
2062 );
2063
2064 assert_none!(diff_proto.field_diffs);
2065 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2066
2067 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2068 assert_eq!(
2069 diff.rollups.into_iter().collect::<Vec<_>>(),
2070 vec![r2, r3, r1]
2071 );
2072
2073 let shard_id = ShardId::new();
2076 let mut state = TypedState::<(), (), u64, i64>::new(
2077 DUMMY_BUILD_INFO.semver_version(),
2078 shard_id,
2079 "host".to_owned(),
2080 0,
2081 );
2082 state.state.seqno = SeqNo(4);
2083 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2084 rollup
2085 .deprecated_rollups
2086 .insert(3, r3_rollup.key.into_proto());
2087 let state: Rollup<u64> = rollup.into_rust().unwrap();
2088 let state = state.state;
2089 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2090 let cache = new_test_client_cache(&dyncfgs);
2091 let encoded_diff = VersionedData {
2092 seqno: SeqNo(5),
2093 data: diff_proto.encode_to_vec().into(),
2094 };
2095 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2096 assert_eq!(
2097 state
2098 .state
2099 .collections
2100 .rollups
2101 .into_iter()
2102 .collect::<Vec<_>>(),
2103 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2104 );
2105 }
2106
2107 #[mz_ore::test]
2108 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2110 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2111 let before = UntypedState {
2112 key_codec: <() as Codec>::codec_name(),
2113 val_codec: <() as Codec>::codec_name(),
2114 ts_codec: <T as Codec64>::codec_name(),
2115 diff_codec: <i64 as Codec64>::codec_name(),
2116 state,
2117 };
2118 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2119 let after: Rollup<T> = proto.into_rust().unwrap();
2120 let after = after.state;
2121 assert_eq!(before, after);
2122 }
2123
2124 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2125 }
2126
2127 #[mz_ore::test]
2128 fn check_data_versions_with_lts_versions() {
2129 #[track_caller]
2130 fn testcase(code: &str, data: &str, lts_versions: &[Version], expected: Result<(), ()>) {
2131 let code = Version::parse(code).unwrap();
2132 let data = Version::parse(data).unwrap();
2133 let actual = cfg::check_data_version_with_lts_versions(&code, &data, lts_versions)
2134 .map_err(|_| ());
2135 assert_eq!(actual, expected);
2136 }
2137
2138 let none = [];
2139 let one = [Version::new(0, 130, 0)];
2140 let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2141 let three = [
2142 Version::new(0, 130, 0),
2143 Version::new(0, 140, 0),
2144 Version::new(0, 150, 0),
2145 ];
2146
2147 testcase("0.130.0", "0.128.0", &none, Ok(()));
2148 testcase("0.130.0", "0.129.0", &none, Ok(()));
2149 testcase("0.130.0", "0.130.0", &none, Ok(()));
2150 testcase("0.130.0", "0.130.1", &none, Ok(()));
2151 testcase("0.130.1", "0.130.0", &none, Ok(()));
2152 testcase("0.130.0", "0.131.0", &none, Ok(()));
2153 testcase("0.130.0", "0.132.0", &none, Err(()));
2154
2155 testcase("0.129.0", "0.127.0", &none, Ok(()));
2156 testcase("0.129.0", "0.128.0", &none, Ok(()));
2157 testcase("0.129.0", "0.129.0", &none, Ok(()));
2158 testcase("0.129.0", "0.129.1", &none, Ok(()));
2159 testcase("0.129.1", "0.129.0", &none, Ok(()));
2160 testcase("0.129.0", "0.130.0", &none, Ok(()));
2161 testcase("0.129.0", "0.131.0", &none, Err(()));
2162
2163 testcase("0.130.0", "0.128.0", &one, Ok(()));
2164 testcase("0.130.0", "0.129.0", &one, Ok(()));
2165 testcase("0.130.0", "0.130.0", &one, Ok(()));
2166 testcase("0.130.0", "0.130.1", &one, Ok(()));
2167 testcase("0.130.1", "0.130.0", &one, Ok(()));
2168 testcase("0.130.0", "0.131.0", &one, Ok(()));
2169 testcase("0.130.0", "0.132.0", &one, Ok(()));
2170
2171 testcase("0.129.0", "0.127.0", &one, Ok(()));
2172 testcase("0.129.0", "0.128.0", &one, Ok(()));
2173 testcase("0.129.0", "0.129.0", &one, Ok(()));
2174 testcase("0.129.0", "0.129.1", &one, Ok(()));
2175 testcase("0.129.1", "0.129.0", &one, Ok(()));
2176 testcase("0.129.0", "0.130.0", &one, Ok(()));
2177 testcase("0.129.0", "0.131.0", &one, Err(()));
2178
2179 testcase("0.131.0", "0.129.0", &one, Ok(()));
2180 testcase("0.131.0", "0.130.0", &one, Ok(()));
2181 testcase("0.131.0", "0.131.0", &one, Ok(()));
2182 testcase("0.131.0", "0.131.1", &one, Ok(()));
2183 testcase("0.131.1", "0.131.0", &one, Ok(()));
2184 testcase("0.131.0", "0.132.0", &one, Ok(()));
2185 testcase("0.131.0", "0.133.0", &one, Err(()));
2186
2187 testcase("0.130.0", "0.128.0", &two, Ok(()));
2188 testcase("0.130.0", "0.129.0", &two, Ok(()));
2189 testcase("0.130.0", "0.130.0", &two, Ok(()));
2190 testcase("0.130.0", "0.130.1", &two, Ok(()));
2191 testcase("0.130.1", "0.130.0", &two, Ok(()));
2192 testcase("0.130.0", "0.131.0", &two, Ok(()));
2193 testcase("0.130.0", "0.132.0", &two, Ok(()));
2194 testcase("0.130.0", "0.135.0", &two, Ok(()));
2195 testcase("0.130.0", "0.138.0", &two, Ok(()));
2196 testcase("0.130.0", "0.139.0", &two, Ok(()));
2197 testcase("0.130.0", "0.140.0", &two, Ok(()));
2198 testcase("0.130.9", "0.140.0", &two, Ok(()));
2199 testcase("0.130.0", "0.140.1", &two, Ok(()));
2200 testcase("0.130.3", "0.140.1", &two, Ok(()));
2201 testcase("0.130.3", "0.140.9", &two, Ok(()));
2202 testcase("0.130.0", "0.141.0", &two, Err(()));
2203 testcase("0.129.0", "0.133.0", &two, Err(()));
2204 testcase("0.129.0", "0.140.0", &two, Err(()));
2205 testcase("0.131.0", "0.133.0", &two, Err(()));
2206 testcase("0.131.0", "0.140.0", &two, Err(()));
2207
2208 testcase("0.130.0", "0.128.0", &three, Ok(()));
2209 testcase("0.130.0", "0.129.0", &three, Ok(()));
2210 testcase("0.130.0", "0.130.0", &three, Ok(()));
2211 testcase("0.130.0", "0.130.1", &three, Ok(()));
2212 testcase("0.130.1", "0.130.0", &three, Ok(()));
2213 testcase("0.130.0", "0.131.0", &three, Ok(()));
2214 testcase("0.130.0", "0.132.0", &three, Ok(()));
2215 testcase("0.130.0", "0.135.0", &three, Ok(()));
2216 testcase("0.130.0", "0.138.0", &three, Ok(()));
2217 testcase("0.130.0", "0.139.0", &three, Ok(()));
2218 testcase("0.130.0", "0.140.0", &three, Ok(()));
2219 testcase("0.130.9", "0.140.0", &three, Ok(()));
2220 testcase("0.130.0", "0.140.1", &three, Ok(()));
2221 testcase("0.130.3", "0.140.1", &three, Ok(()));
2222 testcase("0.130.3", "0.140.9", &three, Ok(()));
2223 testcase("0.130.0", "0.141.0", &three, Err(()));
2224 testcase("0.129.0", "0.133.0", &three, Err(()));
2225 testcase("0.129.0", "0.140.0", &three, Err(()));
2226 testcase("0.131.0", "0.133.0", &three, Err(()));
2227 testcase("0.131.0", "0.140.0", &three, Err(()));
2228 testcase("0.130.0", "0.150.0", &three, Err(()));
2229
2230 testcase("0.140.0", "0.138.0", &three, Ok(()));
2231 testcase("0.140.0", "0.139.0", &three, Ok(()));
2232 testcase("0.140.0", "0.140.0", &three, Ok(()));
2233 testcase("0.140.0", "0.140.1", &three, Ok(()));
2234 testcase("0.140.1", "0.140.0", &three, Ok(()));
2235 testcase("0.140.0", "0.141.0", &three, Ok(()));
2236 testcase("0.140.0", "0.142.0", &three, Ok(()));
2237 testcase("0.140.0", "0.145.0", &three, Ok(()));
2238 testcase("0.140.0", "0.148.0", &three, Ok(()));
2239 testcase("0.140.0", "0.149.0", &three, Ok(()));
2240 testcase("0.140.0", "0.150.0", &three, Ok(()));
2241 testcase("0.140.9", "0.150.0", &three, Ok(()));
2242 testcase("0.140.0", "0.150.1", &three, Ok(()));
2243 testcase("0.140.3", "0.150.1", &three, Ok(()));
2244 testcase("0.140.3", "0.150.9", &three, Ok(()));
2245 testcase("0.140.0", "0.151.0", &three, Err(()));
2246 testcase("0.139.0", "0.143.0", &three, Err(()));
2247 testcase("0.139.0", "0.150.0", &three, Err(()));
2248 testcase("0.141.0", "0.143.0", &three, Err(()));
2249 testcase("0.141.0", "0.150.0", &three, Err(()));
2250
2251 testcase("0.150.0", "0.148.0", &three, Ok(()));
2252 testcase("0.150.0", "0.149.0", &three, Ok(()));
2253 testcase("0.150.0", "0.150.0", &three, Ok(()));
2254 testcase("0.150.0", "0.150.1", &three, Ok(()));
2255 testcase("0.150.1", "0.150.0", &three, Ok(()));
2256 testcase("0.150.0", "0.151.0", &three, Ok(()));
2257 testcase("0.150.0", "0.152.0", &three, Ok(()));
2258 testcase("0.150.0", "0.155.0", &three, Ok(()));
2259 testcase("0.150.0", "0.158.0", &three, Ok(()));
2260 testcase("0.150.0", "0.159.0", &three, Ok(()));
2261 testcase("0.150.0", "0.160.0", &three, Ok(()));
2262 testcase("0.150.9", "0.160.0", &three, Ok(()));
2263 testcase("0.150.0", "0.160.1", &three, Ok(()));
2264 testcase("0.150.3", "0.160.1", &three, Ok(()));
2265 testcase("0.150.3", "0.160.9", &three, Ok(()));
2266 testcase("0.150.0", "0.161.0", &three, Ok(()));
2267 testcase("0.149.0", "0.153.0", &three, Err(()));
2268 testcase("0.149.0", "0.160.0", &three, Err(()));
2269 testcase("0.151.0", "0.153.0", &three, Err(()));
2270 testcase("0.151.0", "0.160.0", &three, Err(()));
2271 }
2272
2273 #[mz_ore::test]
2274 fn check_data_versions() {
2275 #[track_caller]
2276 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2277 let code = Version::parse(code).unwrap();
2278 let data = Version::parse(data).unwrap();
2279 #[allow(clippy::disallowed_methods)]
2280 let actual =
2281 std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2282 assert_eq!(actual, expected);
2283 }
2284
2285 testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2286 testcase("0.10.0-dev", "0.10.0", Ok(()));
2287 testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2290 testcase("0.10.0-dev", "0.11.0", Ok(()));
2291 testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2292 testcase("0.10.0-dev", "0.12.0", Err(()));
2293 testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2294
2295 testcase("0.10.0", "0.8.0-dev", Ok(()));
2296 testcase("0.10.0", "0.8.0", Ok(()));
2297 testcase("0.10.0", "0.9.0-dev", Ok(()));
2298 testcase("0.10.0", "0.9.0", Ok(()));
2299 testcase("0.10.0", "0.10.0-dev", Ok(()));
2300 testcase("0.10.0", "0.10.0", Ok(()));
2301 testcase("0.10.0", "0.11.0-dev", Ok(()));
2304 testcase("0.10.0", "0.11.0", Ok(()));
2305 testcase("0.10.0", "0.11.1", Ok(()));
2306 testcase("0.10.0", "0.11.1000000", Ok(()));
2307 testcase("0.10.0", "0.12.0-dev", Err(()));
2308 testcase("0.10.0", "0.12.0", Err(()));
2309 testcase("0.10.0", "0.13.0-dev", Err(()));
2310
2311 testcase("0.10.1", "0.9.0", Ok(()));
2312 testcase("0.10.1", "0.10.0", Ok(()));
2313 testcase("0.10.1", "0.11.0", Ok(()));
2314 testcase("0.10.1", "0.11.1", Ok(()));
2315 testcase("0.10.1", "0.11.100", Ok(()));
2316
2317 testcase("0.10.0", "0.10.1", Ok(()));
2323 }
2324}