1use std::cmp::Ordering;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Formatter};
13use std::hash::{Hash, Hasher};
14use std::marker::PhantomData;
15use std::str::FromStr;
16use std::sync::Arc;
17
18use bytes::{Buf, Bytes};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::Description;
21use mz_ore::cast::CastInto;
22use mz_ore::{assert_none, halt, soft_panic_or_log};
23use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
24use mz_persist::location::{SeqNo, VersionedData};
25use mz_persist::metrics::ColumnarMetrics;
26use mz_persist_types::schema::SchemaId;
27use mz_persist_types::stats::{PartStats, ProtoStructStats};
28use mz_persist_types::{Codec, Codec64};
29use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
30use proptest::prelude::Arbitrary;
31use proptest::strategy::Strategy;
32use prost::Message;
33use semver::Version;
34use serde::ser::SerializeStruct;
35use serde::{Deserialize, Serialize, Serializer};
36use timely::progress::{Antichain, Timestamp};
37use uuid::Uuid;
38
39use crate::critical::CriticalReaderId;
40use crate::error::{CodecMismatch, CodecMismatchT};
41use crate::internal::metrics::Metrics;
42use crate::internal::paths::{PartialBatchKey, PartialRollupKey};
43use crate::internal::state::{
44 ActiveGc, ActiveRollup, BatchPart, CriticalReaderState, EncodedSchemas, HandleDebugState,
45 HollowBatch, HollowBatchPart, HollowRollup, HollowRun, HollowRunRef, IdempotencyToken,
46 LeasedReaderState, OpaqueState, ProtoActiveGc, ProtoActiveRollup, ProtoCompaction,
47 ProtoCriticalReaderState, ProtoEncodedSchemas, ProtoHandleDebugState, ProtoHollowBatch,
48 ProtoHollowBatchPart, ProtoHollowRollup, ProtoHollowRun, ProtoHollowRunRef, ProtoIdHollowBatch,
49 ProtoIdMerge, ProtoIdSpineBatch, ProtoInlineBatchPart, ProtoInlinedDiffs,
50 ProtoLeasedReaderState, ProtoMerge, ProtoRollup, ProtoRunMeta, ProtoRunOrder, ProtoSpineBatch,
51 ProtoSpineId, ProtoStateDiff, ProtoStateField, ProtoStateFieldDiffType, ProtoStateFieldDiffs,
52 ProtoTrace, ProtoU64Antichain, ProtoU64Description, ProtoVersionedData, ProtoWriterState,
53 RunId, RunMeta, RunOrder, RunPart, State, StateCollections, TypedState, WriterState,
54 proto_hollow_batch_part,
55};
56use crate::internal::state_diff::{
57 ProtoStateFieldDiff, ProtoStateFieldDiffsWriter, StateDiff, StateFieldDiff, StateFieldValDiff,
58};
59use crate::internal::trace::{
60 ActiveCompaction, FlatTrace, SpineId, ThinMerge, ThinSpineBatch, Trace,
61};
62use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
63use crate::{PersistConfig, ShardId, WriterId, cfg};
64
/// The key and value schemas of a shard, plus the optional [`SchemaId`] they are
/// associated with.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
    /// The id of these schemas. NOTE(review): `None` presumably means the schemas
    /// have not been registered; confirm against callers.
    pub id: Option<SchemaId>,
    /// The schema of the key type `K`.
    pub key: Arc<K::Schema>,
    /// The schema of the value type `V`.
    pub val: Arc<V::Schema>,
}
76
77impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
78 fn clone(&self) -> Self {
79 Self {
80 id: self.id,
81 key: Arc::clone(&self.key),
82 val: Arc::clone(&self.val),
83 }
84 }
85}
86
/// The encoded bytes of a protobuf message of type `T`, decoded only on demand.
///
/// Only the raw bytes are stored; decoding happens in `decode` / `decode_to`.
#[derive(Clone, Serialize, Deserialize)]
pub struct LazyProto<T> {
    /// The encoded message.
    buf: Bytes,
    // `fn() -> T` rather than `T`: no `T` value is owned, and this keeps the
    // auto traits (`Send`/`Sync`) of `LazyProto` independent of `T`.
    _phantom: PhantomData<fn() -> T>,
}
114
115impl<T: Message + Default> Debug for LazyProto<T> {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 match self.decode() {
118 Ok(proto) => Debug::fmt(&proto, f),
119 Err(err) => f
120 .debug_struct(&format!("LazyProto<{}>", std::any::type_name::<T>()))
121 .field("err", &err)
122 .finish(),
123 }
124 }
125}
126
127impl<T> PartialEq for LazyProto<T> {
128 fn eq(&self, other: &Self) -> bool {
129 self.cmp(other) == Ordering::Equal
130 }
131}
132
133impl<T> Eq for LazyProto<T> {}
134
135impl<T> PartialOrd for LazyProto<T> {
136 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
137 Some(self.cmp(other))
138 }
139}
140
141impl<T> Ord for LazyProto<T> {
142 fn cmp(&self, other: &Self) -> Ordering {
143 let LazyProto {
144 buf: self_buf,
145 _phantom: _,
146 } = self;
147 let LazyProto {
148 buf: other_buf,
149 _phantom: _,
150 } = other;
151 self_buf.cmp(other_buf)
152 }
153}
154
155impl<T> Hash for LazyProto<T> {
156 fn hash<H: Hasher>(&self, state: &mut H) {
157 let LazyProto { buf, _phantom } = self;
158 buf.hash(state);
159 }
160}
161
162impl<T: Message + Default> From<&T> for LazyProto<T> {
163 fn from(value: &T) -> Self {
164 let buf = Bytes::from(value.encode_to_vec());
165 LazyProto {
166 buf,
167 _phantom: PhantomData,
168 }
169 }
170}
171
172impl<T: Message + Default> LazyProto<T> {
173 pub fn decode(&self) -> Result<T, prost::DecodeError> {
174 T::decode(&*self.buf)
175 }
176
177 pub fn decode_to<R: RustType<T>>(&self) -> anyhow::Result<R> {
178 Ok(T::decode(&*self.buf)?.into_rust()?)
179 }
180}
181
182impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
183 fn into_proto(&self) -> Bytes {
184 self.buf.clone()
185 }
186
187 fn from_proto(buf: Bytes) -> Result<Self, TryFromProtoError> {
188 Ok(Self {
189 buf,
190 _phantom: PhantomData,
191 })
192 }
193}
194
/// A map from metadata names to protobuf-encoded values.
///
/// Typed access goes through [`MetadataKey`] with `MetadataMap::set` and
/// `MetadataMap::get`.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);
208
/// A typed key into a [`MetadataMap`].
///
/// `V` is the Rust type of the value and `P` is the protobuf type it is encoded
/// as. `P` defaults to `V`.
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
    /// The map key under which the encoded value is stored.
    name: &'static str,
    type_: PhantomData<(V, P)>,
}
219
220impl<V, P> MetadataKey<V, P> {
221 #[allow(unused)]
222 pub(crate) const fn new(name: &'static str) -> Self {
223 MetadataKey {
224 name,
225 type_: PhantomData,
226 }
227 }
228}
229
230impl serde::Serialize for MetadataMap {
231 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232 where
233 S: Serializer,
234 {
235 serializer.collect_map(self.0.iter())
236 }
237}
238
239impl MetadataMap {
240 pub fn is_empty(&self) -> bool {
242 self.0.is_empty()
243 }
244
245 #[allow(unused)]
247 pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
248 self.0.insert(
249 String::from(key.name),
250 Bytes::from(value.into_proto_owned().encode_to_vec()),
251 );
252 }
253
254 #[allow(unused)]
256 pub fn get<V: RustType<P>, P: prost::Message + Default>(
257 &self,
258 key: MetadataKey<V, P>,
259 ) -> Option<V> {
260 let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
261 Ok(decoded) => decoded,
262 Err(err) => {
263 soft_panic_or_log!(
265 "error when decoding {key}; was it redefined? {err}",
266 key = key.name
267 );
268 return None;
269 }
270 };
271
272 match proto.into_rust() {
273 Ok(proto) => Some(proto),
274 Err(err) => {
275 soft_panic_or_log!(
277 "error when decoding {key}; was it redefined? {err}",
278 key = key.name
279 );
280 None
281 }
282 }
283 }
284}
285impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
286 fn into_proto(&self) -> BTreeMap<String, Bytes> {
287 self.0.clone()
288 }
289 fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
290 Ok(MetadataMap(proto))
291 }
292}
293
294pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
295 let uuid_encoded = match encoded.strip_prefix(id_prefix) {
296 Some(x) => x,
297 None => return Err(format!("invalid {} {}: incorrect prefix", id_type, encoded)),
298 };
299 let uuid = Uuid::parse_str(uuid_encoded)
300 .map_err(|err| format!("invalid {} {}: {}", id_type, encoded, err))?;
301 Ok(*uuid.as_bytes())
302}
303
304pub(crate) fn assert_code_can_read_data(code_version: &Version, data_version: &Version) {
305 if !cfg::code_can_read_data(code_version, data_version) {
306 if cfg!(test) {
309 panic!("code at version {code_version} cannot read data with version {data_version}");
310 } else {
311 halt!("code at version {code_version} cannot read data with version {data_version}");
312 }
313 }
314}
315
316impl RustType<String> for LeasedReaderId {
317 fn into_proto(&self) -> String {
318 self.to_string()
319 }
320
321 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
322 match proto.parse() {
323 Ok(x) => Ok(x),
324 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
325 }
326 }
327}
328
329impl RustType<String> for CriticalReaderId {
330 fn into_proto(&self) -> String {
331 self.to_string()
332 }
333
334 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
335 match proto.parse() {
336 Ok(x) => Ok(x),
337 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
338 }
339 }
340}
341
342impl RustType<String> for WriterId {
343 fn into_proto(&self) -> String {
344 self.to_string()
345 }
346
347 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
348 match proto.parse() {
349 Ok(x) => Ok(x),
350 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
351 }
352 }
353}
354
355impl RustType<ProtoEncodedSchemas> for EncodedSchemas {
356 fn into_proto(&self) -> ProtoEncodedSchemas {
357 ProtoEncodedSchemas {
358 key: self.key.clone(),
359 key_data_type: self.key_data_type.clone(),
360 val: self.val.clone(),
361 val_data_type: self.val_data_type.clone(),
362 }
363 }
364
365 fn from_proto(proto: ProtoEncodedSchemas) -> Result<Self, TryFromProtoError> {
366 Ok(EncodedSchemas {
367 key: proto.key,
368 key_data_type: proto.key_data_type,
369 val: proto.val,
370 val_data_type: proto.val_data_type,
371 })
372 }
373}
374
375impl RustType<String> for IdempotencyToken {
376 fn into_proto(&self) -> String {
377 self.to_string()
378 }
379
380 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
381 match proto.parse() {
382 Ok(x) => Ok(x),
383 Err(_) => Err(TryFromProtoError::InvalidShardId(proto)),
384 }
385 }
386}
387
388impl RustType<String> for PartialBatchKey {
389 fn into_proto(&self) -> String {
390 self.0.clone()
391 }
392
393 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
394 Ok(PartialBatchKey(proto))
395 }
396}
397
398impl RustType<String> for PartialRollupKey {
399 fn into_proto(&self) -> String {
400 self.0.clone()
401 }
402
403 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
404 Ok(PartialRollupKey(proto))
405 }
406}
407
408impl<T: Timestamp + Lattice + Codec64> StateDiff<T> {
409 pub fn encode<B>(&self, buf: &mut B)
410 where
411 B: bytes::BufMut,
412 {
413 self.into_proto()
414 .encode(buf)
415 .expect("no required fields means no initialization errors");
416 }
417
418 pub fn decode(build_version: &Version, buf: Bytes) -> Self {
419 let proto = ProtoStateDiff::decode(buf)
420 .expect("internal error: invalid encoded state");
425 let diff = Self::from_proto(proto).expect("internal error: invalid encoded state");
426 assert_code_can_read_data(build_version, &diff.applier_version);
427 diff
428 }
429}
430
/// Encoding of a [`StateDiff`] as a `ProtoStateDiff`.
///
/// The header fields (applier version, seqno range, walltime, latest rollup key)
/// are stored directly on the proto. All per-field diffs are flattened into a
/// single `ProtoStateFieldDiffs`, with each entry tagged by the `ProtoStateField`
/// it belongs to.
impl<T: Timestamp + Codec64> RustType<ProtoStateDiff> for StateDiff<T> {
    fn into_proto(&self) -> ProtoStateDiff {
        // Destructured without `..`, so adding a field to `StateDiff` is a
        // compile error here until the new field is encoded too.
        let StateDiff {
            applier_version,
            seqno_from,
            seqno_to,
            walltime_ms,
            latest_rollup_key,
            rollups,
            active_rollup,
            active_gc,
            hostname,
            last_gc_req,
            leased_readers,
            critical_readers,
            writers,
            schemas,
            since,
            legacy_batches,
            hollow_batches,
            spine_batches,
            merges,
        } = self;

        let proto = ProtoStateFieldDiffs::default();

        // Every field's diffs are appended through this single writer.
        let mut writer = proto.into_writer();

        field_diffs_into_proto(ProtoStateField::Hostname, hostname, &mut writer);
        field_diffs_into_proto(ProtoStateField::LastGcReq, last_gc_req, &mut writer);
        field_diffs_into_proto(ProtoStateField::Rollups, rollups, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveRollup, active_rollup, &mut writer);
        field_diffs_into_proto(ProtoStateField::ActiveGc, active_gc, &mut writer);
        field_diffs_into_proto(ProtoStateField::LeasedReaders, leased_readers, &mut writer);
        field_diffs_into_proto(
            ProtoStateField::CriticalReaders,
            critical_readers,
            &mut writer,
        );
        field_diffs_into_proto(ProtoStateField::Writers, writers, &mut writer);
        field_diffs_into_proto(ProtoStateField::Schemas, schemas, &mut writer);
        field_diffs_into_proto(ProtoStateField::Since, since, &mut writer);
        field_diffs_into_proto(ProtoStateField::LegacyBatches, legacy_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::HollowBatches, hollow_batches, &mut writer);
        field_diffs_into_proto(ProtoStateField::SpineBatches, spine_batches, &mut writer);
        // Nothing is ever written as `ProtoStateField::DeprecatedRollups`; that
        // tag is only handled on the decode side.
        field_diffs_into_proto(ProtoStateField::SpineMerges, merges, &mut writer);

        let field_diffs = writer.into_proto();

        debug_assert_eq!(field_diffs.validate(), Ok(()));
        ProtoStateDiff {
            applier_version: applier_version.to_string(),
            seqno_from: seqno_from.into_proto(),
            seqno_to: seqno_to.into_proto(),
            walltime_ms: walltime_ms.into_proto(),
            latest_rollup_key: latest_rollup_key.into_proto(),
            field_diffs: Some(field_diffs),
        }
    }

    fn from_proto(proto: ProtoStateDiff) -> Result<Self, TryFromProtoError> {
        // An empty `applier_version` decodes as 0.0.0 instead of being rejected.
        // Presumably this covers diffs written before the field was populated.
        let applier_version = if proto.applier_version.is_empty() {
            semver::Version::new(0, 0, 0)
        } else {
            semver::Version::parse(&proto.applier_version).map_err(|err| {
                TryFromProtoError::InvalidSemverVersion(format!(
                    "invalid applier_version {}: {}",
                    proto.applier_version, err
                ))
            })?
        };
        let mut state_diff = StateDiff::new(
            applier_version,
            proto.seqno_from.into_rust()?,
            proto.seqno_to.into_rust()?,
            proto.walltime_ms,
            proto.latest_rollup_key.into_rust()?,
        );
        // A missing `field_diffs` leaves every per-field diff list empty.
        if let Some(field_diffs) = proto.field_diffs {
            debug_assert_eq!(field_diffs.validate(), Ok(()));
            for field_diff in field_diffs.iter() {
                let (field, diff) = field_diff?;
                // Route each entry to the matching `StateDiff` list. The turbofish
                // names the proto key/value types the raw bytes are decoded as.
                match field {
                    ProtoStateField::Hostname => field_diff_into_rust::<(), String, _, _, _, _>(
                        diff,
                        &mut state_diff.hostname,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::LastGcReq => field_diff_into_rust::<(), u64, _, _, _, _>(
                        diff,
                        &mut state_diff.last_gc_req,
                        |()| Ok(()),
                        |v| v.into_rust(),
                    )?,
                    ProtoStateField::ActiveGc => {
                        field_diff_into_rust::<(), ProtoActiveGc, _, _, _, _>(
                            diff,
                            &mut state_diff.active_gc,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::ActiveRollup => {
                        field_diff_into_rust::<(), ProtoActiveRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.active_rollup,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Rollups => {
                        field_diff_into_rust::<u64, ProtoHollowRollup, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    // Deprecated rollup encoding: the value is only the rollup key,
                    // so the encoded size is unknown and left as `None`. Entries
                    // are merged into the same `rollups` list as `Rollups`.
                    ProtoStateField::DeprecatedRollups => {
                        field_diff_into_rust::<u64, String, _, _, _, _>(
                            diff,
                            &mut state_diff.rollups,
                            |k| k.into_rust(),
                            |v| {
                                Ok(HollowRollup {
                                    key: v.into_rust()?,
                                    encoded_size_bytes: None,
                                })
                            },
                        )?
                    }
                    ProtoStateField::LeasedReaders => {
                        field_diff_into_rust::<String, ProtoLeasedReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.leased_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::CriticalReaders => {
                        field_diff_into_rust::<String, ProtoCriticalReaderState, _, _, _, _>(
                            diff,
                            &mut state_diff.critical_readers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Writers => {
                        field_diff_into_rust::<String, ProtoWriterState, _, _, _, _>(
                            diff,
                            &mut state_diff.writers,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Schemas => {
                        field_diff_into_rust::<u64, ProtoEncodedSchemas, _, _, _, _>(
                            diff,
                            &mut state_diff.schemas,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::Since => {
                        field_diff_into_rust::<(), ProtoU64Antichain, _, _, _, _>(
                            diff,
                            &mut state_diff.since,
                            |()| Ok(()),
                            |v| v.into_rust(),
                        )?
                    }
                    // Legacy batches are keyed by the batch itself, with a unit value.
                    ProtoStateField::LegacyBatches => {
                        field_diff_into_rust::<ProtoHollowBatch, (), _, _, _, _>(
                            diff,
                            &mut state_diff.legacy_batches,
                            |k| k.into_rust(),
                            |()| Ok(()),
                        )?
                    }
                    ProtoStateField::HollowBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoHollowBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.hollow_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineBatches => {
                        field_diff_into_rust::<ProtoSpineId, ProtoSpineBatch, _, _, _, _>(
                            diff,
                            &mut state_diff.spine_batches,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                    ProtoStateField::SpineMerges => {
                        field_diff_into_rust::<ProtoSpineId, ProtoMerge, _, _, _, _>(
                            diff,
                            &mut state_diff.merges,
                            |k| k.into_rust(),
                            |v| v.into_rust(),
                        )?
                    }
                }
            }
        }
        Ok(state_diff)
    }
}
650
651fn field_diffs_into_proto<K, KP, V, VP>(
652 field: ProtoStateField,
653 diffs: &[StateFieldDiff<K, V>],
654 writer: &mut ProtoStateFieldDiffsWriter,
655) where
656 KP: prost::Message,
657 K: RustType<KP>,
658 VP: prost::Message,
659 V: RustType<VP>,
660{
661 for diff in diffs.iter() {
662 field_diff_into_proto(field, diff, writer);
663 }
664}
665
666fn field_diff_into_proto<K, KP, V, VP>(
667 field: ProtoStateField,
668 diff: &StateFieldDiff<K, V>,
669 writer: &mut ProtoStateFieldDiffsWriter,
670) where
671 KP: prost::Message,
672 K: RustType<KP>,
673 VP: prost::Message,
674 V: RustType<VP>,
675{
676 writer.push_field(field);
677 writer.encode_proto(&diff.key.into_proto());
678 match &diff.val {
679 StateFieldValDiff::Insert(to) => {
680 writer.push_diff_type(ProtoStateFieldDiffType::Insert);
681 writer.encode_proto(&to.into_proto());
682 }
683 StateFieldValDiff::Update(from, to) => {
684 writer.push_diff_type(ProtoStateFieldDiffType::Update);
685 writer.encode_proto(&from.into_proto());
686 writer.encode_proto(&to.into_proto());
687 }
688 StateFieldValDiff::Delete(from) => {
689 writer.push_diff_type(ProtoStateFieldDiffType::Delete);
690 writer.encode_proto(&from.into_proto());
691 }
692 };
693}
694
695fn field_diff_into_rust<KP, VP, K, V, KFn, VFn>(
696 proto: ProtoStateFieldDiff<'_>,
697 diffs: &mut Vec<StateFieldDiff<K, V>>,
698 k_fn: KFn,
699 v_fn: VFn,
700) -> Result<(), TryFromProtoError>
701where
702 KP: prost::Message + Default,
703 VP: prost::Message + Default,
704 KFn: Fn(KP) -> Result<K, TryFromProtoError>,
705 VFn: Fn(VP) -> Result<V, TryFromProtoError>,
706{
707 let val = match proto.diff_type {
708 ProtoStateFieldDiffType::Insert => {
709 let to = VP::decode(proto.to)
710 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
711 StateFieldValDiff::Insert(v_fn(to)?)
712 }
713 ProtoStateFieldDiffType::Update => {
714 let from = VP::decode(proto.from)
715 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
716 let to = VP::decode(proto.to)
717 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
718
719 StateFieldValDiff::Update(v_fn(from)?, v_fn(to)?)
720 }
721 ProtoStateFieldDiffType::Delete => {
722 let from = VP::decode(proto.from)
723 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
724 StateFieldValDiff::Delete(v_fn(from)?)
725 }
726 };
727 let key = KP::decode(proto.key)
728 .map_err(|err| TryFromProtoError::InvalidPersistState(err.to_string()))?;
729 diffs.push(StateFieldDiff {
730 key: k_fn(key)?,
731 val,
732 });
733 Ok(())
734}
735
/// A shard [`State`] together with the codec names it was written with, before
/// those codecs have been checked against concrete key/value/diff types.
///
/// `check_codecs` and `check_ts_codec` compare these names against the requested
/// types before handing out a typed view.
#[derive(Debug)]
// `Clone` and `PartialEq` are only derived in test and debug-assertion builds.
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct UntypedState<T> {
    pub(crate) key_codec: String,
    pub(crate) val_codec: String,
    pub(crate) ts_codec: String,
    pub(crate) diff_codec: String,

    state: State<T>,
}
751
752impl<T: Timestamp + Lattice + Codec64> UntypedState<T> {
753 pub fn seqno(&self) -> SeqNo {
754 self.state.seqno
755 }
756
757 pub fn rollups(&self) -> &BTreeMap<SeqNo, HollowRollup> {
758 &self.state.collections.rollups
759 }
760
761 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
762 self.state.latest_rollup()
763 }
764
765 pub fn apply_encoded_diffs<'a, I: IntoIterator<Item = &'a VersionedData>>(
766 &mut self,
767 cfg: &PersistConfig,
768 metrics: &Metrics,
769 diffs: I,
770 ) {
771 if T::codec_name() != self.ts_codec {
775 return;
776 }
777 self.state.apply_encoded_diffs(cfg, metrics, diffs);
778 }
779
780 pub fn check_codecs<K: Codec, V: Codec, D: Codec64>(
781 self,
782 shard_id: &ShardId,
783 ) -> Result<TypedState<K, V, T, D>, Box<CodecMismatch>> {
784 assert_eq!(shard_id, &self.state.shard_id);
787 if K::codec_name() != self.key_codec
788 || V::codec_name() != self.val_codec
789 || T::codec_name() != self.ts_codec
790 || D::codec_name() != self.diff_codec
791 {
792 return Err(Box::new(CodecMismatch {
793 requested: (
794 K::codec_name(),
795 V::codec_name(),
796 T::codec_name(),
797 D::codec_name(),
798 None,
799 ),
800 actual: (
801 self.key_codec,
802 self.val_codec,
803 self.ts_codec,
804 self.diff_codec,
805 None,
806 ),
807 }));
808 }
809 Ok(TypedState {
810 state: self.state,
811 _phantom: PhantomData,
812 })
813 }
814
815 pub(crate) fn check_ts_codec(self, shard_id: &ShardId) -> Result<State<T>, CodecMismatchT> {
816 assert_eq!(shard_id, &self.state.shard_id);
819 if T::codec_name() != self.ts_codec {
820 return Err(CodecMismatchT {
821 requested: T::codec_name(),
822 actual: self.ts_codec,
823 });
824 }
825 Ok(self.state)
826 }
827
828 pub fn decode(build_version: &Version, buf: impl Buf) -> Self {
829 let proto = ProtoRollup::decode(buf)
830 .expect("internal error: invalid encoded state");
835 let state = Rollup::from_proto(proto)
836 .expect("internal error: invalid encoded state")
837 .state;
838 assert_code_can_read_data(build_version, &state.state.collections.version);
839 state
840 }
841}
842
843impl<K, V, T, D> From<TypedState<K, V, T, D>> for UntypedState<T>
844where
845 K: Codec,
846 V: Codec,
847 T: Codec64,
848 D: Codec64,
849{
850 fn from(typed_state: TypedState<K, V, T, D>) -> Self {
851 UntypedState {
852 key_codec: K::codec_name(),
853 val_codec: V::codec_name(),
854 ts_codec: T::codec_name(),
855 diff_codec: D::codec_name(),
856 state: typed_state.state,
857 }
858 }
859}
860
/// A rollup of a shard: its [`UntypedState`], optionally with the encoded diffs
/// that lead up to that state inlined.
#[derive(Debug)]
pub struct Rollup<T> {
    pub(crate) state: UntypedState<T>,
    /// When present, covers the seqnos from just after `state`'s latest rollup
    /// up to and including `state`'s own seqno. This is enforced by
    /// `Rollup::from` and checked when decoding.
    pub(crate) diffs: Option<InlinedDiffs>,
}
872
873impl<T: Timestamp + Lattice + Codec64> Rollup<T> {
874 pub(crate) fn from(state: UntypedState<T>, diffs: Vec<VersionedData>) -> Self {
878 let latest_rollup_seqno = *state.latest_rollup().0;
879 let mut verify_seqno = latest_rollup_seqno;
880 for diff in &diffs {
881 assert_eq!(verify_seqno.next(), diff.seqno);
882 verify_seqno = diff.seqno;
883 }
884 assert_eq!(verify_seqno, state.seqno());
885
886 let diffs = Some(InlinedDiffs::from(
887 latest_rollup_seqno.next(),
888 state.seqno().next(),
889 diffs,
890 ));
891
892 Self { state, diffs }
893 }
894
895 pub(crate) fn from_untyped_state_without_diffs(state: UntypedState<T>) -> Self {
896 Self { state, diffs: None }
897 }
898
899 pub(crate) fn from_state_without_diffs(
900 state: State<T>,
901 key_codec: String,
902 val_codec: String,
903 ts_codec: String,
904 diff_codec: String,
905 ) -> Self {
906 Self::from_untyped_state_without_diffs(UntypedState {
907 key_codec,
908 val_codec,
909 ts_codec,
910 diff_codec,
911 state,
912 })
913 }
914}
915
/// Encoded state diffs whose seqnos all lie in `[lower, upper)`.
#[derive(Debug)]
pub(crate) struct InlinedDiffs {
    /// Inclusive lower bound on the seqnos of `diffs`.
    pub(crate) lower: SeqNo,
    /// Exclusive upper bound on the seqnos of `diffs`.
    pub(crate) upper: SeqNo,
    pub(crate) diffs: Vec<VersionedData>,
}
922
923impl InlinedDiffs {
924 pub(crate) fn description(&self) -> Description<SeqNo> {
925 Description::new(
926 Antichain::from_elem(self.lower),
927 Antichain::from_elem(self.upper),
928 Antichain::from_elem(SeqNo::minimum()),
929 )
930 }
931
932 fn from(lower: SeqNo, upper: SeqNo, diffs: Vec<VersionedData>) -> Self {
933 for diff in &diffs {
934 assert!(diff.seqno >= lower);
935 assert!(diff.seqno < upper);
936 }
937 Self {
938 lower,
939 upper,
940 diffs,
941 }
942 }
943}
944
945impl RustType<ProtoInlinedDiffs> for InlinedDiffs {
946 fn into_proto(&self) -> ProtoInlinedDiffs {
947 ProtoInlinedDiffs {
948 lower: self.lower.into_proto(),
949 upper: self.upper.into_proto(),
950 diffs: self.diffs.into_proto(),
951 }
952 }
953
954 fn from_proto(proto: ProtoInlinedDiffs) -> Result<Self, TryFromProtoError> {
955 Ok(Self {
956 lower: proto.lower.into_rust()?,
957 upper: proto.upper.into_rust()?,
958 diffs: proto.diffs.into_rust()?,
959 })
960 }
961}
962
963impl<T: Timestamp + Lattice + Codec64> RustType<ProtoRollup> for Rollup<T> {
964 fn into_proto(&self) -> ProtoRollup {
965 ProtoRollup {
966 applier_version: self.state.state.collections.version.to_string(),
967 shard_id: self.state.state.shard_id.into_proto(),
968 seqno: self.state.state.seqno.into_proto(),
969 walltime_ms: self.state.state.walltime_ms.into_proto(),
970 hostname: self.state.state.hostname.into_proto(),
971 key_codec: self.state.key_codec.into_proto(),
972 val_codec: self.state.val_codec.into_proto(),
973 ts_codec: T::codec_name(),
974 diff_codec: self.state.diff_codec.into_proto(),
975 last_gc_req: self.state.state.collections.last_gc_req.into_proto(),
976 active_rollup: self.state.state.collections.active_rollup.into_proto(),
977 active_gc: self.state.state.collections.active_gc.into_proto(),
978 rollups: self
979 .state
980 .state
981 .collections
982 .rollups
983 .iter()
984 .map(|(seqno, key)| (seqno.into_proto(), key.into_proto()))
985 .collect(),
986 deprecated_rollups: Default::default(),
987 leased_readers: self
988 .state
989 .state
990 .collections
991 .leased_readers
992 .iter()
993 .map(|(id, state)| (id.into_proto(), state.into_proto()))
994 .collect(),
995 critical_readers: self
996 .state
997 .state
998 .collections
999 .critical_readers
1000 .iter()
1001 .map(|(id, state)| (id.into_proto(), state.into_proto()))
1002 .collect(),
1003 writers: self
1004 .state
1005 .state
1006 .collections
1007 .writers
1008 .iter()
1009 .map(|(id, state)| (id.into_proto(), state.into_proto()))
1010 .collect(),
1011 schemas: self
1012 .state
1013 .state
1014 .collections
1015 .schemas
1016 .iter()
1017 .map(|(id, schema)| (id.into_proto(), schema.into_proto()))
1018 .collect(),
1019 trace: Some(self.state.state.collections.trace.into_proto()),
1020 diffs: self.diffs.as_ref().map(|x| x.into_proto()),
1021 }
1022 }
1023
1024 fn from_proto(x: ProtoRollup) -> Result<Self, TryFromProtoError> {
1025 let applier_version = if x.applier_version.is_empty() {
1026 semver::Version::new(0, 0, 0)
1030 } else {
1031 semver::Version::parse(&x.applier_version).map_err(|err| {
1032 TryFromProtoError::InvalidSemverVersion(format!(
1033 "invalid applier_version {}: {}",
1034 x.applier_version, err
1035 ))
1036 })?
1037 };
1038
1039 let mut rollups = BTreeMap::new();
1040 for (seqno, rollup) in x.rollups {
1041 rollups.insert(seqno.into_rust()?, rollup.into_rust()?);
1042 }
1043 for (seqno, key) in x.deprecated_rollups {
1044 rollups.insert(
1045 seqno.into_rust()?,
1046 HollowRollup {
1047 key: key.into_rust()?,
1048 encoded_size_bytes: None,
1049 },
1050 );
1051 }
1052 let mut leased_readers = BTreeMap::new();
1053 for (id, state) in x.leased_readers {
1054 leased_readers.insert(id.into_rust()?, state.into_rust()?);
1055 }
1056 let mut critical_readers = BTreeMap::new();
1057 for (id, state) in x.critical_readers {
1058 critical_readers.insert(id.into_rust()?, state.into_rust()?);
1059 }
1060 let mut writers = BTreeMap::new();
1061 for (id, state) in x.writers {
1062 writers.insert(id.into_rust()?, state.into_rust()?);
1063 }
1064 let mut schemas = BTreeMap::new();
1065 for (id, x) in x.schemas {
1066 schemas.insert(id.into_rust()?, x.into_rust()?);
1067 }
1068 let active_rollup = x
1069 .active_rollup
1070 .map(|rollup| rollup.into_rust())
1071 .transpose()?;
1072 let active_gc = x.active_gc.map(|gc| gc.into_rust()).transpose()?;
1073 let collections = StateCollections {
1074 version: applier_version.clone(),
1075 rollups,
1076 active_rollup,
1077 active_gc,
1078 last_gc_req: x.last_gc_req.into_rust()?,
1079 leased_readers,
1080 critical_readers,
1081 writers,
1082 schemas,
1083 trace: x.trace.into_rust_if_some("trace")?,
1084 };
1085 let state = State {
1086 shard_id: x.shard_id.into_rust()?,
1087 seqno: x.seqno.into_rust()?,
1088 walltime_ms: x.walltime_ms,
1089 hostname: x.hostname,
1090 collections,
1091 };
1092
1093 let diffs: Option<InlinedDiffs> = x.diffs.map(|diffs| diffs.into_rust()).transpose()?;
1094 if let Some(diffs) = &diffs {
1095 if diffs.lower != state.latest_rollup().0.next() {
1096 return Err(TryFromProtoError::InvalidPersistState(format!(
1097 "diffs lower ({}) should match latest rollup's successor: ({})",
1098 diffs.lower,
1099 state.latest_rollup().0.next()
1100 )));
1101 }
1102 if diffs.upper != state.seqno.next() {
1103 return Err(TryFromProtoError::InvalidPersistState(format!(
1104 "diffs upper ({}) should match state's successor: ({})",
1105 diffs.lower,
1106 state.seqno.next()
1107 )));
1108 }
1109 }
1110
1111 Ok(Rollup {
1112 state: UntypedState {
1113 state,
1114 key_codec: x.key_codec.into_rust()?,
1115 val_codec: x.val_codec.into_rust()?,
1116 ts_codec: x.ts_codec.into_rust()?,
1117 diff_codec: x.diff_codec.into_rust()?,
1118 },
1119 diffs,
1120 })
1121 }
1122}
1123
1124impl RustType<ProtoVersionedData> for VersionedData {
1125 fn into_proto(&self) -> ProtoVersionedData {
1126 ProtoVersionedData {
1127 seqno: self.seqno.into_proto(),
1128 data: Bytes::clone(&self.data),
1129 }
1130 }
1131
1132 fn from_proto(proto: ProtoVersionedData) -> Result<Self, TryFromProtoError> {
1133 Ok(Self {
1134 seqno: proto.seqno.into_rust()?,
1135 data: proto.data,
1136 })
1137 }
1138}
1139
1140impl RustType<ProtoSpineId> for SpineId {
1141 fn into_proto(&self) -> ProtoSpineId {
1142 ProtoSpineId {
1143 lo: self.0.into_proto(),
1144 hi: self.1.into_proto(),
1145 }
1146 }
1147
1148 fn from_proto(proto: ProtoSpineId) -> Result<Self, TryFromProtoError> {
1149 Ok(SpineId(proto.lo.into_rust()?, proto.hi.into_rust()?))
1150 }
1151}
1152
1153impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, Arc<HollowBatch<T>>> for ProtoIdHollowBatch {
1154 fn from_rust<'a>(entry: (&'a SpineId, &'a Arc<HollowBatch<T>>)) -> Self {
1155 let (id, batch) = entry;
1156 ProtoIdHollowBatch {
1157 id: Some(id.into_proto()),
1158 batch: Some(batch.into_proto()),
1159 }
1160 }
1161
1162 fn into_rust(self) -> Result<(SpineId, Arc<HollowBatch<T>>), TryFromProtoError> {
1163 let id = self.id.into_rust_if_some("ProtoIdHollowBatch::id")?;
1164 let batch = Arc::new(self.batch.into_rust_if_some("ProtoIdHollowBatch::batch")?);
1165 Ok((id, batch))
1166 }
1167}
1168
1169impl<T: Timestamp + Codec64> RustType<ProtoSpineBatch> for ThinSpineBatch<T> {
1170 fn into_proto(&self) -> ProtoSpineBatch {
1171 ProtoSpineBatch {
1172 desc: Some(self.desc.into_proto()),
1173 parts: self.parts.into_proto(),
1174 level: self.level.into_proto(),
1175 descs: self.descs.into_proto(),
1176 }
1177 }
1178
1179 fn from_proto(proto: ProtoSpineBatch) -> Result<Self, TryFromProtoError> {
1180 let level = proto.level.into_rust()?;
1181 let desc = proto.desc.into_rust_if_some("ProtoSpineBatch::desc")?;
1182 let parts = proto.parts.into_rust()?;
1183 let descs = proto.descs.into_rust()?;
1184 Ok(ThinSpineBatch {
1185 level,
1186 desc,
1187 parts,
1188 descs,
1189 })
1190 }
1191}
1192
1193impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinSpineBatch<T>> for ProtoIdSpineBatch {
1194 fn from_rust<'a>(entry: (&'a SpineId, &'a ThinSpineBatch<T>)) -> Self {
1195 let (id, batch) = entry;
1196 ProtoIdSpineBatch {
1197 id: Some(id.into_proto()),
1198 batch: Some(batch.into_proto()),
1199 }
1200 }
1201
1202 fn into_rust(self) -> Result<(SpineId, ThinSpineBatch<T>), TryFromProtoError> {
1203 let id = self.id.into_rust_if_some("ProtoHollowBatch::id")?;
1204 let batch = self.batch.into_rust_if_some("ProtoHollowBatch::batch")?;
1205 Ok((id, batch))
1206 }
1207}
1208
1209impl RustType<ProtoCompaction> for ActiveCompaction {
1210 fn into_proto(&self) -> ProtoCompaction {
1211 ProtoCompaction {
1212 start_ms: self.start_ms,
1213 }
1214 }
1215
1216 fn from_proto(proto: ProtoCompaction) -> Result<Self, TryFromProtoError> {
1217 Ok(Self {
1218 start_ms: proto.start_ms,
1219 })
1220 }
1221}
1222
1223impl<T: Timestamp + Codec64> RustType<ProtoMerge> for ThinMerge<T> {
1224 fn into_proto(&self) -> ProtoMerge {
1225 ProtoMerge {
1226 since: Some(self.since.into_proto()),
1227 remaining_work: self.remaining_work.into_proto(),
1228 active_compaction: self.active_compaction.into_proto(),
1229 }
1230 }
1231
1232 fn from_proto(proto: ProtoMerge) -> Result<Self, TryFromProtoError> {
1233 let since = proto.since.into_rust_if_some("ProtoMerge::since")?;
1234 let remaining_work = proto.remaining_work.into_rust()?;
1235 let active_compaction = proto.active_compaction.into_rust()?;
1236 Ok(Self {
1237 since,
1238 remaining_work,
1239 active_compaction,
1240 })
1241 }
1242}
1243
1244impl<T: Timestamp + Codec64> ProtoMapEntry<SpineId, ThinMerge<T>> for ProtoIdMerge {
1245 fn from_rust<'a>((id, merge): (&'a SpineId, &'a ThinMerge<T>)) -> Self {
1246 ProtoIdMerge {
1247 id: Some(id.into_proto()),
1248 merge: Some(merge.into_proto()),
1249 }
1250 }
1251
1252 fn into_rust(self) -> Result<(SpineId, ThinMerge<T>), TryFromProtoError> {
1253 let id = self.id.into_rust_if_some("ProtoIdMerge::id")?;
1254 let merge = self.merge.into_rust_if_some("ProtoIdMerge::merge")?;
1255 Ok((id, merge))
1256 }
1257}
1258
1259impl<T: Timestamp + Codec64> RustType<ProtoTrace> for FlatTrace<T> {
1260 fn into_proto(&self) -> ProtoTrace {
1261 let since = self.since.into_proto();
1262 let legacy_batches = self
1263 .legacy_batches
1264 .iter()
1265 .map(|(b, _)| b.into_proto())
1266 .collect();
1267 let hollow_batches = self.hollow_batches.into_proto();
1268 let spine_batches = self.spine_batches.into_proto();
1269 let merges = self.merges.into_proto();
1270 ProtoTrace {
1271 since: Some(since),
1272 legacy_batches,
1273 hollow_batches,
1274 spine_batches,
1275 merges,
1276 }
1277 }
1278
1279 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1280 let since = proto.since.into_rust_if_some("ProtoTrace::since")?;
1281 let legacy_batches = proto
1282 .legacy_batches
1283 .into_iter()
1284 .map(|b| b.into_rust().map(|b| (b, ())))
1285 .collect::<Result<_, _>>()?;
1286 let hollow_batches = proto.hollow_batches.into_rust()?;
1287 let spine_batches = proto.spine_batches.into_rust()?;
1288 let merges = proto.merges.into_rust()?;
1289 Ok(FlatTrace {
1290 since,
1291 legacy_batches,
1292 hollow_batches,
1293 spine_batches,
1294 merges,
1295 })
1296 }
1297}
1298
1299impl<T: Timestamp + Lattice + Codec64> RustType<ProtoTrace> for Trace<T> {
1300 fn into_proto(&self) -> ProtoTrace {
1301 self.flatten().into_proto()
1302 }
1303
1304 fn from_proto(proto: ProtoTrace) -> Result<Self, TryFromProtoError> {
1305 Trace::unflatten(proto.into_rust()?).map_err(TryFromProtoError::InvalidPersistState)
1306 }
1307}
1308
1309impl<T: Timestamp + Codec64> RustType<ProtoLeasedReaderState> for LeasedReaderState<T> {
1310 fn into_proto(&self) -> ProtoLeasedReaderState {
1311 ProtoLeasedReaderState {
1312 seqno: self.seqno.into_proto(),
1313 since: Some(self.since.into_proto()),
1314 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1315 lease_duration_ms: self.lease_duration_ms.into_proto(),
1316 debug: Some(self.debug.into_proto()),
1317 }
1318 }
1319
1320 fn from_proto(proto: ProtoLeasedReaderState) -> Result<Self, TryFromProtoError> {
1321 let mut lease_duration_ms = proto.lease_duration_ms.into_rust()?;
1322 if lease_duration_ms == 0 {
1327 lease_duration_ms = u64::try_from(READER_LEASE_DURATION.default().as_millis())
1328 .expect("lease duration as millis should fit within u64");
1329 }
1330 let debug = proto.debug.unwrap_or_default().into_rust()?;
1333 Ok(LeasedReaderState {
1334 seqno: proto.seqno.into_rust()?,
1335 since: proto
1336 .since
1337 .into_rust_if_some("ProtoLeasedReaderState::since")?,
1338 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1339 lease_duration_ms,
1340 debug,
1341 })
1342 }
1343}
1344
1345impl<T: Timestamp + Codec64> RustType<ProtoCriticalReaderState> for CriticalReaderState<T> {
1346 fn into_proto(&self) -> ProtoCriticalReaderState {
1347 ProtoCriticalReaderState {
1348 since: Some(self.since.into_proto()),
1349 opaque: i64::from_le_bytes(self.opaque.0),
1350 opaque_codec: self.opaque_codec.clone(),
1351 debug: Some(self.debug.into_proto()),
1352 }
1353 }
1354
1355 fn from_proto(proto: ProtoCriticalReaderState) -> Result<Self, TryFromProtoError> {
1356 let debug = proto.debug.unwrap_or_default().into_rust()?;
1359 Ok(CriticalReaderState {
1360 since: proto
1361 .since
1362 .into_rust_if_some("ProtoCriticalReaderState::since")?,
1363 opaque: OpaqueState(i64::to_le_bytes(proto.opaque)),
1364 opaque_codec: proto.opaque_codec,
1365 debug,
1366 })
1367 }
1368}
1369
1370impl<T: Timestamp + Codec64> RustType<ProtoWriterState> for WriterState<T> {
1371 fn into_proto(&self) -> ProtoWriterState {
1372 ProtoWriterState {
1373 last_heartbeat_timestamp_ms: self.last_heartbeat_timestamp_ms.into_proto(),
1374 lease_duration_ms: self.lease_duration_ms.into_proto(),
1375 most_recent_write_token: self.most_recent_write_token.into_proto(),
1376 most_recent_write_upper: Some(self.most_recent_write_upper.into_proto()),
1377 debug: Some(self.debug.into_proto()),
1378 }
1379 }
1380
1381 fn from_proto(proto: ProtoWriterState) -> Result<Self, TryFromProtoError> {
1382 let most_recent_write_token = if proto.most_recent_write_token.is_empty() {
1388 IdempotencyToken::SENTINEL
1389 } else {
1390 proto.most_recent_write_token.into_rust()?
1391 };
1392 let most_recent_write_upper = match proto.most_recent_write_upper {
1393 Some(x) => x.into_rust()?,
1394 None => Antichain::from_elem(T::minimum()),
1395 };
1396 let debug = proto.debug.unwrap_or_default().into_rust()?;
1399 Ok(WriterState {
1400 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms.into_rust()?,
1401 lease_duration_ms: proto.lease_duration_ms.into_rust()?,
1402 most_recent_write_token,
1403 most_recent_write_upper,
1404 debug,
1405 })
1406 }
1407}
1408
1409impl RustType<ProtoHandleDebugState> for HandleDebugState {
1410 fn into_proto(&self) -> ProtoHandleDebugState {
1411 ProtoHandleDebugState {
1412 hostname: self.hostname.into_proto(),
1413 purpose: self.purpose.into_proto(),
1414 }
1415 }
1416
1417 fn from_proto(proto: ProtoHandleDebugState) -> Result<Self, TryFromProtoError> {
1418 Ok(HandleDebugState {
1419 hostname: proto.hostname,
1420 purpose: proto.purpose,
1421 })
1422 }
1423}
1424
1425impl<T: Timestamp + Codec64> RustType<ProtoHollowRun> for HollowRun<T> {
1426 fn into_proto(&self) -> ProtoHollowRun {
1427 ProtoHollowRun {
1428 parts: self.parts.into_proto(),
1429 }
1430 }
1431
1432 fn from_proto(proto: ProtoHollowRun) -> Result<Self, TryFromProtoError> {
1433 Ok(HollowRun {
1434 parts: proto.parts.into_rust()?,
1435 })
1436 }
1437}
1438
1439impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
1440 fn into_proto(&self) -> ProtoHollowBatch {
1441 let mut run_meta = self.run_meta.into_proto();
1442 let run_meta_default = RunMeta::default().into_proto();
1444 while run_meta.last() == Some(&run_meta_default) {
1445 run_meta.pop();
1446 }
1447 ProtoHollowBatch {
1448 desc: Some(self.desc.into_proto()),
1449 parts: self.parts.into_proto(),
1450 len: self.len.into_proto(),
1451 runs: self.run_splits.into_proto(),
1452 run_meta,
1453 deprecated_keys: vec![],
1454 }
1455 }
1456
1457 fn from_proto(proto: ProtoHollowBatch) -> Result<Self, TryFromProtoError> {
1458 let mut parts: Vec<RunPart<T>> = proto.parts.into_rust()?;
1459 parts.extend(proto.deprecated_keys.into_iter().map(|key| {
1462 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
1463 key: PartialBatchKey(key),
1464 meta: Default::default(),
1465 encoded_size_bytes: 0,
1466 key_lower: vec![],
1467 structured_key_lower: None,
1468 stats: None,
1469 ts_rewrite: None,
1470 diffs_sum: None,
1471 format: None,
1472 schema_id: None,
1473 deprecated_schema_id: None,
1474 }))
1475 }));
1476 let run_splits: Vec<usize> = proto.runs.into_rust()?;
1478 let num_runs = if parts.is_empty() {
1479 0
1480 } else {
1481 run_splits.len() + 1
1482 };
1483 let mut run_meta: Vec<RunMeta> = proto.run_meta.into_rust()?;
1484 run_meta.resize(num_runs, RunMeta::default());
1485 Ok(HollowBatch {
1486 desc: proto.desc.into_rust_if_some("desc")?,
1487 parts,
1488 len: proto.len.into_rust()?,
1489 run_splits,
1490 run_meta,
1491 })
1492 }
1493}
1494
1495impl RustType<String> for RunId {
1496 fn into_proto(&self) -> String {
1497 self.to_string()
1498 }
1499
1500 fn from_proto(proto: String) -> Result<Self, TryFromProtoError> {
1501 RunId::from_str(&proto).map_err(|_| {
1502 TryFromProtoError::InvalidPersistState(format!("invalid RunId: {}", proto))
1503 })
1504 }
1505}
1506
1507impl RustType<ProtoRunMeta> for RunMeta {
1508 fn into_proto(&self) -> ProtoRunMeta {
1509 let order = match self.order {
1510 None => ProtoRunOrder::Unknown,
1511 Some(RunOrder::Unordered) => ProtoRunOrder::Unordered,
1512 Some(RunOrder::Codec) => ProtoRunOrder::Codec,
1513 Some(RunOrder::Structured) => ProtoRunOrder::Structured,
1514 };
1515 ProtoRunMeta {
1516 order: order.into(),
1517 schema_id: self.schema.into_proto(),
1518 deprecated_schema_id: self.deprecated_schema.into_proto(),
1519 id: self.id.into_proto(),
1520 len: self.len.into_proto(),
1521 meta: self.meta.into_proto(),
1522 }
1523 }
1524
1525 fn from_proto(proto: ProtoRunMeta) -> Result<Self, TryFromProtoError> {
1526 let order = match ProtoRunOrder::try_from(proto.order)? {
1527 ProtoRunOrder::Unknown => None,
1528 ProtoRunOrder::Unordered => Some(RunOrder::Unordered),
1529 ProtoRunOrder::Codec => Some(RunOrder::Codec),
1530 ProtoRunOrder::Structured => Some(RunOrder::Structured),
1531 };
1532 Ok(Self {
1533 order,
1534 schema: proto.schema_id.into_rust()?,
1535 deprecated_schema: proto.deprecated_schema_id.into_rust()?,
1536 id: proto.id.into_rust()?,
1537 len: proto.len.into_rust()?,
1538 meta: proto.meta.into_rust()?,
1539 })
1540 }
1541}
1542
1543impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for RunPart<T> {
1544 fn into_proto(&self) -> ProtoHollowBatchPart {
1545 match self {
1546 RunPart::Single(part) => part.into_proto(),
1547 RunPart::Many(runs) => runs.into_proto(),
1548 }
1549 }
1550
1551 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1552 let run_part = if let Some(proto_hollow_batch_part::Kind::RunRef(_)) = proto.kind {
1553 RunPart::Many(proto.into_rust()?)
1554 } else {
1555 RunPart::Single(proto.into_rust()?)
1556 };
1557 Ok(run_part)
1558 }
1559}
1560
1561impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T> {
1562 fn into_proto(&self) -> ProtoHollowBatchPart {
1563 let part = ProtoHollowBatchPart {
1564 kind: Some(proto_hollow_batch_part::Kind::RunRef(ProtoHollowRunRef {
1565 key: self.key.into_proto(),
1566 max_part_bytes: self.max_part_bytes.into_proto(),
1567 })),
1568 encoded_size_bytes: self.hollow_bytes.into_proto(),
1569 key_lower: Bytes::copy_from_slice(&self.key_lower),
1570 diffs_sum: self.diffs_sum.map(i64::from_le_bytes),
1571 key_stats: None,
1572 ts_rewrite: None,
1573 format: None,
1574 schema_id: None,
1575 structured_key_lower: self.structured_key_lower.into_proto(),
1576 deprecated_schema_id: None,
1577 metadata: BTreeMap::default(),
1578 };
1579 part
1580 }
1581
1582 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1583 let run_proto = match proto.kind {
1584 Some(proto_hollow_batch_part::Kind::RunRef(proto_ref)) => proto_ref,
1585 _ => Err(TryFromProtoError::UnknownEnumVariant(
1586 "ProtoHollowBatchPart::kind".to_string(),
1587 ))?,
1588 };
1589 Ok(Self {
1590 key: run_proto.key.into_rust()?,
1591 hollow_bytes: proto.encoded_size_bytes.into_rust()?,
1592 max_part_bytes: run_proto.max_part_bytes.into_rust()?,
1593 key_lower: proto.key_lower.to_vec(),
1594 structured_key_lower: proto.structured_key_lower.into_rust()?,
1595 diffs_sum: proto.diffs_sum.as_ref().map(|x| i64::to_le_bytes(*x)),
1596 _phantom_data: Default::default(),
1597 })
1598 }
1599}
1600
1601impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
1602 fn into_proto(&self) -> ProtoHollowBatchPart {
1603 match self {
1604 BatchPart::Hollow(x) => ProtoHollowBatchPart {
1605 kind: Some(proto_hollow_batch_part::Kind::Key(x.key.into_proto())),
1606 encoded_size_bytes: x.encoded_size_bytes.into_proto(),
1607 key_lower: Bytes::copy_from_slice(&x.key_lower),
1608 structured_key_lower: x.structured_key_lower.as_ref().map(|lazy| lazy.buf.clone()),
1609 key_stats: x.stats.into_proto(),
1610 ts_rewrite: x.ts_rewrite.as_ref().map(|x| x.into_proto()),
1611 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
1612 format: x.format.map(|f| f.into_proto()),
1613 schema_id: x.schema_id.into_proto(),
1614 deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1615 metadata: BTreeMap::default(),
1616 },
1617 BatchPart::Inline {
1618 updates,
1619 ts_rewrite,
1620 schema_id,
1621 deprecated_schema_id,
1622 } => ProtoHollowBatchPart {
1623 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
1624 encoded_size_bytes: 0,
1625 key_lower: Bytes::new(),
1626 structured_key_lower: None,
1627 key_stats: None,
1628 ts_rewrite: ts_rewrite.as_ref().map(|x| x.into_proto()),
1629 diffs_sum: None,
1630 format: None,
1631 schema_id: schema_id.into_proto(),
1632 deprecated_schema_id: deprecated_schema_id.into_proto(),
1633 metadata: BTreeMap::default(),
1634 },
1635 }
1636 }
1637
1638 fn from_proto(proto: ProtoHollowBatchPart) -> Result<Self, TryFromProtoError> {
1639 let ts_rewrite = match proto.ts_rewrite {
1640 Some(ts_rewrite) => Some(ts_rewrite.into_rust()?),
1641 None => None,
1642 };
1643 let schema_id = proto.schema_id.into_rust()?;
1644 let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
1645 match proto.kind {
1646 Some(proto_hollow_batch_part::Kind::Key(key)) => {
1647 Ok(BatchPart::Hollow(HollowBatchPart {
1648 key: key.into_rust()?,
1649 meta: proto.metadata.into_rust()?,
1650 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1651 key_lower: proto.key_lower.into(),
1652 structured_key_lower: proto.structured_key_lower.into_rust()?,
1653 stats: proto.key_stats.into_rust()?,
1654 ts_rewrite,
1655 diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
1656 format: proto.format.map(|f| f.into_rust()).transpose()?,
1657 schema_id,
1658 deprecated_schema_id,
1659 }))
1660 }
1661 Some(proto_hollow_batch_part::Kind::Inline(x)) => {
1662 assert_eq!(proto.encoded_size_bytes, 0);
1663 assert_eq!(proto.key_lower.len(), 0);
1664 assert_none!(proto.key_stats);
1665 assert_none!(proto.diffs_sum);
1666 let updates = LazyInlineBatchPart(x.into_rust()?);
1667 Ok(BatchPart::Inline {
1668 updates,
1669 ts_rewrite,
1670 schema_id,
1671 deprecated_schema_id,
1672 })
1673 }
1674 _ => Err(TryFromProtoError::unknown_enum_variant(
1675 "ProtoHollowBatchPart::kind",
1676 )),
1677 }
1678 }
1679}
1680
1681impl RustType<proto_hollow_batch_part::Format> for BatchColumnarFormat {
1682 fn into_proto(&self) -> proto_hollow_batch_part::Format {
1683 match self {
1684 BatchColumnarFormat::Row => proto_hollow_batch_part::Format::Row(()),
1685 BatchColumnarFormat::Both(version) => {
1686 proto_hollow_batch_part::Format::RowAndColumnar((*version).cast_into())
1687 }
1688 BatchColumnarFormat::Structured => proto_hollow_batch_part::Format::Structured(()),
1689 }
1690 }
1691
1692 fn from_proto(proto: proto_hollow_batch_part::Format) -> Result<Self, TryFromProtoError> {
1693 let format = match proto {
1694 proto_hollow_batch_part::Format::Row(_) => BatchColumnarFormat::Row,
1695 proto_hollow_batch_part::Format::RowAndColumnar(version) => {
1696 BatchColumnarFormat::Both(version.cast_into())
1697 }
1698 proto_hollow_batch_part::Format::Structured(_) => BatchColumnarFormat::Structured,
1699 };
1700 Ok(format)
1701 }
1702}
1703
1704#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1709pub struct LazyPartStats {
1710 key: LazyProto<ProtoStructStats>,
1711}
1712
1713impl Debug for LazyPartStats {
1714 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1715 f.debug_tuple("LazyPartStats")
1716 .field(&self.decode())
1717 .finish()
1718 }
1719}
1720
1721impl LazyPartStats {
1722 pub(crate) fn encode(x: &PartStats, map_proto: impl FnOnce(&mut ProtoStructStats)) -> Self {
1723 let PartStats { key } = x;
1724 let mut proto_stats = ProtoStructStats::from_rust(key);
1725 map_proto(&mut proto_stats);
1726 LazyPartStats {
1727 key: LazyProto::from(&proto_stats),
1728 }
1729 }
1730 pub fn decode(&self) -> PartStats {
1735 let key = self.key.decode().expect("valid proto");
1736 PartStats {
1737 key: key.into_rust().expect("valid stats"),
1738 }
1739 }
1740}
1741
1742impl RustType<Bytes> for LazyPartStats {
1743 fn into_proto(&self) -> Bytes {
1744 let LazyPartStats { key } = self;
1745 key.into_proto()
1746 }
1747
1748 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1749 Ok(LazyPartStats {
1750 key: proto.into_rust()?,
1751 })
1752 }
1753}
1754
/// Proptest strategy that always yields `Some` arbitrary [`LazyPartStats`].
#[cfg(test)]
pub(crate) fn any_some_lazy_part_stats() -> impl Strategy<Value = Option<LazyPartStats>> {
    let stats = proptest::prelude::any::<LazyPartStats>();
    stats.prop_map(Option::Some)
}
1759
1760#[allow(unused_parens)]
1761impl Arbitrary for LazyPartStats {
1762 type Parameters = ();
1763 type Strategy =
1764 proptest::strategy::Map<(<PartStats as Arbitrary>::Strategy), fn((PartStats)) -> Self>;
1765
1766 fn arbitrary_with(_: ()) -> Self::Strategy {
1767 Strategy::prop_map((proptest::prelude::any::<PartStats>()), |(x)| {
1768 LazyPartStats::encode(&x, |_| {})
1769 })
1770 }
1771}
1772
1773impl ProtoInlineBatchPart {
1774 pub(crate) fn into_rust<T: Timestamp + Codec64>(
1775 lgbytes: &ColumnarMetrics,
1776 proto: Self,
1777 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1778 let updates = proto
1779 .updates
1780 .ok_or_else(|| TryFromProtoError::missing_field("ProtoInlineBatchPart::updates"))?;
1781 let updates = BlobTraceUpdates::from_proto(lgbytes, updates)?;
1782
1783 Ok(BlobTraceBatchPart {
1784 desc: proto.desc.into_rust_if_some("ProtoInlineBatchPart::desc")?,
1785 index: proto.index.into_rust()?,
1786 updates,
1787 })
1788 }
1789}
1790
1791#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
1793pub struct LazyInlineBatchPart(LazyProto<ProtoInlineBatchPart>);
1794
1795impl From<&ProtoInlineBatchPart> for LazyInlineBatchPart {
1796 fn from(value: &ProtoInlineBatchPart) -> Self {
1797 LazyInlineBatchPart(value.into())
1798 }
1799}
1800
1801impl Serialize for LazyInlineBatchPart {
1802 fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
1803 let proto = self.0.decode().expect("valid proto");
1806 let mut s = s.serialize_struct("InlineBatchPart", 3)?;
1807 let () = s.serialize_field("desc", &proto.desc)?;
1808 let () = s.serialize_field("index", &proto.index)?;
1809 let () = s.serialize_field("updates[len]", &proto.updates.map_or(0, |x| x.len))?;
1810 s.end()
1811 }
1812}
1813
1814impl LazyInlineBatchPart {
1815 pub(crate) fn encoded_size_bytes(&self) -> usize {
1816 self.0.buf.len()
1817 }
1818
1819 pub fn decode<T: Timestamp + Codec64>(
1825 &self,
1826 lgbytes: &ColumnarMetrics,
1827 ) -> Result<BlobTraceBatchPart<T>, TryFromProtoError> {
1828 let proto = self.0.decode().expect("valid proto");
1829 ProtoInlineBatchPart::into_rust(lgbytes, proto)
1830 }
1831}
1832
1833impl RustType<Bytes> for LazyInlineBatchPart {
1834 fn into_proto(&self) -> Bytes {
1835 self.0.into_proto()
1836 }
1837
1838 fn from_proto(proto: Bytes) -> Result<Self, TryFromProtoError> {
1839 Ok(LazyInlineBatchPart(proto.into_rust()?))
1840 }
1841}
1842
1843impl RustType<ProtoHollowRollup> for HollowRollup {
1844 fn into_proto(&self) -> ProtoHollowRollup {
1845 ProtoHollowRollup {
1846 key: self.key.into_proto(),
1847 encoded_size_bytes: self.encoded_size_bytes.into_proto(),
1848 }
1849 }
1850
1851 fn from_proto(proto: ProtoHollowRollup) -> Result<Self, TryFromProtoError> {
1852 Ok(HollowRollup {
1853 key: proto.key.into_rust()?,
1854 encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
1855 })
1856 }
1857}
1858
1859impl RustType<ProtoActiveRollup> for ActiveRollup {
1860 fn into_proto(&self) -> ProtoActiveRollup {
1861 ProtoActiveRollup {
1862 start_ms: self.start_ms,
1863 seqno: self.seqno.into_proto(),
1864 }
1865 }
1866
1867 fn from_proto(proto: ProtoActiveRollup) -> Result<Self, TryFromProtoError> {
1868 Ok(ActiveRollup {
1869 start_ms: proto.start_ms,
1870 seqno: proto.seqno.into_rust()?,
1871 })
1872 }
1873}
1874
1875impl RustType<ProtoActiveGc> for ActiveGc {
1876 fn into_proto(&self) -> ProtoActiveGc {
1877 ProtoActiveGc {
1878 start_ms: self.start_ms,
1879 seqno: self.seqno.into_proto(),
1880 }
1881 }
1882
1883 fn from_proto(proto: ProtoActiveGc) -> Result<Self, TryFromProtoError> {
1884 Ok(ActiveGc {
1885 start_ms: proto.start_ms,
1886 seqno: proto.seqno.into_rust()?,
1887 })
1888 }
1889}
1890
1891impl<T: Timestamp + Codec64> RustType<ProtoU64Description> for Description<T> {
1892 fn into_proto(&self) -> ProtoU64Description {
1893 ProtoU64Description {
1894 lower: Some(self.lower().into_proto()),
1895 upper: Some(self.upper().into_proto()),
1896 since: Some(self.since().into_proto()),
1897 }
1898 }
1899
1900 fn from_proto(proto: ProtoU64Description) -> Result<Self, TryFromProtoError> {
1901 Ok(Description::new(
1902 proto.lower.into_rust_if_some("lower")?,
1903 proto.upper.into_rust_if_some("upper")?,
1904 proto.since.into_rust_if_some("since")?,
1905 ))
1906 }
1907}
1908
1909impl<T: Timestamp + Codec64> RustType<ProtoU64Antichain> for Antichain<T> {
1910 fn into_proto(&self) -> ProtoU64Antichain {
1911 ProtoU64Antichain {
1912 elements: self
1913 .elements()
1914 .iter()
1915 .map(|x| i64::from_le_bytes(T::encode(x)))
1916 .collect(),
1917 }
1918 }
1919
1920 fn from_proto(proto: ProtoU64Antichain) -> Result<Self, TryFromProtoError> {
1921 let elements = proto
1922 .elements
1923 .iter()
1924 .map(|x| T::decode(x.to_le_bytes()))
1925 .collect::<Vec<_>>();
1926 Ok(Antichain::from(elements))
1927 }
1928}
1929
1930#[cfg(test)]
1931mod tests {
1932 use bytes::Bytes;
1933 use mz_build_info::DUMMY_BUILD_INFO;
1934 use mz_dyncfg::ConfigUpdates;
1935 use mz_ore::assert_err;
1936 use mz_persist::location::SeqNo;
1937 use proptest::prelude::*;
1938
1939 use crate::ShardId;
1940 use crate::internal::paths::PartialRollupKey;
1941 use crate::internal::state::tests::any_state;
1942 use crate::internal::state::{BatchPart, HandleDebugState};
1943 use crate::internal::state_diff::StateDiff;
1944 use crate::tests::new_test_client_cache;
1945
1946 use super::*;
1947
1948 #[mz_ore::test]
1949 fn metadata_map() {
1950 const COUNT: MetadataKey<u64> = MetadataKey::new("count");
1951
1952 let mut map = MetadataMap::default();
1953 map.set(COUNT, 100);
1954 let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
1955 assert_eq!(map.get(COUNT), Some(100));
1956
1957 const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
1958 MetadataKey::new("antichain");
1959 assert_none!(map.get(ANTICHAIN));
1960
1961 map.set(ANTICHAIN, Antichain::from_elem(30));
1962 let map = MetadataMap::from_proto(map.into_proto()).unwrap();
1963 assert_eq!(map.get(COUNT), Some(100));
1964 assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
1965 }
1966
1967 #[mz_ore::test]
1968 fn applier_version_state() {
1969 let v1 = semver::Version::new(1, 0, 0);
1970 let v2 = semver::Version::new(2, 0, 0);
1971 let v3 = semver::Version::new(3, 0, 0);
1972
1973 let shard_id = ShardId::new();
1975 let state = TypedState::<(), (), u64, i64>::new(v2.clone(), shard_id, "".to_owned(), 0);
1976 let rollup =
1977 Rollup::from_untyped_state_without_diffs(state.clone_for_rollup().into()).into_proto();
1978 let mut buf = Vec::new();
1979 rollup.encode(&mut buf).expect("serializable");
1980 let bytes = Bytes::from(buf);
1981
1982 assert_eq!(
1984 UntypedState::<u64>::decode(&v2, bytes.clone())
1985 .check_codecs(&shard_id)
1986 .as_ref(),
1987 Ok(&state)
1988 );
1989 assert_eq!(
1990 UntypedState::<u64>::decode(&v3, bytes.clone())
1991 .check_codecs(&shard_id)
1992 .as_ref(),
1993 Ok(&state)
1994 );
1995
1996 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| UntypedState::<u64>::decode(&v1, bytes.clone()));
2001 assert_err!(v1_res);
2002 }
2003
2004 #[mz_ore::test]
2005 fn applier_version_state_diff() {
2006 let v1 = semver::Version::new(1, 0, 0);
2007 let v2 = semver::Version::new(2, 0, 0);
2008 let v3 = semver::Version::new(3, 0, 0);
2009
2010 let diff = StateDiff::<u64>::new(
2012 v2.clone(),
2013 SeqNo(0),
2014 SeqNo(1),
2015 2,
2016 PartialRollupKey("rollup".into()),
2017 );
2018 let mut buf = Vec::new();
2019 diff.encode(&mut buf);
2020 let bytes = Bytes::from(buf);
2021
2022 assert_eq!(StateDiff::decode(&v2, bytes.clone()), diff);
2024 assert_eq!(StateDiff::decode(&v3, bytes.clone()), diff);
2025
2026 #[allow(clippy::disallowed_methods)] let v1_res = std::panic::catch_unwind(|| StateDiff::<u64>::decode(&v1, bytes));
2031 assert_err!(v1_res);
2032 }
2033
2034 #[mz_ore::test]
2035 fn hollow_batch_migration_keys() {
2036 let x = HollowBatch::new_run(
2037 Description::new(
2038 Antichain::from_elem(1u64),
2039 Antichain::from_elem(2u64),
2040 Antichain::from_elem(3u64),
2041 ),
2042 vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2043 key: PartialBatchKey("a".into()),
2044 meta: Default::default(),
2045 encoded_size_bytes: 5,
2046 key_lower: vec![],
2047 structured_key_lower: None,
2048 stats: None,
2049 ts_rewrite: None,
2050 diffs_sum: None,
2051 format: None,
2052 schema_id: None,
2053 deprecated_schema_id: None,
2054 }))],
2055 4,
2056 );
2057 let mut old = x.into_proto();
2058 old.deprecated_keys = vec!["b".into()];
2060 let mut expected = x;
2064 expected
2069 .parts
2070 .push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
2071 key: PartialBatchKey("b".into()),
2072 meta: Default::default(),
2073 encoded_size_bytes: 0,
2074 key_lower: vec![],
2075 structured_key_lower: None,
2076 stats: None,
2077 ts_rewrite: None,
2078 diffs_sum: None,
2079 format: None,
2080 schema_id: None,
2081 deprecated_schema_id: None,
2082 })));
2083 assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
2084 }
2085
2086 #[mz_ore::test]
2087 fn reader_state_migration_lease_duration() {
2088 let x = LeasedReaderState {
2089 seqno: SeqNo(1),
2090 since: Antichain::from_elem(2u64),
2091 last_heartbeat_timestamp_ms: 3,
2092 debug: HandleDebugState {
2093 hostname: "host".to_owned(),
2094 purpose: "purpose".to_owned(),
2095 },
2096 lease_duration_ms: 0,
2098 };
2099 let old = x.into_proto();
2100 let mut expected = x;
2101 expected.lease_duration_ms =
2104 u64::try_from(READER_LEASE_DURATION.default().as_millis()).unwrap();
2105 assert_eq!(<LeasedReaderState<u64>>::from_proto(old).unwrap(), expected);
2106 }
2107
2108 #[mz_ore::test]
2109 fn writer_state_migration_most_recent_write() {
2110 let proto = ProtoWriterState {
2111 last_heartbeat_timestamp_ms: 1,
2112 lease_duration_ms: 2,
2113 most_recent_write_token: "".into(),
2116 most_recent_write_upper: None,
2117 debug: Some(ProtoHandleDebugState {
2118 hostname: "host".to_owned(),
2119 purpose: "purpose".to_owned(),
2120 }),
2121 };
2122 let expected = WriterState {
2123 last_heartbeat_timestamp_ms: proto.last_heartbeat_timestamp_ms,
2124 lease_duration_ms: proto.lease_duration_ms,
2125 most_recent_write_token: IdempotencyToken::SENTINEL,
2126 most_recent_write_upper: Antichain::from_elem(0),
2127 debug: HandleDebugState {
2128 hostname: "host".to_owned(),
2129 purpose: "purpose".to_owned(),
2130 },
2131 };
2132 assert_eq!(<WriterState<u64>>::from_proto(proto).unwrap(), expected);
2133 }
2134
2135 #[mz_ore::test]
2136 fn state_migration_rollups() {
2137 let r1 = HollowRollup {
2138 key: PartialRollupKey("foo".to_owned()),
2139 encoded_size_bytes: None,
2140 };
2141 let r2 = HollowRollup {
2142 key: PartialRollupKey("bar".to_owned()),
2143 encoded_size_bytes: Some(2),
2144 };
2145 let shard_id = ShardId::new();
2146 let mut state = TypedState::<(), (), u64, i64>::new(
2147 DUMMY_BUILD_INFO.semver_version(),
2148 shard_id,
2149 "host".to_owned(),
2150 0,
2151 );
2152 state.state.collections.rollups.insert(SeqNo(2), r2.clone());
2153 let mut proto = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2154
2155 proto.deprecated_rollups.insert(1, r1.key.0.clone());
2157
2158 let state: Rollup<u64> = proto.into_rust().unwrap();
2159 let state = state.state;
2160 let state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2161 let expected = vec![(SeqNo(1), r1), (SeqNo(2), r2)];
2162 assert_eq!(
2163 state
2164 .state
2165 .collections
2166 .rollups
2167 .into_iter()
2168 .collect::<Vec<_>>(),
2169 expected
2170 );
2171 }
2172
2173 #[mz_persist_proc::test(tokio::test)]
2174 #[cfg_attr(miri, ignore)] async fn state_diff_migration_rollups(dyncfgs: ConfigUpdates) {
2176 let r1_rollup = HollowRollup {
2177 key: PartialRollupKey("foo".to_owned()),
2178 encoded_size_bytes: None,
2179 };
2180 let r1 = StateFieldDiff {
2181 key: SeqNo(1),
2182 val: StateFieldValDiff::Insert(r1_rollup.clone()),
2183 };
2184 let r2_rollup = HollowRollup {
2185 key: PartialRollupKey("bar".to_owned()),
2186 encoded_size_bytes: Some(2),
2187 };
2188 let r2 = StateFieldDiff {
2189 key: SeqNo(2),
2190 val: StateFieldValDiff::Insert(r2_rollup.clone()),
2191 };
2192 let r3_rollup = HollowRollup {
2193 key: PartialRollupKey("baz".to_owned()),
2194 encoded_size_bytes: None,
2195 };
2196 let r3 = StateFieldDiff {
2197 key: SeqNo(3),
2198 val: StateFieldValDiff::Delete(r3_rollup.clone()),
2199 };
2200 let mut diff = StateDiff::<u64>::new(
2201 DUMMY_BUILD_INFO.semver_version(),
2202 SeqNo(4),
2203 SeqNo(5),
2204 0,
2205 PartialRollupKey("ignored".to_owned()),
2206 );
2207 diff.rollups.push(r2.clone());
2208 diff.rollups.push(r3.clone());
2209 let mut diff_proto = diff.into_proto();
2210
2211 let field_diffs = std::mem::take(&mut diff_proto.field_diffs).unwrap();
2212 let mut field_diffs_writer = field_diffs.into_writer();
2213
2214 field_diffs_into_proto(
2216 ProtoStateField::DeprecatedRollups,
2217 &[StateFieldDiff {
2218 key: r1.key,
2219 val: StateFieldValDiff::Insert(r1_rollup.key.clone()),
2220 }],
2221 &mut field_diffs_writer,
2222 );
2223
2224 assert_none!(diff_proto.field_diffs);
2225 diff_proto.field_diffs = Some(field_diffs_writer.into_proto());
2226
2227 let diff = StateDiff::<u64>::from_proto(diff_proto.clone()).unwrap();
2228 assert_eq!(
2229 diff.rollups.into_iter().collect::<Vec<_>>(),
2230 vec![r2, r3, r1]
2231 );
2232
2233 let shard_id = ShardId::new();
2236 let mut state = TypedState::<(), (), u64, i64>::new(
2237 DUMMY_BUILD_INFO.semver_version(),
2238 shard_id,
2239 "host".to_owned(),
2240 0,
2241 );
2242 state.state.seqno = SeqNo(4);
2243 let mut rollup = Rollup::from_untyped_state_without_diffs(state.into()).into_proto();
2244 rollup
2245 .deprecated_rollups
2246 .insert(3, r3_rollup.key.into_proto());
2247 let state: Rollup<u64> = rollup.into_rust().unwrap();
2248 let state = state.state;
2249 let mut state = state.check_codecs::<(), (), i64>(&shard_id).unwrap();
2250 let cache = new_test_client_cache(&dyncfgs);
2251 let encoded_diff = VersionedData {
2252 seqno: SeqNo(5),
2253 data: diff_proto.encode_to_vec().into(),
2254 };
2255 state.apply_encoded_diffs(cache.cfg(), &cache.metrics, std::iter::once(&encoded_diff));
2256 assert_eq!(
2257 state
2258 .state
2259 .collections
2260 .rollups
2261 .into_iter()
2262 .collect::<Vec<_>>(),
2263 vec![(SeqNo(1), r1_rollup), (SeqNo(2), r2_rollup)]
2264 );
2265 }
2266
2267 #[mz_ore::test]
2268 #[cfg_attr(miri, ignore)] fn state_proto_roundtrip() {
2270 fn testcase<T: Timestamp + Lattice + Codec64>(state: State<T>) {
2271 let before = UntypedState {
2272 key_codec: <() as Codec>::codec_name(),
2273 val_codec: <() as Codec>::codec_name(),
2274 ts_codec: <T as Codec64>::codec_name(),
2275 diff_codec: <i64 as Codec64>::codec_name(),
2276 state,
2277 };
2278 let proto = Rollup::from_untyped_state_without_diffs(before.clone()).into_proto();
2279 let after: Rollup<T> = proto.into_rust().unwrap();
2280 let after = after.state;
2281 assert_eq!(before, after);
2282 }
2283
2284 proptest!(|(state in any_state::<u64>(0..3))| testcase(state));
2285 }
2286
2287 #[mz_ore::test]
2288 fn check_data_versions() {
2289 #[track_caller]
2290 fn testcase(code: &str, data: &str, expected: Result<(), ()>) {
2291 let code = Version::parse(code).unwrap();
2292 let data = Version::parse(data).unwrap();
2293 #[allow(clippy::disallowed_methods)]
2294 let actual = cfg::code_can_write_data(&code, &data)
2295 .then_some(())
2296 .ok_or(());
2297 assert_eq!(actual, expected, "data at {data} read by code {code}");
2298 }
2299
2300 testcase("0.160.0-dev", "0.160.0-dev", Ok(()));
2301 testcase("0.160.0-dev", "0.160.0", Err(()));
2302 testcase("0.160.0-dev", "0.161.0-dev", Err(()));
2305 testcase("0.160.0-dev", "0.161.0", Err(()));
2306 testcase("0.160.0-dev", "0.162.0-dev", Err(()));
2307 testcase("0.160.0-dev", "0.162.0", Err(()));
2308 testcase("0.160.0-dev", "0.163.0-dev", Err(()));
2309
2310 testcase("0.160.0", "0.158.0-dev", Ok(()));
2311 testcase("0.160.0", "0.158.0", Ok(()));
2312 testcase("0.160.0", "0.159.0-dev", Ok(()));
2313 testcase("0.160.0", "0.159.0", Ok(()));
2314 testcase("0.160.0", "0.160.0-dev", Ok(()));
2315 testcase("0.160.0", "0.160.0", Ok(()));
2316
2317 testcase("0.160.0", "0.161.0-dev", Err(()));
2318 testcase("0.160.0", "0.161.0", Err(()));
2319 testcase("0.160.0", "0.161.1", Err(()));
2320 testcase("0.160.0", "0.161.1000000", Err(()));
2321 testcase("0.160.0", "0.162.0-dev", Err(()));
2322 testcase("0.160.0", "0.162.0", Err(()));
2323 testcase("0.160.0", "0.163.0-dev", Err(()));
2324
2325 testcase("0.160.1", "0.159.0", Ok(()));
2326 testcase("0.160.1", "0.160.0", Ok(()));
2327 testcase("0.160.1", "0.161.0", Err(()));
2328 testcase("0.160.1", "0.161.1", Err(()));
2329 testcase("0.160.1", "0.161.100", Err(()));
2330 testcase("0.160.0", "0.160.1", Err(()));
2331
2332 testcase("0.160.1", "26.0.0", Err(()));
2333 testcase("26.0.0", "0.160.1", Ok(()));
2334 testcase("26.2.0", "0.160.1", Ok(()));
2335 testcase("26.200.200", "0.160.1", Ok(()));
2336
2337 testcase("27.0.0", "0.160.1", Err(()));
2338 testcase("27.0.0", "0.16000.1", Err(()));
2339 testcase("27.0.0", "26.0.1", Ok(()));
2340 testcase("27.1000.100", "26.0.1", Ok(()));
2341 testcase("28.0.0", "26.0.1", Err(()));
2342 testcase("28.0.0", "26.1000.1", Err(()));
2343 testcase("28.0.0", "27.0.0", Ok(()));
2344 }
2345}