use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Semigroup;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::{Container, PartialOrder};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

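/// How many seqnos may elapse between rollups. As a rough illustration,
/// assuming a shard that advances one seqno per second, the default of 128
/// asks for a fresh rollup about every two minutes.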
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);

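/// A token a writer attaches to a `compare_and_append` call so that retries
/// of the same write can be recognized (see
/// `CompareAndAppendBreak::AlreadyCommitted`). Wraps a 16-byte UUID and
/// renders with an `i` prefix, e.g. (illustrative value)
/// `i00000000-0000-0000-0000-000000000000`, which parses back via `FromStr`.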
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    pub seqno: SeqNo,
    pub since: Antichain<T>,
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    pub since: Antichain<T>,
    pub opaque: OpaqueState,
    pub opaque_codec: String,
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub most_recent_write_token: IdempotencyToken,
    pub most_recent_write_upper: Antichain<T>,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    pub hostname: String,
    pub purpose: String,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        deprecated_schema_id: Option<SchemaId>,
    },
}

fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => updates
                .decode::<T>(metrics)
                .expect("valid inline part")
                .updates
                .diffs_sum(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    pub(crate) parts: Vec<RunPart<T>>,
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    pub hollow_bytes: usize,

    pub max_part_bytes: usize,

    pub key_lower: Vec<u8>,

    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    pub async fn set<D: Codec64 + Semigroup>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

impl<T> RunPart<T>
where
    T: Timestamp + Codec64,
{
    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            Self::Single(p) => p.diffs_sum(metrics),
            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
        }
    }
}

#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    Unordered,
    Codec,
    Structured,
}

#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);

impl std::fmt::Display for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ri{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RunId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for RunId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("ri", "RunId", s).map(RunId)
    }
}

impl From<RunId> for String {
    fn from(x: RunId) -> Self {
        x.to_string()
    }
}

impl RunId {
    pub(crate) fn new() -> Self {
        RunId(*Uuid::new_v4().as_bytes())
    }
}

impl Arbitrary for RunId {
    type Parameters = ();
    type Strategy = proptest::strategy::BoxedStrategy<Self>;
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
            RunId(*Uuid::from_u128(n).as_bytes())
        })
        .boxed()
    }
}

#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    pub(crate) order: Option<RunOrder>,
    pub(crate) schema: Option<SchemaId>,

    pub(crate) deprecated_schema: Option<SchemaId>,

    pub(crate) id: Option<RunId>,

    pub(crate) len: Option<usize>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    pub key: PartialBatchKey,
    pub encoded_size_bytes: usize,
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    pub ts_rewrite: Option<Antichain<T>>,
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    pub format: Option<BatchColumnarFormat>,
    pub schema_id: Option<SchemaId>,

    pub deprecated_schema_id: Option<SchemaId>,
}

#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    pub desc: Description<T>,
    pub len: usize,
    pub(crate) parts: Vec<RunPart<T>>,
    pub(crate) run_splits: Vec<usize>,
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

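    /// Pairs each run's [RunMeta] with its slice of `parts`. For example,
    /// with `parts.len() == 5` and `run_splits == [2, 4]`, the runs are
    /// `parts[0..2]`, `parts[2..4]`, and `parts[4..5]`, zipped with the three
    /// entries of `run_meta`.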
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip(run_parts)
    }

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

impl<T: Timestamp + TotalOrder> HollowBatch<T> {
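    /// Extends this batch's upper to `new_upper` and marks every part with
    /// `ts_rewrite = frontier`, after validating that the rewrite only moves
    /// data forward. For example, a batch described by `[{0}, {5})` whose
    /// since is still the minimum antichain can be rewritten with
    /// `frontier = {7}` and `new_upper = {10}`, yielding `[{0}, {10})` with
    /// every part carrying `ts_rewrite = {7}`.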
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    pub key: PartialRollupKey,
    pub encoded_size_bytes: Option<usize>,
}

#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

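/// The shard-wide collections tracked by [State]: known rollups, any
/// in-flight rollup/GC claims, registered leased and critical readers,
/// writers, encoded schemas, and the trace of batches.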
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    pub(crate) last_gc_req: SeqNo,

    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    pub(crate) active_rollup: Option<ActiveRollup>,
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    pub(crate) trace: Trace<T>,
}

#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    pub key: Bytes,
    pub key_data_type: Bytes,
    pub val: Bytes,
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        Continue(applied)
    }

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => {
                Break(NoOpStateTransition(Some(*schema_id)))
            }
            None if self.is_tombstone() => {
                Break(NoOpStateTransition(None))
            }
            None if self.schemas.is_empty() => {
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }

    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }

    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }

    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn apply_merge_res_classic<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self
            .trace
            .apply_merge_res_checked_classic::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn spine_exert(
        &mut self,
        fuel: usize,
    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
        let (merge_reqs, did_work) = self.trace.exert(fuel);
        if did_work {
            Continue(merge_reqs)
        } else {
            assert!(merge_reqs.is_empty());
            Break(NoOpStateTransition(Vec::new()))
        }
    }

    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        let seqno = match outstanding_seqno {
            Some(outstanding_seqno) => {
                assert!(
                    outstanding_seqno >= reader_state.seqno,
                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                    outstanding_seqno,
                    reader_state.seqno,
                );
                std::cmp::min(outstanding_seqno, seqno)
            }
            None => seqno,
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }

    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
        Result<Since<T>, (O, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);
        assert_eq!(reader_state.opaque_codec, O::codec_name());

        if &O::decode(reader_state.opaque.0) != expected_opaque {
            return Continue(Err((
                Codec64::decode(reader_state.opaque.0),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }

    pub fn heartbeat_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        match self.leased_readers.get_mut(reader_id) {
            Some(reader_state) => {
                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
                    heartbeat_timestamp_ms,
                    reader_state.last_heartbeat_timestamp_ms,
                );
                Continue(true)
            }
            None => Continue(false),
        }
    }

    pub fn expire_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.leased_readers.remove(reader_id).is_some();
        if existed {}
        Continue(existed)
    }

    pub fn expire_critical_reader(
        &mut self,
        reader_id: &CriticalReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.critical_readers.remove(reader_id).is_some();
        if existed {}
        Continue(existed)
    }

    pub fn expire_writer(
        &mut self,
        writer_id: &WriterId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.writers.remove(writer_id).is_some();
        Continue(existed)
    }

    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }

    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }

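    /// The meet (pointwise minimum) of all critical readers' sinces, or
    /// `None` if no critical reader is registered. For example, readers
    /// holding sinces `{3}` and `{5}` yield `Some({3})`.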
    fn critical_since(&self) -> Option<Antichain<T>> {
        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
        let mut since = critical_sinces.next().cloned()?;
        for s in critical_sinces {
            since.meet_assign(s);
        }
        Some(since)
    }

    fn update_since(&mut self) {
        let mut sinces_iter = self
            .leased_readers
            .values()
            .map(|x| &x.since)
            .chain(self.critical_readers.values().map(|x| &x.since));
        let mut since = match sinces_iter.next() {
            Some(since) => since.clone(),
            None => {
                return;
            }
        };
        while let Some(s) = sinces_iter.next() {
            since.meet_assign(s);
        }
        self.trace.downgrade_since(&since);
    }

    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
        let mut seqno_since = seqno;
        for cap in self.leased_readers.values() {
            seqno_since = std::cmp::min(seqno_since, cap.seqno);
        }
        seqno_since
    }

    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }

    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }

    pub(crate) fn is_single_empty_batch(&self) -> bool {
        let mut batch_count = 0;
        let mut is_empty = true;
        self.trace.map_batches(|b| {
            batch_count += 1;
            is_empty &= b.is_empty()
        });
        batch_count <= 1 && is_empty
    }

    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        let was_tombstone = self.is_tombstone();

        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            Continue(())
        } else {
            Break(NoOpStateTransition(()))
        }
    }
}

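/// The sequenced state of a single shard: the applier version that last wrote
/// it, the shard id, the current [SeqNo], a wall-clock timestamp, the writing
/// host, and the [StateCollections] payload.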
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) shard_id: ShardId,

    pub(crate) seqno: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) hostname: String,
    pub(crate) collections: StateCollections<T>,
}

pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}

impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
    #[cfg(any(test, debug_assertions))]
    pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
        TypedState {
            state: State {
                applier_version,
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname,
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }

    pub(crate) fn clone_for_rollup(&self) -> Self {
        TypedState {
            state: State {
                applier_version: self.applier_version.clone(),
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname: self.hostname.clone(),
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }
}

impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let TypedState { state, _phantom } = self;
        f.debug_struct("TypedState").field("state", state).finish()
    }
}

#[cfg(any(test, debug_assertions))]
impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
    fn eq(&self, other: &Self) -> bool {
        let TypedState {
            state: self_state,
            _phantom,
        } = self;
        let TypedState {
            state: other_state,
            _phantom,
        } = other;
        self_state == other_state
    }
}

impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}

impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            applier_version,
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
        let mut new_state = State {
            applier_version: new_applier_version.clone(),
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}

#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    pub use_active_gc: bool,
    pub fallback_threshold_ms: u64,
    pub min_versions: usize,
    pub max_versions: usize,
}

impl<T> State<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }

    pub fn since(&self) -> &Antichain<T> {
        self.collections.trace.since()
    }

    pub fn upper(&self) -> &Antichain<T> {
        self.collections.trace.upper()
    }

    pub fn spine_batch_count(&self) -> usize {
        self.collections.trace.num_spine_batches()
    }

    pub fn size_metrics(&self) -> StateSizeMetrics {
        let mut ret = StateSizeMetrics::default();
        self.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                ret.hollow_batch_count += 1;
                ret.batch_part_count += x.part_count();
                ret.num_updates += x.len;

                let batch_size = x.encoded_size_bytes();
                for x in x.parts.iter() {
                    if x.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if x.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += x.inline_bytes();
                    }
                }
                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
                ret.state_batches_bytes += batch_size;
            }
            HollowBlobRef::Rollup(x) => {
                ret.state_rollup_count += 1;
                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
            }
        });
        ret
    }

    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
        self.collections
            .rollups
            .iter()
            .rev()
            .next()
            .expect("State should have at least one rollup if seqno > minimum")
    }

    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }

2489 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
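        // GC is triggered once the seqno_since has advanced far enough past
        // the last GC request: with active GC tracking the threshold is a
        // fixed `min_versions`, otherwise it scales roughly logarithmically
        // with the current seqno. The request is capped at `max_versions`
        // past the last GC, readers alone only trigger GC when no writers are
        // registered, tombstoned shards always want GC, and (with active GC
        // tracking) an in-flight GC suppresses new requests until it has been
        // outstanding longer than the fallback threshold.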
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        let should_gc = if use_active_gc {
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }

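    /// The number of seqnos of state currently being held by this shard: the
    /// distance from the earliest live seqno (the seqno_since) to the current
    /// seqno.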
    pub fn seqnos_held(&self) -> usize {
        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
    }

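    /// Forcibly expires every leased reader and writer whose lease has lapsed
    /// as of `walltime_ms`, returning counts of how many of each were expired.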
    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
        let mut metrics = ExpiryMetrics::default();
        let shard_id = self.shard_id();
        self.collections.leased_readers.retain(|id, state| {
            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.readers_expired += 1;
            }
            retain
        });
        self.collections.writers.retain(|id, state| {
            let retain =
                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.writers_expired += 1;
            }
            retain
        });
        metrics
    }

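    /// Returns the batches making up a snapshot as of the given frontier, or
    /// an error if that as_of is no longer (or not yet) readable.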
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                self.collections.trace.since().clone(),
            )));
        }
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Err(SnapshotErr::AsOfNotYetAvailable(
                self.seqno,
                Upper(upper.clone()),
            ));
        }

        let batches = self
            .collections
            .trace
            .batches()
            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
            .cloned()
            .collect();
        Ok(batches)
    }

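    /// Verifies that an as_of is still readable for a listen, i.e. that it has
    /// not been compacted past (it is not less than the shard's since).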
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(Since(self.collections.trace.since().clone()));
        }
        Ok(())
    }

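    /// Returns the batch whose description contains the given frontier, or the
    /// current seqno if no such batch has been written yet.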
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.collections
            .trace
            .batches()
            .find(|b| {
                PartialOrder::less_equal(b.desc.lower(), frontier)
                    && PartialOrder::less_than(frontier, b.desc.upper())
            })
            .cloned()
            .ok_or(self.seqno)
    }

    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }

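    /// Returns the seqno at which a new rollup of state should be written, if
    /// one is needed: either enough seqnos have accumulated since the latest
    /// rollup (and, with active rollup tracking, no other claim is in flight
    /// or the existing claim has exceeded the fallback threshold), or the
    /// shard is a tombstone whose final rollup hasn't been written yet.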
    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    Some(active_rollup) => {
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }

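    /// An iterator over every blob (batch or rollup) referenced by this state.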
    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
        batches.chain(rollups)
    }
}

fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
    let val = hex::encode(val);
    val.serialize(s)
}

fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
    val: &Option<LazyProto<T>>,
    s: S,
) -> Result<S::Ok, S::Error> {
    val.as_ref()
        .map(|lazy| hex::encode(&lazy.into_proto()))
        .serialize(s)
}

fn serialize_part_stats<S: Serializer>(
    val: &Option<LazyPartStats>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(|x| x.decode().key);
    val.serialize(s)
}

fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
    let val = val.map(i64::decode);
    val.serialize(s)
}

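// A hand-written Serialize impl for State: it exhaustively destructures every
// field (so a newly added field can't be silently skipped) and flattens the
// trace into a human-readable form. The `state_inspect_serde_json` golden
// test below pins its output.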
impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let State {
            applier_version,
            shard_id,
            seqno,
            walltime_ms,
            hostname,
            collections:
                StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
        } = self;
        let mut s = s.serialize_struct("State", 13)?;
        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
        let () = s.serialize_field("shard_id", shard_id)?;
        let () = s.serialize_field("seqno", seqno)?;
        let () = s.serialize_field("walltime_ms", walltime_ms)?;
        let () = s.serialize_field("hostname", hostname)?;
        let () = s.serialize_field("last_gc_req", last_gc_req)?;
        let () = s.serialize_field("rollups", rollups)?;
        let () = s.serialize_field("active_rollup", active_rollup)?;
        let () = s.serialize_field("active_gc", active_gc)?;
        let () = s.serialize_field("leased_readers", leased_readers)?;
        let () = s.serialize_field("critical_readers", critical_readers)?;
        let () = s.serialize_field("writers", writers)?;
        let () = s.serialize_field("schemas", schemas)?;
        let () = s.serialize_field("since", &trace.since().elements())?;
        let () = s.serialize_field("upper", &trace.upper().elements())?;
        let trace = trace.flatten();
        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
        let () = s.serialize_field("merges", &trace.merges)?;
        s.end()
    }
}

#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}

#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);

#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);

#[cfg(test)]
pub(crate) mod tests {
    use std::ops::Range;
    use std::str::FromStr;

    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::{assert_none, assert_ok};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::internal::encoding::any_some_lazy_part_stats;
    use crate::internal::paths::RollupId;
    use crate::internal::trace::tests::any_trace;
    use crate::tests::new_test_client_cache;

    use super::*;

    const LEASE_DURATION_MS: u64 = 900 * 1000;
    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }

    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
        num_runs: usize,
    ) -> impl Strategy<Value = HollowBatch<T>> {
        (
            any::<T>(),
            any::<T>(),
            any::<T>(),
            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
            any::<usize>(),
        )
            .prop_map(move |(t0, t1, since, parts, len)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);

                let run_splits = (1..num_runs)
                    .map(|i| i * parts.len() / num_runs)
                    .collect::<Vec<_>>();

                let run_meta = (0..num_runs)
                    .map(|_| {
                        let mut meta = RunMeta::default();
                        meta.id = Some(RunId::new());
                        meta
                    })
                    .collect::<Vec<_>>();

                HollowBatch::new(
                    Description::new(lower, upper, since),
                    parts,
                    len % 10,
                    run_meta,
                    run_splits,
                )
            })
    }

    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
        Strategy::prop_map(
            (
                any::<T>(),
                any::<T>(),
                any::<T>(),
                proptest::collection::vec(any_run_part::<T>(), 0..20),
                any::<usize>(),
                0..=10usize,
                proptest::collection::vec(any::<RunId>(), 10),
            ),
            |(t0, t1, since, parts, len, num_runs, run_ids)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);
                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
                    let run_splits = (1..num_runs)
                        .map(|i| i * parts.len() / num_runs)
                        .collect::<Vec<_>>();

                    let run_meta = (0..num_runs)
                        .enumerate()
                        .map(|(i, _)| {
                            let mut meta = RunMeta::default();
                            meta.id = Some(run_ids[i]);
                            meta
                        })
                        .collect::<Vec<_>>();

                    HollowBatch::new(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_meta,
                        run_splits,
                    )
                } else {
                    HollowBatch::new_run_for_test(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_ids[0],
                    )
                }
            },
        )
    }

    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
        Strategy::prop_map(
            (
                any::<bool>(),
                any_hollow_batch_part(),
                any::<Option<T>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
                if is_hollow {
                    BatchPart::Hollow(hollow)
                } else {
                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
                    BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id,
                    }
                }
            },
        )
    }

    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
    }

    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
    -> impl Strategy<Value = HollowBatchPart<T>> {
        Strategy::prop_map(
            (
                any::<PartialBatchKey>(),
                any::<usize>(),
                any::<Vec<u8>>(),
                any_some_lazy_part_stats(),
                any::<Option<T>>(),
                any::<[u8; 8]>(),
                any::<Option<BatchColumnarFormat>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(
                key,
                encoded_size_bytes,
                key_lower,
                stats,
                ts_rewrite,
                diffs_sum,
                format,
                schema_id,
                deprecated_schema_id,
            )| {
                HollowBatchPart {
                    key,
                    encoded_size_bytes,
                    key_lower,
                    structured_key_lower: None,
                    stats,
                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
                    diffs_sum: Some(diffs_sum),
                    format,
                    schema_id,
                    deprecated_schema_id,
                }
            },
        )
    }

    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
        Strategy::prop_map(
            (
                any::<SeqNo>(),
                any::<Option<T>>(),
                any::<u64>(),
                any::<u64>(),
                any::<HandleDebugState>(),
            ),
            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
                if lease_duration_ms == 0 {
                    lease_duration_ms += 1;
                }
                LeasedReaderState {
                    seqno,
                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
                    last_heartbeat_timestamp_ms,
                    lease_duration_ms,
                    debug,
                }
            },
        )
    }

    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
    {
        Strategy::prop_map(
            (
                any::<Option<T>>(),
                any::<OpaqueState>(),
                any::<String>(),
                any::<HandleDebugState>(),
            ),
            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
                since: since.map_or_else(Antichain::new, Antichain::from_elem),
                opaque,
                opaque_codec,
                debug,
            },
        )
    }

    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
        Strategy::prop_map(
            (
                any::<u64>(),
                any::<u64>(),
                any::<IdempotencyToken>(),
                any::<Option<T>>(),
                any::<HandleDebugState>(),
            ),
            |(
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper,
                debug,
            )| WriterState {
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper: most_recent_write_upper
                    .map_or_else(Antichain::new, Antichain::from_elem),
                debug,
            },
        )
    }

    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
        Strategy::prop_map(
            (
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
            ),
            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
                key: Bytes::from(key),
                key_data_type: Bytes::from(key_data_type),
                val: Bytes::from(val),
                val_data_type: Bytes::from(val_data_type),
            },
        )
    }

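    /// A proptest [Strategy] for an arbitrary [State], whose trace contains a
    /// number of batches drawn from `num_trace_batches`.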
    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
        num_trace_batches: Range<usize>,
    ) -> impl Strategy<Value = State<T>> {
        let part1 = (
            any::<ShardId>(),
            any::<SeqNo>(),
            any::<u64>(),
            any::<String>(),
            any::<SeqNo>(),
            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
            proptest::option::of(any::<ActiveRollup>()),
        );

        let part2 = (
            proptest::option::of(any::<ActiveGc>()),
            proptest::collection::btree_map(
                any::<LeasedReaderId>(),
                any_leased_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(
                any::<CriticalReaderId>(),
                any_critical_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
            any_trace::<T>(num_trace_batches),
        );

        (part1, part2).prop_map(
            |(
                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
            )| State {
                applier_version: semver::Version::new(1, 2, 3),
                shard_id,
                seqno,
                walltime_ms,
                hostname,
                collections: StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
            },
        )
    }

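    /// Shorthand for building a [HollowBatch] in tests: a single run of
    /// zero-sized hollow parts with the given keys, spanning `[lower, upper)`
    /// with a minimum since.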
    pub(crate) fn hollow<T: Timestamp>(
        lower: T,
        upper: T,
        keys: &[&str],
        len: usize,
    ) -> HollowBatch<T> {
        HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(T::minimum()),
            ),
            keys.iter()
                .map(|x| {
                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                        key: PartialBatchKey((*x).to_owned()),
                        encoded_size_bytes: 0,
                        key_lower: vec![],
                        structured_key_lower: None,
                        stats: None,
                        ts_rewrite: None,
                        diffs_sum: None,
                        format: None,
                        schema_id: None,
                        deprecated_schema_id: None,
                    }))
                })
                .collect(),
            len,
        )
    }

    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                None,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                None,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }

    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader::<u64>("", &reader, "");

        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            u64::initial()
        );

        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &u64::initial(),
                (&1, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            1
        );
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &1,
                (&2, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            2
        );
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &2,
                (&3, &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            3
        );
    }

    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }

    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }

    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();

        let writer_id_two = WriterId::new();

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }

    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }

    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }

    #[mz_ore::test]
    fn need_rollup_classic() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
        const NOW: u64 = 0;

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        state.seqno = SeqNo(8);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(8)
        );

        state.seqno = SeqNo(9);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        state.seqno = SeqNo(11);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(11)
        );

        let rollup_seqno = SeqNo(6);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(9)
        );

        let fallback_seqno = SeqNo(
            rollup_seqno.0
                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
        );
        state.seqno = fallback_seqno;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno
        );
        state.seqno = fallback_seqno.next();
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno.next()
        );
    }

    #[mz_ore::test]
    fn idempotency_token_sentinel() {
        assert_eq!(
            IdempotencyToken::SENTINEL.to_string(),
            "i11111111-1111-1111-1111-111111111111"
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn state_inspect_serde_json() {
        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
        assert_eq!(
            json.trim(),
            STATE_SERDE_JSON.trim(),
            "\n\nNEW GOLDEN\n{}\n",
            json
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
        let mut clients = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        async fn open_and_write(
            clients: &mut PersistClientCache,
            version: semver::Version,
            shard_id: ShardId,
        ) -> Result<(), tokio::task::JoinError> {
            clients.cfg.build_version = version.clone();
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            mz_ore::task::spawn(|| version.to_string(), async move {
                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
                let current = *write.upper().as_option().unwrap();
                write
                    .expect_compare_and_append_batch(&mut [], current, current + 1)
                    .await;
            })
            .await
        }

        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
        assert_ok!(res);

        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());
    }

    #[mz_ore::test]
    fn runid_roundtrip() {
        proptest!(|(runid: RunId)| {
            let runid_str = runid.to_string();
            let parsed = RunId::from_str(&runid_str);
            prop_assert_eq!(parsed, Ok(runid));
        });
    }
}