use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Semigroup;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));

pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);

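/// A token that makes [`StateCollections::compare_and_append`] idempotent: a
/// write whose token matches the writer's most recent write token is reported
/// as already committed rather than applied a second time.
///
/// A minimal sketch of the round-trip through the string form (`"i"` plus a
/// UUID), marked `ignore` because it exercises crate-private items:
///
/// ```ignore
/// let token = IdempotencyToken::new();
/// let parsed: IdempotencyToken = token.to_string().parse().unwrap();
/// assert_eq!(token, parsed);
/// ```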
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);

impl std::fmt::Display for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "i{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for IdempotencyToken {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for IdempotencyToken {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
    }
}

impl From<IdempotencyToken> for String {
    fn from(x: IdempotencyToken) -> Self {
        x.to_string()
    }
}

impl IdempotencyToken {
    pub(crate) fn new() -> Self {
        IdempotencyToken(*Uuid::new_v4().as_bytes())
    }
    pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
}

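/// The state of a single leased reader: the [`SeqNo`] and since frontier it
/// holds, plus the heartbeat bookkeeping used to expire it after inactivity.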
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    pub seqno: SeqNo,
    pub since: Antichain<T>,
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
#[serde(into = "u64")]
pub struct OpaqueState(pub [u8; 8]);

impl From<OpaqueState> for u64 {
    fn from(value: OpaqueState) -> Self {
        u64::from_le_bytes(value.0)
    }
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    pub since: Antichain<T>,
    pub opaque: OpaqueState,
    pub opaque_codec: String,
    pub debug: HandleDebugState,
}

#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    pub last_heartbeat_timestamp_ms: u64,
    pub lease_duration_ms: u64,
    pub most_recent_write_token: IdempotencyToken,
    pub most_recent_write_upper: Antichain<T>,
    pub debug: HandleDebugState,
}

#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    pub hostname: String,
    pub purpose: String,
}

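/// A part of a [`HollowBatch`]: either a reference to data written to blob
/// storage (`Hollow`) or a small amount of data inlined directly into state
/// (`Inline`).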
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
        schema_id: Option<SchemaId>,

        deprecated_schema_id: Option<SchemaId>,
    },
}

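/// Decodes a lazily-encoded, single-element array into the [`ArrayBound`] it
/// represents, soft-panicking (or logging, depending on build) and returning
/// `None` if the encoding is invalid.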
fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
    let try_decode = |lower: &LazyProto<ProtoArrayData>| {
        let proto = lower.decode()?;
        let data = ArrayData::from_proto(proto)?;
        ensure!(data.len() == 1);
        Ok(ArrayBound::new(make_array(data), 0))
    };

    let decoded: anyhow::Result<ArrayBound> = try_decode(lower);

    match decoded {
        Ok(bound) => Some(bound),
        Err(e) => {
            soft_panic_or_log!("failed to decode bound: {e:#?}");
            None
        }
    }
}

impl<T> BatchPart<T> {
    pub fn hollow_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { .. } => 0,
        }
    }

    pub fn is_inline(&self) -> bool {
        matches!(self, BatchPart::Inline { .. })
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(_) => 0,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            BatchPart::Hollow(x) => x.encoded_size_bytes,
            BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            BatchPart::Hollow(x) => x.key.0.as_str(),
            BatchPart::Inline { .. } => "<inline>",
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            BatchPart::Hollow(x) => x.stats.as_ref(),
            BatchPart::Inline { .. } => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            BatchPart::Hollow(x) => x.key_lower.as_slice(),
            BatchPart::Inline { .. } => &[],
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        let part = match self {
            BatchPart::Hollow(part) => part,
            BatchPart::Inline { .. } => return None,
        };

        decode_structured_lower(part.structured_key_lower.as_ref()?)
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
            BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.schema_id,
            BatchPart::Inline { schema_id, .. } => *schema_id,
        }
    }

    pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
        match self {
            BatchPart::Hollow(x) => x.deprecated_schema_id,
            BatchPart::Inline {
                deprecated_schema_id,
                ..
            } => *deprecated_schema_id,
        }
    }
}

impl<T: Timestamp + Codec64> BatchPart<T> {
    pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
        match self {
            BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
            BatchPart::Inline { updates, .. } => {
                let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
                matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
            }
        }
    }

    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
            BatchPart::Inline { updates, .. } => updates
                .decode::<T>(metrics)
                .expect("valid inline part")
                .updates
                .diffs_sum(),
        }
    }
}

#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    pub(crate) parts: Vec<RunPart<T>>,
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    pub key: PartialBatchKey,

    pub hollow_bytes: usize,

    pub max_part_bytes: usize,

    pub key_lower: Vec<u8>,

    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}

impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

impl<T> HollowRunRef<T> {
    pub fn writer_key(&self) -> Option<WriterKey> {
        Some(self.key.split()?.0)
    }
}

impl<T: Timestamp + Codec64> HollowRunRef<T> {
    pub async fn set<D: Codec64 + Semigroup>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}

impl<T: Ord> PartialOrd<Self> for RunPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for RunPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
            (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
            (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
            (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
        }
    }
}

impl<T> RunPart<T> {
    #[cfg(test)]
    pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
        match self {
            RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
            _ => panic!("expected hollow part!"),
        }
    }

    pub fn hollow_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.hollow_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn is_inline(&self) -> bool {
        match self {
            Self::Single(p) => p.is_inline(),
            Self::Many(_) => false,
        }
    }

    pub fn inline_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.inline_bytes(),
            Self::Many(_) => 0,
        }
    }

    pub fn max_part_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.max_part_bytes,
        }
    }

    pub fn writer_key(&self) -> Option<WriterKey> {
        match self {
            Self::Single(p) => p.writer_key(),
            Self::Many(r) => r.writer_key(),
        }
    }

    pub fn encoded_size_bytes(&self) -> usize {
        match self {
            Self::Single(p) => p.encoded_size_bytes(),
            Self::Many(r) => r.hollow_bytes,
        }
    }

    pub fn schema_id(&self) -> Option<SchemaId> {
        match self {
            Self::Single(p) => p.schema_id(),
            Self::Many(_) => None,
        }
    }

    pub fn printable_name(&self) -> &str {
        match self {
            Self::Single(p) => p.printable_name(),
            Self::Many(r) => r.key.0.as_str(),
        }
    }

    pub fn stats(&self) -> Option<&LazyPartStats> {
        match self {
            Self::Single(p) => p.stats(),
            Self::Many(_) => None,
        }
    }

    pub fn key_lower(&self) -> &[u8] {
        match self {
            Self::Single(p) => p.key_lower(),
            Self::Many(r) => r.key_lower.as_slice(),
        }
    }

    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        match self {
            Self::Single(p) => p.structured_key_lower(),
            Self::Many(_) => None,
        }
    }

    pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
        match self {
            Self::Single(p) => p.ts_rewrite(),
            Self::Many(_) => None,
        }
    }
}

impl<T> RunPart<T>
where
    T: Timestamp + Codec64,
{
    pub fn diffs_sum<D: Codec64 + Semigroup>(&self, metrics: &ColumnarMetrics) -> Option<D> {
        match self {
            Self::Single(p) => p.diffs_sum(metrics),
            Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
        }
    }
}

#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}

impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
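    /// Streams the [`BatchPart`]s of this run part, recursively fetching any
    /// [`HollowRunRef`]s from blob storage. Single parts are yielded borrowed;
    /// fetched parts are yielded owned. A key that is unexpectedly absent from
    /// blob storage surfaces as a [`MissingBlob`] error.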
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}

impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    Unordered,
    Codec,
    Structured,
}

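/// A unique identifier for a run within a batch (see [`RunMeta::id`]).
/// Displays as `"ri"` plus a UUID and round-trips through `FromStr`, like the
/// other persist ids in this module.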
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);

impl std::fmt::Display for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "ri{}", Uuid::from_bytes(self.0))
    }
}

impl std::fmt::Debug for RunId {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "RunId({})", Uuid::from_bytes(self.0))
    }
}

impl std::str::FromStr for RunId {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        parse_id("ri", "RunId", s).map(RunId)
    }
}

impl From<RunId> for String {
    fn from(x: RunId) -> Self {
        x.to_string()
    }
}

impl RunId {
    pub(crate) fn new() -> Self {
        RunId(*Uuid::new_v4().as_bytes())
    }
}

impl Arbitrary for RunId {
    type Parameters = ();
    type Strategy = proptest::strategy::BoxedStrategy<Self>;
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
            RunId(*Uuid::from_u128(n).as_bytes())
        })
        .boxed()
    }
}

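/// Metadata shared by all parts of a single run: the order the run was
/// written in, the schema of its data, a stable id, and (if known) its length.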
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    pub(crate) order: Option<RunOrder>,
    pub(crate) schema: Option<SchemaId>,

    pub(crate) deprecated_schema: Option<SchemaId>,

    pub(crate) id: Option<RunId>,

    pub(crate) len: Option<usize>,
}

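/// A [`Blob`] key for a batch part, plus the metadata (encoded size, key
/// bounds, stats, diffs sum, columnar format, schema) needed to plan reads
/// without fetching the part itself.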
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    pub key: PartialBatchKey,
    pub encoded_size_bytes: usize,
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    pub ts_rewrite: Option<Antichain<T>>,
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    pub format: Option<BatchColumnarFormat>,
    pub schema_id: Option<SchemaId>,

    pub deprecated_schema_id: Option<SchemaId>,
}

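/// A [`Description`]-bounded set of updates, stored as a sequence of parts
/// grouped into runs: `run_splits` records the boundaries between runs and
/// `run_meta` carries one [`RunMeta`] per run.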
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    pub desc: Description<T>,
    pub len: usize,
    pub(crate) parts: Vec<RunPart<T>>,
    pub(crate) run_splits: Vec<usize>,
    pub(crate) run_meta: Vec<RunMeta>,
}

impl<T: Debug> Debug for HollowBatch<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let HollowBatch {
            desc,
            parts,
            len,
            run_splits: runs,
            run_meta,
        } = self;
        f.debug_struct("HollowBatch")
            .field(
                "desc",
                &(
                    desc.lower().elements(),
                    desc.upper().elements(),
                    desc.since().elements(),
                ),
            )
            .field("parts", &parts)
            .field("len", &len)
            .field("runs", &runs)
            .field("run_meta", &run_meta)
            .finish()
    }
}

impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}

impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}

impl<T> HollowBatch<T> {
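    /// Construct a batch of updates with the given runs.
    ///
    /// `run_splits` must be a strictly increasing list of valid indices into
    /// `parts`; it divides `parts` into `run_splits.len() + 1` runs, and
    /// `run_meta` must contain one entry per run whenever `parts` is nonempty.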
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

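    /// Iterates over the runs in this batch, pairing each run's [`RunMeta`]
    /// with the slice of `parts` that makes up the run.
    ///
    /// A worked example of how `run_splits` is interpreted:
    ///
    /// ```ignore
    /// // parts:        [p0, p1, p2, p3, p4]
    /// // run_splits:   [2, 4]
    /// // runs yielded: &parts[0..2], &parts[2..4], &parts[4..5]
    /// ```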
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        run_metas.zip_eq(run_parts)
    }

    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}

impl<T: Timestamp + TotalOrder> HollowBatch<T> {
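    /// Rewrites the timestamps of every update in this batch to the given
    /// frontier and advances the batch's upper to `new_upper`.
    ///
    /// Returns an error, leaving the batch unchanged, if the rewrite frontier
    /// is not strictly less than `new_upper`, if `new_upper` would regress the
    /// batch's existing upper, if the frontier is behind the batch's lower or
    /// an earlier rewrite, or if the batch's since is no longer the minimum
    /// antichain.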
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}

impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let HollowBatchPart {
            key: self_key,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}

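/// A [`Blob`] key for a state rollup, along with its encoded size (if known).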
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    pub key: PartialRollupKey,
    pub encoded_size_bytes: Option<usize>,
}

#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

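/// A rollup that is in progress, recorded so concurrent workers don't redo the
/// same work; `persist_rollup_fallback_threshold_ms` bounds how long a claim
/// is honored before another worker may take over.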
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A GC that is in progress, recorded so concurrent workers don't redo the
/// same work; `persist_gc_fallback_threshold_ms` bounds how long a claim is
/// honored before another worker may take over.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

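/// A sentinel for a state transition that took no action and so doesn't need
/// to write new state, along with the result the transition produced anyway.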
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

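/// The collections that make up a shard's state: rollups, reader and writer
/// registrations, registered schemas, and the [`Trace`] of batches, plus
/// bookkeeping for in-progress rollups and GCs.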
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    pub(crate) last_gc_req: SeqNo,

    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    pub(crate) active_rollup: Option<ActiveRollup>,
    pub(crate) active_gc: Option<ActiveGc>,

    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    pub(crate) trace: Trace<T>,
}

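/// The encoded representation of a registered key/value schema pair: `key`
/// and `val` hold the [`Codec::encode_schema`] bytes, while `key_data_type`
/// and `val_data_type` hold the corresponding Arrow [`DataType`]s encoded as
/// `ProtoDataType`.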
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    pub key: Bytes,
    pub key_data_type: Bytes,
    pub val: Bytes,
    pub val_data_type: Bytes,
}

impl EncodedSchemas {
    pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
        let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
        DataType::from_proto(proto).expect("valid DataType")
    }
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    AlreadyCommitted,
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    InvalidUsage(InvalidUsage<T>),
    InlineBackpressure,
}

#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    AsOfHistoricalDistinctionsLost(Since<T>),
}

impl<T> StateCollections<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub fn add_rollup(
        &mut self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        let (rollup_seqno, rollup) = add_rollup;
        let applied = match self.rollups.get(&rollup_seqno) {
            Some(x) => x.key == rollup.key,
            None => {
                self.active_rollup = None;
                self.rollups.insert(rollup_seqno, rollup.to_owned());
                true
            }
        };
        Continue(applied)
    }

    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }

    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }

    pub fn register_critical_reader<O: Opaque + Codec64>(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque: OpaqueState(Codec64::encode(&O::initial())),
            opaque_codec: O::codec_name(),
        };

        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }

    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => Break(NoOpStateTransition(Some(*schema_id))),
            None if self.is_tombstone() => Break(NoOpStateTransition(None)),
            None if self.schemas.is_empty() => {
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }

    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }

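    /// Applies a write to the shard, if the shard's upper matches the batch's
    /// lower.
    ///
    /// Breaks with `AlreadyCommitted` when the idempotency token matches the
    /// writer's most recent write (a retry), `Upper` on an upper mismatch,
    /// `InvalidUsage` for malformed batches, and `InlineBackpressure` when the
    /// write would push the total inline-part bytes in state past
    /// `inline_writes_total_max_bytes`. On success, returns any fueled merge
    /// requests the new batch makes eligible.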
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        writer_state.most_recent_write_token = idempotency_token.clone();
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }

    pub fn apply_merge_res<D: Codec64 + Semigroup + PartialEq>(
        &mut self,
        res: &FueledMergeRes<T>,
        metrics: &ColumnarMetrics,
    ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
        }

        let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
        Continue(apply_merge_result)
    }

    pub fn spine_exert(
        &mut self,
        fuel: usize,
    ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
        let (merge_reqs, did_work) = self.trace.exert(fuel);
        if did_work {
            Continue(merge_reqs)
        } else {
            assert!(merge_reqs.is_empty());
            Break(NoOpStateTransition(Vec::new()))
        }
    }

    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        let seqno = match outstanding_seqno {
            Some(outstanding_seqno) => {
                assert!(
                    outstanding_seqno >= reader_state.seqno,
                    "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                    is behind current reader_state ({:?})",
                    outstanding_seqno,
                    reader_state.seqno,
                );
                std::cmp::min(outstanding_seqno, seqno)
            }
            None => seqno,
        };

        reader_state.seqno = seqno;

        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }

    pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
        Result<Since<T>, (O, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);
        assert_eq!(reader_state.opaque_codec, O::codec_name());

        if &O::decode(reader_state.opaque.0) != expected_opaque {
            return Continue(Err((
                Codec64::decode(reader_state.opaque.0),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }

    pub fn heartbeat_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        match self.leased_readers.get_mut(reader_id) {
            Some(reader_state) => {
                reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
                    heartbeat_timestamp_ms,
                    reader_state.last_heartbeat_timestamp_ms,
                );
                Continue(true)
            }
            None => Continue(false),
        }
    }

    pub fn expire_leased_reader(
        &mut self,
        reader_id: &LeasedReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.leased_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_critical_reader(
        &mut self,
        reader_id: &CriticalReaderId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.critical_readers.remove(reader_id).is_some();
        Continue(existed)
    }

    pub fn expire_writer(
        &mut self,
        writer_id: &WriterId,
    ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(false));
        }

        let existed = self.writers.remove(writer_id).is_some();
        Continue(existed)
    }

    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }

    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }

    fn critical_since(&self) -> Option<Antichain<T>> {
        let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
        let mut since = critical_sinces.next().cloned()?;
        for s in critical_sinces {
            since.meet_assign(s);
        }
        Some(since)
    }

    fn update_since(&mut self) {
        let mut sinces_iter = self
            .leased_readers
            .values()
            .map(|x| &x.since)
            .chain(self.critical_readers.values().map(|x| &x.since));
        let mut since = match sinces_iter.next() {
            Some(since) => since.clone(),
            None => {
                return;
            }
        };
        for s in sinces_iter {
            since.meet_assign(s);
        }
        self.trace.downgrade_since(&since);
    }

    fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
        let mut seqno_since = seqno;
        for cap in self.leased_readers.values() {
            seqno_since = std::cmp::min(seqno_since, cap.seqno);
        }
        seqno_since
    }

    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }

    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }

    pub(crate) fn is_single_empty_batch(&self) -> bool {
        let mut batch_count = 0;
        let mut is_empty = true;
        self.trace.map_batches(|b| {
            batch_count += 1;
            is_empty &= b.is_empty()
        });
        batch_count <= 1 && is_empty
    }

    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        let was_tombstone = self.is_tombstone();

        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            Continue(())
        } else {
            Break(NoOpStateTransition(()))
        }
    }
}

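/// The in-memory representation of a persist shard's durable state at a
/// single [`SeqNo`], including which version of the code last applied a
/// change to it.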
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    pub(crate) applier_version: semver::Version,
    pub(crate) shard_id: ShardId,

    pub(crate) seqno: SeqNo,
    pub(crate) walltime_ms: u64,
    pub(crate) hostname: String,
    pub(crate) collections: StateCollections<T>,
}

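/// A [`State`] with the shard's key, value, and diff types attached at the
/// type level. The `PhantomData<fn() -> (K, V, D)>` marker records the types
/// without requiring them to be `Send`/`Sync` for `TypedState` to be.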
2235pub struct TypedState<K, V, T, D> {
2238 pub(crate) state: State<T>,
2239
2240 pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2248}
2249
2250impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2251 #[cfg(any(test, debug_assertions))]
2252 pub(crate) fn clone(&self, applier_version: Version, hostname: String) -> Self {
2253 TypedState {
2254 state: State {
2255 applier_version,
2256 shard_id: self.shard_id.clone(),
2257 seqno: self.seqno.clone(),
2258 walltime_ms: self.walltime_ms,
2259 hostname,
2260 collections: self.collections.clone(),
2261 },
2262 _phantom: PhantomData,
2263 }
2264 }
2265
2266 pub(crate) fn clone_for_rollup(&self) -> Self {
2267 TypedState {
2268 state: State {
2269 applier_version: self.applier_version.clone(),
2270 shard_id: self.shard_id.clone(),
2271 seqno: self.seqno.clone(),
2272 walltime_ms: self.walltime_ms,
2273 hostname: self.hostname.clone(),
2274 collections: self.collections.clone(),
2275 },
2276 _phantom: PhantomData,
2277 }
2278 }
2279}
2280
2281impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2282 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2283 let TypedState { state, _phantom } = self;
2286 f.debug_struct("TypedState").field("state", state).finish()
2287 }
2288}
2289
2290#[cfg(any(test, debug_assertions))]
2292impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2293 fn eq(&self, other: &Self) -> bool {
2294 let TypedState {
2297 state: self_state,
2298 _phantom,
2299 } = self;
2300 let TypedState {
2301 state: other_state,
2302 _phantom,
2303 } = other;
2304 self_state == other_state
2305 }
2306}
2307
2308impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2309 type Target = State<T>;
2310
2311 fn deref(&self) -> &Self::Target {
2312 &self.state
2313 }
2314}
2315
2316impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2317 fn deref_mut(&mut self) -> &mut Self::Target {
2318 &mut self.state
2319 }
2320}
2321
2322impl<K, V, T, D> TypedState<K, V, T, D>
2323where
2324 K: Codec,
2325 V: Codec,
2326 T: Timestamp + Lattice + Codec64,
2327 D: Codec64,
2328{
2329 pub fn new(
2330 applier_version: Version,
2331 shard_id: ShardId,
2332 hostname: String,
2333 walltime_ms: u64,
2334 ) -> Self {
2335 let state = State {
2336 applier_version,
2337 shard_id,
2338 seqno: SeqNo::minimum(),
2339 walltime_ms,
2340 hostname,
2341 collections: StateCollections {
2342 last_gc_req: SeqNo::minimum(),
2343 rollups: BTreeMap::new(),
2344 active_rollup: None,
2345 active_gc: None,
2346 leased_readers: BTreeMap::new(),
2347 critical_readers: BTreeMap::new(),
2348 writers: BTreeMap::new(),
2349 schemas: BTreeMap::new(),
2350 trace: Trace::default(),
2351 },
2352 };
2353 TypedState {
2354 state,
2355 _phantom: PhantomData,
2356 }
2357 }
2358
2359 pub fn clone_apply<R, E, WorkFn>(
2360 &self,
2361 cfg: &PersistConfig,
2362 work_fn: &mut WorkFn,
2363 ) -> ControlFlow<E, (R, Self)>
2364 where
2365 WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2366 {
2367 let new_applier_version = std::cmp::max(&self.applier_version, &cfg.build_version);
2372 let mut new_state = State {
2373 applier_version: new_applier_version.clone(),
2374 shard_id: self.shard_id,
2375 seqno: self.seqno.next(),
2376 walltime_ms: (cfg.now)(),
2377 hostname: cfg.hostname.clone(),
2378 collections: self.collections.clone(),
2379 };
2380 if new_state.walltime_ms <= self.walltime_ms {
2383 new_state.walltime_ms = self.walltime_ms + 1;
2384 }
2385
2386 let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2387 let new_state = TypedState {
2388 state: new_state,
2389 _phantom: PhantomData,
2390 };
2391 Continue((work_ret, new_state))
2392 }
2393}
2394
2395#[derive(Copy, Clone, Debug)]
2396pub struct GcConfig {
2397 pub use_active_gc: bool,
2398 pub fallback_threshold_ms: u64,
2399 pub min_versions: usize,
2400 pub max_versions: usize,
2401}
2402
2403impl<T> State<T>
2404where
2405 T: Timestamp + Lattice + Codec64,
2406{
2407 pub fn shard_id(&self) -> ShardId {
2408 self.shard_id
2409 }
2410
2411 pub fn seqno(&self) -> SeqNo {
2412 self.seqno
2413 }
2414
2415 pub fn since(&self) -> &Antichain<T> {
2416 self.collections.trace.since()
2417 }
2418
2419 pub fn upper(&self) -> &Antichain<T> {
2420 self.collections.trace.upper()
2421 }
2422
2423 pub fn spine_batch_count(&self) -> usize {
2424 self.collections.trace.num_spine_batches()
2425 }
2426
    /// Aggregate size statistics for this state, computed in a single pass
    /// over all referenced blobs (batches and rollups).
    pub fn size_metrics(&self) -> StateSizeMetrics {
        let mut ret = StateSizeMetrics::default();
        self.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                ret.hollow_batch_count += 1;
                ret.batch_part_count += x.part_count();
                ret.num_updates += x.len;

                let batch_size = x.encoded_size_bytes();
                for x in x.parts.iter() {
                    if x.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if x.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += x.inline_bytes();
                    }
                }
                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
                ret.state_batches_bytes += batch_size;
            }
            HollowBlobRef::Rollup(x) => {
                ret.state_rollup_count += 1;
                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
            }
        });
        ret
    }

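    // For example, a state with two batches of 10 and 30 encoded bytes plus
    // one rollup of 5 bytes yields `hollow_batch_count: 2`,
    // `largest_batch_bytes: 30`, `state_batches_bytes: 40`, and
    // `state_rollups_bytes: 5`, all from the single `blobs()` pass above.
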
    pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
        // Rollups are keyed by seqno in a BTreeMap, so the last entry is the
        // most recent one.
        self.collections
            .rollups
            .last_key_value()
            .expect("State should have at least one rollup if seqno > minimum")
    }

    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }

    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // In the active-GC world, GC whenever at least `min_versions` seqnos
        // have accumulated since the last request. In the classic world, the
        // threshold scales logarithmically with the seqno, amortizing GC
        // overhead against the age of the shard.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Cap the work in a single request: never GC more than
        // `max_versions` past the last request, even if the seqno-since has
        // advanced further.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // If an active GC has been running for longer than the fallback
        // threshold, assume its worker died and let a new one take over.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Assign GC traffic to writers: a non-write (e.g. a reader
        // heartbeat) may only trigger GC when the shard has no registered
        // writers at all.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // Tombstoned shards always want one last GC to clean themselves up.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        // With active-GC tracking, never start a new GC while another one is
        // still live (i.e. started within the fallback threshold).
        let should_gc = if use_active_gc {
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }

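    // Worked example (classic mode): at seqno 100 the threshold is
    // `trailing_zeros(next_power_of_two(100)) = trailing_zeros(128) = 7`, so
    // GC is requested once the seqno-since is at least 7 past `last_gc_req`.
    // A caller drives it like the `maybe_gc_classic` test below:
    //
    //     if let Some(req) = state.maybe_gc(/* is_write */ true, now_ms, gc_cfg) {
    //         // hand `req` (a `GcReq`) off to the GC worker
    //     }
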
    /// The number of seqnos between the current seqno and the seqno-since,
    /// i.e. how many live versions of state are currently being held.
    pub fn seqnos_held(&self) -> usize {
        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
    }

    /// Expire all readers and writers whose leases had already lapsed as of
    /// `walltime_ms`, returning counts of what was removed.
    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
        let mut metrics = ExpiryMetrics::default();
        let shard_id = self.shard_id();
        self.collections.leased_readers.retain(|id, state| {
            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.readers_expired += 1;
            }
            retain
        });
        self.collections.writers.retain(|id, state| {
            let retain =
                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.writers_expired += 1;
            }
            retain
        });
        metrics
    }

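    // Lease arithmetic example: a reader with
    // `last_heartbeat_timestamp_ms = 1_000` and `lease_duration_ms = 900_000`
    // survives `expire_at(901_000)` (the comparison is `>=`, so the lease is
    // still valid exactly at the boundary) but is force-expired by
    // `expire_at(901_001)`.
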
    /// Returns the set of batches making up a consistent snapshot at
    /// `as_of`, or an error if `as_of` is outside the readable window
    /// `[since, upper)`.
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        // Reads below the since have lost historical distinctions and can't
        // be answered correctly.
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                self.collections.trace.since().clone(),
            )));
        }
        // Reads at or past the upper aren't yet complete.
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Err(SnapshotErr::AsOfNotYetAvailable(
                self.seqno,
                Upper(upper.clone()),
            ));
        }

        let batches = self
            .collections
            .trace
            .batches()
            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
            .cloned()
            .collect();
        Ok(batches)
    }

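    // Example: with since = {2} and upper = {10}, `snapshot(&{5})` returns
    // every batch whose lower is <= 5, `snapshot(&{1})` fails with
    // `AsOfHistoricalDistinctionsLost`, and `snapshot(&{10})` fails with
    // `AsOfNotYetAvailable` (the as-of must be strictly below the upper).
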
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(Since(self.collections.trace.since().clone()));
        }
        Ok(())
    }

    /// Find the batch whose description straddles `frontier`, or return the
    /// current seqno if no such batch exists yet (in which case the caller
    /// should wait for a newer version of state and retry).
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.collections
            .trace
            .batches()
            .find(|b| {
                PartialOrder::less_equal(b.desc.lower(), frontier)
                    && PartialOrder::less_than(frontier, b.desc.upper())
            })
            .cloned()
            .ok_or(self.seqno)
    }

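    // Example: with batches [0, 5) and [5, 10), a frontier of 3 yields the
    // [0, 5) batch, a frontier of 5 yields [5, 10), and a frontier of 10 (or
    // the empty frontier) returns `Err(seqno)`. See the `next_listen_batch`
    // test below.
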
    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }

    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        // Tombstoned shards want one final rollup at the tip so that GC can
        // clean up everything behind it; keep asking until we have it.
        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            // Request a rollup once we're more than `threshold` seqnos past
            // the latest one, unless another rollup is already in flight. An
            // in-flight rollup older than the fallback threshold is presumed
            // dead and may be claimed by a new worker.
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    Some(active_rollup) => {
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            // Classic behavior: request a rollup at every `threshold`
            // multiple of seqnos past the latest rollup.
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            // Fallback: if rollups keep getting missed, request one at every
            // seqno once the lag grows past the fallback multiplier, to
            // avoid unbounded state growth.
            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }

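    // Worked example (classic mode, threshold = 3): with the latest rollup
    // at seqno 5, a rollup is requested at seqnos 8 and 11 (every multiple
    // of the threshold past the rollup) but not at 9, and once the lag
    // exceeds `threshold * DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER` it
    // is requested at every seqno. See `need_rollup_classic` below.
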
    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
        batches.chain(rollups)
    }
}

fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
    let val = hex::encode(val);
    val.serialize(s)
}

fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
    val: &Option<LazyProto<T>>,
    s: S,
) -> Result<S::Ok, S::Error> {
    val.as_ref()
        .map(|lazy| hex::encode(&lazy.into_proto()))
        .serialize(s)
}

fn serialize_part_stats<S: Serializer>(
    val: &Option<LazyPartStats>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(|x| x.decode().key);
    val.serialize(s)
}

fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
    // This is only used for debugging, so it's fine to assume the diff type
    // is i64 when decoding the sum.
    let val = val.map(i64::decode);
    val.serialize(s)
}

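// These helpers only affect the JSON rendering of state used for
// inspection/debugging: e.g. `serialize_part_bytes` renders `[0xDE, 0xAD]`
// as the hex string "dead", and an absent stats/proto field renders as
// `null`.
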
impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let State {
            applier_version,
            shard_id,
            seqno,
            walltime_ms,
            hostname,
            collections:
                StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
        } = self;
        // The len hint must match the number of serialize_field calls below.
        let mut s = s.serialize_struct("State", 19)?;
        let () = s.serialize_field("applier_version", &applier_version.to_string())?;
        let () = s.serialize_field("shard_id", shard_id)?;
        let () = s.serialize_field("seqno", seqno)?;
        let () = s.serialize_field("walltime_ms", walltime_ms)?;
        let () = s.serialize_field("hostname", hostname)?;
        let () = s.serialize_field("last_gc_req", last_gc_req)?;
        let () = s.serialize_field("rollups", rollups)?;
        let () = s.serialize_field("active_rollup", active_rollup)?;
        let () = s.serialize_field("active_gc", active_gc)?;
        let () = s.serialize_field("leased_readers", leased_readers)?;
        let () = s.serialize_field("critical_readers", critical_readers)?;
        let () = s.serialize_field("writers", writers)?;
        let () = s.serialize_field("schemas", schemas)?;
        let () = s.serialize_field("since", &trace.since().elements())?;
        let () = s.serialize_field("upper", &trace.upper().elements())?;
        let trace = trace.flatten();
        let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
        let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
        let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
        let () = s.serialize_field("merges", &trace.merges)?;
        s.end()
    }
}

#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}

#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);

#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);

#[cfg(test)]
pub(crate) mod tests {
    use std::ops::Range;
    use std::str::FromStr;

    use bytes::Bytes;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::{assert_none, assert_ok};
    use mz_proto::RustType;
    use proptest::prelude::*;
    use proptest::strategy::ValueTree;

    use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
    use crate::PersistLocation;
    use crate::cache::PersistClientCache;
    use crate::internal::encoding::any_some_lazy_part_stats;
    use crate::internal::paths::RollupId;
    use crate::internal::trace::tests::any_trace;
    use crate::tests::new_test_client_cache;

    use super::*;

    const LEASE_DURATION_MS: u64 = 900 * 1000;

    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }

    pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
        num_runs: usize,
    ) -> impl Strategy<Value = HollowBatch<T>> {
        (
            any::<T>(),
            any::<T>(),
            any::<T>(),
            // Generate strictly more parts than runs so every run is
            // non-empty. (Callers must pass `num_runs < 19`.)
            proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
            any::<usize>(),
        )
            .prop_map(move |(t0, t1, since, parts, len)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);

                // `run_splits` holds the `num_runs - 1` indexes at which
                // `parts` is divided into runs.
                let run_splits = (1..num_runs)
                    .map(|i| i * parts.len() / num_runs)
                    .collect::<Vec<_>>();

                let run_meta = (0..num_runs)
                    .map(|_| {
                        let mut meta = RunMeta::default();
                        meta.id = Some(RunId::new());
                        meta
                    })
                    .collect::<Vec<_>>();

                HollowBatch::new(
                    Description::new(lower, upper, since),
                    parts,
                    len % 10,
                    run_meta,
                    run_splits,
                )
            })
    }

    pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
        Strategy::prop_map(
            (
                any::<T>(),
                any::<T>(),
                any::<T>(),
                proptest::collection::vec(any_run_part::<T>(), 0..20),
                any::<usize>(),
                0..=10usize,
                // Pre-generate the maximum number of run ids we might need
                // (`num_runs` is at most 10).
                proptest::collection::vec(any::<RunId>(), 10),
            ),
            |(t0, t1, since, parts, len, num_runs, run_ids)| {
                let (lower, upper) = if t0 <= t1 {
                    (Antichain::from_elem(t0), Antichain::from_elem(t1))
                } else {
                    (Antichain::from_elem(t1), Antichain::from_elem(t0))
                };
                let since = Antichain::from_elem(since);
                if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
                    let run_splits = (1..num_runs)
                        .map(|i| i * parts.len() / num_runs)
                        .collect::<Vec<_>>();

                    let run_meta = (0..num_runs)
                        .map(|i| {
                            let mut meta = RunMeta::default();
                            meta.id = Some(run_ids[i]);
                            meta
                        })
                        .collect::<Vec<_>>();

                    HollowBatch::new(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_meta,
                        run_splits,
                    )
                } else {
                    HollowBatch::new_run_for_test(
                        Description::new(lower, upper, since),
                        parts,
                        len % 10,
                        run_ids[0],
                    )
                }
            },
        )
    }

    pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
        Strategy::prop_map(
            (
                any::<bool>(),
                any_hollow_batch_part(),
                any::<Option<T>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
                if is_hollow {
                    BatchPart::Hollow(hollow)
                } else {
                    let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
                    let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
                    BatchPart::Inline {
                        updates,
                        ts_rewrite,
                        schema_id,
                        deprecated_schema_id,
                    }
                }
            },
        )
    }

    pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
        Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
    }

    pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
    -> impl Strategy<Value = HollowBatchPart<T>> {
        Strategy::prop_map(
            (
                any::<PartialBatchKey>(),
                any::<usize>(),
                any::<Vec<u8>>(),
                any_some_lazy_part_stats(),
                any::<Option<T>>(),
                any::<[u8; 8]>(),
                any::<Option<BatchColumnarFormat>>(),
                any::<Option<SchemaId>>(),
                any::<Option<SchemaId>>(),
            ),
            |(
                key,
                encoded_size_bytes,
                key_lower,
                stats,
                ts_rewrite,
                diffs_sum,
                format,
                schema_id,
                deprecated_schema_id,
            )| {
                HollowBatchPart {
                    key,
                    encoded_size_bytes,
                    key_lower,
                    structured_key_lower: None,
                    stats,
                    ts_rewrite: ts_rewrite.map(Antichain::from_elem),
                    diffs_sum: Some(diffs_sum),
                    format,
                    schema_id,
                    deprecated_schema_id,
                }
            },
        )
    }

    pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
        Strategy::prop_map(
            (
                any::<SeqNo>(),
                any::<Option<T>>(),
                any::<u64>(),
                any::<u64>(),
                any::<HandleDebugState>(),
            ),
            |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
                // A zero lease duration would mean the reader is already
                // expired at registration; bump it to keep the generated
                // state valid.
                if lease_duration_ms == 0 {
                    lease_duration_ms += 1;
                }
                LeasedReaderState {
                    seqno,
                    since: since.map_or_else(Antichain::new, Antichain::from_elem),
                    last_heartbeat_timestamp_ms,
                    lease_duration_ms,
                    debug,
                }
            },
        )
    }

    pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
    {
        Strategy::prop_map(
            (
                any::<Option<T>>(),
                any::<OpaqueState>(),
                any::<String>(),
                any::<HandleDebugState>(),
            ),
            |(since, opaque, opaque_codec, debug)| CriticalReaderState {
                since: since.map_or_else(Antichain::new, Antichain::from_elem),
                opaque,
                opaque_codec,
                debug,
            },
        )
    }

    pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
        Strategy::prop_map(
            (
                any::<u64>(),
                any::<u64>(),
                any::<IdempotencyToken>(),
                any::<Option<T>>(),
                any::<HandleDebugState>(),
            ),
            |(
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper,
                debug,
            )| WriterState {
                last_heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token,
                most_recent_write_upper: most_recent_write_upper
                    .map_or_else(Antichain::new, Antichain::from_elem),
                debug,
            },
        )
    }

    pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
        Strategy::prop_map(
            (
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
                any::<Vec<u8>>(),
            ),
            |(key, key_data_type, val, val_data_type)| EncodedSchemas {
                key: Bytes::from(key),
                key_data_type: Bytes::from(key_data_type),
                val: Bytes::from(val),
                val_data_type: Bytes::from(val_data_type),
            },
        )
    }

    pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
        num_trace_batches: Range<usize>,
    ) -> impl Strategy<Value = State<T>> {
        // State has more fields than proptest's tuple strategies support in
        // one go, so the generators are split in two and recombined below.
        let part1 = (
            any::<ShardId>(),
            any::<SeqNo>(),
            any::<u64>(),
            any::<String>(),
            any::<SeqNo>(),
            proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
            proptest::option::of(any::<ActiveRollup>()),
        );

        let part2 = (
            proptest::option::of(any::<ActiveGc>()),
            proptest::collection::btree_map(
                any::<LeasedReaderId>(),
                any_leased_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(
                any::<CriticalReaderId>(),
                any_critical_reader_state::<T>(),
                1..3,
            ),
            proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
            proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
            any_trace::<T>(num_trace_batches),
        );

        (part1, part2).prop_map(
            |(
                (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
                (active_gc, leased_readers, critical_readers, writers, schemas, trace),
            )| State {
                applier_version: semver::Version::new(1, 2, 3),
                shard_id,
                seqno,
                walltime_ms,
                hostname,
                collections: StateCollections {
                    last_gc_req,
                    rollups,
                    active_rollup,
                    active_gc,
                    leased_readers,
                    critical_readers,
                    writers,
                    schemas,
                    trace,
                },
            },
        )
    }

    /// Construct a [`HollowBatch`] over `[lower, upper)` whose parts are
    /// stub blob keys, for use in tests.
    pub(crate) fn hollow<T: Timestamp>(
        lower: T,
        upper: T,
        keys: &[&str],
        len: usize,
    ) -> HollowBatch<T> {
        HollowBatch::new_run(
            Description::new(
                Antichain::from_elem(lower),
                Antichain::from_elem(upper),
                Antichain::from_elem(T::minimum()),
            ),
            keys.iter()
                .map(|x| {
                    RunPart::Single(BatchPart::Hollow(HollowBatchPart {
                        key: PartialBatchKey((*x).to_owned()),
                        encoded_size_bytes: 0,
                        key_lower: vec![],
                        structured_key_lower: None,
                        stats: None,
                        ts_rewrite: None,
                        diffs_sum: None,
                        format: None,
                        schema_id: None,
                        deprecated_schema_id: None,
                    }))
                })
                .collect(),
            len,
        )
    }

    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since starts at the minimum.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        // Greater: the reader and shard sinces both advance.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Equal: a no-op.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Less: also a no-op, since a reader's since may never regress.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        // Create a second reader.
        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since is the minimum of all reader sinces, so it stays
        // at 2 while reader2 moves ahead to 3...
        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                None,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // ... and advances to 3 once the lagging reader moves past it.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                None,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring a reader doesn't advance the shard since.
        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Create a third reader.
        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // The shard since is still held back by reader2 at 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                None,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // The shard since is unaffected by reader expiry, even when the
        // expired reader was the one holding it back...
        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // ... and even when the last reader is expired.
        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }

    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader::<u64>("", &reader, "");

        // The shard since and the reader's opaque both start at their
        // initial values.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            u64::initial()
        );

        // Can downgrade the since while the expected opaque matches,
        // swapping in a new opaque.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &u64::initial(),
                (&1, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            1
        );
        // Can downgrade again with the updated opaque.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &1,
                (&2, &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            2
        );
        // Downgrading to an earlier since is a no-op for the since (it never
        // regresses), but the opaque is still swapped.
        assert_eq!(
            state.collections.compare_and_downgrade_since::<u64>(
                &reader,
                &2,
                (&3, &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            u64::decode(state.collections.critical_reader(&reader).opaque.0),
            3
        );
    }

    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // The shard starts empty.
        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        // An append that doesn't line up with the current shard upper is
        // rejected, returning both uppers for the caller to inspect.
        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        // Fill in the gap at the start of time with an empty batch.
        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // A lower bound above the upper bound is invalid.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        // An empty time interval is invalid if the batch claims updates...
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        // ... but fine for a batch with no updates.
        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Cannot take a snapshot with as_of == the shard upper...
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        // ... nor with as_of beyond the shard upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Can take a snapshot anywhere in [0, 5).
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // Cannot take a snapshot at or past the upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        // Advance the since to 2.
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                None,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Cannot take a snapshot below the since: those historical
        // distinctions have been lost.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Empty batches show up in snapshots too.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // Still cannot take a snapshot at the (new) upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // A batch is included in a snapshot iff its lower is <= the as_of.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }

    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // An empty shard has no batch to listen to, for any frontier
        // (including the empty one).
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // Add batches over [0, 5) and [5, 10).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // All frontiers in [0, 5) resolve to the first batch.
        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        // All frontiers in [5, 10) resolve to the second batch.
        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        // There is no batch at the upper...
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        // ... nor at the empty frontier.
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }

    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();

        let writer_id_two = WriterId::new();

        // Writer one can write.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Expiring writer one succeeds.
        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        // An unrelated writer is unaffected and can still write.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }

    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // An empty shard doesn't need GC, write or no.
        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // Artificially advance the seqno far enough past the last GC req to
        // cross the min_versions threshold.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // Once a writer is registered (via a write), non-writes may not GC.
        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // A write may GC.
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // While a fresh GC is in flight, nothing new is requested.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        // Once the active GC is older than the fallback threshold, a new
        // worker may claim it...
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        // ... and again, since the stale active GC entry is still there.
        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        // Once the last writer is expired, a non-write may GC again.
        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // A huge seqno jump is throttled: a single GcReq covers at most
        // max_versions past the previous request.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }

    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // An empty shard doesn't need GC, write or no.
        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // Artificially advance the seqno well past the logarithmic GC
        // threshold (at seqno 100 the threshold is only 7).
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // Once a writer is registered (via a write), non-writes may not GC.
        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // A write may GC.
        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // Once the last writer is expired, a non-write may GC again.
        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }

    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Within the threshold of the latest rollup: no rollup needed.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        // More than `threshold` seqnos past the rollup: request one.
        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // A fresh active rollup suppresses further requests...
        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // ... even as the seqno keeps advancing...
        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // ... until it's older than the fallback threshold and presumed
        // dead, at which point a rollup is requested again.
        current_time += ROLLUP_FALLBACK_THRESHOLD_MS + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        // Reset: the rollup at seqno 9 lands and the active rollup clears.
        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Within the threshold of the new rollup: nothing needed...
        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        // ... and past it, a rollup is requested again.
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }

    #[mz_ore::test]
    fn need_rollup_classic() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
        const NOW: u64 = 0;

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Not a threshold multiple past the rollup: no rollup needed.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        // Exactly `threshold` seqnos past the rollup: request one.
        state.seqno = SeqNo(8);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(8)
        );

        // Not a multiple: nothing needed.
        state.seqno = SeqNo(9);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));

        // The next multiple: request again.
        state.seqno = SeqNo(11);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(11)
        );

        // A new rollup resets the cadence.
        let rollup_seqno = SeqNo(6);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            NOW
        ));
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // Past the fallback multiple, a rollup is requested at every seqno,
        // not just at threshold multiples.
        let fallback_seqno = SeqNo(
            rollup_seqno.0
                * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
        );
        state.seqno = fallback_seqno;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno
        );
        state.seqno = fallback_seqno.next();
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    NOW
                )
                .expect("rollup"),
            fallback_seqno.next()
        );
    }

    #[mz_ore::test]
    fn idempotency_token_sentinel() {
        assert_eq!(
            IdempotencyToken::SENTINEL.to_string(),
            "i11111111-1111-1111-1111-111111111111"
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn state_inspect_serde_json() {
        // Golden test for the JSON rendering of state used for inspection:
        // the output must exactly match the checked-in file. On an
        // intentional change, replace state_serde.json with the NEW GOLDEN
        // printed in the assertion message.
        const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
        let json = serde_json::to_string_pretty(&tree.current()).unwrap();
        assert_eq!(
            json.trim(),
            STATE_SERDE_JSON.trim(),
            "\n\nNEW GOLDEN\n{}\n",
            json
        );
    }

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
        let mut clients = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        async fn open_and_write(
            clients: &mut PersistClientCache,
            version: semver::Version,
            shard_id: ShardId,
        ) -> Result<(), tokio::task::JoinError> {
            clients.cfg.build_version = version.clone();
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            // Run in a task so we can catch the panic, if any.
            mz_ore::task::spawn(|| version.to_string(), async move {
                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
                let current = *write.upper().as_option().unwrap();
                // Do a write so that we tag the state with the version.
                write
                    .expect_compare_and_append_batch(&mut [], current, current + 1)
                    .await;
            })
            .await
        }

        // Start at v0.10.0.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        // Upgrading to v0.11.0 is fine.
        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
        assert_ok!(res);

        // Sneaking back down to v0.10.0 is also allowed.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        // But downgrading further, to v0.9.0, panics.
        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());
    }

    #[mz_ore::test]
    fn runid_roundtrip() {
        // RunId's Display and FromStr impls should roundtrip.
        proptest!(|(runid: RunId)| {
            let runid_str = runid.to_string();
            let parsed = RunId::from_str(&runid_str);
            prop_assert_eq!(parsed, Ok(runid));
        });
    }
}