1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53 RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54 proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
65#[derive(Debug)]
67pub struct Schemas<K: Codec, V: Codec> {
68 pub id: Option<SchemaId>,
73 pub key: Arc<K::Schema>,
75 pub val: Arc<V::Schema>,
77}
78
79impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
80 fn clone(&self) -> Self {
81 Self {
82 id: self.id,
83 key: Arc::clone(&self.key),
84 val: Arc::clone(&self.val),
85 }
86 }
87}
88
89#[derive(Clone, Serialize, Deserialize)]
112pub struct LazyProto<T> {
113 buf: Bytes,
114 _phantom: PhantomData<fn() -> T>,
115}
116
117impl<T: Message + Default> Debug for LazyProto<T> {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 match self.decode() {
120 Ok(proto) => Debug::fmt(&proto, f),
121 Err(err) => f
122 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
123 .field("err", &err)
124 .finish(),
125 }
126 }
127}
128
129impl<T> PartialEq for LazyProto<T> {
130 fn eq(&self, other: &Self) -> bool {
131 self.cmp(other) == Ordering::Equal
132 }
133}
134
135impl<T> Eq for LazyProto<T> {}
136
137impl<T> PartialOrd for LazyProto<T> {
138 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139 Some(self.cmp(other))
140 }
141}
142
143impl<T> Ord for LazyProto<T> {
144 fn cmp(&self, other: &Self) -> Ordering {
145 let LazyProto {
146 buf: self_buf,
147 _phantom: _,
148 } = self;
149 let LazyProto {
150 buf: other_buf,
151 _phantom: _,
152 } = other;
153 self_buf.cmp(other_buf)
154 }
155}
156
157impl<T> Hash for LazyProto<T> {
158 fn hash<H: Hasher>(&self, state: &mut H) {
159 let LazyProto { buf, _phantom } = self;
160 buf.hash(state);
161 }
162}
163
164impl<T: Message + Default> From<&T> for LazyProto<T> {
165 fn from(value: &T) -> Self {
166 let buf = Bytes::from(value.encode_to_vec());
167 LazyProto {
168 buf,
169 _phantom: PhantomData,
170 }
171 }
172}
173
174impl<T: Message + Default> LazyProto<T> {
175 pub fn decode(&self) -> Result<T, prost::DecodeError> {
176 T::decode(&*self.buf)
177 }
178
179 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
180 Ok(T::decode(&*self.buf)?.into_rust()?)
181 }
182}
183
184impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
185 fn into_proto(&self) -> Bytes {
186 self.buf.clone()
187 }
188
189 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
190 Ok(Self {
191 buf,
192 _phantom: PhantomData,
193 })
194 }
195}
196
197pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
198 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
199 Some(x) => x,
200 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
201 };
202 let uuid = Uuid::parse_str(uuid_encoded)
203 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
204 Ok(*uuid.as_bytes())
205}
206
207pub(crate) fn check_data_version(code_version: &Version, data_version: &Version) {
208 if let Err(msg) = cfg::check_data_version(code_version, data_version) {
209 if cfg!(test) {
212 panic!("{msg}");
213 } else {
214 halt!("{msg}");
215 }
216 }
217}
218
219impl RustType<String> for LeasedReaderId {
220 fn into_proto(&self) -> String {
221 self.to_string()
222 }
223
224 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
225 match proto.parse() {
226 Ok(x) => Ok(x),
227 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
228 }
229 }
230}
231
232impl RustType<String> for CriticalReaderId {
233 fn into_proto(&self) -> String {
234 self.to_string()
235 }
236
237 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
238 match proto.parse() {
239 Ok(x) => Ok(x),
240 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
241 }
242 }
243}
244
245impl RustType<String> for WriterId {
246 fn into_proto(&self) -> String {
247 self.to_string()
248 }
249
250 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
251 match proto.parse() {
252 Ok(x) => Ok(x),
253 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
254 }
255 }
256}
257
258impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
259 fn into_proto(&self) -> ProtoEncodedSchemas {
260 ProtoEncodedSchemas {
261 key: self.key.clone(),
262 key_data_type: self.key_data_type.clone(),
263 val: self.val.clone(),
264 val_data_type: self.val_data_type.clone(),
265 }
266 }
267
268 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
269 Ok(EncodedSchemas {
270 key: proto.key,
271 key_data_type: proto.key_data_type,
272 val: proto.val,
273 val_data_type: proto.val_data_type,
274 })
275 }
276}
277
278impl RustType<String> for IdempotencyToken {
279 fn into_proto(&self) -> String {
280 self.to_string()
281 }
282
283 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
284 match proto.parse() {
285 Ok(x) => Ok(x),
286 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
287 }
288 }
289}
290
291impl RustType<String> for PartialBatchKey {
292 fn into_proto(&self) -> String {
293 self.0.clone()
294 }
295
296 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
297 Ok(PartialBatchKey(proto))
298 }
299}
300
301impl RustType<String> for PartialRollupKey {
302 fn into_proto(&self) -> String {
303 self.0.clone()
304 }
305
306 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
307 Ok(PartialRollupKey(proto))
308 }
309}
310
311impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
312 pub fn encode<B>(&self, buf: &mut B)
313 where
314 B: bytes::BufMut,
315 {
316 self.into_proto()
317 .encode(buf)
318 .expect("no required fields means no initialization errors");
319 }
320
321 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
322 let proto = ProtoStateDiff::decode(buf)
323 .expect("internal error: invalid encoded state");
328 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
329 check_data_version(build_version, &diff.applier_version);
330 diff
331 }
332}
333
334impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
335 fn into_proto(&self) -> ProtoStateDiff {
336 let StateDiff {
338 applier_version,
339 seqno_from,
340 seqno_to,
341 walltime_ms,
342 latest_rollup_key,
343 rollups,
344 active_rollup,
345 active_gc,
346 hostname,
347 last_gc_req,
348 leased_readers,
349 critical_readers,
350 writers,
351 schemas,
352 since,
353 legacy_batches,
354 hollow_batches,
355 spine_batches,
356 merges,
357 } = self;
358
359 let proto = ProtoStateFieldDiffs::default();
360
361 let mut writer = proto.into_writer();
363
364 field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
365 field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
366 field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
367 field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
368 field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
369 field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
370 field_diffs_into_proto(
371 ProtoStateField::CriticalReaders,
372 critical_readers,
373 &mut writer,
374 );
375 field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
376 field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
377 field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
378 field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
379 field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
380 field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
381 field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);
382
383 let field_diffs = writer.into_proto();
385
386 debug_assert_eq!(field_diffs.validate(), Ok(()));
387 ProtoStateDiff {
388 applier_version: applier_version.to_string(),
389 seqno_from: seqno_from.into_proto(),
390 seqno_to: seqno_to.into_proto(),
391 walltime_ms: walltime_ms.into_proto(),
392 latest_rollup_key: latest_rollup_key.into_proto(),
393 field_diffs: Some(field_diffs),
394 }
395 }
396
397 fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
398 let applier_version = if proto.applier_version.is_empty() {
399 semver::Version::new(0, 0, 0)
403 } else {
404 semver::Version::parse(&proto.applier_version).map_err(|err| {
405 TryFromProtoError::InvalidSemverVersion(format!(
406 "invalid applier_version {}: {}",
407 proto.applier_version, err
408 ))
409 })?
410 };
411 let mut state_diff = StateDiff::new(
412 applier_version,
413 proto.seqno_from.into_rust()?,
414 proto.seqno_to.into_rust()?,
415 proto.walltime_ms,
416 proto.latest_rollup_key.into_rust()?,
417 );
418 if let Some(field_diffs) = proto.field_diffs {
419 debug_assert_eq!(field_diffs.validate(), Ok(()));
420 for field_diff in field_diffs.iter() {
421 let (field, diff) = field_diff?;
422 match field {
423 ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
424 diff,
425 &mut state_diff.hostname,
426 |()| Ok(()),
427 |v| v.into_rust(),
428 )?,
429 ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
430 diff,
431 &mut state_diff.last_gc_req,
432 |()| Ok(()),
433 |v| v.into_rust(),
434 )?,
435 ProtoStateField::ActiveGc => {
436 field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
437 diff,
438 &mut state_diff.active_gc,
439 |()| Ok(()),
440 |v| v.into_rust(),
441 )?
442 }
443 ProtoStateField::ActiveRollup => {
444 field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
445 diff,
446 &mut state_diff.active_rollup,
447 |()| Ok(()),
448 |v| v.into_rust(),
449 )?
450 }
451 ProtoStateField::Rollups => {
452 field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
453 diff,
454 &mut state_diff.rollups,
455 |k| k.into_rust(),
456 |v| v.into_rust(),
457 )?
458 }
459 ProtoStateField::DeprecatedRollups => {
463 field_diff_into_rust::<u64, String, _, _, _, _>(
464 diff,
465 &mut state_diff.rollups,
466 |k| k.into_rust(),
467 |v| {
468 Ok(HollowRollup {
469 key: v.into_rust()?,
470 encoded_size_bytes: None,
471 })
472 },
473 )?
474 }
475 ProtoStateField::LeasedReaders => {
476 field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
477 diff,
478 &mut state_diff.leased_readers,
479 |k| k.into_rust(),
480 |v| v.into_rust(),
481 )?
482 }
483 ProtoStateField::CriticalReaders => {
484 field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
485 diff,
486 &mut state_diff.critical_readers,
487 |k| k.into_rust(),
488 |v| v.into_rust(),
489 )?
490 }
491 ProtoStateField::Writers => {
492 field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
493 diff,
494 &mut state_diff.writers,
495 |k| k.into_rust(),
496 |v| v.into_rust(),
497 )?
498 }
499 ProtoStateField::Schemas => {
500 field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
501 diff,
502 &mut state_diff.schemas,
503 |k| k.into_rust(),
504 |v| v.into_rust(),
505 )?
506 }
507 ProtoStateField::Since => {
508 field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
509 diff,
510 &mut state_diff.since,
511 |()| Ok(()),
512 |v| v.into_rust(),
513 )?
514 }
515 ProtoStateField::LegacyBatches => {
516 field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
517 diff,
518 &mut state_diff.legacy_batches,
519 |k| k.into_rust(),
520 |()| Ok(()),
521 )?
522 }
523 ProtoStateField::HollowBatches => {
524 field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
525 diff,
526 &mut state_diff.hollow_batches,
527 |k| k.into_rust(),
528 |v| v.into_rust(),
529 )?
530 }
531 ProtoStateField::SpineBatches => {
532 field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
533 diff,
534 &mut state_diff.spine_batches,
535 |k| k.into_rust(),
536 |v| v.into_rust(),
537 )?
538 }
539 ProtoStateField::SpineMerges => {
540 field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
541 diff,
542 &mut state_diff.merges,
543 |k| k.into_rust(),
544 |v| v.into_rust(),
545 )?
546 }
547 }
548 }
549 }
550 Ok(state_diff)
551 }
552}
553
554fn field_diffs_into_proto<K, KP, V, VP>(
555 field: ProtoStateField,
556 diffs: &[StateFieldDiff<K, V>],
557 writer: &mut ProtoStateFieldDiffsWriter,
558) where
559 KP: prost::Message,
560 K: RustType<KP>,
561 VP: prost::Message,
562 V: RustType<VP>,
563{
564 for diff in diffs.iter() {
565 field_diff_into_proto(field, diff, writer);
566 }
567}
568
569fn field_diff_into_proto<K, KP, V, VP>(
570 field: ProtoStateField,
571 diff: &StateFieldDiff<K, V>,
572 writer: &mut ProtoStateFieldDiffsWriter,
573) where
574 KP: prost::Message,
575 K: RustType<KP>,
576 VP: prost::Message,
577 V: RustType<VP>,
578{
579 writer.push_field(field);
580 writer.encode_proto(&diff.key.into_proto());
581 match &diff.val {
582 StateFieldValDiff::Insert(to) => {
583 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
584 writer.encode_proto(&to.into_proto());
585 }
586 StateFieldValDiff::Update(from, to) => {
587 writer.push_diff_type(ProtoStateFieldDiffType::Update);
588 writer.encode_proto(&from.into_proto());
589 writer.encode_proto(&to.into_proto());
590 }
591 StateFieldValDiff::Delete(from) => {
592 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
593 writer.encode_proto(&from.into_proto());
594 }
595 };
596}
597
598fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
599 proto: ProtoStateFieldDiff<'_>,
600 diffs: &mut Vec<StateFieldDiff<K, V>>,
601 k_fn: KFn,
602 v_fn: VFn,
603) -> Result<(), TryFromProtoError>
604where
605 KP: prost::Message + Default,
606 VP: prost::Message + Default,
607 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
608 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
609{
610 let val = match proto.diff_type {
611 ProtoStateFieldDiffType::Insert => {
612 let to = VP::decode(proto.to)
613 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
614 StateFieldValDiff::Insert(v_fn(to)?)
615 }
616 ProtoStateFieldDiffType::Update => {
617 let from = VP::decode(proto.from)
618 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
619 let to = VP::decode(proto.to)
620 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
621
622 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
623 }
624 ProtoStateFieldDiffType::Delete => {
625 let from = VP::decode(proto.from)
626 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
627 StateFieldValDiff::Delete(v_fn(from)?)
628 }
629 };
630 let key = KP::decode(proto.key)
631 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
632 diffs.push(StateFieldDiff {
633 key: k_fn(key)?,
634 val,
635 });
636 Ok(())
637}
638
639#[derive(Debug)]
642#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
643pub struct UntypedState<T> {
644 pub(crate) key_codec: String,
645 pub(crate) val_codec: String,
646 pub(crate) ts_codec: String,
647 pub(crate) diff_codec: String,
648
649 state: State<T>,
653}
654
655impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
656 pub fn seqno(&self) -> SeqNo {
657 self.state.seqno
658 }
659
660 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
661 &self.state.collections.rollups
662 }
663
664 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
665 self.state.latest_rollup()
666 }
667
668 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
669 &mut self,
670 cfg: &PersistConfig,
671 metrics: &Metrics,
672 diffs: I,
673 ) {
674 if T::codec_name() != self.ts_codec {
678 return;
679 }
680 self.state.apply_encoded_diffs(cfg, metrics, diffs);
681 }
682
683 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
684 self,
685 shard_id: &ShardId,
686 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
687 assert_eq!(shard_id, &self.state.shard_id);
690 if K::codec_name() != self.key_codec
691 || V::codec_name() != self.val_codec
692 || T::codec_name() != self.ts_codec
693 || D::codec_name() != self.diff_codec
694 {
695 return Err(Box::new(CodecMismatch {
696 requested: (
697 K::codec_name(),
698 V::codec_name(),
699 T::codec_name(),
700 D::codec_name(),
701 None,
702 ),
703 actual: (
704 self.key_codec,
705 self.val_codec,
706 self.ts_codec,
707 self.diff_codec,
708 None,
709 ),
710 }));
711 }
712 Ok(TypedState {
713 state: self.state,
714 _phantom: PhantomData,
715 })
716 }
717
718 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
719 assert_eq!(shard_id, &self.state.shard_id);
722 if T::codec_name() != self.ts_codec {
723 return Err(CodecMismatchT {
724 requested: T::codec_name(),
725 actual: self.ts_codec,
726 });
727 }
728 Ok(self.state)
729 }
730
731 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
732 let proto = ProtoRollup::decode(buf)
733 .expect("internal error: invalid encoded state");
738 let state = Rollup::from_proto(proto)
739 .expect("internal error: invalid encoded state")
740 .state;
741 check_data_version(build_version, &state.state.applier_version);
742 state
743 }
744}
745
746impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
747where
748 K: Codec,
749 V: Codec,
750 T: Codec64,
751 D: Codec64,
752{
753 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
754 UntypedState {
755 key_codec: K::codec_name(),
756 val_codec: V::codec_name(),
757 ts_codec: T::codec_name(),
758 diff_codec: D::codec_name(),
759 state: typed_state.state,
760 }
761 }
762}
763
764#[derive(Debug)]
771pub struct Rollup<T> {
772 pub(crate) state: UntypedState<T>,
773 pub(crate) diffs: Option<InlinedDiffs>,
774}
775
776impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
777 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
781 let latest_rollup_seqno = *state.latest_rollup().0;
782 let mut verify_seqno = latest_rollup_seqno;
783 for diff in &diffs {
784 assert_eq!(verify_seqno.next(), diff.seqno);
785 verify_seqno = diff.seqno;
786 }
787 assert_eq!(verify_seqno, state.seqno());
788
789 let diffs = Some(InlinedDiffs::from(
790 latest_rollup_seqno.next(),
791 state.seqno().next(),
792 diffs,
793 ));
794
795 Self { state, diffs }
796 }
797
798 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
799 Self { state, diffs: None }
800 }
801
802 pub(crate) fn from_state_without_diffs(
803 state: State<T>,
804 key_codec: String,
805 val_codec: String,
806 ts_codec: String,
807 diff_codec: String,
808 ) -> Self {
809 Self::from_untyped_state_without_diffs(UntypedState {
810 key_codec,
811 val_codec,
812 ts_codec,
813 diff_codec,
814 state,
815 })
816 }
817}
818
819#[derive(Debug)]
820pub(crate) struct InlinedDiffs {
821 pub(crate) lower: SeqNo,
822 pub(crate) upper: SeqNo,
823 pub(crate) diffs: Vec<VersionedData>,
824}
825
826impl InlinedDiffs {
827 pub(crate) fn description(&self) -> Description<SeqNo> {
828 Description::new(
829 Antichain::from_elem(self.lower),
830 Antichain::from_elem(self.upper),
831 Antichain::from_elem(SeqNo::minimum()),
832 )
833 }
834
835 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
836 for diff in &diffs {
837 assert!(diff.seqno >= lower);
838 assert!(diff.seqno < upper);
839 }
840 Self {
841 lower,
842 upper,
843 diffs,
844 }
845 }
846}
847
848impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
849 fn into_proto(&self) -> ProtoInlinedDiffs {
850 ProtoInlinedDiffs {
851 lower: self.lower.into_proto(),
852 upper: self.upper.into_proto(),
853 diffs: self.diffs.into_proto(),
854 }
855 }
856
857 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
858 Ok(Self {
859 lower: proto.lower.into_rust()?,
860 upper: proto.upper.into_rust()?,
861 diffs: proto.diffs.into_rust()?,
862 })
863 }
864}
865
866impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
867 fn into_proto(&self) -> ProtoRollup {
868 ProtoRollup {
869 applier_version: self.state.state.applier_version.to_string(),
870 shard_id: self.state.state.shard_id.into_proto(),
871 seqno: self.state.state.seqno.into_proto(),
872 walltime_ms: self.state.state.walltime_ms.into_proto(),
873 hostname: self.state.state.hostname.into_proto(),
874 key_codec: self.state.key_codec.into_proto(),
875 val_codec: self.state.val_codec.into_proto(),
876 ts_codec: T::codec_name(),
877 diff_codec: self.state.diff_codec.into_proto(),
878 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
879 active_rollup: self.state.state.collections.active_rollup.into_proto(),
880 active_gc: self.state.state.collections.active_gc.into_proto(),
881 rollups: self
882 .state
883 .state
884 .collections
885 .rollups
886 .iter()
887 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
888 .collect(),
889 deprecated_rollups: Default::default(),
890 leased_readers: self
891 .state
892 .state
893 .collections
894 .leased_readers
895 .iter()
896 .map(|(id, state)| (id.into_proto(), state.into_proto()))
897 .collect(),
898 critical_readers: self
899 .state
900 .state
901 .collections
902 .critical_readers
903 .iter()
904 .map(|(id, state)| (id.into_proto(), state.into_proto()))
905 .collect(),
906 writers: self
907 .state
908 .state
909 .collections
910 .writers
911 .iter()
912 .map(|(id, state)| (id.into_proto(), state.into_proto()))
913 .collect(),
914 schemas: self
915 .state
916 .state
917 .collections
918 .schemas
919 .iter()
920 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
921 .collect(),
922 trace: Some(self.state.state.collections.trace.into_proto()),
923 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
924 }
925 }
926
927 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
928 let applier_version = if x.applier_version.is_empty() {
929 semver::Version::new(0, 0, 0)
933 } else {
934 semver::Version::parse(&x.applier_version).map_err(|err| {
935 TryFromProtoError::InvalidSemverVersion(format!(
936 "invalid applier_version {}: {}",
937 x.applier_version, err
938 ))
939 })?
940 };
941
942 let mut rollups = BTreeMap::new();
943 for (seqno, rollup) in x.rollups {
944 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
945 }
946 for (seqno, key) in x.deprecated_rollups {
947 rollups.insert(
948 seqno.into_rust()?,
949 HollowRollup {
950 key: key.into_rust()?,
951 encoded_size_bytes: None,
952 },
953 );
954 }
955 let mut leased_readers = BTreeMap::new();
956 for (id, state) in x.leased_readers {
957 leased_readers.insert(id.into_rust()?, state.into_rust()?);
958 }
959 let mut critical_readers = BTreeMap::new();
960 for (id, state) in x.critical_readers {
961 critical_readers.insert(id.into_rust()?, state.into_rust()?);
962 }
963 let mut writers = BTreeMap::new();
964 for (id, state) in x.writers {
965 writers.insert(id.into_rust()?, state.into_rust()?);
966 }
967 let mut schemas = BTreeMap::new();
968 for (id, x) in x.schemas {
969 schemas.insert(id.into_rust()?, x.into_rust()?);
970 }
971 let active_rollup = x
972 .active_rollup
973 .map(|rollup| rollup.into_rust())
974 .transpose()?;
975 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
976 let collections = StateCollections {
977 rollups,
978 active_rollup,
979 active_gc,
980 last_gc_req: x.last_gc_req.into_rust()?,
981 leased_readers,
982 critical_readers,
983 writers,
984 schemas,
985 trace: x.trace.into_rust_if_some("trace")?,
986 };
987 let state = State {
988 applier_version,
989 shard_id: x.shard_id.into_rust()?,
990 seqno: x.seqno.into_rust()?,
991 walltime_ms: x.walltime_ms,
992 hostname: x.hostname,
993 collections,
994 };
995
996 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
997 if let Some(diffs) = &diffs {
998 if diffs.lower != state.latest_rollup().0.next() {
999 return Err(TryFromProtoError::InvalidPersistState(format!(
1000 "diffs lower ({}) should match latest rollup's successor: ({})",
1001 diffs.lower,
1002 state.latest_rollup().0.next()
1003 )));
1004 }
1005 if diffs.upper != state.seqno.next() {
1006 return Err(TryFromProtoError::InvalidPersistState(format!(
1007 "diffs upper ({}) should match state's successor: ({})",
1008 diffs.lower,
1009 state.seqno.next()
1010 )));
1011 }
1012 }
1013
1014 Ok(Rollup {
1015 state: UntypedState {
1016 state,
1017 key_codec: x.key_codec.into_rust()?,
1018 val_codec: x.val_codec.into_rust()?,
1019 ts_codec: x.ts_codec.into_rust()?,
1020 diff_codec: x.diff_codec.into_rust()?,
1021 },
1022 diffs,
1023 })
1024 }
1025}
1026
1027impl RustType<ProtoVersionedData> for VersionedData {
1028 fn into_proto(&self) -> ProtoVersionedData {
1029 ProtoVersionedData {
1030 seqno: self.seqno.into_proto(),
1031 data: Bytes::clone(&self.data),
1032 }
1033 }
1034
1035 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1036 Ok(Self {
1037 seqno: proto.seqno.into_rust()?,
1038 data: proto.data,
1039 })
1040 }
1041}
1042
1043impl RustType<ProtoSpineId> for SpineId {
1044 fn into_proto(&self) -> ProtoSpineId {
1045 ProtoSpineId {
1046 lo: self.0.into_proto(),
1047 hi: self.1.into_proto(),
1048 }
1049 }
1050
1051 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1052 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1053 }
1054}
1055
1056impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1057 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1058 let (id, batch) = entry;
1059 ProtoIdHollowBatch {
1060 id: Some(id.into_proto()),
1061 batch: Some(batch.into_proto()),
1062 }
1063 }
1064
1065 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1066 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1067 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1068 Ok((id, batch))
1069 }
1070}
1071
1072impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1073 fn into_proto(&self) -> ProtoSpineBatch {
1074 ProtoSpineBatch {
1075 desc: Some(self.desc.into_proto()),
1076 parts: self.parts.into_proto(),
1077 level: self.level.into_proto(),
1078 descs: self.descs.into_proto(),
1079 }
1080 }
1081
1082 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1083 let level = proto.level.into_rust()?;
1084 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1085 let parts = proto.parts.into_rust()?;
1086 let descs = proto.descs.into_rust()?;
1087 Ok(ThinSpineBatch {
1088 level,
1089 desc,
1090 parts,
1091 descs,
1092 })
1093 }
1094}
1095
1096impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1097 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1098 let (id, batch) = entry;
1099 ProtoIdSpineBatch {
1100 id: Some(id.into_proto()),
1101 batch: Some(batch.into_proto()),
1102 }
1103 }
1104
1105 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1106 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1107 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1108 Ok((id, batch))
1109 }
1110}
1111
1112impl RustType<ProtoCompaction> for ActiveCompaction {
1113 fn into_proto(&self) -> ProtoCompaction {
1114 ProtoCompaction {
1115 start_ms: self.start_ms,
1116 }
1117 }
1118
1119 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1120 Ok(Self {
1121 start_ms: proto.start_ms,
1122 })
1123 }
1124}
1125
1126impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1127 fn into_proto(&self) -> ProtoMerge {
1128 ProtoMerge {
1129 since: Some(self.since.into_proto()),
1130 remaining_work: self.remaining_work.into_proto(),
1131 active_compaction: self.active_compaction.into_proto(),
1132 }
1133 }
1134
1135 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1136 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1137 let remaining_work = proto.remaining_work.into_rust()?;
1138 let active_compaction = proto.active_compaction.into_rust()?;
1139 Ok(Self {
1140 since,
1141 remaining_work,
1142 active_compaction,
1143 })
1144 }
1145}
1146
1147impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1148 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1149 ProtoIdMerge {
1150 id: Some(id.into_proto()),
1151 merge: Some(merge.into_proto()),
1152 }
1153 }
1154
1155 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1156 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1157 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1158 Ok((id, merge))
1159 }
1160}
1161
1162impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1163 fn into_proto(&self) -> ProtoTrace {
1164 let since = self.since.into_proto();
1165 let legacy_batches = self
1166 .legacy_batches
1167 .iter()
1168 .map(|(b, _)| b.into_proto())
1169 .collect();
1170 let hollow_batches = self.hollow_batches.into_proto();
1171 let spine_batches = self.spine_batches.into_proto();
1172 let merges = self.merges.into_proto();
1173 ProtoTrace {
1174 since: Some(since),
1175 legacy_batches,
1176 hollow_batches,
1177 spine_batches,
1178 merges,
1179 }
1180 }
1181
1182 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1183 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1184 let legacy_batches = proto
1185 .legacy_batches
1186 .into_iter()
1187 .map(|b| b.into_rust().map(|b| (b, ())))
1188 .collect::<Result<_, _>>()?;
1189 let hollow_batches = proto.hollow_batches.into_rust()?;
1190 let spine_batches = proto.spine_batches.into_rust()?;
1191 let merges = proto.merges.into_rust()?;
1192 Ok(FlatTrace {
1193 since,
1194 legacy_batches,
1195 hollow_batches,
1196 spine_batches,
1197 merges,
1198 })
1199 }
1200}
1201
1202impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1203 fn into_proto(&self) -> ProtoTrace {
1204 self.flatten().into_proto()
1205 }
1206
1207 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1208 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1209 }
1210}
1211
1212impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1213 fn into_proto(&self) -> ProtoLeasedReaderState {
1214 ProtoLeasedReaderState {
1215 seqno: self.seqno.into_proto(),
1216 since: Some(self.since.into_proto()),
1217 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1218 lease_duration_ms: self.lease_duration_ms.into_proto(),
1219 debug: Some(self.debug.into_proto()),
1220 }
1221 }
1222
1223 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1224 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1225 if lease_duration_ms == 0 {
1230 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1231 .expect("lease duration as millis should fit within u64");
1232 }
1233 let debug = proto.debug.unwrap_or_default().into_rust()?;
1236 Ok(LeasedReaderState {
1237 seqno: proto.seqno.into_rust()?,
1238 since: proto
1239 .since
1240 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1241 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1242 lease_duration_ms,
1243 debug,
1244 })
1245 }
1246}
1247
1248impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1249 fn into_proto(&self) -> ProtoCriticalReaderState {
1250 ProtoCriticalReaderState {
1251 since: Some(self.since.into_proto()),
1252 opaque: i64::from_le_bytes(self.opaque.0),
1253 opaque_codec: self.opaque_codec.clone(),
1254 debug: Some(self.debug.into_proto()),
1255 }
1256 }
1257
1258 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1259 let debug = proto.debug.unwrap_or_default().into_rust()?;
1262 Ok(CriticalReaderState {
1263 since: proto
1264 .since
1265 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1266 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1267 opaque_codec: proto.opaque_codec,
1268 debug,
1269 })
1270 }
1271}
1272
1273impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1274 fn into_proto(&self) -> ProtoWriterState {
1275 ProtoWriterState {
1276 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1277 lease_duration_ms: self.lease_duration_ms.into_proto(),
1278 most_recent_write_token: self.most_recent_write_token.into_proto(),
1279 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1280 debug: Some(self.debug.into_proto()),
1281 }
1282 }
1283
1284 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1285 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1291 IdempotencyToken::SENTINEL
1292 } else {
1293 proto.most_recent_write_token.into_rust()?
1294 };
1295 let most_recent_write_upper = match proto.most_recent_write_upper {
1296 Some(x) => x.into_rust()?,
1297 None => Antichain::from_elem(T::minimum()),
1298 };
1299 let debug = proto.debug.unwrap_or_default().into_rust()?;
1302 Ok(WriterState {
1303 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1304 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1305 most_recent_write_token,
1306 most_recent_write_upper,
1307 debug,
1308 })
1309 }
1310}
1311
1312impl RustType<ProtoHandleDebugState> for HandleDebugState {
1313 fn into_proto(&self) -> ProtoHandleDebugState {
1314 ProtoHandleDebugState {
1315 hostname: self.hostname.into_proto(),
1316 purpose: self.purpose.into_proto(),
1317 }
1318 }
1319
1320 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1321 Ok(HandleDebugState {
1322 hostname: proto.hostname,
1323 purpose: proto.purpose,
1324 })
1325 }
1326}
1327
1328impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1329 fn into_proto(&self) -> ProtoHollowRun {
1330 ProtoHollowRun {
1331 parts: self.parts.into_proto(),
1332 }
1333 }
1334
1335 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1336 Ok(HollowRun {
1337 parts: proto.parts.into_rust()?,
1338 })
1339 }
1340}
1341
1342impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1343 fn into_proto(&self) -> ProtoHollowBatch {
1344 let mut run_meta = self.run_meta.into_proto();
1345 let run_meta_default = RunMeta::default().into_proto();
1347 while run_meta.last() == Some(&run_meta_default) {
1348 run_meta.pop();
1349 }
1350 ProtoHollowBatch {
1351 desc: Some(self.desc.into_proto()),
1352 parts: self.parts.into_proto(),
1353 len: self.len.into_proto(),
1354 runs: self.run_splits.into_proto(),
1355 run_meta,
1356 deprecated_keys: vec![],
1357 }
1358 }
1359
1360 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1361 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1362 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1365 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1366 key: PartialBatchKey(key),
1367 encoded_size_bytes: 0,
1368 key_lower: vec![],
1369 structured_key_lower: None,
1370 stats: None,
1371 ts_rewrite: None,
1372 diffs_sum: None,
1373 format: None,
1374 schema_id: None,
1375 deprecated_schema_id: None,
1376 }))
1377 }));
1378 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1380 let num_runs = if parts.is_empty() {
1381 0
1382 } else {
1383 run_splits.len() + 1
1384 };
1385 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1386 run_meta.resize(num_runs, RunMeta::default());
1387 Ok(HollowBatch {
1388 desc: proto.desc.into_rust_if_some("desc")?,
1389 parts,
1390 len: proto.len.into_rust()?,
1391 run_splits,
1392 run_meta,
1393 })
1394 }
1395}
1396
1397impl RustType<String> for RunId {
1398 fn into_proto(&self) -> String {
1399 self.to_string()
1400 }
1401
1402 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1403 RunId::from_str(&proto).map_err(|_| {
1404 TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1405 })
1406 }
1407}
1408
1409impl RustType<ProtoRunMeta> for RunMeta {
1410 fn into_proto(&self) -> ProtoRunMeta {
1411 let order = match self.order {
1412 None => ProtoRunOrder::Unknown,
1413 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1414 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1415 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1416 };
1417 ProtoRunMeta {
1418 order: order.into(),
1419 schema_id: self.schema.into_proto(),
1420 deprecated_schema_id: self.deprecated_schema.into_proto(),
1421 id: self.id.into_proto(),
1422 len: self.len.into_proto(),
1423 }
1424 }
1425
1426 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1427 let order = match ProtoRunOrder::try_from(proto.order)? {
1428 ProtoRunOrder::Unknown => None,
1429 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1430 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1431 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1432 };
1433 Ok(Self {
1434 order,
1435 schema: proto.schema_id.into_rust()?,
1436 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1437 id: proto.id.into_rust()?,
1438 len: proto.len.into_rust()?,
1439 })
1440 }
1441}
1442
1443impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1444 fn into_proto(&self) -> ProtoHollowBatchPart {
1445 match self {
1446 RunPart::Single(part) => part.into_proto(),
1447 RunPart::Many(runs) => runs.into_proto(),
1448 }
1449 }
1450
1451 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1452 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1453 RunPart::Many(proto.into_rust()?)
1454 } else {
1455 RunPart::Single(proto.into_rust()?)
1456 };
1457 Ok(run_part)
1458 }
1459}
1460
1461impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1462 fn into_proto(&self) -> ProtoHollowBatchPart {
1463 let part = ProtoHollowBatchPart {
1464 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1465 key: self.key.into_proto(),
1466 max_part_bytes: self.max_part_bytes.into_proto(),
1467 })),
1468 encoded_size_bytes: self.hollow_bytes.into_proto(),
1469 key_lower: Bytes::copy_from_slice(&self.key_lower),
1470 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1471 key_stats: None,
1472 ts_rewrite: None,
1473 format: None,
1474 schema_id: None,
1475 structured_key_lower: self.structured_key_lower.into_proto(),
1476 deprecated_schema_id: None,
1477 };
1478 part
1479 }
1480
1481 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1482 let run_proto = match proto.kind {
1483 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1484 _ => Err(TryFromProtoError::UnknownEnumVariant(
1485 "ProtoHollowBatchPart::kind".to_string(),
1486 ))?,
1487 };
1488 Ok(Self {
1489 key: run_proto.key.into_rust()?,
1490 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1491 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1492 key_lower: proto.key_lower.to_vec(),
1493 structured_key_lower: proto.structured_key_lower.into_rust()?,
1494 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1495 _phantom_data: Default::default(),
1496 })
1497 }
1498}
1499
1500impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1501 fn into_proto(&self) -> ProtoHollowBatchPart {
1502 match self {
1503 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1504 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1505 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1506 key_lower: Bytes::copy_from_slice(&x.key_lower),
1507 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1508 key_stats: x.stats.into_proto(),
1509 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1510 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1511 format: x.format.map(|f| f.into_proto()),
1512 schema_id: x.schema_id.into_proto(),
1513 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1514 },
1515 BatchPart::Inline {
1516 updates,
1517 ts_rewrite,
1518 schema_id,
1519 deprecated_schema_id,
1520 } => ProtoHollowBatchPart {
1521 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1522 encoded_size_bytes: 0,
1523 key_lower: Bytes::new(),
1524 structured_key_lower: None,
1525 key_stats: None,
1526 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1527 diffs_sum: None,
1528 format: None,
1529 schema_id: schema_id.into_proto(),
1530 deprecated_schema_id: deprecated_schema_id.into_proto(),
1531 },
1532 }
1533 }
1534
1535 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1536 let ts_rewrite = match proto.ts_rewrite {
1537 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1538 None => None,
1539 };
1540 let schema_id = proto.schema_id.into_rust()?;
1541 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1542 match proto.kind {
1543 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1544 Ok(BatchPart::Hollow(HollowBatchPart {
1545 key: key.into_rust()?,
1546 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1547 key_lower: proto.key_lower.into(),
1548 structured_key_lower: proto.structured_key_lower.into_rust()?,
1549 stats: proto.key_stats.into_rust()?,
1550 ts_rewrite,
1551 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1552 format: proto.format.map(|f| f.into_rust()).transpose()?,
1553 schema_id,
1554 deprecated_schema_id,
1555 }))
1556 }
1557 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1558 assert_eq!(proto.encoded_size_bytes, 0);
1559 assert_eq!(proto.key_lower.len(), 0);
1560 assert_none!(proto.key_stats);
1561 assert_none!(proto.diffs_sum);
1562 let updates = LazyInlineBatchPart(x.into_rust()?);
1563 Ok(BatchPart::Inline {
1564 updates,
1565 ts_rewrite,
1566 schema_id,
1567 deprecated_schema_id,
1568 })
1569 }
1570 _ => Err(TryFromProtoError::unknown_enum_variant(
1571 "ProtoHollowBatchPart::kind",
1572 )),
1573 }
1574 }
1575}
1576
1577impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1578 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1579 match self {
1580 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1581 BatchColumnarFormat::Both(version) => {
1582 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1583 }
1584 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1585 }
1586 }
1587
1588 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1589 let format = match proto {
1590 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1591 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1592 BatchColumnarFormat::Both(version.cast_into())
1593 }
1594 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1595 };
1596 Ok(format)
1597 }
1598}
1599
1600#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1605pub struct LazyPartStats {
1606 key: LazyProto<ProtoStructStats>,
1607}
1608
1609impl Debug for LazyPartStats {
1610 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1611 f.debug_tuple("LazyPartStats")
1612 .field(&self.decode())
1613 .finish()
1614 }
1615}
1616
1617impl LazyPartStats {
1618 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1619 let PartStats { key } = x;
1620 let mut proto_stats = ProtoStructStats::from_rust(key);
1621 map_proto(&mut proto_stats);
1622 LazyPartStats {
1623 key: LazyProto::from(&proto_stats),
1624 }
1625 }
1626 pub fn decode(&self) -> PartStats {
1631 let key = self.key.decode().expect("valid proto");
1632 PartStats {
1633 key: key.into_rust().expect("valid stats"),
1634 }
1635 }
1636}
1637
1638impl RustType<Bytes> for LazyPartStats {
1639 fn into_proto(&self) -> Bytes {
1640 let LazyPartStats { key } = self;
1641 key.into_proto()
1642 }
1643
1644 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1645 Ok(LazyPartStats {
1646 key: proto.into_rust()?,
1647 })
1648 }
1649}
1650
1651#[cfg(test)]
1652pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
1653 proptest::prelude::any::<LazyPartStats>().prop_map(Some)
1654}
1655
1656#[allow(unused_parens)]
1657impl Arbitrary for LazyPartStats {
1658 type Parameters = ();
1659 type Strategy =
1660 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1661
1662 fn arbitrary_with(_: ()) -> Self::Strategy {
1663 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1664 LazyPartStats::encode(&x, |_| {})
1665 })
1666 }
1667}
1668
1669impl ProtoInlineBatchPart {
1670 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1671 lgbytes: &ColumnarMetrics,
1672 proto: Self,
1673 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1674 let updates = proto
1675 .updates
1676 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1677 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1678
1679 Ok(BlobTraceBatchPart {
1680 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1681 index: proto.index.into_rust()?,
1682 updates,
1683 })
1684 }
1685}
1686
1687#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1689pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1690
1691impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1692 fn from(value: &ProtoInlineBatchPart) -> Self {
1693 LazyInlineBatchPart(value.into())
1694 }
1695}
1696
1697impl Serialize for LazyInlineBatchPart {
1698 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1699 let proto = self.0.decode().expect("valid proto");
1702 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1703 let () = s.serialize_field("desc", &proto.desc)?;
1704 let () = s.serialize_field("index", &proto.index)?;
1705 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1706 s.end()
1707 }
1708}
1709
1710impl LazyInlineBatchPart {
1711 pub(crate) fn encoded_size_bytes(&self) -> usize {
1712 self.0.buf.len()
1713 }
1714
1715 pub fn decode<T: Timestamp + Codec64>(
1721 &self,
1722 lgbytes: &ColumnarMetrics,
1723 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1724 let proto = self.0.decode().expect("valid proto");
1725 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1726 }
1727}
1728
1729impl RustType<Bytes> for LazyInlineBatchPart {
1730 fn into_proto(&self) -> Bytes {
1731 self.0.into_proto()
1732 }
1733
1734 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1735 Ok(LazyInlineBatchPart(proto.into_rust()?))
1736 }
1737}
1738
1739impl RustType<ProtoHollowRollup> for HollowRollup {
1740 fn into_proto(&self) -> ProtoHollowRollup {
1741 ProtoHollowRollup {
1742 key: self.key.into_proto(),
1743 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1744 }
1745 }
1746
1747 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1748 Ok(HollowRollup {
1749 key: proto.key.into_rust()?,
1750 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1751 })
1752 }
1753}
1754
1755impl RustType<ProtoActiveRollup> for ActiveRollup {
1756 fn into_proto(&self) -> ProtoActiveRollup {
1757 ProtoActiveRollup {
1758 start_ms: self.start_ms,
1759 seqno: self.seqno.into_proto(),
1760 }
1761 }
1762
1763 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1764 Ok(ActiveRollup {
1765 start_ms: proto.start_ms,
1766 seqno: proto.seqno.into_rust()?,
1767 })
1768 }
1769}
1770
1771impl RustType<ProtoActiveGc> for ActiveGc {
1772 fn into_proto(&self) -> ProtoActiveGc {
1773 ProtoActiveGc {
1774 start_ms: self.start_ms,
1775 seqno: self.seqno.into_proto(),
1776 }
1777 }
1778
1779 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1780 Ok(ActiveGc {
1781 start_ms: proto.start_ms,
1782 seqno: proto.seqno.into_rust()?,
1783 })
1784 }
1785}
1786
1787impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1788 fn into_proto(&self) -> ProtoU64Description {
1789 ProtoU64Description {
1790 lower: Some(self.lower().into_proto()),
1791 upper: Some(self.upper().into_proto()),
1792 since: Some(self.since().into_proto()),
1793 }
1794 }
1795
1796 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1797 Ok(Description::new(
1798 proto.lower.into_rust_if_some("lower")?,
1799 proto.upper.into_rust_if_some("upper")?,
1800 proto.since.into_rust_if_some("since")?,
1801 ))
1802 }
1803}
1804
1805impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1806 fn into_proto(&self) -> ProtoU64Antichain {
1807 ProtoU64Antichain {
1808 elements: self
1809 .elements()
1810 .iter()
1811 .map(|x| i64::from_le_bytes(T::encode(x)))
1812 .collect(),
1813 }
1814 }
1815
1816 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1817 let elements = proto
1818 .elements
1819 .iter()
1820 .map(|x| T::decode(x.to_le_bytes()))
1821 .collect::<Vec<_>>();
1822 Ok(Antichain::from(elements))
1823 }
1824}
1825
1826#[cfg(test)]
1827mod tests {
1828 use bytes::Bytes;
1829 use mz_build_info::DUMMY_BUILD_INFO;
1830 use mz_dyncfg::ConfigUpdates;
1831 use mz_ore::assert_err;
1832 use mz_persist::location::SeqNo;
1833 use proptest::prelude::*;
1834
1835 use crate::ShardId;
1836 use crate::internal::paths::PartialRollupKey;
1837 use crate::internal::state::tests::any_state;
1838 use crate::internal::state::{BatchPart, HandleDebugState};
1839 use crate::internal::state_diff::StateDiff;
1840 use crate::tests::new_test_client_cache;
1841
1842 use super::*;
1843
1844 #[mz_ore::test]
1845 fn applier_version_state() {
1846 let v1 = semver::Version::new(1, 0, 0);
1847 let v2 = semver::Version::new(2, 0, 0);
1848 let v3 = semver::Version::new(3, 0, 0);
1849
1850 let shard_id = ShardId::new();
1852 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1853 let rollup =
1854 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1855 let mut buf = Vec::new();
1856 rollup.encode(&mut buf).expect("serializable");
1857 let bytes = Bytes::from(buf);
1858
1859 assert_eq!(
1861 UntypedState::<u64>::decode(&v2, bytes.clone())
1862 .check_codecs(&shard_id)
1863 .as_ref(),
1864 Ok(&state)
1865 );
1866 assert_eq!(
1867 UntypedState::<u64>::decode(&v3, bytes.clone())
1868 .check_codecs(&shard_id)
1869 .as_ref(),
1870 Ok(&state)
1871 );
1872
1873 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1878 assert_err!(v1_res);
1879 }
1880
1881 #[mz_ore::test]
1882 fn applier_version_state_diff() {
1883 let v1 = semver::Version::new(1, 0, 0);
1884 let v2 = semver::Version::new(2, 0, 0);
1885 let v3 = semver::Version::new(3, 0, 0);
1886
1887 let diff = StateDiff::<u64>::new(
1889 v2.clone(),
1890 SeqNo(0),
1891 SeqNo(1),
1892 2,
1893 PartialRollupKey("rollup".into()),
1894 );
1895 let mut buf = Vec::new();
1896 diff.encode(&mut buf);
1897 let bytes = Bytes::from(buf);
1898
1899 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
1901 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
1902
1903 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
1908 assert_err!(v1_res);
1909 }
1910
1911 #[mz_ore::test]
1912 fn hollow_batch_migration_keys() {
1913 let x = HollowBatch::new_run(
1914 Description::new(
1915 Antichain::from_elem(1u64),
1916 Antichain::from_elem(2u64),
1917 Antichain::from_elem(3u64),
1918 ),
1919 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1920 key: PartialBatchKey("a".into()),
1921 encoded_size_bytes: 5,
1922 key_lower: vec![],
1923 structured_key_lower: None,
1924 stats: None,
1925 ts_rewrite: None,
1926 diffs_sum: None,
1927 format: None,
1928 schema_id: None,
1929 deprecated_schema_id: None,
1930 }))],
1931 4,
1932 );
1933 let mut old = x.into_proto();
1934 old.deprecated_keys = vec!["b".into()];
1936 let mut expected = x;
1940 expected
1945 .parts
1946 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1947 key: PartialBatchKey("b".into()),
1948 encoded_size_bytes: 0,
1949 key_lower: vec![],
1950 structured_key_lower: None,
1951 stats: None,
1952 ts_rewrite: None,
1953 diffs_sum: None,
1954 format: None,
1955 schema_id: None,
1956 deprecated_schema_id: None,
1957 })));
1958 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
1959 }
1960
1961 #[mz_ore::test]
1962 fn reader_state_migration_lease_duration() {
1963 let x = LeasedReaderState {
1964 seqno: SeqNo(1),
1965 since: Antichain::from_elem(2u64),
1966 last_heartbeat_timestamp_ms: 3,
1967 debug: HandleDebugState {
1968 hostname: "host".to_owned(),
1969 purpose: "purpose".to_owned(),
1970 },
1971 lease_duration_ms: 0,
1973 };
1974 let old = x.into_proto();
1975 let mut expected = x;
1976 expected.lease_duration_ms =
1979 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
1980 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
1981 }
1982
1983 #[mz_ore::test]
1984 fn writer_state_migration_most_recent_write() {
1985 let proto = ProtoWriterState {
1986 last_heartbeat_timestamp_ms: 1,
1987 lease_duration_ms: 2,
1988 most_recent_write_token: "".into(),
1991 most_recent_write_upper: None,
1992 debug: Some(ProtoHandleDebugState {
1993 hostname: "host".to_owned(),
1994 purpose: "purpose".to_owned(),
1995 }),
1996 };
1997 let expected = WriterState {
1998 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
1999 lease_duration_ms: proto.lease_duration_ms,
2000 most_recent_write_token: IdempotencyToken::SENTINEL,
2001 most_recent_write_upper: Antichain::from_elem(0),
2002 debug: HandleDebugState {
2003 hostname: "host".to_owned(),
2004 purpose: "purpose".to_owned(),
2005 },
2006 };
2007 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2008 }
2009
2010 #[mz_ore::test]
2011 fn state_migration_rollups() {
2012 let r1 = HollowRollup {
2013 key: PartialRollupKey("foo".to_owned()),
2014 encoded_size_bytes: None,
2015 };
2016 let r2 = HollowRollup {
2017 key: PartialRollupKey("bar".to_owned()),
2018 encoded_size_bytes: Some(2),
2019 };
2020 let shard_id = ShardId::new();
2021 let mut state = TypedState::<(), (), u64, i64>::new(
2022 DUMMY_BUILD_INFO.semver_version(),
2023 shard_id,
2024 "host".to_owned(),
2025 0,
2026 );
2027 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2028 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2029
2030 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2032
2033 let state: Rollup<u64> = proto.into_rust().unwrap();
2034 let state = state.state;
2035 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2036 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2037 assert_eq!(
2038 state
2039 .state
2040 .collections
2041 .rollups
2042 .into_iter()
2043 .collect::<Vec<_>>(),
2044 expected
2045 );
2046 }
2047
2048 #[mz_persist_proc::test(tokio::test)]
2049 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2051 let r1_rollup = HollowRollup {
2052 key: PartialRollupKey("foo".to_owned()),
2053 encoded_size_bytes: None,
2054 };
2055 let r1 = StateFieldDiff {
2056 key: SeqNo(1),
2057 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2058 };
2059 let r2_rollup = HollowRollup {
2060 key: PartialRollupKey("bar".to_owned()),
2061 encoded_size_bytes: Some(2),
2062 };
2063 let r2 = StateFieldDiff {
2064 key: SeqNo(2),
2065 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2066 };
2067 let r3_rollup = HollowRollup {
2068 key: PartialRollupKey("baz".to_owned()),
2069 encoded_size_bytes: None,
2070 };
2071 let r3 = StateFieldDiff {
2072 key: SeqNo(3),
2073 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2074 };
2075 let mut diff = StateDiff::<u64>::new(
2076 DUMMY_BUILD_INFO.semver_version(),
2077 SeqNo(4),
2078 SeqNo(5),
2079 0,
2080 PartialRollupKey("ignored".to_owned()),
2081 );
2082 diff.rollups.push(r2.clone());
2083 diff.rollups.push(r3.clone());
2084 let mut diff_proto = diff.into_proto();
2085
2086 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2087 let mut field_diffs_writer = field_diffs.into_writer();
2088
2089 field_diffs_into_proto(
2091 ProtoStateField::DeprecatedRollups,
2092 &[StateFieldDiff {
2093 key: r1.key,
2094 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2095 }],
2096 &mut field_diffs_writer,
2097 );
2098
2099 assert_none!(diff_proto.field_diffs);
2100 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2101
2102 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2103 assert_eq!(
2104 diff.rollups.into_iter().collect::<Vec<_>>(),
2105 vec![r2, r3, r1]
2106 );
2107
2108 let shard_id = ShardId::new();
2111 let mut state = TypedState::<(), (), u64, i64>::new(
2112 DUMMY_BUILD_INFO.semver_version(),
2113 shard_id,
2114 "host".to_owned(),
2115 0,
2116 );
2117 state.state.seqno = SeqNo(4);
2118 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2119 rollup
2120 .deprecated_rollups
2121 .insert(3, r3_rollup.key.into_proto());
2122 let state: Rollup<u64> = rollup.into_rust().unwrap();
2123 let state = state.state;
2124 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2125 let cache = new_test_client_cache(&dyncfgs);
2126 let encoded_diff = VersionedData {
2127 seqno: SeqNo(5),
2128 data: diff_proto.encode_to_vec().into(),
2129 };
2130 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2131 assert_eq!(
2132 state
2133 .state
2134 .collections
2135 .rollups
2136 .into_iter()
2137 .collect::<Vec<_>>(),
2138 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2139 );
2140 }
2141
2142 #[mz_ore::test]
2143 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2145 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2146 let before = UntypedState {
2147 key_codec: <() as Codec>::codec_name(),
2148 val_codec: <() as Codec>::codec_name(),
2149 ts_codec: <T as Codec64>::codec_name(),
2150 diff_codec: <i64 as Codec64>::codec_name(),
2151 state,
2152 };
2153 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2154 let after: Rollup<T> = proto.into_rust().unwrap();
2155 let after = after.state;
2156 assert_eq!(before, after);
2157 }
2158
2159 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2160 }
2161
2162 #[mz_ore::test]
2163 fn check_data_versions_with_self_managed_versions() {
2164 #[track_caller]
2165 fn testcase(
2166 code: &str,
2167 data: &str,
2168 self_managed_versions: &[Version],
2169 expected: Result<(), ()>,
2170 ) {
2171 let code = Version::parse(code).unwrap();
2172 let data = Version::parse(data).unwrap();
2173 let actual = cfg::check_data_version_with_self_managed_versions(
2174 &code,
2175 &data,
2176 self_managed_versions,
2177 )
2178 .map_err(|_| ());
2179 assert_eq!(actual, expected);
2180 }
2181
2182 let none = [];
2183 let one = [Version::new(0, 130, 0)];
2184 let two = [Version::new(0, 130, 0), Version::new(0, 140, 0)];
2185 let three = [
2186 Version::new(0, 130, 0),
2187 Version::new(0, 140, 0),
2188 Version::new(0, 150, 0),
2189 ];
2190
2191 testcase("0.130.0", "0.128.0", &none, Ok(()));
2192 testcase("0.130.0", "0.129.0", &none, Ok(()));
2193 testcase("0.130.0", "0.130.0", &none, Ok(()));
2194 testcase("0.130.0", "0.130.1", &none, Ok(()));
2195 testcase("0.130.1", "0.130.0", &none, Ok(()));
2196 testcase("0.130.0", "0.131.0", &none, Ok(()));
2197 testcase("0.130.0", "0.132.0", &none, Err(()));
2198
2199 testcase("0.129.0", "0.127.0", &none, Ok(()));
2200 testcase("0.129.0", "0.128.0", &none, Ok(()));
2201 testcase("0.129.0", "0.129.0", &none, Ok(()));
2202 testcase("0.129.0", "0.129.1", &none, Ok(()));
2203 testcase("0.129.1", "0.129.0", &none, Ok(()));
2204 testcase("0.129.0", "0.130.0", &none, Ok(()));
2205 testcase("0.129.0", "0.131.0", &none, Err(()));
2206
2207 testcase("0.130.0", "0.128.0", &one, Ok(()));
2208 testcase("0.130.0", "0.129.0", &one, Ok(()));
2209 testcase("0.130.0", "0.130.0", &one, Ok(()));
2210 testcase("0.130.0", "0.130.1", &one, Ok(()));
2211 testcase("0.130.1", "0.130.0", &one, Ok(()));
2212 testcase("0.130.0", "0.131.0", &one, Ok(()));
2213 testcase("0.130.0", "0.132.0", &one, Ok(()));
2214
2215 testcase("0.129.0", "0.127.0", &one, Ok(()));
2216 testcase("0.129.0", "0.128.0", &one, Ok(()));
2217 testcase("0.129.0", "0.129.0", &one, Ok(()));
2218 testcase("0.129.0", "0.129.1", &one, Ok(()));
2219 testcase("0.129.1", "0.129.0", &one, Ok(()));
2220 testcase("0.129.0", "0.130.0", &one, Ok(()));
2221 testcase("0.129.0", "0.131.0", &one, Err(()));
2222
2223 testcase("0.131.0", "0.129.0", &one, Ok(()));
2224 testcase("0.131.0", "0.130.0", &one, Ok(()));
2225 testcase("0.131.0", "0.131.0", &one, Ok(()));
2226 testcase("0.131.0", "0.131.1", &one, Ok(()));
2227 testcase("0.131.1", "0.131.0", &one, Ok(()));
2228 testcase("0.131.0", "0.132.0", &one, Ok(()));
2229 testcase("0.131.0", "0.133.0", &one, Err(()));
2230
2231 testcase("0.130.0", "0.128.0", &two, Ok(()));
2232 testcase("0.130.0", "0.129.0", &two, Ok(()));
2233 testcase("0.130.0", "0.130.0", &two, Ok(()));
2234 testcase("0.130.0", "0.130.1", &two, Ok(()));
2235 testcase("0.130.1", "0.130.0", &two, Ok(()));
2236 testcase("0.130.0", "0.131.0", &two, Ok(()));
2237 testcase("0.130.0", "0.132.0", &two, Ok(()));
2238 testcase("0.130.0", "0.135.0", &two, Ok(()));
2239 testcase("0.130.0", "0.138.0", &two, Ok(()));
2240 testcase("0.130.0", "0.139.0", &two, Ok(()));
2241 testcase("0.130.0", "0.140.0", &two, Ok(()));
2242 testcase("0.130.9", "0.140.0", &two, Ok(()));
2243 testcase("0.130.0", "0.140.1", &two, Ok(()));
2244 testcase("0.130.3", "0.140.1", &two, Ok(()));
2245 testcase("0.130.3", "0.140.9", &two, Ok(()));
2246 testcase("0.130.0", "0.141.0", &two, Err(()));
2247 testcase("0.129.0", "0.133.0", &two, Err(()));
2248 testcase("0.129.0", "0.140.0", &two, Err(()));
2249 testcase("0.131.0", "0.133.0", &two, Err(()));
2250 testcase("0.131.0", "0.140.0", &two, Err(()));
2251
2252 testcase("0.130.0", "0.128.0", &three, Ok(()));
2253 testcase("0.130.0", "0.129.0", &three, Ok(()));
2254 testcase("0.130.0", "0.130.0", &three, Ok(()));
2255 testcase("0.130.0", "0.130.1", &three, Ok(()));
2256 testcase("0.130.1", "0.130.0", &three, Ok(()));
2257 testcase("0.130.0", "0.131.0", &three, Ok(()));
2258 testcase("0.130.0", "0.132.0", &three, Ok(()));
2259 testcase("0.130.0", "0.135.0", &three, Ok(()));
2260 testcase("0.130.0", "0.138.0", &three, Ok(()));
2261 testcase("0.130.0", "0.139.0", &three, Ok(()));
2262 testcase("0.130.0", "0.140.0", &three, Ok(()));
2263 testcase("0.130.9", "0.140.0", &three, Ok(()));
2264 testcase("0.130.0", "0.140.1", &three, Ok(()));
2265 testcase("0.130.3", "0.140.1", &three, Ok(()));
2266 testcase("0.130.3", "0.140.9", &three, Ok(()));
2267 testcase("0.130.0", "0.141.0", &three, Err(()));
2268 testcase("0.129.0", "0.133.0", &three, Err(()));
2269 testcase("0.129.0", "0.140.0", &three, Err(()));
2270 testcase("0.131.0", "0.133.0", &three, Err(()));
2271 testcase("0.131.0", "0.140.0", &three, Err(()));
2272 testcase("0.130.0", "0.150.0", &three, Err(()));
2273
2274 testcase("0.140.0", "0.138.0", &three, Ok(()));
2275 testcase("0.140.0", "0.139.0", &three, Ok(()));
2276 testcase("0.140.0", "0.140.0", &three, Ok(()));
2277 testcase("0.140.0", "0.140.1", &three, Ok(()));
2278 testcase("0.140.1", "0.140.0", &three, Ok(()));
2279 testcase("0.140.0", "0.141.0", &three, Ok(()));
2280 testcase("0.140.0", "0.142.0", &three, Ok(()));
2281 testcase("0.140.0", "0.145.0", &three, Ok(()));
2282 testcase("0.140.0", "0.148.0", &three, Ok(()));
2283 testcase("0.140.0", "0.149.0", &three, Ok(()));
2284 testcase("0.140.0", "0.150.0", &three, Ok(()));
2285 testcase("0.140.9", "0.150.0", &three, Ok(()));
2286 testcase("0.140.0", "0.150.1", &three, Ok(()));
2287 testcase("0.140.3", "0.150.1", &three, Ok(()));
2288 testcase("0.140.3", "0.150.9", &three, Ok(()));
2289 testcase("0.140.0", "0.151.0", &three, Err(()));
2290 testcase("0.139.0", "0.143.0", &three, Err(()));
2291 testcase("0.139.0", "0.150.0", &three, Err(()));
2292 testcase("0.141.0", "0.143.0", &three, Err(()));
2293 testcase("0.141.0", "0.150.0", &three, Err(()));
2294
2295 testcase("0.150.0", "0.148.0", &three, Ok(()));
2296 testcase("0.150.0", "0.149.0", &three, Ok(()));
2297 testcase("0.150.0", "0.150.0", &three, Ok(()));
2298 testcase("0.150.0", "0.150.1", &three, Ok(()));
2299 testcase("0.150.1", "0.150.0", &three, Ok(()));
2300 testcase("0.150.0", "0.151.0", &three, Ok(()));
2301 testcase("0.150.0", "0.152.0", &three, Ok(()));
2302 testcase("0.150.0", "0.155.0", &three, Ok(()));
2303 testcase("0.150.0", "0.158.0", &three, Ok(()));
2304 testcase("0.150.0", "0.159.0", &three, Ok(()));
2305 testcase("0.150.0", "0.160.0", &three, Ok(()));
2306 testcase("0.150.9", "0.160.0", &three, Ok(()));
2307 testcase("0.150.0", "0.160.1", &three, Ok(()));
2308 testcase("0.150.3", "0.160.1", &three, Ok(()));
2309 testcase("0.150.3", "0.160.9", &three, Ok(()));
2310 testcase("0.150.0", "0.161.0", &three, Ok(()));
2311 testcase("0.149.0", "0.153.0", &three, Err(()));
2312 testcase("0.149.0", "0.160.0", &three, Err(()));
2313 testcase("0.151.0", "0.153.0", &three, Err(()));
2314 testcase("0.151.0", "0.160.0", &three, Err(()));
2315 }
2316
2317 #[mz_ore::test]
2318 fn check_data_versions() {
2319 #[track_caller]
2320 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2321 let code = Version::parse(code).unwrap();
2322 let data = Version::parse(data).unwrap();
2323 #[allow(clippy::disallowed_methods)]
2324 let actual =
2325 std::panic::catch_unwind(|| check_data_version(&code, &data)).map_err(|_| ());
2326 assert_eq!(actual, expected);
2327 }
2328
2329 testcase("0.10.0-dev", "0.10.0-dev", Ok(()));
2330 testcase("0.10.0-dev", "0.10.0", Ok(()));
2331 testcase("0.10.0-dev", "0.11.0-dev", Ok(()));
2334 testcase("0.10.0-dev", "0.11.0", Ok(()));
2335 testcase("0.10.0-dev", "0.12.0-dev", Err(()));
2336 testcase("0.10.0-dev", "0.12.0", Err(()));
2337 testcase("0.10.0-dev", "0.13.0-dev", Err(()));
2338
2339 testcase("0.10.0", "0.8.0-dev", Ok(()));
2340 testcase("0.10.0", "0.8.0", Ok(()));
2341 testcase("0.10.0", "0.9.0-dev", Ok(()));
2342 testcase("0.10.0", "0.9.0", Ok(()));
2343 testcase("0.10.0", "0.10.0-dev", Ok(()));
2344 testcase("0.10.0", "0.10.0", Ok(()));
2345 testcase("0.10.0", "0.11.0-dev", Ok(()));
2348 testcase("0.10.0", "0.11.0", Ok(()));
2349 testcase("0.10.0", "0.11.1", Ok(()));
2350 testcase("0.10.0", "0.11.1000000", Ok(()));
2351 testcase("0.10.0", "0.12.0-dev", Err(()));
2352 testcase("0.10.0", "0.12.0", Err(()));
2353 testcase("0.10.0", "0.13.0-dev", Err(()));
2354
2355 testcase("0.10.1", "0.9.0", Ok(()));
2356 testcase("0.10.1", "0.10.0", Ok(()));
2357 testcase("0.10.1", "0.11.0", Ok(()));
2358 testcase("0.10.1", "0.11.1", Ok(()));
2359 testcase("0.10.1", "0.11.100", Ok(()));
2360
2361 testcase("0.10.0", "0.10.1", Ok(()));
2367 }
2368}