1use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64, Opaque};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::CriticalReaderId;
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60 LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66 ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
74include!(concat!(
75 env!("OUT_DIR"),
76 "/mz_persist_client.internal.state.rs"
77));
78
79include!(concat!(
80 env!("OUT_DIR"),
81 "/mz_persist_client.internal.diff.rs"
82));
83
84pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
92 "persist_rollup_threshold",
93 128,
94 "The number of seqnos between rollups.",
95);
96
97pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
100 "persist_rollup_fallback_threshold_ms",
101 5000,
102 "The number of milliseconds before a worker claims an already claimed rollup.",
103);
104
105pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
108 "persist_rollup_use_active_rollup",
109 true,
110 "Whether to use the new active rollup tracking mechanism.",
111);
112
113pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
116 "persist_gc_fallback_threshold_ms",
117 900000,
118 "The number of milliseconds before a worker claims an already claimed GC.",
119);
120
121pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
123 "persist_gc_min_versions",
124 32,
125 "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
126);
127
128pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
130 "persist_gc_max_versions",
131 128_000,
132 "The maximum number of versions to GC in a single GC run.",
133);
134
135pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
138 "persist_gc_use_active_gc",
139 false,
140 "Whether to use the new active GC tracking mechanism.",
141);
142
143pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
144 "persist_enable_incremental_compaction",
145 false,
146 "Whether to enable incremental compaction.",
147);
148
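/// A unique token associated with one logical `compare_and_append` call.
///
/// Writer state remembers the token of its most recent successful write, so a
/// retry that presents the same token can be recognized as already committed.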
149#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
152#[serde(into = "String")]
153pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(f, "i{}", Uuid::from_bytes(self.0))
158 }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164 }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168 type Err = String;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172 }
173}
174
175impl From<IdempotencyToken> for String {
176 fn from(x: IdempotencyToken) -> Self {
177 x.to_string()
178 }
179}
180
181impl IdempotencyToken {
182 pub(crate) fn new() -> Self {
183 IdempotencyToken(*Uuid::new_v4().as_bytes())
184 }
185 pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
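/// State tracked for a lease-based read capability: the `SeqNo` and `since`
/// frontier it holds back, plus the heartbeat and lease bookkeeping used to
/// expire inactive readers.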
188#[derive(Clone, Debug, PartialEq, Serialize)]
189pub struct LeasedReaderState<T> {
190 pub seqno: SeqNo,
192 pub since: Antichain<T>,
194 pub last_heartbeat_timestamp_ms: u64,
196 pub lease_duration_ms: u64,
199 pub debug: HandleDebugState,
201}
202
203#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
204#[serde(into = "u64")]
205pub struct OpaqueState(pub [u8; 8]);
206
207impl From<OpaqueState> for u64 {
208 fn from(value: OpaqueState) -> Self {
209 u64::from_le_bytes(value.0)
210 }
211}
212
213#[derive(Clone, Debug, PartialEq, Serialize)]
214pub struct CriticalReaderState<T> {
215 pub since: Antichain<T>,
217 pub opaque: OpaqueState,
219 pub opaque_codec: String,
221 pub debug: HandleDebugState,
223}
224
225#[derive(Clone, Debug, PartialEq, Serialize)]
226pub struct WriterState<T> {
227 pub last_heartbeat_timestamp_ms: u64,
229 pub lease_duration_ms: u64,
232 pub most_recent_write_token: IdempotencyToken,
235 pub most_recent_write_upper: Antichain<T>,
238 pub debug: HandleDebugState,
240}
241
242#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
244pub struct HandleDebugState {
245 pub hostname: String,
248 pub purpose: String,
250}
251
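/// One part of a batch of updates: either a pointer to data written out to
/// blob storage, or a small part stored inline in state itself.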
252#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
256#[serde(tag = "type")]
257pub enum BatchPart<T> {
258 Hollow(HollowBatchPart<T>),
259 Inline {
260 updates: LazyInlineBatchPart,
261 ts_rewrite: Option<Antichain<T>>,
262 schema_id: Option<SchemaId>,
263
264 deprecated_schema_id: Option<SchemaId>,
266 },
267}
268
269fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
270 let try_decode = |lower: &LazyProto<ProtoArrayData>| {
271 let proto = lower.decode()?;
272 let data = ArrayData::from_proto(proto)?;
273 ensure!(data.len() == 1);
274 Ok(ArrayBound::new(make_array(data), 0))
275 };
276
277 let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
278
279 match decoded {
280 Ok(bound) => Some(bound),
281 Err(e) => {
282 soft_panic_or_log!("failed to decode bound: {e:#?}");
283 None
284 }
285 }
286}
287
288impl<T> BatchPart<T> {
289 pub fn hollow_bytes(&self) -> usize {
290 match self {
291 BatchPart::Hollow(x) => x.encoded_size_bytes,
292 BatchPart::Inline { .. } => 0,
293 }
294 }
295
296 pub fn is_inline(&self) -> bool {
297 matches!(self, BatchPart::Inline { .. })
298 }
299
300 pub fn inline_bytes(&self) -> usize {
301 match self {
302 BatchPart::Hollow(_) => 0,
303 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
304 }
305 }
306
307 pub fn writer_key(&self) -> Option<WriterKey> {
308 match self {
309 BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
310 BatchPart::Inline { .. } => None,
311 }
312 }
313
314 pub fn encoded_size_bytes(&self) -> usize {
315 match self {
316 BatchPart::Hollow(x) => x.encoded_size_bytes,
317 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
318 }
319 }
320
321 pub fn printable_name(&self) -> &str {
324 match self {
325 BatchPart::Hollow(x) => x.key.0.as_str(),
326 BatchPart::Inline { .. } => "<inline>",
327 }
328 }
329
330 pub fn stats(&self) -> Option<&LazyPartStats> {
331 match self {
332 BatchPart::Hollow(x) => x.stats.as_ref(),
333 BatchPart::Inline { .. } => None,
334 }
335 }
336
337 pub fn key_lower(&self) -> &[u8] {
338 match self {
339 BatchPart::Hollow(x) => x.key_lower.as_slice(),
340 BatchPart::Inline { .. } => &[],
347 }
348 }
349
350 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
351 let part = match self {
352 BatchPart::Hollow(part) => part,
353 BatchPart::Inline { .. } => return None,
354 };
355
356 decode_structured_lower(part.structured_key_lower.as_ref()?)
357 }
358
359 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
360 match self {
361 BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
362 BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
363 }
364 }
365
366 pub fn schema_id(&self) -> Option<SchemaId> {
367 match self {
368 BatchPart::Hollow(x) => x.schema_id,
369 BatchPart::Inline { schema_id, .. } => *schema_id,
370 }
371 }
372
373 pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
374 match self {
375 BatchPart::Hollow(x) => x.deprecated_schema_id,
376 BatchPart::Inline {
377 deprecated_schema_id,
378 ..
379 } => *deprecated_schema_id,
380 }
381 }
382}
383
384impl<T: Timestamp + Codec64> BatchPart<T> {
385 pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
386 match self {
387 BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
388 BatchPart::Inline { updates, .. } => {
389 let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
390 matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
391 }
392 }
393 }
394
395 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
396 match self {
397 BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
398 BatchPart::Inline { updates, .. } => Some(
399 updates
400 .decode::<T>(metrics)
401 .expect("valid inline part")
402 .updates
403 .diffs_sum(),
404 ),
405 }
406 }
407}
408
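/// The parts of a single run, written out to blob storage as its own object.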
409#[derive(Debug, Clone)]
411pub struct HollowRun<T> {
412 pub(crate) parts: Vec<RunPart<T>>,
414}
415
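/// A pointer to a [HollowRun] in blob storage, along with enough summary data
/// (sizes, key lower bounds, diffs sum) to plan reads without fetching the run
/// itself.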
416#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
419pub struct HollowRunRef<T> {
420 pub key: PartialBatchKey,
421
422 pub hollow_bytes: usize,
424
425 pub max_part_bytes: usize,
427
428 pub key_lower: Vec<u8>,
430
431 pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
433
434 pub diffs_sum: Option<[u8; 8]>,
435
436 pub(crate) _phantom_data: PhantomData<T>,
437}
438impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
439 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
440 Some(self.cmp(other))
441 }
442}
443
444impl<T: Eq> Ord for HollowRunRef<T> {
445 fn cmp(&self, other: &Self) -> Ordering {
446 self.key.cmp(&other.key)
447 }
448}
449
450impl<T> HollowRunRef<T> {
451 pub fn writer_key(&self) -> Option<WriterKey> {
452 Some(self.key.split()?.0)
453 }
454}
455
456impl<T: Timestamp + Codec64> HollowRunRef<T> {
457 pub async fn set<D: Codec64 + Monoid>(
459 shard_id: ShardId,
460 blob: &dyn Blob,
461 writer: &WriterKey,
462 data: HollowRun<T>,
463 metrics: &Metrics,
464 ) -> Self {
465 let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
466 let max_part_bytes = data
467 .parts
468 .iter()
469 .map(|p| p.max_part_bytes())
470 .max()
471 .unwrap_or(0);
472 let key_lower = data
473 .parts
474 .first()
475 .map_or(vec![], |p| p.key_lower().to_vec());
476 let structured_key_lower = match data.parts.first() {
477 Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
478 Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
479 Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
480 };
481 let diffs_sum = data
482 .parts
483 .iter()
484 .map(|p| {
485 p.diffs_sum::<D>(&metrics.columnar)
486 .expect("valid diffs sum")
487 })
488 .reduce(|mut a, b| {
489 a.plus_equals(&b);
490 a
491 })
492 .expect("valid diffs sum")
493 .encode();
494
495 let key = PartialBatchKey::new(writer, &PartId::new());
496 let blob_key = key.complete(&shard_id);
497 let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
498 let () = retry_external(&metrics.retries.external.hollow_run_set, || {
499 blob.set(&blob_key, bytes.clone())
500 })
501 .await;
502 Self {
503 key,
504 hollow_bytes,
505 max_part_bytes,
506 key_lower,
507 structured_key_lower,
508 diffs_sum: Some(diffs_sum),
509 _phantom_data: Default::default(),
510 }
511 }
512
513 pub async fn get(
517 &self,
518 shard_id: ShardId,
519 blob: &dyn Blob,
520 metrics: &Metrics,
521 ) -> Option<HollowRun<T>> {
522 let blob_key = self.key.complete(&shard_id);
523 let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
524 blob.get(&blob_key)
525 })
526 .await?;
527 let proto_runs: ProtoHollowRun =
528 prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
529 let runs = proto_runs
530 .into_rust()
531 .expect("illegal state: invalid encoded runs proto");
532 Some(runs)
533 }
534}
535
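/// An entry in a run: either a single batch part, or a reference to a nested
/// [HollowRun] containing many parts.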
536#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
540#[serde(untagged)]
541pub enum RunPart<T> {
542 Single(BatchPart<T>),
543 Many(HollowRunRef<T>),
544}
545
546impl<T: Ord> PartialOrd<Self> for RunPart<T> {
547 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
548 Some(self.cmp(other))
549 }
550}
551
552impl<T: Ord> Ord for RunPart<T> {
553 fn cmp(&self, other: &Self) -> Ordering {
554 match (self, other) {
555 (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
556 (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
557 (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
558 (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
559 }
560 }
561}
562
563impl<T> RunPart<T> {
564 #[cfg(test)]
565 pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
566 match self {
567 RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
568 _ => panic!("expected hollow part!"),
569 }
570 }
571
572 pub fn hollow_bytes(&self) -> usize {
573 match self {
574 Self::Single(p) => p.hollow_bytes(),
575 Self::Many(r) => r.hollow_bytes,
576 }
577 }
578
579 pub fn is_inline(&self) -> bool {
580 match self {
581 Self::Single(p) => p.is_inline(),
582 Self::Many(_) => false,
583 }
584 }
585
586 pub fn inline_bytes(&self) -> usize {
587 match self {
588 Self::Single(p) => p.inline_bytes(),
589 Self::Many(_) => 0,
590 }
591 }
592
593 pub fn max_part_bytes(&self) -> usize {
594 match self {
595 Self::Single(p) => p.encoded_size_bytes(),
596 Self::Many(r) => r.max_part_bytes,
597 }
598 }
599
600 pub fn writer_key(&self) -> Option<WriterKey> {
601 match self {
602 Self::Single(p) => p.writer_key(),
603 Self::Many(r) => r.writer_key(),
604 }
605 }
606
607 pub fn encoded_size_bytes(&self) -> usize {
608 match self {
609 Self::Single(p) => p.encoded_size_bytes(),
610 Self::Many(r) => r.hollow_bytes,
611 }
612 }
613
614 pub fn schema_id(&self) -> Option<SchemaId> {
615 match self {
616 Self::Single(p) => p.schema_id(),
617 Self::Many(_) => None,
618 }
619 }
620
621 pub fn printable_name(&self) -> &str {
624 match self {
625 Self::Single(p) => p.printable_name(),
626 Self::Many(r) => r.key.0.as_str(),
627 }
628 }
629
630 pub fn stats(&self) -> Option<&LazyPartStats> {
631 match self {
632 Self::Single(p) => p.stats(),
633 Self::Many(_) => None,
635 }
636 }
637
638 pub fn key_lower(&self) -> &[u8] {
639 match self {
640 Self::Single(p) => p.key_lower(),
641 Self::Many(r) => r.key_lower.as_slice(),
642 }
643 }
644
645 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
646 match self {
647 Self::Single(p) => p.structured_key_lower(),
648 Self::Many(_) => None,
649 }
650 }
651
652 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
653 match self {
654 Self::Single(p) => p.ts_rewrite(),
655 Self::Many(_) => None,
656 }
657 }
658}
659
660impl<T> RunPart<T>
661where
662 T: Timestamp + Codec64,
663{
664 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
665 match self {
666 Self::Single(p) => p.diffs_sum(metrics),
667 Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
668 }
669 }
670}
671
672#[derive(Clone, Debug)]
674pub struct MissingBlob(BlobKey);
675
676impl std::fmt::Display for MissingBlob {
677 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678 write!(f, "unexpectedly missing key: {}", self.0)
679 }
680}
681
682impl std::error::Error for MissingBlob {}
683
684impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
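 /// Streams every [BatchPart] reachable from this part, fetching nested
 /// [HollowRun]s from blob storage (and recursing into them) as needed.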
685 pub fn part_stream<'a>(
686 &'a self,
687 shard_id: ShardId,
688 blob: &'a dyn Blob,
689 metrics: &'a Metrics,
690 ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
691 try_stream! {
692 match self {
693 RunPart::Single(p) => {
694 yield Cow::Borrowed(p);
695 }
696 RunPart::Many(r) => {
697 let fetched = r.get(shard_id, blob, metrics).await
698 .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
699 for run_part in fetched.parts {
700 for await batch_part in
701 run_part.part_stream(shard_id, blob, metrics).boxed()
702 {
703 yield Cow::Owned(batch_part?.into_owned());
704 }
705 }
706 }
707 }
708 }
709 }
710}
711
712impl<T: Ord> PartialOrd for BatchPart<T> {
713 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
714 Some(self.cmp(other))
715 }
716}
717
718impl<T: Ord> Ord for BatchPart<T> {
719 fn cmp(&self, other: &Self) -> Ordering {
720 match (self, other) {
721 (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
722 (
723 BatchPart::Inline {
724 updates: s_updates,
725 ts_rewrite: s_ts_rewrite,
726 schema_id: s_schema_id,
727 deprecated_schema_id: s_deprecated_schema_id,
728 },
729 BatchPart::Inline {
730 updates: o_updates,
731 ts_rewrite: o_ts_rewrite,
732 schema_id: o_schema_id,
733 deprecated_schema_id: o_deprecated_schema_id,
734 },
735 ) => (
736 s_updates,
737 s_ts_rewrite.as_ref().map(|x| x.elements()),
738 s_schema_id,
739 s_deprecated_schema_id,
740 )
741 .cmp(&(
742 o_updates,
743 o_ts_rewrite.as_ref().map(|x| x.elements()),
744 o_schema_id,
745 o_deprecated_schema_id,
746 )),
747 (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
748 (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
749 }
750 }
751}
752
753#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
755pub(crate) enum RunOrder {
756 Unordered,
758 Codec,
760 Structured,
762}
763
764#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
765pub struct RunId(pub(crate) [u8; 16]);
766
767impl std::fmt::Display for RunId {
768 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
769 write!(f, "ri{}", Uuid::from_bytes(self.0))
770 }
771}
772
773impl std::fmt::Debug for RunId {
774 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
775 write!(f, "RunId({})", Uuid::from_bytes(self.0))
776 }
777}
778
779impl std::str::FromStr for RunId {
780 type Err = String;
781
782 fn from_str(s: &str) -> Result<Self, Self::Err> {
783 parse_id("ri", "RunId", s).map(RunId)
784 }
785}
786
787impl From<RunId> for String {
788 fn from(x: RunId) -> Self {
789 x.to_string()
790 }
791}
792
793impl RunId {
794 pub(crate) fn new() -> Self {
795 RunId(*Uuid::new_v4().as_bytes())
796 }
797}
798
799impl Arbitrary for RunId {
800 type Parameters = ();
801 type Strategy = proptest::strategy::BoxedStrategy<Self>;
802 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
803 Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
804 RunId(*Uuid::from_u128(n).as_bytes())
805 })
806 .boxed()
807 }
808}
809
810#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
812pub struct RunMeta {
813 pub(crate) order: Option<RunOrder>,
815 pub(crate) schema: Option<SchemaId>,
817
818 pub(crate) deprecated_schema: Option<SchemaId>,
820
821 pub(crate) id: Option<RunId>,
823
824 pub(crate) len: Option<usize>,
826
827 #[serde(skip_serializing_if = "MetadataMap::is_empty")]
829 pub(crate) meta: MetadataMap,
830}
831
832#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
834pub struct HollowBatchPart<T> {
835 pub key: PartialBatchKey,
837 #[serde(skip_serializing_if = "MetadataMap::is_empty")]
839 pub meta: MetadataMap,
840 pub encoded_size_bytes: usize,
842 #[serde(serialize_with = "serialize_part_bytes")]
845 pub key_lower: Vec<u8>,
846 #[serde(serialize_with = "serialize_lazy_proto")]
848 pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
849 #[serde(serialize_with = "serialize_part_stats")]
851 pub stats: Option<LazyPartStats>,
852 pub ts_rewrite: Option<Antichain<T>>,
860 #[serde(serialize_with = "serialize_diffs_sum")]
868 pub diffs_sum: Option<[u8; 8]>,
869 pub format: Option<BatchColumnarFormat>,
874 pub schema_id: Option<SchemaId>,
879
880 pub deprecated_schema_id: Option<SchemaId>,
882}
883
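/// A batch of updates whose data lives outside of state (in blob storage or
/// inline parts): the `Description` of times it covers, the total update
/// count, the flattened list of parts, and how those parts group into runs.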
884#[derive(Clone, PartialEq, Eq)]
888pub struct HollowBatch<T> {
889 pub desc: Description<T>,
891 pub len: usize,
893 pub(crate) parts: Vec<RunPart<T>>,
895 pub(crate) run_splits: Vec<usize>,
903 pub(crate) run_meta: Vec<RunMeta>,
906}
907
908impl<T: Debug> Debug for HollowBatch<T> {
909 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
910 let HollowBatch {
911 desc,
912 parts,
913 len,
914 run_splits: runs,
915 run_meta,
916 } = self;
917 f.debug_struct("HollowBatch")
918 .field(
919 "desc",
920 &(
921 desc.lower().elements(),
922 desc.upper().elements(),
923 desc.since().elements(),
924 ),
925 )
926 .field("parts", &parts)
927 .field("len", &len)
928 .field("runs", &runs)
929 .field("run_meta", &run_meta)
930 .finish()
931 }
932}
933
934impl<T: Serialize> serde::Serialize for HollowBatch<T> {
935 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
936 let HollowBatch {
937 desc,
938 len,
939 parts: _,
941 run_splits: _,
942 run_meta: _,
943 } = self;
944 let mut s = s.serialize_struct("HollowBatch", 5)?;
945 let () = s.serialize_field("lower", &desc.lower().elements())?;
946 let () = s.serialize_field("upper", &desc.upper().elements())?;
947 let () = s.serialize_field("since", &desc.since().elements())?;
948 let () = s.serialize_field("len", len)?;
949 let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
950 s.end()
951 }
952}
953
954impl<T: Ord> PartialOrd for HollowBatch<T> {
955 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
956 Some(self.cmp(other))
957 }
958}
959
960impl<T: Ord> Ord for HollowBatch<T> {
961 fn cmp(&self, other: &Self) -> Ordering {
962 let HollowBatch {
965 desc: self_desc,
966 parts: self_parts,
967 len: self_len,
968 run_splits: self_runs,
969 run_meta: self_run_meta,
970 } = self;
971 let HollowBatch {
972 desc: other_desc,
973 parts: other_parts,
974 len: other_len,
975 run_splits: other_runs,
976 run_meta: other_run_meta,
977 } = other;
978 (
979 self_desc.lower().elements(),
980 self_desc.upper().elements(),
981 self_desc.since().elements(),
982 self_parts,
983 self_len,
984 self_runs,
985 self_run_meta,
986 )
987 .cmp(&(
988 other_desc.lower().elements(),
989 other_desc.upper().elements(),
990 other_desc.since().elements(),
991 other_parts,
992 other_len,
993 other_runs,
994 other_run_meta,
995 ))
996 }
997}
998
999impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
1000 pub(crate) fn part_stream<'a>(
1001 &'a self,
1002 shard_id: ShardId,
1003 blob: &'a dyn Blob,
1004 metrics: &'a Metrics,
1005 ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
1006 stream! {
1007 for part in &self.parts {
1008 for await part in part.part_stream(shard_id, blob, metrics) {
1009 yield part;
1010 }
1011 }
1012 }
1013 }
1014}
1015impl<T> HollowBatch<T> {
1016 pub(crate) fn new(
1023 desc: Description<T>,
1024 parts: Vec<RunPart<T>>,
1025 len: usize,
1026 run_meta: Vec<RunMeta>,
1027 run_splits: Vec<usize>,
1028 ) -> Self {
1029 debug_assert!(
1030 run_splits.is_strictly_sorted(),
1031 "run indices should be strictly increasing"
1032 );
1033 debug_assert!(
1034 run_splits.first().map_or(true, |i| *i > 0),
1035 "run indices should be positive"
1036 );
1037 debug_assert!(
1038 run_splits.last().map_or(true, |i| *i < parts.len()),
1039 "run indices should be valid indices into parts"
1040 );
1041 debug_assert!(
1042 parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1043 "all metadata should correspond to a run"
1044 );
1045
1046 Self {
1047 desc,
1048 len,
1049 parts,
1050 run_splits,
1051 run_meta,
1052 }
1053 }
1054
1055 pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1057 let run_meta = if parts.is_empty() {
1058 vec![]
1059 } else {
1060 vec![RunMeta::default()]
1061 };
1062 Self {
1063 desc,
1064 len,
1065 parts,
1066 run_splits: vec![],
1067 run_meta,
1068 }
1069 }
1070
1071 #[cfg(test)]
1072 pub(crate) fn new_run_for_test(
1073 desc: Description<T>,
1074 parts: Vec<RunPart<T>>,
1075 len: usize,
1076 run_id: RunId,
1077 ) -> Self {
1078 let run_meta = if parts.is_empty() {
1079 vec![]
1080 } else {
1081 let mut meta = RunMeta::default();
1082 meta.id = Some(run_id);
1083 vec![meta]
1084 };
1085 Self {
1086 desc,
1087 len,
1088 parts,
1089 run_splits: vec![],
1090 run_meta,
1091 }
1092 }
1093
1094 pub(crate) fn empty(desc: Description<T>) -> Self {
1096 Self {
1097 desc,
1098 len: 0,
1099 parts: vec![],
1100 run_splits: vec![],
1101 run_meta: vec![],
1102 }
1103 }
1104
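 /// Iterates over the runs in this batch, yielding each run's metadata
 /// alongside the slice of parts that make it up.
 ///
 /// Illustrative sketch: with `parts = [p0, p1, p2, p3]` and
 /// `run_splits = [2, 3]`, this yields the slices `[p0, p1]`, `[p2]`, and
 /// `[p3]`, zipped with the corresponding entries of `run_meta`.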
1105 pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1106 let run_ends = self
1107 .run_splits
1108 .iter()
1109 .copied()
1110 .chain(std::iter::once(self.parts.len()));
1111 let run_metas = self.run_meta.iter();
1112 let run_parts = run_ends
1113 .scan(0, |start, end| {
1114 let range = *start..end;
1115 *start = end;
1116 Some(range)
1117 })
1118 .filter(|range| !range.is_empty())
1119 .map(|range| &self.parts[range]);
1120 run_metas.zip_eq(run_parts)
1121 }
1122
1123 pub(crate) fn inline_bytes(&self) -> usize {
1124 self.parts.iter().map(|x| x.inline_bytes()).sum()
1125 }
1126
1127 pub(crate) fn is_empty(&self) -> bool {
1128 self.parts.is_empty()
1129 }
1130
1131 pub(crate) fn part_count(&self) -> usize {
1132 self.parts.len()
1133 }
1134
1135 pub fn encoded_size_bytes(&self) -> usize {
1137 self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1138 }
1139}
1140
1141impl<T: Timestamp + TotalOrder> HollowBatch<T> {
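 /// Rewrites the batch's upper to `new_upper` and marks every part so its
 /// timestamps are advanced to `frontier` at read time, after validating that
 /// the rewrite frontier is compatible with the batch's bounds and with any
 /// previous rewrite.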
1143 pub(crate) fn rewrite_ts(
1144 &mut self,
1145 frontier: &Antichain<T>,
1146 new_upper: Antichain<T>,
1147 ) -> Result<(), String> {
1148 if !PartialOrder::less_than(frontier, &new_upper) {
1149 return Err(format!(
1150 "rewrite frontier {:?} !< rewrite upper {:?}",
1151 frontier.elements(),
1152 new_upper.elements(),
1153 ));
1154 }
1155 if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1156 return Err(format!(
1157 "rewrite upper {:?} < batch upper {:?}",
1158 new_upper.elements(),
1159 self.desc.upper().elements(),
1160 ));
1161 }
1162
1163 if PartialOrder::less_than(frontier, self.desc.lower()) {
1166 return Err(format!(
1167 "rewrite frontier {:?} < batch lower {:?}",
1168 frontier.elements(),
1169 self.desc.lower().elements(),
1170 ));
1171 }
1172 if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1173 return Err(format!(
1174 "batch since {:?} != minimum antichain {:?}",
1175 self.desc.since().elements(),
1176 &[T::minimum()],
1177 ));
1178 }
1179 for part in self.parts.iter() {
1180 let Some(ts_rewrite) = part.ts_rewrite() else {
1181 continue;
1182 };
1183 if PartialOrder::less_than(frontier, ts_rewrite) {
1184 return Err(format!(
1185 "rewrite frontier {:?} < batch rewrite {:?}",
1186 frontier.elements(),
1187 ts_rewrite.elements(),
1188 ));
1189 }
1190 }
1191
1192 self.desc = Description::new(
1193 self.desc.lower().clone(),
1194 new_upper,
1195 self.desc.since().clone(),
1196 );
1197 for part in &mut self.parts {
1198 match part {
1199 RunPart::Single(BatchPart::Hollow(part)) => {
1200 part.ts_rewrite = Some(frontier.clone())
1201 }
1202 RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1203 *ts_rewrite = Some(frontier.clone())
1204 }
1205 RunPart::Many(runs) => {
1206 panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1209 }
1210 }
1211 }
1212 Ok(())
1213 }
1214}
1215
1216impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1217 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1218 Some(self.cmp(other))
1219 }
1220}
1221
1222impl<T: Ord> Ord for HollowBatchPart<T> {
1223 fn cmp(&self, other: &Self) -> Ordering {
1224 let HollowBatchPart {
1227 key: self_key,
1228 meta: self_meta,
1229 encoded_size_bytes: self_encoded_size_bytes,
1230 key_lower: self_key_lower,
1231 structured_key_lower: self_structured_key_lower,
1232 stats: self_stats,
1233 ts_rewrite: self_ts_rewrite,
1234 diffs_sum: self_diffs_sum,
1235 format: self_format,
1236 schema_id: self_schema_id,
1237 deprecated_schema_id: self_deprecated_schema_id,
1238 } = self;
1239 let HollowBatchPart {
1240 key: other_key,
1241 meta: other_meta,
1242 encoded_size_bytes: other_encoded_size_bytes,
1243 key_lower: other_key_lower,
1244 structured_key_lower: other_structured_key_lower,
1245 stats: other_stats,
1246 ts_rewrite: other_ts_rewrite,
1247 diffs_sum: other_diffs_sum,
1248 format: other_format,
1249 schema_id: other_schema_id,
1250 deprecated_schema_id: other_deprecated_schema_id,
1251 } = other;
1252 (
1253 self_key,
1254 self_meta,
1255 self_encoded_size_bytes,
1256 self_key_lower,
1257 self_structured_key_lower,
1258 self_stats,
1259 self_ts_rewrite.as_ref().map(|x| x.elements()),
1260 self_diffs_sum,
1261 self_format,
1262 self_schema_id,
1263 self_deprecated_schema_id,
1264 )
1265 .cmp(&(
1266 other_key,
1267 other_meta,
1268 other_encoded_size_bytes,
1269 other_key_lower,
1270 other_structured_key_lower,
1271 other_stats,
1272 other_ts_rewrite.as_ref().map(|x| x.elements()),
1273 other_diffs_sum,
1274 other_format,
1275 other_schema_id,
1276 other_deprecated_schema_id,
1277 ))
1278 }
1279}
1280
1281#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1283pub struct HollowRollup {
1284 pub key: PartialRollupKey,
1286 pub encoded_size_bytes: Option<usize>,
1288}
1289
1290#[derive(Debug)]
1292pub enum HollowBlobRef<'a, T> {
1293 Batch(&'a HollowBatch<T>),
1294 Rollup(&'a HollowRollup),
1295}
1296
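/// A rollup that some worker has claimed and is writing; `start_ms` lets other
/// workers take over if the claim appears abandoned.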
1297#[derive(
1299 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1300)]
1301pub struct ActiveRollup {
1302 pub seqno: SeqNo,
1303 pub start_ms: u64,
1304}
1305
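/// The GC analog of [ActiveRollup]: an in-progress GC claim and when it
/// started.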
1306#[derive(
1308 Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
1309)]
1310pub struct ActiveGc {
1311 pub seqno: SeqNo,
1312 pub start_ms: u64,
1313}
1314
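/// The result of a state transition that intentionally writes no new state
/// version, e.g. because the request was already satisfied or the shard is a
/// tombstone.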
1315#[derive(Debug)]
1320#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1321pub struct NoOpStateTransition<T>(pub T);
1322
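/// All of the mutable collections in a shard's state: rollup and GC
/// bookkeeping, registered readers, writers, and schemas, and the trace of
/// batches itself.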
1323#[derive(Debug, Clone)]
1325#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1326pub struct StateCollections<T> {
1327 pub(crate) version: Version,
1331
1332 pub(crate) last_gc_req: SeqNo,
1335
1336 pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1338
1339 pub(crate) active_rollup: Option<ActiveRollup>,
1341 pub(crate) active_gc: Option<ActiveGc>,
1343
1344 pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1345 pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1346 pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1347 pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1348
1349 pub(crate) trace: Trace<T>,
1354}
1355
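/// A registered key/value schema pair, stored both in the `Codec`'s own
/// encoding and as encoded arrow `DataType`s.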
1356#[derive(Debug, Clone, Serialize, PartialEq)]
1372pub struct EncodedSchemas {
1373 pub key: Bytes,
1375 pub key_data_type: Bytes,
1378 pub val: Bytes,
1380 pub val_data_type: Bytes,
1383}
1384
1385impl EncodedSchemas {
1386 pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1387 let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1388 DataType::from_proto(proto).expect("valid DataType")
1389 }
1390}
1391
1392#[derive(Debug)]
1393#[cfg_attr(test, derive(PartialEq))]
1394pub enum CompareAndAppendBreak<T> {
1395 AlreadyCommitted,
1396 Upper {
1397 shard_upper: Antichain<T>,
1398 writer_upper: Antichain<T>,
1399 },
1400 InvalidUsage(InvalidUsage<T>),
1401 InlineBackpressure,
1402}
1403
1404#[derive(Debug)]
1405#[cfg_attr(test, derive(PartialEq))]
1406pub enum SnapshotErr<T> {
1407 AsOfNotYetAvailable(SeqNo, Upper<T>),
1408 AsOfHistoricalDistinctionsLost(Since<T>),
1409}
1410
1411impl<T> StateCollections<T>
1412where
1413 T: Timestamp + Lattice + Codec64,
1414{
1415 pub fn add_rollup(
1416 &mut self,
1417 add_rollup: (SeqNo, &HollowRollup),
1418 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1419 let (rollup_seqno, rollup) = add_rollup;
1420 let applied = match self.rollups.get(&rollup_seqno) {
1421 Some(x) => x.key == rollup.key,
1422 None => {
1423 self.active_rollup = None;
1424 self.rollups.insert(rollup_seqno, rollup.to_owned());
1425 true
1426 }
1427 };
1428 Continue(applied)
1432 }
1433
1434 pub fn remove_rollups(
1435 &mut self,
1436 remove_rollups: &[(SeqNo, PartialRollupKey)],
1437 ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1438 if remove_rollups.is_empty() || self.is_tombstone() {
1439 return Break(NoOpStateTransition(vec![]));
1440 }
1441
1442 self.active_gc = None;
1445
1446 let mut removed = vec![];
1447 for (seqno, key) in remove_rollups {
1448 let removed_key = self.rollups.remove(seqno);
1449 debug_assert!(
1450 removed_key.as_ref().map_or(true, |x| &x.key == key),
1451 "{} vs {:?}",
1452 key,
1453 removed_key
1454 );
1455
1456 if removed_key.is_some() {
1457 removed.push(*seqno);
1458 }
1459 }
1460
1461 Continue(removed)
1462 }
1463
1464 pub fn register_leased_reader(
1465 &mut self,
1466 hostname: &str,
1467 reader_id: &LeasedReaderId,
1468 purpose: &str,
1469 seqno: SeqNo,
1470 lease_duration: Duration,
1471 heartbeat_timestamp_ms: u64,
1472 use_critical_since: bool,
1473 ) -> ControlFlow<
1474 NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1475 (LeasedReaderState<T>, SeqNo),
1476 > {
1477 let since = if use_critical_since {
1478 self.critical_since()
1479 .unwrap_or_else(|| self.trace.since().clone())
1480 } else {
1481 self.trace.since().clone()
1482 };
1483 let reader_state = LeasedReaderState {
1484 debug: HandleDebugState {
1485 hostname: hostname.to_owned(),
1486 purpose: purpose.to_owned(),
1487 },
1488 seqno,
1489 since,
1490 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1491 lease_duration_ms: u64::try_from(lease_duration.as_millis())
1492 .expect("lease duration as millis should fit within u64"),
1493 };
1494
1495 if self.is_tombstone() {
1500 return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1501 }
1502
1503 self.leased_readers
1505 .insert(reader_id.clone(), reader_state.clone());
1506 Continue((reader_state, self.seqno_since(seqno)))
1507 }
1508
1509 pub fn register_critical_reader<O: Opaque + Codec64>(
1510 &mut self,
1511 hostname: &str,
1512 reader_id: &CriticalReaderId,
1513 purpose: &str,
1514 ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1515 let state = CriticalReaderState {
1516 debug: HandleDebugState {
1517 hostname: hostname.to_owned(),
1518 purpose: purpose.to_owned(),
1519 },
1520 since: self.trace.since().clone(),
1521 opaque: OpaqueState(Codec64::encode(&O::initial())),
1522 opaque_codec: O::codec_name(),
1523 };
1524
1525 if self.is_tombstone() {
1530 return Break(NoOpStateTransition(state));
1531 }
1532
1533 let state = match self.critical_readers.get_mut(reader_id) {
1534 Some(existing_state) => {
1535 existing_state.debug = state.debug;
1536 existing_state.clone()
1537 }
1538 None => {
1539 self.critical_readers
1540 .insert(reader_id.clone(), state.clone());
1541 state
1542 }
1543 };
1544 Continue(state)
1545 }
1546
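 /// Registers a key/value schema pair, returning the matching `SchemaId` if
 /// it is already registered, or assigning id 0 when this is the very first
 /// schema for the shard. A genuinely new schema on a shard that already has
 /// one returns `None` here; evolving the schema goes through
 /// `compare_and_evolve_schema` instead.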
1547 pub fn register_schema<K: Codec, V: Codec>(
1548 &mut self,
1549 key_schema: &K::Schema,
1550 val_schema: &V::Schema,
1551 ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1552 fn encode_data_type(data_type: &DataType) -> Bytes {
1553 let proto = data_type.into_proto();
1554 prost::Message::encode_to_vec(&proto).into()
1555 }
1556
1557 let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1569 K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1570 });
1571 match existing_id {
1572 Some((schema_id, _)) => {
1573 Break(NoOpStateTransition(Some(*schema_id)))
1578 }
1579 None if self.is_tombstone() => {
1580 Break(NoOpStateTransition(None))
1582 }
1583 None if self.schemas.is_empty() => {
1584 let id = SchemaId(self.schemas.len());
1588 let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1589 .expect("valid key schema");
1590 let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1591 .expect("valid val schema");
1592 let prev = self.schemas.insert(
1593 id,
1594 EncodedSchemas {
1595 key: K::encode_schema(key_schema),
1596 key_data_type: encode_data_type(&key_data_type),
1597 val: V::encode_schema(val_schema),
1598 val_data_type: encode_data_type(&val_data_type),
1599 },
1600 );
1601 assert_eq!(prev, None);
1602 Continue(Some(id))
1603 }
1604 None => {
1605 info!(
1606 "register_schemas got {:?} expected {:?}",
1607 key_schema,
1608 self.schemas
1609 .iter()
1610 .map(|(id, x)| (id, K::decode_schema(&x.key)))
1611 .collect::<Vec<_>>()
1612 );
1613 Break(NoOpStateTransition(None))
1616 }
1617 }
1618 }
1619
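 /// Evolves the shard's most recent schema to the given one, provided that
 /// `expected` still names that schema and the new arrow `DataType`s are
 /// backward compatible without dropping columns; on success a fresh
 /// `SchemaId` is registered and returned.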
1620 pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1621 &mut self,
1622 expected: SchemaId,
1623 key_schema: &K::Schema,
1624 val_schema: &V::Schema,
1625 ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1626 fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1627 let array = Schema::encoder(schema).expect("valid schema").finish();
1631 Array::data_type(&array).clone()
1632 }
1633
1634 let (current_id, current) = self
1635 .schemas
1636 .last_key_value()
1637 .expect("all shards have a schema");
1638 if *current_id != expected {
1639 return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1640 schema_id: *current_id,
1641 key: K::decode_schema(&current.key),
1642 val: V::decode_schema(&current.val),
1643 }));
1644 }
1645
1646 let current_key = K::decode_schema(&current.key);
1647 let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
1648 let current_val = V::decode_schema(&current.val);
1649 let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
1650
1651 let key_dt = data_type(key_schema);
1652 let val_dt = data_type(val_schema);
1653
1654 if current_key == *key_schema
1656 && current_key_dt == key_dt
1657 && current_val == *val_schema
1658 && current_val_dt == val_dt
1659 {
1660 return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1661 }
1662
1663 let key_fn = backward_compatible(&current_key_dt, &key_dt);
1664 let val_fn = backward_compatible(&current_val_dt, &val_dt);
1665 let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1666 return Break(NoOpStateTransition(CaESchema::Incompatible));
1667 };
1668 if key_fn.contains_drop() || val_fn.contains_drop() {
1672 return Break(NoOpStateTransition(CaESchema::Incompatible));
1673 }
1674
1675 let id = SchemaId(self.schemas.len());
1679 self.schemas.insert(
1680 id,
1681 EncodedSchemas {
1682 key: K::encode_schema(key_schema),
1683 key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1684 val: V::encode_schema(val_schema),
1685 val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1686 },
1687 );
1688 Continue(CaESchema::Ok(id))
1689 }
1690
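 /// Applies a writer's `compare_and_append` to the current state: after
 /// validating the batch bounds, idempotency token, shard upper, and inline
 /// write budget, the batch is pushed into the trace and any resulting (or
 /// claimed) compaction requests are returned to the caller.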
1691 pub fn compare_and_append(
1692 &mut self,
1693 batch: &HollowBatch<T>,
1694 writer_id: &WriterId,
1695 heartbeat_timestamp_ms: u64,
1696 lease_duration_ms: u64,
1697 idempotency_token: &IdempotencyToken,
1698 debug_info: &HandleDebugState,
1699 inline_writes_total_max_bytes: usize,
1700 claim_compaction_percent: usize,
1701 claim_compaction_min_version: Option<&Version>,
1702 ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1703 if self.is_tombstone() {
1708 assert_eq!(self.trace.upper(), &Antichain::new());
1709 return Break(CompareAndAppendBreak::Upper {
1710 shard_upper: Antichain::new(),
1711 writer_upper: Antichain::new(),
1716 });
1717 }
1718
1719 let writer_state = self
1720 .writers
1721 .entry(writer_id.clone())
1722 .or_insert_with(|| WriterState {
1723 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1724 lease_duration_ms,
1725 most_recent_write_token: IdempotencyToken::SENTINEL,
1726 most_recent_write_upper: Antichain::from_elem(T::minimum()),
1727 debug: debug_info.clone(),
1728 });
1729
1730 if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1731 return Break(CompareAndAppendBreak::InvalidUsage(
1732 InvalidUsage::InvalidBounds {
1733 lower: batch.desc.lower().clone(),
1734 upper: batch.desc.upper().clone(),
1735 },
1736 ));
1737 }
1738
1739 if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1742 return Break(CompareAndAppendBreak::InvalidUsage(
1743 InvalidUsage::InvalidEmptyTimeInterval {
1744 lower: batch.desc.lower().clone(),
1745 upper: batch.desc.upper().clone(),
1746 keys: batch
1747 .parts
1748 .iter()
1749 .map(|x| x.printable_name().to_owned())
1750 .collect(),
1751 },
1752 ));
1753 }
1754
1755 if idempotency_token == &writer_state.most_recent_write_token {
1756 assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1761 assert!(
1762 PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1763 "{:?} vs {:?}",
1764 batch.desc.upper(),
1765 self.trace.upper()
1766 );
1767 return Break(CompareAndAppendBreak::AlreadyCommitted);
1768 }
1769
1770 let shard_upper = self.trace.upper();
1771 if shard_upper != batch.desc.lower() {
1772 return Break(CompareAndAppendBreak::Upper {
1773 shard_upper: shard_upper.clone(),
1774 writer_upper: writer_state.most_recent_write_upper.clone(),
1775 });
1776 }
1777
1778 let new_inline_bytes = batch.inline_bytes();
1779 if new_inline_bytes > 0 {
1780 let mut existing_inline_bytes = 0;
1781 self.trace
1782 .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1783 if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1787 return Break(CompareAndAppendBreak::InlineBackpressure);
1788 }
1789 }
1790
1791 let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1792 self.trace.push_batch(batch.clone())
1793 } else {
1794 Vec::new()
1795 };
1796
1797 let all_empty_reqs = merge_reqs
1800 .iter()
1801 .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1802 if all_empty_reqs && !batch.is_empty() {
1803 let mut reqs_to_take = claim_compaction_percent / 100;
1804 if (usize::cast_from(idempotency_token.hashed()) % 100)
1805 < (claim_compaction_percent % 100)
1806 {
1807 reqs_to_take += 1;
1808 }
1809 let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1810 let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1811 merge_reqs.extend(
1812 self.trace
1815 .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1816 .take(reqs_to_take),
1817 )
1818 }
1819
1820 for req in &merge_reqs {
1821 self.trace.claim_compaction(
1822 req.id,
1823 ActiveCompaction {
1824 start_ms: heartbeat_timestamp_ms,
1825 },
1826 )
1827 }
1828
1829 debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1830 writer_state.most_recent_write_token = idempotency_token.clone();
1831 assert!(
1833 PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1834 "{:?} vs {:?}",
1835 &writer_state.most_recent_write_upper,
1836 batch.desc.upper()
1837 );
1838 writer_state
1839 .most_recent_write_upper
1840 .clone_from(batch.desc.upper());
1841
1842 writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1844 heartbeat_timestamp_ms,
1845 writer_state.last_heartbeat_timestamp_ms,
1846 );
1847
1848 Continue(merge_reqs)
1849 }
1850
1851 pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1852 &mut self,
1853 res: &FueledMergeRes<T>,
1854 metrics: &ColumnarMetrics,
1855 ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1856 if self.is_tombstone() {
1861 return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1862 }
1863
1864 let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1865 Continue(apply_merge_result)
1866 }
1867
1868 pub fn spine_exert(
1869 &mut self,
1870 fuel: usize,
1871 ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1872 let (merge_reqs, did_work) = self.trace.exert(fuel);
1873 if did_work {
1874 Continue(merge_reqs)
1875 } else {
1876 assert!(merge_reqs.is_empty());
1877 Break(NoOpStateTransition(Vec::new()))
1880 }
1881 }
1882
1883 pub fn downgrade_since(
1884 &mut self,
1885 reader_id: &LeasedReaderId,
1886 seqno: SeqNo,
1887 outstanding_seqno: SeqNo,
1888 new_since: &Antichain<T>,
1889 heartbeat_timestamp_ms: u64,
1890 ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1891 if self.is_tombstone() {
1896 return Break(NoOpStateTransition(Since(Antichain::new())));
1897 }
1898
1899 let Some(reader_state) = self.leased_reader(reader_id) else {
1902 tracing::warn!(
1903 "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1904 );
1905 return Break(NoOpStateTransition(Since(Antichain::new())));
1906 };
1907
1908 reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1911 heartbeat_timestamp_ms,
1912 reader_state.last_heartbeat_timestamp_ms,
1913 );
1914
1915 let seqno = {
1916 assert!(
1917 outstanding_seqno >= reader_state.seqno,
1918 "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1919 is behind current reader_state ({:?})",
1920 outstanding_seqno,
1921 reader_state.seqno,
1922 );
1923 std::cmp::min(outstanding_seqno, seqno)
1924 };
1925
1926 reader_state.seqno = seqno;
1927
1928 let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1929 reader_state.since.clone_from(new_since);
1930 self.update_since();
1931 new_since.clone()
1932 } else {
1933 reader_state.since.clone()
1936 };
1937
1938 Continue(Since(reader_current_since))
1939 }
1940
1941 pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1942 &mut self,
1943 reader_id: &CriticalReaderId,
1944 expected_opaque: &O,
1945 (new_opaque, new_since): (&O, &Antichain<T>),
1946 ) -> ControlFlow<
1947 NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1948 Result<Since<T>, (O, Since<T>)>,
1949 > {
1950 if self.is_tombstone() {
1955 return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1959 }
1960
1961 let reader_state = self.critical_reader(reader_id);
1962 assert_eq!(reader_state.opaque_codec, O::codec_name());
1963
1964 if &O::decode(reader_state.opaque.0) != expected_opaque {
1965 return Continue(Err((
1968 Codec64::decode(reader_state.opaque.0),
1969 Since(reader_state.since.clone()),
1970 )));
1971 }
1972
1973 reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1974 if PartialOrder::less_equal(&reader_state.since, new_since) {
1975 reader_state.since.clone_from(new_since);
1976 self.update_since();
1977 Continue(Ok(Since(new_since.clone())))
1978 } else {
1979 Continue(Ok(Since(reader_state.since.clone())))
1983 }
1984 }
1985
1986 pub fn expire_leased_reader(
1987 &mut self,
1988 reader_id: &LeasedReaderId,
1989 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1990 if self.is_tombstone() {
1995 return Break(NoOpStateTransition(false));
1996 }
1997
1998 let existed = self.leased_readers.remove(reader_id).is_some();
1999 if existed {
2000 }
2014 Continue(existed)
2017 }
2018
2019 pub fn expire_critical_reader(
2020 &mut self,
2021 reader_id: &CriticalReaderId,
2022 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2023 if self.is_tombstone() {
2028 return Break(NoOpStateTransition(false));
2029 }
2030
2031 let existed = self.critical_readers.remove(reader_id).is_some();
2032 if existed {
2033 }
2047 Continue(existed)
2051 }
2052
2053 pub fn expire_writer(
2054 &mut self,
2055 writer_id: &WriterId,
2056 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2057 if self.is_tombstone() {
2062 return Break(NoOpStateTransition(false));
2063 }
2064
2065 let existed = self.writers.remove(writer_id).is_some();
2066 Continue(existed)
2070 }
2071
2072 fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2073 self.leased_readers.get_mut(id)
2074 }
2075
2076 fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2077 self.critical_readers
2078 .get_mut(id)
2079 .unwrap_or_else(|| {
2080 panic!(
2081 "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2082 id
2083 )
2084 })
2085 }
2086
2087 fn critical_since(&self) -> Option<Antichain<T>> {
2088 let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2089 let mut since = critical_sinces.next().cloned()?;
2090 for s in critical_sinces {
2091 since.meet_assign(s);
2092 }
2093 Some(since)
2094 }
2095
2096 fn update_since(&mut self) {
2097 let mut sinces_iter = self
2098 .leased_readers
2099 .values()
2100 .map(|x| &x.since)
2101 .chain(self.critical_readers.values().map(|x| &x.since));
2102 let mut since = match sinces_iter.next() {
2103 Some(since) => since.clone(),
2104 None => {
2105 return;
2108 }
2109 };
2110 while let Some(s) = sinces_iter.next() {
2111 since.meet_assign(s);
2112 }
2113 self.trace.downgrade_since(&since);
2114 }
2115
2116 fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2117 let mut seqno_since = seqno;
2118 for cap in self.leased_readers.values() {
2119 seqno_since = std::cmp::min(seqno_since, cap.seqno);
2120 }
2121 seqno_since
2123 }
2124
2125 fn tombstone_batch() -> HollowBatch<T> {
2126 HollowBatch::empty(Description::new(
2127 Antichain::from_elem(T::minimum()),
2128 Antichain::new(),
2129 Antichain::new(),
2130 ))
2131 }
2132
2133 pub(crate) fn is_tombstone(&self) -> bool {
2134 self.trace.upper().is_empty()
2135 && self.trace.since().is_empty()
2136 && self.writers.is_empty()
2137 && self.leased_readers.is_empty()
2138 && self.critical_readers.is_empty()
2139 }
2140
2141 pub(crate) fn is_single_empty_batch(&self) -> bool {
2142 let mut batch_count = 0;
2143 let mut is_empty = true;
2144 self.trace.map_batches(|b| {
2145 batch_count += 1;
2146 is_empty &= b.is_empty()
2147 });
2148 batch_count <= 1 && is_empty
2149 }
2150
2151 pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2152 assert_eq!(self.trace.upper(), &Antichain::new());
2153 assert_eq!(self.trace.since(), &Antichain::new());
2154
2155 let was_tombstone = self.is_tombstone();
2158
2159 self.writers.clear();
2161 self.leased_readers.clear();
2162 self.critical_readers.clear();
2163
2164 debug_assert!(self.is_tombstone());
2165
2166 let mut to_replace = None;
2175 let mut batch_count = 0;
2176 self.trace.map_batches(|b| {
2177 batch_count += 1;
2178 if !b.is_empty() && to_replace.is_none() {
2179 to_replace = Some(b.desc.clone());
2180 }
2181 });
2182 if let Some(desc) = to_replace {
2183 let result = self.trace.apply_tombstone_merge(&desc);
2187 assert!(
2188 result.matched(),
2189 "merge with a matching desc should always match"
2190 );
2191 Continue(())
2192 } else if batch_count > 1 {
2193 let mut new_trace = Trace::default();
2198 new_trace.downgrade_since(&Antichain::new());
2199 let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2200 assert_eq!(merge_reqs, Vec::new());
2201 self.trace = new_trace;
2202 Continue(())
2203 } else if !was_tombstone {
2204 Continue(())
2207 } else {
2208 Break(NoOpStateTransition(()))
2211 }
2212 }
2213}
2214
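/// A shard's sequenced state as of a single `SeqNo`: the shard id, the
/// walltime and hostname of the process that wrote this version, and all of
/// the state collections.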
2215#[derive(Debug)]
2217#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2218pub struct State<T> {
2219 pub(crate) shard_id: ShardId,
2220
2221 pub(crate) seqno: SeqNo,
2222 pub(crate) walltime_ms: u64,
2225 pub(crate) hostname: String,
2228 pub(crate) collections: StateCollections<T>,
2229}
2230
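/// A [State] paired at the type level with the shard's `K`, `V`, and `D`
/// types, so codec-specific operations can be checked at compile time; only
/// the `T`-typed portion is actually stored.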
2231pub struct TypedState<K, V, T, D> {
2234 pub(crate) state: State<T>,
2235
2236 pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2244}
2245
2246impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2247 #[cfg(any(test, debug_assertions))]
2248 pub(crate) fn clone(&self, hostname: String) -> Self {
2249 TypedState {
2250 state: State {
2251 shard_id: self.shard_id.clone(),
2252 seqno: self.seqno.clone(),
2253 walltime_ms: self.walltime_ms,
2254 hostname,
2255 collections: self.collections.clone(),
2256 },
2257 _phantom: PhantomData,
2258 }
2259 }
2260
2261 pub(crate) fn clone_for_rollup(&self) -> Self {
2262 TypedState {
2263 state: State {
2264 shard_id: self.shard_id.clone(),
2265 seqno: self.seqno.clone(),
2266 walltime_ms: self.walltime_ms,
2267 hostname: self.hostname.clone(),
2268 collections: self.collections.clone(),
2269 },
2270 _phantom: PhantomData,
2271 }
2272 }
2273}
2274
2275impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2276 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2277 let TypedState { state, _phantom } = self;
2280 f.debug_struct("TypedState").field("state", state).finish()
2281 }
2282}
2283
2284#[cfg(any(test, debug_assertions))]
2286impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2287 fn eq(&self, other: &Self) -> bool {
2288 let TypedState {
2291 state: self_state,
2292 _phantom,
2293 } = self;
2294 let TypedState {
2295 state: other_state,
2296 _phantom,
2297 } = other;
2298 self_state == other_state
2299 }
2300}
2301
2302impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2303 type Target = State<T>;
2304
2305 fn deref(&self) -> &Self::Target {
2306 &self.state
2307 }
2308}
2309
2310impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2311 fn deref_mut(&mut self) -> &mut Self::Target {
2312 &mut self.state
2313 }
2314}
2315
2316impl<K, V, T, D> TypedState<K, V, T, D>
2317where
2318 K: Codec,
2319 V: Codec,
2320 T: Timestamp + Lattice + Codec64,
2321 D: Codec64,
2322{
2323 pub fn new(
2324 applier_version: Version,
2325 shard_id: ShardId,
2326 hostname: String,
2327 walltime_ms: u64,
2328 ) -> Self {
2329 let state = State {
2330 shard_id,
2331 seqno: SeqNo::minimum(),
2332 walltime_ms,
2333 hostname,
2334 collections: StateCollections {
2335 version: applier_version,
2336 last_gc_req: SeqNo::minimum(),
2337 rollups: BTreeMap::new(),
2338 active_rollup: None,
2339 active_gc: None,
2340 leased_readers: BTreeMap::new(),
2341 critical_readers: BTreeMap::new(),
2342 writers: BTreeMap::new(),
2343 schemas: BTreeMap::new(),
2344 trace: Trace::default(),
2345 },
2346 };
2347 TypedState {
2348 state,
2349 _phantom: PhantomData,
2350 }
2351 }
2352
2353 pub fn clone_apply<R, E, WorkFn>(
2354 &self,
2355 cfg: &PersistConfig,
2356 work_fn: &mut WorkFn,
2357 ) -> ControlFlow<E, (R, Self)>
2358 where
2359 WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2360 {
2361 let mut new_state = State {
2363 shard_id: self.shard_id,
2364 seqno: self.seqno.next(),
2365 walltime_ms: (cfg.now)(),
2366 hostname: cfg.hostname.clone(),
2367 collections: self.collections.clone(),
2368 };
2369
2370 if new_state.walltime_ms <= self.walltime_ms {
2373 new_state.walltime_ms = self.walltime_ms + 1;
2374 }
2375
2376 let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2377 let new_state = TypedState {
2378 state: new_state,
2379 _phantom: PhantomData,
2380 };
2381 Continue((work_ret, new_state))
2382 }
2383}
2384
2385#[derive(Copy, Clone, Debug)]
2386pub struct GcConfig {
2387 pub use_active_gc: bool,
2388 pub fallback_threshold_ms: u64,
2389 pub min_versions: usize,
2390 pub max_versions: usize,
2391}
2392
2393impl<T> State<T>
2394where
2395 T: Timestamp + Lattice + Codec64,
2396{
2397 pub fn shard_id(&self) -> ShardId {
2398 self.shard_id
2399 }
2400
2401 pub fn seqno(&self) -> SeqNo {
2402 self.seqno
2403 }
2404
2405 pub fn since(&self) -> &Antichain<T> {
2406 self.collections.trace.since()
2407 }
2408
2409 pub fn upper(&self) -> &Antichain<T> {
2410 self.collections.trace.upper()
2411 }
2412
2413 pub fn spine_batch_count(&self) -> usize {
2414 self.collections.trace.num_spine_batches()
2415 }
2416
2417 pub fn size_metrics(&self) -> StateSizeMetrics {
2418 let mut ret = StateSizeMetrics::default();
2419 self.blobs().for_each(|x| match x {
2420 HollowBlobRef::Batch(x) => {
2421 ret.hollow_batch_count += 1;
2422 ret.batch_part_count += x.part_count();
2423 ret.num_updates += x.len;
2424
2425 let batch_size = x.encoded_size_bytes();
2426 for x in x.parts.iter() {
2427 if x.ts_rewrite().is_some() {
2428 ret.rewrite_part_count += 1;
2429 }
2430 if x.is_inline() {
2431 ret.inline_part_count += 1;
2432 ret.inline_part_bytes += x.inline_bytes();
2433 }
2434 }
2435 ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2436 ret.state_batches_bytes += batch_size;
2437 }
2438 HollowBlobRef::Rollup(x) => {
2439 ret.state_rollup_count += 1;
2440 ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2441 }
2442 });
2443 ret
2444 }
2445
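// Returns the most recent rollup by seqno. Expects at least one rollup to exist, which is
// guaranteed once the shard's seqno has advanced past the minimum.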
2446 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2447 self.collections
2450 .rollups
2451 .iter()
2452 .rev()
2453 .next()
2454 .expect("State should have at least one rollup if seqno > minimum")
2455 }
2456
2457 pub(crate) fn seqno_since(&self) -> SeqNo {
2458 self.collections.seqno_since(self.seqno)
2459 }
2460
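// Decides whether to emit a `GcReq`. In classic mode the threshold scales with the log of
// the current seqno: e.g. at seqno 100, `next_power_of_two()` is 128 = 2^7, so GC fires once
// the seqno-since has advanced at least 7 past the last GC request. With `use_active_gc` the
// threshold is the fixed `min_versions`, progress per request is capped at `max_versions`,
// and a stalled in-flight GC (older than `fallback_threshold_ms`) may be reclaimed.
// Non-write callers only GC when no writers are registered, except that tombstoned shards
// always request GC.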
2461 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2473 let GcConfig {
2474 use_active_gc,
2475 fallback_threshold_ms,
2476 min_versions,
2477 max_versions,
2478 } = cfg;
2479 let gc_threshold = if use_active_gc {
2483 u64::cast_from(min_versions)
2484 } else {
2485 std::cmp::max(
2486 1,
2487 u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2488 )
2489 };
2490 let new_seqno_since = self.seqno_since();
2491 let gc_until_seqno = new_seqno_since.min(SeqNo(
2494 self.collections
2495 .last_gc_req
2496 .0
2497 .saturating_add(u64::cast_from(max_versions)),
2498 ));
2499 let should_gc = new_seqno_since
2500 .0
2501 .saturating_sub(self.collections.last_gc_req.0)
2502 >= gc_threshold;
2503
2504 let should_gc = if use_active_gc && !should_gc {
2507 match self.collections.active_gc {
2508 Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2509 None => false,
2510 }
2511 } else {
2512 should_gc
2513 };
2514 let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2517 let tombstone_needs_gc = self.collections.is_tombstone();
2522 let should_gc = should_gc || tombstone_needs_gc;
2523 let should_gc = if use_active_gc {
2524 should_gc
2528 && match self.collections.active_gc {
2529 Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2530 None => true,
2531 }
2532 } else {
2533 should_gc
2534 };
2535 if should_gc {
2536 self.collections.last_gc_req = gc_until_seqno;
2537 Some(GcReq {
2538 shard_id: self.shard_id,
2539 new_seqno_since: gc_until_seqno,
2540 })
2541 } else {
2542 None
2543 }
2544 }
2545
2546 pub fn seqnos_held(&self) -> usize {
2548 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2549 }
2550
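// Drops any leased readers and writers whose `last_heartbeat_timestamp_ms + lease_duration_ms`
// falls before `walltime_ms`, logging each expiry and counting it in the returned
// `ExpiryMetrics`.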
2551 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2553 let mut metrics = ExpiryMetrics::default();
2554 let shard_id = self.shard_id();
2555 self.collections.leased_readers.retain(|id, state| {
2556 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2557 if !retain {
2558 info!(
2559 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2560 state.debug.purpose
2561 );
2562 metrics.readers_expired += 1;
2563 }
2564 retain
2565 });
2566 self.collections.writers.retain(|id, state| {
2568 let retain =
2569 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2570 if !retain {
2571 info!(
2572 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2573 state.debug.purpose
2574 );
2575 metrics.writers_expired += 1;
2576 }
2577 retain
2578 });
2579 metrics
2580 }
2581
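// Returns the batches whose lowers are not beyond `as_of`, i.e. everything needed to
// reconstruct the shard's contents at `as_of`. Errors if `as_of` has been compacted away
// (before `since`) or is not yet available (at or beyond `upper`).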
2582 pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2586 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2587 return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2588 self.collections.trace.since().clone(),
2589 )));
2590 }
2591 let upper = self.collections.trace.upper();
2592 if PartialOrder::less_equal(upper, as_of) {
2593 return Err(SnapshotErr::AsOfNotYetAvailable(
2594 self.seqno,
2595 Upper(upper.clone()),
2596 ));
2597 }
2598
2599 let batches = self
2600 .collections
2601 .trace
2602 .batches()
2603 .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2604 .cloned()
2605 .collect();
2606 Ok(batches)
2607 }
2608
2609 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2611 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2612 return Err(Since(self.collections.trace.since().clone()));
2613 }
2614 Ok(())
2615 }
2616
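// Finds the batch whose description contains `frontier` (lower <= frontier < upper). If no
// such batch exists yet, returns the current seqno as the point for the listener to retry from.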
2617 pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2618 self.collections
2621 .trace
2622 .batches()
2623 .find(|b| {
2624 PartialOrder::less_equal(b.desc.lower(), frontier)
2625 && PartialOrder::less_than(frontier, b.desc.upper())
2626 })
2627 .cloned()
2628 .ok_or(self.seqno)
2629 }
2630
2631 pub fn active_rollup(&self) -> Option<ActiveRollup> {
2632 self.collections.active_rollup
2633 }
2634
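// Decides whether this seqno should get a new rollup. With `use_active_rollup`, a rollup is
// requested once more than `threshold` seqnos have elapsed since the last one and there is
// either no rollup in flight or the in-flight one looks stalled (older than
// `fallback_threshold_ms`). In classic mode a rollup is requested every `threshold` seqnos,
// with a fallback once the backlog exceeds
// `threshold * DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER`. Tombstoned shards get one final
// rollup at the current seqno.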
2635 pub fn need_rollup(
2636 &self,
2637 threshold: usize,
2638 use_active_rollup: bool,
2639 fallback_threshold_ms: u64,
2640 now: u64,
2641 ) -> Option<SeqNo> {
2642 let (latest_rollup_seqno, _) = self.latest_rollup();
2643
2644 if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2650 return Some(self.seqno);
2651 }
2652
2653 let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2654
2655 if use_active_rollup {
2656 if seqnos_since_last_rollup > u64::cast_from(threshold) {
2662 match self.active_rollup() {
2663 Some(active_rollup) => {
2664 if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2665 return Some(self.seqno);
2666 }
2667 }
2668 None => {
2669 return Some(self.seqno);
2670 }
2671 }
2672 }
2673 } else {
2674 if seqnos_since_last_rollup > 0
2678 && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2679 {
2680 return Some(self.seqno);
2681 }
2682
2683 if seqnos_since_last_rollup
2686 > u64::cast_from(
2687 threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2688 )
2689 {
2690 return Some(self.seqno);
2691 }
2692 }
2693
2694 None
2695 }
2696
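// Iterates every blob this state references: all trace batches, then all rollups.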
2697 pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2698 let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2699 let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2700 batches.chain(rollups)
2701 }
2702}
2703
2704fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2705 let val = hex::encode(val);
2706 val.serialize(s)
2707}
2708
2709fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2710 val: &Option<LazyProto<T>>,
2711 s: S,
2712) -> Result<S::Ok, S::Error> {
2713 val.as_ref()
2714 .map(|lazy| hex::encode(&lazy.into_proto()))
2715 .serialize(s)
2716}
2717
2718fn serialize_part_stats<S: Serializer>(
2719 val: &Option<LazyPartStats>,
2720 s: S,
2721) -> Result<S::Ok, S::Error> {
2722 let val = val.as_ref().map(|x| x.decode().key);
2723 val.serialize(s)
2724}
2725
2726fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2727 let val = val.map(i64::decode);
2729 val.serialize(s)
2730}
2731
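// Hand-rolled serialization, apparently intended for state inspection/debugging output; the
// `state_inspect_serde_json` test below pins the resulting JSON against a golden file. The
// trace is flattened before its batches and merges are written out.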
2732impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2738 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2739 let State {
2740 shard_id,
2741 seqno,
2742 walltime_ms,
2743 hostname,
2744 collections:
2745 StateCollections {
2746 version: applier_version,
2747 last_gc_req,
2748 rollups,
2749 active_rollup,
2750 active_gc,
2751 leased_readers,
2752 critical_readers,
2753 writers,
2754 schemas,
2755 trace,
2756 },
2757 } = self;
2758 let mut s = s.serialize_struct("State", 19)?;
2759 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2760 let () = s.serialize_field("shard_id", shard_id)?;
2761 let () = s.serialize_field("seqno", seqno)?;
2762 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2763 let () = s.serialize_field("hostname", hostname)?;
2764 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2765 let () = s.serialize_field("rollups", rollups)?;
2766 let () = s.serialize_field("active_rollup", active_rollup)?;
2767 let () = s.serialize_field("active_gc", active_gc)?;
2768 let () = s.serialize_field("leased_readers", leased_readers)?;
2769 let () = s.serialize_field("critical_readers", critical_readers)?;
2770 let () = s.serialize_field("writers", writers)?;
2771 let () = s.serialize_field("schemas", schemas)?;
2772 let () = s.serialize_field("since", &trace.since().elements())?;
2773 let () = s.serialize_field("upper", &trace.upper().elements())?;
2774 let trace = trace.flatten();
2775 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2776 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2777 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2778 let () = s.serialize_field("merges", &trace.merges)?;
2779 s.end()
2780 }
2781}
2782
2783#[derive(Debug, Default)]
2784pub struct StateSizeMetrics {
2785 pub hollow_batch_count: usize,
2786 pub batch_part_count: usize,
2787 pub rewrite_part_count: usize,
2788 pub num_updates: usize,
2789 pub largest_batch_bytes: usize,
2790 pub state_batches_bytes: usize,
2791 pub state_rollups_bytes: usize,
2792 pub state_rollup_count: usize,
2793 pub inline_part_count: usize,
2794 pub inline_part_bytes: usize,
2795}
2796
2797#[derive(Default)]
2798pub struct ExpiryMetrics {
2799 pub(crate) readers_expired: usize,
2800 pub(crate) writers_expired: usize,
2801}
2802
2803#[derive(Debug, Clone, PartialEq)]
2805pub struct Since<T>(pub Antichain<T>);
2806
2807#[derive(Debug, PartialEq)]
2809pub struct Upper<T>(pub Antichain<T>);
2810
2811#[cfg(test)]
2812pub(crate) mod tests {
2813 use std::ops::Range;
2814 use std::str::FromStr;
2815
2816 use bytes::Bytes;
2817 use mz_build_info::DUMMY_BUILD_INFO;
2818 use mz_dyncfg::ConfigUpdates;
2819 use mz_ore::now::SYSTEM_TIME;
2820 use mz_ore::{assert_none, assert_ok};
2821 use mz_proto::RustType;
2822 use proptest::prelude::*;
2823 use proptest::strategy::ValueTree;
2824
2825 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2826 use crate::cache::PersistClientCache;
2827 use crate::internal::encoding::any_some_lazy_part_stats;
2828 use crate::internal::paths::RollupId;
2829 use crate::internal::trace::tests::any_trace;
2830 use crate::tests::new_test_client_cache;
2831 use crate::{Diagnostics, PersistLocation};
2832
2833 use super::*;
2834
2835 const LEASE_DURATION_MS: u64 = 900 * 1000;
2836 fn debug_state() -> HandleDebugState {
2837 HandleDebugState {
2838 hostname: "debug".to_owned(),
2839 purpose: "finding the bugs".to_owned(),
2840 }
2841 }
2842
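// Proptest strategy for a `HollowBatch` that contains exactly `num_runs` runs, splitting the
// generated parts as evenly as possible across them.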
2843 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2844 num_runs: usize,
2845 ) -> impl Strategy<Value = HollowBatch<T>> {
2846 (
2847 any::<T>(),
2848 any::<T>(),
2849 any::<T>(),
2850 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2851 any::<usize>(),
2852 )
2853 .prop_map(move |(t0, t1, since, parts, len)| {
2854 let (lower, upper) = if t0 <= t1 {
2855 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2856 } else {
2857 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2858 };
2859 let since = Antichain::from_elem(since);
2860
2861 let run_splits = (1..num_runs)
2862 .map(|i| i * parts.len() / num_runs)
2863 .collect::<Vec<_>>();
2864
2865 let run_meta = (0..num_runs)
2866 .map(|_| {
2867 let mut meta = RunMeta::default();
2868 meta.id = Some(RunId::new());
2869 meta
2870 })
2871 .collect::<Vec<_>>();
2872
2873 HollowBatch::new(
2874 Description::new(lower, upper, since),
2875 parts,
2876 len % 10,
2877 run_meta,
2878 run_splits,
2879 )
2880 })
2881 }
2882
2883 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2884 Strategy::prop_map(
2885 (
2886 any::<T>(),
2887 any::<T>(),
2888 any::<T>(),
2889 proptest::collection::vec(any_run_part::<T>(), 0..20),
2890 any::<usize>(),
2891 0..=10usize,
2892 proptest::collection::vec(any::<RunId>(), 10),
2893 ),
2894 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2895 let (lower, upper) = if t0 <= t1 {
2896 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2897 } else {
2898 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2899 };
2900 let since = Antichain::from_elem(since);
2901 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2902 let run_splits = (1..num_runs)
2903 .map(|i| i * parts.len() / num_runs)
2904 .collect::<Vec<_>>();
2905
2906 let run_meta = (0..num_runs)
2907 .enumerate()
2908 .map(|(i, _)| {
2909 let mut meta = RunMeta::default();
2910 meta.id = Some(run_ids[i]);
2911 meta
2912 })
2913 .collect::<Vec<_>>();
2914
2915 HollowBatch::new(
2916 Description::new(lower, upper, since),
2917 parts,
2918 len % 10,
2919 run_meta,
2920 run_splits,
2921 )
2922 } else {
2923 HollowBatch::new_run_for_test(
2924 Description::new(lower, upper, since),
2925 parts,
2926 len % 10,
2927 run_ids[0],
2928 )
2929 }
2930 },
2931 )
2932 }
2933
2934 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2935 Strategy::prop_map(
2936 (
2937 any::<bool>(),
2938 any_hollow_batch_part(),
2939 any::<Option<T>>(),
2940 any::<Option<SchemaId>>(),
2941 any::<Option<SchemaId>>(),
2942 ),
2943 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2944 if is_hollow {
2945 BatchPart::Hollow(hollow)
2946 } else {
2947 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2948 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2949 BatchPart::Inline {
2950 updates,
2951 ts_rewrite,
2952 schema_id,
2953 deprecated_schema_id,
2954 }
2955 }
2956 },
2957 )
2958 }
2959
2960 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2961 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2962 }
2963
2964 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2965 -> impl Strategy<Value = HollowBatchPart<T>> {
2966 Strategy::prop_map(
2967 (
2968 any::<PartialBatchKey>(),
2969 any::<usize>(),
2970 any::<Vec<u8>>(),
2971 any_some_lazy_part_stats(),
2972 any::<Option<T>>(),
2973 any::<[u8; 8]>(),
2974 any::<Option<BatchColumnarFormat>>(),
2975 any::<Option<SchemaId>>(),
2976 any::<Option<SchemaId>>(),
2977 ),
2978 |(
2979 key,
2980 encoded_size_bytes,
2981 key_lower,
2982 stats,
2983 ts_rewrite,
2984 diffs_sum,
2985 format,
2986 schema_id,
2987 deprecated_schema_id,
2988 )| {
2989 HollowBatchPart {
2990 key,
2991 meta: Default::default(),
2992 encoded_size_bytes,
2993 key_lower,
2994 structured_key_lower: None,
2995 stats,
2996 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2997 diffs_sum: Some(diffs_sum),
2998 format,
2999 schema_id,
3000 deprecated_schema_id,
3001 }
3002 },
3003 )
3004 }
3005
3006 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3007 Strategy::prop_map(
3008 (
3009 any::<SeqNo>(),
3010 any::<Option<T>>(),
3011 any::<u64>(),
3012 any::<u64>(),
3013 any::<HandleDebugState>(),
3014 ),
3015 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3016 if lease_duration_ms == 0 {
3020 lease_duration_ms += 1;
3021 }
3022 LeasedReaderState {
3023 seqno,
3024 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3025 last_heartbeat_timestamp_ms,
3026 lease_duration_ms,
3027 debug,
3028 }
3029 },
3030 )
3031 }
3032
3033 pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3034 where
3035 T: Arbitrary,
3036 {
3037 Strategy::prop_map(
3038 (
3039 any::<Option<T>>(),
3040 any::<OpaqueState>(),
3041 any::<String>(),
3042 any::<HandleDebugState>(),
3043 ),
3044 |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3045 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3046 opaque,
3047 opaque_codec,
3048 debug,
3049 },
3050 )
3051 }
3052
3053 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3054 Strategy::prop_map(
3055 (
3056 any::<u64>(),
3057 any::<u64>(),
3058 any::<IdempotencyToken>(),
3059 any::<Option<T>>(),
3060 any::<HandleDebugState>(),
3061 ),
3062 |(
3063 last_heartbeat_timestamp_ms,
3064 lease_duration_ms,
3065 most_recent_write_token,
3066 most_recent_write_upper,
3067 debug,
3068 )| WriterState {
3069 last_heartbeat_timestamp_ms,
3070 lease_duration_ms,
3071 most_recent_write_token,
3072 most_recent_write_upper: most_recent_write_upper
3073 .map_or_else(Antichain::new, Antichain::from_elem),
3074 debug,
3075 },
3076 )
3077 }
3078
3079 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3080 Strategy::prop_map(
3081 (
3082 any::<Vec<u8>>(),
3083 any::<Vec<u8>>(),
3084 any::<Vec<u8>>(),
3085 any::<Vec<u8>>(),
3086 ),
3087 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3088 key: Bytes::from(key),
3089 key_data_type: Bytes::from(key_data_type),
3090 val: Bytes::from(val),
3091 val_data_type: Bytes::from(val_data_type),
3092 },
3093 )
3094 }
3095
3096 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3097 num_trace_batches: Range<usize>,
3098 ) -> impl Strategy<Value = State<T>> {
3099 let part1 = (
3100 any::<ShardId>(),
3101 any::<SeqNo>(),
3102 any::<u64>(),
3103 any::<String>(),
3104 any::<SeqNo>(),
3105 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3106 proptest::option::of(any::<ActiveRollup>()),
3107 );
3108
3109 let part2 = (
3110 proptest::option::of(any::<ActiveGc>()),
3111 proptest::collection::btree_map(
3112 any::<LeasedReaderId>(),
3113 any_leased_reader_state::<T>(),
3114 1..3,
3115 ),
3116 proptest::collection::btree_map(
3117 any::<CriticalReaderId>(),
3118 any_critical_reader_state::<T>(),
3119 1..3,
3120 ),
3121 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3122 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3123 any_trace::<T>(num_trace_batches),
3124 );
3125
3126 (part1, part2).prop_map(
3127 |(
3128 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3129 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3130 )| State {
3131 shard_id,
3132 seqno,
3133 walltime_ms,
3134 hostname,
3135 collections: StateCollections {
3136 version: Version::new(1, 2, 3),
3137 last_gc_req,
3138 rollups,
3139 active_rollup,
3140 active_gc,
3141 leased_readers,
3142 critical_readers,
3143 writers,
3144 schemas,
3145 trace,
3146 },
3147 },
3148 )
3149 }
3150
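// Test helper: builds a single-run `HollowBatch` over `[lower, upper)` whose parts are hollow
// parts keyed by the given strings, with sizes, stats, and sums left empty.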
3151 pub(crate) fn hollow<T: Timestamp>(
3152 lower: T,
3153 upper: T,
3154 keys: &[&str],
3155 len: usize,
3156 ) -> HollowBatch<T> {
3157 HollowBatch::new_run(
3158 Description::new(
3159 Antichain::from_elem(lower),
3160 Antichain::from_elem(upper),
3161 Antichain::from_elem(T::minimum()),
3162 ),
3163 keys.iter()
3164 .map(|x| {
3165 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3166 key: PartialBatchKey((*x).to_owned()),
3167 meta: Default::default(),
3168 encoded_size_bytes: 0,
3169 key_lower: vec![],
3170 structured_key_lower: None,
3171 stats: None,
3172 ts_rewrite: None,
3173 diffs_sum: None,
3174 format: None,
3175 schema_id: None,
3176 deprecated_schema_id: None,
3177 }))
3178 })
3179 .collect(),
3180 len,
3181 )
3182 }
3183
3184 #[mz_ore::test]
3185 fn downgrade_since() {
3186 let mut state = TypedState::<(), (), u64, i64>::new(
3187 DUMMY_BUILD_INFO.semver_version(),
3188 ShardId::new(),
3189 "".to_owned(),
3190 0,
3191 );
3192 let reader = LeasedReaderId::new();
3193 let seqno = SeqNo::minimum();
3194 let now = SYSTEM_TIME.clone();
3195 let _ = state.collections.register_leased_reader(
3196 "",
3197 &reader,
3198 "",
3199 seqno,
3200 Duration::from_secs(10),
3201 now(),
3202 false,
3203 );
3204
3205 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3207
3208 assert_eq!(
3210 state.collections.downgrade_since(
3211 &reader,
3212 seqno,
3213 seqno,
3214 &Antichain::from_elem(2),
3215 now()
3216 ),
3217 Continue(Since(Antichain::from_elem(2)))
3218 );
3219 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3220 assert_eq!(
3222 state.collections.downgrade_since(
3223 &reader,
3224 seqno,
3225 seqno,
3226 &Antichain::from_elem(2),
3227 now()
3228 ),
3229 Continue(Since(Antichain::from_elem(2)))
3230 );
3231 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3232 assert_eq!(
3234 state.collections.downgrade_since(
3235 &reader,
3236 seqno,
3237 seqno,
3238 &Antichain::from_elem(1),
3239 now()
3240 ),
3241 Continue(Since(Antichain::from_elem(2)))
3242 );
3243 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3244
3245 let reader2 = LeasedReaderId::new();
3247 let _ = state.collections.register_leased_reader(
3248 "",
3249 &reader2,
3250 "",
3251 seqno,
3252 Duration::from_secs(10),
3253 now(),
3254 false,
3255 );
3256
3257 assert_eq!(
3259 state.collections.downgrade_since(
3260 &reader2,
3261 seqno,
3262 seqno,
3263 &Antichain::from_elem(3),
3264 now()
3265 ),
3266 Continue(Since(Antichain::from_elem(3)))
3267 );
3268 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3269 assert_eq!(
3271 state.collections.downgrade_since(
3272 &reader,
3273 seqno,
3274 seqno,
3275 &Antichain::from_elem(5),
3276 now()
3277 ),
3278 Continue(Since(Antichain::from_elem(5)))
3279 );
3280 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3281
3282 assert_eq!(
3284 state.collections.expire_leased_reader(&reader),
3285 Continue(true)
3286 );
3287 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3288
3289 let reader3 = LeasedReaderId::new();
3291 let _ = state.collections.register_leased_reader(
3292 "",
3293 &reader3,
3294 "",
3295 seqno,
3296 Duration::from_secs(10),
3297 now(),
3298 false,
3299 );
3300
3301 assert_eq!(
3303 state.collections.downgrade_since(
3304 &reader3,
3305 seqno,
3306 seqno,
3307 &Antichain::from_elem(10),
3308 now()
3309 ),
3310 Continue(Since(Antichain::from_elem(10)))
3311 );
3312 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3313
3314 assert_eq!(
3316 state.collections.expire_leased_reader(&reader2),
3317 Continue(true)
3318 );
3319 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3324
3325 assert_eq!(
3327 state.collections.expire_leased_reader(&reader3),
3328 Continue(true)
3329 );
3330 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3335 }
3336
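    // A minimal sketch (added example, not from the original test suite) of lease expiry:
    // a reader heartbeated at wall time 0 with a 10s lease should be reaped by `expire_at`
    // once the wall clock is well past the lease, assuming the registration records the
    // heartbeat and lease in milliseconds as the other tests here do.
    #[mz_ore::test]
    fn expire_at_reaps_lapsed_reader() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            // Last heartbeat at wall time 0.
            0,
            false,
        );
        // Well past the 10s lease, the reader is force-expired and counted.
        let metrics = state.expire_at(1_000_000);
        assert_eq!(metrics.readers_expired, 1);
        assert_eq!(metrics.writers_expired, 0);
        assert!(state.collections.leased_readers.is_empty());
    }
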
3337 #[mz_ore::test]
3338 fn compare_and_downgrade_since() {
3339 let mut state = TypedState::<(), (), u64, i64>::new(
3340 DUMMY_BUILD_INFO.semver_version(),
3341 ShardId::new(),
3342 "".to_owned(),
3343 0,
3344 );
3345 let reader = CriticalReaderId::new();
3346 let _ = state
3347 .collections
3348 .register_critical_reader::<u64>("", &reader, "");
3349
3350 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3352 assert_eq!(
3354 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3355 u64::initial()
3356 );
3357
3358 assert_eq!(
3360 state.collections.compare_and_downgrade_since::<u64>(
3361 &reader,
3362 &u64::initial(),
3363 (&1, &Antichain::from_elem(2)),
3364 ),
3365 Continue(Ok(Since(Antichain::from_elem(2))))
3366 );
3367 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3368 assert_eq!(
3369 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3370 1
3371 );
3372 assert_eq!(
3374 state.collections.compare_and_downgrade_since::<u64>(
3375 &reader,
3376 &1,
3377 (&2, &Antichain::from_elem(2)),
3378 ),
3379 Continue(Ok(Since(Antichain::from_elem(2))))
3380 );
3381 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3382 assert_eq!(
3383 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3384 2
3385 );
3386 assert_eq!(
3388 state.collections.compare_and_downgrade_since::<u64>(
3389 &reader,
3390 &2,
3391 (&3, &Antichain::from_elem(1)),
3392 ),
3393 Continue(Ok(Since(Antichain::from_elem(2))))
3394 );
3395 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3396 assert_eq!(
3397 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3398 3
3399 );
3400 }
3401
3402 #[mz_ore::test]
3403 fn compare_and_append() {
3404 let state = &mut TypedState::<String, String, u64, i64>::new(
3405 DUMMY_BUILD_INFO.semver_version(),
3406 ShardId::new(),
3407 "".to_owned(),
3408 0,
3409 )
3410 .collections;
3411
3412 let writer_id = WriterId::new();
3413 let now = SYSTEM_TIME.clone();
3414
3415 assert_eq!(state.trace.num_spine_batches(), 0);
3417 assert_eq!(state.trace.num_hollow_batches(), 0);
3418 assert_eq!(state.trace.num_updates(), 0);
3419
3420 assert_eq!(
3422 state.compare_and_append(
3423 &hollow(1, 2, &["key1"], 1),
3424 &writer_id,
3425 now(),
3426 LEASE_DURATION_MS,
3427 &IdempotencyToken::new(),
3428 &debug_state(),
3429 0,
3430 100,
3431 None
3432 ),
3433 Break(CompareAndAppendBreak::Upper {
3434 shard_upper: Antichain::from_elem(0),
3435 writer_upper: Antichain::from_elem(0)
3436 })
3437 );
3438
3439 assert!(
3441 state
3442 .compare_and_append(
3443 &hollow(0, 5, &[], 0),
3444 &writer_id,
3445 now(),
3446 LEASE_DURATION_MS,
3447 &IdempotencyToken::new(),
3448 &debug_state(),
3449 0,
3450 100,
3451 None
3452 )
3453 .is_continue()
3454 );
3455
3456 assert_eq!(
3458 state.compare_and_append(
3459 &hollow(5, 4, &["key1"], 1),
3460 &writer_id,
3461 now(),
3462 LEASE_DURATION_MS,
3463 &IdempotencyToken::new(),
3464 &debug_state(),
3465 0,
3466 100,
3467 None
3468 ),
3469 Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3470 lower: Antichain::from_elem(5),
3471 upper: Antichain::from_elem(4)
3472 }))
3473 );
3474
3475 assert_eq!(
3477 state.compare_and_append(
3478 &hollow(5, 5, &["key1"], 1),
3479 &writer_id,
3480 now(),
3481 LEASE_DURATION_MS,
3482 &IdempotencyToken::new(),
3483 &debug_state(),
3484 0,
3485 100,
3486 None
3487 ),
3488 Break(CompareAndAppendBreak::InvalidUsage(
3489 InvalidEmptyTimeInterval {
3490 lower: Antichain::from_elem(5),
3491 upper: Antichain::from_elem(5),
3492 keys: vec!["key1".to_owned()],
3493 }
3494 ))
3495 );
3496
3497 assert!(
3499 state
3500 .compare_and_append(
3501 &hollow(5, 5, &[], 0),
3502 &writer_id,
3503 now(),
3504 LEASE_DURATION_MS,
3505 &IdempotencyToken::new(),
3506 &debug_state(),
3507 0,
3508 100,
3509 None
3510 )
3511 .is_continue()
3512 );
3513 }
3514
3515 #[mz_ore::test]
3516 fn snapshot() {
3517 let now = SYSTEM_TIME.clone();
3518
3519 let mut state = TypedState::<String, String, u64, i64>::new(
3520 DUMMY_BUILD_INFO.semver_version(),
3521 ShardId::new(),
3522 "".to_owned(),
3523 0,
3524 );
3525 assert_eq!(
3527 state.snapshot(&Antichain::from_elem(0)),
3528 Err(SnapshotErr::AsOfNotYetAvailable(
3529 SeqNo(0),
3530 Upper(Antichain::from_elem(0))
3531 ))
3532 );
3533
3534 assert_eq!(
3536 state.snapshot(&Antichain::from_elem(5)),
3537 Err(SnapshotErr::AsOfNotYetAvailable(
3538 SeqNo(0),
3539 Upper(Antichain::from_elem(0))
3540 ))
3541 );
3542
3543 let writer_id = WriterId::new();
3544
3545 assert!(
3547 state
3548 .collections
3549 .compare_and_append(
3550 &hollow(0, 5, &["key1"], 1),
3551 &writer_id,
3552 now(),
3553 LEASE_DURATION_MS,
3554 &IdempotencyToken::new(),
3555 &debug_state(),
3556 0,
3557 100,
3558 None
3559 )
3560 .is_continue()
3561 );
3562
3563 assert_eq!(
3565 state.snapshot(&Antichain::from_elem(0)),
3566 Ok(vec![hollow(0, 5, &["key1"], 1)])
3567 );
3568
3569 assert_eq!(
3571 state.snapshot(&Antichain::from_elem(4)),
3572 Ok(vec![hollow(0, 5, &["key1"], 1)])
3573 );
3574
3575 assert_eq!(
3577 state.snapshot(&Antichain::from_elem(5)),
3578 Err(SnapshotErr::AsOfNotYetAvailable(
3579 SeqNo(0),
3580 Upper(Antichain::from_elem(5))
3581 ))
3582 );
3583 assert_eq!(
3584 state.snapshot(&Antichain::from_elem(6)),
3585 Err(SnapshotErr::AsOfNotYetAvailable(
3586 SeqNo(0),
3587 Upper(Antichain::from_elem(5))
3588 ))
3589 );
3590
3591 let reader = LeasedReaderId::new();
3592 let _ = state.collections.register_leased_reader(
3594 "",
3595 &reader,
3596 "",
3597 SeqNo::minimum(),
3598 Duration::from_secs(10),
3599 now(),
3600 false,
3601 );
3602 assert_eq!(
3603 state.collections.downgrade_since(
3604 &reader,
3605 SeqNo::minimum(),
3606 SeqNo::minimum(),
3607 &Antichain::from_elem(2),
3608 now()
3609 ),
3610 Continue(Since(Antichain::from_elem(2)))
3611 );
3612 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3613 assert_eq!(
3615 state.snapshot(&Antichain::from_elem(1)),
3616 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3617 Antichain::from_elem(2)
3618 )))
3619 );
3620
3621 assert!(
3623 state
3624 .collections
3625 .compare_and_append(
3626 &hollow(5, 10, &[], 0),
3627 &writer_id,
3628 now(),
3629 LEASE_DURATION_MS,
3630 &IdempotencyToken::new(),
3631 &debug_state(),
3632 0,
3633 100,
3634 None
3635 )
3636 .is_continue()
3637 );
3638
3639 assert_eq!(
3641 state.snapshot(&Antichain::from_elem(7)),
3642 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3643 );
3644
3645 assert_eq!(
3647 state.snapshot(&Antichain::from_elem(10)),
3648 Err(SnapshotErr::AsOfNotYetAvailable(
3649 SeqNo(0),
3650 Upper(Antichain::from_elem(10))
3651 ))
3652 );
3653
3654 assert!(
3656 state
3657 .collections
3658 .compare_and_append(
3659 &hollow(10, 15, &["key2"], 1),
3660 &writer_id,
3661 now(),
3662 LEASE_DURATION_MS,
3663 &IdempotencyToken::new(),
3664 &debug_state(),
3665 0,
3666 100,
3667 None
3668 )
3669 .is_continue()
3670 );
3671
3672 assert_eq!(
3675 state.snapshot(&Antichain::from_elem(9)),
3676 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3677 );
3678
3679 assert_eq!(
3681 state.snapshot(&Antichain::from_elem(10)),
3682 Ok(vec![
3683 hollow(0, 5, &["key1"], 1),
3684 hollow(5, 10, &[], 0),
3685 hollow(10, 15, &["key2"], 1)
3686 ])
3687 );
3688
3689 assert_eq!(
3690 state.snapshot(&Antichain::from_elem(11)),
3691 Ok(vec![
3692 hollow(0, 5, &["key1"], 1),
3693 hollow(5, 10, &[], 0),
3694 hollow(10, 15, &["key2"], 1)
3695 ])
3696 );
3697 }
3698
3699 #[mz_ore::test]
3700 fn next_listen_batch() {
3701 let mut state = TypedState::<String, String, u64, i64>::new(
3702 DUMMY_BUILD_INFO.semver_version(),
3703 ShardId::new(),
3704 "".to_owned(),
3705 0,
3706 );
3707
3708 assert_eq!(
3711 state.next_listen_batch(&Antichain::from_elem(0)),
3712 Err(SeqNo(0))
3713 );
3714 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3715
3716 let writer_id = WriterId::new();
3717 let now = SYSTEM_TIME.clone();
3718
3719 assert!(
3721 state
3722 .collections
3723 .compare_and_append(
3724 &hollow(0, 5, &["key1"], 1),
3725 &writer_id,
3726 now(),
3727 LEASE_DURATION_MS,
3728 &IdempotencyToken::new(),
3729 &debug_state(),
3730 0,
3731 100,
3732 None
3733 )
3734 .is_continue()
3735 );
3736 assert!(
3737 state
3738 .collections
3739 .compare_and_append(
3740 &hollow(5, 10, &["key2"], 1),
3741 &writer_id,
3742 now(),
3743 LEASE_DURATION_MS,
3744 &IdempotencyToken::new(),
3745 &debug_state(),
3746 0,
3747 100,
3748 None
3749 )
3750 .is_continue()
3751 );
3752
3753 for t in 0..=4 {
3755 assert_eq!(
3756 state.next_listen_batch(&Antichain::from_elem(t)),
3757 Ok(hollow(0, 5, &["key1"], 1))
3758 );
3759 }
3760
3761 for t in 5..=9 {
3763 assert_eq!(
3764 state.next_listen_batch(&Antichain::from_elem(t)),
3765 Ok(hollow(5, 10, &["key2"], 1))
3766 );
3767 }
3768
3769 assert_eq!(
3771 state.next_listen_batch(&Antichain::from_elem(10)),
3772 Err(SeqNo(0))
3773 );
3774
3775 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3778 }
3779
3780 #[mz_ore::test]
3781 fn expire_writer() {
3782 let mut state = TypedState::<String, String, u64, i64>::new(
3783 DUMMY_BUILD_INFO.semver_version(),
3784 ShardId::new(),
3785 "".to_owned(),
3786 0,
3787 );
3788 let now = SYSTEM_TIME.clone();
3789
3790 let writer_id_one = WriterId::new();
3791
3792 let writer_id_two = WriterId::new();
3793
3794 assert!(
3796 state
3797 .collections
3798 .compare_and_append(
3799 &hollow(0, 2, &["key1"], 1),
3800 &writer_id_one,
3801 now(),
3802 LEASE_DURATION_MS,
3803 &IdempotencyToken::new(),
3804 &debug_state(),
3805 0,
3806 100,
3807 None
3808 )
3809 .is_continue()
3810 );
3811
3812 assert!(
3813 state
3814 .collections
3815 .expire_writer(&writer_id_one)
3816 .is_continue()
3817 );
3818
3819 assert!(
3821 state
3822 .collections
3823 .compare_and_append(
3824 &hollow(2, 5, &["key2"], 1),
3825 &writer_id_two,
3826 now(),
3827 LEASE_DURATION_MS,
3828 &IdempotencyToken::new(),
3829 &debug_state(),
3830 0,
3831 100,
3832 None
3833 )
3834 .is_continue()
3835 );
3836 }
3837
3838 #[mz_ore::test]
3839 fn maybe_gc_active_gc() {
3840 const GC_CONFIG: GcConfig = GcConfig {
3841 use_active_gc: true,
3842 fallback_threshold_ms: 5000,
3843 min_versions: 99,
3844 max_versions: 500,
3845 };
3846 let now_fn = SYSTEM_TIME.clone();
3847
3848 let mut state = TypedState::<String, String, u64, i64>::new(
3849 DUMMY_BUILD_INFO.semver_version(),
3850 ShardId::new(),
3851 "".to_owned(),
3852 0,
3853 );
3854
3855 let now = now_fn();
3856 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3858 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3859
3860 state.seqno = SeqNo(100);
3863 assert_eq!(state.seqno_since(), SeqNo(100));
3864
3865 let writer_id = WriterId::new();
3867 let _ = state.collections.compare_and_append(
3868 &hollow(1, 2, &["key1"], 1),
3869 &writer_id,
3870 now,
3871 LEASE_DURATION_MS,
3872 &IdempotencyToken::new(),
3873 &debug_state(),
3874 0,
3875 100,
3876 None,
3877 );
3878 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3879
3880 assert_eq!(
3882 state.maybe_gc(true, now, GC_CONFIG),
3883 Some(GcReq {
3884 shard_id: state.shard_id,
3885 new_seqno_since: SeqNo(100)
3886 })
3887 );
3888
3889 state.collections.active_gc = Some(ActiveGc {
3891 seqno: state.seqno,
3892 start_ms: now,
3893 });
3894
3895 state.seqno = SeqNo(200);
3896 assert_eq!(state.seqno_since(), SeqNo(200));
3897
3898 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3899
3900 state.seqno = SeqNo(300);
3901 assert_eq!(state.seqno_since(), SeqNo(300));
3902 let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3904 assert_eq!(
3905 state.maybe_gc(true, new_now, GC_CONFIG),
3906 Some(GcReq {
3907 shard_id: state.shard_id,
3908 new_seqno_since: SeqNo(300)
3909 })
3910 );
3911
3912 state.seqno = SeqNo(301);
3916 assert_eq!(state.seqno_since(), SeqNo(301));
3917 assert_eq!(
3918 state.maybe_gc(true, new_now, GC_CONFIG),
3919 Some(GcReq {
3920 shard_id: state.shard_id,
3921 new_seqno_since: SeqNo(301)
3922 })
3923 );
3924
3925 state.collections.active_gc = None;
3926
3927 state.seqno = SeqNo(400);
3930 assert_eq!(state.seqno_since(), SeqNo(400));
3931
3932 let now = now_fn();
3933
3934 let _ = state.collections.expire_writer(&writer_id);
3936 assert_eq!(
3937 state.maybe_gc(false, now, GC_CONFIG),
3938 Some(GcReq {
3939 shard_id: state.shard_id,
3940 new_seqno_since: SeqNo(400)
3941 })
3942 );
3943
3944 let previous_seqno = state.seqno;
3946 state.seqno = SeqNo(10_000);
3947 assert_eq!(state.seqno_since(), SeqNo(10_000));
3948
3949 let now = now_fn();
3950 assert_eq!(
3951 state.maybe_gc(true, now, GC_CONFIG),
3952 Some(GcReq {
3953 shard_id: state.shard_id,
3954 new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3955 })
3956 );
3957 }
3958
3959 #[mz_ore::test]
3960 fn maybe_gc_classic() {
3961 const GC_CONFIG: GcConfig = GcConfig {
3962 use_active_gc: false,
3963 fallback_threshold_ms: 5000,
3964 min_versions: 16,
3965 max_versions: 128,
3966 };
3967 const NOW_MS: u64 = 0;
3968
3969 let mut state = TypedState::<String, String, u64, i64>::new(
3970 DUMMY_BUILD_INFO.semver_version(),
3971 ShardId::new(),
3972 "".to_owned(),
3973 0,
3974 );
3975
3976 assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3978 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
3979
3980 state.seqno = SeqNo(100);
3983 assert_eq!(state.seqno_since(), SeqNo(100));
3984
3985 let writer_id = WriterId::new();
3987 let now = SYSTEM_TIME.clone();
3988 let _ = state.collections.compare_and_append(
3989 &hollow(1, 2, &["key1"], 1),
3990 &writer_id,
3991 now(),
3992 LEASE_DURATION_MS,
3993 &IdempotencyToken::new(),
3994 &debug_state(),
3995 0,
3996 100,
3997 None,
3998 );
3999 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4000
4001 assert_eq!(
4003 state.maybe_gc(true, NOW_MS, GC_CONFIG),
4004 Some(GcReq {
4005 shard_id: state.shard_id,
4006 new_seqno_since: SeqNo(100)
4007 })
4008 );
4009
4010 state.seqno = SeqNo(200);
4013 assert_eq!(state.seqno_since(), SeqNo(200));
4014
4015 let _ = state.collections.expire_writer(&writer_id);
4017 assert_eq!(
4018 state.maybe_gc(false, NOW_MS, GC_CONFIG),
4019 Some(GcReq {
4020 shard_id: state.shard_id,
4021 new_seqno_since: SeqNo(200)
4022 })
4023 );
4024 }
4025
4026 #[mz_ore::test]
4027 fn need_rollup_active_rollup() {
4028 const ROLLUP_THRESHOLD: usize = 3;
4029 const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4030 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4031 let now = SYSTEM_TIME.clone();
4032
4033 mz_ore::test::init_logging();
4034 let mut state = TypedState::<String, String, u64, i64>::new(
4035 DUMMY_BUILD_INFO.semver_version(),
4036 ShardId::new(),
4037 "".to_owned(),
4038 0,
4039 );
4040
4041 let rollup_seqno = SeqNo(5);
4042 let rollup = HollowRollup {
4043 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4044 encoded_size_bytes: None,
4045 };
4046
4047 assert!(
4048 state
4049 .collections
4050 .add_rollup((rollup_seqno, &rollup))
4051 .is_continue()
4052 );
4053
4054 state.seqno = SeqNo(5);
4056 assert_none!(state.need_rollup(
4057 ROLLUP_THRESHOLD,
4058 ROLLUP_USE_ACTIVE_ROLLUP,
4059 ROLLUP_FALLBACK_THRESHOLD_MS,
4060 now()
4061 ));
4062
4063 state.seqno = SeqNo(6);
4065 assert_none!(state.need_rollup(
4066 ROLLUP_THRESHOLD,
4067 ROLLUP_USE_ACTIVE_ROLLUP,
4068 ROLLUP_FALLBACK_THRESHOLD_MS,
4069 now()
4070 ));
4071 state.seqno = SeqNo(7);
4072 assert_none!(state.need_rollup(
4073 ROLLUP_THRESHOLD,
4074 ROLLUP_USE_ACTIVE_ROLLUP,
4075 ROLLUP_FALLBACK_THRESHOLD_MS,
4076 now()
4077 ));
4078 state.seqno = SeqNo(8);
4079 assert_none!(state.need_rollup(
4080 ROLLUP_THRESHOLD,
4081 ROLLUP_USE_ACTIVE_ROLLUP,
4082 ROLLUP_FALLBACK_THRESHOLD_MS,
4083 now()
4084 ));
4085
4086 let mut current_time = now();
4087 state.seqno = SeqNo(9);
4089 assert_eq!(
4090 state
4091 .need_rollup(
4092 ROLLUP_THRESHOLD,
4093 ROLLUP_USE_ACTIVE_ROLLUP,
4094 ROLLUP_FALLBACK_THRESHOLD_MS,
4095 current_time
4096 )
4097 .expect("rollup"),
4098 SeqNo(9)
4099 );
4100
4101 state.collections.active_rollup = Some(ActiveRollup {
4102 seqno: SeqNo(9),
4103 start_ms: current_time,
4104 });
4105
4106 assert_none!(state.need_rollup(
4108 ROLLUP_THRESHOLD,
4109 ROLLUP_USE_ACTIVE_ROLLUP,
4110 ROLLUP_FALLBACK_THRESHOLD_MS,
4111 current_time
4112 ));
4113
4114 state.seqno = SeqNo(10);
4115 assert_none!(state.need_rollup(
4118 ROLLUP_THRESHOLD,
4119 ROLLUP_USE_ACTIVE_ROLLUP,
4120 ROLLUP_FALLBACK_THRESHOLD_MS,
4121 current_time
4122 ));
4123
4124 current_time += ROLLUP_FALLBACK_THRESHOLD_MS + 1;
4126 assert_eq!(
4127 state
4128 .need_rollup(
4129 ROLLUP_THRESHOLD,
4130 ROLLUP_USE_ACTIVE_ROLLUP,
4131 ROLLUP_FALLBACK_THRESHOLD_MS,
4132 current_time
4133 )
4134 .expect("rollup"),
4135 SeqNo(10)
4136 );
4137
4138 state.seqno = SeqNo(9);
4139 state.collections.active_rollup = None;
4141 let rollup_seqno = SeqNo(9);
4142 let rollup = HollowRollup {
4143 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4144 encoded_size_bytes: None,
4145 };
4146 assert!(
4147 state
4148 .collections
4149 .add_rollup((rollup_seqno, &rollup))
4150 .is_continue()
4151 );
4152
4153 state.seqno = SeqNo(11);
4154 assert_none!(state.need_rollup(
4156 ROLLUP_THRESHOLD,
4157 ROLLUP_USE_ACTIVE_ROLLUP,
4158 ROLLUP_FALLBACK_THRESHOLD_MS,
4159 current_time
4160 ));
4161 state.seqno = SeqNo(13);
4163 assert_eq!(
4164 state
4165 .need_rollup(
4166 ROLLUP_THRESHOLD,
4167 ROLLUP_USE_ACTIVE_ROLLUP,
4168 ROLLUP_FALLBACK_THRESHOLD_MS,
4169 current_time
4170 )
4171 .expect("rollup"),
4172 SeqNo(13)
4173 );
4174 }
4175
4176 #[mz_ore::test]
4177 fn need_rollup_classic() {
4178 const ROLLUP_THRESHOLD: usize = 3;
4179 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4180 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4181 const NOW: u64 = 0;
4182
4183 mz_ore::test::init_logging();
4184 let mut state = TypedState::<String, String, u64, i64>::new(
4185 DUMMY_BUILD_INFO.semver_version(),
4186 ShardId::new(),
4187 "".to_owned(),
4188 0,
4189 );
4190
4191 let rollup_seqno = SeqNo(5);
4192 let rollup = HollowRollup {
4193 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4194 encoded_size_bytes: None,
4195 };
4196
4197 assert!(
4198 state
4199 .collections
4200 .add_rollup((rollup_seqno, &rollup))
4201 .is_continue()
4202 );
4203
4204 state.seqno = SeqNo(5);
4206 assert_none!(state.need_rollup(
4207 ROLLUP_THRESHOLD,
4208 ROLLUP_USE_ACTIVE_ROLLUP,
4209 ROLLUP_FALLBACK_THRESHOLD_MS,
4210 NOW
4211 ));
4212
4213 state.seqno = SeqNo(6);
4215 assert_none!(state.need_rollup(
4216 ROLLUP_THRESHOLD,
4217 ROLLUP_USE_ACTIVE_ROLLUP,
4218 ROLLUP_FALLBACK_THRESHOLD_MS,
4219 NOW
4220 ));
4221 state.seqno = SeqNo(7);
4222 assert_none!(state.need_rollup(
4223 ROLLUP_THRESHOLD,
4224 ROLLUP_USE_ACTIVE_ROLLUP,
4225 ROLLUP_FALLBACK_THRESHOLD_MS,
4226 NOW
4227 ));
4228
4229 state.seqno = SeqNo(8);
4231 assert_eq!(
4232 state
4233 .need_rollup(
4234 ROLLUP_THRESHOLD,
4235 ROLLUP_USE_ACTIVE_ROLLUP,
4236 ROLLUP_FALLBACK_THRESHOLD_MS,
4237 NOW
4238 )
4239 .expect("rollup"),
4240 SeqNo(8)
4241 );
4242
4243 state.seqno = SeqNo(9);
4245 assert_none!(state.need_rollup(
4246 ROLLUP_THRESHOLD,
4247 ROLLUP_USE_ACTIVE_ROLLUP,
4248 ROLLUP_FALLBACK_THRESHOLD_MS,
4249 NOW
4250 ));
4251
4252 state.seqno = SeqNo(11);
4254 assert_eq!(
4255 state
4256 .need_rollup(
4257 ROLLUP_THRESHOLD,
4258 ROLLUP_USE_ACTIVE_ROLLUP,
4259 ROLLUP_FALLBACK_THRESHOLD_MS,
4260 NOW
4261 )
4262 .expect("rollup"),
4263 SeqNo(11)
4264 );
4265
4266 let rollup_seqno = SeqNo(6);
4268 let rollup = HollowRollup {
4269 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4270 encoded_size_bytes: None,
4271 };
4272 assert!(
4273 state
4274 .collections
4275 .add_rollup((rollup_seqno, &rollup))
4276 .is_continue()
4277 );
4278
4279 state.seqno = SeqNo(8);
4280 assert_none!(state.need_rollup(
4281 ROLLUP_THRESHOLD,
4282 ROLLUP_USE_ACTIVE_ROLLUP,
4283 ROLLUP_FALLBACK_THRESHOLD_MS,
4284 NOW
4285 ));
4286 state.seqno = SeqNo(9);
4287 assert_eq!(
4288 state
4289 .need_rollup(
4290 ROLLUP_THRESHOLD,
4291 ROLLUP_USE_ACTIVE_ROLLUP,
4292 ROLLUP_FALLBACK_THRESHOLD_MS,
4293 NOW
4294 )
4295 .expect("rollup"),
4296 SeqNo(9)
4297 );
4298
4299 let fallback_seqno = SeqNo(
4301 rollup_seqno.0
4302 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4303 );
4304 state.seqno = fallback_seqno;
4305 assert_eq!(
4306 state
4307 .need_rollup(
4308 ROLLUP_THRESHOLD,
4309 ROLLUP_USE_ACTIVE_ROLLUP,
4310 ROLLUP_FALLBACK_THRESHOLD_MS,
4311 NOW
4312 )
4313 .expect("rollup"),
4314 fallback_seqno
4315 );
4316 state.seqno = fallback_seqno.next();
4317 assert_eq!(
4318 state
4319 .need_rollup(
4320 ROLLUP_THRESHOLD,
4321 ROLLUP_USE_ACTIVE_ROLLUP,
4322 ROLLUP_FALLBACK_THRESHOLD_MS,
4323 NOW
4324 )
4325 .expect("rollup"),
4326 fallback_seqno.next()
4327 );
4328 }
4329
4330 #[mz_ore::test]
4331 fn idempotency_token_sentinel() {
4332 assert_eq!(
4333 IdempotencyToken::SENTINEL.to_string(),
4334 "i11111111-1111-1111-1111-111111111111"
4335 );
4336 }
4337
4338 #[mz_ore::test]
4347 #[cfg_attr(miri, ignore)] fn state_inspect_serde_json() {
4349 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4350 let mut runner = proptest::test_runner::TestRunner::deterministic();
4351 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4352 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4353 assert_eq!(
4354 json.trim(),
4355 STATE_SERDE_JSON.trim(),
4356 "\n\nNEW GOLDEN\n{}\n",
4357 json
4358 );
4359 }
4360
4361 #[mz_persist_proc::test(tokio::test)]
4362 #[cfg_attr(miri, ignore)] async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4364 let mut clients = new_test_client_cache(&dyncfgs);
4365 let shard_id = ShardId::new();
4366
4367 async fn open_and_write(
4368 clients: &mut PersistClientCache,
4369 version: semver::Version,
4370 shard_id: ShardId,
4371 ) -> Result<(), tokio::task::JoinError> {
4372 clients.cfg.build_version = version.clone();
4373 clients.clear_state_cache();
4374 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4375 mz_ore::task::spawn(|| version.to_string(), async move {
4377 let () = client
4378 .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4379 .await
4380 .expect("valid usage");
4381 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4382 let current = *write.upper().as_option().unwrap();
4383 write
4385 .expect_compare_and_append_batch(&mut [], current, current + 1)
4386 .await;
4387 })
4388 .into_tokio_handle()
4389 .await
4390 }
4391
4392 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4394 assert_ok!(res);
4395
4396 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4398 assert_ok!(res);
4399
4400 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4402 assert!(res.unwrap_err().is_panic());
4403
4404 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4406 assert!(res.unwrap_err().is_panic());
4407 }
4408
4409 #[mz_ore::test]
4410 fn runid_roundtrip() {
4411 proptest!(|(runid: RunId)| {
4412 let runid_str = runid.to_string();
4413 let parsed = RunId::from_str(&runid_str);
4414 prop_assert_eq!(parsed, Ok(runid));
4415 });
4416 }
4417}