use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Monoid;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

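// Dynamic configuration for rollups, GC, and compaction. These are
// registered with the mz_dyncfg system, so their values can be adjusted at
// runtime rather than being fixed at compile time.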
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);

/// A token that uniquely identifies one `compare_and_append` call, so that
/// retries of the same write can be recognized and deduplicated.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    /// A fixed token used as the initial `most_recent_write_token` for newly
    /// registered writers; it never matches a token produced by `new`.
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}
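// A minimal round-trip sketch for the token's string form (hypothetical
// values; `Display` prefixes a UUID with "i" and `FromStr` parses the same
// shape back via `parse_id`):
//
//     let token = IdempotencyToken::new();
//     let s = token.to_string();
//     let roundtripped: IdempotencyToken = s.parse().expect("valid token");
//     assert_eq!(token, roundtripped);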

/// Metadata about a leased reader that is stored in state.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno capability of this reader.
    pub seqno: SeqNo,
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// UNIX_EPOCH timestamp (in millis) of this reader's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) after [Self::last_heartbeat_timestamp_ms] beyond
    /// which this reader may be expired.
    pub lease_duration_ms: u64,
    /// For debugging.
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

/// Metadata about a critical reader that is stored in state.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since capability of this reader.
    pub since: Antichain<T>,
    /// The opaque token matched on compare_and_downgrade_since.
    pub opaque: OpaqueState,
    /// The name of the codec used to encode [Self::opaque].
    pub opaque_codec: String,
    /// For debugging.
    pub debug: HandleDebugState,
}

/// Metadata about a writer that is stored in state.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// UNIX_EPOCH timestamp (in millis) of this writer's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in millis) after [Self::last_heartbeat_timestamp_ms] beyond
    /// which this writer may be expired.
    pub lease_duration_ms: u64,
    /// The idempotency token of this writer's most recent successful write.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of this writer's most recent successful write.
    pub most_recent_write_upper: Antichain<T>,
    /// For debugging.
    pub debug: HandleDebugState,
}

/// Debugging context for a reader or writer handle.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// Hostname of the process that registered this handle.
    pub hostname: String,
    /// Plaintext description of this handle's intent.
    pub purpose: String,
}

/// A reference to the data in one batch part: either stored externally in
/// blob storage (hollow) or written inline into state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,
        deprecated_schema_id: Option<SchemaId>,
    },
}

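/// Decodes a structured key lower bound from its lazy proto encoding.
///
/// The encoded Arrow array must contain exactly one element (the bound
/// itself). On any decode failure, this logs an error (panicking in debug
/// builds) and returns `None` rather than propagating the failure.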
fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => Some(
                updates
                    .decode::<T>(metrics)
                    .expect("valid inline part")
                    .updates
                    .diffs_sum(),
            ),
        }
    }
}

/// A run of parts stored in blob storage.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
}

/// A pointer to a [HollowRun] stored in blob storage, plus metadata about
/// its contents.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    /// The total size of the parts in the run, in bytes.
    pub hollow_bytes: usize,

    /// The size of the largest single part in the run, in bytes.
    pub max_part_bytes: usize,

    /// A lower bound on the keys in the run.
    pub key_lower: Vec<u8>,

    /// A lower bound on the keys in the run, in structured form, if known.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// A Codec64-encoded sum of all diffs in the run, if known.
    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}

impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Writes the given [HollowRun] to blob storage and returns a reference
    /// to it, along with aggregated metadata about its parts.
    pub async fn set<D: Codec64 + Monoid>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    /// Retrieves the [HollowRun] that this reference points to, or `None` if
    /// the blob is missing.
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

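/// A piece of a run: either a single batch part, or a reference to a
/// [HollowRun] of many parts stored separately in blob storage.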
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

impl<T> RunPart<T>
where
    T: Timestamp + Codec64,
{
    pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            Self::Single(p) => p.diffs_sum(metrics),
            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
        }
    }
}

/// An error returned when a blob we expect to exist is unexpectedly missing.
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams the [BatchPart]s referenced by this part, fetching and
    /// recursively flattening any [HollowRunRef]s along the way.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r
                        .get(shard_id, blob, metrics)
                        .await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
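// A minimal consumption sketch (hypothetical caller; assumes an async
// context with `blob: &dyn Blob` and `metrics: &Metrics` in scope):
//
//     use futures_util::TryStreamExt;
//     let parts: Vec<_> = run_part
//         .part_stream(shard_id, blob, metrics)
//         .try_collect()
//         .await?;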

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

/// The ordering of the rows in a run.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// The rows in this run are in no particular order.
    Unordered,
    /// The rows in this run are ordered by the codec-encoded key and value.
    Codec,
    /// The rows in this run are ordered by the structured (columnar)
    /// encoding of the key and value.
    Structured,
}

#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);

impl std::fmt::Display for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ri{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RunId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for RunId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("ri", "RunId", s).map(RunId)
    }
}

impl From<RunId> for String {
    fn from(x: RunId) -> Self {
        x.to_string()
    }
}

impl RunId {
    pub(crate) fn new() -> Self {
        RunId(*Uuid::new_v4().as_bytes())
    }
}

impl Arbitrary for RunId {
    type Parameters = ();
    type Strategy = proptest::strategy::BoxedStrategy<Self>;
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
            RunId(*Uuid::from_u128(n).as_bytes())
        })
        .boxed()
    }
}

/// Metadata about a run of parts.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// The order of the rows in the run, if known.
    pub(crate) order: Option<RunOrder>,
    /// The schema of the data in the run, if known.
    pub(crate) schema: Option<SchemaId>,
    /// Deprecated schema id, retained for compatibility with older state.
    pub(crate) deprecated_schema: Option<SchemaId>,
    /// The unique id of this run, if known.
    pub(crate) id: Option<RunId>,
    /// The number of updates in the run, if known.
    pub(crate) len: Option<usize>,
}

/// A pointer to a batch part stored in blob storage, plus metadata about its
/// contents.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// Pointer usable to retrieve the updates.
    pub key: PartialBatchKey,
    /// The encoded size of this part, in bytes.
    pub encoded_size_bytes: usize,
    /// A lower bound on the keys in the part.
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the keys in the part, in structured form, if known.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Aggregate statistics about the data in the part, if known.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// A frontier by which the timestamps in this part are advanced on read,
    /// if set.
    pub ts_rewrite: Option<Antichain<T>>,
    /// A Codec64-encoded sum of all diffs in the part, if known.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// The columnar format in which this part is encoded, if known.
    pub format: Option<BatchColumnarFormat>,
    /// The schema of the data in the part, if known.
    pub schema_id: Option<SchemaId>,
    /// Deprecated schema id, retained for compatibility with older state.
    pub deprecated_schema_id: Option<SchemaId>,
}

/// A batch of shard updates whose data is stored externally, referenced by
/// the parts below.
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// Describes the times of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers usable to retrieve the updates.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Indices into `parts` splitting it into runs of sequential parts: each
    /// index is the first part of a new run, with the first run starting
    /// implicitly at 0, so an empty vec means at most one run. Invariants:
    /// strictly increasing, positive, and less than `parts.len()`.
    pub(crate) run_splits: Vec<usize>,
    /// Metadata about each run; when `parts` is non-empty there is exactly
    /// one entry per run.
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            // The parts and run metadata are serialized together as the
            // `part_runs` field below.
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        // Deconstruct self and other so we get a compile failure if new
        // fields are added.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}

impl<T> HollowBatch<T> {
    /// Constructs a [HollowBatch] from the given parts and run metadata,
    /// debug-asserting the `run_splits` invariants documented on the struct.
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    /// Constructs a [HollowBatch] whose parts form a single run.
    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// An empty [HollowBatch] over the given description.
    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    /// Iterates over the runs in this batch, yielding each run's metadata
    /// together with the slice of parts that make it up.
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip_eq(run_parts)
    }
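    // Worked example for `runs()` above: with `parts.len() == 5` and
    // `run_splits == [2, 4]`, the iterator yields the part ranges 0..2,
    // 2..4, and 4..5, paired with the three entries of `run_meta`.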

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

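// `rewrite_ts` below rewrites the timestamps of every update in this batch
// to be advanced by `frontier`, and replaces the batch's upper with
// `new_upper`. It returns an `Err` (leaving `self` unmodified) if `frontier`
// is not strictly less than `new_upper`, if `new_upper` would regress the
// batch's existing upper, if `frontier` would regress the batch's lower or a
// previously applied rewrite, or if the batch's since is no longer the
// minimum antichain.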
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

/// A pointer to a rollup stored externally.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// Pointer usable to retrieve the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of this rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}

/// A reference to a blob tracked in state.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

/// A rollup that is in progress, plus the seqno and walltime at which it was
/// claimed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A GC that is in progress, plus the seqno and walltime at which it was
/// claimed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A state transition that was a no-op: the desired work was already done.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

/// The shared state of a persist shard, independent of the K/V/D codecs.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// The seqno through which GC was most recently requested.
    pub(crate) last_gc_req: SeqNo,

    /// Rollups of this shard's state, keyed by the seqno they were taken at.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The rollup currently being computed, if any.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The GC currently in progress, if any.
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    /// The data (batches) of the shard.
    pub(crate) trace: Trace<T>,
}

/// A key and value schema, encoded via the shard's Codec impls, plus the
/// proto-encoded Arrow DataTypes they map to.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// A `K::Schema` encoded via [Codec::encode_schema].
    pub key: Bytes,
    /// The Arrow `DataType` produced by the key schema, encoded as a proto.
    pub key_data_type: Bytes,
    /// A `V::Schema` encoded via [Codec::encode_schema].
    pub val: Bytes,
    /// The Arrow `DataType` produced by the val schema, encoded as a proto.
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        Continue(applied)
    }

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => Break(NoOpStateTransition(Some(*schema_id))),
            None if self.is_tombstone() => Break(NoOpStateTransition(None)),
            None if self.schemas.is_empty() => {
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                // A different schema is already registered; changing it goes
                // through compare_and_evolve_schema instead.
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }

    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        // If the schema is already registered, this is a no-op.
        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        // Migrations that would drop a column are rejected as incompatible.
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }

    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        // A tombstoned shard has an empty upper; report an Upper mismatch
        // rather than minting a pointless new state transition.
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        // A batch covering an empty time interval must itself be empty.
        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        // A matching idempotency token means this exact write has already
        // committed; sanity-check that and report success.
        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        // Inline writes are stored directly in state, so bound their total
        // size; past the limit, the caller must flush updates to blob
        // storage and retry.
        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        // If this write produced no non-trivial compaction work of its own,
        // opportunistically claim some stale, unclaimed merge requests.
        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        // Sanity check: a writer's upper is monotonically increasing.
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        // Heartbeat the writer to keep its lease alive, taking the max so a
        // racing old heartbeat can't move the timestamp backward.
        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }

    pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn spine_exert(
        &mut self,
        fuel: usize,
    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
        let (merge_reqs, did_work) = self.trace.exert(fuel);
        if did_work {
            Continue(merge_reqs)
        } else {
            assert!(merge_reqs.is_empty());
            // No spine work was done; break so this no-op doesn't burn a
            // seqno.
            Break(NoOpStateTransition(Vec::new()))
        }
    }

    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        // Also use this as an opportunity to heartbeat the reader, taking
        // the max so a racing old heartbeat can't move the timestamp
        // backward.
        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        let seqno = match outstanding_seqno {
            Some(outstanding_seqno) => {
                assert!(
                    outstanding_seqno >= reader_state.seqno,
                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                    outstanding_seqno,
                    reader_state.seqno,
                );
                std::cmp::min(outstanding_seqno, seqno)
            }
            None => seqno,
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            // No-op, but still commit the state change so that it gets
            // linearized.
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }

    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
        Result<Since<T>, (O, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);
        assert_eq!(reader_state.opaque_codec, O::codec_name());

        if &O::decode(reader_state.opaque.0) != expected_opaque {
            // No-op, but still commit the state change so that it gets
            // linearized.
            return Continue(Err((
                Codec64::decode(reader_state.opaque.0),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            // No-op, but still commit the state change so that it gets
            // linearized.
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }

    pub fn heartbeat_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        match self.leased_readers.get_mut(reader_id) {
            Some(reader_state) => {
                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
                    heartbeat_timestamp_ms,
                    reader_state.last_heartbeat_timestamp_ms,
                );
                Continue(true)
            }
            None => Continue(false),
        }
    }

    pub fn expire_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.leased_readers.remove(reader_id).is_some();
        if existed {
            // We don't eagerly recompute the shard since here; the removed
            // reader's capability simply stops being considered the next
            // time the since is updated.
        }
        Continue(existed)
    }

    pub fn expire_critical_reader(
        &mut self,
        reader_id: &CriticalReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.critical_readers.remove(reader_id).is_some();
        if existed {
            // Ditto: the since is recomputed lazily rather than on expiry.
        }
        Continue(existed)
    }

    pub fn expire_writer(
        &mut self,
        writer_id: &WriterId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.writers.remove(writer_id).is_some();
        Continue(existed)
    }

    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }

    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }

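    /// The meet (greatest lower bound) of the sinces of all registered
    /// critical readers, or `None` if there are none.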
    fn critical_since(&self) -> Option<Antichain<T>> {
        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
        let mut since = critical_sinces.next().cloned()?;
        for s in critical_sinces {
            since.meet_assign(s);
        }
        Some(since)
    }

    fn update_since(&mut self) {
        let mut sinces_iter = self
            .leased_readers
            .values()
            .map(|x| &x.since)
            .chain(self.critical_readers.values().map(|x| &x.since));
        let mut since = match sinces_iter.next() {
            Some(since) => since.clone(),
            None => {
                // If there are no current readers, leave the trace's since
                // unchanged so it doesn't regress.
                return;
            }
        };
        for s in sinces_iter {
            since.meet_assign(s);
        }
        self.trace.downgrade_since(&since);
    }

    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
        let mut seqno_since = seqno;
        for cap in self.leased_readers.values() {
            seqno_since = std::cmp::min(seqno_since, cap.seqno);
        }
        seqno_since
    }

    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }

    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }

    pub(crate) fn is_single_empty_batch(&self) -> bool {
        let mut batch_count = 0;
        let mut is_empty = true;
        self.trace.map_batches(|b| {
            batch_count += 1;
            is_empty &= b.is_empty()
        });
        batch_count <= 1 && is_empty
    }

    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        let was_tombstone = self.is_tombstone();

        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        // Shrink the trace: replace the first non-empty batch (if any) with
        // an empty one via a tombstone merge, or collapse a multi-batch
        // trace down to the single empty tombstone batch.
        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            // Nothing left to shrink, but we only just became a tombstone,
            // so the transition still needs to be written down.
            Continue(())
        } else {
            Break(NoOpStateTransition(()))
        }
    }
}

/// The durable state of a persist shard.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) shard_id: ShardId,

    pub(crate) seqno: SeqNo,
    /// A strictly increasing wall time of when this state was written, in
    /// milliseconds since the unix epoch.
    pub(crate) walltime_ms: u64,
    /// Hostname of the persist user that created this version of state.
    pub(crate) hostname: String,
    pub(crate) collections: StateCollections<T>,
}

/// [State], but with the codec types pinned at the type level.
pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    /// `PhantomData` for the codec type parameters. Using `fn() -> ...`
    /// keeps the usual auto-trait behavior (e.g. Send/Sync) without
    /// requiring K, V, or D to implement anything themselves.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}

impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
    #[cfg(any(test, debug_assertions))]
    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
        TypedState {
            state: State {
                applier_version,
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname,
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }

    pub(crate) fn clone_for_rollup(&self) -> Self {
        TypedState {
            state: State {
                applier_version: self.applier_version.clone(),
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname: self.hostname.clone(),
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }
}

impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Deconstruct self so we get a compile failure if new fields are
        // added.
        let TypedState { state, _phantom } = self;
        f.debug_struct("TypedState").field("state", state).finish()
    }
}

#[cfg(any(test, debug_assertions))]
impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
    fn eq(&self, other: &Self) -> bool {
        // Deconstruct self and other so we get a compile failure if new
        // fields are added.
        let TypedState {
            state: self_state,
            _phantom,
        } = self;
        let TypedState {
            state: other_state,
            _phantom,
        } = other;
        self_state == other_state
    }
}

impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            applier_version,
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        // Tag the new state with the max of the current applier version and
        // this build's version; the applier version never moves backward.
        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
        let mut new_state = State {
            applier_version: new_applier_version.clone(),
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };
        // Make sure walltime_ms is strictly increasing, in case clocks are
        // offset.
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}
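// A minimal sketch of driving a state transition through `clone_apply`
// (hypothetical caller; the real machinery lives in the applier/machine code
// and follows a successful work_fn with a compare-and-set of the new state):
//
//     let res = typed_state.clone_apply(&cfg, &mut |_seqno, _cfg, collections| {
//         collections.expire_writer(&writer_id)
//     });
//     match res {
//         Continue((existed, new_state)) => { /* try to CaS `new_state` in */ }
//         Break(NoOpStateTransition(existed)) => { /* nothing to write down */ }
//     }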

/// Knobs for [State::maybe_gc]; the fields mirror the GC_* dyncfgs above.
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    pub use_active_gc: bool,
    pub fallback_threshold_ms: u64,
    pub min_versions: usize,
    pub max_versions: usize,
}

impl<T> State<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }

    pub fn since(&self) -> &Antichain<T> {
        self.collections.trace.since()
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.collections.trace.upper()
    }

    pub fn spine_batch_count(&self) -> usize {
        self.collections.trace.num_spine_batches()
    }

    pub fn size_metrics(&self) -> StateSizeMetrics {
        let mut ret = StateSizeMetrics::default();
        self.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                ret.hollow_batch_count += 1;
                ret.batch_part_count += x.part_count();
                ret.num_updates += x.len;

                let batch_size = x.encoded_size_bytes();
                for part in x.parts.iter() {
                    if part.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if part.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += part.inline_bytes();
                    }
                }
                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
                ret.state_batches_bytes += batch_size;
            }
            HollowBlobRef::Rollup(x) => {
                ret.state_rollup_count += 1;
                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
            }
        });
        ret
    }

    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
        self.collections
            .rollups
            .last_key_value()
            .expect("State should have at least one rollup if seqno > minimum")
    }

    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }

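    /// Returns a [GcReq] if garbage collection should be requested now, and
    /// records the request in `last_gc_req` so that concurrent appliers don't
    /// all request the same work.
    ///
    /// In the classic mode, GC fires once the seqno capability has advanced
    /// roughly log2(seqno) versions past the last request. With
    /// `use_active_gc`, it instead fires after `min_versions`, caps any one
    /// request at `max_versions` past the last one, and skips requesting
    /// while another GC is in flight, unless that GC is older than
    /// `fallback_threshold_ms`, in which case the work is assumed abandoned
    /// and claimed. Reads only trigger GC when no writers are registered;
    /// writes always may; a tombstoned shard is always eligible.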
    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // In classic mode, scale the GC cadence with the number of seqnos the
        // shard has ever seen, so GC work stays roughly proportional to the
        // size of state.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Cap the number of versions in any one GC request.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // If an active GC has been running for longer than the fallback
        // threshold, assume its worker died and claim the work.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Only let a read trigger GC when there are no registered writers.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // A tombstoned shard is always eligible, so it can be GCd down to its
        // minimal form.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        // With active GC tracking, never start a second GC while one is in
        // flight and still fresh.
        let should_gc = if use_active_gc {
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }

    pub fn seqnos_held(&self) -> usize {
        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
    }

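    /// Expires any leased readers and writers whose leases had already lapsed
    /// as of `walltime_ms`, returning counts of how many of each were
    /// removed.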
    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
        let mut metrics = ExpiryMetrics::default();
        let shard_id = self.shard_id();
        self.collections.leased_readers.retain(|id, state| {
            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.readers_expired += 1;
            }
            retain
        });
        self.collections.writers.retain(|id, state| {
            let retain =
                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.writers_expired += 1;
            }
            retain
        });
        metrics
    }

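    /// Returns the set of batches that cover a snapshot at `as_of`: an error
    /// if `as_of` is before the shard's since (the needed historical
    /// distinctions may have been compacted away) or is not yet available
    /// (at or beyond the upper).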
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                self.collections.trace.since().clone(),
            )));
        }
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Err(SnapshotErr::AsOfNotYetAvailable(
                self.seqno,
                Upper(upper.clone()),
            ));
        }

        let batches = self
            .collections
            .trace
            .batches()
            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
            .cloned()
            .collect();
        Ok(batches)
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(Since(self.collections.trace.since().clone()));
        }
        Ok(())
    }

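    /// Returns the first batch whose time interval contains `frontier`, or
    /// the current seqno if no such batch exists yet, so the caller can wait
    /// for a newer version of state.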
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.collections
            .trace
            .batches()
            .find(|b| {
                PartialOrder::less_equal(b.desc.lower(), frontier)
                    && PartialOrder::less_than(frontier, b.desc.upper())
            })
            .cloned()
            .ok_or(self.seqno)
    }

    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }

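    /// Returns the seqno to write a rollup for, if one is needed. With
    /// `use_active_rollup`, a rollup is requested once we're more than
    /// `threshold` seqnos past the latest rollup and there is no fresh
    /// in-flight rollup (one younger than `fallback_threshold_ms`).
    /// Otherwise, a rollup is requested at every `threshold`-th seqno, with
    /// an additional fallback in case the responsible writer is failing to
    /// write them. A tombstoned shard gets one final rollup.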
    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        // If we're tombstoned, then nothing new will ever be written, but we
        // still want one final rollup so GC can bring state down to its
        // minimal form.
        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    // If the active rollup has been in flight for too long,
                    // assume its worker died and claim the work.
                    Some(active_rollup) => {
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            // Request a rollup at every `threshold`-th seqno.
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            // Fall back to requesting a rollup if we're well past the point
            // where one should have been written, in case the responsible
            // writer is failing to write them.
            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }

    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
        batches.chain(rollups)
    }
}

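// Serde helpers used by the `State` serialization below to render otherwise
// opaque bytes in a human-readable form.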
fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
    let val = hex::encode(val);
    val.serialize(s)
}

fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
    val: &Option<LazyProto<T>>,
    s: S,
) -> Result<S::Ok, S::Error> {
    val.as_ref()
        .map(|lazy| hex::encode(&lazy.into_proto()))
        .serialize(s)
}

fn serialize_part_stats<S: Serializer>(
    val: &Option<LazyPartStats>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(|x| x.decode().key);
    val.serialize(s)
}

fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
    // For readability, decode the raw sum assuming the common case of an i64
    // diff type.
    let val = val.map(i64::decode);
    val.serialize(s)
}

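// A human-readable rendering of `State` for debugging and introspection,
// e.g. the `state_serde.json` golden test below; it intentionally inlines
// `StateCollections` and the flattened trace into one struct.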
impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let State {
            applier_version,
            shard_id,
            seqno,
            walltime_ms,
            hostname,
            collections:
                StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
        } = self;
        let mut s = s.serialize_struct("State", 19)?;
        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
        let () = s.serialize_field("shard_id", shard_id)?;
        let () = s.serialize_field("seqno", seqno)?;
        let () = s.serialize_field("walltime_ms", walltime_ms)?;
        let () = s.serialize_field("hostname", hostname)?;
        let () = s.serialize_field("last_gc_req", last_gc_req)?;
        let () = s.serialize_field("rollups", rollups)?;
        let () = s.serialize_field("active_rollup", active_rollup)?;
        let () = s.serialize_field("active_gc", active_gc)?;
        let () = s.serialize_field("leased_readers", leased_readers)?;
        let () = s.serialize_field("critical_readers", critical_readers)?;
        let () = s.serialize_field("writers", writers)?;
        let () = s.serialize_field("schemas", schemas)?;
        let () = s.serialize_field("since", &trace.since().elements())?;
        let () = s.serialize_field("upper", &trace.upper().elements())?;
        let trace = trace.flatten();
        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
        let () = s.serialize_field("merges", &trace.merges)?;
        s.end()
    }
}

#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}

#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}

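/// Wrapper type for a since frontier, used in errors and downgrade results.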
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);

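/// Wrapper type for an upper frontier, used in errors.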
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);

#[cfg(test)]
pub(crate) mod tests {
    use std::ops::Range;
    use std::str::FromStr;

    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::{assert_none, assert_ok};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::internal::encoding::any_some_lazy_part_stats;
    use crate::internal::paths::RollupId;
    use crate::internal::trace::tests::any_trace;
    use crate::tests::new_test_client_cache;

    use super::*;

    const LEASE_DURATION_MS: u64 = 900 * 1000;

    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }

    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
        num_runs: usize,
    ) -> impl Strategy<Value = HollowBatch<T>> {
        (
            any::<T>(),
            any::<T>(),
            any::<T>(),
            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
            any::<usize>(),
        )
            .prop_map(move |(t0, t1, since, parts, len)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);

                let run_splits = (1..num_runs)
                    .map(|i| i * parts.len() / num_runs)
                    .collect::<Vec<_>>();

                let run_meta = (0..num_runs)
                    .map(|_| RunMeta {
                        id: Some(RunId::new()),
                        ..RunMeta::default()
                    })
                    .collect::<Vec<_>>();

                HollowBatch::new(
                    Description::new(lower, upper, since),
                    parts,
                    len % 10,
                    run_meta,
                    run_splits,
                )
            })
    }

    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
        Strategy::prop_map(
            (
                any::<T>(),
                any::<T>(),
                any::<T>(),
                proptest::collection::vec(any_run_part::<T>(), 0..20),
                any::<usize>(),
                0..=10usize,
                proptest::collection::vec(any::<RunId>(), 10),
            ),
            |(t0, t1, since, parts, len, num_runs, run_ids)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);
                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
                    let run_splits = (1..num_runs)
                        .map(|i| i * parts.len() / num_runs)
                        .collect::<Vec<_>>();

                    let run_meta = (0..num_runs)
                        .map(|i| RunMeta {
                            id: Some(run_ids[i]),
                            ..RunMeta::default()
                        })
                        .collect::<Vec<_>>();

                    HollowBatch::new(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_meta,
                        run_splits,
                    )
                } else {
                    HollowBatch::new_run_for_test(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_ids[0],
                    )
                }
            },
        )
    }

    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
        Strategy::prop_map(
            (
                any::<bool>(),
                any_hollow_batch_part(),
                any::<Option<T>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
                if is_hollow {
                    BatchPart::Hollow(hollow)
                } else {
                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
                    BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id,
                    }
                }
            },
        )
    }

    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
    }

    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
    -> impl Strategy<Value = HollowBatchPart<T>> {
        Strategy::prop_map(
            (
                any::<PartialBatchKey>(),
                any::<usize>(),
                any::<Vec<u8>>(),
                any_some_lazy_part_stats(),
                any::<Option<T>>(),
                any::<[u8; 8]>(),
                any::<Option<BatchColumnarFormat>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(
                key,
                encoded_size_bytes,
                key_lower,
                stats,
                ts_rewrite,
                diffs_sum,
                format,
                schema_id,
                deprecated_schema_id,
            )| {
                HollowBatchPart {
                    key,
                    encoded_size_bytes,
                    key_lower,
                    structured_key_lower: None,
                    stats,
                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
                    diffs_sum: Some(diffs_sum),
                    format,
                    schema_id,
                    deprecated_schema_id,
                }
            },
        )
    }

    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
        Strategy::prop_map(
            (
                any::<SeqNo>(),
                any::<Option<T>>(),
                any::<u64>(),
                any::<u64>(),
                any::<HandleDebugState>(),
            ),
            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
                // Avoid a degenerate zero-length lease.
                if lease_duration_ms == 0 {
                    lease_duration_ms += 1;
                }
                LeasedReaderState {
                    seqno,
                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
                    last_heartbeat_timestamp_ms,
                    lease_duration_ms,
                    debug,
                }
            },
        )
    }

    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
    {
        Strategy::prop_map(
            (
                any::<Option<T>>(),
                any::<OpaqueState>(),
                any::<String>(),
                any::<HandleDebugState>(),
            ),
            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
                since: since.map_or_else(Antichain::new, Antichain::from_elem),
                opaque,
                opaque_codec,
                debug,
            },
        )
    }

    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
        Strategy::prop_map(
            (
                any::<u64>(),
                any::<u64>(),
                any::<IdempotencyToken>(),
                any::<Option<T>>(),
                any::<HandleDebugState>(),
            ),
            |(
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper,
                debug,
            )| WriterState {
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper: most_recent_write_upper
                    .map_or_else(Antichain::new, Antichain::from_elem),
                debug,
            },
        )
    }

    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
        Strategy::prop_map(
            (
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
            ),
            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
                key: Bytes::from(key),
                key_data_type: Bytes::from(key_data_type),
                val: Bytes::from(val),
                val_data_type: Bytes::from(val_data_type),
            },
        )
    }

    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
        num_trace_batches: Range<usize>,
    ) -> impl Strategy<Value = State<T>> {
        // The fields are split into two tuples to stay within proptest's
        // tuple-arity limits.
        let part1 = (
            any::<ShardId>(),
            any::<SeqNo>(),
            any::<u64>(),
            any::<String>(),
            any::<SeqNo>(),
            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
            proptest::option::of(any::<ActiveRollup>()),
        );

        let part2 = (
            proptest::option::of(any::<ActiveGc>()),
            proptest::collection::btree_map(
                any::<LeasedReaderId>(),
                any_leased_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(
                any::<CriticalReaderId>(),
                any_critical_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
            any_trace::<T>(num_trace_batches),
        );

        (part1, part2).prop_map(
            |(
                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
            )| State {
                applier_version: semver::Version::new(1, 2, 3),
                shard_id,
                seqno,
                walltime_ms,
                hostname,
                collections: StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
            },
        )
    }

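    /// Shorthand for a [HollowBatch] with the given bounds and part keys, a
    /// minimum since, and zero-size parts; most tests below only care about
    /// batch structure.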
    pub(crate) fn hollow<T: Timestamp>(
        lower: T,
        upper: T,
        keys: &[&str],
        len: usize,
    ) -> HollowBatch<T> {
        HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(T::minimum()),
            ),
            keys.iter()
                .map(|x| {
                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                        key: PartialBatchKey((*x).to_owned()),
                        encoded_size_bytes: 0,
                        key_lower: vec![],
                        structured_key_lower: None,
                        stats: None,
                        ts_rewrite: None,
                        diffs_sum: None,
                        format: None,
                        schema_id: None,
                        deprecated_schema_id: None,
                    }))
                })
                .collect(),
            len,
        )
    }

    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since starts at the minimum.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        // Greater: the downgrade applies.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Equal: no-op.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Less: also a no-op; a reader's since never regresses.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        // Register a second reader.
        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since tracks the minimum of all reader sinces: reader is
        // still at 2, so downgrading reader2 to 3 leaves the shard since at 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                None,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Downgrading reader to 5 advances the shard since to reader2's 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring a reader does not advance the shard since.
        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Register a third reader.
        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since is still held back by reader2 at 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                None,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // The shard since only advances on downgrade_since, not on expiry:
        // expiring reader2 leaves it at 3 even though reader3 is at 10.
        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Likewise, even expiring the last reader leaves the since where it
        // was.
        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }

    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader::<u64>("", &reader, "");

        // The shard since and the reader's opaque start at their initial
        // values.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            u64::initial()
        );

        // A matching expected opaque downgrades the since and swaps in the
        // new opaque.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &u64::initial(),
                (&1, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            1
        );
        // Downgrading to the same since is a no-op, but the opaque still
        // updates.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &1,
                (&2, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            2
        );
        // Downgrading to a lesser since is also a no-op (the since never
        // regresses), but the opaque still updates.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &2,
                (&3, &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            3
        );
    }

    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // State is initially empty.
        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        // Cannot insert a batch whose lower doesn't match the current shard
        // upper.
        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        // Can insert an empty batch from the current upper.
        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Cannot insert a batch with lower > upper.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        // Cannot insert a non-empty batch with lower == upper.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        // Can insert an empty batch with lower == upper.
        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Cannot take a snapshot with as_of == shard upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        // Cannot take a snapshot with as_of > shard upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        // Write a [0, 5) batch.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Can take a snapshot with as_of < upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // Can take a snapshot with as_of anywhere inside the batch's time
        // interval.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // Cannot take a snapshot with as_of at or beyond the upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        // Advance the since.
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Cannot take a snapshot with as_of < since: those historical
        // distinctions may have been compacted away.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Write an empty [5, 10) batch.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Batches with no updates are included too.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // The upper has moved to 10, so as_of 10 is unavailable again.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        // Write a [10, 15) batch.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Batches whose lower is past the as_of are excluded...
        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // ...but a batch whose lower == as_of is included.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }

    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // An empty shard has no batch to listen for, at any frontier.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // Write a [0, 5) batch and then a [5, 10) batch.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // All frontiers in [0, 5) return the first batch.
        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        // All frontiers in [5, 10) return the second batch.
        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        // There is no batch at or beyond the upper.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        // The empty frontier never matches a batch.
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }

    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();
        let writer_id_two = WriterId::new();

        // Writer one can write.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Expire writer one.
        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        // Other writers are unaffected and can still write.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Empty state doesn't need gc, regardless of is_write.
        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // Artificially advance the seqno so the seqno_since advances past the
        // min_versions threshold.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // While a writer is registered, a read doesn't trigger gc...
        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // ...but a write does.
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // Mark a gc as already in flight.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // While the active gc is fresh, no new gc is requested.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        // Once the active gc exceeds the fallback threshold, the work is
        // assumed abandoned and claimed again.
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        // The fallback keeps firing while the stale active gc marker remains,
        // even for small seqno advances.
        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        // Readers can trigger gc once there are no registered writers.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // Any one gc request is capped at max_versions past the last one.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }

    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Empty state doesn't need gc, regardless of is_write.
        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // Artificially advance the seqno so the seqno_since advances past the
        // (log-scaled) gc threshold.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // While a writer is registered, a read doesn't trigger gc...
        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // ...but a write does.
        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // Readers can trigger gc once there are no registered writers.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }

    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Not yet more than ROLLUP_THRESHOLD seqnos past the latest rollup:
        // no rollup needed.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        // Once we're more than ROLLUP_THRESHOLD seqnos past the latest
        // rollup, one is needed.
        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // Mark a rollup as in flight.
        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        // A fresh in-flight rollup suppresses new requests...
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // ...even as the seqno advances...
        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // ...until it's older than the fallback threshold, at which point the
        // work is assumed abandoned and claimed again.
        current_time += ROLLUP_FALLBACK_THRESHOLD_MS + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        // Complete the rollup at seqno 9 and clear the active marker.
        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // The new rollup resets the cadence: nothing is needed until we're
        // more than ROLLUP_THRESHOLD seqnos past it again.
        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }

    #[mz_ore::test]
    fn need_rollup_classic() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
        const NOW: u64 = 0;

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Not yet at a multiple of ROLLUP_THRESHOLD past the latest rollup.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        // A rollup is needed at every ROLLUP_THRESHOLD-th seqno past the
        // latest rollup.
        state.seqno = SeqNo(8);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(8)
        );

        // Off the cadence: no rollup needed.
        state.seqno = SeqNo(9);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        // The next multiple fires again.
        state.seqno = SeqNo(11);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(11)
        );

        // Adding a more recent rollup resets the cadence.
        let rollup_seqno = SeqNo(6);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // If the regular cadence is missed for long enough, the fallback
        // threshold forces a rollup...
        let fallback_seqno = SeqNo(
            rollup_seqno.0
                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
        );
        state.seqno = fallback_seqno;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno
        );
        // ...including at seqnos that aren't exactly on the cadence.
        state.seqno = fallback_seqno.next();
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno.next()
        );
    }

    #[mz_ore::test]
    fn idempotency_token_sentinel() {
        assert_eq!(
            IdempotencyToken::SENTINEL.to_string(),
            "i11111111-1111-1111-1111-111111111111"
        );
    }

    /// Regression test for the serde_json rendering of `State`: on mismatch,
    /// the assert prints the newly generated golden output so
    /// `state_serde.json` can be updated.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn state_inspect_serde_json() {
        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
        assert_eq!(
            json.trim(),
            STATE_SERDE_JSON.trim(),
            "\n\nNEW GOLDEN\n{}\n",
            json
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
        let mut clients = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        async fn open_and_write(
            clients: &mut PersistClientCache,
            version: semver::Version,
            shard_id: ShardId,
        ) -> Result<(), tokio::task::JoinError> {
            clients.cfg.build_version = version.clone();
            // Clear the state cache, as if this were a brand-new process.
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            // Run in a task so we can observe any panic as a JoinError.
            mz_ore::task::spawn(|| version.to_string(), async move {
                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
                let current = *write.upper().as_option().unwrap();
                // Do a write so that state is tagged with this build version.
                write
                    .expect_compare_and_append_batch(&mut [], current, current + 1)
                    .await;
            })
            .await
        }

        // Start the shard at 0.10.0.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        // Upgrading to 0.11.0 is fine.
        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
        assert_ok!(res);

        // Going back to 0.10.0 is still allowed.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        // But a version too far behind the state's applier version panics.
        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());
    }

    #[mz_ore::test]
    fn runid_roundtrip() {
        proptest!(|(runid: RunId)| {
            let runid_str = runid.to_string();
            let parsed = RunId::from_str(&runid_str);
            prop_assert_eq!(parsed, Ok(runid));
        });
    }
}