use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Semigroup;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);

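/// A one-time token used to detect retries of the same physical write.
///
/// `compare_and_append` remembers the most recent token per writer; if a
/// request arrives again with the same token, it is reported as already
/// committed instead of being re-applied.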
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}

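/// Metadata about a leased reader: the oldest [SeqNo] and since frontier it
/// holds back, plus heartbeat and lease bookkeeping used to expire idle
/// readers.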
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    pub seqno: SeqNo,
    pub since: Antichain<T>,
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    pub since: Antichain<T>,
    pub opaque: OpaqueState,
    pub opaque_codec: String,
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub most_recent_write_token: IdempotencyToken,
    pub most_recent_write_upper: Antichain<T>,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    pub hostname: String,
    pub purpose: String,
}

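/// A reference to the data in one part of a batch: either a [HollowBatchPart]
/// pointing at a blob in external storage, or a small set of updates stored
/// inline in state itself.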
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        deprecated_schema_id: Option<SchemaId>,
    },
}

fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => updates
                .decode::<T>(metrics)
                .expect("valid inline part")
                .updates
                .diffs_sum(),
        }
    }
}

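/// A run (or subset of a run) of parts that is stored as its own blob and
/// referenced from state by a [HollowRunRef], rather than being listed inline.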
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    pub(crate) parts: Vec<RunPart<T>>,
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    pub hollow_bytes: usize,

    pub max_part_bytes: usize,

    pub key_lower: Vec<u8>,

    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    pub async fn set<D: Codec64 + Semigroup>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

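/// One entry in a run: either a single [BatchPart] recorded directly in state,
/// or a [HollowRunRef] pointing at a [HollowRun] of further parts stored in
/// blob storage.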
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

impl<T> RunPart<T>
where
    T: Timestamp + Codec64,
{
    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            Self::Single(p) => p.diffs_sum(metrics),
            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
        }
    }
}

#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    Unordered,
    Codec,
    Structured,
}

#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);

impl std::fmt::Display for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ri{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RunId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for RunId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("ri", "RunId", s).map(RunId)
    }
}

impl From<RunId> for String {
    fn from(x: RunId) -> Self {
        x.to_string()
    }
}

impl RunId {
    pub(crate) fn new() -> Self {
        RunId(*Uuid::new_v4().as_bytes())
    }
}

impl Arbitrary for RunId {
    type Parameters = ();
    type Strategy = proptest::strategy::BoxedStrategy<Self>;
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
            RunId(*Uuid::from_u128(n).as_bytes())
        })
        .boxed()
    }
}

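/// Metadata shared by all parts in a single run: its ordering, the schema of
/// its data, a stable id, and (if known) the number of updates it contains.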
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    pub(crate) order: Option<RunOrder>,
    pub(crate) schema: Option<SchemaId>,

    pub(crate) deprecated_schema: Option<SchemaId>,

    pub(crate) id: Option<RunId>,

    pub(crate) len: Option<usize>,
}

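/// A pointer to the data for one part of a batch, stored in blob storage,
/// along with enough metadata (sizes, key bounds, stats, format) to reason
/// about the part without fetching it.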
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    pub key: PartialBatchKey,
    pub encoded_size_bytes: usize,
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    pub ts_rewrite: Option<Antichain<T>>,
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    pub format: Option<BatchColumnarFormat>,
    pub schema_id: Option<SchemaId>,

    pub deprecated_schema_id: Option<SchemaId>,
}

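/// A batch of updates, described by `desc`, whose data lives in blob storage
/// as the `parts` referenced here. `run_splits` partitions `parts` into sorted
/// runs and `run_meta` carries the per-run metadata.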
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    pub desc: Description<T>,
    pub len: usize,
    pub(crate) parts: Vec<RunPart<T>>,
    pub(crate) run_splits: Vec<usize>,
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

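    /// Iterates over the runs in this batch, yielding each run's metadata
    /// together with the slice of `parts` belonging to it. Run boundaries are
    /// the indices recorded in `run_splits`.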
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip_eq(run_parts)
    }

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    pub key: PartialRollupKey,
    pub encoded_size_bytes: Option<usize>,
}

#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

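/// Wrapper for the result of a state transition that turned out to be a no-op:
/// the command's return value is produced, but no new state is written.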
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    pub(crate) last_gc_req: SeqNo,

    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    pub(crate) active_rollup: Option<ActiveRollup>,
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    pub(crate) trace: Trace<T>,
}

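/// The Codec-encoded key and value schemas registered for a shard, along with
/// the encoded Arrow [DataType]s they map to.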
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    pub key: Bytes,
    pub key_data_type: Bytes,
    pub val: Bytes,
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}

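/// Reasons why a compare_and_append command did not append its batch: it was
/// already committed, the expected upper did not match, the usage was invalid,
/// or inline writes needed to be flushed to blob storage first.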
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        Continue(applied)
    }

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

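    /// Registers the given key and value schemas: returns the existing
    /// [SchemaId] if an identical pair is already registered, assigns a new id
    /// if the shard has no schemas yet, and otherwise declines the
    /// registration as a no-op.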
    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => {
                Break(NoOpStateTransition(Some(*schema_id)))
            }
            None if self.is_tombstone() => {
                Break(NoOpStateTransition(None))
            }
            None if self.schemas.is_empty() => {
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }

    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }

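    /// Attempts to append `batch` to the shard. On success, returns any
    /// compaction requests this append generated; otherwise it breaks with a
    /// [CompareAndAppendBreak] describing why the batch was not appended
    /// (idempotent retry, upper mismatch, invalid bounds, or inline-write
    /// backpressure).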
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }

    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn apply_merge_res_classic<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self
            .trace
            .apply_merge_res_checked_classic::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn spine_exert(
        &mut self,
        fuel: usize,
    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
        let (merge_reqs, did_work) = self.trace.exert(fuel);
        if did_work {
            Continue(merge_reqs)
        } else {
            assert!(merge_reqs.is_empty());
            Break(NoOpStateTransition(Vec::new()))
        }
    }

    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        let seqno = match outstanding_seqno {
            Some(outstanding_seqno) => {
                assert!(
                    outstanding_seqno >= reader_state.seqno,
                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                    outstanding_seqno,
                    reader_state.seqno,
                );
                std::cmp::min(outstanding_seqno, seqno)
            }
            None => seqno,
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }

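    /// Downgrades a critical reader's since, gated on a compare-and-set of the
    /// reader's opaque token: if `expected_opaque` does not match the stored
    /// value, the current opaque and since are returned instead.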
    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
        Result<Since<T>, (O, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);
        assert_eq!(reader_state.opaque_codec, O::codec_name());

        if &O::decode(reader_state.opaque.0) != expected_opaque {
            return Continue(Err((
                Codec64::decode(reader_state.opaque.0),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }

    pub fn heartbeat_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        match self.leased_readers.get_mut(reader_id) {
            Some(reader_state) => {
                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
                    heartbeat_timestamp_ms,
                    reader_state.last_heartbeat_timestamp_ms,
                );
                Continue(true)
            }
            None => Continue(false),
        }
    }

    pub fn expire_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.leased_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_critical_reader(
        &mut self,
        reader_id: &CriticalReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.critical_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_writer(
        &mut self,
        writer_id: &WriterId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.writers.remove(writer_id).is_some();
        Continue(existed)
    }

    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }

    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }

    fn critical_since(&self) -> Option<Antichain<T>> {
        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
        let mut since = critical_sinces.next().cloned()?;
        for s in critical_sinces {
            since.meet_assign(s);
        }
        Some(since)
    }

    fn update_since(&mut self) {
        let mut sinces_iter = self
            .leased_readers
            .values()
            .map(|x| &x.since)
            .chain(self.critical_readers.values().map(|x| &x.since));
        let mut since = match sinces_iter.next() {
            Some(since) => since.clone(),
            None => {
                return;
            }
        };
        while let Some(s) = sinces_iter.next() {
            since.meet_assign(s);
        }
        self.trace.downgrade_since(&since);
    }

    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
        let mut seqno_since = seqno;
        for cap in self.leased_readers.values() {
            seqno_since = std::cmp::min(seqno_since, cap.seqno);
        }
        seqno_since
    }

    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }

    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }

    pub(crate) fn is_single_empty_batch(&self) -> bool {
        let mut batch_count = 0;
        let mut is_empty = true;
        self.trace.map_batches(|b| {
            batch_count += 1;
            is_empty &= b.is_empty()
        });
        batch_count <= 1 && is_empty
    }

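    /// Collapses the state of a finalized shard toward a minimal "tombstone":
    /// all handles are expired and the trace is reduced to a single empty
    /// batch, breaking as a no-op once there is nothing left to shrink.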
    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        let was_tombstone = self.is_tombstone();

        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            Continue(())
        } else {
            Break(NoOpStateTransition(()))
        }
    }
}

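/// The durable, shard-global state of a persist shard: the version that last
/// wrote it, its current [SeqNo], and the collections of rollups, readers,
/// writers, schemas, and batches that make up the shard.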
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) shard_id: ShardId,

    pub(crate) seqno: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) hostname: String,
    pub(crate) collections: StateCollections<T>,
}

pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}

impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
    #[cfg(any(test, debug_assertions))]
    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
        TypedState {
            state: State {
                applier_version,
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname,
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }

    pub(crate) fn clone_for_rollup(&self) -> Self {
        TypedState {
            state: State {
                applier_version: self.applier_version.clone(),
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname: self.hostname.clone(),
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }
}

impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let TypedState { state, _phantom } = self;
        f.debug_struct("TypedState").field("state", state).finish()
    }
}

#[cfg(any(test, debug_assertions))]
impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
    fn eq(&self, other: &Self) -> bool {
        let TypedState {
            state: self_state,
            _phantom,
        } = self;
        let TypedState {
            state: other_state,
            _phantom,
        } = other;
        self_state == other_state
    }
}

impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            applier_version,
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
        let mut new_state = State {
            applier_version: new_applier_version.clone(),
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}

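/// Knobs that control when [State::maybe_gc] requests a garbage collection,
/// mirroring the `persist_gc_*` dyncfgs defined above.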
2414#[derive(Copy, Clone, Debug)]
2415pub struct GcConfig {
2416 pub use_active_gc: bool,
2417 pub fallback_threshold_ms: u64,
2418 pub min_versions: usize,
2419 pub max_versions: usize,
2420}
2421
2422impl<T> State<T>
2423where
2424 T: Timestamp + Lattice + Codec64,
2425{
2426 pub fn shard_id(&self) -> ShardId {
2427 self.shard_id
2428 }
2429
2430 pub fn seqno(&self) -> SeqNo {
2431 self.seqno
2432 }
2433
2434 pub fn since(&self) -> &Antichain<T> {
2435 self.collections.trace.since()
2436 }
2437
2438 pub fn upper(&self) -> &Antichain<T> {
2439 self.collections.trace.upper()
2440 }
2441
2442 pub fn spine_batch_count(&self) -> usize {
2443 self.collections.trace.num_spine_batches()
2444 }
2445
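/// Walks every blob referenced by this state and tallies size metrics:
/// batch/part/rollup counts, update counts, and encoded byte totals.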
2446 pub fn size_metrics(&self) -> StateSizeMetrics {
2447 let mut ret = StateSizeMetrics::default();
2448 self.blobs().for_each(|x| match x {
2449 HollowBlobRef::Batch(x) => {
2450 ret.hollow_batch_count += 1;
2451 ret.batch_part_count += x.part_count();
2452 ret.num_updates += x.len;
2453
2454 let batch_size = x.encoded_size_bytes();
2455 for x in x.parts.iter() {
2456 if x.ts_rewrite().is_some() {
2457 ret.rewrite_part_count += 1;
2458 }
2459 if x.is_inline() {
2460 ret.inline_part_count += 1;
2461 ret.inline_part_bytes += x.inline_bytes();
2462 }
2463 }
2464 ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2465 ret.state_batches_bytes += batch_size;
2466 }
2467 HollowBlobRef::Rollup(x) => {
2468 ret.state_rollup_count += 1;
2469 ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2470 }
2471 });
2472 ret
2473 }
2474
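/// Returns the most recent rollup known to this state, keyed by the seqno it
/// was taken at. Expects at least one rollup to be present.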
2475 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2476 self.collections
2479 .rollups
2480 .last_key_value()
2483 .expect("State should have at least one rollup if seqno > minimum")
2484 }
2485
2486 pub(crate) fn seqno_since(&self) -> SeqNo {
2487 self.collections.seqno_since(self.seqno)
2488 }
2489
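/// Decides whether a garbage collection run should be started, returning the
/// [GcReq] to execute if so (and recording it as the last requested GC).
/// GC fires once enough seqnos have accumulated since the last request and,
/// with active GC tracking enabled, also when a previously claimed GC has
/// exceeded the fallback threshold.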
2490 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2502 let GcConfig {
2503 use_active_gc,
2504 fallback_threshold_ms,
2505 min_versions,
2506 max_versions,
2507 } = cfg;
2508 let gc_threshold = if use_active_gc {
2512 u64::cast_from(min_versions)
2513 } else {
2514 std::cmp::max(
2515 1,
2516 u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2517 )
2518 };
2519 let new_seqno_since = self.seqno_since();
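// Cap how far a single GC request can advance, so a shard that has fallen
// far behind is collected incrementally rather than in one giant request.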
2520 let gc_until_seqno = new_seqno_since.min(SeqNo(
2523 self.collections
2524 .last_gc_req
2525 .0
2526 .saturating_add(u64::cast_from(max_versions)),
2527 ));
2528 let should_gc = new_seqno_since
2529 .0
2530 .saturating_sub(self.collections.last_gc_req.0)
2531 >= gc_threshold;
2532
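// With active GC tracking, also fire when a previously claimed GC has been
// running longer than the fallback threshold, even if the seqno-based
// threshold above wasn't hit.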
2533 let should_gc = if use_active_gc && !should_gc {
2536 match self.collections.active_gc {
2537 Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2538 None => false,
2539 }
2540 } else {
2541 should_gc
2542 };
2543 let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
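// Only writers trigger GC unless the shard has no writers at all, and a
// tombstoned shard should always have its remaining state collected.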
2546 let tombstone_needs_gc = self.collections.is_tombstone();
2551 let should_gc = should_gc || tombstone_needs_gc;
2552 let should_gc = if use_active_gc {
2553 should_gc
2557 && match self.collections.active_gc {
2558 Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2559 None => true,
2560 }
2561 } else {
2562 should_gc
2563 };
2564 if should_gc {
2565 self.collections.last_gc_req = gc_until_seqno;
2566 Some(GcReq {
2567 shard_id: self.shard_id,
2568 new_seqno_since: gc_until_seqno,
2569 })
2570 } else {
2571 None
2572 }
2573 }
2574
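/// Returns the number of seqnos this state is holding back from garbage
/// collection: the current seqno minus the seqno_since (the earliest seqno
/// still required by readers).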
2575 pub fn seqnos_held(&self) -> usize {
2577 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2578 }
2579
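/// Forcibly expires all leased readers and writers whose leases have lapsed
/// as of `walltime_ms`, returning how many of each were removed.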
2580 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2582 let mut metrics = ExpiryMetrics::default();
2583 let shard_id = self.shard_id();
2584 self.collections.leased_readers.retain(|id, state| {
2585 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2586 if !retain {
2587 info!(
2588 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2589 state.debug.purpose
2590 );
2591 metrics.readers_expired += 1;
2592 }
2593 retain
2594 });
2595 self.collections.writers.retain(|id, state| {
2597 let retain =
2598 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2599 if !retain {
2600 info!(
2601 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2602 state.debug.purpose
2603 );
2604 metrics.writers_expired += 1;
2605 }
2606 retain
2607 });
2608 metrics
2609 }
2610
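/// Returns the batches needed to read a snapshot as of `as_of`: an error if
/// `as_of` is before the shard's since (its distinctions have been compacted
/// away) or at/beyond the upper (not yet available), otherwise every batch
/// whose lower bound is not past `as_of`.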
2611 pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2615 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2616 return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2617 self.collections.trace.since().clone(),
2618 )));
2619 }
2620 let upper = self.collections.trace.upper();
2621 if PartialOrder::less_equal(upper, as_of) {
2622 return Err(SnapshotErr::AsOfNotYetAvailable(
2623 self.seqno,
2624 Upper(upper.clone()),
2625 ));
2626 }
2627
2628 let batches = self
2629 .collections
2630 .trace
2631 .batches()
2632 .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2633 .cloned()
2634 .collect();
2635 Ok(batches)
2636 }
2637
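/// Verifies that a listen may start at `as_of`, i.e. that `as_of` has not
/// been compacted past the shard's since.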
2638 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2640 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2641 return Err(Since(self.collections.trace.since().clone()));
2642 }
2643 Ok(())
2644 }
2645
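/// Returns the batch whose description contains `frontier`, if one exists
/// yet; otherwise the seqno of this state, so the caller knows which state
/// version produced the miss.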
2646 pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2647 self.collections
2650 .trace
2651 .batches()
2652 .find(|b| {
2653 PartialOrder::less_equal(b.desc.lower(), frontier)
2654 && PartialOrder::less_than(frontier, b.desc.upper())
2655 })
2656 .cloned()
2657 .ok_or(self.seqno)
2658 }
2659
2660 pub fn active_rollup(&self) -> Option<ActiveRollup> {
2661 self.collections.active_rollup
2662 }
2663
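/// Decides whether a new rollup should be written, returning the seqno to
/// roll up at. With active rollup tracking, a rollup is requested once the
/// threshold of seqnos since the last rollup is exceeded and no other rollup
/// is in flight (or the in-flight one has exceeded the fallback threshold);
/// otherwise the classic heuristic in the `else` branch applies.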
2664 pub fn need_rollup(
2665 &self,
2666 threshold: usize,
2667 use_active_rollup: bool,
2668 fallback_threshold_ms: u64,
2669 now: u64,
2670 ) -> Option<SeqNo> {
2671 let (latest_rollup_seqno, _) = self.latest_rollup();
2672
2673 if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
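// A tombstoned shard is in its final state; if the latest rollup is more
// than one seqno behind, ask for one last rollup at the current seqno.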
2679 return Some(self.seqno);
2680 }
2681
2682 let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2683
2684 if use_active_rollup {
2685 if seqnos_since_last_rollup > u64::cast_from(threshold) {
2691 match self.active_rollup() {
2692 Some(active_rollup) => {
2693 if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2694 return Some(self.seqno);
2695 }
2696 }
2697 None => {
2698 return Some(self.seqno);
2699 }
2700 }
2701 }
2702 } else {
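// Classic heuristic: only the writer that lands exactly on a multiple of
// the threshold requests a rollup, so concurrent writers don't all write
// one; the fallback check below catches the case where that request never
// completed.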
2703 if seqnos_since_last_rollup > 0
2707 && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2708 {
2709 return Some(self.seqno);
2710 }
2711
2712 if seqnos_since_last_rollup
2715 > u64::cast_from(
2716 threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2717 )
2718 {
2719 return Some(self.seqno);
2720 }
2721 }
2722
2723 None
2724 }
2725
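/// Iterates over every blob referenced by this state: each hollow batch in
/// the trace followed by each rollup.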
2726 pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2727 let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2728 let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2729 batches.chain(rollups)
2730 }
2731}
2732
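// Serde helpers for the hand-written `Serialize` impl on `State` below:
// binary fields are hex-encoded and lazily-decoded fields are expanded so
// the output stays human-readable.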
2733fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2734 let val = hex::encode(val);
2735 val.serialize(s)
2736}
2737
2738fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2739 val: &Option<LazyProto<T>>,
2740 s: S,
2741) -> Result<S::Ok, S::Error> {
2742 val.as_ref()
2743 .map(|lazy| hex::encode(&lazy.into_proto()))
2744 .serialize(s)
2745}
2746
2747fn serialize_part_stats<S: Serializer>(
2748 val: &Option<LazyPartStats>,
2749 s: S,
2750) -> Result<S::Ok, S::Error> {
2751 let val = val.as_ref().map(|x| x.decode().key);
2752 val.serialize(s)
2753}
2754
2755fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2756 let val = val.map(i64::decode);
2758 val.serialize(s)
2759}
2760
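// Hand-rolled so the serialized form flattens `StateCollections` and the
// trace; the exhaustive destructuring makes adding a field without updating
// this impl a compile error.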
2761impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2767 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2768 let State {
2769 applier_version,
2770 shard_id,
2771 seqno,
2772 walltime_ms,
2773 hostname,
2774 collections:
2775 StateCollections {
2776 last_gc_req,
2777 rollups,
2778 active_rollup,
2779 active_gc,
2780 leased_readers,
2781 critical_readers,
2782 writers,
2783 schemas,
2784 trace,
2785 },
2786 } = self;
2787 let mut s = s.serialize_struct("State", 19)?; // one per serialize_field call below
2788 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2789 let () = s.serialize_field("shard_id", shard_id)?;
2790 let () = s.serialize_field("seqno", seqno)?;
2791 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2792 let () = s.serialize_field("hostname", hostname)?;
2793 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2794 let () = s.serialize_field("rollups", rollups)?;
2795 let () = s.serialize_field("active_rollup", active_rollup)?;
2796 let () = s.serialize_field("active_gc", active_gc)?;
2797 let () = s.serialize_field("leased_readers", leased_readers)?;
2798 let () = s.serialize_field("critical_readers", critical_readers)?;
2799 let () = s.serialize_field("writers", writers)?;
2800 let () = s.serialize_field("schemas", schemas)?;
2801 let () = s.serialize_field("since", &trace.since().elements())?;
2802 let () = s.serialize_field("upper", &trace.upper().elements())?;
2803 let trace = trace.flatten();
2804 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2805 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2806 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2807 let () = s.serialize_field("merges", &trace.merges)?;
2808 s.end()
2809 }
2810}
2811
2812#[derive(Debug, Default)]
2813pub struct StateSizeMetrics {
2814 pub hollow_batch_count: usize,
2815 pub batch_part_count: usize,
2816 pub rewrite_part_count: usize,
2817 pub num_updates: usize,
2818 pub largest_batch_bytes: usize,
2819 pub state_batches_bytes: usize,
2820 pub state_rollups_bytes: usize,
2821 pub state_rollup_count: usize,
2822 pub inline_part_count: usize,
2823 pub inline_part_bytes: usize,
2824}
2825
2826#[derive(Default)]
2827pub struct ExpiryMetrics {
2828 pub(crate) readers_expired: usize,
2829 pub(crate) writers_expired: usize,
2830}
2831
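/// Wrapper for a shard's `since` frontier, used by results and errors that
/// report it.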
2832#[derive(Debug, Clone, PartialEq)]
2834pub struct Since<T>(pub Antichain<T>);
2835
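/// Wrapper for a shard's `upper` frontier, used by results and errors that
/// report it.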
2836#[derive(Debug, PartialEq)]
2838pub struct Upper<T>(pub Antichain<T>);
2839
2840#[cfg(test)]
2841pub(crate) mod tests {
2842 use std::ops::Range;
2843 use std::str::FromStr;
2844
2845 use bytes::Bytes;
2846 use mz_build_info::DUMMY_BUILD_INFO;
2847 use mz_dyncfg::ConfigUpdates;
2848 use mz_ore::now::SYSTEM_TIME;
2849 use mz_ore::{assert_none, assert_ok};
2850 use mz_proto::RustType;
2851 use proptest::prelude::*;
2852 use proptest::strategy::ValueTree;
2853
2854 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2855 use crate::PersistLocation;
2856 use crate::cache::PersistClientCache;
2857 use crate::internal::encoding::any_some_lazy_part_stats;
2858 use crate::internal::paths::RollupId;
2859 use crate::internal::trace::tests::any_trace;
2860 use crate::tests::new_test_client_cache;
2861
2862 use super::*;
2863
2864 const LEASE_DURATION_MS: u64 = 900 * 1000;
2865 fn debug_state() -> HandleDebugState {
2866 HandleDebugState {
2867 hostname: "debug".to_owned(),
2868 purpose: "finding the bugs".to_owned(),
2869 }
2870 }
2871
2872 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2873 num_runs: usize,
2874 ) -> impl Strategy<Value = HollowBatch<T>> {
2875 (
2876 any::<T>(),
2877 any::<T>(),
2878 any::<T>(),
2879 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2880 any::<usize>(),
2881 )
2882 .prop_map(move |(t0, t1, since, parts, len)| {
2883 let (lower, upper) = if t0 <= t1 {
2884 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2885 } else {
2886 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2887 };
2888 let since = Antichain::from_elem(since);
2889
2890 let run_splits = (1..num_runs)
2891 .map(|i| i * parts.len() / num_runs)
2892 .collect::<Vec<_>>();
2893
2894 let run_meta = (0..num_runs)
2895 .map(|_| {
2896 let mut meta = RunMeta::default();
2897 meta.id = Some(RunId::new());
2898 meta
2899 })
2900 .collect::<Vec<_>>();
2901
2902 HollowBatch::new(
2903 Description::new(lower, upper, since),
2904 parts,
2905 len % 10,
2906 run_meta,
2907 run_splits,
2908 )
2909 })
2910 }
2911
2912 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2913 Strategy::prop_map(
2914 (
2915 any::<T>(),
2916 any::<T>(),
2917 any::<T>(),
2918 proptest::collection::vec(any_run_part::<T>(), 0..20),
2919 any::<usize>(),
2920 0..=10usize,
2921 proptest::collection::vec(any::<RunId>(), 10),
2922 ),
2923 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2924 let (lower, upper) = if t0 <= t1 {
2925 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2926 } else {
2927 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2928 };
2929 let since = Antichain::from_elem(since);
2930 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2931 let run_splits = (1..num_runs)
2932 .map(|i| i * parts.len() / num_runs)
2933 .collect::<Vec<_>>();
2934
2935 let run_meta = (0..num_runs)
2936 .enumerate()
2937 .map(|(i, _)| {
2938 let mut meta = RunMeta::default();
2939 meta.id = Some(run_ids[i]);
2940 meta
2941 })
2942 .collect::<Vec<_>>();
2943
2944 HollowBatch::new(
2945 Description::new(lower, upper, since),
2946 parts,
2947 len % 10,
2948 run_meta,
2949 run_splits,
2950 )
2951 } else {
2952 HollowBatch::new_run_for_test(
2953 Description::new(lower, upper, since),
2954 parts,
2955 len % 10,
2956 run_ids[0],
2957 )
2958 }
2959 },
2960 )
2961 }
2962
2963 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2964 Strategy::prop_map(
2965 (
2966 any::<bool>(),
2967 any_hollow_batch_part(),
2968 any::<Option<T>>(),
2969 any::<Option<SchemaId>>(),
2970 any::<Option<SchemaId>>(),
2971 ),
2972 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2973 if is_hollow {
2974 BatchPart::Hollow(hollow)
2975 } else {
2976 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2977 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2978 BatchPart::Inline {
2979 updates,
2980 ts_rewrite,
2981 schema_id,
2982 deprecated_schema_id,
2983 }
2984 }
2985 },
2986 )
2987 }
2988
2989 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2990 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2991 }
2992
2993 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2994 -> impl Strategy<Value = HollowBatchPart<T>> {
2995 Strategy::prop_map(
2996 (
2997 any::<PartialBatchKey>(),
2998 any::<usize>(),
2999 any::<Vec<u8>>(),
3000 any_some_lazy_part_stats(),
3001 any::<Option<T>>(),
3002 any::<[u8; 8]>(),
3003 any::<Option<BatchColumnarFormat>>(),
3004 any::<Option<SchemaId>>(),
3005 any::<Option<SchemaId>>(),
3006 ),
3007 |(
3008 key,
3009 encoded_size_bytes,
3010 key_lower,
3011 stats,
3012 ts_rewrite,
3013 diffs_sum,
3014 format,
3015 schema_id,
3016 deprecated_schema_id,
3017 )| {
3018 HollowBatchPart {
3019 key,
3020 encoded_size_bytes,
3021 key_lower,
3022 structured_key_lower: None,
3023 stats,
3024 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3025 diffs_sum: Some(diffs_sum),
3026 format,
3027 schema_id,
3028 deprecated_schema_id,
3029 }
3030 },
3031 )
3032 }
3033
3034 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3035 Strategy::prop_map(
3036 (
3037 any::<SeqNo>(),
3038 any::<Option<T>>(),
3039 any::<u64>(),
3040 any::<u64>(),
3041 any::<HandleDebugState>(),
3042 ),
3043 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3044 if lease_duration_ms == 0 {
3048 lease_duration_ms += 1;
3049 }
3050 LeasedReaderState {
3051 seqno,
3052 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3053 last_heartbeat_timestamp_ms,
3054 lease_duration_ms,
3055 debug,
3056 }
3057 },
3058 )
3059 }
3060
3061 pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3062 {
3063 Strategy::prop_map(
3064 (
3065 any::<Option<T>>(),
3066 any::<OpaqueState>(),
3067 any::<String>(),
3068 any::<HandleDebugState>(),
3069 ),
3070 |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3071 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3072 opaque,
3073 opaque_codec,
3074 debug,
3075 },
3076 )
3077 }
3078
3079 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3080 Strategy::prop_map(
3081 (
3082 any::<u64>(),
3083 any::<u64>(),
3084 any::<IdempotencyToken>(),
3085 any::<Option<T>>(),
3086 any::<HandleDebugState>(),
3087 ),
3088 |(
3089 last_heartbeat_timestamp_ms,
3090 lease_duration_ms,
3091 most_recent_write_token,
3092 most_recent_write_upper,
3093 debug,
3094 )| WriterState {
3095 last_heartbeat_timestamp_ms,
3096 lease_duration_ms,
3097 most_recent_write_token,
3098 most_recent_write_upper: most_recent_write_upper
3099 .map_or_else(Antichain::new, Antichain::from_elem),
3100 debug,
3101 },
3102 )
3103 }
3104
3105 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3106 Strategy::prop_map(
3107 (
3108 any::<Vec<u8>>(),
3109 any::<Vec<u8>>(),
3110 any::<Vec<u8>>(),
3111 any::<Vec<u8>>(),
3112 ),
3113 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3114 key: Bytes::from(key),
3115 key_data_type: Bytes::from(key_data_type),
3116 val: Bytes::from(val),
3117 val_data_type: Bytes::from(val_data_type),
3118 },
3119 )
3120 }
3121
3122 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3123 num_trace_batches: Range<usize>,
3124 ) -> impl Strategy<Value = State<T>> {
3125 let part1 = (
3126 any::<ShardId>(),
3127 any::<SeqNo>(),
3128 any::<u64>(),
3129 any::<String>(),
3130 any::<SeqNo>(),
3131 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3132 proptest::option::of(any::<ActiveRollup>()),
3133 );
3134
3135 let part2 = (
3136 proptest::option::of(any::<ActiveGc>()),
3137 proptest::collection::btree_map(
3138 any::<LeasedReaderId>(),
3139 any_leased_reader_state::<T>(),
3140 1..3,
3141 ),
3142 proptest::collection::btree_map(
3143 any::<CriticalReaderId>(),
3144 any_critical_reader_state::<T>(),
3145 1..3,
3146 ),
3147 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3148 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3149 any_trace::<T>(num_trace_batches),
3150 );
3151
3152 (part1, part2).prop_map(
3153 |(
3154 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3155 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3156 )| State {
3157 applier_version: semver::Version::new(1, 2, 3),
3158 shard_id,
3159 seqno,
3160 walltime_ms,
3161 hostname,
3162 collections: StateCollections {
3163 last_gc_req,
3164 rollups,
3165 active_rollup,
3166 active_gc,
3167 leased_readers,
3168 critical_readers,
3169 writers,
3170 schemas,
3171 trace,
3172 },
3173 },
3174 )
3175 }
3176
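/// Test helper: builds a single-run [HollowBatch] with the given bounds and
/// one dummy hollow part per entry in `keys`.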
3177 pub(crate) fn hollow<T: Timestamp>(
3178 lower: T,
3179 upper: T,
3180 keys: &[&str],
3181 len: usize,
3182 ) -> HollowBatch<T> {
3183 HollowBatch::new_run(
3184 Description::new(
3185 Antichain::from_elem(lower),
3186 Antichain::from_elem(upper),
3187 Antichain::from_elem(T::minimum()),
3188 ),
3189 keys.iter()
3190 .map(|x| {
3191 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3192 key: PartialBatchKey((*x).to_owned()),
3193 encoded_size_bytes: 0,
3194 key_lower: vec![],
3195 structured_key_lower: None,
3196 stats: None,
3197 ts_rewrite: None,
3198 diffs_sum: None,
3199 format: None,
3200 schema_id: None,
3201 deprecated_schema_id: None,
3202 }))
3203 })
3204 .collect(),
3205 len,
3206 )
3207 }
3208
3209 #[mz_ore::test]
3210 fn downgrade_since() {
3211 let mut state = TypedState::<(), (), u64, i64>::new(
3212 DUMMY_BUILD_INFO.semver_version(),
3213 ShardId::new(),
3214 "".to_owned(),
3215 0,
3216 );
3217 let reader = LeasedReaderId::new();
3218 let seqno = SeqNo::minimum();
3219 let now = SYSTEM_TIME.clone();
3220 let _ = state.collections.register_leased_reader(
3221 "",
3222 &reader,
3223 "",
3224 seqno,
3225 Duration::from_secs(10),
3226 now(),
3227 false,
3228 );
3229
3230 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3232
3233 assert_eq!(
3235 state.collections.downgrade_since(
3236 &reader,
3237 seqno,
3238 None,
3239 &Antichain::from_elem(2),
3240 now()
3241 ),
3242 Continue(Since(Antichain::from_elem(2)))
3243 );
3244 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3245 assert_eq!(
3247 state.collections.downgrade_since(
3248 &reader,
3249 seqno,
3250 None,
3251 &Antichain::from_elem(2),
3252 now()
3253 ),
3254 Continue(Since(Antichain::from_elem(2)))
3255 );
3256 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3257 assert_eq!(
3259 state.collections.downgrade_since(
3260 &reader,
3261 seqno,
3262 None,
3263 &Antichain::from_elem(1),
3264 now()
3265 ),
3266 Continue(Since(Antichain::from_elem(2)))
3267 );
3268 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3269
3270 let reader2 = LeasedReaderId::new();
3272 let _ = state.collections.register_leased_reader(
3273 "",
3274 &reader2,
3275 "",
3276 seqno,
3277 Duration::from_secs(10),
3278 now(),
3279 false,
3280 );
3281
3282 assert_eq!(
3284 state.collections.downgrade_since(
3285 &reader2,
3286 seqno,
3287 None,
3288 &Antichain::from_elem(3),
3289 now()
3290 ),
3291 Continue(Since(Antichain::from_elem(3)))
3292 );
3293 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3294 assert_eq!(
3296 state.collections.downgrade_since(
3297 &reader,
3298 seqno,
3299 None,
3300 &Antichain::from_elem(5),
3301 now()
3302 ),
3303 Continue(Since(Antichain::from_elem(5)))
3304 );
3305 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3306
3307 assert_eq!(
3309 state.collections.expire_leased_reader(&reader),
3310 Continue(true)
3311 );
3312 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3313
3314 let reader3 = LeasedReaderId::new();
3316 let _ = state.collections.register_leased_reader(
3317 "",
3318 &reader3,
3319 "",
3320 seqno,
3321 Duration::from_secs(10),
3322 now(),
3323 false,
3324 );
3325
3326 assert_eq!(
3328 state.collections.downgrade_since(
3329 &reader3,
3330 seqno,
3331 None,
3332 &Antichain::from_elem(10),
3333 now()
3334 ),
3335 Continue(Since(Antichain::from_elem(10)))
3336 );
3337 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3338
3339 assert_eq!(
3341 state.collections.expire_leased_reader(&reader2),
3342 Continue(true)
3343 );
3344 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3349
3350 assert_eq!(
3352 state.collections.expire_leased_reader(&reader3),
3353 Continue(true)
3354 );
3355 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3360 }
3361
3362 #[mz_ore::test]
3363 fn compare_and_downgrade_since() {
3364 let mut state = TypedState::<(), (), u64, i64>::new(
3365 DUMMY_BUILD_INFO.semver_version(),
3366 ShardId::new(),
3367 "".to_owned(),
3368 0,
3369 );
3370 let reader = CriticalReaderId::new();
3371 let _ = state
3372 .collections
3373 .register_critical_reader::<u64>("", &reader, "");
3374
3375 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3377 assert_eq!(
3379 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3380 u64::initial()
3381 );
3382
3383 assert_eq!(
3385 state.collections.compare_and_downgrade_since::<u64>(
3386 &reader,
3387 &u64::initial(),
3388 (&1, &Antichain::from_elem(2)),
3389 ),
3390 Continue(Ok(Since(Antichain::from_elem(2))))
3391 );
3392 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3393 assert_eq!(
3394 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3395 1
3396 );
3397 assert_eq!(
3399 state.collections.compare_and_downgrade_since::<u64>(
3400 &reader,
3401 &1,
3402 (&2, &Antichain::from_elem(2)),
3403 ),
3404 Continue(Ok(Since(Antichain::from_elem(2))))
3405 );
3406 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3407 assert_eq!(
3408 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3409 2
3410 );
3411 assert_eq!(
3413 state.collections.compare_and_downgrade_since::<u64>(
3414 &reader,
3415 &2,
3416 (&3, &Antichain::from_elem(1)),
3417 ),
3418 Continue(Ok(Since(Antichain::from_elem(2))))
3419 );
3420 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3421 assert_eq!(
3422 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3423 3
3424 );
3425 }
3426
3427 #[mz_ore::test]
3428 fn compare_and_append() {
3429 let state = &mut TypedState::<String, String, u64, i64>::new(
3430 DUMMY_BUILD_INFO.semver_version(),
3431 ShardId::new(),
3432 "".to_owned(),
3433 0,
3434 )
3435 .collections;
3436
3437 let writer_id = WriterId::new();
3438 let now = SYSTEM_TIME.clone();
3439
3440 assert_eq!(state.trace.num_spine_batches(), 0);
3442 assert_eq!(state.trace.num_hollow_batches(), 0);
3443 assert_eq!(state.trace.num_updates(), 0);
3444
3445 assert_eq!(
3447 state.compare_and_append(
3448 &hollow(1, 2, &["key1"], 1),
3449 &writer_id,
3450 now(),
3451 LEASE_DURATION_MS,
3452 &IdempotencyToken::new(),
3453 &debug_state(),
3454 0,
3455 100,
3456 None
3457 ),
3458 Break(CompareAndAppendBreak::Upper {
3459 shard_upper: Antichain::from_elem(0),
3460 writer_upper: Antichain::from_elem(0)
3461 })
3462 );
3463
3464 assert!(
3466 state
3467 .compare_and_append(
3468 &hollow(0, 5, &[], 0),
3469 &writer_id,
3470 now(),
3471 LEASE_DURATION_MS,
3472 &IdempotencyToken::new(),
3473 &debug_state(),
3474 0,
3475 100,
3476 None
3477 )
3478 .is_continue()
3479 );
3480
3481 assert_eq!(
3483 state.compare_and_append(
3484 &hollow(5, 4, &["key1"], 1),
3485 &writer_id,
3486 now(),
3487 LEASE_DURATION_MS,
3488 &IdempotencyToken::new(),
3489 &debug_state(),
3490 0,
3491 100,
3492 None
3493 ),
3494 Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3495 lower: Antichain::from_elem(5),
3496 upper: Antichain::from_elem(4)
3497 }))
3498 );
3499
3500 assert_eq!(
3502 state.compare_and_append(
3503 &hollow(5, 5, &["key1"], 1),
3504 &writer_id,
3505 now(),
3506 LEASE_DURATION_MS,
3507 &IdempotencyToken::new(),
3508 &debug_state(),
3509 0,
3510 100,
3511 None
3512 ),
3513 Break(CompareAndAppendBreak::InvalidUsage(
3514 InvalidEmptyTimeInterval {
3515 lower: Antichain::from_elem(5),
3516 upper: Antichain::from_elem(5),
3517 keys: vec!["key1".to_owned()],
3518 }
3519 ))
3520 );
3521
3522 assert!(
3524 state
3525 .compare_and_append(
3526 &hollow(5, 5, &[], 0),
3527 &writer_id,
3528 now(),
3529 LEASE_DURATION_MS,
3530 &IdempotencyToken::new(),
3531 &debug_state(),
3532 0,
3533 100,
3534 None
3535 )
3536 .is_continue()
3537 );
3538 }
3539
3540 #[mz_ore::test]
3541 fn snapshot() {
3542 let now = SYSTEM_TIME.clone();
3543
3544 let mut state = TypedState::<String, String, u64, i64>::new(
3545 DUMMY_BUILD_INFO.semver_version(),
3546 ShardId::new(),
3547 "".to_owned(),
3548 0,
3549 );
3550 assert_eq!(
3552 state.snapshot(&Antichain::from_elem(0)),
3553 Err(SnapshotErr::AsOfNotYetAvailable(
3554 SeqNo(0),
3555 Upper(Antichain::from_elem(0))
3556 ))
3557 );
3558
3559 assert_eq!(
3561 state.snapshot(&Antichain::from_elem(5)),
3562 Err(SnapshotErr::AsOfNotYetAvailable(
3563 SeqNo(0),
3564 Upper(Antichain::from_elem(0))
3565 ))
3566 );
3567
3568 let writer_id = WriterId::new();
3569
3570 assert!(
3572 state
3573 .collections
3574 .compare_and_append(
3575 &hollow(0, 5, &["key1"], 1),
3576 &writer_id,
3577 now(),
3578 LEASE_DURATION_MS,
3579 &IdempotencyToken::new(),
3580 &debug_state(),
3581 0,
3582 100,
3583 None
3584 )
3585 .is_continue()
3586 );
3587
3588 assert_eq!(
3590 state.snapshot(&Antichain::from_elem(0)),
3591 Ok(vec![hollow(0, 5, &["key1"], 1)])
3592 );
3593
3594 assert_eq!(
3596 state.snapshot(&Antichain::from_elem(4)),
3597 Ok(vec![hollow(0, 5, &["key1"], 1)])
3598 );
3599
3600 assert_eq!(
3602 state.snapshot(&Antichain::from_elem(5)),
3603 Err(SnapshotErr::AsOfNotYetAvailable(
3604 SeqNo(0),
3605 Upper(Antichain::from_elem(5))
3606 ))
3607 );
3608 assert_eq!(
3609 state.snapshot(&Antichain::from_elem(6)),
3610 Err(SnapshotErr::AsOfNotYetAvailable(
3611 SeqNo(0),
3612 Upper(Antichain::from_elem(5))
3613 ))
3614 );
3615
3616 let reader = LeasedReaderId::new();
3617 let _ = state.collections.register_leased_reader(
3619 "",
3620 &reader,
3621 "",
3622 SeqNo::minimum(),
3623 Duration::from_secs(10),
3624 now(),
3625 false,
3626 );
3627 assert_eq!(
3628 state.collections.downgrade_since(
3629 &reader,
3630 SeqNo::minimum(),
3631 None,
3632 &Antichain::from_elem(2),
3633 now()
3634 ),
3635 Continue(Since(Antichain::from_elem(2)))
3636 );
3637 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3638 assert_eq!(
3640 state.snapshot(&Antichain::from_elem(1)),
3641 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3642 Antichain::from_elem(2)
3643 )))
3644 );
3645
3646 assert!(
3648 state
3649 .collections
3650 .compare_and_append(
3651 &hollow(5, 10, &[], 0),
3652 &writer_id,
3653 now(),
3654 LEASE_DURATION_MS,
3655 &IdempotencyToken::new(),
3656 &debug_state(),
3657 0,
3658 100,
3659 None
3660 )
3661 .is_continue()
3662 );
3663
3664 assert_eq!(
3666 state.snapshot(&Antichain::from_elem(7)),
3667 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3668 );
3669
3670 assert_eq!(
3672 state.snapshot(&Antichain::from_elem(10)),
3673 Err(SnapshotErr::AsOfNotYetAvailable(
3674 SeqNo(0),
3675 Upper(Antichain::from_elem(10))
3676 ))
3677 );
3678
3679 assert!(
3681 state
3682 .collections
3683 .compare_and_append(
3684 &hollow(10, 15, &["key2"], 1),
3685 &writer_id,
3686 now(),
3687 LEASE_DURATION_MS,
3688 &IdempotencyToken::new(),
3689 &debug_state(),
3690 0,
3691 100,
3692 None
3693 )
3694 .is_continue()
3695 );
3696
3697 assert_eq!(
3700 state.snapshot(&Antichain::from_elem(9)),
3701 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3702 );
3703
3704 assert_eq!(
3706 state.snapshot(&Antichain::from_elem(10)),
3707 Ok(vec![
3708 hollow(0, 5, &["key1"], 1),
3709 hollow(5, 10, &[], 0),
3710 hollow(10, 15, &["key2"], 1)
3711 ])
3712 );
3713
3714 assert_eq!(
3715 state.snapshot(&Antichain::from_elem(11)),
3716 Ok(vec![
3717 hollow(0, 5, &["key1"], 1),
3718 hollow(5, 10, &[], 0),
3719 hollow(10, 15, &["key2"], 1)
3720 ])
3721 );
3722 }
3723
3724 #[mz_ore::test]
3725 fn next_listen_batch() {
3726 let mut state = TypedState::<String, String, u64, i64>::new(
3727 DUMMY_BUILD_INFO.semver_version(),
3728 ShardId::new(),
3729 "".to_owned(),
3730 0,
3731 );
3732
3733 assert_eq!(
3736 state.next_listen_batch(&Antichain::from_elem(0)),
3737 Err(SeqNo(0))
3738 );
3739 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3740
3741 let writer_id = WriterId::new();
3742 let now = SYSTEM_TIME.clone();
3743
3744 assert!(
3746 state
3747 .collections
3748 .compare_and_append(
3749 &hollow(0, 5, &["key1"], 1),
3750 &writer_id,
3751 now(),
3752 LEASE_DURATION_MS,
3753 &IdempotencyToken::new(),
3754 &debug_state(),
3755 0,
3756 100,
3757 None
3758 )
3759 .is_continue()
3760 );
3761 assert!(
3762 state
3763 .collections
3764 .compare_and_append(
3765 &hollow(5, 10, &["key2"], 1),
3766 &writer_id,
3767 now(),
3768 LEASE_DURATION_MS,
3769 &IdempotencyToken::new(),
3770 &debug_state(),
3771 0,
3772 100,
3773 None
3774 )
3775 .is_continue()
3776 );
3777
3778 for t in 0..=4 {
3780 assert_eq!(
3781 state.next_listen_batch(&Antichain::from_elem(t)),
3782 Ok(hollow(0, 5, &["key1"], 1))
3783 );
3784 }
3785
3786 for t in 5..=9 {
3788 assert_eq!(
3789 state.next_listen_batch(&Antichain::from_elem(t)),
3790 Ok(hollow(5, 10, &["key2"], 1))
3791 );
3792 }
3793
3794 assert_eq!(
3796 state.next_listen_batch(&Antichain::from_elem(10)),
3797 Err(SeqNo(0))
3798 );
3799
3800 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3803 }
3804
3805 #[mz_ore::test]
3806 fn expire_writer() {
3807 let mut state = TypedState::<String, String, u64, i64>::new(
3808 DUMMY_BUILD_INFO.semver_version(),
3809 ShardId::new(),
3810 "".to_owned(),
3811 0,
3812 );
3813 let now = SYSTEM_TIME.clone();
3814
3815 let writer_id_one = WriterId::new();
3816
3817 let writer_id_two = WriterId::new();
3818
3819 assert!(
3821 state
3822 .collections
3823 .compare_and_append(
3824 &hollow(0, 2, &["key1"], 1),
3825 &writer_id_one,
3826 now(),
3827 LEASE_DURATION_MS,
3828 &IdempotencyToken::new(),
3829 &debug_state(),
3830 0,
3831 100,
3832 None
3833 )
3834 .is_continue()
3835 );
3836
3837 assert!(
3838 state
3839 .collections
3840 .expire_writer(&writer_id_one)
3841 .is_continue()
3842 );
3843
3844 assert!(
3846 state
3847 .collections
3848 .compare_and_append(
3849 &hollow(2, 5, &["key2"], 1),
3850 &writer_id_two,
3851 now(),
3852 LEASE_DURATION_MS,
3853 &IdempotencyToken::new(),
3854 &debug_state(),
3855 0,
3856 100,
3857 None
3858 )
3859 .is_continue()
3860 );
3861 }
3862
3863 #[mz_ore::test]
3864 fn maybe_gc_active_gc() {
3865 const GC_CONFIG: GcConfig = GcConfig {
3866 use_active_gc: true,
3867 fallback_threshold_ms: 5000,
3868 min_versions: 99,
3869 max_versions: 500,
3870 };
3871 let now_fn = SYSTEM_TIME.clone();
3872
3873 let mut state = TypedState::<String, String, u64, i64>::new(
3874 DUMMY_BUILD_INFO.semver_version(),
3875 ShardId::new(),
3876 "".to_owned(),
3877 0,
3878 );
3879
3880 let now = now_fn();
3881 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3883 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3884
3885 state.seqno = SeqNo(100);
3888 assert_eq!(state.seqno_since(), SeqNo(100));
3889
3890 let writer_id = WriterId::new();
3892 let _ = state.collections.compare_and_append(
3893 &hollow(1, 2, &["key1"], 1),
3894 &writer_id,
3895 now,
3896 LEASE_DURATION_MS,
3897 &IdempotencyToken::new(),
3898 &debug_state(),
3899 0,
3900 100,
3901 None,
3902 );
3903 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3904
3905 assert_eq!(
3907 state.maybe_gc(true, now, GC_CONFIG),
3908 Some(GcReq {
3909 shard_id: state.shard_id,
3910 new_seqno_since: SeqNo(100)
3911 })
3912 );
3913
3914 state.collections.active_gc = Some(ActiveGc {
3916 seqno: state.seqno,
3917 start_ms: now,
3918 });
3919
3920 state.seqno = SeqNo(200);
3921 assert_eq!(state.seqno_since(), SeqNo(200));
3922
3923 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3924
3925 state.seqno = SeqNo(300);
3926 assert_eq!(state.seqno_since(), SeqNo(300));
3927 let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3929 assert_eq!(
3930 state.maybe_gc(true, new_now, GC_CONFIG),
3931 Some(GcReq {
3932 shard_id: state.shard_id,
3933 new_seqno_since: SeqNo(300)
3934 })
3935 );
3936
3937 state.seqno = SeqNo(301);
3941 assert_eq!(state.seqno_since(), SeqNo(301));
3942 assert_eq!(
3943 state.maybe_gc(true, new_now, GC_CONFIG),
3944 Some(GcReq {
3945 shard_id: state.shard_id,
3946 new_seqno_since: SeqNo(301)
3947 })
3948 );
3949
3950 state.collections.active_gc = None;
3951
3952 state.seqno = SeqNo(400);
3955 assert_eq!(state.seqno_since(), SeqNo(400));
3956
3957 let now = now_fn();
3958
3959 let _ = state.collections.expire_writer(&writer_id);
3961 assert_eq!(
3962 state.maybe_gc(false, now, GC_CONFIG),
3963 Some(GcReq {
3964 shard_id: state.shard_id,
3965 new_seqno_since: SeqNo(400)
3966 })
3967 );
3968
3969 let previous_seqno = state.seqno;
3971 state.seqno = SeqNo(10_000);
3972 assert_eq!(state.seqno_since(), SeqNo(10_000));
3973
3974 let now = now_fn();
3975 assert_eq!(
3976 state.maybe_gc(true, now, GC_CONFIG),
3977 Some(GcReq {
3978 shard_id: state.shard_id,
3979 new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3980 })
3981 );
3982 }
3983
3984 #[mz_ore::test]
3985 fn maybe_gc_classic() {
3986 const GC_CONFIG: GcConfig = GcConfig {
3987 use_active_gc: false,
3988 fallback_threshold_ms: 5000,
3989 min_versions: 16,
3990 max_versions: 128,
3991 };
3992 const NOW_MS: u64 = 0;
3993
3994 let mut state = TypedState::<String, String, u64, i64>::new(
3995 DUMMY_BUILD_INFO.semver_version(),
3996 ShardId::new(),
3997 "".to_owned(),
3998 0,
3999 );
4000
4001 assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
4003 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4004
4005 state.seqno = SeqNo(100);
4008 assert_eq!(state.seqno_since(), SeqNo(100));
4009
4010 let writer_id = WriterId::new();
4012 let now = SYSTEM_TIME.clone();
4013 let _ = state.collections.compare_and_append(
4014 &hollow(1, 2, &["key1"], 1),
4015 &writer_id,
4016 now(),
4017 LEASE_DURATION_MS,
4018 &IdempotencyToken::new(),
4019 &debug_state(),
4020 0,
4021 100,
4022 None,
4023 );
4024 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4025
4026 assert_eq!(
4028 state.maybe_gc(true, NOW_MS, GC_CONFIG),
4029 Some(GcReq {
4030 shard_id: state.shard_id,
4031 new_seqno_since: SeqNo(100)
4032 })
4033 );
4034
4035 state.seqno = SeqNo(200);
4038 assert_eq!(state.seqno_since(), SeqNo(200));
4039
4040 let _ = state.collections.expire_writer(&writer_id);
4042 assert_eq!(
4043 state.maybe_gc(false, NOW_MS, GC_CONFIG),
4044 Some(GcReq {
4045 shard_id: state.shard_id,
4046 new_seqno_since: SeqNo(200)
4047 })
4048 );
4049 }
4050
4051 #[mz_ore::test]
4052 fn need_rollup_active_rollup() {
4053 const ROLLUP_THRESHOLD: usize = 3;
4054 const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4055 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4056 let now = SYSTEM_TIME.clone();
4057
4058 mz_ore::test::init_logging();
4059 let mut state = TypedState::<String, String, u64, i64>::new(
4060 DUMMY_BUILD_INFO.semver_version(),
4061 ShardId::new(),
4062 "".to_owned(),
4063 0,
4064 );
4065
4066 let rollup_seqno = SeqNo(5);
4067 let rollup = HollowRollup {
4068 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4069 encoded_size_bytes: None,
4070 };
4071
4072 assert!(
4073 state
4074 .collections
4075 .add_rollup((rollup_seqno, &rollup))
4076 .is_continue()
4077 );
4078
4079 state.seqno = SeqNo(5);
4081 assert_none!(state.need_rollup(
4082 ROLLUP_THRESHOLD,
4083 ROLLUP_USE_ACTIVE_ROLLUP,
4084 ROLLUP_FALLBACK_THRESHOLD_MS,
4085 now()
4086 ));
4087
4088 state.seqno = SeqNo(6);
4090 assert_none!(state.need_rollup(
4091 ROLLUP_THRESHOLD,
4092 ROLLUP_USE_ACTIVE_ROLLUP,
4093 ROLLUP_FALLBACK_THRESHOLD_MS,
4094 now()
4095 ));
4096 state.seqno = SeqNo(7);
4097 assert_none!(state.need_rollup(
4098 ROLLUP_THRESHOLD,
4099 ROLLUP_USE_ACTIVE_ROLLUP,
4100 ROLLUP_FALLBACK_THRESHOLD_MS,
4101 now()
4102 ));
4103 state.seqno = SeqNo(8);
4104 assert_none!(state.need_rollup(
4105 ROLLUP_THRESHOLD,
4106 ROLLUP_USE_ACTIVE_ROLLUP,
4107 ROLLUP_FALLBACK_THRESHOLD_MS,
4108 now()
4109 ));
4110
4111 let mut current_time = now();
4112 state.seqno = SeqNo(9);
4114 assert_eq!(
4115 state
4116 .need_rollup(
4117 ROLLUP_THRESHOLD,
4118 ROLLUP_USE_ACTIVE_ROLLUP,
4119 ROLLUP_FALLBACK_THRESHOLD_MS,
4120 current_time
4121 )
4122 .expect("rollup"),
4123 SeqNo(9)
4124 );
4125
4126 state.collections.active_rollup = Some(ActiveRollup {
4127 seqno: SeqNo(9),
4128 start_ms: current_time,
4129 });
4130
4131 assert_none!(state.need_rollup(
4133 ROLLUP_THRESHOLD,
4134 ROLLUP_USE_ACTIVE_ROLLUP,
4135 ROLLUP_FALLBACK_THRESHOLD_MS,
4136 current_time
4137 ));
4138
4139 state.seqno = SeqNo(10);
4140 assert_none!(state.need_rollup(
4143 ROLLUP_THRESHOLD,
4144 ROLLUP_USE_ACTIVE_ROLLUP,
4145 ROLLUP_FALLBACK_THRESHOLD_MS,
4146 current_time
4147 ));
4148
4149 current_time += ROLLUP_FALLBACK_THRESHOLD_MS + 1;
4151 assert_eq!(
4152 state
4153 .need_rollup(
4154 ROLLUP_THRESHOLD,
4155 ROLLUP_USE_ACTIVE_ROLLUP,
4156 ROLLUP_FALLBACK_THRESHOLD_MS,
4157 current_time
4158 )
4159 .expect("rollup"),
4160 SeqNo(10)
4161 );
4162
4163 state.seqno = SeqNo(9);
4164 state.collections.active_rollup = None;
4166 let rollup_seqno = SeqNo(9);
4167 let rollup = HollowRollup {
4168 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4169 encoded_size_bytes: None,
4170 };
4171 assert!(
4172 state
4173 .collections
4174 .add_rollup((rollup_seqno, &rollup))
4175 .is_continue()
4176 );
4177
4178 state.seqno = SeqNo(11);
4179 assert_none!(state.need_rollup(
4181 ROLLUP_THRESHOLD,
4182 ROLLUP_USE_ACTIVE_ROLLUP,
4183 ROLLUP_FALLBACK_THRESHOLD_MS,
4184 current_time
4185 ));
4186 state.seqno = SeqNo(13);
4188 assert_eq!(
4189 state
4190 .need_rollup(
4191 ROLLUP_THRESHOLD,
4192 ROLLUP_USE_ACTIVE_ROLLUP,
4193 ROLLUP_FALLBACK_THRESHOLD_MS,
4194 current_time
4195 )
4196 .expect("rollup"),
4197 SeqNo(13)
4198 );
4199 }
4200
4201 #[mz_ore::test]
4202 fn need_rollup_classic() {
4203 const ROLLUP_THRESHOLD: usize = 3;
4204 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4205 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4206 const NOW: u64 = 0;
4207
4208 mz_ore::test::init_logging();
4209 let mut state = TypedState::<String, String, u64, i64>::new(
4210 DUMMY_BUILD_INFO.semver_version(),
4211 ShardId::new(),
4212 "".to_owned(),
4213 0,
4214 );
4215
4216 let rollup_seqno = SeqNo(5);
4217 let rollup = HollowRollup {
4218 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4219 encoded_size_bytes: None,
4220 };
4221
4222 assert!(
4223 state
4224 .collections
4225 .add_rollup((rollup_seqno, &rollup))
4226 .is_continue()
4227 );
4228
4229 state.seqno = SeqNo(5);
4231 assert_none!(state.need_rollup(
4232 ROLLUP_THRESHOLD,
4233 ROLLUP_USE_ACTIVE_ROLLUP,
4234 ROLLUP_FALLBACK_THRESHOLD_MS,
4235 NOW
4236 ));
4237
4238 state.seqno = SeqNo(6);
4240 assert_none!(state.need_rollup(
4241 ROLLUP_THRESHOLD,
4242 ROLLUP_USE_ACTIVE_ROLLUP,
4243 ROLLUP_FALLBACK_THRESHOLD_MS,
4244 NOW
4245 ));
4246 state.seqno = SeqNo(7);
4247 assert_none!(state.need_rollup(
4248 ROLLUP_THRESHOLD,
4249 ROLLUP_USE_ACTIVE_ROLLUP,
4250 ROLLUP_FALLBACK_THRESHOLD_MS,
4251 NOW
4252 ));
4253
4254 state.seqno = SeqNo(8);
4256 assert_eq!(
4257 state
4258 .need_rollup(
4259 ROLLUP_THRESHOLD,
4260 ROLLUP_USE_ACTIVE_ROLLUP,
4261 ROLLUP_FALLBACK_THRESHOLD_MS,
4262 NOW
4263 )
4264 .expect("rollup"),
4265 SeqNo(8)
4266 );
4267
4268 state.seqno = SeqNo(9);
4270 assert_none!(state.need_rollup(
4271 ROLLUP_THRESHOLD,
4272 ROLLUP_USE_ACTIVE_ROLLUP,
4273 ROLLUP_FALLBACK_THRESHOLD_MS,
4274 NOW
4275 ));
4276
4277 state.seqno = SeqNo(11);
4279 assert_eq!(
4280 state
4281 .need_rollup(
4282 ROLLUP_THRESHOLD,
4283 ROLLUP_USE_ACTIVE_ROLLUP,
4284 ROLLUP_FALLBACK_THRESHOLD_MS,
4285 NOW
4286 )
4287 .expect("rollup"),
4288 SeqNo(11)
4289 );
4290
4291 let rollup_seqno = SeqNo(6);
4293 let rollup = HollowRollup {
4294 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4295 encoded_size_bytes: None,
4296 };
4297 assert!(
4298 state
4299 .collections
4300 .add_rollup((rollup_seqno, &rollup))
4301 .is_continue()
4302 );
4303
4304 state.seqno = SeqNo(8);
4305 assert_none!(state.need_rollup(
4306 ROLLUP_THRESHOLD,
4307 ROLLUP_USE_ACTIVE_ROLLUP,
4308 ROLLUP_FALLBACK_THRESHOLD_MS,
4309 NOW
4310 ));
4311 state.seqno = SeqNo(9);
4312 assert_eq!(
4313 state
4314 .need_rollup(
4315 ROLLUP_THRESHOLD,
4316 ROLLUP_USE_ACTIVE_ROLLUP,
4317 ROLLUP_FALLBACK_THRESHOLD_MS,
4318 NOW
4319 )
4320 .expect("rollup"),
4321 SeqNo(9)
4322 );
4323
4324 let fallback_seqno = SeqNo(
4326 rollup_seqno.0
4327 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4328 );
4329 state.seqno = fallback_seqno;
4330 assert_eq!(
4331 state
4332 .need_rollup(
4333 ROLLUP_THRESHOLD,
4334 ROLLUP_USE_ACTIVE_ROLLUP,
4335 ROLLUP_FALLBACK_THRESHOLD_MS,
4336 NOW
4337 )
4338 .expect("rollup"),
4339 fallback_seqno
4340 );
4341 state.seqno = fallback_seqno.next();
4342 assert_eq!(
4343 state
4344 .need_rollup(
4345 ROLLUP_THRESHOLD,
4346 ROLLUP_USE_ACTIVE_ROLLUP,
4347 ROLLUP_FALLBACK_THRESHOLD_MS,
4348 NOW
4349 )
4350 .expect("rollup"),
4351 fallback_seqno.next()
4352 );
4353 }
4354
4355 #[mz_ore::test]
4356 fn idempotency_token_sentinel() {
4357 assert_eq!(
4358 IdempotencyToken::SENTINEL.to_string(),
4359 "i11111111-1111-1111-1111-111111111111"
4360 );
4361 }
4362
4363 #[mz_ore::test]
4372 #[cfg_attr(miri, ignore)]
fn state_inspect_serde_json() {
4374 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4375 let mut runner = proptest::test_runner::TestRunner::deterministic();
4376 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4377 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4378 assert_eq!(
4379 json.trim(),
4380 STATE_SERDE_JSON.trim(),
4381 "\n\nNEW GOLDEN\n{}\n",
4382 json
4383 );
4384 }
4385
4386 #[mz_persist_proc::test(tokio::test)]
4387 #[cfg_attr(miri, ignore)]
async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4389 let mut clients = new_test_client_cache(&dyncfgs);
4390 let shard_id = ShardId::new();
4391
4392 async fn open_and_write(
4393 clients: &mut PersistClientCache,
4394 version: semver::Version,
4395 shard_id: ShardId,
4396 ) -> Result<(), tokio::task::JoinError> {
4397 clients.cfg.build_version = version.clone();
4398 clients.clear_state_cache();
4399 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4400 mz_ore::task::spawn(|| version.to_string(), async move {
4402 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4403 let current = *write.upper().as_option().unwrap();
4404 write
4406 .expect_compare_and_append_batch(&mut [], current, current + 1)
4407 .await;
4408 })
4409 .await
4410 }
4411
4412 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4414 assert_ok!(res);
4415
4416 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4418 assert_ok!(res);
4419
4420 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4422 assert_ok!(res);
4423
4424 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4426 assert!(res.unwrap_err().is_panic());
4427 }
4428
4429 #[mz_ore::test]
4430 fn runid_roundtrip() {
4431 proptest!(|(runid: RunId)| {
4432 let runid_str = runid.to_string();
4433 let parsed = RunId::from_str(&runid_str);
4434 prop_assert_eq!(parsed, Ok(runid));
4435 });
4436 }
4437}