1use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60 LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66 ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
74include!(concat!(
75 env!("OUT_DIR"),
76 "/mz_persist_client.internal.state.rs"
77));
78
79include!(concat!(
80 env!("OUT_DIR"),
81 "/mz_persist_client.internal.diff.rs"
82));
83
/// Dyncfg: how many seqnos may elapse between writing state rollups.
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

/// Dyncfg: how long (in ms) before another worker may take over an
/// already-claimed rollup (see [`ActiveRollup`]).
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Dyncfg: whether to use the [`ActiveRollup`]-based claim mechanism.
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    true,
    "Whether to use the new active rollup tracking mechanism.",
);

/// Dyncfg: how long (in ms) before another worker may take over an
/// already-claimed GC run (see [`ActiveGc`]).
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// Dyncfg: how many un-GCd versions are tolerated before a GC is triggered.
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

/// Dyncfg: cap on the number of versions processed by a single GC run.
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

/// Dyncfg: whether to use the [`ActiveGc`]-based claim mechanism.
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

/// Dyncfg: whether incremental compaction is enabled.
pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
/// A random 16-byte token identifying one write, rendered as `i<uuid>`.
///
/// Used to detect retried writes: see `WriterState::most_recent_write_token`
/// and `CompareAndAppendBreak::AlreadyCommitted`.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(f, "i{}", Uuid::from_bytes(self.0))
158 }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164 }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168 type Err = String;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172 }
173}
174
175impl From<IdempotencyToken> for String {
176 fn from(x: IdempotencyToken) -> Self {
177 x.to_string()
178 }
179}
180
181impl IdempotencyToken {
182 pub(crate) fn new() -> Self {
183 IdempotencyToken(*Uuid::new_v4().as_bytes())
184 }
185 pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// Durable state kept for an active leased reader of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// The seqno this reader holds (presumably a capability holding back
    /// state GC — confirm against usage).
    pub seqno: SeqNo,
    /// The since (read frontier) capability held by this reader.
    pub since: Antichain<T>,
    /// Wall-clock time (epoch millis) of the reader's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// How long the reader's lease lasts past its last heartbeat.
    pub lease_duration_ms: u64,
    /// Debug-only info about the process holding the handle.
    pub debug: HandleDebugState,
}

/// Durable state kept for an active critical (non-leased) reader of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since (read frontier) capability held by this reader.
    pub since: Antichain<T>,
    /// An opaque value owned by the reader (see [`Opaque`]).
    pub opaque: Opaque,
    /// Debug-only info about the process holding the handle.
    pub debug: HandleDebugState,
}

/// Durable state kept for an active writer of a shard.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// Wall-clock time (epoch millis) of the writer's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// How long the writer's lease lasts past its last heartbeat.
    pub lease_duration_ms: u64,
    /// The token of this writer's most recent successful write, used to
    /// recognize (and deduplicate) retries of the same write.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of this writer's most recent successful write.
    pub most_recent_write_upper: Antichain<T>,
    /// Debug-only info about the process holding the handle.
    pub debug: HandleDebugState,
}

/// Diagnostic information about the process holding a handle; not used for
/// correctness.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// The hostname of the process holding the handle.
    pub hostname: String,
    /// A human-readable description of why the handle was opened.
    pub purpose: String,
}

/// One part of a batch of updates: either a pointer to data stored in blob
/// storage (`Hollow`), or the encoded data held directly in state (`Inline`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    Hollow(HollowBatchPart<T>),
    Inline {
        /// The updates themselves, lazily decoded.
        updates: LazyInlineBatchPart,
        /// If set, the frontier the part's timestamps were rewritten to (see
        /// `HollowBatch::rewrite_ts`).
        ts_rewrite: Option<Antichain<T>>,
        /// The id of the schema the updates were written with, if known.
        schema_id: Option<SchemaId>,

        /// Deprecated schema id field, kept for backward compatibility.
        deprecated_schema_id: Option<SchemaId>,
    },
}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258 let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259 let proto = lower.decode()?;
260 let data = ArrayData::from_proto(proto)?;
261 ensure!(data.len() == 1);
262 Ok(ArrayBound::new(make_array(data), 0))
263 };
264
265 let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267 match decoded {
268 Ok(bound) => Some(bound),
269 Err(e) => {
270 soft_panic_or_log!("failed to decode bound: {e:#?}");
271 None
272 }
273 }
274}
275
276impl<T> BatchPart<T> {
277 pub fn hollow_bytes(&self) -> usize {
278 match self {
279 BatchPart::Hollow(x) => x.encoded_size_bytes,
280 BatchPart::Inline { .. } => 0,
281 }
282 }
283
284 pub fn is_inline(&self) -> bool {
285 matches!(self, BatchPart::Inline { .. })
286 }
287
288 pub fn inline_bytes(&self) -> usize {
289 match self {
290 BatchPart::Hollow(_) => 0,
291 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
292 }
293 }
294
295 pub fn writer_key(&self) -> Option<WriterKey> {
296 match self {
297 BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
298 BatchPart::Inline { .. } => None,
299 }
300 }
301
302 pub fn encoded_size_bytes(&self) -> usize {
303 match self {
304 BatchPart::Hollow(x) => x.encoded_size_bytes,
305 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
306 }
307 }
308
309 pub fn printable_name(&self) -> &str {
312 match self {
313 BatchPart::Hollow(x) => x.key.0.as_str(),
314 BatchPart::Inline { .. } => "<inline>",
315 }
316 }
317
318 pub fn stats(&self) -> Option<&LazyPartStats> {
319 match self {
320 BatchPart::Hollow(x) => x.stats.as_ref(),
321 BatchPart::Inline { .. } => None,
322 }
323 }
324
325 pub fn key_lower(&self) -> &[u8] {
326 match self {
327 BatchPart::Hollow(x) => x.key_lower.as_slice(),
328 BatchPart::Inline { .. } => &[],
335 }
336 }
337
338 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
339 let part = match self {
340 BatchPart::Hollow(part) => part,
341 BatchPart::Inline { .. } => return None,
342 };
343
344 decode_structured_lower(part.structured_key_lower.as_ref()?)
345 }
346
347 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
348 match self {
349 BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
350 BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
351 }
352 }
353
354 pub fn schema_id(&self) -> Option<SchemaId> {
355 match self {
356 BatchPart::Hollow(x) => x.schema_id,
357 BatchPart::Inline { schema_id, .. } => *schema_id,
358 }
359 }
360
361 pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
362 match self {
363 BatchPart::Hollow(x) => x.deprecated_schema_id,
364 BatchPart::Inline {
365 deprecated_schema_id,
366 ..
367 } => *deprecated_schema_id,
368 }
369 }
370}
371
372impl<T: Timestamp + Codec64> BatchPart<T> {
373 pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
374 match self {
375 BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
376 BatchPart::Inline { updates, .. } => {
377 let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
378 matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
379 }
380 }
381 }
382
383 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
384 match self {
385 BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
386 BatchPart::Inline { updates, .. } => Some(
387 updates
388 .decode::<T>(metrics)
389 .expect("valid inline part")
390 .updates
391 .diffs_sum(),
392 ),
393 }
394 }
395}
396
/// A sequence of [`RunPart`]s, stored together as a single blob.
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    pub(crate) parts: Vec<RunPart<T>>,
}

/// A pointer to a [`HollowRun`] in blob storage, along with enough summary
/// metadata (sizes, key bounds, diffs sum) to make decisions without
/// fetching it.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// The blob key (minus the shard prefix) the run is stored at.
    pub key: PartialBatchKey,

    /// Total blob-stored bytes across all parts referenced by the run.
    pub hollow_bytes: usize,

    /// The size of the largest single part in the run, in bytes.
    pub max_part_bytes: usize,

    /// The Codec-encoded key lower bound of the run's first part.
    pub key_lower: Vec<u8>,

    /// The structured key lower bound of the run's first part, if any.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// The Codec64-encoded sum of all diffs in the run, if known.
    pub diffs_sum: Option<[u8; 8]>,

    // Ties the ref to the shard's timestamp type without storing a T.
    pub(crate) _phantom_data: PhantomData<T>,
}
impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Eq> Ord for HollowRunRef<T> {
    // NOTE(review): ordering is by `key` alone, while the derived `PartialEq`
    // compares every field — so `cmp` can report `Equal` for refs that are
    // not `==`. Presumably blob keys are unique, making this moot; confirm.
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}
437
438impl<T> HollowRunRef<T> {
439 pub fn writer_key(&self) -> Option<WriterKey> {
440 Some(self.key.split()?.0)
441 }
442}
443
impl<T: Timestamp + Codec64> HollowRunRef<T> {
    /// Proto-encodes `data`, writes it to blob storage under a freshly minted
    /// part key for `writer`, and returns a ref summarizing the run.
    ///
    /// The blob write is retried via `retry_external` until it succeeds.
    /// Panics if any part lacks a diffs sum, or if `data` has no parts.
    pub async fn set<D: Codec64 + Monoid>(
        shard_id: ShardId,
        blob: &dyn Blob,
        writer: &WriterKey,
        data: HollowRun<T>,
        metrics: &Metrics,
    ) -> Self {
        // Summary stats are computed up front so later readers can consult
        // the ref without fetching the blob.
        let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
        let max_part_bytes = data
            .parts
            .iter()
            .map(|p| p.max_part_bytes())
            .max()
            .unwrap_or(0);
        // The run's lower bounds are taken from the first part; presumably
        // parts are ordered so this bounds the whole run — confirm.
        let key_lower = data
            .parts
            .first()
            .map_or(vec![], |p| p.key_lower().to_vec());
        let structured_key_lower = match data.parts.first() {
            Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
            Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
        };
        // Fold the per-part sums together with the Monoid's `plus_equals`.
        let diffs_sum = data
            .parts
            .iter()
            .map(|p| {
                p.diffs_sum::<D>(&metrics.columnar)
                    .expect("valid diffs sum")
            })
            .reduce(|mut a, b| {
                a.plus_equals(&b);
                a
            })
            .expect("valid diffs sum")
            .encode();

        let key = PartialBatchKey::new(writer, &PartId::new());
        let blob_key = key.complete(&shard_id);
        let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
        let () = retry_external(&metrics.retries.external.hollow_run_set, || {
            blob.set(&blob_key, bytes.clone())
        })
        .await;
        Self {
            key,
            hollow_bytes,
            max_part_bytes,
            key_lower,
            structured_key_lower,
            diffs_sum: Some(diffs_sum),
            _phantom_data: Default::default(),
        }
    }

    /// Fetches and decodes the [`HollowRun`] this ref points to, or `None` if
    /// the blob is not present.
    ///
    /// The blob read is retried via `retry_external`; panics if the fetched
    /// bytes are not a valid encoding (that would indicate corrupt state).
    pub async fn get(
        &self,
        shard_id: ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
    ) -> Option<HollowRun<T>> {
        let blob_key = self.key.complete(&shard_id);
        let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
            blob.get(&blob_key)
        })
        .await?;
        let proto_runs: ProtoHollowRun =
            prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
        let runs = proto_runs
            .into_rust()
            .expect("illegal state: invalid encoded runs proto");
        Some(runs)
    }
}
523
/// One piece of a run: either a single batch part, or a reference to a blob
/// ([`HollowRunRef`]) that itself lists many parts.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536 Some(self.cmp(other))
537 }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541 fn cmp(&self, other: &Self) -> Ordering {
542 match (self, other) {
543 (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544 (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545 (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546 (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547 }
548 }
549}
550
551impl<T> RunPart<T> {
552 #[cfg(test)]
553 pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
554 match self {
555 RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
556 _ => panic!("expected hollow part!"),
557 }
558 }
559
560 pub fn hollow_bytes(&self) -> usize {
561 match self {
562 Self::Single(p) => p.hollow_bytes(),
563 Self::Many(r) => r.hollow_bytes,
564 }
565 }
566
567 pub fn is_inline(&self) -> bool {
568 match self {
569 Self::Single(p) => p.is_inline(),
570 Self::Many(_) => false,
571 }
572 }
573
574 pub fn inline_bytes(&self) -> usize {
575 match self {
576 Self::Single(p) => p.inline_bytes(),
577 Self::Many(_) => 0,
578 }
579 }
580
581 pub fn max_part_bytes(&self) -> usize {
582 match self {
583 Self::Single(p) => p.encoded_size_bytes(),
584 Self::Many(r) => r.max_part_bytes,
585 }
586 }
587
588 pub fn writer_key(&self) -> Option<WriterKey> {
589 match self {
590 Self::Single(p) => p.writer_key(),
591 Self::Many(r) => r.writer_key(),
592 }
593 }
594
595 pub fn encoded_size_bytes(&self) -> usize {
596 match self {
597 Self::Single(p) => p.encoded_size_bytes(),
598 Self::Many(r) => r.hollow_bytes,
599 }
600 }
601
602 pub fn schema_id(&self) -> Option<SchemaId> {
603 match self {
604 Self::Single(p) => p.schema_id(),
605 Self::Many(_) => None,
606 }
607 }
608
609 pub fn printable_name(&self) -> &str {
612 match self {
613 Self::Single(p) => p.printable_name(),
614 Self::Many(r) => r.key.0.as_str(),
615 }
616 }
617
618 pub fn stats(&self) -> Option<&LazyPartStats> {
619 match self {
620 Self::Single(p) => p.stats(),
621 Self::Many(_) => None,
623 }
624 }
625
626 pub fn key_lower(&self) -> &[u8] {
627 match self {
628 Self::Single(p) => p.key_lower(),
629 Self::Many(r) => r.key_lower.as_slice(),
630 }
631 }
632
633 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
634 match self {
635 Self::Single(p) => p.structured_key_lower(),
636 Self::Many(_) => None,
637 }
638 }
639
640 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
641 match self {
642 Self::Single(p) => p.ts_rewrite(),
643 Self::Many(_) => None,
644 }
645 }
646}
647
648impl<T> RunPart<T>
649where
650 T: Timestamp + Codec64,
651{
652 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653 match self {
654 Self::Single(p) => p.diffs_sum(metrics),
655 Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656 }
657 }
658}
659
/// An error indicating that a blob we expected to exist was not found.
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);

impl std::fmt::Display for MissingBlob {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "unexpectedly missing key: {}", self.0)
    }
}

impl std::error::Error for MissingBlob {}
671
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams the [`BatchPart`]s reachable from this part, fetching and
    /// recursively flattening any [`HollowRunRef`]s along the way.
    ///
    /// A missing sub-run blob surfaces as a [`MissingBlob`] error item.
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // Recurse: sub-runs may themselves contain sub-runs.
                        // Boxing makes the recursive async stream type finite.
                        for await batch_part in
                            run_part.part_stream(shard_id, blob, metrics).boxed()
                        {
                            // Parts fetched from a sub-run are owned, since the
                            // fetched `HollowRun` doesn't outlive the loop.
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
699
impl<T: Ord> PartialOrd for BatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for BatchPart<T> {
    /// Hollow parts sort before inline parts; like variants compare field by
    /// field, with antichains compared via their `elements()` slices (which,
    /// unlike `Antichain` itself, implement `Ord`).
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}
740
/// The sort order of the updates within a run, if any. (Variant semantics
/// inferred from the names — confirm at the write sites.)
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// No meaningful order.
    Unordered,
    /// Ordered by the Codec-encoded representation.
    Codec,
    /// Ordered by the structured (columnar) representation.
    Structured,
}

/// A unique identifier for a run, rendered as `ri<uuid>`.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757 write!(f, "ri{}", Uuid::from_bytes(self.0))
758 }
759}
760
761impl std::fmt::Debug for RunId {
762 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763 write!(f, "RunId({})", Uuid::from_bytes(self.0))
764 }
765}
766
767impl std::str::FromStr for RunId {
768 type Err = String;
769
770 fn from_str(s: &str) -> Result<Self, Self::Err> {
771 parse_id("ri", "RunId", s).map(RunId)
772 }
773}
774
775impl From<RunId> for String {
776 fn from(x: RunId) -> Self {
777 x.to_string()
778 }
779}
780
781impl RunId {
782 pub(crate) fn new() -> Self {
783 RunId(*Uuid::new_v4().as_bytes())
784 }
785}
786
787impl Arbitrary for RunId {
788 type Parameters = ();
789 type Strategy = proptest::strategy::BoxedStrategy<Self>;
790 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791 Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792 RunId(*Uuid::from_u128(n).as_bytes())
793 })
794 .boxed()
795 }
796}
797
/// Metadata shared by all the parts of one run in a [`HollowBatch`].
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// The sort order of the updates in the run, if known.
    pub(crate) order: Option<RunOrder>,
    /// The id of the schema the run was written with, if known.
    pub(crate) schema: Option<SchemaId>,

    /// Deprecated schema id field, kept for backward compatibility.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// A unique id for the run, if one was assigned.
    pub(crate) id: Option<RunId>,

    /// The number of updates in the run, if known.
    pub(crate) len: Option<usize>,

    /// Arbitrary extra metadata; omitted from serialization when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}

/// A pointer to one batch part in blob storage, plus per-part metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// The blob key (minus the shard prefix) the part is stored at.
    pub key: PartialBatchKey,
    /// Arbitrary extra metadata; omitted from serialization when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// The encoded size of the part, in bytes.
    pub encoded_size_bytes: usize,
    /// A lower bound on the Codec-encoded keys contained in the part.
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// A lower bound on the structured (arrow) keys, if recorded.
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Lazily-decoded statistics about the part's contents, if recorded.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// If set, the frontier the part's timestamps were rewritten to (see
    /// `HollowBatch::rewrite_ts`).
    pub ts_rewrite: Option<Antichain<T>>,
    /// The Codec64-encoded sum of all diffs in the part, if known.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// The columnar format the part was written with, if known.
    pub format: Option<BatchColumnarFormat>,
    /// The id of the schema the part was written with, if known.
    pub schema_id: Option<SchemaId>,

    /// Deprecated schema id field, kept for backward compatibility.
    pub deprecated_schema_id: Option<SchemaId>,
}

/// A batch of updates whose data is stored externally, described by its
/// bounds and pointers to the constituent parts.
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// The lower/upper/since bounds of the updates in the batch.
    pub desc: Description<T>,
    /// The number of updates in the batch.
    pub len: usize,
    /// Pointers to the data making up the batch.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Indices into `parts` at which a new run begins: e.g. `[2, 4]` means
    /// runs `[0..2)`, `[2..4)`, `[4..)`. Empty means at most one run.
    pub(crate) run_splits: Vec<usize>,
    /// Per-run metadata, parallel to the runs implied by `run_splits`.
    pub(crate) run_meta: Vec<RunMeta>,
}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898 let HollowBatch {
899 desc,
900 parts,
901 len,
902 run_splits: runs,
903 run_meta,
904 } = self;
905 f.debug_struct("HollowBatch")
906 .field(
907 "desc",
908 &(
909 desc.lower().elements(),
910 desc.upper().elements(),
911 desc.since().elements(),
912 ),
913 )
914 .field("parts", &parts)
915 .field("len", &len)
916 .field("runs", &runs)
917 .field("run_meta", &run_meta)
918 .finish()
919 }
920}
921
impl<T: Serialize> serde::Serialize for HollowBatch<T> {
    /// Serializes the batch with its description flattened into `lower` /
    /// `upper` / `since` fields, and the parts grouped by run under
    /// `part_runs` (via [`HollowBatch::runs`]).
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let HollowBatch {
            desc,
            len,
            // These three are serialized in derived form, as `part_runs`.
            parts: _,
            run_splits: _,
            run_meta: _,
        } = self;
        let mut s = s.serialize_struct("HollowBatch", 5)?;
        let () = s.serialize_field("lower", &desc.lower().elements())?;
        let () = s.serialize_field("upper", &desc.upper().elements())?;
        let () = s.serialize_field("since", &desc.since().elements())?;
        let () = s.serialize_field("len", len)?;
        let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
        s.end()
    }
}
941
impl<T: Ord> PartialOrd for HollowBatch<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatch<T> {
    /// Lexicographic comparison over every field, with the `Description`
    /// compared via its frontier `elements()` (which implement `Ord`).
    fn cmp(&self, other: &Self) -> Ordering {
        // Exhaustive destructuring so a new field can't be silently omitted
        // from the comparison.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}
986
impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    /// Streams every [`BatchPart`] in the batch in order, transparently
    /// fetching and flattening any out-of-state sub-runs (see
    /// [`RunPart::part_stream`]).
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
impl<T> HollowBatch<T> {
    /// Constructs a [`HollowBatch`] from parts already grouped into runs.
    ///
    /// `run_splits` lists the indices into `parts` at which a new run begins
    /// (empty means at most one run); `run_meta` has one entry per run. The
    /// invariants are only checked with `debug_assert!`.
    pub(crate) fn new(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_meta: Vec<RunMeta>,
        run_splits: Vec<usize>,
    ) -> Self {
        debug_assert!(
            run_splits.is_strictly_sorted(),
            "run indices should be strictly increasing"
        );
        debug_assert!(
            run_splits.first().map_or(true, |i| *i > 0),
            "run indices should be positive"
        );
        debug_assert!(
            run_splits.last().map_or(true, |i| *i < parts.len()),
            "run indices should be valid indices into parts"
        );
        // N splits partition the parts into N+1 runs.
        debug_assert!(
            parts.is_empty() || run_meta.len() == run_splits.len() + 1,
            "all metadata should correspond to a run"
        );

        Self {
            desc,
            len,
            parts,
            run_splits,
            run_meta,
        }
    }

    /// Constructs a [`HollowBatch`] whose parts form a single run with
    /// default metadata.
    pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
        // An empty batch has zero runs, so no metadata either.
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            vec![RunMeta::default()]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// Like [`HollowBatch::new_run`], but with an explicit run id for tests.
    #[cfg(test)]
    pub(crate) fn new_run_for_test(
        desc: Description<T>,
        parts: Vec<RunPart<T>>,
        len: usize,
        run_id: RunId,
    ) -> Self {
        let run_meta = if parts.is_empty() {
            vec![]
        } else {
            let mut meta = RunMeta::default();
            meta.id = Some(run_id);
            vec![meta]
        };
        Self {
            desc,
            len,
            parts,
            run_splits: vec![],
            run_meta,
        }
    }

    /// Constructs an empty [`HollowBatch`] with the given description.
    pub(crate) fn empty(desc: Description<T>) -> Self {
        Self {
            desc,
            len: 0,
            parts: vec![],
            run_splits: vec![],
            run_meta: vec![],
        }
    }

    /// Iterates over the batch's runs: each run's metadata paired with its
    /// slice of `parts`.
    pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
        // Every split index ends a run; the final run ends at `parts.len()`.
        let run_ends = self
            .run_splits
            .iter()
            .copied()
            .chain(std::iter::once(self.parts.len()));
        let run_metas = self.run_meta.iter();
        // Turn consecutive end indices into half-open ranges over `parts`.
        let run_parts = run_ends
            .scan(0, |start, end| {
                let range = *start..end;
                *start = end;
                Some(range)
            })
            // An empty batch yields a single empty range; drop it so that
            // `zip_eq` against the (also empty) metadata doesn't panic.
            .filter(|range| !range.is_empty())
            .map(|range| &self.parts[range]);
        // `zip_eq` panics if metadata and runs ever get out of sync.
        run_metas.zip_eq(run_parts)
    }

    /// Total bytes stored inline in state across all parts.
    pub(crate) fn inline_bytes(&self) -> usize {
        self.parts.iter().map(|x| x.inline_bytes()).sum()
    }

    /// Whether the batch has no parts (note: `len` is not consulted).
    pub(crate) fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    /// The number of top-level parts (sub-runs count as one).
    pub(crate) fn part_count(&self) -> usize {
        self.parts.len()
    }

    /// The sum of the encoded sizes of all parts in the batch.
    pub fn encoded_size_bytes(&self) -> usize {
        self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
    }
}
1128
impl<T: Timestamp + TotalOrder> HollowBatch<T> {
    /// Records a timestamp rewrite: sets every part's `ts_rewrite` to
    /// `frontier` (applied when the parts are read — confirm at the fetch
    /// sites) and advances the batch's upper to `new_upper`.
    ///
    /// Returns an error, leaving `self` unchanged, if:
    /// - `frontier` is not strictly less than `new_upper`,
    /// - `new_upper` would regress the batch's current upper,
    /// - `frontier` is less than the batch's lower,
    /// - the batch's since is not the minimum antichain (i.e. the batch has
    ///   been compacted), or
    /// - `frontier` is less than an existing rewrite frontier (rewrites may
    ///   only move forward).
    ///
    /// Panics if the batch contains a `RunPart::Many`.
    pub(crate) fn rewrite_ts(
        &mut self,
        frontier: &Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<(), String> {
        if !PartialOrder::less_than(frontier, &new_upper) {
            return Err(format!(
                "rewrite frontier {:?} !< rewrite upper {:?}",
                frontier.elements(),
                new_upper.elements(),
            ));
        }
        if PartialOrder::less_than(&new_upper, self.desc.upper()) {
            return Err(format!(
                "rewrite upper {:?} < batch upper {:?}",
                new_upper.elements(),
                self.desc.upper().elements(),
            ));
        }

        if PartialOrder::less_than(frontier, self.desc.lower()) {
            return Err(format!(
                "rewrite frontier {:?} < batch lower {:?}",
                frontier.elements(),
                self.desc.lower().elements(),
            ));
        }
        if self.desc.since() != &Antichain::from_elem(T::minimum()) {
            return Err(format!(
                "batch since {:?} != minimum antichain {:?}",
                self.desc.since().elements(),
                &[T::minimum()],
            ));
        }
        // Rewrites must be monotone: never move an existing rewrite backward.
        for part in self.parts.iter() {
            let Some(ts_rewrite) = part.ts_rewrite() else {
                continue;
            };
            if PartialOrder::less_than(frontier, ts_rewrite) {
                return Err(format!(
                    "rewrite frontier {:?} < batch rewrite {:?}",
                    frontier.elements(),
                    ts_rewrite.elements(),
                ));
            }
        }

        // All validation passed: mutate the batch.
        self.desc = Description::new(
            self.desc.lower().clone(),
            new_upper,
            self.desc.since().clone(),
        );
        for part in &mut self.parts {
            match part {
                RunPart::Single(BatchPart::Hollow(part)) => {
                    part.ts_rewrite = Some(frontier.clone())
                }
                RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
                    *ts_rewrite = Some(frontier.clone())
                }
                RunPart::Many(runs) => {
                    // A sub-run ref has no rewrite field to set.
                    panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
                }
            }
        }
        Ok(())
    }
}
1203
impl<T: Ord> PartialOrd for HollowBatchPart<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Ord> Ord for HollowBatchPart<T> {
    /// Lexicographic comparison over every field, with `ts_rewrite` compared
    /// via its frontier `elements()` (which implement `Ord`).
    fn cmp(&self, other: &Self) -> Ordering {
        // Exhaustive destructuring so a new field can't be silently omitted
        // from the comparison.
        let HollowBatchPart {
            key: self_key,
            meta: self_meta,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            meta: other_meta,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_meta,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_meta,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}
1268
/// A pointer to a rollup (an encoded snapshot of state) in blob storage.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// The blob key (minus the shard prefix) the rollup is stored at.
    pub key: PartialRollupKey,
    /// The encoded size of the rollup, if known.
    pub encoded_size_bytes: Option<usize>,
}

/// A reference to some blob that state points at: batch data or a rollup.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}

/// A claim that some worker is writing the rollup for `seqno`, started at
/// wall-clock `start_ms`. Another worker may take over after
/// [`ROLLUP_FALLBACK_THRESHOLD_MS`].
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveRollup {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// A claim that some worker is running GC up to `seqno`, started at
/// wall-clock `start_ms`. Another worker may take over after
/// [`GC_FALLBACK_THRESHOLD_MS`].
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveGc {
    pub seqno: SeqNo,
    pub start_ms: u64,
}

/// Returned (via `ControlFlow::Break`) when a requested state transition
/// would not change state; carries the result value the caller would have
/// gotten anyway.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);

/// The component collections of a shard's durable state.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    // NOTE(review): presumably the code version that last wrote this state —
    // confirm against the write sites.
    pub(crate) version: Version,

    /// The seqno of the most recent garbage-collection request.
    pub(crate) last_gc_req: SeqNo,

    /// Live rollups, keyed by the seqno of the state they capture.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// An in-progress rollup claim, if any (see [`ActiveRollup`]).
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// An in-progress GC claim, if any (see [`ActiveGc`]).
    pub(crate) active_gc: Option<ActiveGc>,

    /// All registered leased readers, by id.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    /// All registered critical readers, by id.
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    /// All registered writers, by id.
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    /// All registered schemas, by id.
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    /// The batches of updates themselves.
    pub(crate) trace: Trace<T>,
}
1343
/// A key schema and value schema, encoded for storage in state.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    // NOTE(review): presumably a Codec-encoded key schema — confirm.
    pub key: Bytes,
    /// The arrow `DataType` for the key, as proto-encoded bytes
    /// (see [`EncodedSchemas::decode_data_type`]).
    pub key_data_type: Bytes,
    // NOTE(review): presumably a Codec-encoded value schema — confirm.
    pub val: Bytes,
    /// The arrow `DataType` for the value, as proto-encoded bytes.
    pub val_data_type: Bytes,
}
1372
1373impl EncodedSchemas {
1374 pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375 let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376 DataType::from_proto(proto).expect("valid DataType")
1377 }
1378}
1379
/// The reasons a compare-and-append state transition can decline to apply,
/// returned via `ControlFlow::Break`.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    /// This exact write (by idempotency token) has already been applied.
    AlreadyCommitted,
    /// The expected upper did not match the shard's actual upper.
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    /// The request itself was invalid (see [`InvalidUsage`]).
    InvalidUsage(InvalidUsage<T>),
    /// Backpressure on inline (in-state) data; presumably the caller should
    /// move the data to blob storage and retry — confirm at the call sites.
    InlineBackpressure,
}

/// The ways a snapshot request can fail.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    /// The requested as-of is not yet past the shard's upper.
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    /// The requested as-of is before the shard's since, so historical
    /// distinctions at that time have been compacted away.
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1398
1399impl<T> StateCollections<T>
1400where
1401 T: Timestamp + Lattice + Codec64,
1402{
1403 pub fn add_rollup(
1404 &mut self,
1405 add_rollup: (SeqNo, &HollowRollup),
1406 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407 let (rollup_seqno, rollup) = add_rollup;
1408 let applied = match self.rollups.get(&rollup_seqno) {
1409 Some(x) => x.key == rollup.key,
1410 None => {
1411 self.active_rollup = None;
1412 self.rollups.insert(rollup_seqno, rollup.to_owned());
1413 true
1414 }
1415 };
1416 Continue(applied)
1420 }
1421
    /// Removes the given rollups from state, returning the seqnos of the
    /// rollups that actually existed (and were removed).
    ///
    /// No-op on an empty request or a tombstoned shard.
    pub fn remove_rollups(
        &mut self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
        if remove_rollups.is_empty() || self.is_tombstone() {
            return Break(NoOpStateTransition(vec![]));
        }

        // NOTE(review): clearing the in-progress GC marker here presumably
        // reflects that rollup removal is the final step of a GC — confirm.
        self.active_gc = None;

        let mut removed = vec![];
        for (seqno, key) in remove_rollups {
            let removed_key = self.rollups.remove(seqno);
            // If a rollup existed at this seqno, it should be the one the
            // caller asked us to remove.
            debug_assert!(
                removed_key.as_ref().map_or(true, |x| &x.key == key),
                "{} vs {:?}",
                key,
                removed_key
            );

            if removed_key.is_some() {
                removed.push(*seqno);
            }
        }

        Continue(removed)
    }
1451
    /// Registers a new leased reader, returning its state and the seqno
    /// capability it holds.
    ///
    /// The reader's since starts at the shard's since, or — when
    /// `use_critical_since` is set and critical readers exist — at the meet
    /// of the critical readers' sinces. On a tombstoned shard the
    /// registration is a no-op: the state is returned but not recorded.
    pub fn register_leased_reader(
        &mut self,
        hostname: &str,
        reader_id: &LeasedReaderId,
        purpose: &str,
        seqno: SeqNo,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> ControlFlow<
        NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
        (LeasedReaderState<T>, SeqNo),
    > {
        let since = if use_critical_since {
            // Fall back to the trace's since when no critical readers exist.
            self.critical_since()
                .unwrap_or_else(|| self.trace.since().clone())
        } else {
            self.trace.since().clone()
        };
        let reader_state = LeasedReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            seqno,
            since,
            last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
            lease_duration_ms: u64::try_from(lease_duration.as_millis())
                .expect("lease duration as millis should fit within u64"),
        };

        // A tombstoned shard records no new readers; still hand back a state
        // so the caller can construct (and then promptly expire) its handle.
        if self.is_tombstone() {
            return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
        }

        self.leased_readers
            .insert(reader_id.clone(), reader_state.clone());
        Continue((reader_state, self.seqno_since(seqno)))
    }
1496
    /// Registers a new critical reader (or re-registers an existing one),
    /// returning its state.
    ///
    /// If the reader id is already registered, only its debug info is
    /// refreshed; the existing since and opaque are preserved and returned.
    /// On a tombstoned shard the registration is a no-op.
    pub fn register_critical_reader(
        &mut self,
        hostname: &str,
        reader_id: &CriticalReaderId,
        opaque: Opaque,
        purpose: &str,
    ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
        let state = CriticalReaderState {
            debug: HandleDebugState {
                hostname: hostname.to_owned(),
                purpose: purpose.to_owned(),
            },
            since: self.trace.since().clone(),
            opaque,
        };

        // A tombstoned shard records no new readers; still hand back a state
        // so the caller can construct its handle.
        if self.is_tombstone() {
            return Break(NoOpStateTransition(state));
        }

        let state = match self.critical_readers.get_mut(reader_id) {
            Some(existing_state) => {
                // Already registered: keep since/opaque, refresh debug info.
                existing_state.debug = state.debug;
                existing_state.clone()
            }
            None => {
                self.critical_readers
                    .insert(reader_id.clone(), state.clone());
                state
            }
        };
        Continue(state)
    }
1534
    /// Registers the given key/val schema pair, returning its [`SchemaId`].
    ///
    /// If the identical pair is already registered, the existing id is
    /// returned as a no-op. Only the *first* schema for a shard can be
    /// registered through this path; a shard with a different existing schema
    /// gets `None` back (evolution goes through
    /// [`Self::compare_and_evolve_schema`] instead).
    pub fn register_schema<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
        // Encode an arrow DataType as prost bytes for durable storage.
        fn encode_data_type(data_type: &DataType) -> Bytes {
            let proto = data_type.into_proto();
            prost::Message::encode_to_vec(&proto).into()
        }

        // Look for an existing registration of exactly this pair, scanning
        // newest-first.
        let existing_id = self.schemas.iter().rev().find(|(_, x)| {
            K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
        });
        match existing_id {
            Some((schema_id, _)) => {
                Break(NoOpStateTransition(Some(*schema_id)))
            }
            None if self.is_tombstone() => {
                Break(NoOpStateTransition(None))
            }
            None if self.schemas.is_empty() => {
                // First schema for this shard: assign the next id (0) and
                // record both the Codec-encoded schemas and their arrow
                // DataTypes.
                let id = SchemaId(self.schemas.len());
                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
                    .expect("valid key schema");
                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
                    .expect("valid val schema");
                let prev = self.schemas.insert(
                    id,
                    EncodedSchemas {
                        key: K::encode_schema(key_schema),
                        key_data_type: encode_data_type(&key_data_type),
                        val: V::encode_schema(val_schema),
                        val_data_type: encode_data_type(&val_data_type),
                    },
                );
                assert_eq!(prev, None);
                Continue(Some(id))
            }
            None => {
                // A different schema is already registered; log for
                // debuggability and decline without changing state.
                info!(
                    "register_schemas got {:?} expected {:?}",
                    key_schema,
                    self.schemas
                        .iter()
                        .map(|(id, x)| (id, K::decode_schema(&x.key)))
                        .collect::<Vec<_>>()
                );
                Break(NoOpStateTransition(None))
            }
        }
    }
1607
    /// Evolves the shard's schema to `key_schema`/`val_schema`, guarded by
    /// the caller's expectation of the current latest schema id.
    ///
    /// Outcomes:
    /// - `ExpectedMismatch` if `expected` is not the latest registered id.
    /// - `Ok(current_id)` (no-op) if the new schemas are identical to the
    ///   current ones.
    /// - `Incompatible` if the new arrow DataTypes aren't backward compatible
    ///   with the current ones, or the migration would drop data.
    /// - `Ok(new_id)` with the new schemas registered otherwise.
    pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
        &mut self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
        // The arrow DataType a schema encodes to, derived via an encoder.
        fn data_type<T>(schema: &impl Schema<T>) -> DataType {
            let array = Schema::encoder(schema).expect("valid schema").finish();
            Array::data_type(&array).clone()
        }

        let (current_id, current) = self
            .schemas
            .last_key_value()
            .expect("all shards have a schema");
        if *current_id != expected {
            return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
                schema_id: *current_id,
                key: K::decode_schema(&current.key),
                val: V::decode_schema(&current.val),
            }));
        }

        let current_key = K::decode_schema(&current.key);
        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
        let current_val = V::decode_schema(&current.val);
        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);

        let key_dt = data_type(key_schema);
        let val_dt = data_type(val_schema);

        // Identical schema (both Codec-level and arrow-level): nothing to do.
        if current_key == *key_schema
            && current_key_dt == key_dt
            && current_val == *val_schema
            && current_val_dt == val_dt
        {
            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
        }

        let key_fn = backward_compatible(&current_key_dt, &key_dt);
        let val_fn = backward_compatible(&current_val_dt, &val_dt);
        let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        };
        // Migrations that drop columns are refused: existing data would lose
        // information.
        if key_fn.contains_drop() || val_fn.contains_drop() {
            return Break(NoOpStateTransition(CaESchema::Incompatible));
        }

        // Register the evolved schemas under the next id.
        let id = SchemaId(self.schemas.len());
        self.schemas.insert(
            id,
            EncodedSchemas {
                key: K::encode_schema(key_schema),
                key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
                val: V::encode_schema(val_schema),
                val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
            },
        );
        Continue(CaESchema::Ok(id))
    }
1678
    /// Appends `batch` to the shard, guarded by the expectation that the
    /// batch's lower matches the shard's current upper.
    ///
    /// On success, returns any compaction (merge) requests the append
    /// generated. Failure modes are enumerated in [`CompareAndAppendBreak`].
    /// The `idempotency_token` makes retries of the same logical write safe:
    /// a token matching the writer's most recent write reports
    /// `AlreadyCommitted`.
    pub fn compare_and_append(
        &mut self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        lease_duration_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        inline_writes_total_max_bytes: usize,
        claim_compaction_percent: usize,
        claim_compaction_min_version: Option<&Version>,
    ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
        // A tombstoned shard has an empty upper, so no expectation can match:
        // report an upper mismatch rather than a no-op.
        if self.is_tombstone() {
            assert_eq!(self.trace.upper(), &Antichain::new());
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::new(),
                writer_upper: Antichain::new(),
            });
        }

        // (Re-)register the writer, refreshing its lease below on success.
        let writer_state = self
            .writers
            .entry(writer_id.clone())
            .or_insert_with(|| WriterState {
                last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
                lease_duration_ms,
                most_recent_write_token: IdempotencyToken::SENTINEL,
                most_recent_write_upper: Antichain::from_elem(T::minimum()),
                debug: debug_info.clone(),
            });

        // Reject batches whose bounds are inverted.
        if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidBounds {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                },
            ));
        }

        // An empty time interval can only legally contain no updates.
        if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
            return Break(CompareAndAppendBreak::InvalidUsage(
                InvalidUsage::InvalidEmptyTimeInterval {
                    lower: batch.desc.lower().clone(),
                    upper: batch.desc.upper().clone(),
                    keys: batch
                        .parts
                        .iter()
                        .map(|x| x.printable_name().to_owned())
                        .collect(),
                },
            ));
        }

        // Idempotent retry: this exact write already landed.
        if idempotency_token == &writer_state.most_recent_write_token {
            assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
            assert!(
                PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
                "{:?} vs {:?}",
                batch.desc.upper(),
                self.trace.upper()
            );
            return Break(CompareAndAppendBreak::AlreadyCommitted);
        }

        // The compare half of compare-and-append: lower must equal the
        // shard's current upper.
        let shard_upper = self.trace.upper();
        if shard_upper != batch.desc.lower() {
            return Break(CompareAndAppendBreak::Upper {
                shard_upper: shard_upper.clone(),
                writer_upper: writer_state.most_recent_write_upper.clone(),
            });
        }

        // Enforce the budget on inline (in-state) part bytes; over budget,
        // push back so the caller writes the parts to blob storage first.
        let new_inline_bytes = batch.inline_bytes();
        if new_inline_bytes > 0 {
            let mut existing_inline_bytes = 0;
            self.trace
                .map_batches(|x| existing_inline_bytes += x.inline_bytes());
            if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
                return Break(CompareAndAppendBreak::InlineBackpressure);
            }
        }

        // Push the batch (unless it covers an empty interval, which would be
        // a no-op for the spine) and collect resulting merge requests.
        let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
            self.trace.push_batch(batch.clone())
        } else {
            Vec::new()
        };

        // Optionally claim some stalled compactions for this writer: only
        // when this write carries data but produced no real merge work of its
        // own.
        let all_empty_reqs = merge_reqs
            .iter()
            .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
        if all_empty_reqs && !batch.is_empty() {
            let mut reqs_to_take = claim_compaction_percent / 100;
            // The remainder is applied probabilistically, keyed off the
            // idempotency token's hash.
            if (usize::cast_from(idempotency_token.hashed()) % 100)
                < (claim_compaction_percent % 100)
            {
                reqs_to_take += 1;
            }
            // Only claim compactions whose previous claim has lapsed.
            let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
            let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
            merge_reqs.extend(
                self.trace
                    .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
                    .take(reqs_to_take),
            )
        }

        // Mark every returned merge request as claimed as of now.
        for req in &merge_reqs {
            self.trace.claim_compaction(
                req.id,
                ActiveCompaction {
                    start_ms: heartbeat_timestamp_ms,
                },
            )
        }

        debug_assert_eq!(self.trace.upper(), batch.desc.upper());
        // Record this write for idempotency; uppers must only advance.
        writer_state.most_recent_write_token = idempotency_token.clone();
        assert!(
            PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
            "{:?} vs {:?}",
            &writer_state.most_recent_write_upper,
            batch.desc.upper()
        );
        writer_state
            .most_recent_write_upper
            .clone_from(batch.desc.upper());

        // Refresh the writer's lease; max() keeps heartbeats monotone.
        writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            writer_state.last_heartbeat_timestamp_ms,
        );

        Continue(merge_reqs)
    }
1838
1839 pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1840 &mut self,
1841 res: &FueledMergeRes<T>,
1842 metrics: &ColumnarMetrics,
1843 ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1844 if self.is_tombstone() {
1849 return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1850 }
1851
1852 let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1853 Continue(apply_merge_result)
1854 }
1855
1856 pub fn spine_exert(
1857 &mut self,
1858 fuel: usize,
1859 ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1860 let (merge_reqs, did_work) = self.trace.exert(fuel);
1861 if did_work {
1862 Continue(merge_reqs)
1863 } else {
1864 assert!(merge_reqs.is_empty());
1865 Break(NoOpStateTransition(Vec::new()))
1868 }
1869 }
1870
    /// Downgrades a leased reader's since to `new_since` and its held seqno
    /// to `outstanding_seqno`, refreshing its lease.
    ///
    /// Returns the reader's resulting since. Sinces only move forward: a
    /// `new_since` not beyond the current one leaves it unchanged. A
    /// tombstoned shard, or a reader that has already been expired, reports
    /// the empty since as a no-op.
    pub fn downgrade_since(
        &mut self,
        reader_id: &LeasedReaderId,
        seqno: SeqNo,
        outstanding_seqno: SeqNo,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Since(Antichain::new())));
        }

        // The reader may have been force-expired due to inactivity.
        let Some(reader_state) = self.leased_reader(reader_id) else {
            tracing::warn!(
                "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
            );
            return Break(NoOpStateTransition(Since(Antichain::new())));
        };

        // Refresh the lease; max() keeps heartbeats monotone.
        reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
            heartbeat_timestamp_ms,
            reader_state.last_heartbeat_timestamp_ms,
        );

        // Hold the oldest seqno the reader still has outstanding (but never
        // older than what it already held: seqnos must not regress).
        let seqno = {
            assert!(
                outstanding_seqno >= reader_state.seqno,
                "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
                is behind current reader_state ({:?})",
                outstanding_seqno,
                reader_state.seqno,
            );
            std::cmp::min(outstanding_seqno, seqno)
        };

        reader_state.seqno = seqno;

        // Only a strictly-advancing since changes state (and possibly the
        // shard-wide since).
        let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            new_since.clone()
        } else {
            reader_state.since.clone()
        };

        Continue(Since(reader_current_since))
    }
1928
    /// Downgrades a critical reader's since, guarded by a compare-and-set on
    /// its opaque token.
    ///
    /// If `expected_opaque` doesn't match the reader's current opaque, the
    /// current (opaque, since) pair is returned as an `Err` and nothing else
    /// changes. Otherwise the opaque is replaced and the since is advanced to
    /// `new_since` when that's not a regression. Panics on an unknown reader
    /// id (see `critical_reader`).
    pub fn compare_and_downgrade_since(
        &mut self,
        reader_id: &CriticalReaderId,
        expected_opaque: &Opaque,
        (new_opaque, new_since): (&Opaque, &Antichain<T>),
    ) -> ControlFlow<
        NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
        Result<Since<T>, (Opaque, Since<T>)>,
    > {
        if self.is_tombstone() {
            return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
        }

        let reader_state = self.critical_reader(reader_id);

        // Compare half of the CaS: report the actual values on mismatch.
        if reader_state.opaque != *expected_opaque {
            return Continue(Err((
                reader_state.opaque.clone(),
                Since(reader_state.since.clone()),
            )));
        }

        reader_state.opaque = new_opaque.clone();
        // The since only moves forward; a non-advancing request still
        // succeeds (the opaque was swapped) but leaves the since alone.
        if PartialOrder::less_equal(&reader_state.since, new_since) {
            reader_state.since.clone_from(new_since);
            self.update_since();
            Continue(Ok(Since(new_since.clone())))
        } else {
            Continue(Ok(Since(reader_state.since.clone())))
        }
    }
1972
1973 pub fn expire_leased_reader(
1974 &mut self,
1975 reader_id: &LeasedReaderId,
1976 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1977 if self.is_tombstone() {
1982 return Break(NoOpStateTransition(false));
1983 }
1984
1985 let existed = self.leased_readers.remove(reader_id).is_some();
1986 if existed {
1987 }
2001 Continue(existed)
2004 }
2005
2006 pub fn expire_critical_reader(
2007 &mut self,
2008 reader_id: &CriticalReaderId,
2009 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2010 if self.is_tombstone() {
2015 return Break(NoOpStateTransition(false));
2016 }
2017
2018 let existed = self.critical_readers.remove(reader_id).is_some();
2019 if existed {
2020 }
2034 Continue(existed)
2038 }
2039
2040 pub fn expire_writer(
2041 &mut self,
2042 writer_id: &WriterId,
2043 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2044 if self.is_tombstone() {
2049 return Break(NoOpStateTransition(false));
2050 }
2051
2052 let existed = self.writers.remove(writer_id).is_some();
2053 Continue(existed)
2057 }
2058
    /// Looks up the mutable state for a leased reader, if it is still
    /// registered (leased readers can be expired due to inactivity).
    fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
        self.leased_readers.get_mut(id)
    }
2062
    /// Looks up the mutable state for a critical reader.
    ///
    /// Panics if the id is unknown: critical readers are never auto-expired,
    /// so an unknown id means it was never registered or was manually
    /// expired.
    fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
        self.critical_readers
            .get_mut(id)
            .unwrap_or_else(|| {
                panic!(
                    "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
                    id
                )
            })
    }
2073
2074 fn critical_since(&self) -> Option<Antichain<T>> {
2075 let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2076 let mut since = critical_sinces.next().cloned()?;
2077 for s in critical_sinces {
2078 since.meet_assign(s);
2079 }
2080 Some(since)
2081 }
2082
2083 fn update_since(&mut self) {
2084 let mut sinces_iter = self
2085 .leased_readers
2086 .values()
2087 .map(|x| &x.since)
2088 .chain(self.critical_readers.values().map(|x| &x.since));
2089 let mut since = match sinces_iter.next() {
2090 Some(since) => since.clone(),
2091 None => {
2092 return;
2095 }
2096 };
2097 while let Some(s) = sinces_iter.next() {
2098 since.meet_assign(s);
2099 }
2100 self.trace.downgrade_since(&since);
2101 }
2102
2103 fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2104 let mut seqno_since = seqno;
2105 for cap in self.leased_readers.values() {
2106 seqno_since = std::cmp::min(seqno_since, cap.seqno);
2107 }
2108 seqno_since
2110 }
2111
    /// The empty batch that represents a tombstoned shard: lower at
    /// `T::minimum()`, with empty (i.e. closed-forever) upper and since.
    fn tombstone_batch() -> HollowBatch<T> {
        HollowBatch::empty(Description::new(
            Antichain::from_elem(T::minimum()),
            Antichain::new(),
            Antichain::new(),
        ))
    }
2119
    /// Whether this shard is a tombstone: upper and since both empty (no
    /// further reads or writes possible) and all capabilities released.
    pub(crate) fn is_tombstone(&self) -> bool {
        self.trace.upper().is_empty()
            && self.trace.since().is_empty()
            && self.writers.is_empty()
            && self.leased_readers.is_empty()
            && self.critical_readers.is_empty()
    }
2127
2128 pub(crate) fn is_single_empty_batch(&self) -> bool {
2129 let mut batch_count = 0;
2130 let mut is_empty = true;
2131 self.trace.map_batches(|b| {
2132 batch_count += 1;
2133 is_empty &= b.is_empty()
2134 });
2135 batch_count <= 1 && is_empty
2136 }
2137
    /// Turns this state into a tombstone and incrementally shrinks its trace
    /// toward the single-empty-batch representation.
    ///
    /// Callers must have already advanced the upper and since to the empty
    /// antichain. Each invocation makes at most one unit of progress
    /// (replacing one non-empty batch, or collapsing a multi-batch trace);
    /// once fully shrunk and already a tombstone, it breaks as a no-op.
    pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
        assert_eq!(self.trace.upper(), &Antichain::new());
        assert_eq!(self.trace.since(), &Antichain::new());

        // Remember whether we were already a tombstone so we can tell a real
        // transition apart from a no-op below.
        let was_tombstone = self.is_tombstone();

        // Drop all capabilities; with empty upper/since this makes the state
        // a tombstone by definition.
        self.writers.clear();
        self.leased_readers.clear();
        self.critical_readers.clear();

        debug_assert!(self.is_tombstone());

        // Find the first non-empty batch, if any, to replace with an empty
        // one via a (trivially matching) tombstone merge.
        let mut to_replace = None;
        let mut batch_count = 0;
        self.trace.map_batches(|b| {
            batch_count += 1;
            if !b.is_empty() && to_replace.is_none() {
                to_replace = Some(b.desc.clone());
            }
        });
        if let Some(desc) = to_replace {
            let result = self.trace.apply_tombstone_merge(&desc);
            assert!(
                result.matched(),
                "merge with a matching desc should always match"
            );
            Continue(())
        } else if batch_count > 1 {
            // All batches are empty but there's more than one: collapse the
            // whole trace to a single canonical tombstone batch.
            let mut new_trace = Trace::default();
            new_trace.downgrade_since(&Antichain::new());
            let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
            assert_eq!(merge_reqs, Vec::new());
            self.trace = new_trace;
            Continue(())
        } else if !was_tombstone {
            // Shrunk already, but this call is the transition into tombstone:
            // write it down.
            Continue(())
        } else {
            // Already a fully-shrunk tombstone: nothing to do.
            Break(NoOpStateTransition(()))
        }
    }
2200}
2201
/// A shard's full durable state: identity and versioning metadata plus the
/// mutable [`StateCollections`].
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    // The shard this state belongs to.
    pub(crate) shard_id: ShardId,

    // The sequence number of this version of state.
    pub(crate) seqno: SeqNo,
    // Wall-clock millis when this state version was written (see
    // `clone_apply`, which also keeps this strictly increasing).
    pub(crate) walltime_ms: u64,
    // Hostname of the process that wrote this state version (debugging aid).
    pub(crate) hostname: String,
    // The actual shard contents and capabilities.
    pub(crate) collections: StateCollections<T>,
}
2217
/// A [`State`] paired (at the type level only) with the shard's key, value,
/// and diff types.
pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    // `fn() -> ...` keeps the type parameters covariant and avoids implying
    // ownership of K/V/D values.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2232
impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
    /// Test/debug-only deep clone, substituting the given hostname.
    #[cfg(any(test, debug_assertions))]
    pub(crate) fn clone(&self, hostname: String) -> Self {
        TypedState {
            state: State {
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname,
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }

    /// Deep clone used when writing this state out as a rollup.
    pub(crate) fn clone_for_rollup(&self) -> Self {
        TypedState {
            state: State {
                shard_id: self.shard_id.clone(),
                seqno: self.seqno.clone(),
                walltime_ms: self.walltime_ms,
                hostname: self.hostname.clone(),
                collections: self.collections.clone(),
            },
            _phantom: PhantomData,
        }
    }
}
2261
impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field to TypedState is a compile error
        // here until this impl is updated.
        let TypedState { state, _phantom } = self;
        f.debug_struct("TypedState").field("state", state).finish()
    }
}
2270
// Equality compares only the untyped state; the phantom carries no data.
// Test/debug-only, matching the derive gating on `State`.
#[cfg(any(test, debug_assertions))]
impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
    fn eq(&self, other: &Self) -> bool {
        // Destructure so that adding a field is a compile error here until
        // this impl is updated.
        let TypedState {
            state: self_state,
            _phantom,
        } = self;
        let TypedState {
            state: other_state,
            _phantom,
        } = other;
        self_state == other_state
    }
}
2288
// Allow a TypedState to be used anywhere a `&State` is expected.
impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}
2296
// Mutable counterpart of the Deref impl above.
impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}
2302
impl<K, V, T, D> TypedState<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
    D: Codec64,
{
    /// The initial state for a new shard: minimum seqno, no capabilities, an
    /// empty trace.
    pub fn new(
        applier_version: Version,
        shard_id: ShardId,
        hostname: String,
        walltime_ms: u64,
    ) -> Self {
        let state = State {
            shard_id,
            seqno: SeqNo::minimum(),
            walltime_ms,
            hostname,
            collections: StateCollections {
                version: applier_version,
                last_gc_req: SeqNo::minimum(),
                rollups: BTreeMap::new(),
                active_rollup: None,
                active_gc: None,
                leased_readers: BTreeMap::new(),
                critical_readers: BTreeMap::new(),
                writers: BTreeMap::new(),
                schemas: BTreeMap::new(),
                trace: Trace::default(),
            },
        };
        TypedState {
            state,
            _phantom: PhantomData,
        }
    }

    /// Runs `work_fn` against a clone of this state at the next seqno,
    /// returning the work's result together with the candidate new state.
    ///
    /// `self` is not modified; the caller is responsible for durably writing
    /// (or discarding) the returned state. A `Break` from `work_fn`
    /// propagates out via `?` and produces no new state.
    pub fn clone_apply<R, E, WorkFn>(
        &self,
        cfg: &PersistConfig,
        work_fn: &mut WorkFn,
    ) -> ControlFlow<E, (R, Self)>
    where
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    {
        let mut new_state = State {
            shard_id: self.shard_id,
            seqno: self.seqno.next(),
            walltime_ms: (cfg.now)(),
            hostname: cfg.hostname.clone(),
            collections: self.collections.clone(),
        };

        // Ensure walltime_ms is strictly increasing across state versions,
        // even if the clock is stuck or went backward.
        if new_state.walltime_ms <= self.walltime_ms {
            new_state.walltime_ms = self.walltime_ms + 1;
        }

        let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
        let new_state = TypedState {
            state: new_state,
            _phantom: PhantomData,
        };
        Continue((work_ret, new_state))
    }
}
2371
/// Tuning knobs for the garbage-collection heuristics in `State::maybe_gc`.
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    // Whether to use the active-GC protocol (at most one GC in flight).
    pub use_active_gc: bool,
    // How old (ms) an in-flight GC must be before it's presumed stuck.
    pub fallback_threshold_ms: u64,
    // Minimum seqno advance before a new GC is triggered (active-GC mode).
    pub min_versions: usize,
    // Cap on how many seqnos a single GC request may cover.
    pub max_versions: usize,
}
2379
2380impl<T> State<T>
2381where
2382 T: Timestamp + Lattice + Codec64,
2383{
    /// The id of the shard this state belongs to.
    pub fn shard_id(&self) -> ShardId {
        self.shard_id
    }
2387
    /// The sequence number of this version of state.
    pub fn seqno(&self) -> SeqNo {
        self.seqno
    }
2391
    /// The shard's since frontier (earliest readable time).
    pub fn since(&self) -> &Antichain<T> {
        self.collections.trace.since()
    }
2395
    /// The shard's upper frontier (next writable time).
    pub fn upper(&self) -> &Antichain<T> {
        self.collections.trace.upper()
    }
2399
    /// The number of batches in the trace's spine.
    pub fn spine_batch_count(&self) -> usize {
        self.collections.trace.num_spine_batches()
    }
2403
    /// Computes size/count metrics over every blob referenced by this state
    /// (batches and rollups).
    pub fn size_metrics(&self) -> StateSizeMetrics {
        let mut ret = StateSizeMetrics::default();
        self.blobs().for_each(|x| match x {
            HollowBlobRef::Batch(x) => {
                ret.hollow_batch_count += 1;
                ret.batch_part_count += x.part_count();
                ret.num_updates += x.len;

                let batch_size = x.encoded_size_bytes();
                // Per-part accounting: ts-rewritten parts and inline parts.
                for x in x.parts.iter() {
                    if x.ts_rewrite().is_some() {
                        ret.rewrite_part_count += 1;
                    }
                    if x.is_inline() {
                        ret.inline_part_count += 1;
                        ret.inline_part_bytes += x.inline_bytes();
                    }
                }
                ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
                ret.state_batches_bytes += batch_size;
            }
            HollowBlobRef::Rollup(x) => {
                ret.state_rollup_count += 1;
                // Size may be unknown for older rollups; count it as 0.
                ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
            }
        });
        ret
    }
2432
2433 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2434 self.collections
2437 .rollups
2438 .iter()
2439 .rev()
2440 .next()
2441 .expect("State should have at least one rollup if seqno > minimum")
2442 }
2443
    /// The oldest seqno that must remain available, given the current seqno
    /// and all outstanding leased-reader capabilities.
    pub(crate) fn seqno_since(&self) -> SeqNo {
        self.collections.seqno_since(self.seqno)
    }
2447
    /// Decides whether garbage collection should run now; if so, records and
    /// returns the request.
    ///
    /// Heuristics (gated by `cfg.use_active_gc`):
    /// - GC once the seqno-since has advanced at least `gc_threshold` past
    ///   the last GC request (a fixed `min_versions` in active-GC mode,
    ///   otherwise roughly log2 of the current seqno).
    /// - Reads only trigger GC when there are no writers; writes may always
    ///   trigger it. A tombstoned shard always wants GC.
    /// - In active-GC mode, never start while another GC is in flight unless
    ///   it appears stuck (older than `fallback_threshold_ms`).
    pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
        let GcConfig {
            use_active_gc,
            fallback_threshold_ms,
            min_versions,
            max_versions,
        } = cfg;
        // Without active GC, the threshold grows logarithmically with the
        // seqno so GC frequency decays as the shard ages.
        let gc_threshold = if use_active_gc {
            u64::cast_from(min_versions)
        } else {
            std::cmp::max(
                1,
                u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
            )
        };
        let new_seqno_since = self.seqno_since();
        // Bound the size of a single GC request to max_versions seqnos.
        let gc_until_seqno = new_seqno_since.min(SeqNo(
            self.collections
                .last_gc_req
                .0
                .saturating_add(u64::cast_from(max_versions)),
        ));
        let should_gc = new_seqno_since
            .0
            .saturating_sub(self.collections.last_gc_req.0)
            >= gc_threshold;

        // Active-GC mode: even below the threshold, retry a GC that looks
        // stuck.
        let should_gc = if use_active_gc && !should_gc {
            match self.collections.active_gc {
                Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
                None => false,
            }
        } else {
            should_gc
        };
        // Reads only GC when no writers exist (writers are expected to GC).
        let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
        // Tombstones always want their remaining versions collected.
        let tombstone_needs_gc = self.collections.is_tombstone();
        let should_gc = should_gc || tombstone_needs_gc;
        // Active-GC mode: defer to an in-flight GC unless it appears stuck.
        let should_gc = if use_active_gc {
            should_gc
                && match self.collections.active_gc {
                    Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
                    None => true,
                }
        } else {
            should_gc
        };
        if should_gc {
            self.collections.last_gc_req = gc_until_seqno;
            Some(GcReq {
                shard_id: self.shard_id,
                new_seqno_since: gc_until_seqno,
            })
        } else {
            None
        }
    }
2532
    /// How many seqnos are currently held (not yet GC-able): the gap between
    /// the current seqno and the seqno-since.
    pub fn seqnos_held(&self) -> usize {
        usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
    }
2537
    /// Force-expires every leased reader and writer whose lease has lapsed as
    /// of `walltime_ms`, returning counts of what was expired.
    ///
    /// Critical readers are never expired by this.
    pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
        let mut metrics = ExpiryMetrics::default();
        let shard_id = self.shard_id();
        self.collections.leased_readers.retain(|id, state| {
            // A reader is live while its last heartbeat plus its lease covers
            // the current walltime.
            let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.readers_expired += 1;
            }
            retain
        });
        self.collections.writers.retain(|id, state| {
            let retain =
                (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
            if !retain {
                info!(
                    "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
                    state.debug.purpose
                );
                metrics.writers_expired += 1;
            }
            retain
        });
        metrics
    }
2568
    /// Returns the batches needed to read a consistent snapshot as of
    /// `as_of`.
    ///
    /// Fails if `as_of` is before the since (distinctions compacted away) or
    /// at/beyond the upper (data not yet written).
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        if PartialOrder::less_than(as_of, self.collections.trace.since()) {
            return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                self.collections.trace.since().clone(),
            )));
        }
        let upper = self.collections.trace.upper();
        if PartialOrder::less_equal(upper, as_of) {
            return Err(SnapshotErr::AsOfNotYetAvailable(
                self.seqno,
                Upper(upper.clone()),
            ));
        }

        // Keep only batches whose data can be visible at the as_of, i.e.
        // those whose lower is not strictly beyond it.
        let batches = self
            .collections
            .trace
            .batches()
            .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
            .cloned()
            .collect();
        Ok(batches)
    }
2595
2596 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2598 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2599 return Err(Since(self.collections.trace.since().clone()));
2600 }
2601 Ok(())
2602 }
2603
    /// Returns the batch a listener at `frontier` should process next: the
    /// one whose description contains `frontier`. If no such batch exists
    /// yet, returns the current seqno (for the caller to watch for changes).
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.collections
            .trace
            .batches()
            .find(|b| {
                PartialOrder::less_equal(b.desc.lower(), frontier)
                    && PartialOrder::less_than(frontier, b.desc.upper())
            })
            .cloned()
            .ok_or(self.seqno)
    }
2617
    /// The in-progress rollup, if any.
    pub fn active_rollup(&self) -> Option<ActiveRollup> {
        self.collections.active_rollup
    }
2621
    /// Decides whether a new rollup should be written now, returning the
    /// seqno to roll up to if so.
    ///
    /// A tombstoned shard gets one final rollup. Otherwise, in active-rollup
    /// mode, a rollup is requested once the seqno has advanced more than
    /// `threshold` past the latest rollup and no (non-stuck) rollup is
    /// already in flight. In the legacy mode, each writer requests a rollup
    /// every `threshold` seqnos, with a larger fallback threshold in case the
    /// expected requester is gone.
    pub fn need_rollup(
        &self,
        threshold: usize,
        use_active_rollup: bool,
        fallback_threshold_ms: u64,
        now: u64,
    ) -> Option<SeqNo> {
        let (latest_rollup_seqno, _) = self.latest_rollup();

        // Tombstone: ensure one last rollup lands so readers of state can
        // skip the remaining diffs.
        if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
            return Some(self.seqno);
        }

        let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);

        if use_active_rollup {
            if seqnos_since_last_rollup > u64::cast_from(threshold) {
                match self.active_rollup() {
                    Some(active_rollup) => {
                        // An in-flight rollup blocks new ones unless it looks
                        // stuck.
                        if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
                            return Some(self.seqno);
                        }
                    }
                    None => {
                        return Some(self.seqno);
                    }
                }
            }
        } else {
            // Legacy heuristic: request exactly at each multiple of
            // `threshold` seqnos past the latest rollup...
            if seqnos_since_last_rollup > 0
                && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
            {
                return Some(self.seqno);
            }

            // ...plus a fallback in case the writer that should have hit the
            // exact multiple is gone.
            if seqnos_since_last_rollup
                > u64::cast_from(
                    threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
                )
            {
                return Some(self.seqno);
            }
        }

        None
    }
2683
    /// All blob references reachable from this state: every batch in the
    /// trace followed by every rollup.
    pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
        let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
        let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
        batches.chain(rollups)
    }
2689}
2690
2691fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2692 let val = hex::encode(val);
2693 val.serialize(s)
2694}
2695
/// Serde helper: serializes an optional lazy proto as hex-encoded proto
/// bytes (or `null` when absent).
fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
    val: &Option<LazyProto<T>>,
    s: S,
) -> Result<S::Ok, S::Error> {
    val.as_ref()
        .map(|lazy| hex::encode(&lazy.into_proto()))
        .serialize(s)
}
2704
/// Serde helper: serializes only the decoded key stats of optional part
/// stats (or `null` when absent).
fn serialize_part_stats<S: Serializer>(
    val: &Option<LazyPartStats>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let val = val.as_ref().map(|x| x.decode().key);
    val.serialize(s)
}
2712
/// Serde helper: decodes an optional Codec64-encoded diffs sum as an i64 so
/// it's legible in output (or `null` when absent).
fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
    // NOTE(review): presumably every use of this serializer has an i64 diff
    // type — confirm against the fields carrying this attribute.
    let val = val.map(i64::decode);
    val.serialize(s)
}
2718
2719impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2725 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2726 let State {
2727 shard_id,
2728 seqno,
2729 walltime_ms,
2730 hostname,
2731 collections:
2732 StateCollections {
2733 version: applier_version,
2734 last_gc_req,
2735 rollups,
2736 active_rollup,
2737 active_gc,
2738 leased_readers,
2739 critical_readers,
2740 writers,
2741 schemas,
2742 trace,
2743 },
2744 } = self;
2745 let mut s = s.serialize_struct("State", 13)?;
2746 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2747 let () = s.serialize_field("shard_id", shard_id)?;
2748 let () = s.serialize_field("seqno", seqno)?;
2749 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2750 let () = s.serialize_field("hostname", hostname)?;
2751 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2752 let () = s.serialize_field("rollups", rollups)?;
2753 let () = s.serialize_field("active_rollup", active_rollup)?;
2754 let () = s.serialize_field("active_gc", active_gc)?;
2755 let () = s.serialize_field("leased_readers", leased_readers)?;
2756 let () = s.serialize_field("critical_readers", critical_readers)?;
2757 let () = s.serialize_field("writers", writers)?;
2758 let () = s.serialize_field("schemas", schemas)?;
2759 let () = s.serialize_field("since", &trace.since().elements())?;
2760 let () = s.serialize_field("upper", &trace.upper().elements())?;
2761 let trace = trace.flatten();
2762 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2763 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2764 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2765 let () = s.serialize_field("merges", &trace.merges)?;
2766 s.end()
2767 }
2768}
2769
/// Size statistics about a shard's state, gathered by walking the state's
/// batches and rollups (populated by callers elsewhere in this crate).
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    // Counts of the structural pieces of the trace.
    pub hollow_batch_count: usize,
    pub batch_part_count: usize,
    pub rewrite_part_count: usize,
    pub num_updates: usize,
    // Byte sizes: the single largest batch, all batches, and all rollups.
    pub largest_batch_bytes: usize,
    pub state_batches_bytes: usize,
    pub state_rollups_bytes: usize,
    pub state_rollup_count: usize,
    // Parts stored inline in state (rather than in blob storage).
    pub inline_part_count: usize,
    pub inline_part_bytes: usize,
}
2783
/// Counts of readers/writers expired by a lease-expiry pass.
#[derive(Default)]
pub struct ExpiryMetrics {
    pub(crate) readers_expired: usize,
    pub(crate) writers_expired: usize,
}
2789
/// A shard's since frontier, newtyped so it cannot be confused with other
/// antichains (e.g. [`Upper`]) in return values like `downgrade_since`.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2793
/// A shard's upper frontier, newtyped to avoid confusion with [`Since`].
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2797
2798#[cfg(test)]
2799pub(crate) mod tests {
2800 use std::ops::Range;
2801 use std::str::FromStr;
2802
2803 use bytes::Bytes;
2804 use mz_build_info::DUMMY_BUILD_INFO;
2805 use mz_dyncfg::ConfigUpdates;
2806 use mz_ore::now::SYSTEM_TIME;
2807 use mz_ore::{assert_none, assert_ok};
2808 use mz_proto::RustType;
2809 use proptest::prelude::*;
2810 use proptest::strategy::ValueTree;
2811
2812 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2813 use crate::cache::PersistClientCache;
2814 use crate::internal::encoding::any_some_lazy_part_stats;
2815 use crate::internal::paths::RollupId;
2816 use crate::internal::trace::tests::any_trace;
2817 use crate::tests::new_test_client_cache;
2818 use crate::{Diagnostics, PersistLocation};
2819
2820 use super::*;
2821
    /// Lease duration used by tests that register readers/writers (15 min).
    const LEASE_DURATION_MS: u64 = 900 * 1000;
    /// A [`HandleDebugState`] with fixed placeholder contents for tests.
    fn debug_state() -> HandleDebugState {
        HandleDebugState {
            hostname: "debug".to_owned(),
            purpose: "finding the bugs".to_owned(),
        }
    }
2829
2830 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2831 num_runs: usize,
2832 ) -> impl Strategy<Value = HollowBatch<T>> {
2833 (
2834 any::<T>(),
2835 any::<T>(),
2836 any::<T>(),
2837 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2838 any::<usize>(),
2839 )
2840 .prop_map(move |(t0, t1, since, parts, len)| {
2841 let (lower, upper) = if t0 <= t1 {
2842 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2843 } else {
2844 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2845 };
2846 let since = Antichain::from_elem(since);
2847
2848 let run_splits = (1..num_runs)
2849 .map(|i| i * parts.len() / num_runs)
2850 .collect::<Vec<_>>();
2851
2852 let run_meta = (0..num_runs)
2853 .map(|_| {
2854 let mut meta = RunMeta::default();
2855 meta.id = Some(RunId::new());
2856 meta
2857 })
2858 .collect::<Vec<_>>();
2859
2860 HollowBatch::new(
2861 Description::new(lower, upper, since),
2862 parts,
2863 len % 10,
2864 run_meta,
2865 run_splits,
2866 )
2867 })
2868 }
2869
2870 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2871 Strategy::prop_map(
2872 (
2873 any::<T>(),
2874 any::<T>(),
2875 any::<T>(),
2876 proptest::collection::vec(any_run_part::<T>(), 0..20),
2877 any::<usize>(),
2878 0..=10usize,
2879 proptest::collection::vec(any::<RunId>(), 10),
2880 ),
2881 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2882 let (lower, upper) = if t0 <= t1 {
2883 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2884 } else {
2885 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2886 };
2887 let since = Antichain::from_elem(since);
2888 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2889 let run_splits = (1..num_runs)
2890 .map(|i| i * parts.len() / num_runs)
2891 .collect::<Vec<_>>();
2892
2893 let run_meta = (0..num_runs)
2894 .enumerate()
2895 .map(|(i, _)| {
2896 let mut meta = RunMeta::default();
2897 meta.id = Some(run_ids[i]);
2898 meta
2899 })
2900 .collect::<Vec<_>>();
2901
2902 HollowBatch::new(
2903 Description::new(lower, upper, since),
2904 parts,
2905 len % 10,
2906 run_meta,
2907 run_splits,
2908 )
2909 } else {
2910 HollowBatch::new_run_for_test(
2911 Description::new(lower, upper, since),
2912 parts,
2913 len % 10,
2914 run_ids[0],
2915 )
2916 }
2917 },
2918 )
2919 }
2920
2921 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2922 Strategy::prop_map(
2923 (
2924 any::<bool>(),
2925 any_hollow_batch_part(),
2926 any::<Option<T>>(),
2927 any::<Option<SchemaId>>(),
2928 any::<Option<SchemaId>>(),
2929 ),
2930 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2931 if is_hollow {
2932 BatchPart::Hollow(hollow)
2933 } else {
2934 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2935 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2936 BatchPart::Inline {
2937 updates,
2938 ts_rewrite,
2939 schema_id,
2940 deprecated_schema_id,
2941 }
2942 }
2943 },
2944 )
2945 }
2946
2947 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2948 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2949 }
2950
2951 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2952 -> impl Strategy<Value = HollowBatchPart<T>> {
2953 Strategy::prop_map(
2954 (
2955 any::<PartialBatchKey>(),
2956 any::<usize>(),
2957 any::<Vec<u8>>(),
2958 any_some_lazy_part_stats(),
2959 any::<Option<T>>(),
2960 any::<[u8; 8]>(),
2961 any::<Option<BatchColumnarFormat>>(),
2962 any::<Option<SchemaId>>(),
2963 any::<Option<SchemaId>>(),
2964 ),
2965 |(
2966 key,
2967 encoded_size_bytes,
2968 key_lower,
2969 stats,
2970 ts_rewrite,
2971 diffs_sum,
2972 format,
2973 schema_id,
2974 deprecated_schema_id,
2975 )| {
2976 HollowBatchPart {
2977 key,
2978 meta: Default::default(),
2979 encoded_size_bytes,
2980 key_lower,
2981 structured_key_lower: None,
2982 stats,
2983 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2984 diffs_sum: Some(diffs_sum),
2985 format,
2986 schema_id,
2987 deprecated_schema_id,
2988 }
2989 },
2990 )
2991 }
2992
2993 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
2994 Strategy::prop_map(
2995 (
2996 any::<SeqNo>(),
2997 any::<Option<T>>(),
2998 any::<u64>(),
2999 any::<u64>(),
3000 any::<HandleDebugState>(),
3001 ),
3002 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3003 if lease_duration_ms == 0 {
3007 lease_duration_ms += 1;
3008 }
3009 LeasedReaderState {
3010 seqno,
3011 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3012 last_heartbeat_timestamp_ms,
3013 lease_duration_ms,
3014 debug,
3015 }
3016 },
3017 )
3018 }
3019
3020 pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3021 where
3022 T: Arbitrary,
3023 {
3024 Strategy::prop_map(
3025 (
3026 any::<Option<T>>(),
3027 any::<Opaque>(),
3028 any::<HandleDebugState>(),
3029 ),
3030 |(since, opaque, debug)| CriticalReaderState {
3031 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3032 opaque,
3033 debug,
3034 },
3035 )
3036 }
3037
3038 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3039 Strategy::prop_map(
3040 (
3041 any::<u64>(),
3042 any::<u64>(),
3043 any::<IdempotencyToken>(),
3044 any::<Option<T>>(),
3045 any::<HandleDebugState>(),
3046 ),
3047 |(
3048 last_heartbeat_timestamp_ms,
3049 lease_duration_ms,
3050 most_recent_write_token,
3051 most_recent_write_upper,
3052 debug,
3053 )| WriterState {
3054 last_heartbeat_timestamp_ms,
3055 lease_duration_ms,
3056 most_recent_write_token,
3057 most_recent_write_upper: most_recent_write_upper
3058 .map_or_else(Antichain::new, Antichain::from_elem),
3059 debug,
3060 },
3061 )
3062 }
3063
3064 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3065 Strategy::prop_map(
3066 (
3067 any::<Vec<u8>>(),
3068 any::<Vec<u8>>(),
3069 any::<Vec<u8>>(),
3070 any::<Vec<u8>>(),
3071 ),
3072 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3073 key: Bytes::from(key),
3074 key_data_type: Bytes::from(key_data_type),
3075 val: Bytes::from(val),
3076 val_data_type: Bytes::from(val_data_type),
3077 },
3078 )
3079 }
3080
3081 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3082 num_trace_batches: Range<usize>,
3083 ) -> impl Strategy<Value = State<T>> {
3084 let part1 = (
3085 any::<ShardId>(),
3086 any::<SeqNo>(),
3087 any::<u64>(),
3088 any::<String>(),
3089 any::<SeqNo>(),
3090 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3091 proptest::option::of(any::<ActiveRollup>()),
3092 );
3093
3094 let part2 = (
3095 proptest::option::of(any::<ActiveGc>()),
3096 proptest::collection::btree_map(
3097 any::<LeasedReaderId>(),
3098 any_leased_reader_state::<T>(),
3099 1..3,
3100 ),
3101 proptest::collection::btree_map(
3102 any::<CriticalReaderId>(),
3103 any_critical_reader_state::<T>(),
3104 1..3,
3105 ),
3106 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3107 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3108 any_trace::<T>(num_trace_batches),
3109 );
3110
3111 (part1, part2).prop_map(
3112 |(
3113 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3114 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3115 )| State {
3116 shard_id,
3117 seqno,
3118 walltime_ms,
3119 hostname,
3120 collections: StateCollections {
3121 version: Version::new(1, 2, 3),
3122 last_gc_req,
3123 rollups,
3124 active_rollup,
3125 active_gc,
3126 leased_readers,
3127 critical_readers,
3128 writers,
3129 schemas,
3130 trace,
3131 },
3132 },
3133 )
3134 }
3135
3136 pub(crate) fn hollow<T: Timestamp>(
3137 lower: T,
3138 upper: T,
3139 keys: &[&str],
3140 len: usize,
3141 ) -> HollowBatch<T> {
3142 HollowBatch::new_run(
3143 Description::new(
3144 Antichain::from_elem(lower),
3145 Antichain::from_elem(upper),
3146 Antichain::from_elem(T::minimum()),
3147 ),
3148 keys.iter()
3149 .map(|x| {
3150 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3151 key: PartialBatchKey((*x).to_owned()),
3152 meta: Default::default(),
3153 encoded_size_bytes: 0,
3154 key_lower: vec![],
3155 structured_key_lower: None,
3156 stats: None,
3157 ts_rewrite: None,
3158 diffs_sum: None,
3159 format: None,
3160 schema_id: None,
3161 deprecated_schema_id: None,
3162 }))
3163 })
3164 .collect(),
3165 len,
3166 )
3167 }
3168
    // Exercises `downgrade_since`: the shard since is the meet of all leased
    // readers' sinces, downgrades are idempotent, and regressions are ignored.
    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // A fresh shard with one reader starts with since at the minimum.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        // Downgrading to 2 succeeds and advances the shard since.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Repeating the same downgrade is an idempotent no-op.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Attempting to downgrade backwards (to 1) is ignored; the reader's
        // since stays at 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        // Register a second reader and downgrade it to 3: the shard since is
        // the meet of both readers, so it stays at 2.
        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                seqno,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Advancing the first reader to 5 makes the meet (shard since) 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring the first reader leaves reader2's since of 3 in effect.
        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // A third reader downgraded to 10 still meets with reader2 at 3.
        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                seqno,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring readers never rewinds (or advances) the shard since; it
        // remains at 3 even after all readers are gone.
        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }
3321
    // Exercises `compare_and_downgrade_since` for critical readers: the
    // opaque token is compare-and-swapped alongside the since downgrade, and
    // backwards downgrades are capped at the current since.
    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");

        // Fresh state: since at minimum, opaque token at its initial value.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            u64::MIN
        );

        // Matching expected token: since advances to 2, opaque becomes 1.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&0u64),
                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            1
        );
        // Downgrading to the same since still swaps the opaque (1 -> 2).
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&1u64),
                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            2
        );
        // A backwards downgrade (to 1) is capped at the current since (2),
        // but the opaque swap (2 -> 3) still takes effect.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&2u64),
                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            3
        );
    }
3402
    // Exercises `compare_and_append` validation: upper mismatches, invalid
    // bounds, and empty time intervals with data.
    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // A fresh trace has no batches and no updates.
        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        // Appending [1, 2) against a shard whose upper is 0 fails with an
        // upper mismatch.
        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        // An empty batch [0, 5) matching the shard upper succeeds.
        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // lower > upper is rejected as invalid bounds.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        // lower == upper with data is rejected: a nonempty batch cannot
        // occupy an empty time interval.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        // But lower == upper with NO data is fine (a no-op append).
        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3515
    // Exercises `snapshot`: as-ofs at or beyond the upper are not yet
    // available, as-ofs before the since have lost historical distinctions,
    // and valid as-ofs return the batches up through that time.
    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Empty shard (upper = 0): no as-of is available yet.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        // Append [0, 5): as-ofs strictly below 5 become available.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // The upper itself (5) and beyond are still not available.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        // Downgrade the since to 2: as-ofs below the since have lost their
        // historical distinctions and cannot be read.
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                SeqNo::minimum(),
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Append an empty batch [5, 10): the snapshot includes it anyway.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // 10 is the new upper, so it is not yet available...
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        // ...until [10, 15) is appended.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }
3699
    // Exercises `next_listen_batch`: a frontier inside a batch's bounds
    // yields that batch; a frontier at (or past) the upper yields Err with
    // the seqno to wait on.
    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Empty shard: nothing to listen to yet, at any frontier.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // Install two batches: [0, 5) and [5, 10).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Frontiers within [0, 5) yield the first batch.
        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        // Frontiers within [5, 10) yield the second batch.
        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        // At the upper (10) there is no batch yet.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        // The empty frontier never yields a batch.
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }
3780
    // Exercises `expire_writer`: expiring one writer does not prevent a
    // different writer from appending afterwards.
    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();

        let writer_id_two = WriterId::new();

        // Writer one appends [0, 2).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Expire writer one.
        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        // Writer two can still continue the shard from [2, 5).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3838
    // Exercises `maybe_gc` with the active-GC protocol enabled: an in-flight
    // `ActiveGc` suppresses new GC requests until the fallback threshold
    // elapses, and requests are capped at `max_versions` past the last GC.
    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A fresh state never needs GC, whether or not this seqno wrote.
        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // Jump the seqno well past min_versions.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        // Only the seqno that performed a write proposes GC.
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // Simulate a GC in flight at the current seqno.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // While a GC is active (and fresh), no new GC is proposed.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        // Once the active GC is older than the fallback threshold, it is
        // presumed stuck and a new GC is proposed anyway.
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        // Expiring the last writer triggers GC even from a non-writing seqno.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // A huge seqno jump: the GC request is capped at max_versions past
        // the previous seqno rather than the full distance.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }
3959
    // Exercises `maybe_gc` with the classic (non-active-GC) protocol: only a
    // writing seqno proposes GC, except that expiring the last writer does.
    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A fresh state never needs GC.
        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // Jump the seqno well past min_versions.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        // Non-writing seqnos never propose GC in classic mode.
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // Exception: expiring the last writer triggers GC even from a
        // non-writing seqno.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }
4026
    // Exercises `need_rollup` with the active-rollup protocol: a rollup is
    // requested once the seqno is more than ROLLUP_THRESHOLD past the latest
    // rollup, an in-flight `ActiveRollup` suppresses requests until the
    // fallback threshold elapses, and completed rollups reset the count.
    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Install a rollup at seqno 5.
        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // At or within ROLLUP_THRESHOLD seqnos of the rollup: no new rollup.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        // Past the threshold (5 + 3 < 9): a rollup is requested.
        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // Simulate that rollup being in flight.
        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        // An active (fresh) rollup suppresses further requests...
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // ...until it has been pending longer than the fallback threshold,
        // at which point it is presumed stuck and a new rollup is requested.
        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        // Complete the rollup at seqno 9 and clear the active marker.
        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Within the threshold of the new rollup: no request; past it: yes.
        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }
4176
4177 #[mz_ore::test]
4178 fn need_rollup_classic() {
4179 const ROLLUP_THRESHOLD: usize = 3;
4180 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4181 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4182 const NOW: u64 = 0;
4183
4184 mz_ore::test::init_logging();
4185 let mut state = TypedState::<String, String, u64, i64>::new(
4186 DUMMY_BUILD_INFO.semver_version(),
4187 ShardId::new(),
4188 "".to_owned(),
4189 0,
4190 );
4191
4192 let rollup_seqno = SeqNo(5);
4193 let rollup = HollowRollup {
4194 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4195 encoded_size_bytes: None,
4196 };
4197
4198 assert!(
4199 state
4200 .collections
4201 .add_rollup((rollup_seqno, &rollup))
4202 .is_continue()
4203 );
4204
4205 state.seqno = SeqNo(5);
4207 assert_none!(state.need_rollup(
4208 ROLLUP_THRESHOLD,
4209 ROLLUP_USE_ACTIVE_ROLLUP,
4210 ROLLUP_FALLBACK_THRESHOLD_MS,
4211 NOW
4212 ));
4213
4214 state.seqno = SeqNo(6);
4216 assert_none!(state.need_rollup(
4217 ROLLUP_THRESHOLD,
4218 ROLLUP_USE_ACTIVE_ROLLUP,
4219 ROLLUP_FALLBACK_THRESHOLD_MS,
4220 NOW
4221 ));
4222 state.seqno = SeqNo(7);
4223 assert_none!(state.need_rollup(
4224 ROLLUP_THRESHOLD,
4225 ROLLUP_USE_ACTIVE_ROLLUP,
4226 ROLLUP_FALLBACK_THRESHOLD_MS,
4227 NOW
4228 ));
4229
4230 state.seqno = SeqNo(8);
4232 assert_eq!(
4233 state
4234 .need_rollup(
4235 ROLLUP_THRESHOLD,
4236 ROLLUP_USE_ACTIVE_ROLLUP,
4237 ROLLUP_FALLBACK_THRESHOLD_MS,
4238 NOW
4239 )
4240 .expect("rollup"),
4241 SeqNo(8)
4242 );
4243
4244 state.seqno = SeqNo(9);
4246 assert_none!(state.need_rollup(
4247 ROLLUP_THRESHOLD,
4248 ROLLUP_USE_ACTIVE_ROLLUP,
4249 ROLLUP_FALLBACK_THRESHOLD_MS,
4250 NOW
4251 ));
4252
4253 state.seqno = SeqNo(11);
4255 assert_eq!(
4256 state
4257 .need_rollup(
4258 ROLLUP_THRESHOLD,
4259 ROLLUP_USE_ACTIVE_ROLLUP,
4260 ROLLUP_FALLBACK_THRESHOLD_MS,
4261 NOW
4262 )
4263 .expect("rollup"),
4264 SeqNo(11)
4265 );
4266
4267 let rollup_seqno = SeqNo(6);
4269 let rollup = HollowRollup {
4270 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4271 encoded_size_bytes: None,
4272 };
4273 assert!(
4274 state
4275 .collections
4276 .add_rollup((rollup_seqno, &rollup))
4277 .is_continue()
4278 );
4279
4280 state.seqno = SeqNo(8);
4281 assert_none!(state.need_rollup(
4282 ROLLUP_THRESHOLD,
4283 ROLLUP_USE_ACTIVE_ROLLUP,
4284 ROLLUP_FALLBACK_THRESHOLD_MS,
4285 NOW
4286 ));
4287 state.seqno = SeqNo(9);
4288 assert_eq!(
4289 state
4290 .need_rollup(
4291 ROLLUP_THRESHOLD,
4292 ROLLUP_USE_ACTIVE_ROLLUP,
4293 ROLLUP_FALLBACK_THRESHOLD_MS,
4294 NOW
4295 )
4296 .expect("rollup"),
4297 SeqNo(9)
4298 );
4299
4300 let fallback_seqno = SeqNo(
4302 rollup_seqno.0
4303 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4304 );
4305 state.seqno = fallback_seqno;
4306 assert_eq!(
4307 state
4308 .need_rollup(
4309 ROLLUP_THRESHOLD,
4310 ROLLUP_USE_ACTIVE_ROLLUP,
4311 ROLLUP_FALLBACK_THRESHOLD_MS,
4312 NOW
4313 )
4314 .expect("rollup"),
4315 fallback_seqno
4316 );
4317 state.seqno = fallback_seqno.next();
4318 assert_eq!(
4319 state
4320 .need_rollup(
4321 ROLLUP_THRESHOLD,
4322 ROLLUP_USE_ACTIVE_ROLLUP,
4323 ROLLUP_FALLBACK_THRESHOLD_MS,
4324 NOW
4325 )
4326 .expect("rollup"),
4327 fallback_seqno.next()
4328 );
4329 }
4330
4331 #[mz_ore::test]
4332 fn idempotency_token_sentinel() {
4333 assert_eq!(
4334 IdempotencyToken::SENTINEL.to_string(),
4335 "i11111111-1111-1111-1111-111111111111"
4336 );
4337 }
4338
4339 #[mz_ore::test]
4348 #[cfg_attr(miri, ignore)] fn state_inspect_serde_json() {
4350 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4351 let mut runner = proptest::test_runner::TestRunner::deterministic();
4352 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4353 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4354 assert_eq!(
4355 json.trim(),
4356 STATE_SERDE_JSON.trim(),
4357 "\n\nNEW GOLDEN\n{}\n",
4358 json
4359 );
4360 }
4361
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
        // Write to a shard at build version 0.10, then 0.11, then attempt
        // 0.10 and 0.9 again: the two downgrade attempts must panic
        // (surfaced here as a JoinError) rather than succeed.
        let mut clients = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        // Runs the open+write sequence at `version` inside a spawned task so
        // that a panic in the persist machinery surfaces as a JoinError to
        // the caller instead of unwinding the test itself.
        async fn open_and_write(
            clients: &mut PersistClientCache,
            version: semver::Version,
            shard_id: ShardId,
        ) -> Result<(), tokio::task::JoinError> {
            // Pretend this process was built at `version`.
            clients.cfg.build_version = version.clone();
            // Drop any cached state so the open below re-reads the shard's
            // durable state (and the version recorded in it) from scratch.
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            mz_ore::task::spawn(|| version.to_string(), async move {
                let () = client
                    .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
                    .await
                    .expect("valid usage");
                let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
                // Append an empty batch advancing the upper by one, proving
                // the writer is actually usable at this version.
                let current = *write.upper().as_option().unwrap();
                write
                    .expect_compare_and_append_batch(&mut [], current, current + 1)
                    .await;
            })
            .into_tokio_handle()
            .await
        }

        // Same-version re-open and upgrades are fine...
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert_ok!(res);

        let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
        assert_ok!(res);

        // ...but downgrades (even to a version that previously succeeded
        // against this shard) panic inside the spawned task.
        let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());

        let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
        assert!(res.unwrap_err().is_panic());
    }
4409
4410 #[mz_ore::test]
4411 fn runid_roundtrip() {
4412 proptest!(|(runid: RunId)| {
4413 let runid_str = runid.to_string();
4414 let parsed = RunId::from_str(&runid_str);
4415 prop_assert_eq!(parsed, Ok(runid));
4416 });
4417 }
4418}