1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt, soft_panic_or_log};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize, Serializer};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::{CriticalReaderId, Opaque};
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46 LeasedReaderState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction, ProtoCriticalReaderState,
47 ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch, ProtoHollowBatchPart,
48 ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch, ProtoIdMerge,
49 ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs, ProtoLeasedReaderState, ProtoMerge,
50 ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch, ProtoSpineId, ProtoStateDiff,
51 ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs, ProtoTrace, ProtoU64Antichain,
52 ProtoU64Description, ProtoVersionedData, ProtoWriterState, RunId, RunMeta, RunOrder, RunPart,
53 State, StateCollections, TypedState, WriterState, proto_hollow_batch_part,
54};
55use crate::internal::state_diff::{
56 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
57};
58use crate::internal::trace::{
59 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
60};
61use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
62use crate::{PersistConfig, ShardId, WriterId, cfg};
63
/// The key and value schemas for a `K`/`V` codec pair, together with an optional schema id.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// Identifier for this pair of schemas, if one is known.
    // NOTE(review): presumably the id under which the schemas were registered — confirm.
    pub id: Option<SchemaId>,
    /// Shared handle to the key schema.
    pub key: Arc<K::Schema>,
    /// Shared handle to the value schema.
    pub val: Arc<V::Schema>,
}
75
76impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
77 fn clone(&self) -> Self {
78 Self {
79 id: self.id,
80 key: Arc::clone(&self.key),
81 val: Arc::clone(&self.val),
82 }
83 }
84}
85
/// A protobuf message of type `T` kept in its encoded form.
///
/// The bytes are only decoded when [`LazyProto::decode`] or [`LazyProto::decode_to`] is called.
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded bytes of the message.
    buf: Bytes,
    /// Ties the buffer to the message type `T` without storing a `T`.
    _phantom: PhantomData<fn() -> T>,
}
113
114impl<T: Message + Default + Debug> Debug for LazyProto<T> {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 match self.decode() {
117 Ok(proto) => Debug::fmt(&proto, f),
118 Err(err) => f
119 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
120 .field("err", &err)
121 .finish(),
122 }
123 }
124}
125
126impl<T> PartialEq for LazyProto<T> {
127 fn eq(&self, other: &Self) -> bool {
128 self.cmp(other) == Ordering::Equal
129 }
130}
131
// Equality is defined through `Ord::cmp`, which is a total order, so `Eq` holds for every `T`.
impl<T> Eq for LazyProto<T> {}
133
134impl<T> PartialOrd for LazyProto<T> {
135 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
136 Some(self.cmp(other))
137 }
138}
139
140impl<T> Ord for LazyProto<T> {
141 fn cmp(&self, other: &Self) -> Ordering {
142 let LazyProto {
143 buf: self_buf,
144 _phantom: _,
145 } = self;
146 let LazyProto {
147 buf: other_buf,
148 _phantom: _,
149 } = other;
150 self_buf.cmp(other_buf)
151 }
152}
153
154impl<T> Hash for LazyProto<T> {
155 fn hash<H: Hasher>(&self, state: &mut H) {
156 let LazyProto { buf, _phantom } = self;
157 buf.hash(state);
158 }
159}
160
161impl<T: Message + Default> From<&T> for LazyProto<T> {
162 fn from(value: &T) -> Self {
163 let buf = Bytes::from(value.encode_to_vec());
164 LazyProto {
165 buf,
166 _phantom: PhantomData,
167 }
168 }
169}
170
171impl<T: Message + Default> LazyProto<T> {
172 pub fn decode(&self) -> Result<T, prost::DecodeError> {
173 T::decode(&*self.buf)
174 }
175
176 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
177 Ok(T::decode(&*self.buf)?.into_rust()?)
178 }
179}
180
181impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
182 fn into_proto(&self) -> Bytes {
183 self.buf.clone()
184 }
185
186 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
187 Ok(Self {
188 buf,
189 _phantom: PhantomData,
190 })
191 }
192}
193
/// A map from string keys to encoded values.
///
/// Values are written protobuf-encoded by [`MetadataMap::set`] and decoded again by
/// [`MetadataMap::get`], both addressed through a typed [`MetadataKey`].
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);
207
/// A typed key into a [`MetadataMap`].
///
/// `V` is the Rust type stored under the key and `P` is its protobuf representation
/// (defaulting to `V` itself).
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
    /// The string under which the value is stored in the map.
    name: &'static str,
    /// Records the value and proto types without storing either.
    type_: PhantomData<(V, P)>,
}
218
219impl<V, P> MetadataKey<V, P> {
220 #[allow(unused)]
221 pub(crate) const fn new(name: &'static str) -> Self {
222 MetadataKey {
223 name,
224 type_: PhantomData,
225 }
226 }
227}
228
229impl serde::Serialize for MetadataMap {
230 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
231 where
232 S: Serializer,
233 {
234 serializer.collect_map(self.0.iter())
235 }
236}
237
238impl MetadataMap {
239 pub fn is_empty(&self) -> bool {
241 self.0.is_empty()
242 }
243
244 #[allow(unused)]
246 pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
247 self.0.insert(
248 String::from(key.name),
249 Bytes::from(value.into_proto_owned().encode_to_vec()),
250 );
251 }
252
253 #[allow(unused)]
255 pub fn get<V: RustType<P>, P: prost::Message + Default>(
256 &self,
257 key: MetadataKey<V, P>,
258 ) -> Option<V> {
259 let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
260 Ok(decoded) => decoded,
261 Err(err) => {
262 soft_panic_or_log!(
264 "error when decoding {key}; was it redefined? {err}",
265 key = key.name
266 );
267 return None;
268 }
269 };
270
271 match proto.into_rust() {
272 Ok(proto) => Some(proto),
273 Err(err) => {
274 soft_panic_or_log!(
276 "error when decoding {key}; was it redefined? {err}",
277 key = key.name
278 );
279 None
280 }
281 }
282 }
283}
284impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
285 fn into_proto(&self) -> BTreeMap<String, Bytes> {
286 self.0.clone()
287 }
288 fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
289 Ok(MetadataMap(proto))
290 }
291}
292
293pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
294 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
295 Some(x) => x,
296 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
297 };
298 let uuid = Uuid::parse_str(uuid_encoded)
299 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
300 Ok(*uuid.as_bytes())
301}
302
303pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &Version) {
304 if !cfg::code_can_read_data(code_version, data_version) {
305 if cfg!(test) {
308 panic!("code at version {code_version} cannot read data with version {data_version}");
309 } else {
310 halt!("code at version {code_version} cannot read data with version {data_version}");
311 }
312 }
313}
314
315impl RustType<String> for LeasedReaderId {
316 fn into_proto(&self) -> String {
317 self.to_string()
318 }
319
320 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
321 match proto.parse() {
322 Ok(x) => Ok(x),
323 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
324 }
325 }
326}
327
328impl RustType<String> for CriticalReaderId {
329 fn into_proto(&self) -> String {
330 self.to_string()
331 }
332
333 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
334 match proto.parse() {
335 Ok(x) => Ok(x),
336 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
337 }
338 }
339}
340
341impl RustType<String> for WriterId {
342 fn into_proto(&self) -> String {
343 self.to_string()
344 }
345
346 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
347 match proto.parse() {
348 Ok(x) => Ok(x),
349 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
350 }
351 }
352}
353
354impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
355 fn into_proto(&self) -> ProtoEncodedSchemas {
356 ProtoEncodedSchemas {
357 key: self.key.clone(),
358 key_data_type: self.key_data_type.clone(),
359 val: self.val.clone(),
360 val_data_type: self.val_data_type.clone(),
361 }
362 }
363
364 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
365 Ok(EncodedSchemas {
366 key: proto.key,
367 key_data_type: proto.key_data_type,
368 val: proto.val,
369 val_data_type: proto.val_data_type,
370 })
371 }
372}
373
374impl RustType<String> for IdempotencyToken {
375 fn into_proto(&self) -> String {
376 self.to_string()
377 }
378
379 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
380 match proto.parse() {
381 Ok(x) => Ok(x),
382 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
383 }
384 }
385}
386
387impl RustType<String> for PartialBatchKey {
388 fn into_proto(&self) -> String {
389 self.0.clone()
390 }
391
392 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
393 Ok(PartialBatchKey(proto))
394 }
395}
396
397impl RustType<String> for PartialRollupKey {
398 fn into_proto(&self) -> String {
399 self.0.clone()
400 }
401
402 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
403 Ok(PartialRollupKey(proto))
404 }
405}
406
407impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
408 pub fn encode<B>(&self, buf: &mut B)
409 where
410 B: bytes::BufMut,
411 {
412 self.into_proto()
413 .encode(buf)
414 .expect("no required fields means no initialization errors");
415 }
416
417 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
418 let proto = ProtoStateDiff::decode(buf)
419 .expect("internal error: invalid encoded state");
424 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
425 assert_code_can_read_data(build_version, &diff.applier_version);
426 diff
427 }
428}
429
/// Conversion between [`StateDiff`] and its protobuf form.
///
/// The scalar header fields map one-to-one onto `ProtoStateDiff` fields. All per-field diffs
/// are packed into the single `field_diffs` message, each tagged with the `ProtoStateField`
/// it belongs to.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    /// Encodes the header fields directly and streams every field diff through a
    /// `ProtoStateFieldDiffsWriter`.
    fn into_proto(&self) -> ProtoStateDiff {
        // Exhaustive destructuring: adding a field to `StateDiff` fails to compile here until
        // it is also handled below.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        let mut writer = proto.into_writer();

        // Diffs are appended field by field in this order.
        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        let field_diffs = writer.into_proto();

        // Only checked in debug builds.
        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    /// Decodes the header fields, then routes every entry of `field_diffs` to the matching
    /// `StateDiff` field.
    ///
    /// # Errors
    ///
    /// Returns an error if the applier version is not valid semver, or if any header field,
    /// key, or value fails to decode or convert.
    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty version string is decoded as 0.0.0.
        // NOTE(review): presumably this covers data written before the field existed — confirm.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` is treated as "no field diffs".
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // The turbofish on each arm names the proto key and value types to decode;
                // `()` is used where a field has no key (or, for legacy batches, no value).
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // The deprecated form carries only the rollup key as a string, so the
                    // encoded size is left unset. These diffs land in the same `rollups` list.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself and have a unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
649
650fn field_diffs_into_proto<K, KP, V, VP>(
651 field: ProtoStateField,
652 diffs: &[StateFieldDiff<K, V>],
653 writer: &mut ProtoStateFieldDiffsWriter,
654) where
655 KP: prost::Message,
656 K: RustType<KP>,
657 VP: prost::Message,
658 V: RustType<VP>,
659{
660 for diff in diffs.iter() {
661 field_diff_into_proto(field, diff, writer);
662 }
663}
664
665fn field_diff_into_proto<K, KP, V, VP>(
666 field: ProtoStateField,
667 diff: &StateFieldDiff<K, V>,
668 writer: &mut ProtoStateFieldDiffsWriter,
669) where
670 KP: prost::Message,
671 K: RustType<KP>,
672 VP: prost::Message,
673 V: RustType<VP>,
674{
675 writer.push_field(field);
676 writer.encode_proto(&diff.key.into_proto());
677 match &diff.val {
678 StateFieldValDiff::Insert(to) => {
679 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
680 writer.encode_proto(&to.into_proto());
681 }
682 StateFieldValDiff::Update(from, to) => {
683 writer.push_diff_type(ProtoStateFieldDiffType::Update);
684 writer.encode_proto(&from.into_proto());
685 writer.encode_proto(&to.into_proto());
686 }
687 StateFieldValDiff::Delete(from) => {
688 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
689 writer.encode_proto(&from.into_proto());
690 }
691 };
692}
693
694fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
695 proto: ProtoStateFieldDiff<'_>,
696 diffs: &mut Vec<StateFieldDiff<K, V>>,
697 k_fn: KFn,
698 v_fn: VFn,
699) -> Result<(), TryFromProtoError>
700where
701 KP: prost::Message + Default,
702 VP: prost::Message + Default,
703 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
704 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
705{
706 let val = match proto.diff_type {
707 ProtoStateFieldDiffType::Insert => {
708 let to = VP::decode(proto.to)
709 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
710 StateFieldValDiff::Insert(v_fn(to)?)
711 }
712 ProtoStateFieldDiffType::Update => {
713 let from = VP::decode(proto.from)
714 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
715 let to = VP::decode(proto.to)
716 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
717
718 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
719 }
720 ProtoStateFieldDiffType::Delete => {
721 let from = VP::decode(proto.from)
722 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
723 StateFieldValDiff::Delete(v_fn(from)?)
724 }
725 };
726 let key = KP::decode(proto.key)
727 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
728 diffs.push(StateFieldDiff {
729 key: k_fn(key)?,
730 val,
731 });
732 Ok(())
733}
734
/// A [`State`] paired with the names of the codecs it was written with.
///
/// The key/value/diff types are not part of this type; `check_codecs` and `check_ts_codec`
/// compare the recorded names against the requested types before handing the state out.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    /// Name of the key codec.
    pub(crate) key_codec: String,
    /// Name of the value codec.
    pub(crate) val_codec: String,
    /// Name of the timestamp codec.
    pub(crate) ts_codec: String,
    /// Name of the diff codec.
    pub(crate) diff_codec: String,

    /// The wrapped state. Private, so it is only reachable through this type's methods.
    state: State<T>,
}
750
751impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
752 pub fn seqno(&self) -> SeqNo {
753 self.state.seqno
754 }
755
756 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
757 &self.state.collections.rollups
758 }
759
760 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
761 self.state.latest_rollup()
762 }
763
764 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
765 &mut self,
766 cfg: &PersistConfig,
767 metrics: &Metrics,
768 diffs: I,
769 ) {
770 if T::codec_name() != self.ts_codec {
774 return;
775 }
776 self.state.apply_encoded_diffs(cfg, metrics, diffs);
777 }
778
779 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
780 self,
781 shard_id: &ShardId,
782 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
783 assert_eq!(shard_id, &self.state.shard_id);
786 if K::codec_name() != self.key_codec
787 || V::codec_name() != self.val_codec
788 || T::codec_name() != self.ts_codec
789 || D::codec_name() != self.diff_codec
790 {
791 return Err(Box::new(CodecMismatch {
792 requested: (
793 K::codec_name(),
794 V::codec_name(),
795 T::codec_name(),
796 D::codec_name(),
797 None,
798 ),
799 actual: (
800 self.key_codec,
801 self.val_codec,
802 self.ts_codec,
803 self.diff_codec,
804 None,
805 ),
806 }));
807 }
808 Ok(TypedState {
809 state: self.state,
810 _phantom: PhantomData,
811 })
812 }
813
814 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
815 assert_eq!(shard_id, &self.state.shard_id);
818 if T::codec_name() != self.ts_codec {
819 return Err(CodecMismatchT {
820 requested: T::codec_name(),
821 actual: self.ts_codec,
822 });
823 }
824 Ok(self.state)
825 }
826
827 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
828 let proto = ProtoRollup::decode(buf)
829 .expect("internal error: invalid encoded state");
834 let state = Rollup::from_proto(proto)
835 .expect("internal error: invalid encoded state")
836 .state;
837 assert_code_can_read_data(build_version, &state.state.collections.version);
838 state
839 }
840}
841
842impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
843where
844 K: Codec,
845 V: Codec,
846 T: Codec64,
847 D: Codec64,
848{
849 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
850 UntypedState {
851 key_codec: K::codec_name(),
852 val_codec: V::codec_name(),
853 ts_codec: T::codec_name(),
854 diff_codec: D::codec_name(),
855 state: typed_state.state,
856 }
857 }
858}
859
/// A full copy of a shard's state, optionally bundled with inlined diffs.
#[derive(Debug)]
pub struct Rollup<T> {
    /// The state captured by this rollup.
    pub(crate) state: UntypedState<T>,
    /// Encoded diffs stored alongside the state, if any. `Rollup::from` fills this with the
    /// diffs between the state's latest rollup and the state's own seqno.
    pub(crate) diffs: Option<InlinedDiffs>,
}
871
872impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
873 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
877 let latest_rollup_seqno = *state.latest_rollup().0;
878 let mut verify_seqno = latest_rollup_seqno;
879 for diff in &diffs {
880 assert_eq!(verify_seqno.next(), diff.seqno);
881 verify_seqno = diff.seqno;
882 }
883 assert_eq!(verify_seqno, state.seqno());
884
885 let diffs = Some(InlinedDiffs::from(
886 latest_rollup_seqno.next(),
887 state.seqno().next(),
888 diffs,
889 ));
890
891 Self { state, diffs }
892 }
893
894 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
895 Self { state, diffs: None }
896 }
897
898 pub(crate) fn from_state_without_diffs(
899 state: State<T>,
900 key_codec: String,
901 val_codec: String,
902 ts_codec: String,
903 diff_codec: String,
904 ) -> Self {
905 Self::from_untyped_state_without_diffs(UntypedState {
906 key_codec,
907 val_codec,
908 ts_codec,
909 diff_codec,
910 state,
911 })
912 }
913}
914
/// A run of encoded diffs together with the seqno range they cover.
///
/// The private constructor `InlinedDiffs::from` asserts that every diff's seqno lies in
/// `[lower, upper)`; decoding from proto does not re-check this.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound of the covered seqnos.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound of the covered seqnos.
    pub(crate) upper: SeqNo,
    /// The encoded diffs.
    pub(crate) diffs: Vec<VersionedData>,
}
921
922impl InlinedDiffs {
923 pub(crate) fn description(&self) -> Description<SeqNo> {
924 Description::new(
925 Antichain::from_elem(self.lower),
926 Antichain::from_elem(self.upper),
927 Antichain::from_elem(SeqNo::minimum()),
928 )
929 }
930
931 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
932 for diff in &diffs {
933 assert!(diff.seqno >= lower);
934 assert!(diff.seqno < upper);
935 }
936 Self {
937 lower,
938 upper,
939 diffs,
940 }
941 }
942}
943
944impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
945 fn into_proto(&self) -> ProtoInlinedDiffs {
946 ProtoInlinedDiffs {
947 lower: self.lower.into_proto(),
948 upper: self.upper.into_proto(),
949 diffs: self.diffs.into_proto(),
950 }
951 }
952
953 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
954 Ok(Self {
955 lower: proto.lower.into_rust()?,
956 upper: proto.upper.into_rust()?,
957 diffs: proto.diffs.into_rust()?,
958 })
959 }
960}
961
962impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
963 fn into_proto(&self) -> ProtoRollup {
964 ProtoRollup {
965 applier_version: self.state.state.collections.version.to_string(),
966 shard_id: self.state.state.shard_id.into_proto(),
967 seqno: self.state.state.seqno.into_proto(),
968 walltime_ms: self.state.state.walltime_ms.into_proto(),
969 hostname: self.state.state.hostname.into_proto(),
970 key_codec: self.state.key_codec.into_proto(),
971 val_codec: self.state.val_codec.into_proto(),
972 ts_codec: T::codec_name(),
973 diff_codec: self.state.diff_codec.into_proto(),
974 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
975 active_rollup: self.state.state.collections.active_rollup.into_proto(),
976 active_gc: self.state.state.collections.active_gc.into_proto(),
977 rollups: self
978 .state
979 .state
980 .collections
981 .rollups
982 .iter()
983 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
984 .collect(),
985 deprecated_rollups: Default::default(),
986 leased_readers: self
987 .state
988 .state
989 .collections
990 .leased_readers
991 .iter()
992 .map(|(id, state)| (id.into_proto(), state.into_proto()))
993 .collect(),
994 critical_readers: self
995 .state
996 .state
997 .collections
998 .critical_readers
999 .iter()
1000 .map(|(id, state)| (id.into_proto(), state.into_proto()))
1001 .collect(),
1002 writers: self
1003 .state
1004 .state
1005 .collections
1006 .writers
1007 .iter()
1008 .map(|(id, state)| (id.into_proto(), state.into_proto()))
1009 .collect(),
1010 schemas: self
1011 .state
1012 .state
1013 .collections
1014 .schemas
1015 .iter()
1016 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
1017 .collect(),
1018 trace: Some(self.state.state.collections.trace.into_proto()),
1019 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
1020 }
1021 }
1022
1023 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
1024 let applier_version = if x.applier_version.is_empty() {
1025 semver::Version::new(0, 0, 0)
1029 } else {
1030 semver::Version::parse(&x.applier_version).map_err(|err| {
1031 TryFromProtoError::InvalidSemverVersion(format!(
1032 "invalid applier_version {}: {}",
1033 x.applier_version, err
1034 ))
1035 })?
1036 };
1037
1038 let mut rollups = BTreeMap::new();
1039 for (seqno, rollup) in x.rollups {
1040 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
1041 }
1042 for (seqno, key) in x.deprecated_rollups {
1043 rollups.insert(
1044 seqno.into_rust()?,
1045 HollowRollup {
1046 key: key.into_rust()?,
1047 encoded_size_bytes: None,
1048 },
1049 );
1050 }
1051 let mut leased_readers = BTreeMap::new();
1052 for (id, state) in x.leased_readers {
1053 leased_readers.insert(id.into_rust()?, state.into_rust()?);
1054 }
1055 let mut critical_readers = BTreeMap::new();
1056 for (id, state) in x.critical_readers {
1057 critical_readers.insert(id.into_rust()?, state.into_rust()?);
1058 }
1059 let mut writers = BTreeMap::new();
1060 for (id, state) in x.writers {
1061 writers.insert(id.into_rust()?, state.into_rust()?);
1062 }
1063 let mut schemas = BTreeMap::new();
1064 for (id, x) in x.schemas {
1065 schemas.insert(id.into_rust()?, x.into_rust()?);
1066 }
1067 let active_rollup = x
1068 .active_rollup
1069 .map(|rollup| rollup.into_rust())
1070 .transpose()?;
1071 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
1072 let collections = StateCollections {
1073 version: applier_version.clone(),
1074 rollups,
1075 active_rollup,
1076 active_gc,
1077 last_gc_req: x.last_gc_req.into_rust()?,
1078 leased_readers,
1079 critical_readers,
1080 writers,
1081 schemas,
1082 trace: x.trace.into_rust_if_some("trace")?,
1083 };
1084 let state = State {
1085 shard_id: x.shard_id.into_rust()?,
1086 seqno: x.seqno.into_rust()?,
1087 walltime_ms: x.walltime_ms,
1088 hostname: x.hostname,
1089 collections,
1090 };
1091
1092 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
1093 if let Some(diffs) = &diffs {
1094 if diffs.lower != state.latest_rollup().0.next() {
1095 return Err(TryFromProtoError::InvalidPersistState(format!(
1096 "diffs lower ({}) should match latest rollup's successor: ({})",
1097 diffs.lower,
1098 state.latest_rollup().0.next()
1099 )));
1100 }
1101 if diffs.upper != state.seqno.next() {
1102 return Err(TryFromProtoError::InvalidPersistState(format!(
1103 "diffs upper ({}) should match state's successor: ({})",
1104 diffs.lower,
1105 state.seqno.next()
1106 )));
1107 }
1108 }
1109
1110 Ok(Rollup {
1111 state: UntypedState {
1112 state,
1113 key_codec: x.key_codec.into_rust()?,
1114 val_codec: x.val_codec.into_rust()?,
1115 ts_codec: x.ts_codec.into_rust()?,
1116 diff_codec: x.diff_codec.into_rust()?,
1117 },
1118 diffs,
1119 })
1120 }
1121}
1122
1123impl RustType<ProtoVersionedData> for VersionedData {
1124 fn into_proto(&self) -> ProtoVersionedData {
1125 ProtoVersionedData {
1126 seqno: self.seqno.into_proto(),
1127 data: Bytes::clone(&self.data),
1128 }
1129 }
1130
1131 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1132 Ok(Self {
1133 seqno: proto.seqno.into_rust()?,
1134 data: proto.data,
1135 })
1136 }
1137}
1138
1139impl RustType<ProtoSpineId> for SpineId {
1140 fn into_proto(&self) -> ProtoSpineId {
1141 ProtoSpineId {
1142 lo: self.0.into_proto(),
1143 hi: self.1.into_proto(),
1144 }
1145 }
1146
1147 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1148 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1149 }
1150}
1151
1152impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1153 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1154 let (id, batch) = entry;
1155 ProtoIdHollowBatch {
1156 id: Some(id.into_proto()),
1157 batch: Some(batch.into_proto()),
1158 }
1159 }
1160
1161 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1162 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1163 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1164 Ok((id, batch))
1165 }
1166}
1167
1168impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1169 fn into_proto(&self) -> ProtoSpineBatch {
1170 ProtoSpineBatch {
1171 desc: Some(self.desc.into_proto()),
1172 parts: self.parts.into_proto(),
1173 level: self.level.into_proto(),
1174 descs: self.descs.into_proto(),
1175 }
1176 }
1177
1178 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1179 let level = proto.level.into_rust()?;
1180 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1181 let parts = proto.parts.into_rust()?;
1182 let descs = proto.descs.into_rust()?;
1183 Ok(ThinSpineBatch {
1184 level,
1185 desc,
1186 parts,
1187 descs,
1188 })
1189 }
1190}
1191
1192impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1193 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1194 let (id, batch) = entry;
1195 ProtoIdSpineBatch {
1196 id: Some(id.into_proto()),
1197 batch: Some(batch.into_proto()),
1198 }
1199 }
1200
1201 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1202 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1203 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1204 Ok((id, batch))
1205 }
1206}
1207
1208impl RustType<ProtoCompaction> for ActiveCompaction {
1209 fn into_proto(&self) -> ProtoCompaction {
1210 ProtoCompaction {
1211 start_ms: self.start_ms,
1212 }
1213 }
1214
1215 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1216 Ok(Self {
1217 start_ms: proto.start_ms,
1218 })
1219 }
1220}
1221
1222impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1223 fn into_proto(&self) -> ProtoMerge {
1224 ProtoMerge {
1225 since: Some(self.since.into_proto()),
1226 remaining_work: self.remaining_work.into_proto(),
1227 active_compaction: self.active_compaction.into_proto(),
1228 }
1229 }
1230
1231 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1232 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1233 let remaining_work = proto.remaining_work.into_rust()?;
1234 let active_compaction = proto.active_compaction.into_rust()?;
1235 Ok(Self {
1236 since,
1237 remaining_work,
1238 active_compaction,
1239 })
1240 }
1241}
1242
1243impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1244 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1245 ProtoIdMerge {
1246 id: Some(id.into_proto()),
1247 merge: Some(merge.into_proto()),
1248 }
1249 }
1250
1251 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1252 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1253 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1254 Ok((id, merge))
1255 }
1256}
1257
1258impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1259 fn into_proto(&self) -> ProtoTrace {
1260 let since = self.since.into_proto();
1261 let legacy_batches = self
1262 .legacy_batches
1263 .iter()
1264 .map(|(b, _)| b.into_proto())
1265 .collect();
1266 let hollow_batches = self.hollow_batches.into_proto();
1267 let spine_batches = self.spine_batches.into_proto();
1268 let merges = self.merges.into_proto();
1269 ProtoTrace {
1270 since: Some(since),
1271 legacy_batches,
1272 hollow_batches,
1273 spine_batches,
1274 merges,
1275 }
1276 }
1277
1278 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1279 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1280 let legacy_batches = proto
1281 .legacy_batches
1282 .into_iter()
1283 .map(|b| b.into_rust().map(|b| (b, ())))
1284 .collect::<Result<_, _>>()?;
1285 let hollow_batches = proto.hollow_batches.into_rust()?;
1286 let spine_batches = proto.spine_batches.into_rust()?;
1287 let merges = proto.merges.into_rust()?;
1288 Ok(FlatTrace {
1289 since,
1290 legacy_batches,
1291 hollow_batches,
1292 spine_batches,
1293 merges,
1294 })
1295 }
1296}
1297
1298impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1299 fn into_proto(&self) -> ProtoTrace {
1300 self.flatten().into_proto()
1301 }
1302
1303 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1304 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1305 }
1306}
1307
1308impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1309 fn into_proto(&self) -> ProtoLeasedReaderState {
1310 ProtoLeasedReaderState {
1311 seqno: self.seqno.into_proto(),
1312 since: Some(self.since.into_proto()),
1313 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1314 lease_duration_ms: self.lease_duration_ms.into_proto(),
1315 debug: Some(self.debug.into_proto()),
1316 }
1317 }
1318
1319 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1320 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1321 if lease_duration_ms == 0 {
1326 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1327 .expect("lease duration as millis should fit within u64");
1328 }
1329 let debug = proto.debug.unwrap_or_default().into_rust()?;
1332 Ok(LeasedReaderState {
1333 seqno: proto.seqno.into_rust()?,
1334 since: proto
1335 .since
1336 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1337 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1338 lease_duration_ms,
1339 debug,
1340 })
1341 }
1342}
1343
1344impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1345 fn into_proto(&self) -> ProtoCriticalReaderState {
1346 ProtoCriticalReaderState {
1347 since: Some(self.since.into_proto()),
1348 opaque: i64::from_le_bytes(self.opaque.1),
1349 opaque_codec: self.opaque.0.clone(),
1350 debug: Some(self.debug.into_proto()),
1351 }
1352 }
1353
1354 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1355 let debug = proto.debug.unwrap_or_default().into_rust()?;
1358 Ok(CriticalReaderState {
1359 since: proto
1360 .since
1361 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1362 opaque: Opaque(proto.opaque_codec, i64::to_le_bytes(proto.opaque)),
1363 debug,
1364 })
1365 }
1366}
1367
1368impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1369 fn into_proto(&self) -> ProtoWriterState {
1370 ProtoWriterState {
1371 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1372 lease_duration_ms: self.lease_duration_ms.into_proto(),
1373 most_recent_write_token: self.most_recent_write_token.into_proto(),
1374 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1375 debug: Some(self.debug.into_proto()),
1376 }
1377 }
1378
1379 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1380 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1386 IdempotencyToken::SENTINEL
1387 } else {
1388 proto.most_recent_write_token.into_rust()?
1389 };
1390 let most_recent_write_upper = match proto.most_recent_write_upper {
1391 Some(x) => x.into_rust()?,
1392 None => Antichain::from_elem(T::minimum()),
1393 };
1394 let debug = proto.debug.unwrap_or_default().into_rust()?;
1397 Ok(WriterState {
1398 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1399 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1400 most_recent_write_token,
1401 most_recent_write_upper,
1402 debug,
1403 })
1404 }
1405}
1406
1407impl RustType<ProtoHandleDebugState> for HandleDebugState {
1408 fn into_proto(&self) -> ProtoHandleDebugState {
1409 ProtoHandleDebugState {
1410 hostname: self.hostname.into_proto(),
1411 purpose: self.purpose.into_proto(),
1412 }
1413 }
1414
1415 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1416 Ok(HandleDebugState {
1417 hostname: proto.hostname,
1418 purpose: proto.purpose,
1419 })
1420 }
1421}
1422
1423impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1424 fn into_proto(&self) -> ProtoHollowRun {
1425 ProtoHollowRun {
1426 parts: self.parts.into_proto(),
1427 }
1428 }
1429
1430 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1431 Ok(HollowRun {
1432 parts: proto.parts.into_rust()?,
1433 })
1434 }
1435}
1436
1437impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1438 fn into_proto(&self) -> ProtoHollowBatch {
1439 let mut run_meta = self.run_meta.into_proto();
1440 let run_meta_default = RunMeta::default().into_proto();
1442 while run_meta.last() == Some(&run_meta_default) {
1443 run_meta.pop();
1444 }
1445 ProtoHollowBatch {
1446 desc: Some(self.desc.into_proto()),
1447 parts: self.parts.into_proto(),
1448 len: self.len.into_proto(),
1449 runs: self.run_splits.into_proto(),
1450 run_meta,
1451 deprecated_keys: vec![],
1452 }
1453 }
1454
1455 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1456 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1457 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1460 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1461 key: PartialBatchKey(key),
1462 meta: Default::default(),
1463 encoded_size_bytes: 0,
1464 key_lower: vec![],
1465 structured_key_lower: None,
1466 stats: None,
1467 ts_rewrite: None,
1468 diffs_sum: None,
1469 format: None,
1470 schema_id: None,
1471 deprecated_schema_id: None,
1472 }))
1473 }));
1474 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1476 let num_runs = if parts.is_empty() {
1477 0
1478 } else {
1479 run_splits.len() + 1
1480 };
1481 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1482 run_meta.resize(num_runs, RunMeta::default());
1483 Ok(HollowBatch {
1484 desc: proto.desc.into_rust_if_some("desc")?,
1485 parts,
1486 len: proto.len.into_rust()?,
1487 run_splits,
1488 run_meta,
1489 })
1490 }
1491}
1492
1493impl RustType<String> for RunId {
1494 fn into_proto(&self) -> String {
1495 self.to_string()
1496 }
1497
1498 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1499 RunId::from_str(&proto).map_err(|_| {
1500 TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1501 })
1502 }
1503}
1504
1505impl RustType<ProtoRunMeta> for RunMeta {
1506 fn into_proto(&self) -> ProtoRunMeta {
1507 let order = match self.order {
1508 None => ProtoRunOrder::Unknown,
1509 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1510 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1511 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1512 };
1513 ProtoRunMeta {
1514 order: order.into(),
1515 schema_id: self.schema.into_proto(),
1516 deprecated_schema_id: self.deprecated_schema.into_proto(),
1517 id: self.id.into_proto(),
1518 len: self.len.into_proto(),
1519 meta: self.meta.into_proto(),
1520 }
1521 }
1522
1523 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1524 let order = match ProtoRunOrder::try_from(proto.order)? {
1525 ProtoRunOrder::Unknown => None,
1526 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1527 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1528 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1529 };
1530 Ok(Self {
1531 order,
1532 schema: proto.schema_id.into_rust()?,
1533 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1534 id: proto.id.into_rust()?,
1535 len: proto.len.into_rust()?,
1536 meta: proto.meta.into_rust()?,
1537 })
1538 }
1539}
1540
1541impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1542 fn into_proto(&self) -> ProtoHollowBatchPart {
1543 match self {
1544 RunPart::Single(part) => part.into_proto(),
1545 RunPart::Many(runs) => runs.into_proto(),
1546 }
1547 }
1548
1549 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1550 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1551 RunPart::Many(proto.into_rust()?)
1552 } else {
1553 RunPart::Single(proto.into_rust()?)
1554 };
1555 Ok(run_part)
1556 }
1557}
1558
1559impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1560 fn into_proto(&self) -> ProtoHollowBatchPart {
1561 let part = ProtoHollowBatchPart {
1562 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1563 key: self.key.into_proto(),
1564 max_part_bytes: self.max_part_bytes.into_proto(),
1565 })),
1566 encoded_size_bytes: self.hollow_bytes.into_proto(),
1567 key_lower: Bytes::copy_from_slice(&self.key_lower),
1568 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1569 key_stats: None,
1570 ts_rewrite: None,
1571 format: None,
1572 schema_id: None,
1573 structured_key_lower: self.structured_key_lower.into_proto(),
1574 deprecated_schema_id: None,
1575 metadata: BTreeMap::default(),
1576 };
1577 part
1578 }
1579
1580 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1581 let run_proto = match proto.kind {
1582 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1583 _ => Err(TryFromProtoError::UnknownEnumVariant(
1584 "ProtoHollowBatchPart::kind".to_string(),
1585 ))?,
1586 };
1587 Ok(Self {
1588 key: run_proto.key.into_rust()?,
1589 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1590 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1591 key_lower: proto.key_lower.to_vec(),
1592 structured_key_lower: proto.structured_key_lower.into_rust()?,
1593 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1594 _phantom_data: Default::default(),
1595 })
1596 }
1597}
1598
1599impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1600 fn into_proto(&self) -> ProtoHollowBatchPart {
1601 match self {
1602 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1603 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1604 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1605 key_lower: Bytes::copy_from_slice(&x.key_lower),
1606 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1607 key_stats: x.stats.into_proto(),
1608 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1609 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1610 format: x.format.map(|f| f.into_proto()),
1611 schema_id: x.schema_id.into_proto(),
1612 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1613 metadata: BTreeMap::default(),
1614 },
1615 BatchPart::Inline {
1616 updates,
1617 ts_rewrite,
1618 schema_id,
1619 deprecated_schema_id,
1620 } => ProtoHollowBatchPart {
1621 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1622 encoded_size_bytes: 0,
1623 key_lower: Bytes::new(),
1624 structured_key_lower: None,
1625 key_stats: None,
1626 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1627 diffs_sum: None,
1628 format: None,
1629 schema_id: schema_id.into_proto(),
1630 deprecated_schema_id: deprecated_schema_id.into_proto(),
1631 metadata: BTreeMap::default(),
1632 },
1633 }
1634 }
1635
1636 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1637 let ts_rewrite = match proto.ts_rewrite {
1638 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1639 None => None,
1640 };
1641 let schema_id = proto.schema_id.into_rust()?;
1642 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1643 match proto.kind {
1644 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1645 Ok(BatchPart::Hollow(HollowBatchPart {
1646 key: key.into_rust()?,
1647 meta: proto.metadata.into_rust()?,
1648 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1649 key_lower: proto.key_lower.into(),
1650 structured_key_lower: proto.structured_key_lower.into_rust()?,
1651 stats: proto.key_stats.into_rust()?,
1652 ts_rewrite,
1653 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1654 format: proto.format.map(|f| f.into_rust()).transpose()?,
1655 schema_id,
1656 deprecated_schema_id,
1657 }))
1658 }
1659 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1660 assert_eq!(proto.encoded_size_bytes, 0);
1661 assert_eq!(proto.key_lower.len(), 0);
1662 assert_none!(proto.key_stats);
1663 assert_none!(proto.diffs_sum);
1664 let updates = LazyInlineBatchPart(x.into_rust()?);
1665 Ok(BatchPart::Inline {
1666 updates,
1667 ts_rewrite,
1668 schema_id,
1669 deprecated_schema_id,
1670 })
1671 }
1672 _ => Err(TryFromProtoError::unknown_enum_variant(
1673 "ProtoHollowBatchPart::kind",
1674 )),
1675 }
1676 }
1677}
1678
1679impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1680 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1681 match self {
1682 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1683 BatchColumnarFormat::Both(version) => {
1684 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1685 }
1686 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1687 }
1688 }
1689
1690 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1691 let format = match proto {
1692 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1693 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1694 BatchColumnarFormat::Both(version.cast_into())
1695 }
1696 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1697 };
1698 Ok(format)
1699 }
1700}
1701
/// Part statistics held as a [`LazyProto`] of [`ProtoStructStats`].
///
/// Values are built from a [`PartStats`] with `encode` and turned back into a
/// [`PartStats`] with `decode`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct LazyPartStats {
    // Lazily decoded proto for the `key` field of `PartStats`.
    key: LazyProto<ProtoStructStats>,
}
1710
1711impl Debug for LazyPartStats {
1712 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1713 f.debug_tuple("LazyPartStats")
1714 .field(&self.decode())
1715 .finish()
1716 }
1717}
1718
1719impl LazyPartStats {
1720 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1721 let PartStats { key } = x;
1722 let mut proto_stats = ProtoStructStats::from_rust(key);
1723 map_proto(&mut proto_stats);
1724 LazyPartStats {
1725 key: LazyProto::from(&proto_stats),
1726 }
1727 }
1728 pub fn decode(&self) -> PartStats {
1733 let key = self.key.decode().expect("valid proto");
1734 PartStats {
1735 key: key.into_rust().expect("valid stats"),
1736 }
1737 }
1738}
1739
1740impl RustType<Bytes> for LazyPartStats {
1741 fn into_proto(&self) -> Bytes {
1742 let LazyPartStats { key } = self;
1743 key.into_proto()
1744 }
1745
1746 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1747 Ok(LazyPartStats {
1748 key: proto.into_rust()?,
1749 })
1750 }
1751}
1752
/// Test-only strategy that generates arbitrary [`LazyPartStats`] values, each
/// wrapped in `Some`.
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let any_stats = proptest::prelude::any::<LazyPartStats>();
    Strategy::prop_map(any_stats, Some)
}
1757
1758#[allow(unused_parens)]
1759impl Arbitrary for LazyPartStats {
1760 type Parameters = ();
1761 type Strategy =
1762 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1763
1764 fn arbitrary_with(_: ()) -> Self::Strategy {
1765 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1766 LazyPartStats::encode(&x, |_| {})
1767 })
1768 }
1769}
1770
1771impl ProtoInlineBatchPart {
1772 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1773 lgbytes: &ColumnarMetrics,
1774 proto: Self,
1775 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1776 let updates = proto
1777 .updates
1778 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1779 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1780
1781 Ok(BlobTraceBatchPart {
1782 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1783 index: proto.index.into_rust()?,
1784 updates,
1785 })
1786 }
1787}
1788
/// An inline batch part held as a [`LazyProto`] of [`ProtoInlineBatchPart`];
/// use `decode` to obtain the decoded part.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1792
1793impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1794 fn from(value: &ProtoInlineBatchPart) -> Self {
1795 LazyInlineBatchPart(value.into())
1796 }
1797}
1798
1799impl Serialize for LazyInlineBatchPart {
1800 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1801 let proto = self.0.decode().expect("valid proto");
1804 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1805 let () = s.serialize_field("desc", &proto.desc)?;
1806 let () = s.serialize_field("index", &proto.index)?;
1807 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1808 s.end()
1809 }
1810}
1811
1812impl LazyInlineBatchPart {
1813 pub(crate) fn encoded_size_bytes(&self) -> usize {
1814 self.0.buf.len()
1815 }
1816
1817 pub fn decode<T: Timestamp + Codec64>(
1823 &self,
1824 lgbytes: &ColumnarMetrics,
1825 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1826 let proto = self.0.decode().expect("valid proto");
1827 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1828 }
1829}
1830
1831impl RustType<Bytes> for LazyInlineBatchPart {
1832 fn into_proto(&self) -> Bytes {
1833 self.0.into_proto()
1834 }
1835
1836 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1837 Ok(LazyInlineBatchPart(proto.into_rust()?))
1838 }
1839}
1840
1841impl RustType<ProtoHollowRollup> for HollowRollup {
1842 fn into_proto(&self) -> ProtoHollowRollup {
1843 ProtoHollowRollup {
1844 key: self.key.into_proto(),
1845 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1846 }
1847 }
1848
1849 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1850 Ok(HollowRollup {
1851 key: proto.key.into_rust()?,
1852 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1853 })
1854 }
1855}
1856
1857impl RustType<ProtoActiveRollup> for ActiveRollup {
1858 fn into_proto(&self) -> ProtoActiveRollup {
1859 ProtoActiveRollup {
1860 start_ms: self.start_ms,
1861 seqno: self.seqno.into_proto(),
1862 }
1863 }
1864
1865 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1866 Ok(ActiveRollup {
1867 start_ms: proto.start_ms,
1868 seqno: proto.seqno.into_rust()?,
1869 })
1870 }
1871}
1872
1873impl RustType<ProtoActiveGc> for ActiveGc {
1874 fn into_proto(&self) -> ProtoActiveGc {
1875 ProtoActiveGc {
1876 start_ms: self.start_ms,
1877 seqno: self.seqno.into_proto(),
1878 }
1879 }
1880
1881 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1882 Ok(ActiveGc {
1883 start_ms: proto.start_ms,
1884 seqno: proto.seqno.into_rust()?,
1885 })
1886 }
1887}
1888
1889impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1890 fn into_proto(&self) -> ProtoU64Description {
1891 ProtoU64Description {
1892 lower: Some(self.lower().into_proto()),
1893 upper: Some(self.upper().into_proto()),
1894 since: Some(self.since().into_proto()),
1895 }
1896 }
1897
1898 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1899 Ok(Description::new(
1900 proto.lower.into_rust_if_some("lower")?,
1901 proto.upper.into_rust_if_some("upper")?,
1902 proto.since.into_rust_if_some("since")?,
1903 ))
1904 }
1905}
1906
1907impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1908 fn into_proto(&self) -> ProtoU64Antichain {
1909 ProtoU64Antichain {
1910 elements: self
1911 .elements()
1912 .iter()
1913 .map(|x| i64::from_le_bytes(T::encode(x)))
1914 .collect(),
1915 }
1916 }
1917
1918 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1919 let elements = proto
1920 .elements
1921 .iter()
1922 .map(|x| T::decode(x.to_le_bytes()))
1923 .collect::<Vec<_>>();
1924 Ok(Antichain::from(elements))
1925 }
1926}
1927
1928#[cfg(test)]
1929mod tests {
1930 use bytes::Bytes;
1931 use mz_build_info::DUMMY_BUILD_INFO;
1932 use mz_dyncfg::ConfigUpdates;
1933 use mz_ore::assert_err;
1934 use mz_persist::location::SeqNo;
1935 use proptest::prelude::*;
1936
1937 use crate::ShardId;
1938 use crate::internal::paths::PartialRollupKey;
1939 use crate::internal::state::tests::any_state;
1940 use crate::internal::state::{BatchPart, HandleDebugState};
1941 use crate::internal::state_diff::StateDiff;
1942 use crate::tests::new_test_client_cache;
1943
1944 use super::*;
1945
1946 #[mz_ore::test]
1947 fn metadata_map() {
1948 const COUNT: MetadataKey<u64> = MetadataKey::new("count");
1949
1950 let mut map = MetadataMap::default();
1951 map.set(COUNT, 100);
1952 let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
1953 assert_eq!(map.get(COUNT), Some(100));
1954
1955 const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
1956 MetadataKey::new("antichain");
1957 assert_none!(map.get(ANTICHAIN));
1958
1959 map.set(ANTICHAIN, Antichain::from_elem(30));
1960 let map = MetadataMap::from_proto(map.into_proto()).unwrap();
1961 assert_eq!(map.get(COUNT), Some(100));
1962 assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
1963 }
1964
1965 #[mz_ore::test]
1966 fn applier_version_state() {
1967 let v1 = semver::Version::new(1, 0, 0);
1968 let v2 = semver::Version::new(2, 0, 0);
1969 let v3 = semver::Version::new(3, 0, 0);
1970
1971 let shard_id = ShardId::new();
1973 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1974 let rollup =
1975 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1976 let mut buf = Vec::new();
1977 rollup.encode(&mut buf).expect("serializable");
1978 let bytes = Bytes::from(buf);
1979
1980 assert_eq!(
1982 UntypedState::<u64>::decode(&v2, bytes.clone())
1983 .check_codecs(&shard_id)
1984 .as_ref(),
1985 Ok(&state)
1986 );
1987 assert_eq!(
1988 UntypedState::<u64>::decode(&v3, bytes.clone())
1989 .check_codecs(&shard_id)
1990 .as_ref(),
1991 Ok(&state)
1992 );
1993
1994 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
1999 assert_err!(v1_res);
2000 }
2001
2002 #[mz_ore::test]
2003 fn applier_version_state_diff() {
2004 let v1 = semver::Version::new(1, 0, 0);
2005 let v2 = semver::Version::new(2, 0, 0);
2006 let v3 = semver::Version::new(3, 0, 0);
2007
2008 let diff = StateDiff::<u64>::new(
2010 v2.clone(),
2011 SeqNo(0),
2012 SeqNo(1),
2013 2,
2014 PartialRollupKey("rollup".into()),
2015 );
2016 let mut buf = Vec::new();
2017 diff.encode(&mut buf);
2018 let bytes = Bytes::from(buf);
2019
2020 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
2022 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
2023
2024 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
2029 assert_err!(v1_res);
2030 }
2031
2032 #[mz_ore::test]
2033 fn hollow_batch_migration_keys() {
2034 let x = HollowBatch::new_run(
2035 Description::new(
2036 Antichain::from_elem(1u64),
2037 Antichain::from_elem(2u64),
2038 Antichain::from_elem(3u64),
2039 ),
2040 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2041 key: PartialBatchKey("a".into()),
2042 meta: Default::default(),
2043 encoded_size_bytes: 5,
2044 key_lower: vec![],
2045 structured_key_lower: None,
2046 stats: None,
2047 ts_rewrite: None,
2048 diffs_sum: None,
2049 format: None,
2050 schema_id: None,
2051 deprecated_schema_id: None,
2052 }))],
2053 4,
2054 );
2055 let mut old = x.into_proto();
2056 old.deprecated_keys = vec!["b".into()];
2058 let mut expected = x;
2062 expected
2067 .parts
2068 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2069 key: PartialBatchKey("b".into()),
2070 meta: Default::default(),
2071 encoded_size_bytes: 0,
2072 key_lower: vec![],
2073 structured_key_lower: None,
2074 stats: None,
2075 ts_rewrite: None,
2076 diffs_sum: None,
2077 format: None,
2078 schema_id: None,
2079 deprecated_schema_id: None,
2080 })));
2081 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
2082 }
2083
2084 #[mz_ore::test]
2085 fn reader_state_migration_lease_duration() {
2086 let x = LeasedReaderState {
2087 seqno: SeqNo(1),
2088 since: Antichain::from_elem(2u64),
2089 last_heartbeat_timestamp_ms: 3,
2090 debug: HandleDebugState {
2091 hostname: "host".to_owned(),
2092 purpose: "purpose".to_owned(),
2093 },
2094 lease_duration_ms: 0,
2096 };
2097 let old = x.into_proto();
2098 let mut expected = x;
2099 expected.lease_duration_ms =
2102 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
2103 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
2104 }
2105
2106 #[mz_ore::test]
2107 fn writer_state_migration_most_recent_write() {
2108 let proto = ProtoWriterState {
2109 last_heartbeat_timestamp_ms: 1,
2110 lease_duration_ms: 2,
2111 most_recent_write_token: "".into(),
2114 most_recent_write_upper: None,
2115 debug: Some(ProtoHandleDebugState {
2116 hostname: "host".to_owned(),
2117 purpose: "purpose".to_owned(),
2118 }),
2119 };
2120 let expected = WriterState {
2121 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
2122 lease_duration_ms: proto.lease_duration_ms,
2123 most_recent_write_token: IdempotencyToken::SENTINEL,
2124 most_recent_write_upper: Antichain::from_elem(0),
2125 debug: HandleDebugState {
2126 hostname: "host".to_owned(),
2127 purpose: "purpose".to_owned(),
2128 },
2129 };
2130 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2131 }
2132
2133 #[mz_ore::test]
2134 fn state_migration_rollups() {
2135 let r1 = HollowRollup {
2136 key: PartialRollupKey("foo".to_owned()),
2137 encoded_size_bytes: None,
2138 };
2139 let r2 = HollowRollup {
2140 key: PartialRollupKey("bar".to_owned()),
2141 encoded_size_bytes: Some(2),
2142 };
2143 let shard_id = ShardId::new();
2144 let mut state = TypedState::<(), (), u64, i64>::new(
2145 DUMMY_BUILD_INFO.semver_version(),
2146 shard_id,
2147 "host".to_owned(),
2148 0,
2149 );
2150 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2151 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2152
2153 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2155
2156 let state: Rollup<u64> = proto.into_rust().unwrap();
2157 let state = state.state;
2158 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2159 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2160 assert_eq!(
2161 state
2162 .state
2163 .collections
2164 .rollups
2165 .into_iter()
2166 .collect::<Vec<_>>(),
2167 expected
2168 );
2169 }
2170
2171 #[mz_persist_proc::test(tokio::test)]
2172 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2174 let r1_rollup = HollowRollup {
2175 key: PartialRollupKey("foo".to_owned()),
2176 encoded_size_bytes: None,
2177 };
2178 let r1 = StateFieldDiff {
2179 key: SeqNo(1),
2180 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2181 };
2182 let r2_rollup = HollowRollup {
2183 key: PartialRollupKey("bar".to_owned()),
2184 encoded_size_bytes: Some(2),
2185 };
2186 let r2 = StateFieldDiff {
2187 key: SeqNo(2),
2188 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2189 };
2190 let r3_rollup = HollowRollup {
2191 key: PartialRollupKey("baz".to_owned()),
2192 encoded_size_bytes: None,
2193 };
2194 let r3 = StateFieldDiff {
2195 key: SeqNo(3),
2196 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2197 };
2198 let mut diff = StateDiff::<u64>::new(
2199 DUMMY_BUILD_INFO.semver_version(),
2200 SeqNo(4),
2201 SeqNo(5),
2202 0,
2203 PartialRollupKey("ignored".to_owned()),
2204 );
2205 diff.rollups.push(r2.clone());
2206 diff.rollups.push(r3.clone());
2207 let mut diff_proto = diff.into_proto();
2208
2209 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2210 let mut field_diffs_writer = field_diffs.into_writer();
2211
2212 field_diffs_into_proto(
2214 ProtoStateField::DeprecatedRollups,
2215 &[StateFieldDiff {
2216 key: r1.key,
2217 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2218 }],
2219 &mut field_diffs_writer,
2220 );
2221
2222 assert_none!(diff_proto.field_diffs);
2223 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2224
2225 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2226 assert_eq!(
2227 diff.rollups.into_iter().collect::<Vec<_>>(),
2228 vec![r2, r3, r1]
2229 );
2230
2231 let shard_id = ShardId::new();
2234 let mut state = TypedState::<(), (), u64, i64>::new(
2235 DUMMY_BUILD_INFO.semver_version(),
2236 shard_id,
2237 "host".to_owned(),
2238 0,
2239 );
2240 state.state.seqno = SeqNo(4);
2241 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2242 rollup
2243 .deprecated_rollups
2244 .insert(3, r3_rollup.key.into_proto());
2245 let state: Rollup<u64> = rollup.into_rust().unwrap();
2246 let state = state.state;
2247 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2248 let cache = new_test_client_cache(&dyncfgs);
2249 let encoded_diff = VersionedData {
2250 seqno: SeqNo(5),
2251 data: diff_proto.encode_to_vec().into(),
2252 };
2253 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2254 assert_eq!(
2255 state
2256 .state
2257 .collections
2258 .rollups
2259 .into_iter()
2260 .collect::<Vec<_>>(),
2261 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2262 );
2263 }
2264
2265 #[mz_ore::test]
2266 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2268 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2269 let before = UntypedState {
2270 key_codec: <() as Codec>::codec_name(),
2271 val_codec: <() as Codec>::codec_name(),
2272 ts_codec: <T as Codec64>::codec_name(),
2273 diff_codec: <i64 as Codec64>::codec_name(),
2274 state,
2275 };
2276 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2277 let after: Rollup<T> = proto.into_rust().unwrap();
2278 let after = after.state;
2279 assert_eq!(before, after);
2280 }
2281
2282 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2283 }
2284
2285 #[mz_ore::test]
2286 fn check_data_versions() {
2287 #[track_caller]
2288 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2289 let code = Version::parse(code).unwrap();
2290 let data = Version::parse(data).unwrap();
2291 #[allow(clippy::disallowed_methods)]
2292 let actual = cfg::code_can_write_data(&code, &data)
2293 .then_some(())
2294 .ok_or(());
2295 assert_eq!(actual, expected, "data at {data} read by code {code}");
2296 }
2297
2298 testcase("0.160.0-dev", "0.160.0-dev", Ok(()));
2299 testcase("0.160.0-dev", "0.160.0", Err(()));
2300 testcase("0.160.0-dev", "0.161.0-dev", Err(()));
2303 testcase("0.160.0-dev", "0.161.0", Err(()));
2304 testcase("0.160.0-dev", "0.162.0-dev", Err(()));
2305 testcase("0.160.0-dev", "0.162.0", Err(()));
2306 testcase("0.160.0-dev", "0.163.0-dev", Err(()));
2307
2308 testcase("0.160.0", "0.158.0-dev", Ok(()));
2309 testcase("0.160.0", "0.158.0", Ok(()));
2310 testcase("0.160.0", "0.159.0-dev", Ok(()));
2311 testcase("0.160.0", "0.159.0", Ok(()));
2312 testcase("0.160.0", "0.160.0-dev", Ok(()));
2313 testcase("0.160.0", "0.160.0", Ok(()));
2314
2315 testcase("0.160.0", "0.161.0-dev", Err(()));
2316 testcase("0.160.0", "0.161.0", Err(()));
2317 testcase("0.160.0", "0.161.1", Err(()));
2318 testcase("0.160.0", "0.161.1000000", Err(()));
2319 testcase("0.160.0", "0.162.0-dev", Err(()));
2320 testcase("0.160.0", "0.162.0", Err(()));
2321 testcase("0.160.0", "0.163.0-dev", Err(()));
2322
2323 testcase("0.160.1", "0.159.0", Ok(()));
2324 testcase("0.160.1", "0.160.0", Ok(()));
2325 testcase("0.160.1", "0.161.0", Err(()));
2326 testcase("0.160.1", "0.161.1", Err(()));
2327 testcase("0.160.1", "0.161.100", Err(()));
2328 testcase("0.160.0", "0.160.1", Err(()));
2329
2330 testcase("0.160.1", "26.0.0", Err(()));
2331 testcase("26.0.0", "0.160.1", Ok(()));
2332 testcase("26.2.0", "0.160.1", Ok(()));
2333 testcase("26.200.200", "0.160.1", Ok(()));
2334
2335 testcase("27.0.0", "0.160.1", Err(()));
2336 testcase("27.0.0", "0.16000.1", Err(()));
2337 testcase("27.0.0", "26.0.1", Ok(()));
2338 testcase("27.1000.100", "26.0.1", Ok(()));
2339 testcase("28.0.0", "26.0.1", Err(()));
2340 testcase("28.0.0", "26.1000.1", Err(()));
2341 testcase("28.0.0", "27.0.0", Ok(()));
2342 }
2343}