use anyhow::ensure;
use async_stream::{stream, try_stream};
use differential_dataflow::difference::Monoid;
use mz_persist::metrics::ColumnarMetrics;
use proptest::prelude::{Arbitrary, Strategy};
use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
use std::ops::ControlFlow::{self, Break, Continue};
use std::ops::{Deref, DerefMut};
use std::time::Duration;

use arrow::array::{Array, ArrayData, make_array};
use arrow::datatypes::DataType;
use bytes::Bytes;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::BatchContainer;
use futures::Stream;
use futures_util::StreamExt;
use itertools::Itertools;
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::PartialOrdVecExt;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
use mz_persist_types::columnar::{ColumnEncoder, Schema};
use mz_persist_types::schema::{SchemaId, backward_compatible};
use mz_persist_types::{Codec, Codec64, Opaque};
use mz_proto::ProtoType;
use mz_proto::RustType;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Serialize, Serializer};
use timely::PartialOrder;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::info;
use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{
    LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
use crate::internal::trace::{
    ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
};
use crate::metrics::Metrics;
use crate::read::LeasedReaderId;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));
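// Dyncfg knobs that tune the cadence of state maintenance: how often rollups
// are written, when garbage collection kicks in, and whether the newer
// "active rollup"/"active GC" claim tracking and incremental compaction are
// enabled.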
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    false,
    "Whether to use the new active rollup tracking mechanism.",
);

pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
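/// An opaque token identifying one call to `compare_and_append`.
///
/// If a retry presents the same token that a writer most recently wrote with,
/// the write is reported as already committed rather than applied twice. The
/// token displays as `i<uuid>` and round-trips via `FromStr`.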
149#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
152#[serde(into = "String")]
153pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(f, "i{}", Uuid::from_bytes(self.0))
158 }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164 }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168 type Err = String;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172 }
173}
174
175impl From<IdempotencyToken> for String {
176 fn from(x: IdempotencyToken) -> Self {
177 x.to_string()
178 }
179}
180
181impl IdempotencyToken {
182 pub(crate) fn new() -> Self {
183 IdempotencyToken(*Uuid::new_v4().as_bytes())
184 }
185 pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
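/// Durable state for a single leased reader: the seqno and since capabilities
/// it holds, when it last heartbeat, and how long its lease lasts.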
188#[derive(Clone, Debug, PartialEq, Serialize)]
189pub struct LeasedReaderState<T> {
190 pub seqno: SeqNo,
192 pub since: Antichain<T>,
194 pub last_heartbeat_timestamp_ms: u64,
196 pub lease_duration_ms: u64,
199 pub debug: HandleDebugState,
201}
202
203#[derive(Arbitrary, Clone, Debug, PartialEq, Serialize)]
204#[serde(into = "u64")]
205pub struct OpaqueState(pub [u8; 8]);
206
207impl From<OpaqueState> for u64 {
208 fn from(value: OpaqueState) -> Self {
209 u64::from_le_bytes(value.0)
210 }
211}
212
213#[derive(Clone, Debug, PartialEq, Serialize)]
214pub struct CriticalReaderState<T> {
215 pub since: Antichain<T>,
217 pub opaque: OpaqueState,
219 pub opaque_codec: String,
221 pub debug: HandleDebugState,
223}
224
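/// Durable state for a single writer: its lease information plus the
/// idempotency token and upper of its most recent write, which is what lets
/// `compare_and_append` detect duplicate attempts.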
225#[derive(Clone, Debug, PartialEq, Serialize)]
226pub struct WriterState<T> {
227 pub last_heartbeat_timestamp_ms: u64,
229 pub lease_duration_ms: u64,
232 pub most_recent_write_token: IdempotencyToken,
235 pub most_recent_write_upper: Antichain<T>,
238 pub debug: HandleDebugState,
240}
241
242#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
244pub struct HandleDebugState {
245 pub hostname: String,
248 pub purpose: String,
250}
251
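/// One part of a batch: either a `Hollow` pointer to data written to blob
/// storage, or an `Inline` part whose (small) updates are stored directly in
/// state, optionally with a timestamp rewrite and schema id.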
252#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
256#[serde(tag = "type")]
257pub enum BatchPart<T> {
258 Hollow(HollowBatchPart<T>),
259 Inline {
260 updates: LazyInlineBatchPart,
261 ts_rewrite: Option<Antichain<T>>,
262 schema_id: Option<SchemaId>,
263
264 deprecated_schema_id: Option<SchemaId>,
266 },
267}
268
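/// Decodes a lazily-stored structured key lower bound, returning `None` (and
/// soft-panicking or logging) if the proto fails to decode or does not contain
/// exactly one row.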
269fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
270 let try_decode = |lower: &LazyProto<ProtoArrayData>| {
271 let proto = lower.decode()?;
272 let data = ArrayData::from_proto(proto)?;
273 ensure!(data.len() == 1);
274 Ok(ArrayBound::new(make_array(data), 0))
275 };
276
277 let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
278
279 match decoded {
280 Ok(bound) => Some(bound),
281 Err(e) => {
282 soft_panic_or_log!("failed to decode bound: {e:#?}");
283 None
284 }
285 }
286}
287
288impl<T> BatchPart<T> {
289 pub fn hollow_bytes(&self) -> usize {
290 match self {
291 BatchPart::Hollow(x) => x.encoded_size_bytes,
292 BatchPart::Inline { .. } => 0,
293 }
294 }
295
296 pub fn is_inline(&self) -> bool {
297 matches!(self, BatchPart::Inline { .. })
298 }
299
300 pub fn inline_bytes(&self) -> usize {
301 match self {
302 BatchPart::Hollow(_) => 0,
303 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
304 }
305 }
306
307 pub fn writer_key(&self) -> Option<WriterKey> {
308 match self {
309 BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
310 BatchPart::Inline { .. } => None,
311 }
312 }
313
314 pub fn encoded_size_bytes(&self) -> usize {
315 match self {
316 BatchPart::Hollow(x) => x.encoded_size_bytes,
317 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
318 }
319 }
320
321 pub fn printable_name(&self) -> &str {
324 match self {
325 BatchPart::Hollow(x) => x.key.0.as_str(),
326 BatchPart::Inline { .. } => "<inline>",
327 }
328 }
329
330 pub fn stats(&self) -> Option<&LazyPartStats> {
331 match self {
332 BatchPart::Hollow(x) => x.stats.as_ref(),
333 BatchPart::Inline { .. } => None,
334 }
335 }
336
337 pub fn key_lower(&self) -> &[u8] {
338 match self {
339 BatchPart::Hollow(x) => x.key_lower.as_slice(),
340 BatchPart::Inline { .. } => &[],
347 }
348 }
349
350 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
351 let part = match self {
352 BatchPart::Hollow(part) => part,
353 BatchPart::Inline { .. } => return None,
354 };
355
356 decode_structured_lower(part.structured_key_lower.as_ref()?)
357 }
358
359 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
360 match self {
361 BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
362 BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
363 }
364 }
365
366 pub fn schema_id(&self) -> Option<SchemaId> {
367 match self {
368 BatchPart::Hollow(x) => x.schema_id,
369 BatchPart::Inline { schema_id, .. } => *schema_id,
370 }
371 }
372
373 pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
374 match self {
375 BatchPart::Hollow(x) => x.deprecated_schema_id,
376 BatchPart::Inline {
377 deprecated_schema_id,
378 ..
379 } => *deprecated_schema_id,
380 }
381 }
382}
383
384impl<T: Timestamp + Codec64> BatchPart<T> {
385 pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
386 match self {
387 BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
388 BatchPart::Inline { updates, .. } => {
389 let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
390 matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
391 }
392 }
393 }
394
395 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
396 match self {
397 BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
398 BatchPart::Inline { updates, .. } => Some(
399 updates
400 .decode::<T>(metrics)
401 .expect("valid inline part")
402 .updates
403 .diffs_sum(),
404 ),
405 }
406 }
407}
408
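/// A sequence of parts from a single run that has been spilled to blob storage
/// as its own object and is referenced from state via a [HollowRunRef].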
409#[derive(Debug, Clone)]
411pub struct HollowRun<T> {
412 pub(crate) parts: Vec<RunPart<T>>,
414}
415
416#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
419pub struct HollowRunRef<T> {
420 pub key: PartialBatchKey,
421
422 pub hollow_bytes: usize,
424
425 pub max_part_bytes: usize,
427
428 pub key_lower: Vec<u8>,
430
431 pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
433
434 pub diffs_sum: Option<[u8; 8]>,
435
436 pub(crate) _phantom_data: PhantomData<T>,
437}
438impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
439 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
440 Some(self.cmp(other))
441 }
442}
443
444impl<T: Eq> Ord for HollowRunRef<T> {
445 fn cmp(&self, other: &Self) -> Ordering {
446 self.key.cmp(&other.key)
447 }
448}
449
450impl<T> HollowRunRef<T> {
451 pub fn writer_key(&self) -> Option<WriterKey> {
452 Some(self.key.split()?.0)
453 }
454}
455
456impl<T: Timestamp + Codec64> HollowRunRef<T> {
457 pub async fn set<D: Codec64 + Monoid>(
459 shard_id: ShardId,
460 blob: &dyn Blob,
461 writer: &WriterKey,
462 data: HollowRun<T>,
463 metrics: &Metrics,
464 ) -> Self {
465 let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
466 let max_part_bytes = data
467 .parts
468 .iter()
469 .map(|p| p.max_part_bytes())
470 .max()
471 .unwrap_or(0);
472 let key_lower = data
473 .parts
474 .first()
475 .map_or(vec![], |p| p.key_lower().to_vec());
476 let structured_key_lower = match data.parts.first() {
477 Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
478 Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
479 Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
480 };
481 let diffs_sum = data
482 .parts
483 .iter()
484 .map(|p| {
485 p.diffs_sum::<D>(&metrics.columnar)
486 .expect("valid diffs sum")
487 })
488 .reduce(|mut a, b| {
489 a.plus_equals(&b);
490 a
491 })
492 .expect("valid diffs sum")
493 .encode();
494
495 let key = PartialBatchKey::new(writer, &PartId::new());
496 let blob_key = key.complete(&shard_id);
497 let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
498 let () = retry_external(&metrics.retries.external.hollow_run_set, || {
499 blob.set(&blob_key, bytes.clone())
500 })
501 .await;
502 Self {
503 key,
504 hollow_bytes,
505 max_part_bytes,
506 key_lower,
507 structured_key_lower,
508 diffs_sum: Some(diffs_sum),
509 _phantom_data: Default::default(),
510 }
511 }
512
513 pub async fn get(
517 &self,
518 shard_id: ShardId,
519 blob: &dyn Blob,
520 metrics: &Metrics,
521 ) -> Option<HollowRun<T>> {
522 let blob_key = self.key.complete(&shard_id);
523 let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
524 blob.get(&blob_key)
525 })
526 .await?;
527 let proto_runs: ProtoHollowRun =
528 prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
529 let runs = proto_runs
530 .into_rust()
531 .expect("illegal state: invalid encoded runs proto");
532 Some(runs)
533 }
534}
535
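/// A piece of a run: either a single [BatchPart] stored directly in state, or
/// a [HollowRunRef] pointing at a separately stored list of parts, which keeps
/// state small even for very large batches.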
536#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
540#[serde(untagged)]
541pub enum RunPart<T> {
542 Single(BatchPart<T>),
543 Many(HollowRunRef<T>),
544}
545
546impl<T: Ord> PartialOrd<Self> for RunPart<T> {
547 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
548 Some(self.cmp(other))
549 }
550}
551
552impl<T: Ord> Ord for RunPart<T> {
553 fn cmp(&self, other: &Self) -> Ordering {
554 match (self, other) {
555 (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
556 (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
557 (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
558 (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
559 }
560 }
561}
562
563impl<T> RunPart<T> {
564 #[cfg(test)]
565 pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
566 match self {
567 RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
568 _ => panic!("expected hollow part!"),
569 }
570 }
571
572 pub fn hollow_bytes(&self) -> usize {
573 match self {
574 Self::Single(p) => p.hollow_bytes(),
575 Self::Many(r) => r.hollow_bytes,
576 }
577 }
578
579 pub fn is_inline(&self) -> bool {
580 match self {
581 Self::Single(p) => p.is_inline(),
582 Self::Many(_) => false,
583 }
584 }
585
586 pub fn inline_bytes(&self) -> usize {
587 match self {
588 Self::Single(p) => p.inline_bytes(),
589 Self::Many(_) => 0,
590 }
591 }
592
593 pub fn max_part_bytes(&self) -> usize {
594 match self {
595 Self::Single(p) => p.encoded_size_bytes(),
596 Self::Many(r) => r.max_part_bytes,
597 }
598 }
599
600 pub fn writer_key(&self) -> Option<WriterKey> {
601 match self {
602 Self::Single(p) => p.writer_key(),
603 Self::Many(r) => r.writer_key(),
604 }
605 }
606
607 pub fn encoded_size_bytes(&self) -> usize {
608 match self {
609 Self::Single(p) => p.encoded_size_bytes(),
610 Self::Many(r) => r.hollow_bytes,
611 }
612 }
613
614 pub fn schema_id(&self) -> Option<SchemaId> {
615 match self {
616 Self::Single(p) => p.schema_id(),
617 Self::Many(_) => None,
618 }
619 }
620
621 pub fn printable_name(&self) -> &str {
624 match self {
625 Self::Single(p) => p.printable_name(),
626 Self::Many(r) => r.key.0.as_str(),
627 }
628 }
629
630 pub fn stats(&self) -> Option<&LazyPartStats> {
631 match self {
632 Self::Single(p) => p.stats(),
633 Self::Many(_) => None,
635 }
636 }
637
638 pub fn key_lower(&self) -> &[u8] {
639 match self {
640 Self::Single(p) => p.key_lower(),
641 Self::Many(r) => r.key_lower.as_slice(),
642 }
643 }
644
645 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
646 match self {
647 Self::Single(p) => p.structured_key_lower(),
648 Self::Many(_) => None,
649 }
650 }
651
652 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
653 match self {
654 Self::Single(p) => p.ts_rewrite(),
655 Self::Many(_) => None,
656 }
657 }
658}
659
660impl<T> RunPart<T>
661where
662 T: Timestamp + Codec64,
663{
664 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
665 match self {
666 Self::Single(p) => p.diffs_sum(metrics),
667 Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
668 }
669 }
670}
671
672#[derive(Clone, Debug)]
674pub struct MissingBlob(BlobKey);
675
676impl std::fmt::Display for MissingBlob {
677 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
678 write!(f, "unexpectedly missing key: {}", self.0)
679 }
680}
681
682impl std::error::Error for MissingBlob {}
683
684impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
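    /// Streams every [BatchPart] reachable from this part, fetching and
    /// recursively expanding any [HollowRunRef]s from blob; a referenced run
    /// object that has gone missing surfaces as a [MissingBlob] error.
    ///
    /// A minimal usage sketch (types elided; not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut parts = run_part.part_stream(shard_id, blob, metrics).boxed();
    /// while let Some(part) = parts.next().await {
    ///     // Each item is a Result<Cow<'_, BatchPart<T>>, MissingBlob>.
    ///     let part = part?;
    ///     // ...inspect or fetch the part...
    /// }
    /// ```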
685 pub fn part_stream<'a>(
686 &'a self,
687 shard_id: ShardId,
688 blob: &'a dyn Blob,
689 metrics: &'a Metrics,
690 ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
691 try_stream! {
692 match self {
693 RunPart::Single(p) => {
694 yield Cow::Borrowed(p);
695 }
696 RunPart::Many(r) => {
697 let fetched = r.get(shard_id, blob, metrics).await.ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
698 for run_part in fetched.parts {
699 for await batch_part in run_part.part_stream(shard_id, blob, metrics).boxed() {
700 yield Cow::Owned(batch_part?.into_owned());
701 }
702 }
703 }
704 }
705 }
706 }
707}
708
709impl<T: Ord> PartialOrd for BatchPart<T> {
710 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
711 Some(self.cmp(other))
712 }
713}
714
715impl<T: Ord> Ord for BatchPart<T> {
716 fn cmp(&self, other: &Self) -> Ordering {
717 match (self, other) {
718 (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
719 (
720 BatchPart::Inline {
721 updates: s_updates,
722 ts_rewrite: s_ts_rewrite,
723 schema_id: s_schema_id,
724 deprecated_schema_id: s_deprecated_schema_id,
725 },
726 BatchPart::Inline {
727 updates: o_updates,
728 ts_rewrite: o_ts_rewrite,
729 schema_id: o_schema_id,
730 deprecated_schema_id: o_deprecated_schema_id,
731 },
732 ) => (
733 s_updates,
734 s_ts_rewrite.as_ref().map(|x| x.elements()),
735 s_schema_id,
736 s_deprecated_schema_id,
737 )
738 .cmp(&(
739 o_updates,
740 o_ts_rewrite.as_ref().map(|x| x.elements()),
741 o_schema_id,
742 o_deprecated_schema_id,
743 )),
744 (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
745 (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
746 }
747 }
748}
749
750#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
752pub(crate) enum RunOrder {
753 Unordered,
755 Codec,
757 Structured,
759}
760
761#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
762pub struct RunId(pub(crate) [u8; 16]);
763
764impl std::fmt::Display for RunId {
765 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
766 write!(f, "ri{}", Uuid::from_bytes(self.0))
767 }
768}
769
770impl std::fmt::Debug for RunId {
771 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
772 write!(f, "RunId({})", Uuid::from_bytes(self.0))
773 }
774}
775
776impl std::str::FromStr for RunId {
777 type Err = String;
778
779 fn from_str(s: &str) -> Result<Self, Self::Err> {
780 parse_id("ri", "RunId", s).map(RunId)
781 }
782}
783
784impl From<RunId> for String {
785 fn from(x: RunId) -> Self {
786 x.to_string()
787 }
788}
789
790impl RunId {
791 pub(crate) fn new() -> Self {
792 RunId(*Uuid::new_v4().as_bytes())
793 }
794}
795
796impl Arbitrary for RunId {
797 type Parameters = ();
798 type Strategy = proptest::strategy::BoxedStrategy<Self>;
799 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
800 Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
801 RunId(*Uuid::from_u128(n).as_bytes())
802 })
803 .boxed()
804 }
805}
806
807#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
809pub struct RunMeta {
810 pub(crate) order: Option<RunOrder>,
812 pub(crate) schema: Option<SchemaId>,
814
815 pub(crate) deprecated_schema: Option<SchemaId>,
817
818 pub(crate) id: Option<RunId>,
820
821 pub(crate) len: Option<usize>,
823
824 #[serde(skip_serializing_if = "MetadataMap::is_empty")]
826 pub(crate) meta: MetadataMap,
827}
828
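/// A reference to a batch part stored in blob storage: its key, encoded size,
/// key bounds, optional pushdown stats, and encoding/schema metadata.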
829#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
831pub struct HollowBatchPart<T> {
832 pub key: PartialBatchKey,
834 #[serde(skip_serializing_if = "MetadataMap::is_empty")]
836 pub meta: MetadataMap,
837 pub encoded_size_bytes: usize,
839 #[serde(serialize_with = "serialize_part_bytes")]
842 pub key_lower: Vec<u8>,
843 #[serde(serialize_with = "serialize_lazy_proto")]
845 pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
846 #[serde(serialize_with = "serialize_part_stats")]
848 pub stats: Option<LazyPartStats>,
849 pub ts_rewrite: Option<Antichain<T>>,
857 #[serde(serialize_with = "serialize_diffs_sum")]
865 pub diffs_sum: Option<[u8; 8]>,
866 pub format: Option<BatchColumnarFormat>,
871 pub schema_id: Option<SchemaId>,
876
877 pub deprecated_schema_id: Option<SchemaId>,
879}
880
881#[derive(Clone, PartialEq, Eq)]
885pub struct HollowBatch<T> {
886 pub desc: Description<T>,
888 pub len: usize,
890 pub(crate) parts: Vec<RunPart<T>>,
892 pub(crate) run_splits: Vec<usize>,
900 pub(crate) run_meta: Vec<RunMeta>,
903}
904
905impl<T: Debug> Debug for HollowBatch<T> {
906 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
907 let HollowBatch {
908 desc,
909 parts,
910 len,
911 run_splits: runs,
912 run_meta,
913 } = self;
914 f.debug_struct("HollowBatch")
915 .field(
916 "desc",
917 &(
918 desc.lower().elements(),
919 desc.upper().elements(),
920 desc.since().elements(),
921 ),
922 )
923 .field("parts", &parts)
924 .field("len", &len)
925 .field("runs", &runs)
926 .field("run_meta", &run_meta)
927 .finish()
928 }
929}
930
931impl<T: Serialize> serde::Serialize for HollowBatch<T> {
932 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
933 let HollowBatch {
934 desc,
935 len,
936 parts: _,
938 run_splits: _,
939 run_meta: _,
940 } = self;
941 let mut s = s.serialize_struct("HollowBatch", 5)?;
942 let () = s.serialize_field("lower", &desc.lower().elements())?;
943 let () = s.serialize_field("upper", &desc.upper().elements())?;
944 let () = s.serialize_field("since", &desc.since().elements())?;
945 let () = s.serialize_field("len", len)?;
946 let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
947 s.end()
948 }
949}
950
951impl<T: Ord> PartialOrd for HollowBatch<T> {
952 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
953 Some(self.cmp(other))
954 }
955}
956
957impl<T: Ord> Ord for HollowBatch<T> {
958 fn cmp(&self, other: &Self) -> Ordering {
959 let HollowBatch {
962 desc: self_desc,
963 parts: self_parts,
964 len: self_len,
965 run_splits: self_runs,
966 run_meta: self_run_meta,
967 } = self;
968 let HollowBatch {
969 desc: other_desc,
970 parts: other_parts,
971 len: other_len,
972 run_splits: other_runs,
973 run_meta: other_run_meta,
974 } = other;
975 (
976 self_desc.lower().elements(),
977 self_desc.upper().elements(),
978 self_desc.since().elements(),
979 self_parts,
980 self_len,
981 self_runs,
982 self_run_meta,
983 )
984 .cmp(&(
985 other_desc.lower().elements(),
986 other_desc.upper().elements(),
987 other_desc.since().elements(),
988 other_parts,
989 other_len,
990 other_runs,
991 other_run_meta,
992 ))
993 }
994}
995
996impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
997 pub(crate) fn part_stream<'a>(
998 &'a self,
999 shard_id: ShardId,
1000 blob: &'a dyn Blob,
1001 metrics: &'a Metrics,
1002 ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
1003 stream! {
1004 for part in &self.parts {
1005 for await part in part.part_stream(shard_id, blob, metrics) {
1006 yield part;
1007 }
1008 }
1009 }
1010 }
1011}
1012impl<T> HollowBatch<T> {
1013 pub(crate) fn new(
1020 desc: Description<T>,
1021 parts: Vec<RunPart<T>>,
1022 len: usize,
1023 run_meta: Vec<RunMeta>,
1024 run_splits: Vec<usize>,
1025 ) -> Self {
1026 debug_assert!(
1027 run_splits.is_strictly_sorted(),
1028 "run indices should be strictly increasing"
1029 );
1030 debug_assert!(
1031 run_splits.first().map_or(true, |i| *i > 0),
1032 "run indices should be positive"
1033 );
1034 debug_assert!(
1035 run_splits.last().map_or(true, |i| *i < parts.len()),
1036 "run indices should be valid indices into parts"
1037 );
1038 debug_assert!(
1039 parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1040 "all metadata should correspond to a run"
1041 );
1042
1043 Self {
1044 desc,
1045 len,
1046 parts,
1047 run_splits,
1048 run_meta,
1049 }
1050 }
1051
1052 pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1054 let run_meta = if parts.is_empty() {
1055 vec![]
1056 } else {
1057 vec![RunMeta::default()]
1058 };
1059 Self {
1060 desc,
1061 len,
1062 parts,
1063 run_splits: vec![],
1064 run_meta,
1065 }
1066 }
1067
1068 #[cfg(test)]
1069 pub(crate) fn new_run_for_test(
1070 desc: Description<T>,
1071 parts: Vec<RunPart<T>>,
1072 len: usize,
1073 run_id: RunId,
1074 ) -> Self {
1075 let run_meta = if parts.is_empty() {
1076 vec![]
1077 } else {
1078 let mut meta = RunMeta::default();
1079 meta.id = Some(run_id);
1080 vec![meta]
1081 };
1082 Self {
1083 desc,
1084 len,
1085 parts,
1086 run_splits: vec![],
1087 run_meta,
1088 }
1089 }
1090
1091 pub(crate) fn empty(desc: Description<T>) -> Self {
1093 Self {
1094 desc,
1095 len: 0,
1096 parts: vec![],
1097 run_splits: vec![],
1098 run_meta: vec![],
1099 }
1100 }
1101
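    /// Iterates over the runs in this batch, yielding each run's [RunMeta]
    /// along with the slice of parts that make it up, split at the
    /// `run_splits` boundaries.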
1102 pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1103 let run_ends = self
1104 .run_splits
1105 .iter()
1106 .copied()
1107 .chain(std::iter::once(self.parts.len()));
1108 let run_metas = self.run_meta.iter();
1109 let run_parts = run_ends
1110 .scan(0, |start, end| {
1111 let range = *start..end;
1112 *start = end;
1113 Some(range)
1114 })
1115 .filter(|range| !range.is_empty())
1116 .map(|range| &self.parts[range]);
1117 run_metas.zip_eq(run_parts)
1118 }
1119
1120 pub(crate) fn inline_bytes(&self) -> usize {
1121 self.parts.iter().map(|x| x.inline_bytes()).sum()
1122 }
1123
1124 pub(crate) fn is_empty(&self) -> bool {
1125 self.parts.is_empty()
1126 }
1127
1128 pub(crate) fn part_count(&self) -> usize {
1129 self.parts.len()
1130 }
1131
1132 pub fn encoded_size_bytes(&self) -> usize {
1134 self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1135 }
1136}
1137
1138impl<T: Timestamp + TotalOrder> HollowBatch<T> {
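    /// Rewrites the timestamps of every part in this batch to `frontier` and
    /// advances the batch's upper to `new_upper`.
    ///
    /// Returns an error if the frontier is not strictly below `new_upper`, if
    /// `new_upper` would regress the batch's upper, if the frontier would
    /// regress below the batch's lower or below an existing rewrite, or if the
    /// batch's since has moved past the minimum antichain.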
1140 pub(crate) fn rewrite_ts(
1141 &mut self,
1142 frontier: &Antichain<T>,
1143 new_upper: Antichain<T>,
1144 ) -> Result<(), String> {
1145 if !PartialOrder::less_than(frontier, &new_upper) {
1146 return Err(format!(
1147 "rewrite frontier {:?} !< rewrite upper {:?}",
1148 frontier.elements(),
1149 new_upper.elements(),
1150 ));
1151 }
1152 if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1153 return Err(format!(
1154 "rewrite upper {:?} < batch upper {:?}",
1155 new_upper.elements(),
1156 self.desc.upper().elements(),
1157 ));
1158 }
1159
1160 if PartialOrder::less_than(frontier, self.desc.lower()) {
1163 return Err(format!(
1164 "rewrite frontier {:?} < batch lower {:?}",
1165 frontier.elements(),
1166 self.desc.lower().elements(),
1167 ));
1168 }
1169 if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1170 return Err(format!(
1171 "batch since {:?} != minimum antichain {:?}",
1172 self.desc.since().elements(),
1173 &[T::minimum()],
1174 ));
1175 }
1176 for part in self.parts.iter() {
1177 let Some(ts_rewrite) = part.ts_rewrite() else {
1178 continue;
1179 };
1180 if PartialOrder::less_than(frontier, ts_rewrite) {
1181 return Err(format!(
1182 "rewrite frontier {:?} < batch rewrite {:?}",
1183 frontier.elements(),
1184 ts_rewrite.elements(),
1185 ));
1186 }
1187 }
1188
1189 self.desc = Description::new(
1190 self.desc.lower().clone(),
1191 new_upper,
1192 self.desc.since().clone(),
1193 );
1194 for part in &mut self.parts {
1195 match part {
1196 RunPart::Single(BatchPart::Hollow(part)) => {
1197 part.ts_rewrite = Some(frontier.clone())
1198 }
1199 RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1200 *ts_rewrite = Some(frontier.clone())
1201 }
1202 RunPart::Many(runs) => {
1203 panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1206 }
1207 }
1208 }
1209 Ok(())
1210 }
1211}
1212
1213impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1214 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1215 Some(self.cmp(other))
1216 }
1217}
1218
1219impl<T: Ord> Ord for HollowBatchPart<T> {
1220 fn cmp(&self, other: &Self) -> Ordering {
1221 let HollowBatchPart {
1224 key: self_key,
1225 meta: self_meta,
1226 encoded_size_bytes: self_encoded_size_bytes,
1227 key_lower: self_key_lower,
1228 structured_key_lower: self_structured_key_lower,
1229 stats: self_stats,
1230 ts_rewrite: self_ts_rewrite,
1231 diffs_sum: self_diffs_sum,
1232 format: self_format,
1233 schema_id: self_schema_id,
1234 deprecated_schema_id: self_deprecated_schema_id,
1235 } = self;
1236 let HollowBatchPart {
1237 key: other_key,
1238 meta: other_meta,
1239 encoded_size_bytes: other_encoded_size_bytes,
1240 key_lower: other_key_lower,
1241 structured_key_lower: other_structured_key_lower,
1242 stats: other_stats,
1243 ts_rewrite: other_ts_rewrite,
1244 diffs_sum: other_diffs_sum,
1245 format: other_format,
1246 schema_id: other_schema_id,
1247 deprecated_schema_id: other_deprecated_schema_id,
1248 } = other;
1249 (
1250 self_key,
1251 self_meta,
1252 self_encoded_size_bytes,
1253 self_key_lower,
1254 self_structured_key_lower,
1255 self_stats,
1256 self_ts_rewrite.as_ref().map(|x| x.elements()),
1257 self_diffs_sum,
1258 self_format,
1259 self_schema_id,
1260 self_deprecated_schema_id,
1261 )
1262 .cmp(&(
1263 other_key,
1264 other_meta,
1265 other_encoded_size_bytes,
1266 other_key_lower,
1267 other_structured_key_lower,
1268 other_stats,
1269 other_ts_rewrite.as_ref().map(|x| x.elements()),
1270 other_diffs_sum,
1271 other_format,
1272 other_schema_id,
1273 other_deprecated_schema_id,
1274 ))
1275 }
1276}
1277
1278#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1280pub struct HollowRollup {
1281 pub key: PartialRollupKey,
1283 pub encoded_size_bytes: Option<usize>,
1285}
1286
1287#[derive(Debug)]
1289pub enum HollowBlobRef<'a, T> {
1290 Batch(&'a HollowBatch<T>),
1291 Rollup(&'a HollowRollup),
1292}
1293
1294#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1296pub struct ActiveRollup {
1297 pub seqno: SeqNo,
1298 pub start_ms: u64,
1299}
1300
1301#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize)]
1303pub struct ActiveGc {
1304 pub seqno: SeqNo,
1305 pub start_ms: u64,
1306}
1307
1308#[derive(Debug)]
1313#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1314pub struct NoOpStateTransition<T>(pub T);
1315
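/// The shard-wide collections portion of [State]: the applier version, rollup
/// and GC bookkeeping (including in-progress "active" claims), the registered
/// leased readers, critical readers, writers, and schemas, and the trace of
/// batches itself.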
1316#[derive(Debug, Clone)]
1318#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
1319pub struct StateCollections<T> {
1320 pub(crate) version: Version,
1324
1325 pub(crate) last_gc_req: SeqNo,
1328
1329 pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,
1331
1332 pub(crate) active_rollup: Option<ActiveRollup>,
1334 pub(crate) active_gc: Option<ActiveGc>,
1336
1337 pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
1338 pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
1339 pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
1340 pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,
1341
1342 pub(crate) trace: Trace<T>,
1347}
1348
1349#[derive(Debug, Clone, Serialize, PartialEq)]
1365pub struct EncodedSchemas {
1366 pub key: Bytes,
1368 pub key_data_type: Bytes,
1371 pub val: Bytes,
1373 pub val_data_type: Bytes,
1376}
1377
1378impl EncodedSchemas {
1379 pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1380 let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1381 DataType::from_proto(proto).expect("valid DataType")
1382 }
1383}
1384
1385#[derive(Debug)]
1386#[cfg_attr(test, derive(PartialEq))]
1387pub enum CompareAndAppendBreak<T> {
1388 AlreadyCommitted,
1389 Upper {
1390 shard_upper: Antichain<T>,
1391 writer_upper: Antichain<T>,
1392 },
1393 InvalidUsage(InvalidUsage<T>),
1394 InlineBackpressure,
1395}
1396
1397#[derive(Debug)]
1398#[cfg_attr(test, derive(PartialEq))]
1399pub enum SnapshotErr<T> {
1400 AsOfNotYetAvailable(SeqNo, Upper<T>),
1401 AsOfHistoricalDistinctionsLost(Since<T>),
1402}
1403
1404impl<T> StateCollections<T>
1405where
1406 T: Timestamp + Lattice + Codec64,
1407{
1408 pub fn add_rollup(
1409 &mut self,
1410 add_rollup: (SeqNo, &HollowRollup),
1411 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1412 let (rollup_seqno, rollup) = add_rollup;
1413 let applied = match self.rollups.get(&rollup_seqno) {
1414 Some(x) => x.key == rollup.key,
1415 None => {
1416 self.active_rollup = None;
1417 self.rollups.insert(rollup_seqno, rollup.to_owned());
1418 true
1419 }
1420 };
1421 Continue(applied)
1425 }
1426
1427 pub fn remove_rollups(
1428 &mut self,
1429 remove_rollups: &[(SeqNo, PartialRollupKey)],
1430 ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1431 if remove_rollups.is_empty() || self.is_tombstone() {
1432 return Break(NoOpStateTransition(vec![]));
1433 }
1434
1435 self.active_gc = None;
1438
1439 let mut removed = vec![];
1440 for (seqno, key) in remove_rollups {
1441 let removed_key = self.rollups.remove(seqno);
1442 debug_assert!(
1443 removed_key.as_ref().map_or(true, |x| &x.key == key),
1444 "{} vs {:?}",
1445 key,
1446 removed_key
1447 );
1448
1449 if removed_key.is_some() {
1450 removed.push(*seqno);
1451 }
1452 }
1453
1454 Continue(removed)
1455 }
1456
1457 pub fn register_leased_reader(
1458 &mut self,
1459 hostname: &str,
1460 reader_id: &LeasedReaderId,
1461 purpose: &str,
1462 seqno: SeqNo,
1463 lease_duration: Duration,
1464 heartbeat_timestamp_ms: u64,
1465 use_critical_since: bool,
1466 ) -> ControlFlow<
1467 NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1468 (LeasedReaderState<T>, SeqNo),
1469 > {
1470 let since = if use_critical_since {
1471 self.critical_since()
1472 .unwrap_or_else(|| self.trace.since().clone())
1473 } else {
1474 self.trace.since().clone()
1475 };
1476 let reader_state = LeasedReaderState {
1477 debug: HandleDebugState {
1478 hostname: hostname.to_owned(),
1479 purpose: purpose.to_owned(),
1480 },
1481 seqno,
1482 since,
1483 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1484 lease_duration_ms: u64::try_from(lease_duration.as_millis())
1485 .expect("lease duration as millis should fit within u64"),
1486 };
1487
1488 if self.is_tombstone() {
1493 return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1494 }
1495
1496 self.leased_readers
1498 .insert(reader_id.clone(), reader_state.clone());
1499 Continue((reader_state, self.seqno_since(seqno)))
1500 }
1501
1502 pub fn register_critical_reader<O: Opaque + Codec64>(
1503 &mut self,
1504 hostname: &str,
1505 reader_id: &CriticalReaderId,
1506 purpose: &str,
1507 ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1508 let state = CriticalReaderState {
1509 debug: HandleDebugState {
1510 hostname: hostname.to_owned(),
1511 purpose: purpose.to_owned(),
1512 },
1513 since: self.trace.since().clone(),
1514 opaque: OpaqueState(Codec64::encode(&O::initial())),
1515 opaque_codec: O::codec_name(),
1516 };
1517
1518 if self.is_tombstone() {
1523 return Break(NoOpStateTransition(state));
1524 }
1525
1526 let state = match self.critical_readers.get_mut(reader_id) {
1527 Some(existing_state) => {
1528 existing_state.debug = state.debug;
1529 existing_state.clone()
1530 }
1531 None => {
1532 self.critical_readers
1533 .insert(reader_id.clone(), state.clone());
1534 state
1535 }
1536 };
1537 Continue(state)
1538 }
1539
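    /// Registers a key/value schema for this shard, returning the id of an
    /// already-registered identical schema if there is one. If the shard
    /// already has other schemas and none of them match, this is a no-op that
    /// returns `None`; evolving to a new schema goes through
    /// `compare_and_evolve_schema` instead.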
1540 pub fn register_schema<K: Codec, V: Codec>(
1541 &mut self,
1542 key_schema: &K::Schema,
1543 val_schema: &V::Schema,
1544 ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1545 fn encode_data_type(data_type: &DataType) -> Bytes {
1546 let proto = data_type.into_proto();
1547 prost::Message::encode_to_vec(&proto).into()
1548 }
1549
1550 let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1562 K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1563 });
1564 match existing_id {
1565 Some((schema_id, _)) => {
1566 Break(NoOpStateTransition(Some(*schema_id)))
1571 }
1572 None if self.is_tombstone() => {
1573 Break(NoOpStateTransition(None))
1575 }
1576 None if self.schemas.is_empty() => {
1577 let id = SchemaId(self.schemas.len());
1581 let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1582 .expect("valid key schema");
1583 let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1584 .expect("valid val schema");
1585 let prev = self.schemas.insert(
1586 id,
1587 EncodedSchemas {
1588 key: K::encode_schema(key_schema),
1589 key_data_type: encode_data_type(&key_data_type),
1590 val: V::encode_schema(val_schema),
1591 val_data_type: encode_data_type(&val_data_type),
1592 },
1593 );
1594 assert_eq!(prev, None);
1595 Continue(Some(id))
1596 }
1597 None => {
1598 info!(
1599 "register_schemas got {:?} expected {:?}",
1600 key_schema,
1601 self.schemas
1602 .iter()
1603 .map(|(id, x)| (id, K::decode_schema(&x.key)))
1604 .collect::<Vec<_>>()
1605 );
1606 Break(NoOpStateTransition(None))
1609 }
1610 }
1611 }
1612
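    /// Evolves the shard's latest schema to the given key/value schemas.
    ///
    /// Reports `ExpectedMismatch` if `expected` is not the current schema id,
    /// short-circuits with `Ok` if the schemas are unchanged, and rejects the
    /// change as `Incompatible` unless both the key and value migrations are
    /// backward compatible without dropping columns. Otherwise the new schema
    /// is registered under the next id.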
1613 pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1614 &mut self,
1615 expected: SchemaId,
1616 key_schema: &K::Schema,
1617 val_schema: &V::Schema,
1618 ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1619 fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1620 let array = Schema::encoder(schema).expect("valid schema").finish();
1624 Array::data_type(&array).clone()
1625 }
1626
1627 let (current_id, current) = self
1628 .schemas
1629 .last_key_value()
1630 .expect("all shards have a schema");
1631 if *current_id != expected {
1632 return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1633 schema_id: *current_id,
1634 key: K::decode_schema(¤t.key),
1635 val: V::decode_schema(¤t.val),
1636 }));
1637 }
1638
1639 let current_key = K::decode_schema(¤t.key);
1640 let current_key_dt = EncodedSchemas::decode_data_type(¤t.key_data_type);
1641 let current_val = V::decode_schema(¤t.val);
1642 let current_val_dt = EncodedSchemas::decode_data_type(¤t.val_data_type);
1643
1644 let key_dt = data_type(key_schema);
1645 let val_dt = data_type(val_schema);
1646
1647 if current_key == *key_schema
1649 && current_key_dt == key_dt
1650 && current_val == *val_schema
1651 && current_val_dt == val_dt
1652 {
1653 return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1654 }
1655
1656 let key_fn = backward_compatible(¤t_key_dt, &key_dt);
1657 let val_fn = backward_compatible(¤t_val_dt, &val_dt);
1658 let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1659 return Break(NoOpStateTransition(CaESchema::Incompatible));
1660 };
1661 if key_fn.contains_drop() || val_fn.contains_drop() {
1665 return Break(NoOpStateTransition(CaESchema::Incompatible));
1666 }
1667
1668 let id = SchemaId(self.schemas.len());
1672 self.schemas.insert(
1673 id,
1674 EncodedSchemas {
1675 key: K::encode_schema(key_schema),
1676 key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1677 val: V::encode_schema(val_schema),
1678 val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1679 },
1680 );
1681 Continue(CaESchema::Ok(id))
1682 }
1683
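    /// Validates and applies one writer's `compare_and_append` request.
    ///
    /// Tombstoned shards always report an `Upper` mismatch; malformed bounds
    /// and non-empty batches over an empty time interval are `InvalidUsage`; a
    /// repeated idempotency token short-circuits as `AlreadyCommitted`; the
    /// batch's lower must match the shard's upper; and writes that would push
    /// the total inline-part size over the limit return `InlineBackpressure`.
    /// On success the batch is pushed into the trace, some older compaction
    /// work may additionally be claimed (per `claim_compaction_percent`), and
    /// the writer's lease and most-recent-write metadata are updated.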
1684 pub fn compare_and_append(
1685 &mut self,
1686 batch: &HollowBatch<T>,
1687 writer_id: &WriterId,
1688 heartbeat_timestamp_ms: u64,
1689 lease_duration_ms: u64,
1690 idempotency_token: &IdempotencyToken,
1691 debug_info: &HandleDebugState,
1692 inline_writes_total_max_bytes: usize,
1693 claim_compaction_percent: usize,
1694 claim_compaction_min_version: Option<&Version>,
1695 ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1696 if self.is_tombstone() {
1701 assert_eq!(self.trace.upper(), &Antichain::new());
1702 return Break(CompareAndAppendBreak::Upper {
1703 shard_upper: Antichain::new(),
1704 writer_upper: Antichain::new(),
1709 });
1710 }
1711
1712 let writer_state = self
1713 .writers
1714 .entry(writer_id.clone())
1715 .or_insert_with(|| WriterState {
1716 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1717 lease_duration_ms,
1718 most_recent_write_token: IdempotencyToken::SENTINEL,
1719 most_recent_write_upper: Antichain::from_elem(T::minimum()),
1720 debug: debug_info.clone(),
1721 });
1722
1723 if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1724 return Break(CompareAndAppendBreak::InvalidUsage(
1725 InvalidUsage::InvalidBounds {
1726 lower: batch.desc.lower().clone(),
1727 upper: batch.desc.upper().clone(),
1728 },
1729 ));
1730 }
1731
1732 if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1735 return Break(CompareAndAppendBreak::InvalidUsage(
1736 InvalidUsage::InvalidEmptyTimeInterval {
1737 lower: batch.desc.lower().clone(),
1738 upper: batch.desc.upper().clone(),
1739 keys: batch
1740 .parts
1741 .iter()
1742 .map(|x| x.printable_name().to_owned())
1743 .collect(),
1744 },
1745 ));
1746 }
1747
1748 if idempotency_token == &writer_state.most_recent_write_token {
1749 assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1754 assert!(
1755 PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1756 "{:?} vs {:?}",
1757 batch.desc.upper(),
1758 self.trace.upper()
1759 );
1760 return Break(CompareAndAppendBreak::AlreadyCommitted);
1761 }
1762
1763 let shard_upper = self.trace.upper();
1764 if shard_upper != batch.desc.lower() {
1765 return Break(CompareAndAppendBreak::Upper {
1766 shard_upper: shard_upper.clone(),
1767 writer_upper: writer_state.most_recent_write_upper.clone(),
1768 });
1769 }
1770
1771 let new_inline_bytes = batch.inline_bytes();
1772 if new_inline_bytes > 0 {
1773 let mut existing_inline_bytes = 0;
1774 self.trace
1775 .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1776 if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1780 return Break(CompareAndAppendBreak::InlineBackpressure);
1781 }
1782 }
1783
1784 let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1785 self.trace.push_batch(batch.clone())
1786 } else {
1787 Vec::new()
1788 };
1789
1790 let all_empty_reqs = merge_reqs
1793 .iter()
1794 .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1795 if all_empty_reqs && !batch.is_empty() {
1796 let mut reqs_to_take = claim_compaction_percent / 100;
1797 if (usize::cast_from(idempotency_token.hashed()) % 100)
1798 < (claim_compaction_percent % 100)
1799 {
1800 reqs_to_take += 1;
1801 }
1802 let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1803 let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1804 merge_reqs.extend(
1805 self.trace
1808 .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1809 .take(reqs_to_take),
1810 )
1811 }
1812
1813 for req in &merge_reqs {
1814 self.trace.claim_compaction(
1815 req.id,
1816 ActiveCompaction {
1817 start_ms: heartbeat_timestamp_ms,
1818 },
1819 )
1820 }
1821
1822 debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1823 writer_state.most_recent_write_token = idempotency_token.clone();
1824 assert!(
1826 PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1827 "{:?} vs {:?}",
1828 &writer_state.most_recent_write_upper,
1829 batch.desc.upper()
1830 );
1831 writer_state
1832 .most_recent_write_upper
1833 .clone_from(batch.desc.upper());
1834
1835 writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1837 heartbeat_timestamp_ms,
1838 writer_state.last_heartbeat_timestamp_ms,
1839 );
1840
1841 Continue(merge_reqs)
1842 }
1843
1844 pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1845 &mut self,
1846 res: &FueledMergeRes<T>,
1847 metrics: &ColumnarMetrics,
1848 ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1849 if self.is_tombstone() {
1854 return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1855 }
1856
1857 let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1858 Continue(apply_merge_result)
1859 }
1860
1861 pub fn spine_exert(
1862 &mut self,
1863 fuel: usize,
1864 ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1865 let (merge_reqs, did_work) = self.trace.exert(fuel);
1866 if did_work {
1867 Continue(merge_reqs)
1868 } else {
1869 assert!(merge_reqs.is_empty());
1870 Break(NoOpStateTransition(Vec::new()))
1873 }
1874 }
1875
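    /// Downgrades a leased reader's since capability to `new_since` (the since
    /// is never allowed to regress), updating its heartbeat, holding back its
    /// seqno by any outstanding lease, and recomputing the shard since when
    /// the reader's since advances.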
1876 pub fn downgrade_since(
1877 &mut self,
1878 reader_id: &LeasedReaderId,
1879 seqno: SeqNo,
1880 outstanding_seqno: Option<SeqNo>,
1881 new_since: &Antichain<T>,
1882 heartbeat_timestamp_ms: u64,
1883 ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1884 if self.is_tombstone() {
1889 return Break(NoOpStateTransition(Since(Antichain::new())));
1890 }
1891
1892 let Some(reader_state) = self.leased_reader(reader_id) else {
1895 tracing::warn!(
1896 "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1897 );
1898 return Break(NoOpStateTransition(Since(Antichain::new())));
1899 };
1900
1901 reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1904 heartbeat_timestamp_ms,
1905 reader_state.last_heartbeat_timestamp_ms,
1906 );
1907
1908 let seqno = match outstanding_seqno {
1909 Some(outstanding_seqno) => {
1910 assert!(
1911 outstanding_seqno >= reader_state.seqno,
1912 "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1913 is behind current reader_state ({:?})",
1914 outstanding_seqno,
1915 reader_state.seqno,
1916 );
1917 std::cmp::min(outstanding_seqno, seqno)
1918 }
1919 None => seqno,
1920 };
1921
1922 reader_state.seqno = seqno;
1923
1924 let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1925 reader_state.since.clone_from(new_since);
1926 self.update_since();
1927 new_since.clone()
1928 } else {
1929 reader_state.since.clone()
1932 };
1933
1934 Continue(Since(reader_current_since))
1935 }
1936
1937 pub fn compare_and_downgrade_since<O: Opaque + Codec64>(
1938 &mut self,
1939 reader_id: &CriticalReaderId,
1940 expected_opaque: &O,
1941 (new_opaque, new_since): (&O, &Antichain<T>),
1942 ) -> ControlFlow<
1943 NoOpStateTransition<Result<Since<T>, (O, Since<T>)>>,
1944 Result<Since<T>, (O, Since<T>)>,
1945 > {
1946 if self.is_tombstone() {
1951 return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1955 }
1956
1957 let reader_state = self.critical_reader(reader_id);
1958 assert_eq!(reader_state.opaque_codec, O::codec_name());
1959
1960 if &O::decode(reader_state.opaque.0) != expected_opaque {
1961 return Continue(Err((
1964 Codec64::decode(reader_state.opaque.0),
1965 Since(reader_state.since.clone()),
1966 )));
1967 }
1968
1969 reader_state.opaque = OpaqueState(Codec64::encode(new_opaque));
1970 if PartialOrder::less_equal(&reader_state.since, new_since) {
1971 reader_state.since.clone_from(new_since);
1972 self.update_since();
1973 Continue(Ok(Since(new_since.clone())))
1974 } else {
1975 Continue(Ok(Since(reader_state.since.clone())))
1979 }
1980 }
1981
1982 pub fn heartbeat_leased_reader(
1983 &mut self,
1984 reader_id: &LeasedReaderId,
1985 heartbeat_timestamp_ms: u64,
1986 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1987 if self.is_tombstone() {
1992 return Break(NoOpStateTransition(false));
1993 }
1994
1995 match self.leased_readers.get_mut(reader_id) {
1996 Some(reader_state) => {
1997 reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1998 heartbeat_timestamp_ms,
1999 reader_state.last_heartbeat_timestamp_ms,
2000 );
2001 Continue(true)
2002 }
2003 None => Continue(false),
2006 }
2007 }
2008
2009 pub fn expire_leased_reader(
2010 &mut self,
2011 reader_id: &LeasedReaderId,
2012 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2013 if self.is_tombstone() {
2018 return Break(NoOpStateTransition(false));
2019 }
2020
2021 let existed = self.leased_readers.remove(reader_id).is_some();
2022 if existed {
2023 }
2037 Continue(existed)
2040 }
2041
2042 pub fn expire_critical_reader(
2043 &mut self,
2044 reader_id: &CriticalReaderId,
2045 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2046 if self.is_tombstone() {
2051 return Break(NoOpStateTransition(false));
2052 }
2053
2054 let existed = self.critical_readers.remove(reader_id).is_some();
2055 if existed {
2056 }
2070 Continue(existed)
2074 }
2075
2076 pub fn expire_writer(
2077 &mut self,
2078 writer_id: &WriterId,
2079 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2080 if self.is_tombstone() {
2085 return Break(NoOpStateTransition(false));
2086 }
2087
2088 let existed = self.writers.remove(writer_id).is_some();
2089 Continue(existed)
2093 }
2094
2095 fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2096 self.leased_readers.get_mut(id)
2097 }
2098
2099 fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2100 self.critical_readers
2101 .get_mut(id)
2102 .unwrap_or_else(|| {
2103 panic!(
2104 "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2105 id
2106 )
2107 })
2108 }
2109
2110 fn critical_since(&self) -> Option<Antichain<T>> {
2111 let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2112 let mut since = critical_sinces.next().cloned()?;
2113 for s in critical_sinces {
2114 since.meet_assign(s);
2115 }
2116 Some(since)
2117 }
2118
2119 fn update_since(&mut self) {
2120 let mut sinces_iter = self
2121 .leased_readers
2122 .values()
2123 .map(|x| &x.since)
2124 .chain(self.critical_readers.values().map(|x| &x.since));
2125 let mut since = match sinces_iter.next() {
2126 Some(since) => since.clone(),
2127 None => {
2128 return;
2131 }
2132 };
2133 while let Some(s) = sinces_iter.next() {
2134 since.meet_assign(s);
2135 }
2136 self.trace.downgrade_since(&since);
2137 }
2138
2139 fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2140 let mut seqno_since = seqno;
2141 for cap in self.leased_readers.values() {
2142 seqno_since = std::cmp::min(seqno_since, cap.seqno);
2143 }
2144 seqno_since
2146 }
2147
2148 fn tombstone_batch() -> HollowBatch<T> {
2149 HollowBatch::empty(Description::new(
2150 Antichain::from_elem(T::minimum()),
2151 Antichain::new(),
2152 Antichain::new(),
2153 ))
2154 }
2155
2156 pub(crate) fn is_tombstone(&self) -> bool {
2157 self.trace.upper().is_empty()
2158 && self.trace.since().is_empty()
2159 && self.writers.is_empty()
2160 && self.leased_readers.is_empty()
2161 && self.critical_readers.is_empty()
2162 }
2163
2164 pub(crate) fn is_single_empty_batch(&self) -> bool {
2165 let mut batch_count = 0;
2166 let mut is_empty = true;
2167 self.trace.map_batches(|b| {
2168 batch_count += 1;
2169 is_empty &= b.is_empty()
2170 });
2171 batch_count <= 1 && is_empty
2172 }
2173
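    /// Converts this (already fully-advanced) state into a tombstone, clearing
    /// all readers and writers and shrinking the trace down to a single empty
    /// batch covering `[minimum, empty)`; breaks as a no-op only if the state
    /// was already in that minimal form.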
2174 pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2175 assert_eq!(self.trace.upper(), &Antichain::new());
2176 assert_eq!(self.trace.since(), &Antichain::new());
2177
2178 let was_tombstone = self.is_tombstone();
2181
2182 self.writers.clear();
2184 self.leased_readers.clear();
2185 self.critical_readers.clear();
2186
2187 debug_assert!(self.is_tombstone());
2188
2189 let mut to_replace = None;
2198 let mut batch_count = 0;
2199 self.trace.map_batches(|b| {
2200 batch_count += 1;
2201 if !b.is_empty() && to_replace.is_none() {
2202 to_replace = Some(b.desc.clone());
2203 }
2204 });
2205 if let Some(desc) = to_replace {
2206 let result = self.trace.apply_tombstone_merge(&desc);
2210 assert!(
2211 result.matched(),
2212 "merge with a matching desc should always match"
2213 );
2214 Continue(())
2215 } else if batch_count > 1 {
2216 let mut new_trace = Trace::default();
2221 new_trace.downgrade_since(&Antichain::new());
2222 let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2223 assert_eq!(merge_reqs, Vec::new());
2224 self.trace = new_trace;
2225 Continue(())
2226 } else if !was_tombstone {
2227 Continue(())
2230 } else {
2231 Break(NoOpStateTransition(()))
2234 }
2235 }
2236}
2237
2238#[derive(Debug)]
2240#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
2241pub struct State<T> {
2242 pub(crate) shard_id: ShardId,
2243
2244 pub(crate) seqno: SeqNo,
2245 pub(crate) walltime_ms: u64,
2248 pub(crate) hostname: String,
2251 pub(crate) collections: StateCollections<T>,
2252}
2253
2254pub struct TypedState<K, V, T, D> {
2257 pub(crate) state: State<T>,
2258
2259 pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
2267}
2268
2269impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2270 #[cfg(any(test, debug_assertions))]
2271 pub(crate) fn clone(&self, hostname: String) -> Self {
2272 TypedState {
2273 state: State {
2274 shard_id: self.shard_id.clone(),
2275 seqno: self.seqno.clone(),
2276 walltime_ms: self.walltime_ms,
2277 hostname,
2278 collections: self.collections.clone(),
2279 },
2280 _phantom: PhantomData,
2281 }
2282 }
2283
2284 pub(crate) fn clone_for_rollup(&self) -> Self {
2285 TypedState {
2286 state: State {
2287 shard_id: self.shard_id.clone(),
2288 seqno: self.seqno.clone(),
2289 walltime_ms: self.walltime_ms,
2290 hostname: self.hostname.clone(),
2291 collections: self.collections.clone(),
2292 },
2293 _phantom: PhantomData,
2294 }
2295 }
2296}
2297
2298impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2299 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2300 let TypedState { state, _phantom } = self;
2303 f.debug_struct("TypedState").field("state", state).finish()
2304 }
2305}
2306
2307#[cfg(any(test, debug_assertions))]
2309impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2310 fn eq(&self, other: &Self) -> bool {
2311 let TypedState {
2314 state: self_state,
2315 _phantom,
2316 } = self;
2317 let TypedState {
2318 state: other_state,
2319 _phantom,
2320 } = other;
2321 self_state == other_state
2322 }
2323}
2324
2325impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
2326 type Target = State<T>;
2327
2328 fn deref(&self) -> &Self::Target {
2329 &self.state
2330 }
2331}
2332
2333impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
2334 fn deref_mut(&mut self) -> &mut Self::Target {
2335 &mut self.state
2336 }
2337}
2338
2339impl<K, V, T, D> TypedState<K, V, T, D>
2340where
2341 K: Codec,
2342 V: Codec,
2343 T: Timestamp + Lattice + Codec64,
2344 D: Codec64,
2345{
2346 pub fn new(
2347 applier_version: Version,
2348 shard_id: ShardId,
2349 hostname: String,
2350 walltime_ms: u64,
2351 ) -> Self {
2352 let state = State {
2353 shard_id,
2354 seqno: SeqNo::minimum(),
2355 walltime_ms,
2356 hostname,
2357 collections: StateCollections {
2358 version: applier_version,
2359 last_gc_req: SeqNo::minimum(),
2360 rollups: BTreeMap::new(),
2361 active_rollup: None,
2362 active_gc: None,
2363 leased_readers: BTreeMap::new(),
2364 critical_readers: BTreeMap::new(),
2365 writers: BTreeMap::new(),
2366 schemas: BTreeMap::new(),
2367 trace: Trace::default(),
2368 },
2369 };
2370 TypedState {
2371 state,
2372 _phantom: PhantomData,
2373 }
2374 }
2375
2376 pub fn clone_apply<R, E, WorkFn>(
2377 &self,
2378 cfg: &PersistConfig,
2379 work_fn: &mut WorkFn,
2380 ) -> ControlFlow<E, (R, Self)>
2381 where
2382 WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2383 {
2384 let mut new_state = State {
2386 shard_id: self.shard_id,
2387 seqno: self.seqno.next(),
2388 walltime_ms: (cfg.now)(),
2389 hostname: cfg.hostname.clone(),
2390 collections: self.collections.clone(),
2391 };
2392
2393 if new_state.walltime_ms <= self.walltime_ms {
2396 new_state.walltime_ms = self.walltime_ms + 1;
2397 }
2398
2399 let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2400 let new_state = TypedState {
2401 state: new_state,
2402 _phantom: PhantomData,
2403 };
2404 Continue((work_ret, new_state))
2405 }
2406}
2407
2408#[derive(Copy, Clone, Debug)]
2409pub struct GcConfig {
2410 pub use_active_gc: bool,
2411 pub fallback_threshold_ms: u64,
2412 pub min_versions: usize,
2413 pub max_versions: usize,
2414}
2415
2416impl<T> State<T>
2417where
2418 T: Timestamp + Lattice + Codec64,
2419{
2420 pub fn shard_id(&self) -> ShardId {
2421 self.shard_id
2422 }
2423
2424 pub fn seqno(&self) -> SeqNo {
2425 self.seqno
2426 }
2427
2428 pub fn since(&self) -> &Antichain<T> {
2429 self.collections.trace.since()
2430 }
2431
2432 pub fn upper(&self) -> &Antichain<T> {
2433 self.collections.trace.upper()
2434 }
2435
2436 pub fn spine_batch_count(&self) -> usize {
2437 self.collections.trace.num_spine_batches()
2438 }
2439
2440 pub fn size_metrics(&self) -> StateSizeMetrics {
2441 let mut ret = StateSizeMetrics::default();
2442 self.blobs().for_each(|x| match x {
2443 HollowBlobRef::Batch(x) => {
2444 ret.hollow_batch_count += 1;
2445 ret.batch_part_count += x.part_count();
2446 ret.num_updates += x.len;
2447
2448 let batch_size = x.encoded_size_bytes();
                for part in x.parts.iter() {
                    if part.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if part.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += part.inline_bytes();
                    }
                }
2458 ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2459 ret.state_batches_bytes += batch_size;
2460 }
2461 HollowBlobRef::Rollup(x) => {
2462 ret.state_rollup_count += 1;
2463 ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2464 }
2465 });
2466 ret
2467 }
2468
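    /// The most recently written rollup recorded in state. Rollups are keyed
    /// by the seqno they were taken at, so this is the last map entry.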
2469 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2470 self.collections
2473 .rollups
2474 .iter()
2475 .rev()
2476 .next()
2477 .expect("State should have at least one rollup if seqno > minimum")
2478 }
2479
2480 pub(crate) fn seqno_since(&self) -> SeqNo {
2481 self.collections.seqno_since(self.seqno)
2482 }
2483
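    /// Decides whether enough seqnos have piled up since the last GC request
    /// to warrant issuing another one, returning the [GcReq] to run if so.
    ///
    /// In the classic mode the threshold scales with the log of the current
    /// seqno: at seqno 100, `100u64.next_power_of_two()` is 128 and
    /// `trailing_zeros()` of that is 7, so the seqno_since must have advanced
    /// at least 7 past `last_gc_req`. With `use_active_gc` the threshold is
    /// the fixed `min_versions` instead, and `max_versions` caps how many
    /// seqnos a single [GcReq] may cover.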
2484 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2496 let GcConfig {
2497 use_active_gc,
2498 fallback_threshold_ms,
2499 min_versions,
2500 max_versions,
2501 } = cfg;
2502 let gc_threshold = if use_active_gc {
2506 u64::cast_from(min_versions)
2507 } else {
2508 std::cmp::max(
2509 1,
2510 u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2511 )
2512 };
2513 let new_seqno_since = self.seqno_since();
2514 let gc_until_seqno = new_seqno_since.min(SeqNo(
2517 self.collections
2518 .last_gc_req
2519 .0
2520 .saturating_add(u64::cast_from(max_versions)),
2521 ));
2522 let should_gc = new_seqno_since
2523 .0
2524 .saturating_sub(self.collections.last_gc_req.0)
2525 >= gc_threshold;
2526
        // In active-gc mode, an in-flight GC that has been running longer than
        // the fallback threshold is itself grounds to request another one,
        // even if the seqno threshold above hasn't been hit yet.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Prefer to hand GC work to writers; only let a read trigger it when
        // the shard has no writers at all.
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // A tombstoned shard should always have its remaining state GC'd.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        // Conversely, while a sufficiently fresh GC is already in flight in
        // active-gc mode, don't pile another request on top of it.
        let should_gc = if use_active_gc {
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
2558 if should_gc {
2559 self.collections.last_gc_req = gc_until_seqno;
2560 Some(GcReq {
2561 shard_id: self.shard_id,
2562 new_seqno_since: gc_until_seqno,
2563 })
2564 } else {
2565 None
2566 }
2567 }
2568
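    /// How many seqnos of state are currently being held back from GC, i.e.
    /// the distance from the seqno_since to the current seqno.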
2569 pub fn seqnos_held(&self) -> usize {
2571 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2572 }
2573
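    /// Forcibly expires every leased reader and writer whose lease has lapsed
    /// as of `walltime_ms` (i.e. `last_heartbeat_timestamp_ms +
    /// lease_duration_ms < walltime_ms`), returning how many of each were
    /// removed.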
2574 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2576 let mut metrics = ExpiryMetrics::default();
2577 let shard_id = self.shard_id();
2578 self.collections.leased_readers.retain(|id, state| {
2579 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2580 if !retain {
2581 info!(
2582 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2583 state.debug.purpose
2584 );
2585 metrics.readers_expired += 1;
2586 }
2587 retain
2588 });
2589 self.collections.writers.retain(|id, state| {
2591 let retain =
2592 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2593 if !retain {
2594 info!(
2595 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2596 state.debug.purpose
2597 );
2598 metrics.writers_expired += 1;
2599 }
2600 retain
2601 });
2602 metrics
2603 }
2604
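    /// The batches that make up a snapshot at `as_of`, or an error if `as_of`
    /// has already been compacted past (it is strictly before the since) or is
    /// not yet available (it is at or beyond the upper).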
2605 pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2609 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2610 return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2611 self.collections.trace.since().clone(),
2612 )));
2613 }
2614 let upper = self.collections.trace.upper();
2615 if PartialOrder::less_equal(upper, as_of) {
2616 return Err(SnapshotErr::AsOfNotYetAvailable(
2617 self.seqno,
2618 Upper(upper.clone()),
2619 ));
2620 }
2621
2622 let batches = self
2623 .collections
2624 .trace
2625 .batches()
2626 .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2627 .cloned()
2628 .collect();
2629 Ok(batches)
2630 }
2631
2632 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2634 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2635 return Err(Since(self.collections.trace.since().clone()));
2636 }
2637 Ok(())
2638 }
2639
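    /// Returns the batch whose description contains `frontier`
    /// (`lower <= frontier < upper`), or `Err` with the current seqno when no
    /// such batch has been appended yet.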
2640 pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2641 self.collections
2644 .trace
2645 .batches()
2646 .find(|b| {
2647 PartialOrder::less_equal(b.desc.lower(), frontier)
2648 && PartialOrder::less_than(frontier, b.desc.upper())
2649 })
2650 .cloned()
2651 .ok_or(self.seqno)
2652 }
2653
2654 pub fn active_rollup(&self) -> Option<ActiveRollup> {
2655 self.collections.active_rollup
2656 }
2657
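    /// Decides whether a new rollup should be written at the current seqno.
    ///
    /// With `use_active_rollup`, a rollup is requested once more than
    /// `threshold` seqnos have elapsed since the latest rollup, unless another
    /// rollup is already in flight and younger than `fallback_threshold_ms`.
    /// In the classic mode a rollup is requested whenever the distance to the
    /// latest rollup is a positive multiple of `threshold` (e.g. with a
    /// threshold of 128: at 128, 256, ... seqnos past it), or once that
    /// distance exceeds `threshold` times the fallback multiplier. A
    /// tombstoned shard always gets one more rollup once its seqno has
    /// advanced past `latest_rollup_seqno.next()`.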
2658 pub fn need_rollup(
2659 &self,
2660 threshold: usize,
2661 use_active_rollup: bool,
2662 fallback_threshold_ms: u64,
2663 now: u64,
2664 ) -> Option<SeqNo> {
2665 let (latest_rollup_seqno, _) = self.latest_rollup();
2666
2667 if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2673 return Some(self.seqno);
2674 }
2675
2676 let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2677
2678 if use_active_rollup {
2679 if seqnos_since_last_rollup > u64::cast_from(threshold) {
2685 match self.active_rollup() {
2686 Some(active_rollup) => {
2687 if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2688 return Some(self.seqno);
2689 }
2690 }
2691 None => {
2692 return Some(self.seqno);
2693 }
2694 }
2695 }
2696 } else {
2697 if seqnos_since_last_rollup > 0
2701 && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2702 {
2703 return Some(self.seqno);
2704 }
2705
2706 if seqnos_since_last_rollup
2709 > u64::cast_from(
2710 threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2711 )
2712 {
2713 return Some(self.seqno);
2714 }
2715 }
2716
2717 None
2718 }
2719
2720 pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2721 let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2722 let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2723 batches.chain(rollups)
2724 }
2725}
2726
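// Helpers for rendering otherwise-opaque binary fields (raw part bytes, lazy
// protos, part stats, diff sums) as hex or decoded values in the debug
// `Serialize` impls; presumably referenced via serde's `serialize_with`
// attribute where those fields are serialized.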
2727fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2728 let val = hex::encode(val);
2729 val.serialize(s)
2730}
2731
2732fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2733 val: &Option<LazyProto<T>>,
2734 s: S,
2735) -> Result<S::Ok, S::Error> {
2736 val.as_ref()
2737 .map(|lazy| hex::encode(&lazy.into_proto()))
2738 .serialize(s)
2739}
2740
2741fn serialize_part_stats<S: Serializer>(
2742 val: &Option<LazyPartStats>,
2743 s: S,
2744) -> Result<S::Ok, S::Error> {
2745 let val = val.as_ref().map(|x| x.decode().key);
2746 val.serialize(s)
2747}
2748
2749fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2750 let val = val.map(i64::decode);
2752 val.serialize(s)
2753}
2754
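// A human-readable rendering of `State` for debugging and inspection; the
// `state_inspect_serde_json` golden test below pins its output.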
2755impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2761 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2762 let State {
2763 shard_id,
2764 seqno,
2765 walltime_ms,
2766 hostname,
2767 collections:
2768 StateCollections {
2769 version: applier_version,
2770 last_gc_req,
2771 rollups,
2772 active_rollup,
2773 active_gc,
2774 leased_readers,
2775 critical_readers,
2776 writers,
2777 schemas,
2778 trace,
2779 },
2780 } = self;
        // The number of fields serialized below; serde uses this only as a
        // size hint.
        let mut s = s.serialize_struct("State", 19)?;
2782 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2783 let () = s.serialize_field("shard_id", shard_id)?;
2784 let () = s.serialize_field("seqno", seqno)?;
2785 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2786 let () = s.serialize_field("hostname", hostname)?;
2787 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2788 let () = s.serialize_field("rollups", rollups)?;
2789 let () = s.serialize_field("active_rollup", active_rollup)?;
2790 let () = s.serialize_field("active_gc", active_gc)?;
2791 let () = s.serialize_field("leased_readers", leased_readers)?;
2792 let () = s.serialize_field("critical_readers", critical_readers)?;
2793 let () = s.serialize_field("writers", writers)?;
2794 let () = s.serialize_field("schemas", schemas)?;
2795 let () = s.serialize_field("since", &trace.since().elements())?;
2796 let () = s.serialize_field("upper", &trace.upper().elements())?;
2797 let trace = trace.flatten();
2798 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2799 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2800 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2801 let () = s.serialize_field("merges", &trace.merges)?;
2802 s.end()
2803 }
2804}
2805
2806#[derive(Debug, Default)]
2807pub struct StateSizeMetrics {
2808 pub hollow_batch_count: usize,
2809 pub batch_part_count: usize,
2810 pub rewrite_part_count: usize,
2811 pub num_updates: usize,
2812 pub largest_batch_bytes: usize,
2813 pub state_batches_bytes: usize,
2814 pub state_rollups_bytes: usize,
2815 pub state_rollup_count: usize,
2816 pub inline_part_count: usize,
2817 pub inline_part_bytes: usize,
2818}
2819
2820#[derive(Default)]
2821pub struct ExpiryMetrics {
2822 pub(crate) readers_expired: usize,
2823 pub(crate) writers_expired: usize,
2824}
2825
2826#[derive(Debug, Clone, PartialEq)]
2828pub struct Since<T>(pub Antichain<T>);
2829
2830#[derive(Debug, PartialEq)]
2832pub struct Upper<T>(pub Antichain<T>);
2833
2834#[cfg(test)]
2835pub(crate) mod tests {
2836 use std::ops::Range;
2837 use std::str::FromStr;
2838
2839 use bytes::Bytes;
2840 use mz_build_info::DUMMY_BUILD_INFO;
2841 use mz_dyncfg::ConfigUpdates;
2842 use mz_ore::now::SYSTEM_TIME;
2843 use mz_ore::{assert_none, assert_ok};
2844 use mz_proto::RustType;
2845 use proptest::prelude::*;
2846 use proptest::strategy::ValueTree;
2847
2848 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2849 use crate::cache::PersistClientCache;
2850 use crate::internal::encoding::any_some_lazy_part_stats;
2851 use crate::internal::paths::RollupId;
2852 use crate::internal::trace::tests::any_trace;
2853 use crate::tests::new_test_client_cache;
2854 use crate::{Diagnostics, PersistLocation};
2855
2856 use super::*;
2857
2858 const LEASE_DURATION_MS: u64 = 900 * 1000;
2859 fn debug_state() -> HandleDebugState {
2860 HandleDebugState {
2861 hostname: "debug".to_owned(),
2862 purpose: "finding the bugs".to_owned(),
2863 }
2864 }
2865
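    /// Generates a [HollowBatch] with exactly `num_runs` runs: the generated
    /// parts are split into roughly even runs (e.g. `num_runs = 3` over 9
    /// parts puts run boundaries at indices 3 and 6) and each run gets a fresh
    /// [RunId].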
2866 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2867 num_runs: usize,
2868 ) -> impl Strategy<Value = HollowBatch<T>> {
2869 (
2870 any::<T>(),
2871 any::<T>(),
2872 any::<T>(),
2873 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2874 any::<usize>(),
2875 )
2876 .prop_map(move |(t0, t1, since, parts, len)| {
2877 let (lower, upper) = if t0 <= t1 {
2878 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2879 } else {
2880 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2881 };
2882 let since = Antichain::from_elem(since);
2883
2884 let run_splits = (1..num_runs)
2885 .map(|i| i * parts.len() / num_runs)
2886 .collect::<Vec<_>>();
2887
2888 let run_meta = (0..num_runs)
2889 .map(|_| {
2890 let mut meta = RunMeta::default();
2891 meta.id = Some(RunId::new());
2892 meta
2893 })
2894 .collect::<Vec<_>>();
2895
2896 HollowBatch::new(
2897 Description::new(lower, upper, since),
2898 parts,
2899 len % 10,
2900 run_meta,
2901 run_splits,
2902 )
2903 })
2904 }
2905
2906 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2907 Strategy::prop_map(
2908 (
2909 any::<T>(),
2910 any::<T>(),
2911 any::<T>(),
2912 proptest::collection::vec(any_run_part::<T>(), 0..20),
2913 any::<usize>(),
2914 0..=10usize,
2915 proptest::collection::vec(any::<RunId>(), 10),
2916 ),
2917 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2918 let (lower, upper) = if t0 <= t1 {
2919 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2920 } else {
2921 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2922 };
2923 let since = Antichain::from_elem(since);
2924 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2925 let run_splits = (1..num_runs)
2926 .map(|i| i * parts.len() / num_runs)
2927 .collect::<Vec<_>>();
2928
2929 let run_meta = (0..num_runs)
2930 .enumerate()
2931 .map(|(i, _)| {
2932 let mut meta = RunMeta::default();
2933 meta.id = Some(run_ids[i]);
2934 meta
2935 })
2936 .collect::<Vec<_>>();
2937
2938 HollowBatch::new(
2939 Description::new(lower, upper, since),
2940 parts,
2941 len % 10,
2942 run_meta,
2943 run_splits,
2944 )
2945 } else {
2946 HollowBatch::new_run_for_test(
2947 Description::new(lower, upper, since),
2948 parts,
2949 len % 10,
2950 run_ids[0],
2951 )
2952 }
2953 },
2954 )
2955 }
2956
2957 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2958 Strategy::prop_map(
2959 (
2960 any::<bool>(),
2961 any_hollow_batch_part(),
2962 any::<Option<T>>(),
2963 any::<Option<SchemaId>>(),
2964 any::<Option<SchemaId>>(),
2965 ),
2966 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2967 if is_hollow {
2968 BatchPart::Hollow(hollow)
2969 } else {
2970 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2971 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2972 BatchPart::Inline {
2973 updates,
2974 ts_rewrite,
2975 schema_id,
2976 deprecated_schema_id,
2977 }
2978 }
2979 },
2980 )
2981 }
2982
2983 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2984 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2985 }
2986
2987 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2988 -> impl Strategy<Value = HollowBatchPart<T>> {
2989 Strategy::prop_map(
2990 (
2991 any::<PartialBatchKey>(),
2992 any::<usize>(),
2993 any::<Vec<u8>>(),
2994 any_some_lazy_part_stats(),
2995 any::<Option<T>>(),
2996 any::<[u8; 8]>(),
2997 any::<Option<BatchColumnarFormat>>(),
2998 any::<Option<SchemaId>>(),
2999 any::<Option<SchemaId>>(),
3000 ),
3001 |(
3002 key,
3003 encoded_size_bytes,
3004 key_lower,
3005 stats,
3006 ts_rewrite,
3007 diffs_sum,
3008 format,
3009 schema_id,
3010 deprecated_schema_id,
3011 )| {
3012 HollowBatchPart {
3013 key,
3014 meta: Default::default(),
3015 encoded_size_bytes,
3016 key_lower,
3017 structured_key_lower: None,
3018 stats,
3019 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
3020 diffs_sum: Some(diffs_sum),
3021 format,
3022 schema_id,
3023 deprecated_schema_id,
3024 }
3025 },
3026 )
3027 }
3028
3029 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3030 Strategy::prop_map(
3031 (
3032 any::<SeqNo>(),
3033 any::<Option<T>>(),
3034 any::<u64>(),
3035 any::<u64>(),
3036 any::<HandleDebugState>(),
3037 ),
3038 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3039 if lease_duration_ms == 0 {
3043 lease_duration_ms += 1;
3044 }
3045 LeasedReaderState {
3046 seqno,
3047 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3048 last_heartbeat_timestamp_ms,
3049 lease_duration_ms,
3050 debug,
3051 }
3052 },
3053 )
3054 }
3055
3056 pub fn any_critical_reader_state<T: Arbitrary>() -> impl Strategy<Value = CriticalReaderState<T>>
3057 {
3058 Strategy::prop_map(
3059 (
3060 any::<Option<T>>(),
3061 any::<OpaqueState>(),
3062 any::<String>(),
3063 any::<HandleDebugState>(),
3064 ),
3065 |(since, opaque, opaque_codec, debug)| CriticalReaderState {
3066 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3067 opaque,
3068 opaque_codec,
3069 debug,
3070 },
3071 )
3072 }
3073
3074 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3075 Strategy::prop_map(
3076 (
3077 any::<u64>(),
3078 any::<u64>(),
3079 any::<IdempotencyToken>(),
3080 any::<Option<T>>(),
3081 any::<HandleDebugState>(),
3082 ),
3083 |(
3084 last_heartbeat_timestamp_ms,
3085 lease_duration_ms,
3086 most_recent_write_token,
3087 most_recent_write_upper,
3088 debug,
3089 )| WriterState {
3090 last_heartbeat_timestamp_ms,
3091 lease_duration_ms,
3092 most_recent_write_token,
3093 most_recent_write_upper: most_recent_write_upper
3094 .map_or_else(Antichain::new, Antichain::from_elem),
3095 debug,
3096 },
3097 )
3098 }
3099
3100 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3101 Strategy::prop_map(
3102 (
3103 any::<Vec<u8>>(),
3104 any::<Vec<u8>>(),
3105 any::<Vec<u8>>(),
3106 any::<Vec<u8>>(),
3107 ),
3108 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3109 key: Bytes::from(key),
3110 key_data_type: Bytes::from(key_data_type),
3111 val: Bytes::from(val),
3112 val_data_type: Bytes::from(val_data_type),
3113 },
3114 )
3115 }
3116
3117 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3118 num_trace_batches: Range<usize>,
3119 ) -> impl Strategy<Value = State<T>> {
3120 let part1 = (
3121 any::<ShardId>(),
3122 any::<SeqNo>(),
3123 any::<u64>(),
3124 any::<String>(),
3125 any::<SeqNo>(),
3126 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3127 proptest::option::of(any::<ActiveRollup>()),
3128 );
3129
3130 let part2 = (
3131 proptest::option::of(any::<ActiveGc>()),
3132 proptest::collection::btree_map(
3133 any::<LeasedReaderId>(),
3134 any_leased_reader_state::<T>(),
3135 1..3,
3136 ),
3137 proptest::collection::btree_map(
3138 any::<CriticalReaderId>(),
3139 any_critical_reader_state::<T>(),
3140 1..3,
3141 ),
3142 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3143 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3144 any_trace::<T>(num_trace_batches),
3145 );
3146
3147 (part1, part2).prop_map(
3148 |(
3149 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3150 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3151 )| State {
3152 shard_id,
3153 seqno,
3154 walltime_ms,
3155 hostname,
3156 collections: StateCollections {
3157 version: Version::new(1, 2, 3),
3158 last_gc_req,
3159 rollups,
3160 active_rollup,
3161 active_gc,
3162 leased_readers,
3163 critical_readers,
3164 writers,
3165 schemas,
3166 trace,
3167 },
3168 },
3169 )
3170 }
3171
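    /// Builds a single-run [HollowBatch] over `[lower, upper)` whose parts are
    /// zero-sized placeholders identified only by `keys`; handy for asserting
    /// on trace contents in the tests below.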
3172 pub(crate) fn hollow<T: Timestamp>(
3173 lower: T,
3174 upper: T,
3175 keys: &[&str],
3176 len: usize,
3177 ) -> HollowBatch<T> {
3178 HollowBatch::new_run(
3179 Description::new(
3180 Antichain::from_elem(lower),
3181 Antichain::from_elem(upper),
3182 Antichain::from_elem(T::minimum()),
3183 ),
3184 keys.iter()
3185 .map(|x| {
3186 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3187 key: PartialBatchKey((*x).to_owned()),
3188 meta: Default::default(),
3189 encoded_size_bytes: 0,
3190 key_lower: vec![],
3191 structured_key_lower: None,
3192 stats: None,
3193 ts_rewrite: None,
3194 diffs_sum: None,
3195 format: None,
3196 schema_id: None,
3197 deprecated_schema_id: None,
3198 }))
3199 })
3200 .collect(),
3201 len,
3202 )
3203 }
3204
3205 #[mz_ore::test]
3206 fn downgrade_since() {
3207 let mut state = TypedState::<(), (), u64, i64>::new(
3208 DUMMY_BUILD_INFO.semver_version(),
3209 ShardId::new(),
3210 "".to_owned(),
3211 0,
3212 );
3213 let reader = LeasedReaderId::new();
3214 let seqno = SeqNo::minimum();
3215 let now = SYSTEM_TIME.clone();
3216 let _ = state.collections.register_leased_reader(
3217 "",
3218 &reader,
3219 "",
3220 seqno,
3221 Duration::from_secs(10),
3222 now(),
3223 false,
3224 );
3225
3226 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3228
3229 assert_eq!(
3231 state.collections.downgrade_since(
3232 &reader,
3233 seqno,
3234 None,
3235 &Antichain::from_elem(2),
3236 now()
3237 ),
3238 Continue(Since(Antichain::from_elem(2)))
3239 );
3240 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3241 assert_eq!(
3243 state.collections.downgrade_since(
3244 &reader,
3245 seqno,
3246 None,
3247 &Antichain::from_elem(2),
3248 now()
3249 ),
3250 Continue(Since(Antichain::from_elem(2)))
3251 );
3252 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3253 assert_eq!(
3255 state.collections.downgrade_since(
3256 &reader,
3257 seqno,
3258 None,
3259 &Antichain::from_elem(1),
3260 now()
3261 ),
3262 Continue(Since(Antichain::from_elem(2)))
3263 );
3264 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3265
3266 let reader2 = LeasedReaderId::new();
3268 let _ = state.collections.register_leased_reader(
3269 "",
3270 &reader2,
3271 "",
3272 seqno,
3273 Duration::from_secs(10),
3274 now(),
3275 false,
3276 );
3277
3278 assert_eq!(
3280 state.collections.downgrade_since(
3281 &reader2,
3282 seqno,
3283 None,
3284 &Antichain::from_elem(3),
3285 now()
3286 ),
3287 Continue(Since(Antichain::from_elem(3)))
3288 );
3289 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3290 assert_eq!(
3292 state.collections.downgrade_since(
3293 &reader,
3294 seqno,
3295 None,
3296 &Antichain::from_elem(5),
3297 now()
3298 ),
3299 Continue(Since(Antichain::from_elem(5)))
3300 );
3301 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3302
3303 assert_eq!(
3305 state.collections.expire_leased_reader(&reader),
3306 Continue(true)
3307 );
3308 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3309
3310 let reader3 = LeasedReaderId::new();
3312 let _ = state.collections.register_leased_reader(
3313 "",
3314 &reader3,
3315 "",
3316 seqno,
3317 Duration::from_secs(10),
3318 now(),
3319 false,
3320 );
3321
3322 assert_eq!(
3324 state.collections.downgrade_since(
3325 &reader3,
3326 seqno,
3327 None,
3328 &Antichain::from_elem(10),
3329 now()
3330 ),
3331 Continue(Since(Antichain::from_elem(10)))
3332 );
3333 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3334
3335 assert_eq!(
3337 state.collections.expire_leased_reader(&reader2),
3338 Continue(true)
3339 );
3340 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3345
3346 assert_eq!(
3348 state.collections.expire_leased_reader(&reader3),
3349 Continue(true)
3350 );
3351 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
3356 }
3357
3358 #[mz_ore::test]
3359 fn compare_and_downgrade_since() {
3360 let mut state = TypedState::<(), (), u64, i64>::new(
3361 DUMMY_BUILD_INFO.semver_version(),
3362 ShardId::new(),
3363 "".to_owned(),
3364 0,
3365 );
3366 let reader = CriticalReaderId::new();
3367 let _ = state
3368 .collections
3369 .register_critical_reader::<u64>("", &reader, "");
3370
3371 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
3373 assert_eq!(
3375 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3376 u64::initial()
3377 );
3378
3379 assert_eq!(
3381 state.collections.compare_and_downgrade_since::<u64>(
3382 &reader,
3383 &u64::initial(),
3384 (&1, &Antichain::from_elem(2)),
3385 ),
3386 Continue(Ok(Since(Antichain::from_elem(2))))
3387 );
3388 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3389 assert_eq!(
3390 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3391 1
3392 );
3393 assert_eq!(
3395 state.collections.compare_and_downgrade_since::<u64>(
3396 &reader,
3397 &1,
3398 (&2, &Antichain::from_elem(2)),
3399 ),
3400 Continue(Ok(Since(Antichain::from_elem(2))))
3401 );
3402 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3403 assert_eq!(
3404 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3405 2
3406 );
3407 assert_eq!(
3409 state.collections.compare_and_downgrade_since::<u64>(
3410 &reader,
3411 &2,
3412 (&3, &Antichain::from_elem(1)),
3413 ),
3414 Continue(Ok(Since(Antichain::from_elem(2))))
3415 );
3416 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3417 assert_eq!(
3418 u64::decode(state.collections.critical_reader(&reader).opaque.0),
3419 3
3420 );
3421 }
3422
3423 #[mz_ore::test]
3424 fn compare_and_append() {
3425 let state = &mut TypedState::<String, String, u64, i64>::new(
3426 DUMMY_BUILD_INFO.semver_version(),
3427 ShardId::new(),
3428 "".to_owned(),
3429 0,
3430 )
3431 .collections;
3432
3433 let writer_id = WriterId::new();
3434 let now = SYSTEM_TIME.clone();
3435
3436 assert_eq!(state.trace.num_spine_batches(), 0);
3438 assert_eq!(state.trace.num_hollow_batches(), 0);
3439 assert_eq!(state.trace.num_updates(), 0);
3440
3441 assert_eq!(
3443 state.compare_and_append(
3444 &hollow(1, 2, &["key1"], 1),
3445 &writer_id,
3446 now(),
3447 LEASE_DURATION_MS,
3448 &IdempotencyToken::new(),
3449 &debug_state(),
3450 0,
3451 100,
3452 None
3453 ),
3454 Break(CompareAndAppendBreak::Upper {
3455 shard_upper: Antichain::from_elem(0),
3456 writer_upper: Antichain::from_elem(0)
3457 })
3458 );
3459
3460 assert!(
3462 state
3463 .compare_and_append(
3464 &hollow(0, 5, &[], 0),
3465 &writer_id,
3466 now(),
3467 LEASE_DURATION_MS,
3468 &IdempotencyToken::new(),
3469 &debug_state(),
3470 0,
3471 100,
3472 None
3473 )
3474 .is_continue()
3475 );
3476
3477 assert_eq!(
3479 state.compare_and_append(
3480 &hollow(5, 4, &["key1"], 1),
3481 &writer_id,
3482 now(),
3483 LEASE_DURATION_MS,
3484 &IdempotencyToken::new(),
3485 &debug_state(),
3486 0,
3487 100,
3488 None
3489 ),
3490 Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
3491 lower: Antichain::from_elem(5),
3492 upper: Antichain::from_elem(4)
3493 }))
3494 );
3495
3496 assert_eq!(
3498 state.compare_and_append(
3499 &hollow(5, 5, &["key1"], 1),
3500 &writer_id,
3501 now(),
3502 LEASE_DURATION_MS,
3503 &IdempotencyToken::new(),
3504 &debug_state(),
3505 0,
3506 100,
3507 None
3508 ),
3509 Break(CompareAndAppendBreak::InvalidUsage(
3510 InvalidEmptyTimeInterval {
3511 lower: Antichain::from_elem(5),
3512 upper: Antichain::from_elem(5),
3513 keys: vec!["key1".to_owned()],
3514 }
3515 ))
3516 );
3517
3518 assert!(
3520 state
3521 .compare_and_append(
3522 &hollow(5, 5, &[], 0),
3523 &writer_id,
3524 now(),
3525 LEASE_DURATION_MS,
3526 &IdempotencyToken::new(),
3527 &debug_state(),
3528 0,
3529 100,
3530 None
3531 )
3532 .is_continue()
3533 );
3534 }
3535
3536 #[mz_ore::test]
3537 fn snapshot() {
3538 let now = SYSTEM_TIME.clone();
3539
3540 let mut state = TypedState::<String, String, u64, i64>::new(
3541 DUMMY_BUILD_INFO.semver_version(),
3542 ShardId::new(),
3543 "".to_owned(),
3544 0,
3545 );
3546 assert_eq!(
3548 state.snapshot(&Antichain::from_elem(0)),
3549 Err(SnapshotErr::AsOfNotYetAvailable(
3550 SeqNo(0),
3551 Upper(Antichain::from_elem(0))
3552 ))
3553 );
3554
3555 assert_eq!(
3557 state.snapshot(&Antichain::from_elem(5)),
3558 Err(SnapshotErr::AsOfNotYetAvailable(
3559 SeqNo(0),
3560 Upper(Antichain::from_elem(0))
3561 ))
3562 );
3563
3564 let writer_id = WriterId::new();
3565
3566 assert!(
3568 state
3569 .collections
3570 .compare_and_append(
3571 &hollow(0, 5, &["key1"], 1),
3572 &writer_id,
3573 now(),
3574 LEASE_DURATION_MS,
3575 &IdempotencyToken::new(),
3576 &debug_state(),
3577 0,
3578 100,
3579 None
3580 )
3581 .is_continue()
3582 );
3583
3584 assert_eq!(
3586 state.snapshot(&Antichain::from_elem(0)),
3587 Ok(vec![hollow(0, 5, &["key1"], 1)])
3588 );
3589
3590 assert_eq!(
3592 state.snapshot(&Antichain::from_elem(4)),
3593 Ok(vec![hollow(0, 5, &["key1"], 1)])
3594 );
3595
3596 assert_eq!(
3598 state.snapshot(&Antichain::from_elem(5)),
3599 Err(SnapshotErr::AsOfNotYetAvailable(
3600 SeqNo(0),
3601 Upper(Antichain::from_elem(5))
3602 ))
3603 );
3604 assert_eq!(
3605 state.snapshot(&Antichain::from_elem(6)),
3606 Err(SnapshotErr::AsOfNotYetAvailable(
3607 SeqNo(0),
3608 Upper(Antichain::from_elem(5))
3609 ))
3610 );
3611
3612 let reader = LeasedReaderId::new();
3613 let _ = state.collections.register_leased_reader(
3615 "",
3616 &reader,
3617 "",
3618 SeqNo::minimum(),
3619 Duration::from_secs(10),
3620 now(),
3621 false,
3622 );
3623 assert_eq!(
3624 state.collections.downgrade_since(
3625 &reader,
3626 SeqNo::minimum(),
3627 None,
3628 &Antichain::from_elem(2),
3629 now()
3630 ),
3631 Continue(Since(Antichain::from_elem(2)))
3632 );
3633 assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
3634 assert_eq!(
3636 state.snapshot(&Antichain::from_elem(1)),
3637 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
3638 Antichain::from_elem(2)
3639 )))
3640 );
3641
3642 assert!(
3644 state
3645 .collections
3646 .compare_and_append(
3647 &hollow(5, 10, &[], 0),
3648 &writer_id,
3649 now(),
3650 LEASE_DURATION_MS,
3651 &IdempotencyToken::new(),
3652 &debug_state(),
3653 0,
3654 100,
3655 None
3656 )
3657 .is_continue()
3658 );
3659
3660 assert_eq!(
3662 state.snapshot(&Antichain::from_elem(7)),
3663 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3664 );
3665
3666 assert_eq!(
3668 state.snapshot(&Antichain::from_elem(10)),
3669 Err(SnapshotErr::AsOfNotYetAvailable(
3670 SeqNo(0),
3671 Upper(Antichain::from_elem(10))
3672 ))
3673 );
3674
3675 assert!(
3677 state
3678 .collections
3679 .compare_and_append(
3680 &hollow(10, 15, &["key2"], 1),
3681 &writer_id,
3682 now(),
3683 LEASE_DURATION_MS,
3684 &IdempotencyToken::new(),
3685 &debug_state(),
3686 0,
3687 100,
3688 None
3689 )
3690 .is_continue()
3691 );
3692
3693 assert_eq!(
3696 state.snapshot(&Antichain::from_elem(9)),
3697 Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
3698 );
3699
3700 assert_eq!(
3702 state.snapshot(&Antichain::from_elem(10)),
3703 Ok(vec![
3704 hollow(0, 5, &["key1"], 1),
3705 hollow(5, 10, &[], 0),
3706 hollow(10, 15, &["key2"], 1)
3707 ])
3708 );
3709
3710 assert_eq!(
3711 state.snapshot(&Antichain::from_elem(11)),
3712 Ok(vec![
3713 hollow(0, 5, &["key1"], 1),
3714 hollow(5, 10, &[], 0),
3715 hollow(10, 15, &["key2"], 1)
3716 ])
3717 );
3718 }
3719
3720 #[mz_ore::test]
3721 fn next_listen_batch() {
3722 let mut state = TypedState::<String, String, u64, i64>::new(
3723 DUMMY_BUILD_INFO.semver_version(),
3724 ShardId::new(),
3725 "".to_owned(),
3726 0,
3727 );
3728
3729 assert_eq!(
3732 state.next_listen_batch(&Antichain::from_elem(0)),
3733 Err(SeqNo(0))
3734 );
3735 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3736
3737 let writer_id = WriterId::new();
3738 let now = SYSTEM_TIME.clone();
3739
3740 assert!(
3742 state
3743 .collections
3744 .compare_and_append(
3745 &hollow(0, 5, &["key1"], 1),
3746 &writer_id,
3747 now(),
3748 LEASE_DURATION_MS,
3749 &IdempotencyToken::new(),
3750 &debug_state(),
3751 0,
3752 100,
3753 None
3754 )
3755 .is_continue()
3756 );
3757 assert!(
3758 state
3759 .collections
3760 .compare_and_append(
3761 &hollow(5, 10, &["key2"], 1),
3762 &writer_id,
3763 now(),
3764 LEASE_DURATION_MS,
3765 &IdempotencyToken::new(),
3766 &debug_state(),
3767 0,
3768 100,
3769 None
3770 )
3771 .is_continue()
3772 );
3773
3774 for t in 0..=4 {
3776 assert_eq!(
3777 state.next_listen_batch(&Antichain::from_elem(t)),
3778 Ok(hollow(0, 5, &["key1"], 1))
3779 );
3780 }
3781
3782 for t in 5..=9 {
3784 assert_eq!(
3785 state.next_listen_batch(&Antichain::from_elem(t)),
3786 Ok(hollow(5, 10, &["key2"], 1))
3787 );
3788 }
3789
3790 assert_eq!(
3792 state.next_listen_batch(&Antichain::from_elem(10)),
3793 Err(SeqNo(0))
3794 );
3795
3796 assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
3799 }
3800
3801 #[mz_ore::test]
3802 fn expire_writer() {
3803 let mut state = TypedState::<String, String, u64, i64>::new(
3804 DUMMY_BUILD_INFO.semver_version(),
3805 ShardId::new(),
3806 "".to_owned(),
3807 0,
3808 );
3809 let now = SYSTEM_TIME.clone();
3810
3811 let writer_id_one = WriterId::new();
3812
3813 let writer_id_two = WriterId::new();
3814
3815 assert!(
3817 state
3818 .collections
3819 .compare_and_append(
3820 &hollow(0, 2, &["key1"], 1),
3821 &writer_id_one,
3822 now(),
3823 LEASE_DURATION_MS,
3824 &IdempotencyToken::new(),
3825 &debug_state(),
3826 0,
3827 100,
3828 None
3829 )
3830 .is_continue()
3831 );
3832
3833 assert!(
3834 state
3835 .collections
3836 .expire_writer(&writer_id_one)
3837 .is_continue()
3838 );
3839
3840 assert!(
3842 state
3843 .collections
3844 .compare_and_append(
3845 &hollow(2, 5, &["key2"], 1),
3846 &writer_id_two,
3847 now(),
3848 LEASE_DURATION_MS,
3849 &IdempotencyToken::new(),
3850 &debug_state(),
3851 0,
3852 100,
3853 None
3854 )
3855 .is_continue()
3856 );
3857 }
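
    // An illustrative sketch (not from the original test suite) of the lease
    // arithmetic in `expire_at`: a reader registered with a 10s lease survives
    // an expiry pass at its registration time, but is force-expired once we
    // ask about a walltime far past the lease (here, LEASE_DURATION_MS later).
    #[mz_ore::test]
    fn expire_at_leased_reader_sketch() {
        let now = SYSTEM_TIME.clone();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let registered_at = now();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            registered_at,
            false,
        );
        // Within the lease: the reader is retained.
        let metrics = state.expire_at(registered_at);
        assert_eq!(metrics.readers_expired, 0);
        // Far past the lease: the reader is force-expired.
        let metrics = state.expire_at(registered_at + LEASE_DURATION_MS);
        assert_eq!(metrics.readers_expired, 1);
    }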
3858
3859 #[mz_ore::test]
3860 fn maybe_gc_active_gc() {
3861 const GC_CONFIG: GcConfig = GcConfig {
3862 use_active_gc: true,
3863 fallback_threshold_ms: 5000,
3864 min_versions: 99,
3865 max_versions: 500,
3866 };
3867 let now_fn = SYSTEM_TIME.clone();
3868
3869 let mut state = TypedState::<String, String, u64, i64>::new(
3870 DUMMY_BUILD_INFO.semver_version(),
3871 ShardId::new(),
3872 "".to_owned(),
3873 0,
3874 );
3875
3876 let now = now_fn();
3877 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3879 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3880
3881 state.seqno = SeqNo(100);
3884 assert_eq!(state.seqno_since(), SeqNo(100));
3885
3886 let writer_id = WriterId::new();
3888 let _ = state.collections.compare_and_append(
3889 &hollow(1, 2, &["key1"], 1),
3890 &writer_id,
3891 now,
3892 LEASE_DURATION_MS,
3893 &IdempotencyToken::new(),
3894 &debug_state(),
3895 0,
3896 100,
3897 None,
3898 );
3899 assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);
3900
3901 assert_eq!(
3903 state.maybe_gc(true, now, GC_CONFIG),
3904 Some(GcReq {
3905 shard_id: state.shard_id,
3906 new_seqno_since: SeqNo(100)
3907 })
3908 );
3909
3910 state.collections.active_gc = Some(ActiveGc {
3912 seqno: state.seqno,
3913 start_ms: now,
3914 });
3915
3916 state.seqno = SeqNo(200);
3917 assert_eq!(state.seqno_since(), SeqNo(200));
3918
3919 assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
3920
3921 state.seqno = SeqNo(300);
3922 assert_eq!(state.seqno_since(), SeqNo(300));
3923 let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
3925 assert_eq!(
3926 state.maybe_gc(true, new_now, GC_CONFIG),
3927 Some(GcReq {
3928 shard_id: state.shard_id,
3929 new_seqno_since: SeqNo(300)
3930 })
3931 );
3932
3933 state.seqno = SeqNo(301);
3937 assert_eq!(state.seqno_since(), SeqNo(301));
3938 assert_eq!(
3939 state.maybe_gc(true, new_now, GC_CONFIG),
3940 Some(GcReq {
3941 shard_id: state.shard_id,
3942 new_seqno_since: SeqNo(301)
3943 })
3944 );
3945
3946 state.collections.active_gc = None;
3947
3948 state.seqno = SeqNo(400);
3951 assert_eq!(state.seqno_since(), SeqNo(400));
3952
3953 let now = now_fn();
3954
3955 let _ = state.collections.expire_writer(&writer_id);
3957 assert_eq!(
3958 state.maybe_gc(false, now, GC_CONFIG),
3959 Some(GcReq {
3960 shard_id: state.shard_id,
3961 new_seqno_since: SeqNo(400)
3962 })
3963 );
3964
3965 let previous_seqno = state.seqno;
3967 state.seqno = SeqNo(10_000);
3968 assert_eq!(state.seqno_since(), SeqNo(10_000));
3969
3970 let now = now_fn();
3971 assert_eq!(
3972 state.maybe_gc(true, now, GC_CONFIG),
3973 Some(GcReq {
3974 shard_id: state.shard_id,
3975 new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
3976 })
3977 );
3978 }
3979
3980 #[mz_ore::test]
3981 fn maybe_gc_classic() {
3982 const GC_CONFIG: GcConfig = GcConfig {
3983 use_active_gc: false,
3984 fallback_threshold_ms: 5000,
3985 min_versions: 16,
3986 max_versions: 128,
3987 };
3988 const NOW_MS: u64 = 0;
3989
3990 let mut state = TypedState::<String, String, u64, i64>::new(
3991 DUMMY_BUILD_INFO.semver_version(),
3992 ShardId::new(),
3993 "".to_owned(),
3994 0,
3995 );
3996
3997 assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
3999 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4000
4001 state.seqno = SeqNo(100);
4004 assert_eq!(state.seqno_since(), SeqNo(100));
4005
4006 let writer_id = WriterId::new();
4008 let now = SYSTEM_TIME.clone();
4009 let _ = state.collections.compare_and_append(
4010 &hollow(1, 2, &["key1"], 1),
4011 &writer_id,
4012 now(),
4013 LEASE_DURATION_MS,
4014 &IdempotencyToken::new(),
4015 &debug_state(),
4016 0,
4017 100,
4018 None,
4019 );
4020 assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);
4021
4022 assert_eq!(
4024 state.maybe_gc(true, NOW_MS, GC_CONFIG),
4025 Some(GcReq {
4026 shard_id: state.shard_id,
4027 new_seqno_since: SeqNo(100)
4028 })
4029 );
4030
4031 state.seqno = SeqNo(200);
4034 assert_eq!(state.seqno_since(), SeqNo(200));
4035
4036 let _ = state.collections.expire_writer(&writer_id);
4038 assert_eq!(
4039 state.maybe_gc(false, NOW_MS, GC_CONFIG),
4040 Some(GcReq {
4041 shard_id: state.shard_id,
4042 new_seqno_since: SeqNo(200)
4043 })
4044 );
4045 }
4046
4047 #[mz_ore::test]
4048 fn need_rollup_active_rollup() {
4049 const ROLLUP_THRESHOLD: usize = 3;
4050 const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
4051 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
4052 let now = SYSTEM_TIME.clone();
4053
4054 mz_ore::test::init_logging();
4055 let mut state = TypedState::<String, String, u64, i64>::new(
4056 DUMMY_BUILD_INFO.semver_version(),
4057 ShardId::new(),
4058 "".to_owned(),
4059 0,
4060 );
4061
4062 let rollup_seqno = SeqNo(5);
4063 let rollup = HollowRollup {
4064 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4065 encoded_size_bytes: None,
4066 };
4067
4068 assert!(
4069 state
4070 .collections
4071 .add_rollup((rollup_seqno, &rollup))
4072 .is_continue()
4073 );
4074
4075 state.seqno = SeqNo(5);
4077 assert_none!(state.need_rollup(
4078 ROLLUP_THRESHOLD,
4079 ROLLUP_USE_ACTIVE_ROLLUP,
4080 ROLLUP_FALLBACK_THRESHOLD_MS,
4081 now()
4082 ));
4083
4084 state.seqno = SeqNo(6);
4086 assert_none!(state.need_rollup(
4087 ROLLUP_THRESHOLD,
4088 ROLLUP_USE_ACTIVE_ROLLUP,
4089 ROLLUP_FALLBACK_THRESHOLD_MS,
4090 now()
4091 ));
4092 state.seqno = SeqNo(7);
4093 assert_none!(state.need_rollup(
4094 ROLLUP_THRESHOLD,
4095 ROLLUP_USE_ACTIVE_ROLLUP,
4096 ROLLUP_FALLBACK_THRESHOLD_MS,
4097 now()
4098 ));
4099 state.seqno = SeqNo(8);
4100 assert_none!(state.need_rollup(
4101 ROLLUP_THRESHOLD,
4102 ROLLUP_USE_ACTIVE_ROLLUP,
4103 ROLLUP_FALLBACK_THRESHOLD_MS,
4104 now()
4105 ));
4106
4107 let mut current_time = now();
4108 state.seqno = SeqNo(9);
4110 assert_eq!(
4111 state
4112 .need_rollup(
4113 ROLLUP_THRESHOLD,
4114 ROLLUP_USE_ACTIVE_ROLLUP,
4115 ROLLUP_FALLBACK_THRESHOLD_MS,
4116 current_time
4117 )
4118 .expect("rollup"),
4119 SeqNo(9)
4120 );
4121
4122 state.collections.active_rollup = Some(ActiveRollup {
4123 seqno: SeqNo(9),
4124 start_ms: current_time,
4125 });
4126
4127 assert_none!(state.need_rollup(
4129 ROLLUP_THRESHOLD,
4130 ROLLUP_USE_ACTIVE_ROLLUP,
4131 ROLLUP_FALLBACK_THRESHOLD_MS,
4132 current_time
4133 ));
4134
4135 state.seqno = SeqNo(10);
4136 assert_none!(state.need_rollup(
4139 ROLLUP_THRESHOLD,
4140 ROLLUP_USE_ACTIVE_ROLLUP,
4141 ROLLUP_FALLBACK_THRESHOLD_MS,
4142 current_time
4143 ));
4144
4145 current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
4147 assert_eq!(
4148 state
4149 .need_rollup(
4150 ROLLUP_THRESHOLD,
4151 ROLLUP_USE_ACTIVE_ROLLUP,
4152 ROLLUP_FALLBACK_THRESHOLD_MS,
4153 current_time
4154 )
4155 .expect("rollup"),
4156 SeqNo(10)
4157 );
4158
4159 state.seqno = SeqNo(9);
4160 state.collections.active_rollup = None;
4162 let rollup_seqno = SeqNo(9);
4163 let rollup = HollowRollup {
4164 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4165 encoded_size_bytes: None,
4166 };
4167 assert!(
4168 state
4169 .collections
4170 .add_rollup((rollup_seqno, &rollup))
4171 .is_continue()
4172 );
4173
4174 state.seqno = SeqNo(11);
4175 assert_none!(state.need_rollup(
4177 ROLLUP_THRESHOLD,
4178 ROLLUP_USE_ACTIVE_ROLLUP,
4179 ROLLUP_FALLBACK_THRESHOLD_MS,
4180 current_time
4181 ));
4182 state.seqno = SeqNo(13);
4184 assert_eq!(
4185 state
4186 .need_rollup(
4187 ROLLUP_THRESHOLD,
4188 ROLLUP_USE_ACTIVE_ROLLUP,
4189 ROLLUP_FALLBACK_THRESHOLD_MS,
4190 current_time
4191 )
4192 .expect("rollup"),
4193 SeqNo(13)
4194 );
4195 }
4196
4197 #[mz_ore::test]
4198 fn need_rollup_classic() {
4199 const ROLLUP_THRESHOLD: usize = 3;
4200 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4201 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4202 const NOW: u64 = 0;
4203
4204 mz_ore::test::init_logging();
4205 let mut state = TypedState::<String, String, u64, i64>::new(
4206 DUMMY_BUILD_INFO.semver_version(),
4207 ShardId::new(),
4208 "".to_owned(),
4209 0,
4210 );
4211
4212 let rollup_seqno = SeqNo(5);
4213 let rollup = HollowRollup {
4214 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4215 encoded_size_bytes: None,
4216 };
4217
4218 assert!(
4219 state
4220 .collections
4221 .add_rollup((rollup_seqno, &rollup))
4222 .is_continue()
4223 );
4224
4225 state.seqno = SeqNo(5);
4227 assert_none!(state.need_rollup(
4228 ROLLUP_THRESHOLD,
4229 ROLLUP_USE_ACTIVE_ROLLUP,
4230 ROLLUP_FALLBACK_THRESHOLD_MS,
4231 NOW
4232 ));
4233
4234 state.seqno = SeqNo(6);
4236 assert_none!(state.need_rollup(
4237 ROLLUP_THRESHOLD,
4238 ROLLUP_USE_ACTIVE_ROLLUP,
4239 ROLLUP_FALLBACK_THRESHOLD_MS,
4240 NOW
4241 ));
4242 state.seqno = SeqNo(7);
4243 assert_none!(state.need_rollup(
4244 ROLLUP_THRESHOLD,
4245 ROLLUP_USE_ACTIVE_ROLLUP,
4246 ROLLUP_FALLBACK_THRESHOLD_MS,
4247 NOW
4248 ));
4249
4250 state.seqno = SeqNo(8);
4252 assert_eq!(
4253 state
4254 .need_rollup(
4255 ROLLUP_THRESHOLD,
4256 ROLLUP_USE_ACTIVE_ROLLUP,
4257 ROLLUP_FALLBACK_THRESHOLD_MS,
4258 NOW
4259 )
4260 .expect("rollup"),
4261 SeqNo(8)
4262 );
4263
4264 state.seqno = SeqNo(9);
4266 assert_none!(state.need_rollup(
4267 ROLLUP_THRESHOLD,
4268 ROLLUP_USE_ACTIVE_ROLLUP,
4269 ROLLUP_FALLBACK_THRESHOLD_MS,
4270 NOW
4271 ));
4272
4273 state.seqno = SeqNo(11);
4275 assert_eq!(
4276 state
4277 .need_rollup(
4278 ROLLUP_THRESHOLD,
4279 ROLLUP_USE_ACTIVE_ROLLUP,
4280 ROLLUP_FALLBACK_THRESHOLD_MS,
4281 NOW
4282 )
4283 .expect("rollup"),
4284 SeqNo(11)
4285 );
4286
4287 let rollup_seqno = SeqNo(6);
4289 let rollup = HollowRollup {
4290 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4291 encoded_size_bytes: None,
4292 };
4293 assert!(
4294 state
4295 .collections
4296 .add_rollup((rollup_seqno, &rollup))
4297 .is_continue()
4298 );
4299
4300 state.seqno = SeqNo(8);
4301 assert_none!(state.need_rollup(
4302 ROLLUP_THRESHOLD,
4303 ROLLUP_USE_ACTIVE_ROLLUP,
4304 ROLLUP_FALLBACK_THRESHOLD_MS,
4305 NOW
4306 ));
4307 state.seqno = SeqNo(9);
4308 assert_eq!(
4309 state
4310 .need_rollup(
4311 ROLLUP_THRESHOLD,
4312 ROLLUP_USE_ACTIVE_ROLLUP,
4313 ROLLUP_FALLBACK_THRESHOLD_MS,
4314 NOW
4315 )
4316 .expect("rollup"),
4317 SeqNo(9)
4318 );
4319
4320 let fallback_seqno = SeqNo(
4322 rollup_seqno.0
4323 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4324 );
4325 state.seqno = fallback_seqno;
4326 assert_eq!(
4327 state
4328 .need_rollup(
4329 ROLLUP_THRESHOLD,
4330 ROLLUP_USE_ACTIVE_ROLLUP,
4331 ROLLUP_FALLBACK_THRESHOLD_MS,
4332 NOW
4333 )
4334 .expect("rollup"),
4335 fallback_seqno
4336 );
4337 state.seqno = fallback_seqno.next();
4338 assert_eq!(
4339 state
4340 .need_rollup(
4341 ROLLUP_THRESHOLD,
4342 ROLLUP_USE_ACTIVE_ROLLUP,
4343 ROLLUP_FALLBACK_THRESHOLD_MS,
4344 NOW
4345 )
4346 .expect("rollup"),
4347 fallback_seqno.next()
4348 );
4349 }
4350
4351 #[mz_ore::test]
4352 fn idempotency_token_sentinel() {
4353 assert_eq!(
4354 IdempotencyToken::SENTINEL.to_string(),
4355 "i11111111-1111-1111-1111-111111111111"
4356 );
4357 }
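
    // A minimal illustrative sketch (not from the original test suite): with
    // no leased readers registered, the seqno_since tracks the current seqno,
    // so `seqnos_held` stays at zero even as the seqno advances.
    #[mz_ore::test]
    fn seqnos_held_without_readers_sketch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        assert_eq!(state.seqnos_held(), 0);
        state.seqno = SeqNo(42);
        assert_eq!(state.seqno_since(), SeqNo(42));
        assert_eq!(state.seqnos_held(), 0);
    }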
4358
4359 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn state_inspect_serde_json() {
4370 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4371 let mut runner = proptest::test_runner::TestRunner::deterministic();
4372 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4373 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4374 assert_eq!(
4375 json.trim(),
4376 STATE_SERDE_JSON.trim(),
4377 "\n\nNEW GOLDEN\n{}\n",
4378 json
4379 );
4380 }
4381
4382 #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4385 let mut clients = new_test_client_cache(&dyncfgs);
4386 let shard_id = ShardId::new();
4387
4388 async fn open_and_write(
4389 clients: &mut PersistClientCache,
4390 version: semver::Version,
4391 shard_id: ShardId,
4392 ) -> Result<(), tokio::task::JoinError> {
4393 clients.cfg.build_version = version.clone();
4394 clients.clear_state_cache();
4395 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4396 mz_ore::task::spawn(|| version.to_string(), async move {
4398 let () = client
4399 .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4400 .await
4401 .expect("valid usage");
4402 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4403 let current = *write.upper().as_option().unwrap();
4404 write
4406 .expect_compare_and_append_batch(&mut [], current, current + 1)
4407 .await;
4408 })
4409 .await
4410 }
4411
4412 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4414 assert_ok!(res);
4415
4416 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4418 assert_ok!(res);
4419
4420 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4422 assert!(res.unwrap_err().is_panic());
4423
4424 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4426 assert!(res.unwrap_err().is_panic());
4427 }
4428
4429 #[mz_ore::test]
4430 fn runid_roundtrip() {
4431 proptest!(|(runid: RunId)| {
4432 let runid_str = runid.to_string();
4433 let parsed = RunId::from_str(&runid_str);
4434 prop_assert_eq!(parsed, Ok(runid));
4435 });
4436 }
4437}