1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::sync::Arc;
16
17use bytes::{Buf, Bytes};
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::trace::Description;
20use mz_ore::cast::CastInto;
21use mz_ore::{assert_none, halt};
22use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
23use mz_persist::location::{SeqNo, VersionedData};
24use mz_persist::metrics::ColumnarMetrics;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::stats::{PartStats, ProtoStructStats};
27use mz_persist_types::{Codec, Codec64};
28use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
29use proptest::prelude::Arbitrary;
30use proptest::strategy::Strategy;
31use prost::Message;
32use semver::Version;
33use serde::ser::SerializeStruct;
34use serde::{Deserialize, Serialize};
35use timely::progress::{Antichain, Timestamp};
36use uuid::Uuid;
37
38use crate::critical::CriticalReaderId;
39use crate::error::{CodecMismatch, CodecMismatchT};
40use crate::internal::metrics::Metrics;
41use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
42use crate::internal::state::{
43 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
44 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
45 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
46 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
47 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
48 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
49 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
50 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
51 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
52 RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
53 proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
/// The key and value schemas associated with the codecs `K` and `V`, together
/// with an optional [`SchemaId`].
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Optional identifier for this pair of schemas.
    // NOTE(review): presumably `None` means the schemas carry no registered id — confirm.
    pub id: Option<SchemaId>,
    /// Shared handle to the schema of the key codec.
    pub key: Arc<K::Schema>,
    /// Shared handle to the schema of the value codec.
    pub val: Arc<V::Schema>,
}
77
78impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
79 fn clone(&self) -> Self {
80 Self {
81 id: self.id,
82 key: Arc::clone(&self.key),
83 val: Arc::clone(&self.val),
84 }
85 }
86}
87
/// A protobuf message of type `T` kept in its encoded form.
///
/// The bytes are only decoded when [`LazyProto::decode`] (or `decode_to`) is
/// called; comparison and hashing operate on the encoded bytes.
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded bytes of the message.
    buf: Bytes,
    /// Records the message type without storing a value of it.
    _phantom: PhantomData<fn() -> T>,
}
115
116impl<T: Message + Default> Debug for LazyProto<T> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self.decode() {
119 Ok(proto) => Debug::fmt(&proto, f),
120 Err(err) => f
121 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
122 .field("err", &err)
123 .finish(),
124 }
125 }
126}
127
128impl<T> PartialEq for LazyProto<T> {
129 fn eq(&self, other: &Self) -> bool {
130 self.cmp(other) == Ordering::Equal
131 }
132}
133
// Marker impl: equality is derived from `Ord::cmp` on the encoded bytes, with
// no bound required on `T`.
impl<T> Eq for LazyProto<T> {}
135
136impl<T> PartialOrd for LazyProto<T> {
137 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
138 Some(self.cmp(other))
139 }
140}
141
142impl<T> Ord for LazyProto<T> {
143 fn cmp(&self, other: &Self) -> Ordering {
144 let LazyProto {
145 buf: self_buf,
146 _phantom: _,
147 } = self;
148 let LazyProto {
149 buf: other_buf,
150 _phantom: _,
151 } = other;
152 self_buf.cmp(other_buf)
153 }
154}
155
156impl<T> Hash for LazyProto<T> {
157 fn hash<H: Hasher>(&self, state: &mut H) {
158 let LazyProto { buf, _phantom } = self;
159 buf.hash(state);
160 }
161}
162
163impl<T: Message + Default> From<&T> for LazyProto<T> {
164 fn from(value: &T) -> Self {
165 let buf = Bytes::from(value.encode_to_vec());
166 LazyProto {
167 buf,
168 _phantom: PhantomData,
169 }
170 }
171}
172
173impl<T: Message + Default> LazyProto<T> {
174 pub fn decode(&self) -> Result<T, prost::DecodeError> {
175 T::decode(&*self.buf)
176 }
177
178 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
179 Ok(T::decode(&*self.buf)?.into_rust()?)
180 }
181}
182
183impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
184 fn into_proto(&self) -> Bytes {
185 self.buf.clone()
186 }
187
188 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
189 Ok(Self {
190 buf,
191 _phantom: PhantomData,
192 })
193 }
194}
195
196pub(crate) fn parse_id(id_prefix: char, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
197 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
198 Some(x) => x,
199 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
200 };
201 let uuid = Uuid::parse_str(uuid_encoded)
202 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
203 Ok(*uuid.as_bytes())
204}
205
206pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
207 if let Err(msg) = cfg::check_data_version(code_version, data_version) {
208 if cfg!(test) {
211 panic!("{msg}");
212 } else {
213 halt!("{msg}");
214 }
215 }
216}
217
218impl RustType<String> for LeasedReaderId {
219 fn into_proto(&self) -> String {
220 self.to_string()
221 }
222
223 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
224 match proto.parse() {
225 Ok(x) => Ok(x),
226 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
227 }
228 }
229}
230
231impl RustType<String> for CriticalReaderId {
232 fn into_proto(&self) -> String {
233 self.to_string()
234 }
235
236 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
237 match proto.parse() {
238 Ok(x) => Ok(x),
239 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
240 }
241 }
242}
243
244impl RustType<String> for WriterId {
245 fn into_proto(&self) -> String {
246 self.to_string()
247 }
248
249 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
250 match proto.parse() {
251 Ok(x) => Ok(x),
252 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
253 }
254 }
255}
256
257impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
258 fn into_proto(&self) -> ProtoEncodedSchemas {
259 ProtoEncodedSchemas {
260 key: self.key.clone(),
261 key_data_type: self.key_data_type.clone(),
262 val: self.val.clone(),
263 val_data_type: self.val_data_type.clone(),
264 }
265 }
266
267 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
268 Ok(EncodedSchemas {
269 key: proto.key,
270 key_data_type: proto.key_data_type,
271 val: proto.val,
272 val_data_type: proto.val_data_type,
273 })
274 }
275}
276
277impl RustType<String> for IdempotencyToken {
278 fn into_proto(&self) -> String {
279 self.to_string()
280 }
281
282 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
283 match proto.parse() {
284 Ok(x) => Ok(x),
285 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
286 }
287 }
288}
289
290impl RustType<String> for PartialBatchKey {
291 fn into_proto(&self) -> String {
292 self.0.clone()
293 }
294
295 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
296 Ok(PartialBatchKey(proto))
297 }
298}
299
300impl RustType<String> for PartialRollupKey {
301 fn into_proto(&self) -> String {
302 self.0.clone()
303 }
304
305 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
306 Ok(PartialRollupKey(proto))
307 }
308}
309
310impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
311 pub fn encode<B>(&self, buf: &mut B)
312 where
313 B: bytes::BufMut,
314 {
315 self.into_proto()
316 .encode(buf)
317 .expect("no required fields means no initialization errors");
318 }
319
320 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
321 let proto = ProtoStateDiff::decode(buf)
322 .expect("internal error: invalid encoded state");
327 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
328 check_data_version(build_version, &diff.applier_version);
329 diff
330 }
331}
332
/// Conversion between a [`StateDiff`] and its protobuf form, [`ProtoStateDiff`].
///
/// Scalar metadata (versions, seqnos, walltime, latest rollup key) maps onto
/// plain proto fields. The per-field diffs are written into a single
/// `ProtoStateFieldDiffs` value through a `ProtoStateFieldDiffsWriter`.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes this diff into a `ProtoStateDiff`.
    fn into_proto(&self) -> ProtoStateDiff {
        // Destructure without `..` so that adding a field to `StateDiff`
        // fails to compile here until it is handled.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // All field diffs are appended through this writer.
        let mut writer = proto.into_writer();

        // Each call appends every diff of one state field, tagged with the
        // corresponding `ProtoStateField`.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        // Turn the writer back into the proto message it was building.
        let field_diffs = writer.into_proto();

        // Sanity-check what we just produced (debug builds only).
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Decodes a `ProtoStateDiff` into a `StateDiff`.
    ///
    /// # Errors
    /// Returns an error if `applier_version` is non-empty but not valid
    /// semver, or if any scalar field or field diff fails to convert.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty version string is mapped to 0.0.0 rather than rejected.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` leaves every diff list as `StateDiff::new`
        // created it.
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // Dispatch on the field tag. The first two type parameters
                // name the proto key/value types to decode; the closures
                // convert them into the Rust key/value types.
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // The deprecated form carries only the rollup key as a
                    // string. It is folded into `rollups` with no recorded
                    // encoded size.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself and have a
                    // unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
552
553fn field_diffs_into_proto<K, KP, V, VP>(
554 field: ProtoStateField,
555 diffs: &[StateFieldDiff<K, V>],
556 writer: &mut ProtoStateFieldDiffsWriter,
557) where
558 KP: prost::Message,
559 K: RustType<KP>,
560 VP: prost::Message,
561 V: RustType<VP>,
562{
563 for diff in diffs.iter() {
564 field_diff_into_proto(field, diff, writer);
565 }
566}
567
568fn field_diff_into_proto<K, KP, V, VP>(
569 field: ProtoStateField,
570 diff: &StateFieldDiff<K, V>,
571 writer: &mut ProtoStateFieldDiffsWriter,
572) where
573 KP: prost::Message,
574 K: RustType<KP>,
575 VP: prost::Message,
576 V: RustType<VP>,
577{
578 writer.push_field(field);
579 writer.encode_proto(&diff.key.into_proto());
580 match &diff.val {
581 StateFieldValDiff::Insert(to) => {
582 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
583 writer.encode_proto(&to.into_proto());
584 }
585 StateFieldValDiff::Update(from, to) => {
586 writer.push_diff_type(ProtoStateFieldDiffType::Update);
587 writer.encode_proto(&from.into_proto());
588 writer.encode_proto(&to.into_proto());
589 }
590 StateFieldValDiff::Delete(from) => {
591 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
592 writer.encode_proto(&from.into_proto());
593 }
594 };
595}
596
597fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
598 proto: ProtoStateFieldDiff<'_>,
599 diffs: &mut Vec<StateFieldDiff<K, V>>,
600 k_fn: KFn,
601 v_fn: VFn,
602) -> Result<(), TryFromProtoError>
603where
604 KP: prost::Message + Default,
605 VP: prost::Message + Default,
606 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
607 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
608{
609 let val = match proto.diff_type {
610 ProtoStateFieldDiffType::Insert => {
611 let to = VP::decode(proto.to)
612 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
613 StateFieldValDiff::Insert(v_fn(to)?)
614 }
615 ProtoStateFieldDiffType::Update => {
616 let from = VP::decode(proto.from)
617 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
618 let to = VP::decode(proto.to)
619 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
620
621 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
622 }
623 ProtoStateFieldDiffType::Delete => {
624 let from = VP::decode(proto.from)
625 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
626 StateFieldValDiff::Delete(v_fn(from)?)
627 }
628 };
629 let key = KP::decode(proto.key)
630 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
631 diffs.push(StateFieldDiff {
632 key: k_fn(key)?,
633 val,
634 });
635 Ok(())
636}
637
/// A [`State`] together with the names of the codecs it was written with.
///
/// The key, value and diff types are not part of this type; only the
/// timestamp type `T` is. The stored codec names are compared against the
/// requested types in `check_codecs` / `check_ts_codec` before the inner
/// state is handed out.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Name of the key codec.
    pub(crate) key_codec: String,
    /// Name of the value codec.
    pub(crate) val_codec: String,
    /// Name of the timestamp codec.
    pub(crate) ts_codec: String,
    /// Name of the diff codec.
    pub(crate) diff_codec: String,

    // Private so that the state can only be obtained through the codec
    // checks or the accessors defined on this type.
    state: State<T>,
}
653
654impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
655 pub fn seqno(&self) -> SeqNo {
656 self.state.seqno
657 }
658
659 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
660 &self.state.collections.rollups
661 }
662
663 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
664 self.state.latest_rollup()
665 }
666
667 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
668 &mut self,
669 cfg: &PersistConfig,
670 metrics: &Metrics,
671 diffs: I,
672 ) {
673 if T::codec_name() != self.ts_codec {
677 return;
678 }
679 self.state.apply_encoded_diffs(cfg, metrics, diffs);
680 }
681
682 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
683 self,
684 shard_id: &ShardId,
685 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
686 assert_eq!(shard_id, &self.state.shard_id);
689 if K::codec_name() != self.key_codec
690 || V::codec_name() != self.val_codec
691 || T::codec_name() != self.ts_codec
692 || D::codec_name() != self.diff_codec
693 {
694 return Err(Box::new(CodecMismatch {
695 requested: (
696 K::codec_name(),
697 V::codec_name(),
698 T::codec_name(),
699 D::codec_name(),
700 None,
701 ),
702 actual: (
703 self.key_codec,
704 self.val_codec,
705 self.ts_codec,
706 self.diff_codec,
707 None,
708 ),
709 }));
710 }
711 Ok(TypedState {
712 state: self.state,
713 _phantom: PhantomData,
714 })
715 }
716
717 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
718 assert_eq!(shard_id, &self.state.shard_id);
721 if T::codec_name() != self.ts_codec {
722 return Err(CodecMismatchT {
723 requested: T::codec_name(),
724 actual: self.ts_codec,
725 });
726 }
727 Ok(self.state)
728 }
729
730 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
731 let proto = ProtoRollup::decode(buf)
732 .expect("internal error: invalid encoded state");
737 let state = Rollup::from_proto(proto)
738 .expect("internal error: invalid encoded state")
739 .state;
740 check_data_version(build_version, &state.state.applier_version);
741 state
742 }
743}
744
745impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
746where
747 K: Codec,
748 V: Codec,
749 T: Codec64,
750 D: Codec64,
751{
752 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
753 UntypedState {
754 key_codec: K::codec_name(),
755 val_codec: V::codec_name(),
756 ts_codec: T::codec_name(),
757 diff_codec: D::codec_name(),
758 state: typed_state.state,
759 }
760 }
761}
762
/// A snapshot of shard state, optionally bundled with a run of encoded diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup, with its codec names.
    pub(crate) state: UntypedState<T>,
    /// Encoded diffs inlined alongside the state, if any.
    pub(crate) diffs: Option<InlinedDiffs>,
}
774
775impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
776 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
780 let latest_rollup_seqno = *state.latest_rollup().0;
781 let mut verify_seqno = latest_rollup_seqno;
782 for diff in &diffs {
783 assert_eq!(verify_seqno.next(), diff.seqno);
784 verify_seqno = diff.seqno;
785 }
786 assert_eq!(verify_seqno, state.seqno());
787
788 let diffs = Some(InlinedDiffs::from(
789 latest_rollup_seqno.next(),
790 state.seqno().next(),
791 diffs,
792 ));
793
794 Self { state, diffs }
795 }
796
797 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
798 Self { state, diffs: None }
799 }
800
801 pub(crate) fn from_state_without_diffs(
802 state: State<T>,
803 key_codec: String,
804 val_codec: String,
805 ts_codec: String,
806 diff_codec: String,
807 ) -> Self {
808 Self::from_untyped_state_without_diffs(UntypedState {
809 key_codec,
810 val_codec,
811 ts_codec,
812 diff_codec,
813 state,
814 })
815 }
816}
817
/// A run of encoded state diffs covering the seqno range `[lower, upper)`.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos of `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos of `diffs`.
    pub(crate) upper: SeqNo,
    /// The encoded diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
824
825impl InlinedDiffs {
826 pub(crate) fn description(&self) -> Description<SeqNo> {
827 Description::new(
828 Antichain::from_elem(self.lower),
829 Antichain::from_elem(self.upper),
830 Antichain::from_elem(SeqNo::minimum()),
831 )
832 }
833
834 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
835 for diff in &diffs {
836 assert!(diff.seqno >= lower);
837 assert!(diff.seqno < upper);
838 }
839 Self {
840 lower,
841 upper,
842 diffs,
843 }
844 }
845}
846
847impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
848 fn into_proto(&self) -> ProtoInlinedDiffs {
849 ProtoInlinedDiffs {
850 lower: self.lower.into_proto(),
851 upper: self.upper.into_proto(),
852 diffs: self.diffs.into_proto(),
853 }
854 }
855
856 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
857 Ok(Self {
858 lower: proto.lower.into_rust()?,
859 upper: proto.upper.into_rust()?,
860 diffs: proto.diffs.into_rust()?,
861 })
862 }
863}
864
865impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
866 fn into_proto(&self) -> ProtoRollup {
867 ProtoRollup {
868 applier_version: self.state.state.applier_version.to_string(),
869 shard_id: self.state.state.shard_id.into_proto(),
870 seqno: self.state.state.seqno.into_proto(),
871 walltime_ms: self.state.state.walltime_ms.into_proto(),
872 hostname: self.state.state.hostname.into_proto(),
873 key_codec: self.state.key_codec.into_proto(),
874 val_codec: self.state.val_codec.into_proto(),
875 ts_codec: T::codec_name(),
876 diff_codec: self.state.diff_codec.into_proto(),
877 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
878 active_rollup: self.state.state.collections.active_rollup.into_proto(),
879 active_gc: self.state.state.collections.active_gc.into_proto(),
880 rollups: self
881 .state
882 .state
883 .collections
884 .rollups
885 .iter()
886 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
887 .collect(),
888 deprecated_rollups: Default::default(),
889 leased_readers: self
890 .state
891 .state
892 .collections
893 .leased_readers
894 .iter()
895 .map(|(id, state)| (id.into_proto(), state.into_proto()))
896 .collect(),
897 critical_readers: self
898 .state
899 .state
900 .collections
901 .critical_readers
902 .iter()
903 .map(|(id, state)| (id.into_proto(), state.into_proto()))
904 .collect(),
905 writers: self
906 .state
907 .state
908 .collections
909 .writers
910 .iter()
911 .map(|(id, state)| (id.into_proto(), state.into_proto()))
912 .collect(),
913 schemas: self
914 .state
915 .state
916 .collections
917 .schemas
918 .iter()
919 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
920 .collect(),
921 trace: Some(self.state.state.collections.trace.into_proto()),
922 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
923 }
924 }
925
926 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
927 let applier_version = if x.applier_version.is_empty() {
928 semver::Version::new(0, 0, 0)
932 } else {
933 semver::Version::parse(&x.applier_version).map_err(|err| {
934 TryFromProtoError::InvalidSemverVersion(format!(
935 "invalid applier_version {}: {}",
936 x.applier_version, err
937 ))
938 })?
939 };
940
941 let mut rollups = BTreeMap::new();
942 for (seqno, rollup) in x.rollups {
943 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
944 }
945 for (seqno, key) in x.deprecated_rollups {
946 rollups.insert(
947 seqno.into_rust()?,
948 HollowRollup {
949 key: key.into_rust()?,
950 encoded_size_bytes: None,
951 },
952 );
953 }
954 let mut leased_readers = BTreeMap::new();
955 for (id, state) in x.leased_readers {
956 leased_readers.insert(id.into_rust()?, state.into_rust()?);
957 }
958 let mut critical_readers = BTreeMap::new();
959 for (id, state) in x.critical_readers {
960 critical_readers.insert(id.into_rust()?, state.into_rust()?);
961 }
962 let mut writers = BTreeMap::new();
963 for (id, state) in x.writers {
964 writers.insert(id.into_rust()?, state.into_rust()?);
965 }
966 let mut schemas = BTreeMap::new();
967 for (id, x) in x.schemas {
968 schemas.insert(id.into_rust()?, x.into_rust()?);
969 }
970 let active_rollup = x
971 .active_rollup
972 .map(|rollup| rollup.into_rust())
973 .transpose()?;
974 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
975 let collections = StateCollections {
976 rollups,
977 active_rollup,
978 active_gc,
979 last_gc_req: x.last_gc_req.into_rust()?,
980 leased_readers,
981 critical_readers,
982 writers,
983 schemas,
984 trace: x.trace.into_rust_if_some("trace")?,
985 };
986 let state = State {
987 applier_version,
988 shard_id: x.shard_id.into_rust()?,
989 seqno: x.seqno.into_rust()?,
990 walltime_ms: x.walltime_ms,
991 hostname: x.hostname,
992 collections,
993 };
994
995 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
996 if let Some(diffs) = &diffs {
997 if diffs.lower != state.latest_rollup().0.next() {
998 return Err(TryFromProtoError::InvalidPersistState(format!(
999 "diffs lower ({}) should match latest rollup's successor: ({})",
1000 diffs.lower,
1001 state.latest_rollup().0.next()
1002 )));
1003 }
1004 if diffs.upper != state.seqno.next() {
1005 return Err(TryFromProtoError::InvalidPersistState(format!(
1006 "diffs upper ({}) should match state's successor: ({})",
1007 diffs.lower,
1008 state.seqno.next()
1009 )));
1010 }
1011 }
1012
1013 Ok(Rollup {
1014 state: UntypedState {
1015 state,
1016 key_codec: x.key_codec.into_rust()?,
1017 val_codec: x.val_codec.into_rust()?,
1018 ts_codec: x.ts_codec.into_rust()?,
1019 diff_codec: x.diff_codec.into_rust()?,
1020 },
1021 diffs,
1022 })
1023 }
1024}
1025
1026impl RustType<ProtoVersionedData> for VersionedData {
1027 fn into_proto(&self) -> ProtoVersionedData {
1028 ProtoVersionedData {
1029 seqno: self.seqno.into_proto(),
1030 data: Bytes::clone(&self.data),
1031 }
1032 }
1033
1034 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1035 Ok(Self {
1036 seqno: proto.seqno.into_rust()?,
1037 data: proto.data,
1038 })
1039 }
1040}
1041
1042impl RustType<ProtoSpineId> for SpineId {
1043 fn into_proto(&self) -> ProtoSpineId {
1044 ProtoSpineId {
1045 lo: self.0.into_proto(),
1046 hi: self.1.into_proto(),
1047 }
1048 }
1049
1050 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1051 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1052 }
1053}
1054
1055impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1056 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1057 let (id, batch) = entry;
1058 ProtoIdHollowBatch {
1059 id: Some(id.into_proto()),
1060 batch: Some(batch.into_proto()),
1061 }
1062 }
1063
1064 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1065 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1066 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1067 Ok((id, batch))
1068 }
1069}
1070
1071impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1072 fn into_proto(&self) -> ProtoSpineBatch {
1073 ProtoSpineBatch {
1074 desc: Some(self.desc.into_proto()),
1075 parts: self.parts.into_proto(),
1076 level: self.level.into_proto(),
1077 descs: self.descs.into_proto(),
1078 }
1079 }
1080
1081 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1082 let level = proto.level.into_rust()?;
1083 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1084 let parts = proto.parts.into_rust()?;
1085 let descs = proto.descs.into_rust()?;
1086 Ok(ThinSpineBatch {
1087 level,
1088 desc,
1089 parts,
1090 descs,
1091 })
1092 }
1093}
1094
1095impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1096 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1097 let (id, batch) = entry;
1098 ProtoIdSpineBatch {
1099 id: Some(id.into_proto()),
1100 batch: Some(batch.into_proto()),
1101 }
1102 }
1103
1104 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1105 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1106 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1107 Ok((id, batch))
1108 }
1109}
1110
1111impl RustType<ProtoCompaction> for ActiveCompaction {
1112 fn into_proto(&self) -> ProtoCompaction {
1113 ProtoCompaction {
1114 start_ms: self.start_ms,
1115 }
1116 }
1117
1118 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1119 Ok(Self {
1120 start_ms: proto.start_ms,
1121 })
1122 }
1123}
1124
1125impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1126 fn into_proto(&self) -> ProtoMerge {
1127 ProtoMerge {
1128 since: Some(self.since.into_proto()),
1129 remaining_work: self.remaining_work.into_proto(),
1130 active_compaction: self.active_compaction.into_proto(),
1131 }
1132 }
1133
1134 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1135 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1136 let remaining_work = proto.remaining_work.into_rust()?;
1137 let active_compaction = proto.active_compaction.into_rust()?;
1138 Ok(Self {
1139 since,
1140 remaining_work,
1141 active_compaction,
1142 })
1143 }
1144}
1145
1146impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1147 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1148 ProtoIdMerge {
1149 id: Some(id.into_proto()),
1150 merge: Some(merge.into_proto()),
1151 }
1152 }
1153
1154 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1155 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1156 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1157 Ok((id, merge))
1158 }
1159}
1160
1161impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1162 fn into_proto(&self) -> ProtoTrace {
1163 let since = self.since.into_proto();
1164 let legacy_batches = self
1165 .legacy_batches
1166 .iter()
1167 .map(|(b, _)| b.into_proto())
1168 .collect();
1169 let hollow_batches = self.hollow_batches.into_proto();
1170 let spine_batches = self.spine_batches.into_proto();
1171 let merges = self.merges.into_proto();
1172 ProtoTrace {
1173 since: Some(since),
1174 legacy_batches,
1175 hollow_batches,
1176 spine_batches,
1177 merges,
1178 }
1179 }
1180
1181 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1182 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1183 let legacy_batches = proto
1184 .legacy_batches
1185 .into_iter()
1186 .map(|b| b.into_rust().map(|b| (b, ())))
1187 .collect::<Result<_, _>>()?;
1188 let hollow_batches = proto.hollow_batches.into_rust()?;
1189 let spine_batches = proto.spine_batches.into_rust()?;
1190 let merges = proto.merges.into_rust()?;
1191 Ok(FlatTrace {
1192 since,
1193 legacy_batches,
1194 hollow_batches,
1195 spine_batches,
1196 merges,
1197 })
1198 }
1199}
1200
1201impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1202 fn into_proto(&self) -> ProtoTrace {
1203 self.flatten().into_proto()
1204 }
1205
1206 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1207 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1208 }
1209}
1210
1211impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1212 fn into_proto(&self) -> ProtoLeasedReaderState {
1213 ProtoLeasedReaderState {
1214 seqno: self.seqno.into_proto(),
1215 since: Some(self.since.into_proto()),
1216 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1217 lease_duration_ms: self.lease_duration_ms.into_proto(),
1218 debug: Some(self.debug.into_proto()),
1219 }
1220 }
1221
1222 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1223 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1224 if lease_duration_ms == 0 {
1229 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1230 .expect("lease duration as millis should fit within u64");
1231 }
1232 let debug = proto.debug.unwrap_or_default().into_rust()?;
1235 Ok(LeasedReaderState {
1236 seqno: proto.seqno.into_rust()?,
1237 since: proto
1238 .since
1239 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1240 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1241 lease_duration_ms,
1242 debug,
1243 })
1244 }
1245}
1246
1247impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1248 fn into_proto(&self) -> ProtoCriticalReaderState {
1249 ProtoCriticalReaderState {
1250 since: Some(self.since.into_proto()),
1251 opaque: i64::from_le_bytes(self.opaque.0),
1252 opaque_codec: self.opaque_codec.clone(),
1253 debug: Some(self.debug.into_proto()),
1254 }
1255 }
1256
1257 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1258 let debug = proto.debug.unwrap_or_default().into_rust()?;
1261 Ok(CriticalReaderState {
1262 since: proto
1263 .since
1264 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1265 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1266 opaque_codec: proto.opaque_codec,
1267 debug,
1268 })
1269 }
1270}
1271
1272impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1273 fn into_proto(&self) -> ProtoWriterState {
1274 ProtoWriterState {
1275 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1276 lease_duration_ms: self.lease_duration_ms.into_proto(),
1277 most_recent_write_token: self.most_recent_write_token.into_proto(),
1278 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1279 debug: Some(self.debug.into_proto()),
1280 }
1281 }
1282
1283 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1284 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1290 IdempotencyToken::SENTINEL
1291 } else {
1292 proto.most_recent_write_token.into_rust()?
1293 };
1294 let most_recent_write_upper = match proto.most_recent_write_upper {
1295 Some(x) => x.into_rust()?,
1296 None => Antichain::from_elem(T::minimum()),
1297 };
1298 let debug = proto.debug.unwrap_or_default().into_rust()?;
1301 Ok(WriterState {
1302 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1303 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1304 most_recent_write_token,
1305 most_recent_write_upper,
1306 debug,
1307 })
1308 }
1309}
1310
1311impl RustType<ProtoHandleDebugState> for HandleDebugState {
1312 fn into_proto(&self) -> ProtoHandleDebugState {
1313 ProtoHandleDebugState {
1314 hostname: self.hostname.into_proto(),
1315 purpose: self.purpose.into_proto(),
1316 }
1317 }
1318
1319 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1320 Ok(HandleDebugState {
1321 hostname: proto.hostname,
1322 purpose: proto.purpose,
1323 })
1324 }
1325}
1326
1327impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1328 fn into_proto(&self) -> ProtoHollowRun {
1329 ProtoHollowRun {
1330 parts: self.parts.into_proto(),
1331 }
1332 }
1333
1334 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1335 Ok(HollowRun {
1336 parts: proto.parts.into_rust()?,
1337 })
1338 }
1339}
1340
1341impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1342 fn into_proto(&self) -> ProtoHollowBatch {
1343 let mut run_meta = self.run_meta.into_proto();
1344 let run_meta_default = RunMeta::default().into_proto();
1346 while run_meta.last() == Some(&run_meta_default) {
1347 run_meta.pop();
1348 }
1349 ProtoHollowBatch {
1350 desc: Some(self.desc.into_proto()),
1351 parts: self.parts.into_proto(),
1352 len: self.len.into_proto(),
1353 runs: self.run_splits.into_proto(),
1354 run_meta,
1355 deprecated_keys: vec![],
1356 }
1357 }
1358
1359 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1360 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1361 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1364 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1365 key: PartialBatchKey(key),
1366 encoded_size_bytes: 0,
1367 key_lower: vec![],
1368 structured_key_lower: None,
1369 stats: None,
1370 ts_rewrite: None,
1371 diffs_sum: None,
1372 format: None,
1373 schema_id: None,
1374 deprecated_schema_id: None,
1375 }))
1376 }));
1377 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1379 let num_runs = if parts.is_empty() {
1380 0
1381 } else {
1382 run_splits.len() + 1
1383 };
1384 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1385 run_meta.resize(num_runs, RunMeta::default());
1386 Ok(HollowBatch {
1387 desc: proto.desc.into_rust_if_some("desc")?,
1388 parts,
1389 len: proto.len.into_rust()?,
1390 run_splits,
1391 run_meta,
1392 })
1393 }
1394}
1395
1396impl RustType<ProtoRunMeta> for RunMeta {
1397 fn into_proto(&self) -> ProtoRunMeta {
1398 let order = match self.order {
1399 None => ProtoRunOrder::Unknown,
1400 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1401 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1402 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1403 };
1404 ProtoRunMeta {
1405 order: order.into(),
1406 schema_id: self.schema.into_proto(),
1407 deprecated_schema_id: self.deprecated_schema.into_proto(),
1408 }
1409 }
1410
1411 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1412 let order = match ProtoRunOrder::try_from(proto.order)? {
1413 ProtoRunOrder::Unknown => None,
1414 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1415 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1416 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1417 };
1418 Ok(Self {
1419 order,
1420 schema: proto.schema_id.into_rust()?,
1421 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1422 })
1423 }
1424}
1425
1426impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1427 fn into_proto(&self) -> ProtoHollowBatchPart {
1428 match self {
1429 RunPart::Single(part) => part.into_proto(),
1430 RunPart::Many(runs) => runs.into_proto(),
1431 }
1432 }
1433
1434 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1435 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1436 RunPart::Many(proto.into_rust()?)
1437 } else {
1438 RunPart::Single(proto.into_rust()?)
1439 };
1440 Ok(run_part)
1441 }
1442}
1443
1444impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1445 fn into_proto(&self) -> ProtoHollowBatchPart {
1446 let part = ProtoHollowBatchPart {
1447 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1448 key: self.key.into_proto(),
1449 max_part_bytes: self.max_part_bytes.into_proto(),
1450 })),
1451 encoded_size_bytes: self.hollow_bytes.into_proto(),
1452 key_lower: Bytes::copy_from_slice(&self.key_lower),
1453 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1454 key_stats: None,
1455 ts_rewrite: None,
1456 format: None,
1457 schema_id: None,
1458 structured_key_lower: self.structured_key_lower.into_proto(),
1459 deprecated_schema_id: None,
1460 };
1461 part
1462 }
1463
1464 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1465 let run_proto = match proto.kind {
1466 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1467 _ => Err(TryFromProtoError::UnknownEnumVariant(
1468 "ProtoHollowBatchPart::kind".to_string(),
1469 ))?,
1470 };
1471 Ok(Self {
1472 key: run_proto.key.into_rust()?,
1473 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1474 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1475 key_lower: proto.key_lower.to_vec(),
1476 structured_key_lower: proto.structured_key_lower.into_rust()?,
1477 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1478 _phantom_data: Default::default(),
1479 })
1480 }
1481}
1482
1483impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1484 fn into_proto(&self) -> ProtoHollowBatchPart {
1485 match self {
1486 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1487 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1488 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1489 key_lower: Bytes::copy_from_slice(&x.key_lower),
1490 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1491 key_stats: x.stats.into_proto(),
1492 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1493 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1494 format: x.format.map(|f| f.into_proto()),
1495 schema_id: x.schema_id.into_proto(),
1496 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1497 },
1498 BatchPart::Inline {
1499 updates,
1500 ts_rewrite,
1501 schema_id,
1502 deprecated_schema_id,
1503 } => ProtoHollowBatchPart {
1504 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1505 encoded_size_bytes: 0,
1506 key_lower: Bytes::new(),
1507 structured_key_lower: None,
1508 key_stats: None,
1509 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1510 diffs_sum: None,
1511 format: None,
1512 schema_id: schema_id.into_proto(),
1513 deprecated_schema_id: deprecated_schema_id.into_proto(),
1514 },
1515 }
1516 }
1517
1518 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1519 let ts_rewrite = match proto.ts_rewrite {
1520 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1521 None => None,
1522 };
1523 let schema_id = proto.schema_id.into_rust()?;
1524 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1525 match proto.kind {
1526 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1527 Ok(BatchPart::Hollow(HollowBatchPart {
1528 key: key.into_rust()?,
1529 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1530 key_lower: proto.key_lower.into(),
1531 structured_key_lower: proto.structured_key_lower.into_rust()?,
1532 stats: proto.key_stats.into_rust()?,
1533 ts_rewrite,
1534 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1535 format: proto.format.map(|f| f.into_rust()).transpose()?,
1536 schema_id,
1537 deprecated_schema_id,
1538 }))
1539 }
1540 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1541 assert_eq!(proto.encoded_size_bytes, 0);
1542 assert_eq!(proto.key_lower.len(), 0);
1543 assert_none!(proto.key_stats);
1544 assert_none!(proto.diffs_sum);
1545 let updates = LazyInlineBatchPart(x.into_rust()?);
1546 Ok(BatchPart::Inline {
1547 updates,
1548 ts_rewrite,
1549 schema_id,
1550 deprecated_schema_id,
1551 })
1552 }
1553 _ => Err(TryFromProtoError::unknown_enum_variant(
1554 "ProtoHollowBatchPart::kind",
1555 )),
1556 }
1557 }
1558}
1559
1560impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1561 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1562 match self {
1563 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1564 BatchColumnarFormat::Both(version) => {
1565 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1566 }
1567 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1568 }
1569 }
1570
1571 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1572 let format = match proto {
1573 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1574 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1575 BatchColumnarFormat::Both(version.cast_into())
1576 }
1577 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1578 };
1579 Ok(format)
1580 }
1581}
1582
/// Part statistics kept in their encoded protobuf form.
///
/// This is the lazily-decoded counterpart of [`PartStats`]: it is built from
/// one with `encode` and turned back into one with `decode`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    /// Encoded form of the `key` field of [`PartStats`].
    key: LazyProto<ProtoStructStats>,
}
1591
1592impl LazyPartStats {
1593 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1594 let PartStats { key } = x;
1595 let mut proto_stats = ProtoStructStats::from_rust(key);
1596 map_proto(&mut proto_stats);
1597 LazyPartStats {
1598 key: LazyProto::from(&proto_stats),
1599 }
1600 }
1601 pub fn decode(&self) -> PartStats {
1606 let key = self.key.decode().expect("valid proto");
1607 PartStats {
1608 key: key.into_rust().expect("valid stats"),
1609 }
1610 }
1611}
1612
1613impl RustType<Bytes> for LazyPartStats {
1614 fn into_proto(&self) -> Bytes {
1615 let LazyPartStats { key } = self;
1616 key.into_proto()
1617 }
1618
1619 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1620 Ok(LazyPartStats {
1621 key: proto.into_rust()?,
1622 })
1623 }
1624}
1625
/// Test-only strategy that always yields `Some` arbitrary [`LazyPartStats`].
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Some)
}
1630
1631#[allow(unused_parens)]
1632impl Arbitrary for LazyPartStats {
1633 type Parameters = ();
1634 type Strategy =
1635 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1636
1637 fn arbitrary_with(_: ()) -> Self::Strategy {
1638 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1639 LazyPartStats::encode(&x, |_| {})
1640 })
1641 }
1642}
1643
1644impl ProtoInlineBatchPart {
1645 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1646 lgbytes: &ColumnarMetrics,
1647 proto: Self,
1648 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1649 let updates = proto
1650 .updates
1651 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1652 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1653
1654 Ok(BlobTraceBatchPart {
1655 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1656 index: proto.index.into_rust()?,
1657 updates,
1658 })
1659 }
1660}
1661
/// An inline batch part kept as an encoded `ProtoInlineBatchPart`.
///
/// The wrapped [`LazyProto`] is only decoded when `decode` (or `serialize`)
/// is called.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1665
1666impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1667 fn from(value: &ProtoInlineBatchPart) -> Self {
1668 LazyInlineBatchPart(value.into())
1669 }
1670}
1671
1672impl Serialize for LazyInlineBatchPart {
1673 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1674 let proto = self.0.decode().expect("valid proto");
1677 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1678 let () = s.serialize_field("desc", &proto.desc)?;
1679 let () = s.serialize_field("index", &proto.index)?;
1680 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1681 s.end()
1682 }
1683}
1684
1685impl LazyInlineBatchPart {
1686 pub(crate) fn encoded_size_bytes(&self) -> usize {
1687 self.0.buf.len()
1688 }
1689
1690 pub fn decode<T: Timestamp + Codec64>(
1696 &self,
1697 lgbytes: &ColumnarMetrics,
1698 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1699 let proto = self.0.decode().expect("valid proto");
1700 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1701 }
1702}
1703
1704impl RustType<Bytes> for LazyInlineBatchPart {
1705 fn into_proto(&self) -> Bytes {
1706 self.0.into_proto()
1707 }
1708
1709 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1710 Ok(LazyInlineBatchPart(proto.into_rust()?))
1711 }
1712}
1713
1714impl RustType<ProtoHollowRollup> for HollowRollup {
1715 fn into_proto(&self) -> ProtoHollowRollup {
1716 ProtoHollowRollup {
1717 key: self.key.into_proto(),
1718 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1719 }
1720 }
1721
1722 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1723 Ok(HollowRollup {
1724 key: proto.key.into_rust()?,
1725 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1726 })
1727 }
1728}
1729
1730impl RustType<ProtoActiveRollup> for ActiveRollup {
1731 fn into_proto(&self) -> ProtoActiveRollup {
1732 ProtoActiveRollup {
1733 start_ms: self.start_ms,
1734 seqno: self.seqno.into_proto(),
1735 }
1736 }
1737
1738 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1739 Ok(ActiveRollup {
1740 start_ms: proto.start_ms,
1741 seqno: proto.seqno.into_rust()?,
1742 })
1743 }
1744}
1745
1746impl RustType<ProtoActiveGc> for ActiveGc {
1747 fn into_proto(&self) -> ProtoActiveGc {
1748 ProtoActiveGc {
1749 start_ms: self.start_ms,
1750 seqno: self.seqno.into_proto(),
1751 }
1752 }
1753
1754 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1755 Ok(ActiveGc {
1756 start_ms: proto.start_ms,
1757 seqno: proto.seqno.into_rust()?,
1758 })
1759 }
1760}
1761
1762impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1763 fn into_proto(&self) -> ProtoU64Description {
1764 ProtoU64Description {
1765 lower: Some(self.lower().into_proto()),
1766 upper: Some(self.upper().into_proto()),
1767 since: Some(self.since().into_proto()),
1768 }
1769 }
1770
1771 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1772 Ok(Description::new(
1773 proto.lower.into_rust_if_some("lower")?,
1774 proto.upper.into_rust_if_some("upper")?,
1775 proto.since.into_rust_if_some("since")?,
1776 ))
1777 }
1778}
1779
1780impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1781 fn into_proto(&self) -> ProtoU64Antichain {
1782 ProtoU64Antichain {
1783 elements: self
1784 .elements()
1785 .iter()
1786 .map(|x| i64::from_le_bytes(T::encode(x)))
1787 .collect(),
1788 }
1789 }
1790
1791 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1792 let elements = proto
1793 .elements
1794 .iter()
1795 .map(|x| T::decode(x.to_le_bytes()))
1796 .collect::<Vec<_>>();
1797 Ok(Antichain::from(elements))
1798 }
1799}
1800
1801#[cfg(test)]
1802mod tests {
1803 use bytes::Bytes;
1804 use mz_build_info::DUMMY_BUILD_INFO;
1805 use mz_dyncfg::ConfigUpdates;
1806 use mz_ore::assert_err;
1807 use mz_persist::location::SeqNo;
1808 use proptest::prelude::*;
1809
1810 use crate::ShardId;
1811 use crate::internal::paths::PartialRollupKey;
1812 use crate::internal::state::tests::any_state;
1813 use crate::internal::state::{BatchPart, HandleDebugState};
1814 use crate::internal::state_diff::StateDiff;
1815 use crate::tests::new_test_client_cache;
1816
1817 use super::*;
1818
1819 #[mz_ore::test]
1820 fn applier_version_state() {
1821 let v1 = semver::Version::new(1, 0, 0);
1822 let v2 = semver::Version::new(2, 0, 0);
1823 let v3 = semver::Version::new(3, 0, 0);
1824
1825 let shard_id = ShardId::new();
1827 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1828 let rollup =
1829 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1830 let mut buf = Vec::new();
1831 rollup.encode(&mut buf).expect("serializable");
1832 let bytes = Bytes::from(buf);
1833
1834 assert_eq!(
1836 UntypedState::<u64>::decode(&v2, bytes.clone())
1837 .check_codecs(&shard_id)
1838 .as_ref(),
1839 Ok(&state)
1840 );
1841 assert_eq!(
1842 UntypedState::<u64>::decode(&v3, bytes.clone())
1843 .check_codecs(&shard_id)
1844 .as_ref(),
1845 Ok(&state)
1846 );
1847
1848 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1853 assert_err!(v1_res);
1854 }
1855
1856 #[mz_ore::test]
1857 fn applier_version_state_diff() {
1858 let v1 = semver::Version::new(1, 0, 0);
1859 let v2 = semver::Version::new(2, 0, 0);
1860 let v3 = semver::Version::new(3, 0, 0);
1861
1862 let diff = StateDiff::<u64>::new(
1864 v2.clone(),
1865 SeqNo(0),
1866 SeqNo(1),
1867 2,
1868 PartialRollupKey("rollup".into()),
1869 );
1870 let mut buf = Vec::new();
1871 diff.encode(&mut buf);
1872 let bytes = Bytes::from(buf);
1873
1874 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1876 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1877
1878 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1883 assert_err!(v1_res);
1884 }
1885
1886 #[mz_ore::test]
1887 fn hollow_batch_migration_keys() {
1888 let x = HollowBatch::new_run(
1889 Description::new(
1890 Antichain::from_elem(1u64),
1891 Antichain::from_elem(2u64),
1892 Antichain::from_elem(3u64),
1893 ),
1894 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1895 key: PartialBatchKey("a".into()),
1896 encoded_size_bytes: 5,
1897 key_lower: vec![],
1898 structured_key_lower: None,
1899 stats: None,
1900 ts_rewrite: None,
1901 diffs_sum: None,
1902 format: None,
1903 schema_id: None,
1904 deprecated_schema_id: None,
1905 }))],
1906 4,
1907 );
1908 let mut old = x.into_proto();
1909 old.deprecated_keys = vec!["b".into()];
1911 let mut expected = x;
1915 expected
1920 .parts
1921 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1922 key: PartialBatchKey("b".into()),
1923 encoded_size_bytes: 0,
1924 key_lower: vec![],
1925 structured_key_lower: None,
1926 stats: None,
1927 ts_rewrite: None,
1928 diffs_sum: None,
1929 format: None,
1930 schema_id: None,
1931 deprecated_schema_id: None,
1932 })));
1933 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1934 }
1935
1936 #[mz_ore::test]
1937 fn reader_state_migration_lease_duration() {
1938 let x = LeasedReaderState {
1939 seqno: SeqNo(1),
1940 since: Antichain::from_elem(2u64),
1941 last_heartbeat_timestamp_ms: 3,
1942 debug: HandleDebugState {
1943 hostname: "host".to_owned(),
1944 purpose: "purpose".to_owned(),
1945 },
1946 lease_duration_ms: 0,
1948 };
1949 let old = x.into_proto();
1950 let mut expected = x;
1951 expected.lease_duration_ms =
1954 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1955 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1956 }
1957
1958 #[mz_ore::test]
1959 fn writer_state_migration_most_recent_write() {
1960 let proto = ProtoWriterState {
1961 last_heartbeat_timestamp_ms: 1,
1962 lease_duration_ms: 2,
1963 most_recent_write_token: "".into(),
1966 most_recent_write_upper: None,
1967 debug: Some(ProtoHandleDebugState {
1968 hostname: "host".to_owned(),
1969 purpose: "purpose".to_owned(),
1970 }),
1971 };
1972 let expected = WriterState {
1973 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1974 lease_duration_ms: proto.lease_duration_ms,
1975 most_recent_write_token: IdempotencyToken::SENTINEL,
1976 most_recent_write_upper: Antichain::from_elem(0),
1977 debug: HandleDebugState {
1978 hostname: "host".to_owned(),
1979 purpose: "purpose".to_owned(),
1980 },
1981 };
1982 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
1983 }
1984
1985 #[mz_ore::test]
1986 fn state_migration_rollups() {
1987 let r1 = HollowRollup {
1988 key: PartialRollupKey("foo".to_owned()),
1989 encoded_size_bytes: None,
1990 };
1991 let r2 = HollowRollup {
1992 key: PartialRollupKey("bar".to_owned()),
1993 encoded_size_bytes: Some(2),
1994 };
1995 let shard_id = ShardId::new();
1996 let mut state = TypedState::<(), (), u64, i64>::new(
1997 DUMMY_BUILD_INFO.semver_version(),
1998 shard_id,
1999 "host".to_owned(),
2000 0,
2001 );
2002 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2003 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2004
2005 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2007
2008 let state: Rollup<u64> = proto.into_rust().unwrap();
2009 let state = state.state;
2010 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2011 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2012 assert_eq!(
2013 state
2014 .state
2015 .collections
2016 .rollups
2017 .into_iter()
2018 .collect::<Vec<_>>(),
2019 expected
2020 );
2021 }
2022
    // Checks that rollups encoded through the deprecated rollups field are
    // understood both when decoding a state diff and when applying an encoded
    // diff to a state.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
        // r1: an insert that will only be encoded via the deprecated field.
        let r1_rollup = HollowRollup {
            key: PartialRollupKey("foo".to_owned()),
            encoded_size_bytes: None,
        };
        let r1 = StateFieldDiff {
            key: SeqNo(1),
            val: StateFieldValDiff::Insert(r1_rollup.clone()),
        };
        // r2: an insert encoded through the regular rollups field.
        let r2_rollup = HollowRollup {
            key: PartialRollupKey("bar".to_owned()),
            encoded_size_bytes: Some(2),
        };
        let r2 = StateFieldDiff {
            key: SeqNo(2),
            val: StateFieldValDiff::Insert(r2_rollup.clone()),
        };
        // r3: a delete encoded through the regular rollups field.
        let r3_rollup = HollowRollup {
            key: PartialRollupKey("baz".to_owned()),
            encoded_size_bytes: None,
        };
        let r3 = StateFieldDiff {
            key: SeqNo(3),
            val: StateFieldValDiff::Delete(r3_rollup.clone()),
        };
        let mut diff = StateDiff::<u64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            SeqNo(4),
            SeqNo(5),
            0,
            PartialRollupKey("ignored".to_owned()),
        );
        diff.rollups.push(r2.clone());
        diff.rollups.push(r3.clone());
        let mut diff_proto = diff.into_proto();

        // Take the encoded field diffs out so one more entry can be appended.
        let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
        let mut field_diffs_writer = field_diffs.into_writer();

        // Append r1 under the deprecated field; there the value is just the
        // rollup key.
        field_diffs_into_proto(
            ProtoStateField::DeprecatedRollups,
            &[StateFieldDiff {
                key: r1.key,
                val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
            }],
            &mut field_diffs_writer,
        );

        // `mem::take` left `None` behind; put the extended diffs back.
        assert_none!(diff_proto.field_diffs);
        diff_proto.field_diffs = Some(field_diffs_writer.into_proto());

        // Decoding yields the regular entries followed by the deprecated one.
        let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
        assert_eq!(
            diff.rollups.into_iter().collect::<Vec<_>>(),
            vec![r2, r3, r1]
        );

        // Second half: build a state at seqno 4 whose only extra rollup (r3's
        // key at seqno 3) lives in the deprecated field, then apply the diff.
        let shard_id = ShardId::new();
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            shard_id,
            "host".to_owned(),
            0,
        );
        state.state.seqno = SeqNo(4);
        let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
        rollup
            .deprecated_rollups
            .insert(3, r3_rollup.key.into_proto());
        let state: Rollup<u64> = rollup.into_rust().unwrap();
        let state = state.state;
        let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
        let cache = new_test_client_cache(&dyncfgs);
        let encoded_diff = VersionedData {
            seqno: SeqNo(5),
            data: diff_proto.encode_to_vec().into(),
        };
        state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
        // After the diff, r3 has been deleted and r1 and r2 are present.
        assert_eq!(
            state
                .state
                .collections
                .rollups
                .into_iter()
                .collect::<Vec<_>>(),
            vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
        );
    }
2116
2117 #[mz_ore::test]
2118 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2120 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2121 let before = UntypedState {
2122 key_codec: <() as Codec>::codec_name(),
2123 val_codec: <() as Codec>::codec_name(),
2124 ts_codec: <T as Codec64>::codec_name(),
2125 diff_codec: <i64 as Codec64>::codec_name(),
2126 state,
2127 };
2128 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2129 let after: Rollup<T> = proto.into_rust().unwrap();
2130 let after = after.state;
2131 assert_eq!(before, after);
2132 }
2133
2134 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2135 }
2136
2137 #[mz_ore::test]
2138 fn check_data_versions_with_self_managed_versions() {
2139 #[track_caller]
2140 fn testcase(
2141 code: &str,
2142 data: &str,
2143 self_managed_versions: &[Version],
2144 expected: Result<(), ()>,
2145 ) {
2146 let code = Version::parse(code).unwrap();
2147 let data = Version::parse(data).unwrap();
2148 let actual = cfg::check_data_version_with_self_managed_versions(
2149 &code,
2150 &data,
2151 self_managed_versions,
2152 )
2153 .map_err(|_| ());
2154 assert_eq!(actual, expected);
2155 }
2156
2157 let none = [];
2158 let one = [Version::new(0, 130, 0)];
2159 let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2160 let three = [
2161 Version::new(0, 130, 0),
2162 Version::new(0, 140, 0),
2163 Version::new(0, 150, 0),
2164 ];
2165
2166 testcase("0.130.0", "0.128.0", &none, Ok(()));
2167 testcase("0.130.0", "0.129.0", &none, Ok(()));
2168 testcase("0.130.0", "0.130.0", &none, Ok(()));
2169 testcase("0.130.0", "0.130.1", &none, Ok(()));
2170 testcase("0.130.1", "0.130.0", &none, Ok(()));
2171 testcase("0.130.0", "0.131.0", &none, Ok(()));
2172 testcase("0.130.0", "0.132.0", &none, Err(()));
2173
2174 testcase("0.129.0", "0.127.0", &none, Ok(()));
2175 testcase("0.129.0", "0.128.0", &none, Ok(()));
2176 testcase("0.129.0", "0.129.0", &none, Ok(()));
2177 testcase("0.129.0", "0.129.1", &none, Ok(()));
2178 testcase("0.129.1", "0.129.0", &none, Ok(()));
2179 testcase("0.129.0", "0.130.0", &none, Ok(()));
2180 testcase("0.129.0", "0.131.0", &none, Err(()));
2181
2182 testcase("0.130.0", "0.128.0", &one, Ok(()));
2183 testcase("0.130.0", "0.129.0", &one, Ok(()));
2184 testcase("0.130.0", "0.130.0", &one, Ok(()));
2185 testcase("0.130.0", "0.130.1", &one, Ok(()));
2186 testcase("0.130.1", "0.130.0", &one, Ok(()));
2187 testcase("0.130.0", "0.131.0", &one, Ok(()));
2188 testcase("0.130.0", "0.132.0", &one, Ok(()));
2189
2190 testcase("0.129.0", "0.127.0", &one, Ok(()));
2191 testcase("0.129.0", "0.128.0", &one, Ok(()));
2192 testcase("0.129.0", "0.129.0", &one, Ok(()));
2193 testcase("0.129.0", "0.129.1", &one, Ok(()));
2194 testcase("0.129.1", "0.129.0", &one, Ok(()));
2195 testcase("0.129.0", "0.130.0", &one, Ok(()));
2196 testcase("0.129.0", "0.131.0", &one, Err(()));
2197
2198 testcase("0.131.0", "0.129.0", &one, Ok(()));
2199 testcase("0.131.0", "0.130.0", &one, Ok(()));
2200 testcase("0.131.0", "0.131.0", &one, Ok(()));
2201 testcase("0.131.0", "0.131.1", &one, Ok(()));
2202 testcase("0.131.1", "0.131.0", &one, Ok(()));
2203 testcase("0.131.0", "0.132.0", &one, Ok(()));
2204 testcase("0.131.0", "0.133.0", &one, Err(()));
2205
2206 testcase("0.130.0", "0.128.0", &two, Ok(()));
2207 testcase("0.130.0", "0.129.0", &two, Ok(()));
2208 testcase("0.130.0", "0.130.0", &two, Ok(()));
2209 testcase("0.130.0", "0.130.1", &two, Ok(()));
2210 testcase("0.130.1", "0.130.0", &two, Ok(()));
2211 testcase("0.130.0", "0.131.0", &two, Ok(()));
2212 testcase("0.130.0", "0.132.0", &two, Ok(()));
2213 testcase("0.130.0", "0.135.0", &two, Ok(()));
2214 testcase("0.130.0", "0.138.0", &two, Ok(()));
2215 testcase("0.130.0", "0.139.0", &two, Ok(()));
2216 testcase("0.130.0", "0.140.0", &two, Ok(()));
2217 testcase("0.130.9", "0.140.0", &two, Ok(()));
2218 testcase("0.130.0", "0.140.1", &two, Ok(()));
2219 testcase("0.130.3", "0.140.1", &two, Ok(()));
2220 testcase("0.130.3", "0.140.9", &two, Ok(()));
2221 testcase("0.130.0", "0.141.0", &two, Err(()));
2222 testcase("0.129.0", "0.133.0", &two, Err(()));
2223 testcase("0.129.0", "0.140.0", &two, Err(()));
2224 testcase("0.131.0", "0.133.0", &two, Err(()));
2225 testcase("0.131.0", "0.140.0", &two, Err(()));
2226
2227 testcase("0.130.0", "0.128.0", &three, Ok(()));
2228 testcase("0.130.0", "0.129.0", &three, Ok(()));
2229 testcase("0.130.0", "0.130.0", &three, Ok(()));
2230 testcase("0.130.0", "0.130.1", &three, Ok(()));
2231 testcase("0.130.1", "0.130.0", &three, Ok(()));
2232 testcase("0.130.0", "0.131.0", &three, Ok(()));
2233 testcase("0.130.0", "0.132.0", &three, Ok(()));
2234 testcase("0.130.0", "0.135.0", &three, Ok(()));
2235 testcase("0.130.0", "0.138.0", &three, Ok(()));
2236 testcase("0.130.0", "0.139.0", &three, Ok(()));
2237 testcase("0.130.0", "0.140.0", &three, Ok(()));
2238 testcase("0.130.9", "0.140.0", &three, Ok(()));
2239 testcase("0.130.0", "0.140.1", &three, Ok(()));
2240 testcase("0.130.3", "0.140.1", &three, Ok(()));
2241 testcase("0.130.3", "0.140.9", &three, Ok(()));
2242 testcase("0.130.0", "0.141.0", &three, Err(()));
2243 testcase("0.129.0", "0.133.0", &three, Err(()));
2244 testcase("0.129.0", "0.140.0", &three, Err(()));
2245 testcase("0.131.0", "0.133.0", &three, Err(()));
2246 testcase("0.131.0", "0.140.0", &three, Err(()));
2247 testcase("0.130.0", "0.150.0", &three, Err(()));
2248
2249 testcase("0.140.0", "0.138.0", &three, Ok(()));
2250 testcase("0.140.0", "0.139.0", &three, Ok(()));
2251 testcase("0.140.0", "0.140.0", &three, Ok(()));
2252 testcase("0.140.0", "0.140.1", &three, Ok(()));
2253 testcase("0.140.1", "0.140.0", &three, Ok(()));
2254 testcase("0.140.0", "0.141.0", &three, Ok(()));
2255 testcase("0.140.0", "0.142.0", &three, Ok(()));
2256 testcase("0.140.0", "0.145.0", &three, Ok(()));
2257 testcase("0.140.0", "0.148.0", &three, Ok(()));
2258 testcase("0.140.0", "0.149.0", &three, Ok(()));
2259 testcase("0.140.0", "0.150.0", &three, Ok(()));
2260 testcase("0.140.9", "0.150.0", &three, Ok(()));
2261 testcase("0.140.0", "0.150.1", &three, Ok(()));
2262 testcase("0.140.3", "0.150.1", &three, Ok(()));
2263 testcase("0.140.3", "0.150.9", &three, Ok(()));
2264 testcase("0.140.0", "0.151.0", &three, Err(()));
2265 testcase("0.139.0", "0.143.0", &three, Err(()));
2266 testcase("0.139.0", "0.150.0", &three, Err(()));
2267 testcase("0.141.0", "0.143.0", &three, Err(()));
2268 testcase("0.141.0", "0.150.0", &three, Err(()));
2269
2270 testcase("0.150.0", "0.148.0", &three, Ok(()));
2271 testcase("0.150.0", "0.149.0", &three, Ok(()));
2272 testcase("0.150.0", "0.150.0", &three, Ok(()));
2273 testcase("0.150.0", "0.150.1", &three, Ok(()));
2274 testcase("0.150.1", "0.150.0", &three, Ok(()));
2275 testcase("0.150.0", "0.151.0", &three, Ok(()));
2276 testcase("0.150.0", "0.152.0", &three, Ok(()));
2277 testcase("0.150.0", "0.155.0", &three, Ok(()));
2278 testcase("0.150.0", "0.158.0", &three, Ok(()));
2279 testcase("0.150.0", "0.159.0", &three, Ok(()));
2280 testcase("0.150.0", "0.160.0", &three, Ok(()));
2281 testcase("0.150.9", "0.160.0", &three, Ok(()));
2282 testcase("0.150.0", "0.160.1", &three, Ok(()));
2283 testcase("0.150.3", "0.160.1", &three, Ok(()));
2284 testcase("0.150.3", "0.160.9", &three, Ok(()));
2285 testcase("0.150.0", "0.161.0", &three, Ok(()));
2286 testcase("0.149.0", "0.153.0", &three, Err(()));
2287 testcase("0.149.0", "0.160.0", &three, Err(()));
2288 testcase("0.151.0", "0.153.0", &three, Err(()));
2289 testcase("0.151.0", "0.160.0", &three, Err(()));
2290 }
2291
2292 #[mz_ore::test]
2293 fn check_data_versions() {
2294 #[track_caller]
2295 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2296 let code = Version::parse(code).unwrap();
2297 let data = Version::parse(data).unwrap();
2298 #[allow(clippy::disallowed_methods)]
2299 let actual =
2300 std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2301 assert_eq!(actual, expected);
2302 }
2303
2304 testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2305 testcase("0.10.0-dev", "0.10.0", Ok(()));
2306 testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2309 testcase("0.10.0-dev", "0.11.0", Ok(()));
2310 testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2311 testcase("0.10.0-dev", "0.12.0", Err(()));
2312 testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2313
2314 testcase("0.10.0", "0.8.0-dev", Ok(()));
2315 testcase("0.10.0", "0.8.0", Ok(()));
2316 testcase("0.10.0", "0.9.0-dev", Ok(()));
2317 testcase("0.10.0", "0.9.0", Ok(()));
2318 testcase("0.10.0", "0.10.0-dev", Ok(()));
2319 testcase("0.10.0", "0.10.0", Ok(()));
2320 testcase("0.10.0", "0.11.0-dev", Ok(()));
2323 testcase("0.10.0", "0.11.0", Ok(()));
2324 testcase("0.10.0", "0.11.1", Ok(()));
2325 testcase("0.10.0", "0.11.1000000", Ok(()));
2326 testcase("0.10.0", "0.12.0-dev", Err(()));
2327 testcase("0.10.0", "0.12.0", Err(()));
2328 testcase("0.10.0", "0.13.0-dev", Err(()));
2329
2330 testcase("0.10.1", "0.9.0", Ok(()));
2331 testcase("0.10.1", "0.10.0", Ok(()));
2332 testcase("0.10.1", "0.11.0", Ok(()));
2333 testcase("0.10.1", "0.11.1", Ok(()));
2334 testcase("0.10.1", "0.11.100", Ok(()));
2335
2336 testcase("0.10.0", "0.10.1", Ok(()));
2342 }
2343}