use anyhow::ensure;
use async_stream::{stream, try_stream};
use mz_persist::metrics::ColumnarMetrics;
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::{Container, PartialOrder};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}
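
// An `IdempotencyToken` renders as "i" followed by its bytes formatted as a
// UUID, e.g. (hypothetical value) "i67e55044-10b1-426f-9247-bb680e5fe0c8".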

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id('i', "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    pub seqno: SeqNo,
    pub since: Antichain<T>,
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    pub since: Antichain<T>,
    pub opaque: OpaqueState,
    pub opaque_codec: String,
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub most_recent_write_token: IdempotencyToken,
    pub most_recent_write_upper: Antichain<T>,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    pub hostname: String,
    pub purpose: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        deprecated_schema_id: Option<SchemaId>,
    },
}

fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }
}

#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    pub(crate) parts: Vec<RunPart<T>>,
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    pub hollow_bytes: usize,

    pub max_part_bytes: usize,

    pub key_lower: Vec<u8>,

    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    pub async fn set(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            _phantom_data: Default::default(),
        }
    }

    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}
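
// Rough intuition (not normative): a `RunPart::Single` carries its part
// metadata directly in state, while `RunPart::Many` is a pointer to a blob
// (written/read via `HollowRunRef::set`/`get`) whose contents decode to a
// `HollowRun` of further parts, which keeps very large runs out of state.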

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
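
// Note: `part_stream` flattens the tree of parts. A `Single` part is yielded
// directly as `Cow::Borrowed`, while a `Many` ref is fetched from blob storage
// and its children are yielded recursively as `Cow::Owned`; a missing blob
// surfaces as a `MissingBlob` error rather than a panic.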

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    Unordered,
    Codec,
    Structured,
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    pub(crate) order: Option<RunOrder>,
    pub(crate) schema: Option<SchemaId>,

    pub(crate) deprecated_schema: Option<SchemaId>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    pub key: PartialBatchKey,
    pub encoded_size_bytes: usize,
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    pub ts_rewrite: Option<Antichain<T>>,
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    pub format: Option<BatchColumnarFormat>,
    pub schema_id: Option<SchemaId>,

    pub deprecated_schema_id: Option<SchemaId>,
}

#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    pub desc: Description<T>,
    pub len: usize,
    pub(crate) parts: Vec<RunPart<T>>,
    pub(crate) run_splits: Vec<usize>,
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip(run_parts)
    }
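
    // Illustrative example: with `parts = [p0, p1, p2, p3]` and
    // `run_splits = [2]`, `runs()` yields two runs, `&parts[0..2]` and
    // `&parts[2..4]`, each zipped with its corresponding `RunMeta` entry.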

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn part_count(&self) -> usize {
        self.parts.len()
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    pub key: PartialRollupKey,
    pub encoded_size_bytes: Option<usize>,
}

#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    pub(crate) last_gc_req: SeqNo,

    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    pub(crate) active_rollup: Option<ActiveRollup>,
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    pub(crate) trace: Trace<T>,
}

#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    pub key: Bytes,
    pub key_data_type: Bytes,
    pub val: Bytes,
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}
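
// `EncodedSchemas` keeps both the `Codec`-encoded key/val schemas and (as
// protobuf-encoded bytes) the corresponding Arrow `DataType`s, so the Arrow
// types can be recovered directly via `decode_data_type` without re-deriving
// them from the schemas.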

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        Continue(applied)
    }
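
    // Note: if a rollup already exists at `rollup_seqno`, the existing entry is
    // never overwritten; `applied` is true only when the keys match, so a
    // caller whose rollup lost a race observes `false`.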

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => {
                Break(NoOpStateTransition(Some(*schema_id)))
            }
            None if self.is_tombstone() => {
                Break(NoOpStateTransition(None))
            }
            None if self.schemas.is_empty() => {
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }

    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }

    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }
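
    // Illustrative example of the upper check above: if the shard's upper is
    // `[5]` and a writer submits a batch with `lower = [3]`, the append is
    // rejected with `CompareAndAppendBreak::Upper { shard_upper: [5], .. }`,
    // which the writer can use to retry from the correct frontier.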

    pub fn apply_merge_res(
        &mut self,
        res: &FueledMergeRes<T>,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self.trace.apply_merge_res(res);
        Continue(apply_merge_result)
    }

    pub fn spine_exert(
        &mut self,
        fuel: usize,
    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
        let (merge_reqs, did_work) = self.trace.exert(fuel);
        if did_work {
            Continue(merge_reqs)
        } else {
            assert!(merge_reqs.is_empty());
            Break(NoOpStateTransition(Vec::new()))
        }
    }

    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        let seqno = match outstanding_seqno {
            Some(outstanding_seqno) => {
                assert!(
                    outstanding_seqno >= reader_state.seqno,
                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                    outstanding_seqno,
                    reader_state.seqno,
                );
                std::cmp::min(outstanding_seqno, seqno)
            }
            None => seqno,
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }

    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
        Result<Since<T>, (O, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);
        assert_eq!(reader_state.opaque_codec, O::codec_name());

        if &O::decode(reader_state.opaque.0) != expected_opaque {
            return Continue(Err((
                Codec64::decode(reader_state.opaque.0),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }

    pub fn heartbeat_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        match self.leased_readers.get_mut(reader_id) {
            Some(reader_state) => {
                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
                    heartbeat_timestamp_ms,
                    reader_state.last_heartbeat_timestamp_ms,
                );
                Continue(true)
            }
            None => Continue(false),
        }
    }

    pub fn expire_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.leased_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_critical_reader(
        &mut self,
        reader_id: &CriticalReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.critical_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_writer(
        &mut self,
        writer_id: &WriterId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.writers.remove(writer_id).is_some();
        Continue(existed)
    }

    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }

    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }

    fn critical_since(&self) -> Option<Antichain<T>> {
        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
        let mut since = critical_sinces.next().cloned()?;
        for s in critical_sinces {
            since.meet_assign(s);
        }
        Some(since)
    }

    fn update_since(&mut self) {
        let mut sinces_iter = self
            .leased_readers
            .values()
            .map(|x| &x.since)
            .chain(self.critical_readers.values().map(|x| &x.since));
        let mut since = match sinces_iter.next() {
            Some(since) => since.clone(),
            None => {
                return;
            }
        };
        while let Some(s) = sinces_iter.next() {
            since.meet_assign(s);
        }
        self.trace.downgrade_since(&since);
    }

    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
        let mut seqno_since = seqno;
        for cap in self.leased_readers.values() {
            seqno_since = std::cmp::min(seqno_since, cap.seqno);
        }
        seqno_since
    }
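
    // In other words, the seqno-since is the current seqno capped by the oldest
    // seqno still held by any leased reader; critical readers do not hold back
    // seqnos.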

    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }

    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }

    pub(crate) fn is_single_empty_batch(&self) -> bool {
        let mut batch_count = 0;
        let mut is_empty = true;
        self.trace.map_batches(|b| {
            batch_count += 1;
            is_empty &= b.is_empty()
        });
        batch_count <= 1 && is_empty
    }

    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        let was_tombstone = self.is_tombstone();

        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let fake_merge = FueledMergeRes {
                output: HollowBatch::empty(desc),
            };
            let result = self.trace.apply_merge_res(&fake_merge);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            Continue(())
        } else {
            Break(NoOpStateTransition(()))
        }
    }
}

#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) shard_id: ShardId,

    pub(crate) seqno: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) hostname: String,
    pub(crate) collections: StateCollections<T>,
}

pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}

impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
    #[cfg(any(test, debug_assertions))]
    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
        TypedState {
            state: State {
                applier_version,
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname,
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }

    pub(crate) fn clone_for_rollup(&self) -> Self {
        TypedState {
            state: State {
                applier_version: self.applier_version.clone(),
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname: self.hostname.clone(),
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }
}

impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let TypedState { state, _phantom } = self;
        f.debug_struct("TypedState").field("state", state).finish()
    }
}

#[cfg(any(test, debug_assertions))]
impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
    fn eq(&self, other: &Self) -> bool {
        let TypedState {
            state: self_state,
            _phantom,
        } = self;
        let TypedState {
            state: other_state,
            _phantom,
        } = other;
        self_state == other_state
    }
}

impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            applier_version,
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
        let mut new_state = State {
            applier_version: new_applier_version.clone(),
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}

impl<T> State<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }

    pub fn since(&self) -> &Antichain<T> {
        self.collections.trace.since()
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.collections.trace.upper()
    }

    pub fn spine_batch_count(&self) -> usize {
        self.collections.trace.num_spine_batches()
    }

    pub fn size_metrics(&self) -> StateSizeMetrics {
        let mut ret = StateSizeMetrics::default();
        self.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                ret.hollow_batch_count += 1;
                ret.batch_part_count += x.part_count();
                ret.num_updates += x.len;

                let batch_size = x.encoded_size_bytes();
                for x in x.parts.iter() {
                    if x.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if x.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += x.inline_bytes();
                    }
                }
                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
                ret.state_batches_bytes += batch_size;
            }
            HollowBlobRef::Rollup(x) => {
                ret.state_rollup_count += 1;
                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
            }
        });
        ret
    }

    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
        self.collections
            .rollups
            .iter()
            .rev()
            .next()
            .expect("State should have at least one rollup if seqno > minimum")
    }

    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }

    pub fn maybe_gc(
        &mut self,
        is_write: bool,
        use_active_gc: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<GcReq> {
        let gc_threshold = std::cmp::max(
            1,
            u64::from(self.seqno.0.next_power_of_two().trailing_zeros()),
        );
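        // Illustrative arithmetic: at seqno 1000, `next_power_of_two()` is 1024
        // and `trailing_zeros()` is 10, so GC is only requested once the
        // seqno-since has advanced by at least 10 seqnos since the last request.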
2353 let new_seqno_since = self.seqno_since();
2354 let should_gc = new_seqno_since
2355 .0
2356 .saturating_sub(self.collections.last_gc_req.0)
2357 >= gc_threshold;
2358
2359 let should_gc = if use_active_gc && !should_gc {
2362 match self.collections.active_gc {
2363 Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2364 None => false,
2365 }
2366 } else {
2367 should_gc
2368 };
2369 let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2372 let tombstone_needs_gc = self.collections.is_tombstone();
2377 let should_gc = should_gc || tombstone_needs_gc;
2378 let should_gc = if use_active_gc {
2379 should_gc
2383 && match self.collections.active_gc {
2384 Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2385 None => true,
2386 }
2387 } else {
2388 should_gc
2389 };
2390 if should_gc {
2391 self.collections.last_gc_req = new_seqno_since;
2392 Some(GcReq {
2393 shard_id: self.shard_id,
2394 new_seqno_since,
2395 })
2396 } else {
2397 None
2398 }
2399 }
2400
2401 pub fn seqnos_held(&self) -> usize {
2403 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2404 }
2405
2406 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2408 let mut metrics = ExpiryMetrics::default();
2409 let shard_id = self.shard_id();
2410 self.collections.leased_readers.retain(|id, state| {
2411 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2412 if !retain {
2413 info!(
2414 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2415 state.debug.purpose
2416 );
2417 metrics.readers_expired += 1;
2418 }
2419 retain
2420 });
2421 self.collections.writers.retain(|id, state| {
2423 let retain =
2424 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2425 if !retain {
2426 info!(
2427 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2428 state.debug.purpose
2429 );
2430 metrics.writers_expired += 1;
2431 }
2432 retain
2433 });
2434 metrics
2435 }
2436
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                self.collections.trace.since().clone(),
            )));
        }
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Err(SnapshotErr::AsOfNotYetAvailable(
                self.seqno,
                Upper(upper.clone()),
            ));
        }

        let batches = self
            .collections
            .trace
            .batches()
            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
            .cloned()
            .collect();
        Ok(batches)
    }

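    /// Verifies that a listen starting at `as_of` is valid: `Err(Since)` if
    /// `as_of` is before the shard's since, `Ok(Err(Upper))` if the shard's
    /// upper has not yet advanced past `as_of`, and `Ok(Ok(()))` otherwise.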
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Result<(), Upper<T>>, Since<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(Since(self.collections.trace.since().clone()));
        }
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Ok(Err(Upper(upper.clone())));
        }
        Ok(Ok(()))
    }

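    /// Returns the batch whose time interval contains `frontier`, i.e. the
    /// next batch a listener at `frontier` should emit; `Err` with the
    /// current seqno if no such batch exists yet.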
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.collections
            .trace
            .batches()
            .find(|b| {
                PartialOrder::less_equal(b.desc.lower(), frontier)
                    && PartialOrder::less_than(frontier, b.desc.upper())
            })
            .cloned()
            .ok_or(self.seqno)
    }

    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }

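    /// Returns the seqno at which a new rollup should be written, if one is
    /// due: in classic mode, on every `threshold`-th seqno since the latest
    /// rollup (plus a fallback multiple to recover from missed rollups); with
    /// `use_active_rollup`, once more than `threshold` seqnos have
    /// accumulated and no rollup is already in flight (or the in-flight one
    /// has exceeded `fallback_threshold_ms`). A tombstoned shard requests a
    /// final rollup once its seqno has advanced more than one past the
    /// latest rollup's.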
    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    Some(active_rollup) => {
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }

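    /// Returns a reference to every blob (trace batch and rollup) referenced
    /// by this state.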
    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<T>> {
        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
        batches.chain(rollups)
    }
}

fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
    let val = hex::encode(val);
    val.serialize(s)
}

fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
    val: &Option<LazyProto<T>>,
    s: S,
) -> Result<S::Ok, S::Error> {
    val.as_ref()
        .map(|lazy| hex::encode(&lazy.into_proto()))
        .serialize(s)
}

fn serialize_part_stats<S: Serializer>(
    val: &Option<LazyPartStats>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(|x| x.decode().key);
    val.serialize(s)
}

fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
    let val = val.map(i64::decode);
    val.serialize(s)
}

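// This hand-written Serialize impl renders State for debugging and state
// inspection (see the `state_inspect_serde_json` golden test below); it is
// not the durable encoding of state, which is handled separately via proto.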
impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let State {
            applier_version,
            shard_id,
            seqno,
            walltime_ms,
            hostname,
            collections:
                StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
        } = self;
        // 19 fields are serialized below; keep the length hint in sync.
        let mut s = s.serialize_struct("State", 19)?;
        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
        let () = s.serialize_field("shard_id", shard_id)?;
        let () = s.serialize_field("seqno", seqno)?;
        let () = s.serialize_field("walltime_ms", walltime_ms)?;
        let () = s.serialize_field("hostname", hostname)?;
        let () = s.serialize_field("last_gc_req", last_gc_req)?;
        let () = s.serialize_field("rollups", rollups)?;
        let () = s.serialize_field("active_rollup", active_rollup)?;
        let () = s.serialize_field("active_gc", active_gc)?;
        let () = s.serialize_field("leased_readers", leased_readers)?;
        let () = s.serialize_field("critical_readers", critical_readers)?;
        let () = s.serialize_field("writers", writers)?;
        let () = s.serialize_field("schemas", schemas)?;
        let () = s.serialize_field("since", &trace.since().elements())?;
        let () = s.serialize_field("upper", &trace.upper().elements())?;
        let trace = trace.flatten();
        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
        let () = s.serialize_field("merges", &trace.merges)?;
        s.end()
    }
}

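/// Aggregate size information about the blobs referenced by a shard's state,
/// computed by walking all of its batches and rollups.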
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}

#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);

#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);

#[cfg(test)]
pub(crate) mod tests {
    use std::ops::Range;

    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::{assert_none, assert_ok};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::internal::encoding::any_some_lazy_part_stats;
    use crate::internal::paths::RollupId;
    use crate::internal::trace::tests::any_trace;
    use crate::tests::new_test_client_cache;

    use super::*;

    const LEASE_DURATION_MS: u64 = 900 * 1000;
    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }

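    // Proptest `Strategy`s for generating arbitrary instances of the state
    // types above, used by the golden/roundtrip tests in this and other
    // modules.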
    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
        Strategy::prop_map(
            (
                any::<T>(),
                any::<T>(),
                any::<T>(),
                proptest::collection::vec(any_run_part::<T>(), 0..3),
                any::<usize>(),
                any::<bool>(),
            ),
            |(t0, t1, since, parts, len, runs)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);
                if runs && parts.len() > 2 {
                    let split_at = parts.len() / 2;
                    HollowBatch::new(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        vec![RunMeta::default(), RunMeta::default()],
                        vec![split_at],
                    )
                } else {
                    HollowBatch::new_run(Description::new(lower, upper, since), parts, len % 10)
                }
            },
        )
    }

    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
        Strategy::prop_map(
            (
                any::<bool>(),
                any_hollow_batch_part(),
                any::<Option<T>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
                if is_hollow {
                    BatchPart::Hollow(hollow)
                } else {
                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
                    BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id,
                    }
                }
            },
        )
    }

    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
    }

    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
    -> impl Strategy<Value = HollowBatchPart<T>> {
        Strategy::prop_map(
            (
                any::<PartialBatchKey>(),
                any::<usize>(),
                any::<Vec<u8>>(),
                any_some_lazy_part_stats(),
                any::<Option<T>>(),
                any::<[u8; 8]>(),
                any::<Option<BatchColumnarFormat>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(
                key,
                encoded_size_bytes,
                key_lower,
                stats,
                ts_rewrite,
                diffs_sum,
                format,
                schema_id,
                deprecated_schema_id,
            )| {
                HollowBatchPart {
                    key,
                    encoded_size_bytes,
                    key_lower,
                    structured_key_lower: None,
                    stats,
                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
                    diffs_sum: Some(diffs_sum),
                    format,
                    schema_id,
                    deprecated_schema_id,
                }
            },
        )
    }

    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
        Strategy::prop_map(
            (
                any::<SeqNo>(),
                any::<Option<T>>(),
                any::<u64>(),
                any::<u64>(),
                any::<HandleDebugState>(),
            ),
            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
                if lease_duration_ms == 0 {
                    lease_duration_ms += 1;
                }
                LeasedReaderState {
                    seqno,
                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
                    last_heartbeat_timestamp_ms,
                    lease_duration_ms,
                    debug,
                }
            },
        )
    }

    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
    {
        Strategy::prop_map(
            (
                any::<Option<T>>(),
                any::<OpaqueState>(),
                any::<String>(),
                any::<HandleDebugState>(),
            ),
            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
                since: since.map_or_else(Antichain::new, Antichain::from_elem),
                opaque,
                opaque_codec,
                debug,
            },
        )
    }

    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
        Strategy::prop_map(
            (
                any::<u64>(),
                any::<u64>(),
                any::<IdempotencyToken>(),
                any::<Option<T>>(),
                any::<HandleDebugState>(),
            ),
            |(
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper,
                debug,
            )| WriterState {
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper: most_recent_write_upper
                    .map_or_else(Antichain::new, Antichain::from_elem),
                debug,
            },
        )
    }

    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
        Strategy::prop_map(
            (
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
            ),
            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
                key: Bytes::from(key),
                key_data_type: Bytes::from(key_data_type),
                val: Bytes::from(val),
                val_data_type: Bytes::from(val_data_type),
            },
        )
    }

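    // `Strategy` for a full `State`. The generators are split into two
    // tuples, presumably to stay within proptest's supported tuple arity for
    // composite strategies.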
    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
        num_trace_batches: Range<usize>,
    ) -> impl Strategy<Value = State<T>> {
        let part1 = (
            any::<ShardId>(),
            any::<SeqNo>(),
            any::<u64>(),
            any::<String>(),
            any::<SeqNo>(),
            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
            proptest::option::of(any::<ActiveRollup>()),
        );

        let part2 = (
            proptest::option::of(any::<ActiveGc>()),
            proptest::collection::btree_map(
                any::<LeasedReaderId>(),
                any_leased_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(
                any::<CriticalReaderId>(),
                any_critical_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
            any_trace::<T>(num_trace_batches),
        );

        (part1, part2).prop_map(
            |(
                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
            )| State {
                applier_version: semver::Version::new(1, 2, 3),
                shard_id,
                seqno,
                walltime_ms,
                hostname,
                collections: StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
            },
        )
    }

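    // Test helper: a `HollowBatch` covering `[lower, upper)` whose parts are
    // identified only by the given blob keys (sizes, stats, and schema info
    // all zeroed out).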
    pub(crate) fn hollow<T: Timestamp>(
        lower: T,
        upper: T,
        keys: &[&str],
        len: usize,
    ) -> HollowBatch<T> {
        HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(T::minimum()),
            ),
            keys.iter()
                .map(|x| {
                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                        key: PartialBatchKey((*x).to_owned()),
                        encoded_size_bytes: 0,
                        key_lower: vec![],
                        structured_key_lower: None,
                        stats: None,
                        ts_rewrite: None,
                        diffs_sum: None,
                        format: None,
                        schema_id: None,
                        deprecated_schema_id: None,
                    }))
                })
                .collect(),
            len,
        )
    }

2980 #[mz_ore::test]
2981 fn downgrade_since() {
2982 let mut state = TypedState::<(), (), u64, i64>::new(
2983 DUMMY_BUILD_INFO.semver_version(),
2984 ShardId::new(),
2985 "".to_owned(),
2986 0,
2987 );
2988 let reader = LeasedReaderId::new();
2989 let seqno = SeqNo::minimum();
2990 let now = SYSTEM_TIME.clone();
2991 let _ = state.collections.register_leased_reader(
2992 "",
2993 &reader,
2994 "",
2995 seqno,
2996 Duration::from_secs(10),
2997 now(),
2998 false,
2999 );
3000
3001 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3003
3004 assert_eq!(
3006 state.collections.downgrade_since(
3007 &reader,
3008 seqno,
3009 None,
3010 &Antichain::from_elem(2),
3011 now()
3012 ),
3013 Continue(Since(Antichain::from_elem(2)))
3014 );
3015 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3016 assert_eq!(
3018 state.collections.downgrade_since(
3019 &reader,
3020 seqno,
3021 None,
3022 &Antichain::from_elem(2),
3023 now()
3024 ),
3025 Continue(Since(Antichain::from_elem(2)))
3026 );
3027 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3028 assert_eq!(
3030 state.collections.downgrade_since(
3031 &reader,
3032 seqno,
3033 None,
3034 &Antichain::from_elem(1),
3035 now()
3036 ),
3037 Continue(Since(Antichain::from_elem(2)))
3038 );
3039 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3040
3041 let reader2 = LeasedReaderId::new();
3043 let _ = state.collections.register_leased_reader(
3044 "",
3045 &reader2,
3046 "",
3047 seqno,
3048 Duration::from_secs(10),
3049 now(),
3050 false,
3051 );
3052
3053 assert_eq!(
3055 state.collections.downgrade_since(
3056 &reader2,
3057 seqno,
3058 None,
3059 &Antichain::from_elem(3),
3060 now()
3061 ),
3062 Continue(Since(Antichain::from_elem(3)))
3063 );
3064 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3065 assert_eq!(
3067 state.collections.downgrade_since(
3068 &reader,
3069 seqno,
3070 None,
3071 &Antichain::from_elem(5),
3072 now()
3073 ),
3074 Continue(Since(Antichain::from_elem(5)))
3075 );
3076 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3077
3078 assert_eq!(
3080 state.collections.expire_leased_reader(&reader),
3081 Continue(true)
3082 );
3083 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3084
3085 let reader3 = LeasedReaderId::new();
3087 let _ = state.collections.register_leased_reader(
3088 "",
3089 &reader3,
3090 "",
3091 seqno,
3092 Duration::from_secs(10),
3093 now(),
3094 false,
3095 );
3096
3097 assert_eq!(
3099 state.collections.downgrade_since(
3100 &reader3,
3101 seqno,
3102 None,
3103 &Antichain::from_elem(10),
3104 now()
3105 ),
3106 Continue(Since(Antichain::from_elem(10)))
3107 );
3108 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3109
3110 assert_eq!(
3112 state.collections.expire_leased_reader(&reader2),
3113 Continue(true)
3114 );
3115 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3120
3121 assert_eq!(
3123 state.collections.expire_leased_reader(&reader3),
3124 Continue(true)
3125 );
3126 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3131 }
3132
3133 #[mz_ore::test]
3134 fn compare_and_downgrade_since() {
3135 let mut state = TypedState::<(), (), u64, i64>::new(
3136 DUMMY_BUILD_INFO.semver_version(),
3137 ShardId::new(),
3138 "".to_owned(),
3139 0,
3140 );
3141 let reader = CriticalReaderId::new();
3142 let _ = state
3143 .collections
3144 .register_critical_reader::<u64>("", &reader, "");
3145
3146 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3148 assert_eq!(
3150 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3151 u64::initial()
3152 );
3153
3154 assert_eq!(
3156 state.collections.compare_and_downgrade_since::<u64>(
3157 &reader,
3158 &u64::initial(),
3159 (&1, &Antichain::from_elem(2)),
3160 ),
3161 Continue(Ok(Since(Antichain::from_elem(2))))
3162 );
3163 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3164 assert_eq!(
3165 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3166 1
3167 );
3168 assert_eq!(
3170 state.collections.compare_and_downgrade_since::<u64>(
3171 &reader,
3172 &1,
3173 (&2, &Antichain::from_elem(2)),
3174 ),
3175 Continue(Ok(Since(Antichain::from_elem(2))))
3176 );
3177 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3178 assert_eq!(
3179 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3180 2
3181 );
3182 assert_eq!(
3184 state.collections.compare_and_downgrade_since::<u64>(
3185 &reader,
3186 &2,
3187 (&3, &Antichain::from_elem(1)),
3188 ),
3189 Continue(Ok(Since(Antichain::from_elem(2))))
3190 );
3191 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3192 assert_eq!(
3193 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3194 3
3195 );
3196 }
3197
3198 #[mz_ore::test]
3199 fn compare_and_append() {
3200 let state = &mut TypedState::<String, String, u64, i64>::new(
3201 DUMMY_BUILD_INFO.semver_version(),
3202 ShardId::new(),
3203 "".to_owned(),
3204 0,
3205 )
3206 .collections;
3207
3208 let writer_id = WriterId::new();
3209 let now = SYSTEM_TIME.clone();
3210
3211 assert_eq!(state.trace.num_spine_batches(), 0);
3213 assert_eq!(state.trace.num_hollow_batches(), 0);
3214 assert_eq!(state.trace.num_updates(), 0);
3215
3216 assert_eq!(
3218 state.compare_and_append(
3219 &hollow(1, 2, &["key1"], 1),
3220 &writer_id,
3221 now(),
3222 LEASE_DURATION_MS,
3223 &IdempotencyToken::new(),
3224 &debug_state(),
3225 0,
3226 100,
3227 None
3228 ),
3229 Break(CompareAndAppendBreak::Upper {
3230 shard_upper: Antichain::from_elem(0),
3231 writer_upper: Antichain::from_elem(0)
3232 })
3233 );
3234
3235 assert!(
3237 state
3238 .compare_and_append(
3239 &hollow(0, 5, &[], 0),
3240 &writer_id,
3241 now(),
3242 LEASE_DURATION_MS,
3243 &IdempotencyToken::new(),
3244 &debug_state(),
3245 0,
3246 100,
3247 None
3248 )
3249 .is_continue()
3250 );
3251
3252 assert_eq!(
3254 state.compare_and_append(
3255 &hollow(5, 4, &["key1"], 1),
3256 &writer_id,
3257 now(),
3258 LEASE_DURATION_MS,
3259 &IdempotencyToken::new(),
3260 &debug_state(),
3261 0,
3262 100,
3263 None
3264 ),
3265 Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3266 lower: Antichain::from_elem(5),
3267 upper: Antichain::from_elem(4)
3268 }))
3269 );
3270
3271 assert_eq!(
3273 state.compare_and_append(
3274 &hollow(5, 5, &["key1"], 1),
3275 &writer_id,
3276 now(),
3277 LEASE_DURATION_MS,
3278 &IdempotencyToken::new(),
3279 &debug_state(),
3280 0,
3281 100,
3282 None
3283 ),
3284 Break(CompareAndAppendBreak::InvalidUsage(
3285 InvalidEmptyTimeInterval {
3286 lower: Antichain::from_elem(5),
3287 upper: Antichain::from_elem(5),
3288 keys: vec!["key1".to_owned()],
3289 }
3290 ))
3291 );
3292
3293 assert!(
3295 state
3296 .compare_and_append(
3297 &hollow(5, 5, &[], 0),
3298 &writer_id,
3299 now(),
3300 LEASE_DURATION_MS,
3301 &IdempotencyToken::new(),
3302 &debug_state(),
3303 0,
3304 100,
3305 None
3306 )
3307 .is_continue()
3308 );
3309 }
3310
3311 #[mz_ore::test]
3312 fn snapshot() {
3313 let now = SYSTEM_TIME.clone();
3314
3315 let mut state = TypedState::<String, String, u64, i64>::new(
3316 DUMMY_BUILD_INFO.semver_version(),
3317 ShardId::new(),
3318 "".to_owned(),
3319 0,
3320 );
3321 assert_eq!(
3323 state.snapshot(&Antichain::from_elem(0)),
3324 Err(SnapshotErr::AsOfNotYetAvailable(
3325 SeqNo(0),
3326 Upper(Antichain::from_elem(0))
3327 ))
3328 );
3329
3330 assert_eq!(
3332 state.snapshot(&Antichain::from_elem(5)),
3333 Err(SnapshotErr::AsOfNotYetAvailable(
3334 SeqNo(0),
3335 Upper(Antichain::from_elem(0))
3336 ))
3337 );
3338
3339 let writer_id = WriterId::new();
3340
3341 assert!(
3343 state
3344 .collections
3345 .compare_and_append(
3346 &hollow(0, 5, &["key1"], 1),
3347 &writer_id,
3348 now(),
3349 LEASE_DURATION_MS,
3350 &IdempotencyToken::new(),
3351 &debug_state(),
3352 0,
3353 100,
3354 None
3355 )
3356 .is_continue()
3357 );
3358
3359 assert_eq!(
3361 state.snapshot(&Antichain::from_elem(0)),
3362 Ok(vec![hollow(0, 5, &["key1"], 1)])
3363 );
3364
3365 assert_eq!(
3367 state.snapshot(&Antichain::from_elem(4)),
3368 Ok(vec![hollow(0, 5, &["key1"], 1)])
3369 );
3370
3371 assert_eq!(
3373 state.snapshot(&Antichain::from_elem(5)),
3374 Err(SnapshotErr::AsOfNotYetAvailable(
3375 SeqNo(0),
3376 Upper(Antichain::from_elem(5))
3377 ))
3378 );
3379 assert_eq!(
3380 state.snapshot(&Antichain::from_elem(6)),
3381 Err(SnapshotErr::AsOfNotYetAvailable(
3382 SeqNo(0),
3383 Upper(Antichain::from_elem(5))
3384 ))
3385 );
3386
3387 let reader = LeasedReaderId::new();
3388 let _ = state.collections.register_leased_reader(
3390 "",
3391 &reader,
3392 "",
3393 SeqNo::minimum(),
3394 Duration::from_secs(10),
3395 now(),
3396 false,
3397 );
3398 assert_eq!(
3399 state.collections.downgrade_since(
3400 &reader,
3401 SeqNo::minimum(),
3402 None,
3403 &Antichain::from_elem(2),
3404 now()
3405 ),
3406 Continue(Since(Antichain::from_elem(2)))
3407 );
3408 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3409 assert_eq!(
3411 state.snapshot(&Antichain::from_elem(1)),
3412 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3413 Antichain::from_elem(2)
3414 )))
3415 );
3416
3417 assert!(
3419 state
3420 .collections
3421 .compare_and_append(
3422 &hollow(5, 10, &[], 0),
3423 &writer_id,
3424 now(),
3425 LEASE_DURATION_MS,
3426 &IdempotencyToken::new(),
3427 &debug_state(),
3428 0,
3429 100,
3430 None
3431 )
3432 .is_continue()
3433 );
3434
3435 assert_eq!(
3437 state.snapshot(&Antichain::from_elem(7)),
3438 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3439 );
3440
3441 assert_eq!(
3443 state.snapshot(&Antichain::from_elem(10)),
3444 Err(SnapshotErr::AsOfNotYetAvailable(
3445 SeqNo(0),
3446 Upper(Antichain::from_elem(10))
3447 ))
3448 );
3449
3450 assert!(
3452 state
3453 .collections
3454 .compare_and_append(
3455 &hollow(10, 15, &["key2"], 1),
3456 &writer_id,
3457 now(),
3458 LEASE_DURATION_MS,
3459 &IdempotencyToken::new(),
3460 &debug_state(),
3461 0,
3462 100,
3463 None
3464 )
3465 .is_continue()
3466 );
3467
3468 assert_eq!(
3471 state.snapshot(&Antichain::from_elem(9)),
3472 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3473 );
3474
3475 assert_eq!(
3477 state.snapshot(&Antichain::from_elem(10)),
3478 Ok(vec![
3479 hollow(0, 5, &["key1"], 1),
3480 hollow(5, 10, &[], 0),
3481 hollow(10, 15, &["key2"], 1)
3482 ])
3483 );
3484
3485 assert_eq!(
3486 state.snapshot(&Antichain::from_elem(11)),
3487 Ok(vec![
3488 hollow(0, 5, &["key1"], 1),
3489 hollow(5, 10, &[], 0),
3490 hollow(10, 15, &["key2"], 1)
3491 ])
3492 );
3493 }
3494
3495 #[mz_ore::test]
3496 fn next_listen_batch() {
3497 let mut state = TypedState::<String, String, u64, i64>::new(
3498 DUMMY_BUILD_INFO.semver_version(),
3499 ShardId::new(),
3500 "".to_owned(),
3501 0,
3502 );
3503
3504 assert_eq!(
3507 state.next_listen_batch(&Antichain::from_elem(0)),
3508 Err(SeqNo(0))
3509 );
3510 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3511
3512 let writer_id = WriterId::new();
3513 let now = SYSTEM_TIME.clone();
3514
3515 assert!(
3517 state
3518 .collections
3519 .compare_and_append(
3520 &hollow(0, 5, &["key1"], 1),
3521 &writer_id,
3522 now(),
3523 LEASE_DURATION_MS,
3524 &IdempotencyToken::new(),
3525 &debug_state(),
3526 0,
3527 100,
3528 None
3529 )
3530 .is_continue()
3531 );
3532 assert!(
3533 state
3534 .collections
3535 .compare_and_append(
3536 &hollow(5, 10, &["key2"], 1),
3537 &writer_id,
3538 now(),
3539 LEASE_DURATION_MS,
3540 &IdempotencyToken::new(),
3541 &debug_state(),
3542 0,
3543 100,
3544 None
3545 )
3546 .is_continue()
3547 );
3548
3549 for t in 0..=4 {
3551 assert_eq!(
3552 state.next_listen_batch(&Antichain::from_elem(t)),
3553 Ok(hollow(0, 5, &["key1"], 1))
3554 );
3555 }
3556
3557 for t in 5..=9 {
3559 assert_eq!(
3560 state.next_listen_batch(&Antichain::from_elem(t)),
3561 Ok(hollow(5, 10, &["key2"], 1))
3562 );
3563 }
3564
3565 assert_eq!(
3567 state.next_listen_batch(&Antichain::from_elem(10)),
3568 Err(SeqNo(0))
3569 );
3570
3571 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3574 }
3575
3576 #[mz_ore::test]
3577 fn expire_writer() {
3578 let mut state = TypedState::<String, String, u64, i64>::new(
3579 DUMMY_BUILD_INFO.semver_version(),
3580 ShardId::new(),
3581 "".to_owned(),
3582 0,
3583 );
3584 let now = SYSTEM_TIME.clone();
3585
3586 let writer_id_one = WriterId::new();
3587
3588 let writer_id_two = WriterId::new();
3589
3590 assert!(
3592 state
3593 .collections
3594 .compare_and_append(
3595 &hollow(0, 2, &["key1"], 1),
3596 &writer_id_one,
3597 now(),
3598 LEASE_DURATION_MS,
3599 &IdempotencyToken::new(),
3600 &debug_state(),
3601 0,
3602 100,
3603 None
3604 )
3605 .is_continue()
3606 );
3607
3608 assert!(
3609 state
3610 .collections
3611 .expire_writer(&writer_id_one)
3612 .is_continue()
3613 );
3614
3615 assert!(
3617 state
3618 .collections
3619 .compare_and_append(
3620 &hollow(2, 5, &["key2"], 1),
3621 &writer_id_two,
3622 now(),
3623 LEASE_DURATION_MS,
3624 &IdempotencyToken::new(),
3625 &debug_state(),
3626 0,
3627 100,
3628 None
3629 )
3630 .is_continue()
3631 );
3632 }
3633
3634 #[mz_ore::test]
3635 fn maybe_gc_active_gc() {
3636 const GC_USE_ACTIVE_GC: bool = true;
3637 const GC_FALLBACK_THRESHOLD_MS: u64 = 5000;
3638 let now_fn = SYSTEM_TIME.clone();
3639
3640 let mut state = TypedState::<String, String, u64, i64>::new(
3641 DUMMY_BUILD_INFO.semver_version(),
3642 ShardId::new(),
3643 "".to_owned(),
3644 0,
3645 );
3646
3647 let now = now_fn();
3648 assert_eq!(
3650 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3651 None
3652 );
3653 assert_eq!(
3654 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3655 None
3656 );
3657
3658 state.seqno = SeqNo(100);
3661 assert_eq!(state.seqno_since(), SeqNo(100));
3662
3663 let writer_id = WriterId::new();
3665 state.collections.compare_and_append(
3666 &hollow(1, 2, &["key1"], 1),
3667 &writer_id,
3668 now,
3669 LEASE_DURATION_MS,
3670 &IdempotencyToken::new(),
3671 &debug_state(),
3672 0,
3673 100,
3674 None,
3675 );
3676 assert_eq!(
3677 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3678 None
3679 );
3680
3681 assert_eq!(
3683 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3684 Some(GcReq {
3685 shard_id: state.shard_id,
3686 new_seqno_since: SeqNo(100)
3687 })
3688 );
3689
3690 state.collections.active_gc = Some(ActiveGc {
3692 seqno: state.seqno,
3693 start_ms: now,
3694 });
3695
3696 state.seqno = SeqNo(200);
3697 assert_eq!(state.seqno_since(), SeqNo(200));
3698
3699 assert_eq!(
3700 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3701 None
3702 );
3703
3704 state.seqno = SeqNo(300);
3705 assert_eq!(state.seqno_since(), SeqNo(300));
3706 let new_now = now + GC_FALLBACK_THRESHOLD_MS + 1;
3708 assert_eq!(
3709 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, new_now),
3710 Some(GcReq {
3711 shard_id: state.shard_id,
3712 new_seqno_since: SeqNo(300)
3713 })
3714 );
3715
3716 state.seqno = SeqNo(301);
3720 assert_eq!(state.seqno_since(), SeqNo(301));
3721 assert_eq!(
3722 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, new_now),
3723 Some(GcReq {
3724 shard_id: state.shard_id,
3725 new_seqno_since: SeqNo(301)
3726 })
3727 );
3728
3729 state.collections.active_gc = None;
3730
3731 state.seqno = SeqNo(400);
3734 assert_eq!(state.seqno_since(), SeqNo(400));
3735
3736 let now = now_fn();
3737
3738 let _ = state.collections.expire_writer(&writer_id);
3740 assert_eq!(
3741 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, now),
3742 Some(GcReq {
3743 shard_id: state.shard_id,
3744 new_seqno_since: SeqNo(400)
3745 })
3746 );
3747 }
3748
3749 #[mz_ore::test]
3750 fn maybe_gc_classic() {
3751 const GC_USE_ACTIVE_GC: bool = false;
3752 const GC_FALLBACK_THRESHOLD_MS: u64 = 5000;
3753 const NOW_MS: u64 = 0;
3754
3755 let mut state = TypedState::<String, String, u64, i64>::new(
3756 DUMMY_BUILD_INFO.semver_version(),
3757 ShardId::new(),
3758 "".to_owned(),
3759 0,
3760 );
3761
3762 assert_eq!(
3764 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3765 None
3766 );
3767 assert_eq!(
3768 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3769 None
3770 );
3771
3772 state.seqno = SeqNo(100);
3775 assert_eq!(state.seqno_since(), SeqNo(100));
3776
3777 let writer_id = WriterId::new();
3779 let now = SYSTEM_TIME.clone();
3780 state.collections.compare_and_append(
3781 &hollow(1, 2, &["key1"], 1),
3782 &writer_id,
3783 now(),
3784 LEASE_DURATION_MS,
3785 &IdempotencyToken::new(),
3786 &debug_state(),
3787 0,
3788 100,
3789 None,
3790 );
3791 assert_eq!(
3792 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3793 None
3794 );
3795
3796 assert_eq!(
3798 state.maybe_gc(true, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3799 Some(GcReq {
3800 shard_id: state.shard_id,
3801 new_seqno_since: SeqNo(100)
3802 })
3803 );
3804
3805 state.seqno = SeqNo(200);
3808 assert_eq!(state.seqno_since(), SeqNo(200));
3809
3810 let _ = state.collections.expire_writer(&writer_id);
3812 assert_eq!(
3813 state.maybe_gc(false, GC_USE_ACTIVE_GC, GC_FALLBACK_THRESHOLD_MS, NOW_MS),
3814 Some(GcReq {
3815 shard_id: state.shard_id,
3816 new_seqno_since: SeqNo(200)
3817 })
3818 );
3819 }
3820
3821 #[mz_ore::test]
3822 fn need_rollup_active_rollup() {
3823 const ROLLUP_THRESHOLD: usize = 3;
3824 const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
3825 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
3826 let now = SYSTEM_TIME.clone();
3827
3828 mz_ore::test::init_logging();
3829 let mut state = TypedState::<String, String, u64, i64>::new(
3830 DUMMY_BUILD_INFO.semver_version(),
3831 ShardId::new(),
3832 "".to_owned(),
3833 0,
3834 );
3835
3836 let rollup_seqno = SeqNo(5);
3837 let rollup = HollowRollup {
3838 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3839 encoded_size_bytes: None,
3840 };
3841
3842 assert!(
3843 state
3844 .collections
3845 .add_rollup((rollup_seqno, &rollup))
3846 .is_continue()
3847 );
3848
3849 state.seqno = SeqNo(5);
3851 assert_none!(state.need_rollup(
3852 ROLLUP_THRESHOLD,
3853 ROLLUP_USE_ACTIVE_ROLLUP,
3854 ROLLUP_FALLBACK_THRESHOLD_MS,
3855 now()
3856 ));
3857
3858 state.seqno = SeqNo(6);
3860 assert_none!(state.need_rollup(
3861 ROLLUP_THRESHOLD,
3862 ROLLUP_USE_ACTIVE_ROLLUP,
3863 ROLLUP_FALLBACK_THRESHOLD_MS,
3864 now()
3865 ));
3866 state.seqno = SeqNo(7);
3867 assert_none!(state.need_rollup(
3868 ROLLUP_THRESHOLD,
3869 ROLLUP_USE_ACTIVE_ROLLUP,
3870 ROLLUP_FALLBACK_THRESHOLD_MS,
3871 now()
3872 ));
3873 state.seqno = SeqNo(8);
3874 assert_none!(state.need_rollup(
3875 ROLLUP_THRESHOLD,
3876 ROLLUP_USE_ACTIVE_ROLLUP,
3877 ROLLUP_FALLBACK_THRESHOLD_MS,
3878 now()
3879 ));
3880
3881 let mut current_time = now();
3882 state.seqno = SeqNo(9);
3884 assert_eq!(
3885 state
3886 .need_rollup(
3887 ROLLUP_THRESHOLD,
3888 ROLLUP_USE_ACTIVE_ROLLUP,
3889 ROLLUP_FALLBACK_THRESHOLD_MS,
3890 current_time
3891 )
3892 .expect("rollup"),
3893 SeqNo(9)
3894 );
3895
3896 state.collections.active_rollup = Some(ActiveRollup {
3897 seqno: SeqNo(9),
3898 start_ms: current_time,
3899 });
3900
3901 assert_none!(state.need_rollup(
3903 ROLLUP_THRESHOLD,
3904 ROLLUP_USE_ACTIVE_ROLLUP,
3905 ROLLUP_FALLBACK_THRESHOLD_MS,
3906 current_time
3907 ));
3908
3909 state.seqno = SeqNo(10);
3910 assert_none!(state.need_rollup(
3913 ROLLUP_THRESHOLD,
3914 ROLLUP_USE_ACTIVE_ROLLUP,
3915 ROLLUP_FALLBACK_THRESHOLD_MS,
3916 current_time
3917 ));
3918
3919 current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
3921 assert_eq!(
3922 state
3923 .need_rollup(
3924 ROLLUP_THRESHOLD,
3925 ROLLUP_USE_ACTIVE_ROLLUP,
3926 ROLLUP_FALLBACK_THRESHOLD_MS,
3927 current_time
3928 )
3929 .expect("rollup"),
3930 SeqNo(10)
3931 );
3932
3933 state.seqno = SeqNo(9);
3934 state.collections.active_rollup = None;
3936 let rollup_seqno = SeqNo(9);
3937 let rollup = HollowRollup {
3938 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3939 encoded_size_bytes: None,
3940 };
3941 assert!(
3942 state
3943 .collections
3944 .add_rollup((rollup_seqno, &rollup))
3945 .is_continue()
3946 );
3947
3948 state.seqno = SeqNo(11);
3949 assert_none!(state.need_rollup(
3951 ROLLUP_THRESHOLD,
3952 ROLLUP_USE_ACTIVE_ROLLUP,
3953 ROLLUP_FALLBACK_THRESHOLD_MS,
3954 current_time
3955 ));
3956 state.seqno = SeqNo(13);
3958 assert_eq!(
3959 state
3960 .need_rollup(
3961 ROLLUP_THRESHOLD,
3962 ROLLUP_USE_ACTIVE_ROLLUP,
3963 ROLLUP_FALLBACK_THRESHOLD_MS,
3964 current_time
3965 )
3966 .expect("rollup"),
3967 SeqNo(13)
3968 );
3969 }
3970
3971 #[mz_ore::test]
3972 fn need_rollup_classic() {
3973 const ROLLUP_THRESHOLD: usize = 3;
3974 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
3975 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
3976 const NOW: u64 = 0;
3977
3978 mz_ore::test::init_logging();
3979 let mut state = TypedState::<String, String, u64, i64>::new(
3980 DUMMY_BUILD_INFO.semver_version(),
3981 ShardId::new(),
3982 "".to_owned(),
3983 0,
3984 );
3985
3986 let rollup_seqno = SeqNo(5);
3987 let rollup = HollowRollup {
3988 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
3989 encoded_size_bytes: None,
3990 };
3991
3992 assert!(
3993 state
3994 .collections
3995 .add_rollup((rollup_seqno, &rollup))
3996 .is_continue()
3997 );
3998
3999 state.seqno = SeqNo(5);
4001 assert_none!(state.need_rollup(
4002 ROLLUP_THRESHOLD,
4003 ROLLUP_USE_ACTIVE_ROLLUP,
4004 ROLLUP_FALLBACK_THRESHOLD_MS,
4005 NOW
4006 ));
4007
4008 state.seqno = SeqNo(6);
4010 assert_none!(state.need_rollup(
4011 ROLLUP_THRESHOLD,
4012 ROLLUP_USE_ACTIVE_ROLLUP,
4013 ROLLUP_FALLBACK_THRESHOLD_MS,
4014 NOW
4015 ));
4016 state.seqno = SeqNo(7);
4017 assert_none!(state.need_rollup(
4018 ROLLUP_THRESHOLD,
4019 ROLLUP_USE_ACTIVE_ROLLUP,
4020 ROLLUP_FALLBACK_THRESHOLD_MS,
4021 NOW
4022 ));
4023
4024 state.seqno = SeqNo(8);
4026 assert_eq!(
4027 state
4028 .need_rollup(
4029 ROLLUP_THRESHOLD,
4030 ROLLUP_USE_ACTIVE_ROLLUP,
4031 ROLLUP_FALLBACK_THRESHOLD_MS,
4032 NOW
4033 )
4034 .expect("rollup"),
4035 SeqNo(8)
4036 );
4037
4038 state.seqno = SeqNo(9);
4040 assert_none!(state.need_rollup(
4041 ROLLUP_THRESHOLD,
4042 ROLLUP_USE_ACTIVE_ROLLUP,
4043 ROLLUP_FALLBACK_THRESHOLD_MS,
4044 NOW
4045 ));
4046
4047 state.seqno = SeqNo(11);
4049 assert_eq!(
4050 state
4051 .need_rollup(
4052 ROLLUP_THRESHOLD,
4053 ROLLUP_USE_ACTIVE_ROLLUP,
4054 ROLLUP_FALLBACK_THRESHOLD_MS,
4055 NOW
4056 )
4057 .expect("rollup"),
4058 SeqNo(11)
4059 );
4060
4061 let rollup_seqno = SeqNo(6);
4063 let rollup = HollowRollup {
4064 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4065 encoded_size_bytes: None,
4066 };
4067 assert!(
4068 state
4069 .collections
4070 .add_rollup((rollup_seqno, &rollup))
4071 .is_continue()
4072 );
4073
4074 state.seqno = SeqNo(8);
4075 assert_none!(state.need_rollup(
4076 ROLLUP_THRESHOLD,
4077 ROLLUP_USE_ACTIVE_ROLLUP,
4078 ROLLUP_FALLBACK_THRESHOLD_MS,
4079 NOW
4080 ));
4081 state.seqno = SeqNo(9);
4082 assert_eq!(
4083 state
4084 .need_rollup(
4085 ROLLUP_THRESHOLD,
4086 ROLLUP_USE_ACTIVE_ROLLUP,
4087 ROLLUP_FALLBACK_THRESHOLD_MS,
4088 NOW
4089 )
4090 .expect("rollup"),
4091 SeqNo(9)
4092 );
4093
4094 let fallback_seqno = SeqNo(
4096 rollup_seqno.0
4097 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4098 );
4099 state.seqno = fallback_seqno;
4100 assert_eq!(
4101 state
4102 .need_rollup(
4103 ROLLUP_THRESHOLD,
4104 ROLLUP_USE_ACTIVE_ROLLUP,
4105 ROLLUP_FALLBACK_THRESHOLD_MS,
4106 NOW
4107 )
4108 .expect("rollup"),
4109 fallback_seqno
4110 );
4111 state.seqno = fallback_seqno.next();
4112 assert_eq!(
4113 state
4114 .need_rollup(
4115 ROLLUP_THRESHOLD,
4116 ROLLUP_USE_ACTIVE_ROLLUP,
4117 ROLLUP_FALLBACK_THRESHOLD_MS,
4118 NOW
4119 )
4120 .expect("rollup"),
4121 fallback_seqno.next()
4122 );
4123 }
4124
4125 #[mz_ore::test]
4126 fn idempotency_token_sentinel() {
4127 assert_eq!(
4128 IdempotencyToken::SENTINEL.to_string(),
4129 "i11111111-1111-1111-1111-111111111111"
4130 );
4131 }
4132
4133 #[mz_ore::test]
4142 fn state_inspect_serde_json() {
4143 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4144 let mut runner = proptest::test_runner::TestRunner::deterministic();
4145 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4146 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4147 assert_eq!(
4148 json.trim(),
4149 STATE_SERDE_JSON.trim(),
4150 "\n\nNEW GOLDEN\n{}\n",
4151 json
4152 );
4153 }
4154
4155 #[mz_persist_proc::test(tokio::test)]
4156 #[cfg_attr(miri, ignore)] async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4158 let mut clients = new_test_client_cache(&dyncfgs);
4159 let shard_id = ShardId::new();
4160
4161 async fn open_and_write(
4162 clients: &mut PersistClientCache,
4163 version: semver::Version,
4164 shard_id: ShardId,
4165 ) -> Result<(), tokio::task::JoinError> {
4166 clients.cfg.build_version = version.clone();
4167 clients.clear_state_cache();
4168 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4169 mz_ore::task::spawn(|| version.to_string(), async move {
4171 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4172 let current = *write.upper().as_option().unwrap();
4173 write
4175 .expect_compare_and_append_batch(&mut [], current, current + 1)
4176 .await;
4177 })
4178 .await
4179 }
4180
4181 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4183 assert_ok!(res);
4184
4185 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4187 assert_ok!(res);
4188
4189 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4191 assert_ok!(res);
4192
4193 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4195 assert!(res.unwrap_err().is_panic());
4196 }
4197}