1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::Debug;
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53 RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54 proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
/// A key schema and a value schema for codecs `K` and `V`, optionally tagged with the
/// [`SchemaId`] they were registered under.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// The id of these schemas, if any.
    /// NOTE(review): `None` presumably means the schemas are not (yet) registered — confirm.
    pub id: Option<SchemaId>,
    /// The schema for the key codec `K`.
    pub key: Arc<K::Schema>,
    /// The schema for the value codec `V`.
    pub val: Arc<V::Schema>,
}
78
79impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
80 fn clone(&self) -> Self {
81 Self {
82 id: self.id,
83 key: Arc::clone(&self.key),
84 val: Arc::clone(&self.val),
85 }
86 }
87}
88
/// A protobuf message of type `T` kept in its encoded form.
///
/// The bytes are only decoded on demand (`decode` / `decode_to`). The `PartialEq`, `Ord` and
/// `Hash` impls in this module compare and hash the raw encoded bytes, not the decoded message.
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded protobuf bytes.
    buf: Bytes,
    /// Ties `T` to this type without storing a `T`. The `fn() -> T` form keeps auto traits
    /// such as `Send`/`Sync` independent of `T`.
    _phantom: PhantomData<fn() -> T>,
}
116
117impl<T: Message + Default> Debug for LazyProto<T> {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 match self.decode() {
120 Ok(proto) => Debug::fmt(&proto, f),
121 Err(err) => f
122 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
123 .field("err", &err)
124 .finish(),
125 }
126 }
127}
128
129impl<T> PartialEq for LazyProto<T> {
130 fn eq(&self, other: &Self) -> bool {
131 self.cmp(other) == Ordering::Equal
132 }
133}
134
135impl<T> Eq for LazyProto<T> {}
136
137impl<T> PartialOrd for LazyProto<T> {
138 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139 Some(self.cmp(other))
140 }
141}
142
143impl<T> Ord for LazyProto<T> {
144 fn cmp(&self, other: &Self) -> Ordering {
145 let LazyProto {
146 buf: self_buf,
147 _phantom: _,
148 } = self;
149 let LazyProto {
150 buf: other_buf,
151 _phantom: _,
152 } = other;
153 self_buf.cmp(other_buf)
154 }
155}
156
157impl<T> Hash for LazyProto<T> {
158 fn hash<H: Hasher>(&self, state: &mut H) {
159 let LazyProto { buf, _phantom } = self;
160 buf.hash(state);
161 }
162}
163
164impl<T: Message + Default> From<&T> for LazyProto<T> {
165 fn from(value: &T) -> Self {
166 let buf = Bytes::from(value.encode_to_vec());
167 LazyProto {
168 buf,
169 _phantom: PhantomData,
170 }
171 }
172}
173
174impl<T: Message + Default> LazyProto<T> {
175 pub fn decode(&self) -> Result<T, prost::DecodeError> {
176 T::decode(&*self.buf)
177 }
178
179 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
180 Ok(T::decode(&*self.buf)?.into_rust()?)
181 }
182}
183
184impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
185 fn into_proto(&self) -> Bytes {
186 self.buf.clone()
187 }
188
189 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
190 Ok(Self {
191 buf,
192 _phantom: PhantomData,
193 })
194 }
195}
196
197pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
198 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
199 Some(x) => x,
200 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
201 };
202 let uuid = Uuid::parse_str(uuid_encoded)
203 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
204 Ok(*uuid.as_bytes())
205}
206
207pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
208 if let Err(msg) = cfg::check_data_version(code_version, data_version) {
209 if cfg!(test) {
212 panic!("{msg}");
213 } else {
214 halt!("{msg}");
215 }
216 }
217}
218
219impl RustType<String> for LeasedReaderId {
220 fn into_proto(&self) -> String {
221 self.to_string()
222 }
223
224 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
225 match proto.parse() {
226 Ok(x) => Ok(x),
227 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
228 }
229 }
230}
231
232impl RustType<String> for CriticalReaderId {
233 fn into_proto(&self) -> String {
234 self.to_string()
235 }
236
237 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
238 match proto.parse() {
239 Ok(x) => Ok(x),
240 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
241 }
242 }
243}
244
245impl RustType<String> for WriterId {
246 fn into_proto(&self) -> String {
247 self.to_string()
248 }
249
250 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
251 match proto.parse() {
252 Ok(x) => Ok(x),
253 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
254 }
255 }
256}
257
258impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
259 fn into_proto(&self) -> ProtoEncodedSchemas {
260 ProtoEncodedSchemas {
261 key: self.key.clone(),
262 key_data_type: self.key_data_type.clone(),
263 val: self.val.clone(),
264 val_data_type: self.val_data_type.clone(),
265 }
266 }
267
268 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
269 Ok(EncodedSchemas {
270 key: proto.key,
271 key_data_type: proto.key_data_type,
272 val: proto.val,
273 val_data_type: proto.val_data_type,
274 })
275 }
276}
277
278impl RustType<String> for IdempotencyToken {
279 fn into_proto(&self) -> String {
280 self.to_string()
281 }
282
283 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
284 match proto.parse() {
285 Ok(x) => Ok(x),
286 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
287 }
288 }
289}
290
291impl RustType<String> for PartialBatchKey {
292 fn into_proto(&self) -> String {
293 self.0.clone()
294 }
295
296 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297 Ok(PartialBatchKey(proto))
298 }
299}
300
301impl RustType<String> for PartialRollupKey {
302 fn into_proto(&self) -> String {
303 self.0.clone()
304 }
305
306 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
307 Ok(PartialRollupKey(proto))
308 }
309}
310
311impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
312 pub fn encode<B>(&self, buf: &mut B)
313 where
314 B: bytes::BufMut,
315 {
316 self.into_proto()
317 .encode(buf)
318 .expect("no required fields means no initialization errors");
319 }
320
321 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
322 let proto = ProtoStateDiff::decode(buf)
323 .expect("internal error: invalid encoded state");
328 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
329 check_data_version(build_version, &diff.applier_version);
330 diff
331 }
332}
333
/// Conversion between `StateDiff` and `ProtoStateDiff`.
///
/// Per-field diffs are serialized through a `ProtoStateFieldDiffsWriter`. Each entry records
/// the `ProtoStateField` it belongs to, so decoding can dispatch back to the right Rust field.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    fn into_proto(&self) -> ProtoStateDiff {
        // Destructure exhaustively so that adding a field to `StateDiff` fails to compile here
        // until it is also encoded.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // All per-field diffs are appended to a single writer, in the order below.
        let mut writer = proto.into_writer();

        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        let field_diffs = writer.into_proto();

        // Sanity-check the encoded field diffs (debug builds only).
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty applier_version is decoded as 0.0.0.
        // NOTE(review): presumably for diffs written before this field was populated — confirm.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` yields a diff with no per-field changes.
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            // Dispatch each encoded entry to the `StateDiff` field named by its tag. The type
            // parameters name the proto key/value types to decode before converting to Rust.
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy rollup entries carry only the key string. They are upgraded to a
                    // `HollowRollup` with an unknown (`None`) encoded size.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself, with a unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
553
554fn field_diffs_into_proto<K, KP, V, VP>(
555 field: ProtoStateField,
556 diffs: &[StateFieldDiff<K, V>],
557 writer: &mut ProtoStateFieldDiffsWriter,
558) where
559 KP: prost::Message,
560 K: RustType<KP>,
561 VP: prost::Message,
562 V: RustType<VP>,
563{
564 for diff in diffs.iter() {
565 field_diff_into_proto(field, diff, writer);
566 }
567}
568
569fn field_diff_into_proto<K, KP, V, VP>(
570 field: ProtoStateField,
571 diff: &StateFieldDiff<K, V>,
572 writer: &mut ProtoStateFieldDiffsWriter,
573) where
574 KP: prost::Message,
575 K: RustType<KP>,
576 VP: prost::Message,
577 V: RustType<VP>,
578{
579 writer.push_field(field);
580 writer.encode_proto(&diff.key.into_proto());
581 match &diff.val {
582 StateFieldValDiff::Insert(to) => {
583 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
584 writer.encode_proto(&to.into_proto());
585 }
586 StateFieldValDiff::Update(from, to) => {
587 writer.push_diff_type(ProtoStateFieldDiffType::Update);
588 writer.encode_proto(&from.into_proto());
589 writer.encode_proto(&to.into_proto());
590 }
591 StateFieldValDiff::Delete(from) => {
592 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
593 writer.encode_proto(&from.into_proto());
594 }
595 };
596}
597
598fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
599 proto: ProtoStateFieldDiff<'_>,
600 diffs: &mut Vec<StateFieldDiff<K, V>>,
601 k_fn: KFn,
602 v_fn: VFn,
603) -> Result<(), TryFromProtoError>
604where
605 KP: prost::Message + Default,
606 VP: prost::Message + Default,
607 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
608 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
609{
610 let val = match proto.diff_type {
611 ProtoStateFieldDiffType::Insert => {
612 let to = VP::decode(proto.to)
613 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
614 StateFieldValDiff::Insert(v_fn(to)?)
615 }
616 ProtoStateFieldDiffType::Update => {
617 let from = VP::decode(proto.from)
618 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619 let to = VP::decode(proto.to)
620 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
621
622 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
623 }
624 ProtoStateFieldDiffType::Delete => {
625 let from = VP::decode(proto.from)
626 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
627 StateFieldValDiff::Delete(v_fn(from)?)
628 }
629 };
630 let key = KP::decode(proto.key)
631 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
632 diffs.push(StateFieldDiff {
633 key: k_fn(key)?,
634 val,
635 });
636 Ok(())
637}
638
/// A [`State`] together with the names of the codecs it was written with.
///
/// The codec names are only compared against concrete `K`/`V`/`T`/`D` types later, in
/// `check_codecs` / `check_ts_codec`.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Name of the key codec, compared against `K::codec_name()`.
    pub(crate) key_codec: String,
    /// Name of the value codec, compared against `V::codec_name()`.
    pub(crate) val_codec: String,
    /// Name of the timestamp codec, compared against `T::codec_name()`.
    pub(crate) ts_codec: String,
    /// Name of the diff codec, compared against `D::codec_name()`.
    pub(crate) diff_codec: String,

    /// The wrapped shard state.
    state: State<T>,
}
654
655impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
656 pub fn seqno(&self) -> SeqNo {
657 self.state.seqno
658 }
659
660 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
661 &self.state.collections.rollups
662 }
663
664 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
665 self.state.latest_rollup()
666 }
667
668 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
669 &mut self,
670 cfg: &PersistConfig,
671 metrics: &Metrics,
672 diffs: I,
673 ) {
674 if T::codec_name() != self.ts_codec {
678 return;
679 }
680 self.state.apply_encoded_diffs(cfg, metrics, diffs);
681 }
682
683 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
684 self,
685 shard_id: &ShardId,
686 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
687 assert_eq!(shard_id, &self.state.shard_id);
690 if K::codec_name() != self.key_codec
691 || V::codec_name() != self.val_codec
692 || T::codec_name() != self.ts_codec
693 || D::codec_name() != self.diff_codec
694 {
695 return Err(Box::new(CodecMismatch {
696 requested: (
697 K::codec_name(),
698 V::codec_name(),
699 T::codec_name(),
700 D::codec_name(),
701 None,
702 ),
703 actual: (
704 self.key_codec,
705 self.val_codec,
706 self.ts_codec,
707 self.diff_codec,
708 None,
709 ),
710 }));
711 }
712 Ok(TypedState {
713 state: self.state,
714 _phantom: PhantomData,
715 })
716 }
717
718 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
719 assert_eq!(shard_id, &self.state.shard_id);
722 if T::codec_name() != self.ts_codec {
723 return Err(CodecMismatchT {
724 requested: T::codec_name(),
725 actual: self.ts_codec,
726 });
727 }
728 Ok(self.state)
729 }
730
731 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
732 let proto = ProtoRollup::decode(buf)
733 .expect("internal error: invalid encoded state");
738 let state = Rollup::from_proto(proto)
739 .expect("internal error: invalid encoded state")
740 .state;
741 check_data_version(build_version, &state.state.applier_version);
742 state
743 }
744}
745
746impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
747where
748 K: Codec,
749 V: Codec,
750 T: Codec64,
751 D: Codec64,
752{
753 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
754 UntypedState {
755 key_codec: K::codec_name(),
756 val_codec: V::codec_name(),
757 ts_codec: T::codec_name(),
758 diff_codec: D::codec_name(),
759 state: typed_state.state,
760 }
761 }
762}
763
/// The serializable form of a rollup.
///
/// It holds an [`UntypedState`] plus, optionally, the encoded diffs covering the seqnos from
/// just after that state's latest rollup up to and including the state's own seqno
/// (see `Rollup::from`).
#[derive(Debug)]
pub struct Rollup<T> {
    /// The rolled-up state.
    pub(crate) state: UntypedState<T>,
    /// Inlined diffs, if any. `None` when built via the `*_without_diffs` constructors.
    pub(crate) diffs: Option<InlinedDiffs>,
}
775
776impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
777 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
781 let latest_rollup_seqno = *state.latest_rollup().0;
782 let mut verify_seqno = latest_rollup_seqno;
783 for diff in &diffs {
784 assert_eq!(verify_seqno.next(), diff.seqno);
785 verify_seqno = diff.seqno;
786 }
787 assert_eq!(verify_seqno, state.seqno());
788
789 let diffs = Some(InlinedDiffs::from(
790 latest_rollup_seqno.next(),
791 state.seqno().next(),
792 diffs,
793 ));
794
795 Self { state, diffs }
796 }
797
798 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
799 Self { state, diffs: None }
800 }
801
802 pub(crate) fn from_state_without_diffs(
803 state: State<T>,
804 key_codec: String,
805 val_codec: String,
806 ts_codec: String,
807 diff_codec: String,
808 ) -> Self {
809 Self::from_untyped_state_without_diffs(UntypedState {
810 key_codec,
811 val_codec,
812 ts_codec,
813 diff_codec,
814 state,
815 })
816 }
817}
818
/// Encoded state diffs whose seqnos all lie in the half-open range `[lower, upper)`
/// (asserted by `InlinedDiffs::from`).
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos in `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos in `diffs`.
    pub(crate) upper: SeqNo,
    /// The encoded diffs themselves.
    pub(crate) diffs: Vec<VersionedData>,
}
825
826impl InlinedDiffs {
827 pub(crate) fn description(&self) -> Description<SeqNo> {
828 Description::new(
829 Antichain::from_elem(self.lower),
830 Antichain::from_elem(self.upper),
831 Antichain::from_elem(SeqNo::minimum()),
832 )
833 }
834
835 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
836 for diff in &diffs {
837 assert!(diff.seqno >= lower);
838 assert!(diff.seqno < upper);
839 }
840 Self {
841 lower,
842 upper,
843 diffs,
844 }
845 }
846}
847
848impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
849 fn into_proto(&self) -> ProtoInlinedDiffs {
850 ProtoInlinedDiffs {
851 lower: self.lower.into_proto(),
852 upper: self.upper.into_proto(),
853 diffs: self.diffs.into_proto(),
854 }
855 }
856
857 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
858 Ok(Self {
859 lower: proto.lower.into_rust()?,
860 upper: proto.upper.into_rust()?,
861 diffs: proto.diffs.into_rust()?,
862 })
863 }
864}
865
866impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
867 fn into_proto(&self) -> ProtoRollup {
868 ProtoRollup {
869 applier_version: self.state.state.applier_version.to_string(),
870 shard_id: self.state.state.shard_id.into_proto(),
871 seqno: self.state.state.seqno.into_proto(),
872 walltime_ms: self.state.state.walltime_ms.into_proto(),
873 hostname: self.state.state.hostname.into_proto(),
874 key_codec: self.state.key_codec.into_proto(),
875 val_codec: self.state.val_codec.into_proto(),
876 ts_codec: T::codec_name(),
877 diff_codec: self.state.diff_codec.into_proto(),
878 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
879 active_rollup: self.state.state.collections.active_rollup.into_proto(),
880 active_gc: self.state.state.collections.active_gc.into_proto(),
881 rollups: self
882 .state
883 .state
884 .collections
885 .rollups
886 .iter()
887 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
888 .collect(),
889 deprecated_rollups: Default::default(),
890 leased_readers: self
891 .state
892 .state
893 .collections
894 .leased_readers
895 .iter()
896 .map(|(id, state)| (id.into_proto(), state.into_proto()))
897 .collect(),
898 critical_readers: self
899 .state
900 .state
901 .collections
902 .critical_readers
903 .iter()
904 .map(|(id, state)| (id.into_proto(), state.into_proto()))
905 .collect(),
906 writers: self
907 .state
908 .state
909 .collections
910 .writers
911 .iter()
912 .map(|(id, state)| (id.into_proto(), state.into_proto()))
913 .collect(),
914 schemas: self
915 .state
916 .state
917 .collections
918 .schemas
919 .iter()
920 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
921 .collect(),
922 trace: Some(self.state.state.collections.trace.into_proto()),
923 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
924 }
925 }
926
927 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
928 let applier_version = if x.applier_version.is_empty() {
929 semver::Version::new(0, 0, 0)
933 } else {
934 semver::Version::parse(&x.applier_version).map_err(|err| {
935 TryFromProtoError::InvalidSemverVersion(format!(
936 "invalid applier_version {}: {}",
937 x.applier_version, err
938 ))
939 })?
940 };
941
942 let mut rollups = BTreeMap::new();
943 for (seqno, rollup) in x.rollups {
944 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
945 }
946 for (seqno, key) in x.deprecated_rollups {
947 rollups.insert(
948 seqno.into_rust()?,
949 HollowRollup {
950 key: key.into_rust()?,
951 encoded_size_bytes: None,
952 },
953 );
954 }
955 let mut leased_readers = BTreeMap::new();
956 for (id, state) in x.leased_readers {
957 leased_readers.insert(id.into_rust()?, state.into_rust()?);
958 }
959 let mut critical_readers = BTreeMap::new();
960 for (id, state) in x.critical_readers {
961 critical_readers.insert(id.into_rust()?, state.into_rust()?);
962 }
963 let mut writers = BTreeMap::new();
964 for (id, state) in x.writers {
965 writers.insert(id.into_rust()?, state.into_rust()?);
966 }
967 let mut schemas = BTreeMap::new();
968 for (id, x) in x.schemas {
969 schemas.insert(id.into_rust()?, x.into_rust()?);
970 }
971 let active_rollup = x
972 .active_rollup
973 .map(|rollup| rollup.into_rust())
974 .transpose()?;
975 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
976 let collections = StateCollections {
977 rollups,
978 active_rollup,
979 active_gc,
980 last_gc_req: x.last_gc_req.into_rust()?,
981 leased_readers,
982 critical_readers,
983 writers,
984 schemas,
985 trace: x.trace.into_rust_if_some("trace")?,
986 };
987 let state = State {
988 applier_version,
989 shard_id: x.shard_id.into_rust()?,
990 seqno: x.seqno.into_rust()?,
991 walltime_ms: x.walltime_ms,
992 hostname: x.hostname,
993 collections,
994 };
995
996 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
997 if let Some(diffs) = &diffs {
998 if diffs.lower != state.latest_rollup().0.next() {
999 return Err(TryFromProtoError::InvalidPersistState(format!(
1000 "diffs lower ({}) should match latest rollup's successor: ({})",
1001 diffs.lower,
1002 state.latest_rollup().0.next()
1003 )));
1004 }
1005 if diffs.upper != state.seqno.next() {
1006 return Err(TryFromProtoError::InvalidPersistState(format!(
1007 "diffs upper ({}) should match state's successor: ({})",
1008 diffs.lower,
1009 state.seqno.next()
1010 )));
1011 }
1012 }
1013
1014 Ok(Rollup {
1015 state: UntypedState {
1016 state,
1017 key_codec: x.key_codec.into_rust()?,
1018 val_codec: x.val_codec.into_rust()?,
1019 ts_codec: x.ts_codec.into_rust()?,
1020 diff_codec: x.diff_codec.into_rust()?,
1021 },
1022 diffs,
1023 })
1024 }
1025}
1026
1027impl RustType<ProtoVersionedData> for VersionedData {
1028 fn into_proto(&self) -> ProtoVersionedData {
1029 ProtoVersionedData {
1030 seqno: self.seqno.into_proto(),
1031 data: Bytes::clone(&self.data),
1032 }
1033 }
1034
1035 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1036 Ok(Self {
1037 seqno: proto.seqno.into_rust()?,
1038 data: proto.data,
1039 })
1040 }
1041}
1042
1043impl RustType<ProtoSpineId> for SpineId {
1044 fn into_proto(&self) -> ProtoSpineId {
1045 ProtoSpineId {
1046 lo: self.0.into_proto(),
1047 hi: self.1.into_proto(),
1048 }
1049 }
1050
1051 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1052 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1053 }
1054}
1055
1056impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1057 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1058 let (id, batch) = entry;
1059 ProtoIdHollowBatch {
1060 id: Some(id.into_proto()),
1061 batch: Some(batch.into_proto()),
1062 }
1063 }
1064
1065 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1066 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1067 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1068 Ok((id, batch))
1069 }
1070}
1071
1072impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1073 fn into_proto(&self) -> ProtoSpineBatch {
1074 ProtoSpineBatch {
1075 desc: Some(self.desc.into_proto()),
1076 parts: self.parts.into_proto(),
1077 level: self.level.into_proto(),
1078 descs: self.descs.into_proto(),
1079 }
1080 }
1081
1082 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1083 let level = proto.level.into_rust()?;
1084 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1085 let parts = proto.parts.into_rust()?;
1086 let descs = proto.descs.into_rust()?;
1087 Ok(ThinSpineBatch {
1088 level,
1089 desc,
1090 parts,
1091 descs,
1092 })
1093 }
1094}
1095
1096impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1097 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1098 let (id, batch) = entry;
1099 ProtoIdSpineBatch {
1100 id: Some(id.into_proto()),
1101 batch: Some(batch.into_proto()),
1102 }
1103 }
1104
1105 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1106 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1107 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1108 Ok((id, batch))
1109 }
1110}
1111
1112impl RustType<ProtoCompaction> for ActiveCompaction {
1113 fn into_proto(&self) -> ProtoCompaction {
1114 ProtoCompaction {
1115 start_ms: self.start_ms,
1116 }
1117 }
1118
1119 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1120 Ok(Self {
1121 start_ms: proto.start_ms,
1122 })
1123 }
1124}
1125
1126impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1127 fn into_proto(&self) -> ProtoMerge {
1128 ProtoMerge {
1129 since: Some(self.since.into_proto()),
1130 remaining_work: self.remaining_work.into_proto(),
1131 active_compaction: self.active_compaction.into_proto(),
1132 }
1133 }
1134
1135 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1136 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1137 let remaining_work = proto.remaining_work.into_rust()?;
1138 let active_compaction = proto.active_compaction.into_rust()?;
1139 Ok(Self {
1140 since,
1141 remaining_work,
1142 active_compaction,
1143 })
1144 }
1145}
1146
1147impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1148 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1149 ProtoIdMerge {
1150 id: Some(id.into_proto()),
1151 merge: Some(merge.into_proto()),
1152 }
1153 }
1154
1155 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1156 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1157 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1158 Ok((id, merge))
1159 }
1160}
1161
1162impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1163 fn into_proto(&self) -> ProtoTrace {
1164 let since = self.since.into_proto();
1165 let legacy_batches = self
1166 .legacy_batches
1167 .iter()
1168 .map(|(b, _)| b.into_proto())
1169 .collect();
1170 let hollow_batches = self.hollow_batches.into_proto();
1171 let spine_batches = self.spine_batches.into_proto();
1172 let merges = self.merges.into_proto();
1173 ProtoTrace {
1174 since: Some(since),
1175 legacy_batches,
1176 hollow_batches,
1177 spine_batches,
1178 merges,
1179 }
1180 }
1181
1182 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1183 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1184 let legacy_batches = proto
1185 .legacy_batches
1186 .into_iter()
1187 .map(|b| b.into_rust().map(|b| (b, ())))
1188 .collect::<Result<_, _>>()?;
1189 let hollow_batches = proto.hollow_batches.into_rust()?;
1190 let spine_batches = proto.spine_batches.into_rust()?;
1191 let merges = proto.merges.into_rust()?;
1192 Ok(FlatTrace {
1193 since,
1194 legacy_batches,
1195 hollow_batches,
1196 spine_batches,
1197 merges,
1198 })
1199 }
1200}
1201
1202impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1203 fn into_proto(&self) -> ProtoTrace {
1204 self.flatten().into_proto()
1205 }
1206
1207 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1208 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1209 }
1210}
1211
1212impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1213 fn into_proto(&self) -> ProtoLeasedReaderState {
1214 ProtoLeasedReaderState {
1215 seqno: self.seqno.into_proto(),
1216 since: Some(self.since.into_proto()),
1217 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1218 lease_duration_ms: self.lease_duration_ms.into_proto(),
1219 debug: Some(self.debug.into_proto()),
1220 }
1221 }
1222
1223 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1224 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1225 if lease_duration_ms == 0 {
1230 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1231 .expect("lease duration as millis should fit within u64");
1232 }
1233 let debug = proto.debug.unwrap_or_default().into_rust()?;
1236 Ok(LeasedReaderState {
1237 seqno: proto.seqno.into_rust()?,
1238 since: proto
1239 .since
1240 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1241 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1242 lease_duration_ms,
1243 debug,
1244 })
1245 }
1246}
1247
1248impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1249 fn into_proto(&self) -> ProtoCriticalReaderState {
1250 ProtoCriticalReaderState {
1251 since: Some(self.since.into_proto()),
1252 opaque: i64::from_le_bytes(self.opaque.0),
1253 opaque_codec: self.opaque_codec.clone(),
1254 debug: Some(self.debug.into_proto()),
1255 }
1256 }
1257
1258 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1259 let debug = proto.debug.unwrap_or_default().into_rust()?;
1262 Ok(CriticalReaderState {
1263 since: proto
1264 .since
1265 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1266 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1267 opaque_codec: proto.opaque_codec,
1268 debug,
1269 })
1270 }
1271}
1272
1273impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1274 fn into_proto(&self) -> ProtoWriterState {
1275 ProtoWriterState {
1276 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1277 lease_duration_ms: self.lease_duration_ms.into_proto(),
1278 most_recent_write_token: self.most_recent_write_token.into_proto(),
1279 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1280 debug: Some(self.debug.into_proto()),
1281 }
1282 }
1283
1284 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1285 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1291 IdempotencyToken::SENTINEL
1292 } else {
1293 proto.most_recent_write_token.into_rust()?
1294 };
1295 let most_recent_write_upper = match proto.most_recent_write_upper {
1296 Some(x) => x.into_rust()?,
1297 None => Antichain::from_elem(T::minimum()),
1298 };
1299 let debug = proto.debug.unwrap_or_default().into_rust()?;
1302 Ok(WriterState {
1303 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1304 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1305 most_recent_write_token,
1306 most_recent_write_upper,
1307 debug,
1308 })
1309 }
1310}
1311
1312impl RustType<ProtoHandleDebugState> for HandleDebugState {
1313 fn into_proto(&self) -> ProtoHandleDebugState {
1314 ProtoHandleDebugState {
1315 hostname: self.hostname.into_proto(),
1316 purpose: self.purpose.into_proto(),
1317 }
1318 }
1319
1320 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1321 Ok(HandleDebugState {
1322 hostname: proto.hostname,
1323 purpose: proto.purpose,
1324 })
1325 }
1326}
1327
1328impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1329 fn into_proto(&self) -> ProtoHollowRun {
1330 ProtoHollowRun {
1331 parts: self.parts.into_proto(),
1332 }
1333 }
1334
1335 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1336 Ok(HollowRun {
1337 parts: proto.parts.into_rust()?,
1338 })
1339 }
1340}
1341
1342impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1343 fn into_proto(&self) -> ProtoHollowBatch {
1344 let mut run_meta = self.run_meta.into_proto();
1345 let run_meta_default = RunMeta::default().into_proto();
1347 while run_meta.last() == Some(&run_meta_default) {
1348 run_meta.pop();
1349 }
1350 ProtoHollowBatch {
1351 desc: Some(self.desc.into_proto()),
1352 parts: self.parts.into_proto(),
1353 len: self.len.into_proto(),
1354 runs: self.run_splits.into_proto(),
1355 run_meta,
1356 deprecated_keys: vec![],
1357 }
1358 }
1359
1360 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1361 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1362 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1365 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1366 key: PartialBatchKey(key),
1367 encoded_size_bytes: 0,
1368 key_lower: vec![],
1369 structured_key_lower: None,
1370 stats: None,
1371 ts_rewrite: None,
1372 diffs_sum: None,
1373 format: None,
1374 schema_id: None,
1375 deprecated_schema_id: None,
1376 }))
1377 }));
1378 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1380 let num_runs = if parts.is_empty() {
1381 0
1382 } else {
1383 run_splits.len() + 1
1384 };
1385 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1386 run_meta.resize(num_runs, RunMeta::default());
1387 Ok(HollowBatch {
1388 desc: proto.desc.into_rust_if_some("desc")?,
1389 parts,
1390 len: proto.len.into_rust()?,
1391 run_splits,
1392 run_meta,
1393 })
1394 }
1395}
1396
1397impl RustType<String> for RunId {
1398 fn into_proto(&self) -> String {
1399 self.to_string()
1400 }
1401
1402 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1403 RunId::from_str(&proto).map_err(|_| {
1404 TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1405 })
1406 }
1407}
1408
1409impl RustType<ProtoRunMeta> for RunMeta {
1410 fn into_proto(&self) -> ProtoRunMeta {
1411 let order = match self.order {
1412 None => ProtoRunOrder::Unknown,
1413 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1414 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1415 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1416 };
1417 ProtoRunMeta {
1418 order: order.into(),
1419 schema_id: self.schema.into_proto(),
1420 deprecated_schema_id: self.deprecated_schema.into_proto(),
1421 id: self.id.into_proto(),
1422 len: self.len.into_proto(),
1423 }
1424 }
1425
1426 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1427 let order = match ProtoRunOrder::try_from(proto.order)? {
1428 ProtoRunOrder::Unknown => None,
1429 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1430 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1431 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1432 };
1433 Ok(Self {
1434 order,
1435 schema: proto.schema_id.into_rust()?,
1436 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1437 id: proto.id.into_rust()?,
1438 len: proto.len.into_rust()?,
1439 })
1440 }
1441}
1442
1443impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1444 fn into_proto(&self) -> ProtoHollowBatchPart {
1445 match self {
1446 RunPart::Single(part) => part.into_proto(),
1447 RunPart::Many(runs) => runs.into_proto(),
1448 }
1449 }
1450
1451 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1452 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1453 RunPart::Many(proto.into_rust()?)
1454 } else {
1455 RunPart::Single(proto.into_rust()?)
1456 };
1457 Ok(run_part)
1458 }
1459}
1460
1461impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1462 fn into_proto(&self) -> ProtoHollowBatchPart {
1463 let part = ProtoHollowBatchPart {
1464 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1465 key: self.key.into_proto(),
1466 max_part_bytes: self.max_part_bytes.into_proto(),
1467 })),
1468 encoded_size_bytes: self.hollow_bytes.into_proto(),
1469 key_lower: Bytes::copy_from_slice(&self.key_lower),
1470 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1471 key_stats: None,
1472 ts_rewrite: None,
1473 format: None,
1474 schema_id: None,
1475 structured_key_lower: self.structured_key_lower.into_proto(),
1476 deprecated_schema_id: None,
1477 };
1478 part
1479 }
1480
1481 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1482 let run_proto = match proto.kind {
1483 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1484 _ => Err(TryFromProtoError::UnknownEnumVariant(
1485 "ProtoHollowBatchPart::kind".to_string(),
1486 ))?,
1487 };
1488 Ok(Self {
1489 key: run_proto.key.into_rust()?,
1490 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1491 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1492 key_lower: proto.key_lower.to_vec(),
1493 structured_key_lower: proto.structured_key_lower.into_rust()?,
1494 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1495 _phantom_data: Default::default(),
1496 })
1497 }
1498}
1499
1500impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1501 fn into_proto(&self) -> ProtoHollowBatchPart {
1502 match self {
1503 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1504 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1505 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1506 key_lower: Bytes::copy_from_slice(&x.key_lower),
1507 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1508 key_stats: x.stats.into_proto(),
1509 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1510 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1511 format: x.format.map(|f| f.into_proto()),
1512 schema_id: x.schema_id.into_proto(),
1513 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1514 },
1515 BatchPart::Inline {
1516 updates,
1517 ts_rewrite,
1518 schema_id,
1519 deprecated_schema_id,
1520 } => ProtoHollowBatchPart {
1521 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1522 encoded_size_bytes: 0,
1523 key_lower: Bytes::new(),
1524 structured_key_lower: None,
1525 key_stats: None,
1526 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1527 diffs_sum: None,
1528 format: None,
1529 schema_id: schema_id.into_proto(),
1530 deprecated_schema_id: deprecated_schema_id.into_proto(),
1531 },
1532 }
1533 }
1534
1535 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1536 let ts_rewrite = match proto.ts_rewrite {
1537 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1538 None => None,
1539 };
1540 let schema_id = proto.schema_id.into_rust()?;
1541 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1542 match proto.kind {
1543 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1544 Ok(BatchPart::Hollow(HollowBatchPart {
1545 key: key.into_rust()?,
1546 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1547 key_lower: proto.key_lower.into(),
1548 structured_key_lower: proto.structured_key_lower.into_rust()?,
1549 stats: proto.key_stats.into_rust()?,
1550 ts_rewrite,
1551 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1552 format: proto.format.map(|f| f.into_rust()).transpose()?,
1553 schema_id,
1554 deprecated_schema_id,
1555 }))
1556 }
1557 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1558 assert_eq!(proto.encoded_size_bytes, 0);
1559 assert_eq!(proto.key_lower.len(), 0);
1560 assert_none!(proto.key_stats);
1561 assert_none!(proto.diffs_sum);
1562 let updates = LazyInlineBatchPart(x.into_rust()?);
1563 Ok(BatchPart::Inline {
1564 updates,
1565 ts_rewrite,
1566 schema_id,
1567 deprecated_schema_id,
1568 })
1569 }
1570 _ => Err(TryFromProtoError::unknown_enum_variant(
1571 "ProtoHollowBatchPart::kind",
1572 )),
1573 }
1574 }
1575}
1576
1577impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1578 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1579 match self {
1580 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1581 BatchColumnarFormat::Both(version) => {
1582 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1583 }
1584 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1585 }
1586 }
1587
1588 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1589 let format = match proto {
1590 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1591 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1592 BatchColumnarFormat::Both(version.cast_into())
1593 }
1594 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1595 };
1596 Ok(format)
1597 }
1598}
1599
/// Part statistics kept in their encoded `ProtoStructStats` form.
///
/// Call [`LazyPartStats::decode`] to get the [`PartStats`] back.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    /// Encoded stats for the key.
    key: LazyProto<ProtoStructStats>,
}
1608
1609impl LazyPartStats {
1610 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1611 let PartStats { key } = x;
1612 let mut proto_stats = ProtoStructStats::from_rust(key);
1613 map_proto(&mut proto_stats);
1614 LazyPartStats {
1615 key: LazyProto::from(&proto_stats),
1616 }
1617 }
1618 pub fn decode(&self) -> PartStats {
1623 let key = self.key.decode().expect("valid proto");
1624 PartStats {
1625 key: key.into_rust().expect("valid stats"),
1626 }
1627 }
1628}
1629
1630impl RustType<Bytes> for LazyPartStats {
1631 fn into_proto(&self) -> Bytes {
1632 let LazyPartStats { key } = self;
1633 key.into_proto()
1634 }
1635
1636 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1637 Ok(LazyPartStats {
1638 key: proto.into_rust()?,
1639 })
1640 }
1641}
1642
/// Test strategy that always produces `Some` arbitrary stats.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    Strategy::prop_map(proptest::prelude::any::<LazyPartStats>(), Option::Some)
}
1647
1648#[allow(unused_parens)]
1649impl Arbitrary for LazyPartStats {
1650 type Parameters = ();
1651 type Strategy =
1652 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1653
1654 fn arbitrary_with(_: ()) -> Self::Strategy {
1655 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1656 LazyPartStats::encode(&x, |_| {})
1657 })
1658 }
1659}
1660
1661impl ProtoInlineBatchPart {
1662 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1663 lgbytes: &ColumnarMetrics,
1664 proto: Self,
1665 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1666 let updates = proto
1667 .updates
1668 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1669 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1670
1671 Ok(BlobTraceBatchPart {
1672 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1673 index: proto.index.into_rust()?,
1674 updates,
1675 })
1676 }
1677}
1678
/// An inline batch part kept in its encoded `ProtoInlineBatchPart` form.
///
/// Use [`LazyInlineBatchPart::decode`] to get a `BlobTraceBatchPart`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1682
1683impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1684 fn from(value: &ProtoInlineBatchPart) -> Self {
1685 LazyInlineBatchPart(value.into())
1686 }
1687}
1688
1689impl Serialize for LazyInlineBatchPart {
1690 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1691 let proto = self.0.decode().expect("valid proto");
1694 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1695 let () = s.serialize_field("desc", &proto.desc)?;
1696 let () = s.serialize_field("index", &proto.index)?;
1697 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1698 s.end()
1699 }
1700}
1701
1702impl LazyInlineBatchPart {
1703 pub(crate) fn encoded_size_bytes(&self) -> usize {
1704 self.0.buf.len()
1705 }
1706
1707 pub fn decode<T: Timestamp + Codec64>(
1713 &self,
1714 lgbytes: &ColumnarMetrics,
1715 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1716 let proto = self.0.decode().expect("valid proto");
1717 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1718 }
1719}
1720
1721impl RustType<Bytes> for LazyInlineBatchPart {
1722 fn into_proto(&self) -> Bytes {
1723 self.0.into_proto()
1724 }
1725
1726 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1727 Ok(LazyInlineBatchPart(proto.into_rust()?))
1728 }
1729}
1730
1731impl RustType<ProtoHollowRollup> for HollowRollup {
1732 fn into_proto(&self) -> ProtoHollowRollup {
1733 ProtoHollowRollup {
1734 key: self.key.into_proto(),
1735 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1736 }
1737 }
1738
1739 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1740 Ok(HollowRollup {
1741 key: proto.key.into_rust()?,
1742 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1743 })
1744 }
1745}
1746
1747impl RustType<ProtoActiveRollup> for ActiveRollup {
1748 fn into_proto(&self) -> ProtoActiveRollup {
1749 ProtoActiveRollup {
1750 start_ms: self.start_ms,
1751 seqno: self.seqno.into_proto(),
1752 }
1753 }
1754
1755 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1756 Ok(ActiveRollup {
1757 start_ms: proto.start_ms,
1758 seqno: proto.seqno.into_rust()?,
1759 })
1760 }
1761}
1762
1763impl RustType<ProtoActiveGc> for ActiveGc {
1764 fn into_proto(&self) -> ProtoActiveGc {
1765 ProtoActiveGc {
1766 start_ms: self.start_ms,
1767 seqno: self.seqno.into_proto(),
1768 }
1769 }
1770
1771 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1772 Ok(ActiveGc {
1773 start_ms: proto.start_ms,
1774 seqno: proto.seqno.into_rust()?,
1775 })
1776 }
1777}
1778
1779impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1780 fn into_proto(&self) -> ProtoU64Description {
1781 ProtoU64Description {
1782 lower: Some(self.lower().into_proto()),
1783 upper: Some(self.upper().into_proto()),
1784 since: Some(self.since().into_proto()),
1785 }
1786 }
1787
1788 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1789 Ok(Description::new(
1790 proto.lower.into_rust_if_some("lower")?,
1791 proto.upper.into_rust_if_some("upper")?,
1792 proto.since.into_rust_if_some("since")?,
1793 ))
1794 }
1795}
1796
1797impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1798 fn into_proto(&self) -> ProtoU64Antichain {
1799 ProtoU64Antichain {
1800 elements: self
1801 .elements()
1802 .iter()
1803 .map(|x| i64::from_le_bytes(T::encode(x)))
1804 .collect(),
1805 }
1806 }
1807
1808 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1809 let elements = proto
1810 .elements
1811 .iter()
1812 .map(|x| T::decode(x.to_le_bytes()))
1813 .collect::<Vec<_>>();
1814 Ok(Antichain::from(elements))
1815 }
1816}
1817
1818#[cfg(test)]
1819mod tests {
1820 use bytes::Bytes;
1821 use mz_build_info::DUMMY_BUILD_INFO;
1822 use mz_dyncfg::ConfigUpdates;
1823 use mz_ore::assert_err;
1824 use mz_persist::location::SeqNo;
1825 use proptest::prelude::*;
1826
1827 use crate::ShardId;
1828 use crate::internal::paths::PartialRollupKey;
1829 use crate::internal::state::tests::any_state;
1830 use crate::internal::state::{BatchPart, HandleDebugState};
1831 use crate::internal::state_diff::StateDiff;
1832 use crate::tests::new_test_client_cache;
1833
1834 use super::*;
1835
1836 #[mz_ore::test]
1837 fn applier_version_state() {
1838 let v1 = semver::Version::new(1, 0, 0);
1839 let v2 = semver::Version::new(2, 0, 0);
1840 let v3 = semver::Version::new(3, 0, 0);
1841
1842 let shard_id = ShardId::new();
1844 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1845 let rollup =
1846 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1847 let mut buf = Vec::new();
1848 rollup.encode(&mut buf).expect("serializable");
1849 let bytes = Bytes::from(buf);
1850
1851 assert_eq!(
1853 UntypedState::<u64>::decode(&v2, bytes.clone())
1854 .check_codecs(&shard_id)
1855 .as_ref(),
1856 Ok(&state)
1857 );
1858 assert_eq!(
1859 UntypedState::<u64>::decode(&v3, bytes.clone())
1860 .check_codecs(&shard_id)
1861 .as_ref(),
1862 Ok(&state)
1863 );
1864
1865 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1870 assert_err!(v1_res);
1871 }
1872
1873 #[mz_ore::test]
1874 fn applier_version_state_diff() {
1875 let v1 = semver::Version::new(1, 0, 0);
1876 let v2 = semver::Version::new(2, 0, 0);
1877 let v3 = semver::Version::new(3, 0, 0);
1878
1879 let diff = StateDiff::<u64>::new(
1881 v2.clone(),
1882 SeqNo(0),
1883 SeqNo(1),
1884 2,
1885 PartialRollupKey("rollup".into()),
1886 );
1887 let mut buf = Vec::new();
1888 diff.encode(&mut buf);
1889 let bytes = Bytes::from(buf);
1890
1891 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1893 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1894
1895 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1900 assert_err!(v1_res);
1901 }
1902
1903 #[mz_ore::test]
1904 fn hollow_batch_migration_keys() {
1905 let x = HollowBatch::new_run(
1906 Description::new(
1907 Antichain::from_elem(1u64),
1908 Antichain::from_elem(2u64),
1909 Antichain::from_elem(3u64),
1910 ),
1911 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1912 key: PartialBatchKey("a".into()),
1913 encoded_size_bytes: 5,
1914 key_lower: vec![],
1915 structured_key_lower: None,
1916 stats: None,
1917 ts_rewrite: None,
1918 diffs_sum: None,
1919 format: None,
1920 schema_id: None,
1921 deprecated_schema_id: None,
1922 }))],
1923 4,
1924 );
1925 let mut old = x.into_proto();
1926 old.deprecated_keys = vec!["b".into()];
1928 let mut expected = x;
1932 expected
1937 .parts
1938 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1939 key: PartialBatchKey("b".into()),
1940 encoded_size_bytes: 0,
1941 key_lower: vec![],
1942 structured_key_lower: None,
1943 stats: None,
1944 ts_rewrite: None,
1945 diffs_sum: None,
1946 format: None,
1947 schema_id: None,
1948 deprecated_schema_id: None,
1949 })));
1950 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1951 }
1952
1953 #[mz_ore::test]
1954 fn reader_state_migration_lease_duration() {
1955 let x = LeasedReaderState {
1956 seqno: SeqNo(1),
1957 since: Antichain::from_elem(2u64),
1958 last_heartbeat_timestamp_ms: 3,
1959 debug: HandleDebugState {
1960 hostname: "host".to_owned(),
1961 purpose: "purpose".to_owned(),
1962 },
1963 lease_duration_ms: 0,
1965 };
1966 let old = x.into_proto();
1967 let mut expected = x;
1968 expected.lease_duration_ms =
1971 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1972 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1973 }
1974
1975 #[mz_ore::test]
1976 fn writer_state_migration_most_recent_write() {
1977 let proto = ProtoWriterState {
1978 last_heartbeat_timestamp_ms: 1,
1979 lease_duration_ms: 2,
1980 most_recent_write_token: "".into(),
1983 most_recent_write_upper: None,
1984 debug: Some(ProtoHandleDebugState {
1985 hostname: "host".to_owned(),
1986 purpose: "purpose".to_owned(),
1987 }),
1988 };
1989 let expected = WriterState {
1990 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1991 lease_duration_ms: proto.lease_duration_ms,
1992 most_recent_write_token: IdempotencyToken::SENTINEL,
1993 most_recent_write_upper: Antichain::from_elem(0),
1994 debug: HandleDebugState {
1995 hostname: "host".to_owned(),
1996 purpose: "purpose".to_owned(),
1997 },
1998 };
1999 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2000 }
2001
2002 #[mz_ore::test]
2003 fn state_migration_rollups() {
2004 let r1 = HollowRollup {
2005 key: PartialRollupKey("foo".to_owned()),
2006 encoded_size_bytes: None,
2007 };
2008 let r2 = HollowRollup {
2009 key: PartialRollupKey("bar".to_owned()),
2010 encoded_size_bytes: Some(2),
2011 };
2012 let shard_id = ShardId::new();
2013 let mut state = TypedState::<(), (), u64, i64>::new(
2014 DUMMY_BUILD_INFO.semver_version(),
2015 shard_id,
2016 "host".to_owned(),
2017 0,
2018 );
2019 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2020 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2021
2022 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2024
2025 let state: Rollup<u64> = proto.into_rust().unwrap();
2026 let state = state.state;
2027 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2028 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2029 assert_eq!(
2030 state
2031 .state
2032 .collections
2033 .rollups
2034 .into_iter()
2035 .collect::<Vec<_>>(),
2036 expected
2037 );
2038 }
2039
2040 #[mz_persist_proc::test(tokio::test)]
2041 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2043 let r1_rollup = HollowRollup {
2044 key: PartialRollupKey("foo".to_owned()),
2045 encoded_size_bytes: None,
2046 };
2047 let r1 = StateFieldDiff {
2048 key: SeqNo(1),
2049 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2050 };
2051 let r2_rollup = HollowRollup {
2052 key: PartialRollupKey("bar".to_owned()),
2053 encoded_size_bytes: Some(2),
2054 };
2055 let r2 = StateFieldDiff {
2056 key: SeqNo(2),
2057 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2058 };
2059 let r3_rollup = HollowRollup {
2060 key: PartialRollupKey("baz".to_owned()),
2061 encoded_size_bytes: None,
2062 };
2063 let r3 = StateFieldDiff {
2064 key: SeqNo(3),
2065 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2066 };
2067 let mut diff = StateDiff::<u64>::new(
2068 DUMMY_BUILD_INFO.semver_version(),
2069 SeqNo(4),
2070 SeqNo(5),
2071 0,
2072 PartialRollupKey("ignored".to_owned()),
2073 );
2074 diff.rollups.push(r2.clone());
2075 diff.rollups.push(r3.clone());
2076 let mut diff_proto = diff.into_proto();
2077
2078 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2079 let mut field_diffs_writer = field_diffs.into_writer();
2080
2081 field_diffs_into_proto(
2083 ProtoStateField::DeprecatedRollups,
2084 &[StateFieldDiff {
2085 key: r1.key,
2086 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2087 }],
2088 &mut field_diffs_writer,
2089 );
2090
2091 assert_none!(diff_proto.field_diffs);
2092 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2093
2094 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2095 assert_eq!(
2096 diff.rollups.into_iter().collect::<Vec<_>>(),
2097 vec![r2, r3, r1]
2098 );
2099
2100 let shard_id = ShardId::new();
2103 let mut state = TypedState::<(), (), u64, i64>::new(
2104 DUMMY_BUILD_INFO.semver_version(),
2105 shard_id,
2106 "host".to_owned(),
2107 0,
2108 );
2109 state.state.seqno = SeqNo(4);
2110 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2111 rollup
2112 .deprecated_rollups
2113 .insert(3, r3_rollup.key.into_proto());
2114 let state: Rollup<u64> = rollup.into_rust().unwrap();
2115 let state = state.state;
2116 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2117 let cache = new_test_client_cache(&dyncfgs);
2118 let encoded_diff = VersionedData {
2119 seqno: SeqNo(5),
2120 data: diff_proto.encode_to_vec().into(),
2121 };
2122 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2123 assert_eq!(
2124 state
2125 .state
2126 .collections
2127 .rollups
2128 .into_iter()
2129 .collect::<Vec<_>>(),
2130 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2131 );
2132 }
2133
2134 #[mz_ore::test]
2135 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2137 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2138 let before = UntypedState {
2139 key_codec: <() as Codec>::codec_name(),
2140 val_codec: <() as Codec>::codec_name(),
2141 ts_codec: <T as Codec64>::codec_name(),
2142 diff_codec: <i64 as Codec64>::codec_name(),
2143 state,
2144 };
2145 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2146 let after: Rollup<T> = proto.into_rust().unwrap();
2147 let after = after.state;
2148 assert_eq!(before, after);
2149 }
2150
2151 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2152 }
2153
2154 #[mz_ore::test]
2155 fn check_data_versions_with_self_managed_versions() {
2156 #[track_caller]
2157 fn testcase(
2158 code: &str,
2159 data: &str,
2160 self_managed_versions: &[Version],
2161 expected: Result<(), ()>,
2162 ) {
2163 let code = Version::parse(code).unwrap();
2164 let data = Version::parse(data).unwrap();
2165 let actual = cfg::check_data_version_with_self_managed_versions(
2166 &code,
2167 &data,
2168 self_managed_versions,
2169 )
2170 .map_err(|_| ());
2171 assert_eq!(actual, expected);
2172 }
2173
2174 let none = [];
2175 let one = [Version::new(0, 130, 0)];
2176 let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2177 let three = [
2178 Version::new(0, 130, 0),
2179 Version::new(0, 140, 0),
2180 Version::new(0, 150, 0),
2181 ];
2182
2183 testcase("0.130.0", "0.128.0", &none, Ok(()));
2184 testcase("0.130.0", "0.129.0", &none, Ok(()));
2185 testcase("0.130.0", "0.130.0", &none, Ok(()));
2186 testcase("0.130.0", "0.130.1", &none, Ok(()));
2187 testcase("0.130.1", "0.130.0", &none, Ok(()));
2188 testcase("0.130.0", "0.131.0", &none, Ok(()));
2189 testcase("0.130.0", "0.132.0", &none, Err(()));
2190
2191 testcase("0.129.0", "0.127.0", &none, Ok(()));
2192 testcase("0.129.0", "0.128.0", &none, Ok(()));
2193 testcase("0.129.0", "0.129.0", &none, Ok(()));
2194 testcase("0.129.0", "0.129.1", &none, Ok(()));
2195 testcase("0.129.1", "0.129.0", &none, Ok(()));
2196 testcase("0.129.0", "0.130.0", &none, Ok(()));
2197 testcase("0.129.0", "0.131.0", &none, Err(()));
2198
2199 testcase("0.130.0", "0.128.0", &one, Ok(()));
2200 testcase("0.130.0", "0.129.0", &one, Ok(()));
2201 testcase("0.130.0", "0.130.0", &one, Ok(()));
2202 testcase("0.130.0", "0.130.1", &one, Ok(()));
2203 testcase("0.130.1", "0.130.0", &one, Ok(()));
2204 testcase("0.130.0", "0.131.0", &one, Ok(()));
2205 testcase("0.130.0", "0.132.0", &one, Ok(()));
2206
2207 testcase("0.129.0", "0.127.0", &one, Ok(()));
2208 testcase("0.129.0", "0.128.0", &one, Ok(()));
2209 testcase("0.129.0", "0.129.0", &one, Ok(()));
2210 testcase("0.129.0", "0.129.1", &one, Ok(()));
2211 testcase("0.129.1", "0.129.0", &one, Ok(()));
2212 testcase("0.129.0", "0.130.0", &one, Ok(()));
2213 testcase("0.129.0", "0.131.0", &one, Err(()));
2214
2215 testcase("0.131.0", "0.129.0", &one, Ok(()));
2216 testcase("0.131.0", "0.130.0", &one, Ok(()));
2217 testcase("0.131.0", "0.131.0", &one, Ok(()));
2218 testcase("0.131.0", "0.131.1", &one, Ok(()));
2219 testcase("0.131.1", "0.131.0", &one, Ok(()));
2220 testcase("0.131.0", "0.132.0", &one, Ok(()));
2221 testcase("0.131.0", "0.133.0", &one, Err(()));
2222
2223 testcase("0.130.0", "0.128.0", &two, Ok(()));
2224 testcase("0.130.0", "0.129.0", &two, Ok(()));
2225 testcase("0.130.0", "0.130.0", &two, Ok(()));
2226 testcase("0.130.0", "0.130.1", &two, Ok(()));
2227 testcase("0.130.1", "0.130.0", &two, Ok(()));
2228 testcase("0.130.0", "0.131.0", &two, Ok(()));
2229 testcase("0.130.0", "0.132.0", &two, Ok(()));
2230 testcase("0.130.0", "0.135.0", &two, Ok(()));
2231 testcase("0.130.0", "0.138.0", &two, Ok(()));
2232 testcase("0.130.0", "0.139.0", &two, Ok(()));
2233 testcase("0.130.0", "0.140.0", &two, Ok(()));
2234 testcase("0.130.9", "0.140.0", &two, Ok(()));
2235 testcase("0.130.0", "0.140.1", &two, Ok(()));
2236 testcase("0.130.3", "0.140.1", &two, Ok(()));
2237 testcase("0.130.3", "0.140.9", &two, Ok(()));
2238 testcase("0.130.0", "0.141.0", &two, Err(()));
2239 testcase("0.129.0", "0.133.0", &two, Err(()));
2240 testcase("0.129.0", "0.140.0", &two, Err(()));
2241 testcase("0.131.0", "0.133.0", &two, Err(()));
2242 testcase("0.131.0", "0.140.0", &two, Err(()));
2243
2244 testcase("0.130.0", "0.128.0", &three, Ok(()));
2245 testcase("0.130.0", "0.129.0", &three, Ok(()));
2246 testcase("0.130.0", "0.130.0", &three, Ok(()));
2247 testcase("0.130.0", "0.130.1", &three, Ok(()));
2248 testcase("0.130.1", "0.130.0", &three, Ok(()));
2249 testcase("0.130.0", "0.131.0", &three, Ok(()));
2250 testcase("0.130.0", "0.132.0", &three, Ok(()));
2251 testcase("0.130.0", "0.135.0", &three, Ok(()));
2252 testcase("0.130.0", "0.138.0", &three, Ok(()));
2253 testcase("0.130.0", "0.139.0", &three, Ok(()));
2254 testcase("0.130.0", "0.140.0", &three, Ok(()));
2255 testcase("0.130.9", "0.140.0", &three, Ok(()));
2256 testcase("0.130.0", "0.140.1", &three, Ok(()));
2257 testcase("0.130.3", "0.140.1", &three, Ok(()));
2258 testcase("0.130.3", "0.140.9", &three, Ok(()));
2259 testcase("0.130.0", "0.141.0", &three, Err(()));
2260 testcase("0.129.0", "0.133.0", &three, Err(()));
2261 testcase("0.129.0", "0.140.0", &three, Err(()));
2262 testcase("0.131.0", "0.133.0", &three, Err(()));
2263 testcase("0.131.0", "0.140.0", &three, Err(()));
2264 testcase("0.130.0", "0.150.0", &three, Err(()));
2265
2266 testcase("0.140.0", "0.138.0", &three, Ok(()));
2267 testcase("0.140.0", "0.139.0", &three, Ok(()));
2268 testcase("0.140.0", "0.140.0", &three, Ok(()));
2269 testcase("0.140.0", "0.140.1", &three, Ok(()));
2270 testcase("0.140.1", "0.140.0", &three, Ok(()));
2271 testcase("0.140.0", "0.141.0", &three, Ok(()));
2272 testcase("0.140.0", "0.142.0", &three, Ok(()));
2273 testcase("0.140.0", "0.145.0", &three, Ok(()));
2274 testcase("0.140.0", "0.148.0", &three, Ok(()));
2275 testcase("0.140.0", "0.149.0", &three, Ok(()));
2276 testcase("0.140.0", "0.150.0", &three, Ok(()));
2277 testcase("0.140.9", "0.150.0", &three, Ok(()));
2278 testcase("0.140.0", "0.150.1", &three, Ok(()));
2279 testcase("0.140.3", "0.150.1", &three, Ok(()));
2280 testcase("0.140.3", "0.150.9", &three, Ok(()));
2281 testcase("0.140.0", "0.151.0", &three, Err(()));
2282 testcase("0.139.0", "0.143.0", &three, Err(()));
2283 testcase("0.139.0", "0.150.0", &three, Err(()));
2284 testcase("0.141.0", "0.143.0", &three, Err(()));
2285 testcase("0.141.0", "0.150.0", &three, Err(()));
2286
2287 testcase("0.150.0", "0.148.0", &three, Ok(()));
2288 testcase("0.150.0", "0.149.0", &three, Ok(()));
2289 testcase("0.150.0", "0.150.0", &three, Ok(()));
2290 testcase("0.150.0", "0.150.1", &three, Ok(()));
2291 testcase("0.150.1", "0.150.0", &three, Ok(()));
2292 testcase("0.150.0", "0.151.0", &three, Ok(()));
2293 testcase("0.150.0", "0.152.0", &three, Ok(()));
2294 testcase("0.150.0", "0.155.0", &three, Ok(()));
2295 testcase("0.150.0", "0.158.0", &three, Ok(()));
2296 testcase("0.150.0", "0.159.0", &three, Ok(()));
2297 testcase("0.150.0", "0.160.0", &three, Ok(()));
2298 testcase("0.150.9", "0.160.0", &three, Ok(()));
2299 testcase("0.150.0", "0.160.1", &three, Ok(()));
2300 testcase("0.150.3", "0.160.1", &three, Ok(()));
2301 testcase("0.150.3", "0.160.9", &three, Ok(()));
2302 testcase("0.150.0", "0.161.0", &three, Ok(()));
2303 testcase("0.149.0", "0.153.0", &three, Err(()));
2304 testcase("0.149.0", "0.160.0", &three, Err(()));
2305 testcase("0.151.0", "0.153.0", &three, Err(()));
2306 testcase("0.151.0", "0.160.0", &three, Err(()));
2307 }
2308
2309 #[mz_ore::test]
2310 fn check_data_versions() {
2311 #[track_caller]
2312 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2313 let code = Version::parse(code).unwrap();
2314 let data = Version::parse(data).unwrap();
2315 #[allow(clippy::disallowed_methods)]
2316 let actual =
2317 std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2318 assert_eq!(actual, expected);
2319 }
2320
2321 testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2322 testcase("0.10.0-dev", "0.10.0", Ok(()));
2323 testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2326 testcase("0.10.0-dev", "0.11.0", Ok(()));
2327 testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2328 testcase("0.10.0-dev", "0.12.0", Err(()));
2329 testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2330
2331 testcase("0.10.0", "0.8.0-dev", Ok(()));
2332 testcase("0.10.0", "0.8.0", Ok(()));
2333 testcase("0.10.0", "0.9.0-dev", Ok(()));
2334 testcase("0.10.0", "0.9.0", Ok(()));
2335 testcase("0.10.0", "0.10.0-dev", Ok(()));
2336 testcase("0.10.0", "0.10.0", Ok(()));
2337 testcase("0.10.0", "0.11.0-dev", Ok(()));
2340 testcase("0.10.0", "0.11.0", Ok(()));
2341 testcase("0.10.0", "0.11.1", Ok(()));
2342 testcase("0.10.0", "0.11.1000000", Ok(()));
2343 testcase("0.10.0", "0.12.0-dev", Err(()));
2344 testcase("0.10.0", "0.12.0", Err(()));
2345 testcase("0.10.0", "0.13.0-dev", Err(()));
2346
2347 testcase("0.10.1", "0.9.0", Ok(()));
2348 testcase("0.10.1", "0.10.0", Ok(()));
2349 testcase("0.10.1", "0.11.0", Ok(()));
2350 testcase("0.10.1", "0.11.1", Ok(()));
2351 testcase("0.10.1", "0.11.100", Ok(()));
2352
2353 testcase("0.10.0", "0.10.1", Ok(()));
2359 }
2360}