1use anyhow::ensure;
11use async_stream::{stream, try_stream};
12use differential_dataflow::difference::Monoid;
13use mz_persist::metrics::ColumnarMetrics;
14use proptest::prelude::{Arbitrary, Strategy};
15use std::borrow::Cow;
16use std::cmp::Ordering;
17use std::collections::BTreeMap;
18use std::fmt::{Debug, Formatter};
19use std::marker::PhantomData;
20use std::ops::ControlFlow::{self, Break, Continue};
21use std::ops::{Deref, DerefMut};
22use std::time::Duration;
23
24use arrow::array::{Array, ArrayData, make_array};
25use arrow::datatypes::DataType;
26use bytes::Bytes;
27use differential_dataflow::Hashable;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::Description;
30use differential_dataflow::trace::implementations::BatchContainer;
31use futures::Stream;
32use futures_util::StreamExt;
33use itertools::Itertools;
34use mz_dyncfg::Config;
35use mz_ore::cast::CastFrom;
36use mz_ore::now::EpochMillis;
37use mz_ore::soft_panic_or_log;
38use mz_ore::vec::PartialOrdVecExt;
39use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceUpdates};
40use mz_persist::location::{Blob, SeqNo};
41use mz_persist_types::arrow::{ArrayBound, ProtoArrayData};
42use mz_persist_types::columnar::{ColumnEncoder, Schema};
43use mz_persist_types::schema::{SchemaId, backward_compatible};
44use mz_persist_types::{Codec, Codec64};
45use mz_proto::ProtoType;
46use mz_proto::RustType;
47use proptest_derive::Arbitrary;
48use semver::Version;
49use serde::ser::SerializeStruct;
50use serde::{Serialize, Serializer};
51use timely::PartialOrder;
52use timely::order::TotalOrder;
53use timely::progress::{Antichain, Timestamp};
54use tracing::info;
55use uuid::Uuid;
56
57use crate::critical::{CriticalReaderId, Opaque};
58use crate::error::InvalidUsage;
59use crate::internal::encoding::{
60 LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61};
62use crate::internal::gc::GcReq;
63use crate::internal::machine::retry_external;
64use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
65use crate::internal::trace::{
66 ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
67};
68use crate::metrics::Metrics;
69use crate::read::LeasedReaderId;
70use crate::schema::CaESchema;
71use crate::write::WriterId;
72use crate::{PersistConfig, ShardId};
73
// Code generated at build time into `OUT_DIR` from the internal state protobuf
// definitions. Presumably this provides the `Proto*` message types used in this
// module (e.g. `ProtoHollowRun`) — TODO confirm.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.state.rs"
));

// Code generated at build time into `OUT_DIR` from the internal state-diff
// protobuf definitions.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_persist_client.internal.diff.rs"
));
82));
83
/// The number of seqnos between rollups (default: 128).
pub(crate) const ROLLUP_THRESHOLD: Config<usize> = Config::new(
    "persist_rollup_threshold",
    128,
    "The number of seqnos between rollups.",
);

/// The number of milliseconds before a worker claims an already claimed
/// rollup (default: 5000).
pub(crate) const ROLLUP_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_rollup_fallback_threshold_ms",
    5000,
    "The number of milliseconds before a worker claims an already claimed rollup.",
);

/// Whether to use the new active rollup tracking mechanism (default: on).
pub(crate) const ROLLUP_USE_ACTIVE_ROLLUP: Config<bool> = Config::new(
    "persist_rollup_use_active_rollup",
    true,
    "Whether to use the new active rollup tracking mechanism.",
);

/// The number of milliseconds before a worker claims an already claimed GC
/// (default: 900000, i.e. 15 minutes).
pub(crate) const GC_FALLBACK_THRESHOLD_MS: Config<usize> = Config::new(
    "persist_gc_fallback_threshold_ms",
    900000,
    "The number of milliseconds before a worker claims an already claimed GC.",
);

/// The number of un-GCd versions that may exist in state before a GC is
/// triggered (default: 32).
pub(crate) const GC_MIN_VERSIONS: Config<usize> = Config::new(
    "persist_gc_min_versions",
    32,
    "The number of un-GCd versions that may exist in state before we'll trigger a GC.",
);

/// The maximum number of versions to GC in a single GC run (default: 128,000).
pub(crate) const GC_MAX_VERSIONS: Config<usize> = Config::new(
    "persist_gc_max_versions",
    128_000,
    "The maximum number of versions to GC in a single GC run.",
);

/// Whether to use the new active GC tracking mechanism (default: off).
pub(crate) const GC_USE_ACTIVE_GC: Config<bool> = Config::new(
    "persist_gc_use_active_gc",
    false,
    "Whether to use the new active GC tracking mechanism.",
);

/// Whether to enable incremental compaction (default: off).
pub(crate) const ENABLE_INCREMENTAL_COMPACTION: Config<bool> = Config::new(
    "persist_enable_incremental_compaction",
    false,
    "Whether to enable incremental compaction.",
);
148
/// An opaque 16-byte token.
///
/// Tokens are generated from random v4 UUIDs by [`IdempotencyToken::new`].
/// Each writer's most recent one is stored as `WriterState::most_recent_write_token`,
/// presumably so that a retried write can be recognized as already applied
/// (TODO confirm).
///
/// Serialized as its `Display` string (`i<uuid>`) via `#[serde(into = "String")]`.
#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)]
#[serde(into = "String")]
pub struct IdempotencyToken(pub(crate) [u8; 16]);
154
155impl std::fmt::Display for IdempotencyToken {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 write!(f, "i{}", Uuid::from_bytes(self.0))
158 }
159}
160
161impl std::fmt::Debug for IdempotencyToken {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(f, "IdempotencyToken({})", Uuid::from_bytes(self.0))
164 }
165}
166
167impl std::str::FromStr for IdempotencyToken {
168 type Err = String;
169
170 fn from_str(s: &str) -> Result<Self, Self::Err> {
171 parse_id("i", "IdempotencyToken", s).map(IdempotencyToken)
172 }
173}
174
175impl From<IdempotencyToken> for String {
176 fn from(x: IdempotencyToken) -> Self {
177 x.to_string()
178 }
179}
180
181impl IdempotencyToken {
182 pub(crate) fn new() -> Self {
183 IdempotencyToken(*Uuid::new_v4().as_bytes())
184 }
185 pub(crate) const SENTINEL: IdempotencyToken = IdempotencyToken([17u8; 16]);
186}
187
/// Per-reader state tracked for a leased reader.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct LeasedReaderState<T> {
    /// A seqno recorded for this reader. Presumably the seqno at which it was
    /// registered or last heartbeated — TODO confirm.
    pub seqno: SeqNo,
    /// The since frontier held by this reader.
    pub since: Antichain<T>,
    /// Timestamp (in milliseconds) of this reader's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in milliseconds) of this reader's lease.
    pub lease_duration_ms: u64,
    /// Diagnostic information (hostname, purpose) about the handle.
    pub debug: HandleDebugState,
}
202
/// Per-reader state tracked for a critical reader.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CriticalReaderState<T> {
    /// The since frontier held by this reader.
    pub since: Antichain<T>,
    /// An opaque value associated with this reader (see `crate::critical::Opaque`).
    pub opaque: Opaque,
    /// Diagnostic information (hostname, purpose) about the handle.
    pub debug: HandleDebugState,
}
212
/// Per-writer state.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WriterState<T> {
    /// Timestamp (in milliseconds) of this writer's most recent heartbeat.
    pub last_heartbeat_timestamp_ms: u64,
    /// Duration (in milliseconds) of this writer's lease.
    pub lease_duration_ms: u64,
    /// The idempotency token of this writer's most recent write.
    pub most_recent_write_token: IdempotencyToken,
    /// The upper of this writer's most recent write.
    pub most_recent_write_upper: Antichain<T>,
    /// Diagnostic information (hostname, purpose) about the handle.
    pub debug: HandleDebugState,
}
229
/// Free-form diagnostic information attached to a reader or writer handle.
#[derive(Arbitrary, Clone, Debug, Default, PartialEq, Serialize)]
pub struct HandleDebugState {
    /// The hostname reported by the handle's process.
    pub hostname: String,
    /// A human-readable description of what the handle is used for.
    pub purpose: String,
}
239
/// One part of a batch.
///
/// A part is either a reference to updates written out to blob storage
/// (`Hollow`) or encoded updates carried directly in the metadata (`Inline`).
/// When serialized, the variant name is written under the `type` tag.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "type")]
pub enum BatchPart<T> {
    /// Updates stored in blob; see [`HollowBatchPart`].
    Hollow(HollowBatchPart<T>),
    /// Updates carried inline.
    Inline {
        /// The lazily-decoded inline updates.
        updates: LazyInlineBatchPart,
        /// If set, the timestamp-rewrite frontier applied to these updates
        /// (written by `HollowBatch::rewrite_ts`).
        ts_rewrite: Option<Antichain<T>>,
        /// The schema id of these updates, if known.
        schema_id: Option<SchemaId>,

        /// A second schema id field. The name suggests it is kept only for
        /// backward compatibility — TODO confirm.
        deprecated_schema_id: Option<SchemaId>,
    },
}
256
257fn decode_structured_lower(lower: &LazyProto<ProtoArrayData>) -> Option<ArrayBound> {
258 let try_decode = |lower: &LazyProto<ProtoArrayData>| {
259 let proto = lower.decode()?;
260 let data = ArrayData::from_proto(proto)?;
261 ensure!(data.len() == 1);
262 Ok(ArrayBound::new(make_array(data), 0))
263 };
264
265 let decoded: anyhow::Result<ArrayBound> = try_decode(lower);
266
267 match decoded {
268 Ok(bound) => Some(bound),
269 Err(e) => {
270 soft_panic_or_log!("failed to decode bound: {e:#?}");
271 None
272 }
273 }
274}
275
276impl<T> BatchPart<T> {
277 pub fn hollow_bytes(&self) -> usize {
278 match self {
279 BatchPart::Hollow(x) => x.encoded_size_bytes,
280 BatchPart::Inline { .. } => 0,
281 }
282 }
283
284 pub fn is_inline(&self) -> bool {
285 matches!(self, BatchPart::Inline { .. })
286 }
287
288 pub fn inline_bytes(&self) -> usize {
289 match self {
290 BatchPart::Hollow(_) => 0,
291 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
292 }
293 }
294
295 pub fn writer_key(&self) -> Option<WriterKey> {
296 match self {
297 BatchPart::Hollow(x) => x.key.split().map(|(writer, _part)| writer),
298 BatchPart::Inline { .. } => None,
299 }
300 }
301
302 pub fn encoded_size_bytes(&self) -> usize {
303 match self {
304 BatchPart::Hollow(x) => x.encoded_size_bytes,
305 BatchPart::Inline { updates, .. } => updates.encoded_size_bytes(),
306 }
307 }
308
309 pub fn printable_name(&self) -> &str {
312 match self {
313 BatchPart::Hollow(x) => x.key.0.as_str(),
314 BatchPart::Inline { .. } => "<inline>",
315 }
316 }
317
318 pub fn stats(&self) -> Option<&LazyPartStats> {
319 match self {
320 BatchPart::Hollow(x) => x.stats.as_ref(),
321 BatchPart::Inline { .. } => None,
322 }
323 }
324
325 pub fn key_lower(&self) -> &[u8] {
326 match self {
327 BatchPart::Hollow(x) => x.key_lower.as_slice(),
328 BatchPart::Inline { .. } => &[],
335 }
336 }
337
338 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
339 let part = match self {
340 BatchPart::Hollow(part) => part,
341 BatchPart::Inline { .. } => return None,
342 };
343
344 decode_structured_lower(part.structured_key_lower.as_ref()?)
345 }
346
347 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
348 match self {
349 BatchPart::Hollow(x) => x.ts_rewrite.as_ref(),
350 BatchPart::Inline { ts_rewrite, .. } => ts_rewrite.as_ref(),
351 }
352 }
353
354 pub fn schema_id(&self) -> Option<SchemaId> {
355 match self {
356 BatchPart::Hollow(x) => x.schema_id,
357 BatchPart::Inline { schema_id, .. } => *schema_id,
358 }
359 }
360
361 pub fn deprecated_schema_id(&self) -> Option<SchemaId> {
362 match self {
363 BatchPart::Hollow(x) => x.deprecated_schema_id,
364 BatchPart::Inline {
365 deprecated_schema_id,
366 ..
367 } => *deprecated_schema_id,
368 }
369 }
370}
371
372impl<T: Timestamp + Codec64> BatchPart<T> {
373 pub fn is_structured_only(&self, metrics: &ColumnarMetrics) -> bool {
374 match self {
375 BatchPart::Hollow(x) => matches!(x.format, Some(BatchColumnarFormat::Structured)),
376 BatchPart::Inline { updates, .. } => {
377 let inline_part = updates.decode::<T>(metrics).expect("valid inline part");
378 matches!(inline_part.updates, BlobTraceUpdates::Structured { .. })
379 }
380 }
381 }
382
383 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
384 match self {
385 BatchPart::Hollow(x) => x.diffs_sum.map(D::decode),
386 BatchPart::Inline { updates, .. } => Some(
387 updates
388 .decode::<T>(metrics)
389 .expect("valid inline part")
390 .updates
391 .diffs_sum(),
392 ),
393 }
394 }
395}
396
/// The contents of a hollow run: a list of run parts.
///
/// It is encoded as its own blob by `HollowRunRef::set`, read back by
/// `HollowRunRef::get`, and referenced from state by a [`HollowRunRef`].
#[derive(Debug, Clone)]
pub struct HollowRun<T> {
    /// The parts making up the run, in order. The first part's lower is used
    /// as the run's lower.
    pub(crate) parts: Vec<RunPart<T>>,
}
403
/// A reference to a [`HollowRun`] stored in blob, plus summary metadata that
/// `HollowRunRef::set` computes when writing the run.
#[derive(Debug, Eq, PartialEq, Clone, Serialize)]
pub struct HollowRunRef<T> {
    /// The partial blob key under which the encoded run is stored.
    pub key: PartialBatchKey,

    /// Sum of the hollow (blob) bytes of every part in the run.
    pub hollow_bytes: usize,

    /// Largest `max_part_bytes` among the run's parts (0 for an empty run).
    pub max_part_bytes: usize,

    /// The first part's codec key lower (empty if the run has no parts).
    pub key_lower: Vec<u8>,

    /// The first part's structured key lower, when the first part is hollow
    /// or itself a run ref and has one recorded.
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,

    /// The `Codec64`-encoded sum of diffs across all parts in the run.
    pub diffs_sum: Option<[u8; 8]>,

    pub(crate) _phantom_data: PhantomData<T>,
}
426impl<T: Eq> PartialOrd<Self> for HollowRunRef<T> {
427 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
428 Some(self.cmp(other))
429 }
430}
431
432impl<T: Eq> Ord for HollowRunRef<T> {
433 fn cmp(&self, other: &Self) -> Ordering {
434 self.key.cmp(&other.key)
435 }
436}
437
438impl<T> HollowRunRef<T> {
439 pub fn writer_key(&self) -> Option<WriterKey> {
440 Some(self.key.split()?.0)
441 }
442}
443
444impl<T: Timestamp + Codec64> HollowRunRef<T> {
445 pub async fn set<D: Codec64 + Monoid>(
447 shard_id: ShardId,
448 blob: &dyn Blob,
449 writer: &WriterKey,
450 data: HollowRun<T>,
451 metrics: &Metrics,
452 ) -> Self {
453 let hollow_bytes = data.parts.iter().map(|p| p.hollow_bytes()).sum();
454 let max_part_bytes = data
455 .parts
456 .iter()
457 .map(|p| p.max_part_bytes())
458 .max()
459 .unwrap_or(0);
460 let key_lower = data
461 .parts
462 .first()
463 .map_or(vec![], |p| p.key_lower().to_vec());
464 let structured_key_lower = match data.parts.first() {
465 Some(RunPart::Many(r)) => r.structured_key_lower.clone(),
466 Some(RunPart::Single(BatchPart::Hollow(p))) => p.structured_key_lower.clone(),
467 Some(RunPart::Single(BatchPart::Inline { .. })) | None => None,
468 };
469 let diffs_sum = data
470 .parts
471 .iter()
472 .map(|p| {
473 p.diffs_sum::<D>(&metrics.columnar)
474 .expect("valid diffs sum")
475 })
476 .reduce(|mut a, b| {
477 a.plus_equals(&b);
478 a
479 })
480 .expect("valid diffs sum")
481 .encode();
482
483 let key = PartialBatchKey::new(writer, &PartId::new());
484 let blob_key = key.complete(&shard_id);
485 let bytes = Bytes::from(prost::Message::encode_to_vec(&data.into_proto()));
486 let () = retry_external(&metrics.retries.external.hollow_run_set, || {
487 blob.set(&blob_key, bytes.clone())
488 })
489 .await;
490 Self {
491 key,
492 hollow_bytes,
493 max_part_bytes,
494 key_lower,
495 structured_key_lower,
496 diffs_sum: Some(diffs_sum),
497 _phantom_data: Default::default(),
498 }
499 }
500
501 pub async fn get(
505 &self,
506 shard_id: ShardId,
507 blob: &dyn Blob,
508 metrics: &Metrics,
509 ) -> Option<HollowRun<T>> {
510 let blob_key = self.key.complete(&shard_id);
511 let mut bytes = retry_external(&metrics.retries.external.hollow_run_get, || {
512 blob.get(&blob_key)
513 })
514 .await?;
515 let proto_runs: ProtoHollowRun =
516 prost::Message::decode(&mut bytes).expect("illegal state: invalid proto bytes");
517 let runs = proto_runs
518 .into_rust()
519 .expect("illegal state: invalid encoded runs proto");
520 Some(runs)
521 }
522}
523
/// One element of a run.
///
/// An element is either a single batch part, or a reference to a hollow run
/// (itself a list of run parts) stored separately in blob. Serialized untagged,
/// so each variant is written as its inner value.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum RunPart<T> {
    Single(BatchPart<T>),
    Many(HollowRunRef<T>),
}
533
534impl<T: Ord> PartialOrd<Self> for RunPart<T> {
535 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
536 Some(self.cmp(other))
537 }
538}
539
540impl<T: Ord> Ord for RunPart<T> {
541 fn cmp(&self, other: &Self) -> Ordering {
542 match (self, other) {
543 (RunPart::Single(a), RunPart::Single(b)) => a.cmp(b),
544 (RunPart::Single(_), RunPart::Many(_)) => Ordering::Less,
545 (RunPart::Many(_), RunPart::Single(_)) => Ordering::Greater,
546 (RunPart::Many(a), RunPart::Many(b)) => a.cmp(b),
547 }
548 }
549}
550
551impl<T> RunPart<T> {
552 #[cfg(test)]
553 pub fn expect_hollow_part(&self) -> &HollowBatchPart<T> {
554 match self {
555 RunPart::Single(BatchPart::Hollow(hollow)) => hollow,
556 _ => panic!("expected hollow part!"),
557 }
558 }
559
560 pub fn hollow_bytes(&self) -> usize {
561 match self {
562 Self::Single(p) => p.hollow_bytes(),
563 Self::Many(r) => r.hollow_bytes,
564 }
565 }
566
567 pub fn is_inline(&self) -> bool {
568 match self {
569 Self::Single(p) => p.is_inline(),
570 Self::Many(_) => false,
571 }
572 }
573
574 pub fn inline_bytes(&self) -> usize {
575 match self {
576 Self::Single(p) => p.inline_bytes(),
577 Self::Many(_) => 0,
578 }
579 }
580
581 pub fn max_part_bytes(&self) -> usize {
582 match self {
583 Self::Single(p) => p.encoded_size_bytes(),
584 Self::Many(r) => r.max_part_bytes,
585 }
586 }
587
588 pub fn writer_key(&self) -> Option<WriterKey> {
589 match self {
590 Self::Single(p) => p.writer_key(),
591 Self::Many(r) => r.writer_key(),
592 }
593 }
594
595 pub fn encoded_size_bytes(&self) -> usize {
596 match self {
597 Self::Single(p) => p.encoded_size_bytes(),
598 Self::Many(r) => r.hollow_bytes,
599 }
600 }
601
602 pub fn schema_id(&self) -> Option<SchemaId> {
603 match self {
604 Self::Single(p) => p.schema_id(),
605 Self::Many(_) => None,
606 }
607 }
608
609 pub fn printable_name(&self) -> &str {
612 match self {
613 Self::Single(p) => p.printable_name(),
614 Self::Many(r) => r.key.0.as_str(),
615 }
616 }
617
618 pub fn stats(&self) -> Option<&LazyPartStats> {
619 match self {
620 Self::Single(p) => p.stats(),
621 Self::Many(_) => None,
623 }
624 }
625
626 pub fn key_lower(&self) -> &[u8] {
627 match self {
628 Self::Single(p) => p.key_lower(),
629 Self::Many(r) => r.key_lower.as_slice(),
630 }
631 }
632
633 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
634 match self {
635 Self::Single(p) => p.structured_key_lower(),
636 Self::Many(_) => None,
637 }
638 }
639
640 pub fn ts_rewrite(&self) -> Option<&Antichain<T>> {
641 match self {
642 Self::Single(p) => p.ts_rewrite(),
643 Self::Many(_) => None,
644 }
645 }
646}
647
648impl<T> RunPart<T>
649where
650 T: Timestamp + Codec64,
651{
652 pub fn diffs_sum<D: Codec64 + Monoid>(&self, metrics: &ColumnarMetrics) -> Option<D> {
653 match self {
654 Self::Single(p) => p.diffs_sum(metrics),
655 Self::Many(hollow_run) => hollow_run.diffs_sum.map(D::decode),
656 }
657 }
658}
659
/// Error for a blob that state refers to (e.g. a hollow run) but that is not
/// found in blob storage. Holds the complete key that was looked up.
#[derive(Clone, Debug)]
pub struct MissingBlob(BlobKey);
663
664impl std::fmt::Display for MissingBlob {
665 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
666 write!(f, "unexpectedly missing key: {}", self.0)
667 }
668}
669
// `MissingBlob` wraps no underlying cause, so the default `source()` is used.
// The error message comes from the `Display` impl.
impl std::error::Error for MissingBlob {}
671
impl<T: Timestamp + Codec64 + Sync> RunPart<T> {
    /// Streams the individual [`BatchPart`]s that make up this run part.
    ///
    /// A `Single` part is yielded as-is (borrowed). A `Many` ref is fetched
    /// from blob, and each fetched run part is streamed recursively; those
    /// nested parts are yielded as owned values.
    ///
    /// # Errors
    /// If a referenced hollow run is absent from blob, yields
    /// [`MissingBlob`] and ends the stream (`try_stream!` semantics).
    pub fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + Send + 'a {
        try_stream! {
            match self {
                RunPart::Single(p) => {
                    yield Cow::Borrowed(p);
                }
                RunPart::Many(r) => {
                    let fetched = r.get(shard_id, blob, metrics).await
                        .ok_or_else(|| MissingBlob(r.key.complete(&shard_id)))?;
                    for run_part in fetched.parts {
                        // This is a recursive call, so box the nested stream to
                        // give it a nameable, finite type.
                        for await batch_part in
                            run_part.part_stream(shard_id, blob, metrics).boxed()
                        {
                            // Nested parts borrow from `run_part`, which is local
                            // to this loop, so they must be converted to owned.
                            yield Cow::Owned(batch_part?.into_owned());
                        }
                    }
                }
            }
        }
    }
}
699
700impl<T: Ord> PartialOrd for BatchPart<T> {
701 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
702 Some(self.cmp(other))
703 }
704}
705
impl<T: Ord> Ord for BatchPart<T> {
    /// Every hollow part sorts before every inline part.
    ///
    /// Two hollow parts compare via `HollowBatchPart`'s ordering. Two inline
    /// parts compare lexicographically by `updates`, then the `ts_rewrite`
    /// elements, then `schema_id`, then `deprecated_schema_id`.
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (BatchPart::Hollow(s), BatchPart::Hollow(o)) => s.cmp(o),
            // Fields are destructured without `..` so that adding a field to
            // `Inline` is a compile error here until it is ordered.
            (
                BatchPart::Inline {
                    updates: s_updates,
                    ts_rewrite: s_ts_rewrite,
                    schema_id: s_schema_id,
                    deprecated_schema_id: s_deprecated_schema_id,
                },
                BatchPart::Inline {
                    updates: o_updates,
                    ts_rewrite: o_ts_rewrite,
                    schema_id: o_schema_id,
                    deprecated_schema_id: o_deprecated_schema_id,
                },
            ) => (
                s_updates,
                // The rewrite frontier is compared via its elements slice.
                s_ts_rewrite.as_ref().map(|x| x.elements()),
                s_schema_id,
                s_deprecated_schema_id,
            )
                .cmp(&(
                    o_updates,
                    o_ts_rewrite.as_ref().map(|x| x.elements()),
                    o_schema_id,
                    o_deprecated_schema_id,
                )),
            (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
            (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
        }
    }
}
740
/// How the updates within a run are ordered.
///
/// Note that variant order defines the derived `Ord`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub(crate) enum RunOrder {
    /// No ordering is guaranteed.
    Unordered,
    /// Presumably sorted by the codec-encoded key — TODO confirm.
    Codec,
    /// Presumably sorted by the structured (arrow) key — TODO confirm.
    Structured,
}
751
/// An identifier for a run: 16 bytes, generated from a random v4 UUID by
/// [`RunId::new`]. Rendered as `ri<uuid>` by `Display`.
#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Copy, Hash)]
pub struct RunId(pub(crate) [u8; 16]);
754
755impl std::fmt::Display for RunId {
756 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
757 write!(f, "ri{}", Uuid::from_bytes(self.0))
758 }
759}
760
761impl std::fmt::Debug for RunId {
762 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
763 write!(f, "RunId({})", Uuid::from_bytes(self.0))
764 }
765}
766
767impl std::str::FromStr for RunId {
768 type Err = String;
769
770 fn from_str(s: &str) -> Result<Self, Self::Err> {
771 parse_id("ri", "RunId", s).map(RunId)
772 }
773}
774
775impl From<RunId> for String {
776 fn from(x: RunId) -> Self {
777 x.to_string()
778 }
779}
780
781impl RunId {
782 pub(crate) fn new() -> Self {
783 RunId(*Uuid::new_v4().as_bytes())
784 }
785}
786
787impl Arbitrary for RunId {
788 type Parameters = ();
789 type Strategy = proptest::strategy::BoxedStrategy<Self>;
790 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
791 Strategy::prop_map(proptest::prelude::any::<u128>(), |n| {
792 RunId(*Uuid::from_u128(n).as_bytes())
793 })
794 .boxed()
795 }
796}
797
/// Metadata describing a single run within a [`HollowBatch`].
///
/// Field order defines the derived `Ord`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize)]
pub struct RunMeta {
    /// How the updates in the run are ordered, if known.
    pub(crate) order: Option<RunOrder>,
    /// The schema id of the run's data, if known.
    pub(crate) schema: Option<SchemaId>,

    /// A second schema id field. The name suggests it is kept only for
    /// backward compatibility — TODO confirm.
    pub(crate) deprecated_schema: Option<SchemaId>,

    /// An identifier for the run, if one was assigned.
    pub(crate) id: Option<RunId>,

    /// The length of the run, if recorded. Presumably a count of updates —
    /// TODO confirm.
    pub(crate) len: Option<usize>,

    /// Additional metadata; omitted from serialization when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub(crate) meta: MetadataMap,
}
819
/// A batch part whose updates are stored in blob, together with metadata
/// about them.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
    /// The partial blob key of the part's data.
    pub key: PartialBatchKey,
    /// Additional metadata; omitted from serialization when empty.
    #[serde(skip_serializing_if = "MetadataMap::is_empty")]
    pub meta: MetadataMap,
    /// Encoded size of the part in blob, in bytes.
    pub encoded_size_bytes: usize,
    /// Codec-encoded lower bound on the part's keys.
    #[serde(serialize_with = "serialize_part_bytes")]
    pub key_lower: Vec<u8>,
    /// Structured (arrow) key lower. When decoded, this is a single-row array
    /// (see `decode_structured_lower`).
    #[serde(serialize_with = "serialize_lazy_proto")]
    pub structured_key_lower: Option<LazyProto<ProtoArrayData>>,
    /// Optional statistics about the part's contents.
    #[serde(serialize_with = "serialize_part_stats")]
    pub stats: Option<LazyPartStats>,
    /// If set, the timestamp-rewrite frontier applied to this part
    /// (written by `HollowBatch::rewrite_ts`).
    pub ts_rewrite: Option<Antichain<T>>,
    /// `Codec64`-encoded sum of the part's diffs, if recorded.
    #[serde(serialize_with = "serialize_diffs_sum")]
    pub diffs_sum: Option<[u8; 8]>,
    /// The columnar format of the part's data, if recorded.
    /// `Some(Structured)` means structured-only.
    pub format: Option<BatchColumnarFormat>,
    /// The schema id of the part's data, if known.
    pub schema_id: Option<SchemaId>,

    /// A second schema id field. The name suggests it is kept only for
    /// backward compatibility — TODO confirm.
    pub deprecated_schema_id: Option<SchemaId>,
}
871
/// A batch of updates stored as a list of parts, grouped into runs.
///
/// `Debug`, `Serialize`, and `Ord` are implemented by hand below.
#[derive(Clone, PartialEq, Eq)]
pub struct HollowBatch<T> {
    /// The lower, upper, and since frontiers describing this batch.
    pub desc: Description<T>,
    /// The batch length (0 for an empty batch). Presumably the number of
    /// updates — TODO confirm.
    pub len: usize,
    /// The parts of the batch, concatenated across all runs.
    pub(crate) parts: Vec<RunPart<T>>,
    /// Indices into `parts` at which each run after the first begins. They are
    /// expected to be strictly increasing, positive, and `< parts.len()`
    /// (checked only by `debug_assert!` in `HollowBatch::new`).
    pub(crate) run_splits: Vec<usize>,
    /// Metadata for each run. When `parts` is non-empty there is one entry per
    /// run, i.e. `run_splits.len() + 1`.
    pub(crate) run_meta: Vec<RunMeta>,
}
895
896impl<T: Debug> Debug for HollowBatch<T> {
897 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
898 let HollowBatch {
899 desc,
900 parts,
901 len,
902 run_splits: runs,
903 run_meta,
904 } = self;
905 f.debug_struct("HollowBatch")
906 .field(
907 "desc",
908 &(
909 desc.lower().elements(),
910 desc.upper().elements(),
911 desc.since().elements(),
912 ),
913 )
914 .field("parts", &parts)
915 .field("len", &len)
916 .field("runs", &runs)
917 .field("run_meta", &run_meta)
918 .finish()
919 }
920}
921
922impl<T: Serialize> serde::Serialize for HollowBatch<T> {
923 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
924 let HollowBatch {
925 desc,
926 len,
927 parts: _,
929 run_splits: _,
930 run_meta: _,
931 } = self;
932 let mut s = s.serialize_struct("HollowBatch", 5)?;
933 let () = s.serialize_field("lower", &desc.lower().elements())?;
934 let () = s.serialize_field("upper", &desc.upper().elements())?;
935 let () = s.serialize_field("since", &desc.since().elements())?;
936 let () = s.serialize_field("len", len)?;
937 let () = s.serialize_field("part_runs", &self.runs().collect::<Vec<_>>())?;
938 s.end()
939 }
940}
941
942impl<T: Ord> PartialOrd for HollowBatch<T> {
943 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944 Some(self.cmp(other))
945 }
946}
947
impl<T: Ord> Ord for HollowBatch<T> {
    /// Compares lexicographically by:
    /// 1. the lower, upper, and since frontier elements;
    /// 2. then `parts`, `len`, `run_splits`, and `run_meta`.
    fn cmp(&self, other: &Self) -> Ordering {
        // Both sides are destructured without `..` so that adding a field to
        // `HollowBatch` is a compile error here until it is included.
        let HollowBatch {
            desc: self_desc,
            parts: self_parts,
            len: self_len,
            run_splits: self_runs,
            run_meta: self_run_meta,
        } = self;
        let HollowBatch {
            desc: other_desc,
            parts: other_parts,
            len: other_len,
            run_splits: other_runs,
            run_meta: other_run_meta,
        } = other;
        (
            self_desc.lower().elements(),
            self_desc.upper().elements(),
            self_desc.since().elements(),
            self_parts,
            self_len,
            self_runs,
            self_run_meta,
        )
            .cmp(&(
                other_desc.lower().elements(),
                other_desc.upper().elements(),
                other_desc.since().elements(),
                other_parts,
                other_len,
                other_runs,
                other_run_meta,
            ))
    }
}
986
impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
    /// Streams every [`BatchPart`] in this batch, in `parts` order. Hollow run
    /// refs are expanded via [`RunPart::part_stream`].
    ///
    /// Errors from an individual run part (e.g. [`MissingBlob`]) are passed
    /// through as items. Because this uses `stream!` rather than
    /// `try_stream!`, iteration then continues with the remaining parts.
    pub(crate) fn part_stream<'a>(
        &'a self,
        shard_id: ShardId,
        blob: &'a dyn Blob,
        metrics: &'a Metrics,
    ) -> impl Stream<Item = Result<Cow<'a, BatchPart<T>>, MissingBlob>> + 'a {
        stream! {
            for part in &self.parts {
                for await part in part.part_stream(shard_id, blob, metrics) {
                    yield part;
                }
            }
        }
    }
}
1003impl<T> HollowBatch<T> {
1004 pub(crate) fn new(
1011 desc: Description<T>,
1012 parts: Vec<RunPart<T>>,
1013 len: usize,
1014 run_meta: Vec<RunMeta>,
1015 run_splits: Vec<usize>,
1016 ) -> Self {
1017 debug_assert!(
1018 run_splits.is_strictly_sorted(),
1019 "run indices should be strictly increasing"
1020 );
1021 debug_assert!(
1022 run_splits.first().map_or(true, |i| *i > 0),
1023 "run indices should be positive"
1024 );
1025 debug_assert!(
1026 run_splits.last().map_or(true, |i| *i < parts.len()),
1027 "run indices should be valid indices into parts"
1028 );
1029 debug_assert!(
1030 parts.is_empty() || run_meta.len() == run_splits.len() + 1,
1031 "all metadata should correspond to a run"
1032 );
1033
1034 Self {
1035 desc,
1036 len,
1037 parts,
1038 run_splits,
1039 run_meta,
1040 }
1041 }
1042
1043 pub(crate) fn new_run(desc: Description<T>, parts: Vec<RunPart<T>>, len: usize) -> Self {
1045 let run_meta = if parts.is_empty() {
1046 vec![]
1047 } else {
1048 vec![RunMeta::default()]
1049 };
1050 Self {
1051 desc,
1052 len,
1053 parts,
1054 run_splits: vec![],
1055 run_meta,
1056 }
1057 }
1058
1059 #[cfg(test)]
1060 pub(crate) fn new_run_for_test(
1061 desc: Description<T>,
1062 parts: Vec<RunPart<T>>,
1063 len: usize,
1064 run_id: RunId,
1065 ) -> Self {
1066 let run_meta = if parts.is_empty() {
1067 vec![]
1068 } else {
1069 let mut meta = RunMeta::default();
1070 meta.id = Some(run_id);
1071 vec![meta]
1072 };
1073 Self {
1074 desc,
1075 len,
1076 parts,
1077 run_splits: vec![],
1078 run_meta,
1079 }
1080 }
1081
1082 pub(crate) fn empty(desc: Description<T>) -> Self {
1084 Self {
1085 desc,
1086 len: 0,
1087 parts: vec![],
1088 run_splits: vec![],
1089 run_meta: vec![],
1090 }
1091 }
1092
1093 pub(crate) fn runs(&self) -> impl Iterator<Item = (&RunMeta, &[RunPart<T>])> {
1094 let run_ends = self
1095 .run_splits
1096 .iter()
1097 .copied()
1098 .chain(std::iter::once(self.parts.len()));
1099 let run_metas = self.run_meta.iter();
1100 let run_parts = run_ends
1101 .scan(0, |start, end| {
1102 let range = *start..end;
1103 *start = end;
1104 Some(range)
1105 })
1106 .filter(|range| !range.is_empty())
1107 .map(|range| &self.parts[range]);
1108 run_metas.zip_eq(run_parts)
1109 }
1110
1111 pub(crate) fn inline_bytes(&self) -> usize {
1112 self.parts.iter().map(|x| x.inline_bytes()).sum()
1113 }
1114
1115 pub(crate) fn is_empty(&self) -> bool {
1116 self.parts.is_empty()
1117 }
1118
1119 pub(crate) fn part_count(&self) -> usize {
1120 self.parts.len()
1121 }
1122
1123 pub fn encoded_size_bytes(&self) -> usize {
1125 self.parts.iter().map(|p| p.encoded_size_bytes()).sum()
1126 }
1127}
1128
1129impl<T: Timestamp + TotalOrder> HollowBatch<T> {
1131 pub(crate) fn rewrite_ts(
1132 &mut self,
1133 frontier: &Antichain<T>,
1134 new_upper: Antichain<T>,
1135 ) -> Result<(), String> {
1136 if !PartialOrder::less_than(frontier, &new_upper) {
1137 return Err(format!(
1138 "rewrite frontier {:?} !< rewrite upper {:?}",
1139 frontier.elements(),
1140 new_upper.elements(),
1141 ));
1142 }
1143 if PartialOrder::less_than(&new_upper, self.desc.upper()) {
1144 return Err(format!(
1145 "rewrite upper {:?} < batch upper {:?}",
1146 new_upper.elements(),
1147 self.desc.upper().elements(),
1148 ));
1149 }
1150
1151 if PartialOrder::less_than(frontier, self.desc.lower()) {
1154 return Err(format!(
1155 "rewrite frontier {:?} < batch lower {:?}",
1156 frontier.elements(),
1157 self.desc.lower().elements(),
1158 ));
1159 }
1160 if self.desc.since() != &Antichain::from_elem(T::minimum()) {
1161 return Err(format!(
1162 "batch since {:?} != minimum antichain {:?}",
1163 self.desc.since().elements(),
1164 &[T::minimum()],
1165 ));
1166 }
1167 for part in self.parts.iter() {
1168 let Some(ts_rewrite) = part.ts_rewrite() else {
1169 continue;
1170 };
1171 if PartialOrder::less_than(frontier, ts_rewrite) {
1172 return Err(format!(
1173 "rewrite frontier {:?} < batch rewrite {:?}",
1174 frontier.elements(),
1175 ts_rewrite.elements(),
1176 ));
1177 }
1178 }
1179
1180 self.desc = Description::new(
1181 self.desc.lower().clone(),
1182 new_upper,
1183 self.desc.since().clone(),
1184 );
1185 for part in &mut self.parts {
1186 match part {
1187 RunPart::Single(BatchPart::Hollow(part)) => {
1188 part.ts_rewrite = Some(frontier.clone())
1189 }
1190 RunPart::Single(BatchPart::Inline { ts_rewrite, .. }) => {
1191 *ts_rewrite = Some(frontier.clone())
1192 }
1193 RunPart::Many(runs) => {
1194 panic!("unexpected rewrite of a hollow runs ref: {runs:?}");
1197 }
1198 }
1199 }
1200 Ok(())
1201 }
1202}
1203
1204impl<T: Ord> PartialOrd for HollowBatchPart<T> {
1205 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1206 Some(self.cmp(other))
1207 }
1208}
1209
impl<T: Ord> Ord for HollowBatchPart<T> {
    /// Compares lexicographically over every field, in declaration order.
    /// `ts_rewrite` is compared via its elements slice.
    fn cmp(&self, other: &Self) -> Ordering {
        // Both sides are destructured without `..` so that adding a field to
        // `HollowBatchPart` is a compile error here until it is included.
        let HollowBatchPart {
            key: self_key,
            meta: self_meta,
            encoded_size_bytes: self_encoded_size_bytes,
            key_lower: self_key_lower,
            structured_key_lower: self_structured_key_lower,
            stats: self_stats,
            ts_rewrite: self_ts_rewrite,
            diffs_sum: self_diffs_sum,
            format: self_format,
            schema_id: self_schema_id,
            deprecated_schema_id: self_deprecated_schema_id,
        } = self;
        let HollowBatchPart {
            key: other_key,
            meta: other_meta,
            encoded_size_bytes: other_encoded_size_bytes,
            key_lower: other_key_lower,
            structured_key_lower: other_structured_key_lower,
            stats: other_stats,
            ts_rewrite: other_ts_rewrite,
            diffs_sum: other_diffs_sum,
            format: other_format,
            schema_id: other_schema_id,
            deprecated_schema_id: other_deprecated_schema_id,
        } = other;
        (
            self_key,
            self_meta,
            self_encoded_size_bytes,
            self_key_lower,
            self_structured_key_lower,
            self_stats,
            self_ts_rewrite.as_ref().map(|x| x.elements()),
            self_diffs_sum,
            self_format,
            self_schema_id,
            self_deprecated_schema_id,
        )
            .cmp(&(
                other_key,
                other_meta,
                other_encoded_size_bytes,
                other_key_lower,
                other_structured_key_lower,
                other_stats,
                other_ts_rewrite.as_ref().map(|x| x.elements()),
                other_diffs_sum,
                other_format,
                other_schema_id,
                other_deprecated_schema_id,
            ))
    }
}
1268
/// A reference to a rollup stored in blob. `StateCollections::rollups` keys
/// these by seqno.
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct HollowRollup {
    /// The partial blob key of the rollup.
    pub key: PartialRollupKey,
    /// The encoded size of the rollup in bytes, if known.
    pub encoded_size_bytes: Option<usize>,
}
1277
/// A borrowed reference to a blob-backed object: either a hollow batch or a
/// hollow rollup.
#[derive(Debug)]
pub enum HollowBlobRef<'a, T> {
    Batch(&'a HollowBatch<T>),
    Rollup(&'a HollowRollup),
}
1284
/// Marks a rollup as in progress. `add_rollup` clears it when a new rollup
/// is inserted.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveRollup {
    /// The seqno associated with the in-progress rollup.
    pub seqno: SeqNo,
    /// When the rollup started, in milliseconds. Presumably compared against
    /// `ROLLUP_FALLBACK_THRESHOLD_MS` to let another worker take over —
    /// TODO confirm.
    pub start_ms: u64,
}
1293
/// Marks a GC as in progress. `remove_rollups` clears it.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Arbitrary, Serialize
)]
pub struct ActiveGc {
    /// The seqno associated with the in-progress GC.
    pub seqno: SeqNo,
    /// When the GC started, in milliseconds. Presumably compared against
    /// `GC_FALLBACK_THRESHOLD_MS` to let another worker take over —
    /// TODO confirm.
    pub start_ms: u64,
}
1302
/// Wraps the result of a state transition that made no change.
///
/// Used as the `Break` type of the `ControlFlow` returned by state-mutating
/// methods such as `StateCollections::remove_rollups`.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct NoOpStateTransition<T>(pub T);
1310
/// The mutable collections that make up a shard's state.
#[derive(Debug, Clone)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct StateCollections<T> {
    /// A semver version associated with this state. Presumably the version of
    /// the code that last wrote it — TODO confirm.
    pub(crate) version: Version,

    /// The seqno of the most recent GC request.
    pub(crate) last_gc_req: SeqNo,

    /// Rollups, keyed by seqno.
    pub(crate) rollups: BTreeMap<SeqNo, HollowRollup>,

    /// The in-progress rollup, if any.
    pub(crate) active_rollup: Option<ActiveRollup>,
    /// The in-progress GC, if any.
    pub(crate) active_gc: Option<ActiveGc>,

    /// Registered leased readers.
    pub(crate) leased_readers: BTreeMap<LeasedReaderId, LeasedReaderState<T>>,
    /// Registered critical readers.
    pub(crate) critical_readers: BTreeMap<CriticalReaderId, CriticalReaderState<T>>,
    /// Registered writers.
    pub(crate) writers: BTreeMap<WriterId, WriterState<T>>,
    /// Registered schemas, keyed by schema id.
    pub(crate) schemas: BTreeMap<SchemaId, EncodedSchemas>,

    /// The shard's trace of batches.
    pub(crate) trace: Trace<T>,
}
1343
/// The encoded key and value schemas registered under a schema id.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct EncodedSchemas {
    /// The encoded key schema.
    pub key: Bytes,
    /// The protobuf-encoded arrow `DataType` of the key; decode it with
    /// `EncodedSchemas::decode_data_type`.
    pub key_data_type: Bytes,
    /// The encoded value schema.
    pub val: Bytes,
    /// The protobuf-encoded arrow `DataType` of the value; decode it with
    /// `EncodedSchemas::decode_data_type`.
    pub val_data_type: Bytes,
}
1372
1373impl EncodedSchemas {
1374 pub(crate) fn decode_data_type(buf: &[u8]) -> DataType {
1375 let proto = prost::Message::decode(buf).expect("valid ProtoDataType");
1376 DataType::from_proto(proto).expect("valid DataType")
1377 }
1378}
1379
/// Reasons a compare-and-append can stop without applying the append.
/// (Variant meanings below are inferred from their names and fields.)
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum CompareAndAppendBreak<T> {
    /// The write was already committed (presumably detected via the writer's
    /// idempotency token — TODO confirm).
    AlreadyCommitted,
    /// The expected upper did not match: reports the shard's current upper
    /// and the writer's most recent upper.
    Upper {
        shard_upper: Antichain<T>,
        writer_upper: Antichain<T>,
    },
    /// The call was invalid.
    InvalidUsage(InvalidUsage<T>),
    /// Presumably rejected because too much inline data is buffered in state —
    /// TODO confirm.
    InlineBackpressure,
}
1391
/// Reasons a snapshot at a given as-of cannot be served.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum SnapshotErr<T> {
    /// The as-of is not yet available. Carries the seqno and the shard upper
    /// observed at that seqno (presumably — TODO confirm).
    AsOfNotYetAvailable(SeqNo, Upper<T>),
    /// The as-of is earlier than the since, so historical distinctions at that
    /// time have been lost. Carries the current since.
    AsOfHistoricalDistinctionsLost(Since<T>),
}
1398
1399impl<T> StateCollections<T>
1400where
1401 T: Timestamp + Lattice + Codec64,
1402{
1403 pub fn add_rollup(
1404 &mut self,
1405 add_rollup: (SeqNo, &HollowRollup),
1406 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1407 let (rollup_seqno, rollup) = add_rollup;
1408 let applied = match self.rollups.get(&rollup_seqno) {
1409 Some(x) => x.key == rollup.key,
1410 None => {
1411 self.active_rollup = None;
1412 self.rollups.insert(rollup_seqno, rollup.to_owned());
1413 true
1414 }
1415 };
1416 Continue(applied)
1420 }
1421
1422 pub fn remove_rollups(
1423 &mut self,
1424 remove_rollups: &[(SeqNo, PartialRollupKey)],
1425 ) -> ControlFlow<NoOpStateTransition<Vec<SeqNo>>, Vec<SeqNo>> {
1426 if self.is_tombstone() {
1427 return Break(NoOpStateTransition(vec![]));
1428 }
1429
1430 let active_gc_was_set = self.active_gc.take().is_some();
1433
1434 if remove_rollups.is_empty() {
1435 return if active_gc_was_set {
1436 Continue(vec![])
1437 } else {
1438 Break(NoOpStateTransition(vec![]))
1439 };
1440 }
1441
1442 let mut removed = vec![];
1443 for (seqno, key) in remove_rollups {
1444 let removed_key = self.rollups.remove(seqno);
1445 debug_assert!(
1446 removed_key.as_ref().map_or(true, |x| &x.key == key),
1447 "{} vs {:?}",
1448 key,
1449 removed_key
1450 );
1451
1452 if removed_key.is_some() {
1453 removed.push(*seqno);
1454 }
1455 }
1456
1457 Continue(removed)
1458 }
1459
1460 pub fn register_leased_reader(
1461 &mut self,
1462 hostname: &str,
1463 reader_id: &LeasedReaderId,
1464 purpose: &str,
1465 seqno: SeqNo,
1466 lease_duration: Duration,
1467 heartbeat_timestamp_ms: u64,
1468 use_critical_since: bool,
1469 ) -> ControlFlow<
1470 NoOpStateTransition<(LeasedReaderState<T>, SeqNo)>,
1471 (LeasedReaderState<T>, SeqNo),
1472 > {
1473 let since = if use_critical_since {
1474 self.critical_since()
1475 .unwrap_or_else(|| self.trace.since().clone())
1476 } else {
1477 self.trace.since().clone()
1478 };
1479 let reader_state = LeasedReaderState {
1480 debug: HandleDebugState {
1481 hostname: hostname.to_owned(),
1482 purpose: purpose.to_owned(),
1483 },
1484 seqno,
1485 since,
1486 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1487 lease_duration_ms: u64::try_from(lease_duration.as_millis())
1488 .expect("lease duration as millis should fit within u64"),
1489 };
1490
1491 if self.is_tombstone() {
1496 return Break(NoOpStateTransition((reader_state, self.seqno_since(seqno))));
1497 }
1498
1499 self.leased_readers
1501 .insert(reader_id.clone(), reader_state.clone());
1502 Continue((reader_state, self.seqno_since(seqno)))
1503 }
1504
1505 pub fn register_critical_reader(
1506 &mut self,
1507 hostname: &str,
1508 reader_id: &CriticalReaderId,
1509 opaque: Opaque,
1510 purpose: &str,
1511 ) -> ControlFlow<NoOpStateTransition<CriticalReaderState<T>>, CriticalReaderState<T>> {
1512 let state = CriticalReaderState {
1513 debug: HandleDebugState {
1514 hostname: hostname.to_owned(),
1515 purpose: purpose.to_owned(),
1516 },
1517 since: self.trace.since().clone(),
1518 opaque,
1519 };
1520
1521 if self.is_tombstone() {
1526 return Break(NoOpStateTransition(state));
1527 }
1528
1529 let state = match self.critical_readers.get_mut(reader_id) {
1530 Some(existing_state) => {
1531 existing_state.debug = state.debug;
1532 existing_state.clone()
1533 }
1534 None => {
1535 self.critical_readers
1536 .insert(reader_id.clone(), state.clone());
1537 state
1538 }
1539 };
1540 Continue(state)
1541 }
1542
1543 pub fn register_schema<K: Codec, V: Codec>(
1544 &mut self,
1545 key_schema: &K::Schema,
1546 val_schema: &V::Schema,
1547 ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
1548 fn encode_data_type(data_type: &DataType) -> Bytes {
1549 let proto = data_type.into_proto();
1550 prost::Message::encode_to_vec(&proto).into()
1551 }
1552
1553 let existing_id = self.schemas.iter().rev().find(|(_, x)| {
1565 K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
1566 });
1567 match existing_id {
1568 Some((schema_id, _)) => {
1569 Break(NoOpStateTransition(Some(*schema_id)))
1574 }
1575 None if self.is_tombstone() => {
1576 Break(NoOpStateTransition(None))
1578 }
1579 None if self.schemas.is_empty() => {
1580 let id = SchemaId(self.schemas.len());
1584 let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
1585 .expect("valid key schema");
1586 let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
1587 .expect("valid val schema");
1588 let prev = self.schemas.insert(
1589 id,
1590 EncodedSchemas {
1591 key: K::encode_schema(key_schema),
1592 key_data_type: encode_data_type(&key_data_type),
1593 val: V::encode_schema(val_schema),
1594 val_data_type: encode_data_type(&val_data_type),
1595 },
1596 );
1597 assert_eq!(prev, None);
1598 Continue(Some(id))
1599 }
1600 None => {
1601 info!(
1602 "register_schemas got {:?} expected {:?}",
1603 key_schema,
1604 self.schemas
1605 .iter()
1606 .map(|(id, x)| (id, K::decode_schema(&x.key)))
1607 .collect::<Vec<_>>()
1608 );
1609 Break(NoOpStateTransition(None))
1612 }
1613 }
1614 }
1615
1616 pub fn compare_and_evolve_schema<K: Codec, V: Codec>(
1617 &mut self,
1618 expected: SchemaId,
1619 key_schema: &K::Schema,
1620 val_schema: &V::Schema,
1621 ) -> ControlFlow<NoOpStateTransition<CaESchema<K, V>>, CaESchema<K, V>> {
1622 fn data_type<T>(schema: &impl Schema<T>) -> DataType {
1623 let array = Schema::encoder(schema).expect("valid schema").finish();
1627 Array::data_type(&array).clone()
1628 }
1629
1630 let (current_id, current) = self
1631 .schemas
1632 .last_key_value()
1633 .expect("all shards have a schema");
1634 if *current_id != expected {
1635 return Break(NoOpStateTransition(CaESchema::ExpectedMismatch {
1636 schema_id: *current_id,
1637 key: K::decode_schema(¤t.key),
1638 val: V::decode_schema(¤t.val),
1639 }));
1640 }
1641
1642 let current_key = K::decode_schema(¤t.key);
1643 let current_key_dt = EncodedSchemas::decode_data_type(¤t.key_data_type);
1644 let current_val = V::decode_schema(¤t.val);
1645 let current_val_dt = EncodedSchemas::decode_data_type(¤t.val_data_type);
1646
1647 let key_dt = data_type(key_schema);
1648 let val_dt = data_type(val_schema);
1649
1650 if current_key == *key_schema
1652 && current_key_dt == key_dt
1653 && current_val == *val_schema
1654 && current_val_dt == val_dt
1655 {
1656 return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
1657 }
1658
1659 let key_fn = backward_compatible(¤t_key_dt, &key_dt);
1660 let val_fn = backward_compatible(¤t_val_dt, &val_dt);
1661 let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
1662 return Break(NoOpStateTransition(CaESchema::Incompatible));
1663 };
1664 if key_fn.contains_drop() || val_fn.contains_drop() {
1668 return Break(NoOpStateTransition(CaESchema::Incompatible));
1669 }
1670
1671 let id = SchemaId(self.schemas.len());
1675 self.schemas.insert(
1676 id,
1677 EncodedSchemas {
1678 key: K::encode_schema(key_schema),
1679 key_data_type: prost::Message::encode_to_vec(&key_dt.into_proto()).into(),
1680 val: V::encode_schema(val_schema),
1681 val_data_type: prost::Message::encode_to_vec(&val_dt.into_proto()).into(),
1682 },
1683 );
1684 Continue(CaESchema::Ok(id))
1685 }
1686
1687 pub fn compare_and_append(
1688 &mut self,
1689 batch: &HollowBatch<T>,
1690 writer_id: &WriterId,
1691 heartbeat_timestamp_ms: u64,
1692 lease_duration_ms: u64,
1693 idempotency_token: &IdempotencyToken,
1694 debug_info: &HandleDebugState,
1695 inline_writes_total_max_bytes: usize,
1696 claim_compaction_percent: usize,
1697 claim_compaction_min_version: Option<&Version>,
1698 ) -> ControlFlow<CompareAndAppendBreak<T>, Vec<FueledMergeReq<T>>> {
1699 if self.is_tombstone() {
1704 assert_eq!(self.trace.upper(), &Antichain::new());
1705 return Break(CompareAndAppendBreak::Upper {
1706 shard_upper: Antichain::new(),
1707 writer_upper: Antichain::new(),
1712 });
1713 }
1714
1715 let writer_state = self
1716 .writers
1717 .entry(writer_id.clone())
1718 .or_insert_with(|| WriterState {
1719 last_heartbeat_timestamp_ms: heartbeat_timestamp_ms,
1720 lease_duration_ms,
1721 most_recent_write_token: IdempotencyToken::SENTINEL,
1722 most_recent_write_upper: Antichain::from_elem(T::minimum()),
1723 debug: debug_info.clone(),
1724 });
1725
1726 if PartialOrder::less_than(batch.desc.upper(), batch.desc.lower()) {
1727 return Break(CompareAndAppendBreak::InvalidUsage(
1728 InvalidUsage::InvalidBounds {
1729 lower: batch.desc.lower().clone(),
1730 upper: batch.desc.upper().clone(),
1731 },
1732 ));
1733 }
1734
1735 if batch.desc.upper() == batch.desc.lower() && !batch.is_empty() {
1738 return Break(CompareAndAppendBreak::InvalidUsage(
1739 InvalidUsage::InvalidEmptyTimeInterval {
1740 lower: batch.desc.lower().clone(),
1741 upper: batch.desc.upper().clone(),
1742 keys: batch
1743 .parts
1744 .iter()
1745 .map(|x| x.printable_name().to_owned())
1746 .collect(),
1747 },
1748 ));
1749 }
1750
1751 if idempotency_token == &writer_state.most_recent_write_token {
1752 assert_eq!(batch.desc.upper(), &writer_state.most_recent_write_upper);
1757 assert!(
1758 PartialOrder::less_equal(batch.desc.upper(), self.trace.upper()),
1759 "{:?} vs {:?}",
1760 batch.desc.upper(),
1761 self.trace.upper()
1762 );
1763 return Break(CompareAndAppendBreak::AlreadyCommitted);
1764 }
1765
1766 let shard_upper = self.trace.upper();
1767 if shard_upper != batch.desc.lower() {
1768 return Break(CompareAndAppendBreak::Upper {
1769 shard_upper: shard_upper.clone(),
1770 writer_upper: writer_state.most_recent_write_upper.clone(),
1771 });
1772 }
1773
1774 let new_inline_bytes = batch.inline_bytes();
1775 if new_inline_bytes > 0 {
1776 let mut existing_inline_bytes = 0;
1777 self.trace
1778 .map_batches(|x| existing_inline_bytes += x.inline_bytes());
1779 if existing_inline_bytes + new_inline_bytes >= inline_writes_total_max_bytes {
1783 return Break(CompareAndAppendBreak::InlineBackpressure);
1784 }
1785 }
1786
1787 let mut merge_reqs = if batch.desc.upper() != batch.desc.lower() {
1788 self.trace.push_batch(batch.clone())
1789 } else {
1790 Vec::new()
1791 };
1792
1793 let all_empty_reqs = merge_reqs
1796 .iter()
1797 .all(|req| req.inputs.iter().all(|b| b.batch.is_empty()));
1798 if all_empty_reqs && !batch.is_empty() {
1799 let mut reqs_to_take = claim_compaction_percent / 100;
1800 if (usize::cast_from(idempotency_token.hashed()) % 100)
1801 < (claim_compaction_percent % 100)
1802 {
1803 reqs_to_take += 1;
1804 }
1805 let threshold_ms = heartbeat_timestamp_ms.saturating_sub(lease_duration_ms);
1806 let min_writer = claim_compaction_min_version.map(WriterKey::for_version);
1807 merge_reqs.extend(
1808 self.trace
1811 .fueled_merge_reqs_before_ms(threshold_ms, min_writer)
1812 .take(reqs_to_take),
1813 )
1814 }
1815
1816 for req in &merge_reqs {
1817 self.trace.claim_compaction(
1818 req.id,
1819 ActiveCompaction {
1820 start_ms: heartbeat_timestamp_ms,
1821 },
1822 )
1823 }
1824
1825 debug_assert_eq!(self.trace.upper(), batch.desc.upper());
1826 writer_state.most_recent_write_token = idempotency_token.clone();
1827 assert!(
1829 PartialOrder::less_equal(&writer_state.most_recent_write_upper, batch.desc.upper()),
1830 "{:?} vs {:?}",
1831 &writer_state.most_recent_write_upper,
1832 batch.desc.upper()
1833 );
1834 writer_state
1835 .most_recent_write_upper
1836 .clone_from(batch.desc.upper());
1837
1838 writer_state.last_heartbeat_timestamp_ms = std::cmp::max(
1840 heartbeat_timestamp_ms,
1841 writer_state.last_heartbeat_timestamp_ms,
1842 );
1843
1844 Continue(merge_reqs)
1845 }
1846
1847 pub fn apply_merge_res<D: Codec64 + Monoid + PartialEq>(
1848 &mut self,
1849 res: &FueledMergeRes<T>,
1850 metrics: &ColumnarMetrics,
1851 ) -> ControlFlow<NoOpStateTransition<ApplyMergeResult>, ApplyMergeResult> {
1852 if self.is_tombstone() {
1857 return Break(NoOpStateTransition(ApplyMergeResult::NotAppliedNoMatch));
1858 }
1859
1860 let apply_merge_result = self.trace.apply_merge_res_checked::<D>(res, metrics);
1861 Continue(apply_merge_result)
1862 }
1863
1864 pub fn spine_exert(
1865 &mut self,
1866 fuel: usize,
1867 ) -> ControlFlow<NoOpStateTransition<Vec<FueledMergeReq<T>>>, Vec<FueledMergeReq<T>>> {
1868 let (merge_reqs, did_work) = self.trace.exert(fuel);
1869 if did_work {
1870 Continue(merge_reqs)
1871 } else {
1872 assert!(merge_reqs.is_empty());
1873 Break(NoOpStateTransition(Vec::new()))
1876 }
1877 }
1878
1879 pub fn downgrade_since(
1880 &mut self,
1881 reader_id: &LeasedReaderId,
1882 seqno: SeqNo,
1883 outstanding_seqno: SeqNo,
1884 new_since: &Antichain<T>,
1885 heartbeat_timestamp_ms: u64,
1886 ) -> ControlFlow<NoOpStateTransition<Since<T>>, Since<T>> {
1887 if self.is_tombstone() {
1892 return Break(NoOpStateTransition(Since(Antichain::new())));
1893 }
1894
1895 let Some(reader_state) = self.leased_reader(reader_id) else {
1898 tracing::warn!(
1899 "Leased reader {reader_id} was expired due to inactivity. Did the machine go to sleep?",
1900 );
1901 return Break(NoOpStateTransition(Since(Antichain::new())));
1902 };
1903
1904 reader_state.last_heartbeat_timestamp_ms = std::cmp::max(
1907 heartbeat_timestamp_ms,
1908 reader_state.last_heartbeat_timestamp_ms,
1909 );
1910
1911 let seqno = {
1912 assert!(
1913 outstanding_seqno >= reader_state.seqno,
1914 "SeqNos cannot go backward; however, oldest leased SeqNo ({:?}) \
1915 is behind current reader_state ({:?})",
1916 outstanding_seqno,
1917 reader_state.seqno,
1918 );
1919 std::cmp::min(outstanding_seqno, seqno)
1920 };
1921
1922 reader_state.seqno = seqno;
1923
1924 let reader_current_since = if PartialOrder::less_than(&reader_state.since, new_since) {
1925 reader_state.since.clone_from(new_since);
1926 self.update_since();
1927 new_since.clone()
1928 } else {
1929 reader_state.since.clone()
1932 };
1933
1934 Continue(Since(reader_current_since))
1935 }
1936
1937 pub fn compare_and_downgrade_since(
1938 &mut self,
1939 reader_id: &CriticalReaderId,
1940 expected_opaque: &Opaque,
1941 (new_opaque, new_since): (&Opaque, &Antichain<T>),
1942 ) -> ControlFlow<
1943 NoOpStateTransition<Result<Since<T>, (Opaque, Since<T>)>>,
1944 Result<Since<T>, (Opaque, Since<T>)>,
1945 > {
1946 if self.is_tombstone() {
1951 return Break(NoOpStateTransition(Ok(Since(Antichain::new()))));
1955 }
1956
1957 let reader_state = self.critical_reader(reader_id);
1958
1959 if reader_state.opaque != *expected_opaque {
1960 return Continue(Err((
1963 reader_state.opaque.clone(),
1964 Since(reader_state.since.clone()),
1965 )));
1966 }
1967
1968 reader_state.opaque = new_opaque.clone();
1969 if PartialOrder::less_equal(&reader_state.since, new_since) {
1970 reader_state.since.clone_from(new_since);
1971 self.update_since();
1972 Continue(Ok(Since(new_since.clone())))
1973 } else {
1974 Continue(Ok(Since(reader_state.since.clone())))
1978 }
1979 }
1980
1981 pub fn expire_leased_reader(
1982 &mut self,
1983 reader_id: &LeasedReaderId,
1984 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
1985 if self.is_tombstone() {
1990 return Break(NoOpStateTransition(false));
1991 }
1992
1993 let existed = self.leased_readers.remove(reader_id).is_some();
1994 if existed {
1995 }
2009 Continue(existed)
2012 }
2013
2014 pub fn expire_critical_reader(
2015 &mut self,
2016 reader_id: &CriticalReaderId,
2017 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2018 if self.is_tombstone() {
2023 return Break(NoOpStateTransition(false));
2024 }
2025
2026 let existed = self.critical_readers.remove(reader_id).is_some();
2027 if existed {
2028 }
2042 Continue(existed)
2046 }
2047
2048 pub fn expire_writer(
2049 &mut self,
2050 writer_id: &WriterId,
2051 ) -> ControlFlow<NoOpStateTransition<bool>, bool> {
2052 if self.is_tombstone() {
2057 return Break(NoOpStateTransition(false));
2058 }
2059
2060 let existed = self.writers.remove(writer_id).is_some();
2061 Continue(existed)
2065 }
2066
2067 fn leased_reader(&mut self, id: &LeasedReaderId) -> Option<&mut LeasedReaderState<T>> {
2068 self.leased_readers.get_mut(id)
2069 }
2070
2071 fn critical_reader(&mut self, id: &CriticalReaderId) -> &mut CriticalReaderState<T> {
2072 self.critical_readers
2073 .get_mut(id)
2074 .unwrap_or_else(|| {
2075 panic!(
2076 "Unknown CriticalReaderId({}). It was either never registered, or has been manually expired.",
2077 id
2078 )
2079 })
2080 }
2081
2082 fn critical_since(&self) -> Option<Antichain<T>> {
2083 let mut critical_sinces = self.critical_readers.values().map(|r| &r.since);
2084 let mut since = critical_sinces.next().cloned()?;
2085 for s in critical_sinces {
2086 since.meet_assign(s);
2087 }
2088 Some(since)
2089 }
2090
2091 fn update_since(&mut self) {
2092 let mut sinces_iter = self
2093 .leased_readers
2094 .values()
2095 .map(|x| &x.since)
2096 .chain(self.critical_readers.values().map(|x| &x.since));
2097 let mut since = match sinces_iter.next() {
2098 Some(since) => since.clone(),
2099 None => {
2100 return;
2103 }
2104 };
2105 while let Some(s) = sinces_iter.next() {
2106 since.meet_assign(s);
2107 }
2108 self.trace.downgrade_since(&since);
2109 }
2110
2111 fn seqno_since(&self, seqno: SeqNo) -> SeqNo {
2112 let mut seqno_since = seqno;
2113 for cap in self.leased_readers.values() {
2114 seqno_since = std::cmp::min(seqno_since, cap.seqno);
2115 }
2116 seqno_since
2118 }
2119
2120 fn tombstone_batch() -> HollowBatch<T> {
2121 HollowBatch::empty(Description::new(
2122 Antichain::from_elem(T::minimum()),
2123 Antichain::new(),
2124 Antichain::new(),
2125 ))
2126 }
2127
2128 pub(crate) fn is_tombstone(&self) -> bool {
2129 self.trace.upper().is_empty()
2130 && self.trace.since().is_empty()
2131 && self.writers.is_empty()
2132 && self.leased_readers.is_empty()
2133 && self.critical_readers.is_empty()
2134 }
2135
2136 pub(crate) fn is_single_empty_batch(&self) -> bool {
2137 let mut batch_count = 0;
2138 let mut is_empty = true;
2139 self.trace.map_batches(|b| {
2140 batch_count += 1;
2141 is_empty &= b.is_empty()
2142 });
2143 batch_count <= 1 && is_empty
2144 }
2145
2146 pub fn become_tombstone_and_shrink(&mut self) -> ControlFlow<NoOpStateTransition<()>, ()> {
2147 assert_eq!(self.trace.upper(), &Antichain::new());
2148 assert_eq!(self.trace.since(), &Antichain::new());
2149
2150 let was_tombstone = self.is_tombstone();
2153
2154 self.writers.clear();
2156 self.leased_readers.clear();
2157 self.critical_readers.clear();
2158
2159 debug_assert!(self.is_tombstone());
2160
2161 let mut to_replace = None;
2170 let mut batch_count = 0;
2171 self.trace.map_batches(|b| {
2172 batch_count += 1;
2173 if !b.is_empty() && to_replace.is_none() {
2174 to_replace = Some(b.desc.clone());
2175 }
2176 });
2177 if let Some(desc) = to_replace {
2178 let result = self.trace.apply_tombstone_merge(&desc);
2182 assert!(
2183 result.matched(),
2184 "merge with a matching desc should always match"
2185 );
2186 Continue(())
2187 } else if batch_count > 1 {
2188 let mut new_trace = Trace::default();
2193 new_trace.downgrade_since(&Antichain::new());
2194 let merge_reqs = new_trace.push_batch(Self::tombstone_batch());
2195 assert_eq!(merge_reqs, Vec::new());
2196 self.trace = new_trace;
2197 Continue(())
2198 } else if !was_tombstone {
2199 Continue(())
2202 } else {
2203 Break(NoOpStateTransition(()))
2206 }
2207 }
2208}
2209
/// One version of a shard's state.
///
/// Holds the shard identity, the version's `SeqNo` and provenance, and the
/// shard's collections.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(Clone, PartialEq))]
pub struct State<T> {
    /// The shard this state belongs to.
    pub(crate) shard_id: ShardId,

    /// Sequence number of this version; `clone_apply` produces `seqno.next()`.
    pub(crate) seqno: SeqNo,
    /// Milliseconds timestamp of this version, taken from `cfg.now` and kept
    /// strictly increasing across versions by `clone_apply`.
    pub(crate) walltime_ms: u64,
    /// Hostname of the process that produced this version.
    pub(crate) hostname: String,
    /// The shard's collections (rollups, readers, writers, schemas, trace).
    pub(crate) collections: StateCollections<T>,
}
2225
/// A `State` tagged with its shard's key (`K`), value (`V`) and diff (`D`)
/// types.
///
/// The tags exist only at the type level; `Deref`/`DerefMut` expose the inner
/// `State`.
pub struct TypedState<K, V, T, D> {
    pub(crate) state: State<T>,

    /// Type-level marker for `K`, `V`, `D`.
    ///
    /// The `fn() -> (K, V, D)` form means no values of these types are owned,
    /// so auto traits like `Send`/`Sync` do not depend on them.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, D)>,
}
2240
2241impl<K, V, T: Clone, D> TypedState<K, V, T, D> {
2242 #[cfg(any(test, debug_assertions))]
2243 pub(crate) fn clone(&self, hostname: String) -> Self {
2244 TypedState {
2245 state: State {
2246 shard_id: self.shard_id.clone(),
2247 seqno: self.seqno.clone(),
2248 walltime_ms: self.walltime_ms,
2249 hostname,
2250 collections: self.collections.clone(),
2251 },
2252 _phantom: PhantomData,
2253 }
2254 }
2255
2256 pub(crate) fn clone_for_rollup(&self) -> Self {
2257 TypedState {
2258 state: State {
2259 shard_id: self.shard_id.clone(),
2260 seqno: self.seqno.clone(),
2261 walltime_ms: self.walltime_ms,
2262 hostname: self.hostname.clone(),
2263 collections: self.collections.clone(),
2264 },
2265 _phantom: PhantomData,
2266 }
2267 }
2268}
2269
2270impl<K, V, T: Debug, D> Debug for TypedState<K, V, T, D> {
2271 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2272 let TypedState { state, _phantom } = self;
2275 f.debug_struct("TypedState").field("state", state).finish()
2276 }
2277}
2278
2279#[cfg(any(test, debug_assertions))]
2281impl<K, V, T: PartialEq, D> PartialEq for TypedState<K, V, T, D> {
2282 fn eq(&self, other: &Self) -> bool {
2283 let TypedState {
2286 state: self_state,
2287 _phantom,
2288 } = self;
2289 let TypedState {
2290 state: other_state,
2291 _phantom,
2292 } = other;
2293 self_state == other_state
2294 }
2295}
2296
/// Lets a `TypedState` be used wherever a `&State<T>` is expected.
impl<K, V, T, D> Deref for TypedState<K, V, T, D> {
    type Target = State<T>;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}
2304
/// Mutable counterpart of the `Deref` impl: exposes `&mut State<T>`.
impl<K, V, T, D> DerefMut for TypedState<K, V, T, D> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.state
    }
}
2310
2311impl<K, V, T, D> TypedState<K, V, T, D>
2312where
2313 K: Codec,
2314 V: Codec,
2315 T: Timestamp + Lattice + Codec64,
2316 D: Codec64,
2317{
2318 pub fn new(
2319 applier_version: Version,
2320 shard_id: ShardId,
2321 hostname: String,
2322 walltime_ms: u64,
2323 ) -> Self {
2324 let state = State {
2325 shard_id,
2326 seqno: SeqNo::minimum(),
2327 walltime_ms,
2328 hostname,
2329 collections: StateCollections {
2330 version: applier_version,
2331 last_gc_req: SeqNo::minimum(),
2332 rollups: BTreeMap::new(),
2333 active_rollup: None,
2334 active_gc: None,
2335 leased_readers: BTreeMap::new(),
2336 critical_readers: BTreeMap::new(),
2337 writers: BTreeMap::new(),
2338 schemas: BTreeMap::new(),
2339 trace: Trace::default(),
2340 },
2341 };
2342 TypedState {
2343 state,
2344 _phantom: PhantomData,
2345 }
2346 }
2347
2348 pub fn clone_apply<R, E, WorkFn>(
2349 &self,
2350 cfg: &PersistConfig,
2351 work_fn: &mut WorkFn,
2352 ) -> ControlFlow<E, (R, Self)>
2353 where
2354 WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
2355 {
2356 let mut new_state = State {
2358 shard_id: self.shard_id,
2359 seqno: self.seqno.next(),
2360 walltime_ms: (cfg.now)(),
2361 hostname: cfg.hostname.clone(),
2362 collections: self.collections.clone(),
2363 };
2364
2365 if new_state.walltime_ms <= self.walltime_ms {
2368 new_state.walltime_ms = self.walltime_ms + 1;
2369 }
2370
2371 let work_ret = work_fn(new_state.seqno, cfg, &mut new_state.collections)?;
2372 let new_state = TypedState {
2373 state: new_state,
2374 _phantom: PhantomData,
2375 };
2376 Continue((work_ret, new_state))
2377 }
2378}
2379
/// Tuning knobs for `State::maybe_gc`.
#[derive(Copy, Clone, Debug)]
pub struct GcConfig {
    /// Whether the `active_gc` marker and `min_versions` drive the GC
    /// decision, instead of the power-of-two heuristic.
    pub use_active_gc: bool,
    /// Milliseconds after which an in-progress GC is considered stale, so
    /// another request may be issued.
    pub fallback_threshold_ms: u64,
    /// With `use_active_gc`: how far the seqno-since must advance past the
    /// last GC request before GC is requested.
    pub min_versions: usize,
    /// Upper bound on how many versions past the last request one GC request
    /// may cover.
    pub max_versions: usize,
}
2387
2388impl<T> State<T>
2389where
2390 T: Timestamp + Lattice + Codec64,
2391{
2392 pub fn shard_id(&self) -> ShardId {
2393 self.shard_id
2394 }
2395
2396 pub fn seqno(&self) -> SeqNo {
2397 self.seqno
2398 }
2399
2400 pub fn since(&self) -> &Antichain<T> {
2401 self.collections.trace.since()
2402 }
2403
2404 pub fn upper(&self) -> &Antichain<T> {
2405 self.collections.trace.upper()
2406 }
2407
2408 pub fn spine_batch_count(&self) -> usize {
2409 self.collections.trace.num_spine_batches()
2410 }
2411
2412 pub fn size_metrics(&self) -> StateSizeMetrics {
2413 let mut ret = StateSizeMetrics::default();
2414 self.blobs().for_each(|x| match x {
2415 HollowBlobRef::Batch(x) => {
2416 ret.hollow_batch_count += 1;
2417 ret.batch_part_count += x.part_count();
2418 ret.num_updates += x.len;
2419
2420 let batch_size = x.encoded_size_bytes();
2421 for x in x.parts.iter() {
2422 if x.ts_rewrite().is_some() {
2423 ret.rewrite_part_count += 1;
2424 }
2425 if x.is_inline() {
2426 ret.inline_part_count += 1;
2427 ret.inline_part_bytes += x.inline_bytes();
2428 }
2429 }
2430 ret.largest_batch_bytes = std::cmp::max(ret.largest_batch_bytes, batch_size);
2431 ret.state_batches_bytes += batch_size;
2432 }
2433 HollowBlobRef::Rollup(x) => {
2434 ret.state_rollup_count += 1;
2435 ret.state_rollups_bytes += x.encoded_size_bytes.unwrap_or_default()
2436 }
2437 });
2438 ret
2439 }
2440
2441 pub fn latest_rollup(&self) -> (&SeqNo, &HollowRollup) {
2442 self.collections
2445 .rollups
2446 .iter()
2447 .rev()
2448 .next()
2449 .expect("State should have at least one rollup if seqno > minimum")
2450 }
2451
2452 pub(crate) fn seqno_since(&self) -> SeqNo {
2453 self.collections.seqno_since(self.seqno)
2454 }
2455
2456 pub fn maybe_gc(&mut self, is_write: bool, now: u64, cfg: GcConfig) -> Option<GcReq> {
2468 let GcConfig {
2469 use_active_gc,
2470 fallback_threshold_ms,
2471 min_versions,
2472 max_versions,
2473 } = cfg;
2474 let gc_threshold = if use_active_gc {
2478 u64::cast_from(min_versions)
2479 } else {
2480 std::cmp::max(
2481 1,
2482 u64::cast_from(self.seqno.0.next_power_of_two().trailing_zeros()),
2483 )
2484 };
2485 let new_seqno_since = self.seqno_since();
2486 let gc_until_seqno = new_seqno_since.min(SeqNo(
2489 self.collections
2490 .last_gc_req
2491 .0
2492 .saturating_add(u64::cast_from(max_versions)),
2493 ));
2494 let should_gc = new_seqno_since
2495 .0
2496 .saturating_sub(self.collections.last_gc_req.0)
2497 >= gc_threshold;
2498
2499 let should_gc = if use_active_gc && !should_gc {
2502 match self.collections.active_gc {
2503 Some(active_gc) => now.saturating_sub(active_gc.start_ms) > fallback_threshold_ms,
2504 None => false,
2505 }
2506 } else {
2507 should_gc
2508 };
2509 let should_gc = should_gc && (is_write || self.collections.writers.is_empty());
2512 let tombstone_needs_gc = self.collections.is_tombstone();
2517 let should_gc = should_gc || tombstone_needs_gc;
2518 let should_gc = if use_active_gc {
2519 should_gc
2523 && match self.collections.active_gc {
2524 Some(active) => now.saturating_sub(active.start_ms) > fallback_threshold_ms,
2525 None => true,
2526 }
2527 } else {
2528 should_gc
2529 };
2530 if should_gc {
2531 self.collections.last_gc_req = gc_until_seqno;
2532 Some(GcReq {
2533 shard_id: self.shard_id,
2534 new_seqno_since: gc_until_seqno,
2535 })
2536 } else {
2537 None
2538 }
2539 }
2540
2541 pub fn seqnos_held(&self) -> usize {
2543 usize::cast_from(self.seqno.0.saturating_sub(self.seqno_since().0))
2544 }
2545
2546 pub fn expire_at(&mut self, walltime_ms: EpochMillis) -> ExpiryMetrics {
2548 let mut metrics = ExpiryMetrics::default();
2549 let shard_id = self.shard_id();
2550 self.collections.leased_readers.retain(|id, state| {
2551 let retain = state.last_heartbeat_timestamp_ms + state.lease_duration_ms >= walltime_ms;
2552 if !retain {
2553 info!(
2554 "Force expiring reader {id} ({}) of shard {shard_id} due to inactivity",
2555 state.debug.purpose
2556 );
2557 metrics.readers_expired += 1;
2558 }
2559 retain
2560 });
2561 self.collections.writers.retain(|id, state| {
2563 let retain =
2564 (state.last_heartbeat_timestamp_ms + state.lease_duration_ms) >= walltime_ms;
2565 if !retain {
2566 info!(
2567 "Force expiring writer {id} ({}) of shard {shard_id} due to inactivity",
2568 state.debug.purpose
2569 );
2570 metrics.writers_expired += 1;
2571 }
2572 retain
2573 });
2574 metrics
2575 }
2576
2577 pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
2581 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2582 return Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
2583 self.collections.trace.since().clone(),
2584 )));
2585 }
2586 let upper = self.collections.trace.upper();
2587 if PartialOrder::less_equal(upper, as_of) {
2588 return Err(SnapshotErr::AsOfNotYetAvailable(
2589 self.seqno,
2590 Upper(upper.clone()),
2591 ));
2592 }
2593
2594 let batches = self
2595 .collections
2596 .trace
2597 .batches()
2598 .filter(|b| !PartialOrder::less_than(as_of, b.desc.lower()))
2599 .cloned()
2600 .collect();
2601 Ok(batches)
2602 }
2603
2604 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
2606 if PartialOrder::less_than(as_of, self.collections.trace.since()) {
2607 return Err(Since(self.collections.trace.since().clone()));
2608 }
2609 Ok(())
2610 }
2611
2612 pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
2613 self.collections
2616 .trace
2617 .batches()
2618 .find(|b| {
2619 PartialOrder::less_equal(b.desc.lower(), frontier)
2620 && PartialOrder::less_than(frontier, b.desc.upper())
2621 })
2622 .cloned()
2623 .ok_or(self.seqno)
2624 }
2625
2626 pub fn active_rollup(&self) -> Option<ActiveRollup> {
2627 self.collections.active_rollup
2628 }
2629
2630 pub fn need_rollup(
2631 &self,
2632 threshold: usize,
2633 use_active_rollup: bool,
2634 fallback_threshold_ms: u64,
2635 now: u64,
2636 ) -> Option<SeqNo> {
2637 let (latest_rollup_seqno, _) = self.latest_rollup();
2638
2639 if self.collections.is_tombstone() && latest_rollup_seqno.next() < self.seqno {
2645 return Some(self.seqno);
2646 }
2647
2648 let seqnos_since_last_rollup = self.seqno.0.saturating_sub(latest_rollup_seqno.0);
2649
2650 if use_active_rollup {
2651 if seqnos_since_last_rollup > u64::cast_from(threshold) {
2657 match self.active_rollup() {
2658 Some(active_rollup) => {
2659 if now.saturating_sub(active_rollup.start_ms) > fallback_threshold_ms {
2660 return Some(self.seqno);
2661 }
2662 }
2663 None => {
2664 return Some(self.seqno);
2665 }
2666 }
2667 }
2668 } else {
2669 if seqnos_since_last_rollup > 0
2673 && seqnos_since_last_rollup % u64::cast_from(threshold) == 0
2674 {
2675 return Some(self.seqno);
2676 }
2677
2678 if seqnos_since_last_rollup
2681 > u64::cast_from(
2682 threshold * PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER,
2683 )
2684 {
2685 return Some(self.seqno);
2686 }
2687 }
2688
2689 None
2690 }
2691
2692 pub(crate) fn blobs(&self) -> impl Iterator<Item = HollowBlobRef<'_, T>> {
2693 let batches = self.collections.trace.batches().map(HollowBlobRef::Batch);
2694 let rollups = self.collections.rollups.values().map(HollowBlobRef::Rollup);
2695 batches.chain(rollups)
2696 }
2697}
2698
2699fn serialize_part_bytes<S: Serializer>(val: &[u8], s: S) -> Result<S::Ok, S::Error> {
2700 let val = hex::encode(val);
2701 val.serialize(s)
2702}
2703
2704fn serialize_lazy_proto<S: Serializer, T: prost::Message + Default>(
2705 val: &Option<LazyProto<T>>,
2706 s: S,
2707) -> Result<S::Ok, S::Error> {
2708 val.as_ref()
2709 .map(|lazy| hex::encode(&lazy.into_proto()))
2710 .serialize(s)
2711}
2712
2713fn serialize_part_stats<S: Serializer>(
2714 val: &Option<LazyPartStats>,
2715 s: S,
2716) -> Result<S::Ok, S::Error> {
2717 let val = val.as_ref().map(|x| x.decode().key);
2718 val.serialize(s)
2719}
2720
2721fn serialize_diffs_sum<S: Serializer>(val: &Option<[u8; 8]>, s: S) -> Result<S::Ok, S::Error> {
2722 let val = val.map(i64::decode);
2724 val.serialize(s)
2725}
2726
2727impl<T: Serialize + Timestamp + Lattice> Serialize for State<T> {
2733 fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
2734 let State {
2735 shard_id,
2736 seqno,
2737 walltime_ms,
2738 hostname,
2739 collections:
2740 StateCollections {
2741 version: applier_version,
2742 last_gc_req,
2743 rollups,
2744 active_rollup,
2745 active_gc,
2746 leased_readers,
2747 critical_readers,
2748 writers,
2749 schemas,
2750 trace,
2751 },
2752 } = self;
2753 let mut s = s.serialize_struct("State", 13)?;
2754 let () = s.serialize_field("applier_version", &applier_version.to_string())?;
2755 let () = s.serialize_field("shard_id", shard_id)?;
2756 let () = s.serialize_field("seqno", seqno)?;
2757 let () = s.serialize_field("walltime_ms", walltime_ms)?;
2758 let () = s.serialize_field("hostname", hostname)?;
2759 let () = s.serialize_field("last_gc_req", last_gc_req)?;
2760 let () = s.serialize_field("rollups", rollups)?;
2761 let () = s.serialize_field("active_rollup", active_rollup)?;
2762 let () = s.serialize_field("active_gc", active_gc)?;
2763 let () = s.serialize_field("leased_readers", leased_readers)?;
2764 let () = s.serialize_field("critical_readers", critical_readers)?;
2765 let () = s.serialize_field("writers", writers)?;
2766 let () = s.serialize_field("schemas", schemas)?;
2767 let () = s.serialize_field("since", &trace.since().elements())?;
2768 let () = s.serialize_field("upper", &trace.upper().elements())?;
2769 let trace = trace.flatten();
2770 let () = s.serialize_field("batches", &trace.legacy_batches.keys().collect::<Vec<_>>())?;
2771 let () = s.serialize_field("hollow_batches", &trace.hollow_batches)?;
2772 let () = s.serialize_field("spine_batches", &trace.spine_batches)?;
2773 let () = s.serialize_field("merges", &trace.merges)?;
2774 s.end()
2775 }
2776}
2777
/// Size statistics presumably computed over a shard's [`State`].
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names only. Confirm them against the code that populates this struct.
#[derive(Debug, Default)]
pub struct StateSizeMetrics {
    /// Number of hollow batches.
    pub hollow_batch_count: usize,
    /// Number of batch parts.
    pub batch_part_count: usize,
    /// Number of parts carrying a timestamp rewrite (presumably `ts_rewrite`).
    pub rewrite_part_count: usize,
    /// Number of updates.
    pub num_updates: usize,
    /// Size in bytes of the largest batch.
    pub largest_batch_bytes: usize,
    /// Total size in bytes of batches.
    pub state_batches_bytes: usize,
    /// Total size in bytes of rollups.
    pub state_rollups_bytes: usize,
    /// Number of rollups.
    pub state_rollup_count: usize,
    /// Number of inline parts.
    pub inline_part_count: usize,
    /// Total size in bytes of inline parts.
    pub inline_part_bytes: usize,
}
2791
/// Counts of handles expired. Presumably this covers a single expiry pass;
/// confirm against the caller.
#[derive(Default)]
pub struct ExpiryMetrics {
    /// Number of readers expired.
    pub(crate) readers_expired: usize,
    /// Number of writers expired.
    pub(crate) writers_expired: usize,
}
2797
/// Newtype around an [`Antichain`] holding a `since` frontier. For example, it
/// is returned (wrapped in `Continue`) by `downgrade_since`, and carried by
/// `SnapshotErr::AsOfHistoricalDistinctionsLost`.
#[derive(Debug, Clone, PartialEq)]
pub struct Since<T>(pub Antichain<T>);
2801
/// Newtype around an [`Antichain`] holding an `upper` frontier. For example, it
/// is carried by `SnapshotErr::AsOfNotYetAvailable`.
#[derive(Debug, PartialEq)]
pub struct Upper<T>(pub Antichain<T>);
2805
2806#[cfg(test)]
2807pub(crate) mod tests {
2808 use std::ops::Range;
2809 use std::str::FromStr;
2810
2811 use bytes::Bytes;
2812 use mz_build_info::DUMMY_BUILD_INFO;
2813 use mz_dyncfg::ConfigUpdates;
2814 use mz_ore::now::SYSTEM_TIME;
2815 use mz_ore::{assert_none, assert_ok};
2816 use mz_proto::RustType;
2817 use proptest::prelude::*;
2818 use proptest::strategy::ValueTree;
2819
2820 use crate::InvalidUsage::{InvalidBounds, InvalidEmptyTimeInterval};
2821 use crate::cache::PersistClientCache;
2822 use crate::internal::encoding::any_some_lazy_part_stats;
2823 use crate::internal::paths::RollupId;
2824 use crate::internal::trace::tests::any_trace;
2825 use crate::tests::new_test_client_cache;
2826 use crate::{Diagnostics, PersistLocation};
2827
2828 use super::*;
2829
2830 const LEASE_DURATION_MS: u64 = 900 * 1000;
2831 fn debug_state() -> HandleDebugState {
2832 HandleDebugState {
2833 hostname: "debug".to_owned(),
2834 purpose: "finding the bugs".to_owned(),
2835 }
2836 }
2837
2838 pub fn any_hollow_batch_with_exact_runs<T: Arbitrary + Timestamp>(
2839 num_runs: usize,
2840 ) -> impl Strategy<Value = HollowBatch<T>> {
2841 (
2842 any::<T>(),
2843 any::<T>(),
2844 any::<T>(),
2845 proptest::collection::vec(any_run_part::<T>(), num_runs + 1..20),
2846 any::<usize>(),
2847 )
2848 .prop_map(move |(t0, t1, since, parts, len)| {
2849 let (lower, upper) = if t0 <= t1 {
2850 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2851 } else {
2852 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2853 };
2854 let since = Antichain::from_elem(since);
2855
2856 let run_splits = (1..num_runs)
2857 .map(|i| i * parts.len() / num_runs)
2858 .collect::<Vec<_>>();
2859
2860 let run_meta = (0..num_runs)
2861 .map(|_| {
2862 let mut meta = RunMeta::default();
2863 meta.id = Some(RunId::new());
2864 meta
2865 })
2866 .collect::<Vec<_>>();
2867
2868 HollowBatch::new(
2869 Description::new(lower, upper, since),
2870 parts,
2871 len % 10,
2872 run_meta,
2873 run_splits,
2874 )
2875 })
2876 }
2877
2878 pub fn any_hollow_batch<T: Arbitrary + Timestamp>() -> impl Strategy<Value = HollowBatch<T>> {
2879 Strategy::prop_map(
2880 (
2881 any::<T>(),
2882 any::<T>(),
2883 any::<T>(),
2884 proptest::collection::vec(any_run_part::<T>(), 0..20),
2885 any::<usize>(),
2886 0..=10usize,
2887 proptest::collection::vec(any::<RunId>(), 10),
2888 ),
2889 |(t0, t1, since, parts, len, num_runs, run_ids)| {
2890 let (lower, upper) = if t0 <= t1 {
2891 (Antichain::from_elem(t0), Antichain::from_elem(t1))
2892 } else {
2893 (Antichain::from_elem(t1), Antichain::from_elem(t0))
2894 };
2895 let since = Antichain::from_elem(since);
2896 if num_runs > 0 && parts.len() > 2 && num_runs < parts.len() {
2897 let run_splits = (1..num_runs)
2898 .map(|i| i * parts.len() / num_runs)
2899 .collect::<Vec<_>>();
2900
2901 let run_meta = (0..num_runs)
2902 .enumerate()
2903 .map(|(i, _)| {
2904 let mut meta = RunMeta::default();
2905 meta.id = Some(run_ids[i]);
2906 meta
2907 })
2908 .collect::<Vec<_>>();
2909
2910 HollowBatch::new(
2911 Description::new(lower, upper, since),
2912 parts,
2913 len % 10,
2914 run_meta,
2915 run_splits,
2916 )
2917 } else {
2918 HollowBatch::new_run_for_test(
2919 Description::new(lower, upper, since),
2920 parts,
2921 len % 10,
2922 run_ids[0],
2923 )
2924 }
2925 },
2926 )
2927 }
2928
2929 pub fn any_batch_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = BatchPart<T>> {
2930 Strategy::prop_map(
2931 (
2932 any::<bool>(),
2933 any_hollow_batch_part(),
2934 any::<Option<T>>(),
2935 any::<Option<SchemaId>>(),
2936 any::<Option<SchemaId>>(),
2937 ),
2938 |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
2939 if is_hollow {
2940 BatchPart::Hollow(hollow)
2941 } else {
2942 let updates = LazyInlineBatchPart::from_proto(Bytes::new()).unwrap();
2943 let ts_rewrite = ts_rewrite.map(Antichain::from_elem);
2944 BatchPart::Inline {
2945 updates,
2946 ts_rewrite,
2947 schema_id,
2948 deprecated_schema_id,
2949 }
2950 }
2951 },
2952 )
2953 }
2954
2955 pub fn any_run_part<T: Arbitrary + Timestamp>() -> impl Strategy<Value = RunPart<T>> {
2956 Strategy::prop_map(any_batch_part(), |part| RunPart::Single(part))
2957 }
2958
2959 pub fn any_hollow_batch_part<T: Arbitrary + Timestamp>()
2960 -> impl Strategy<Value = HollowBatchPart<T>> {
2961 Strategy::prop_map(
2962 (
2963 any::<PartialBatchKey>(),
2964 any::<usize>(),
2965 any::<Vec<u8>>(),
2966 any_some_lazy_part_stats(),
2967 any::<Option<T>>(),
2968 any::<[u8; 8]>(),
2969 any::<Option<BatchColumnarFormat>>(),
2970 any::<Option<SchemaId>>(),
2971 any::<Option<SchemaId>>(),
2972 ),
2973 |(
2974 key,
2975 encoded_size_bytes,
2976 key_lower,
2977 stats,
2978 ts_rewrite,
2979 diffs_sum,
2980 format,
2981 schema_id,
2982 deprecated_schema_id,
2983 )| {
2984 HollowBatchPart {
2985 key,
2986 meta: Default::default(),
2987 encoded_size_bytes,
2988 key_lower,
2989 structured_key_lower: None,
2990 stats,
2991 ts_rewrite: ts_rewrite.map(Antichain::from_elem),
2992 diffs_sum: Some(diffs_sum),
2993 format,
2994 schema_id,
2995 deprecated_schema_id,
2996 }
2997 },
2998 )
2999 }
3000
3001 pub fn any_leased_reader_state<T: Arbitrary>() -> impl Strategy<Value = LeasedReaderState<T>> {
3002 Strategy::prop_map(
3003 (
3004 any::<SeqNo>(),
3005 any::<Option<T>>(),
3006 any::<u64>(),
3007 any::<u64>(),
3008 any::<HandleDebugState>(),
3009 ),
3010 |(seqno, since, last_heartbeat_timestamp_ms, mut lease_duration_ms, debug)| {
3011 if lease_duration_ms == 0 {
3015 lease_duration_ms += 1;
3016 }
3017 LeasedReaderState {
3018 seqno,
3019 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3020 last_heartbeat_timestamp_ms,
3021 lease_duration_ms,
3022 debug,
3023 }
3024 },
3025 )
3026 }
3027
3028 pub fn any_critical_reader_state<T>() -> impl Strategy<Value = CriticalReaderState<T>>
3029 where
3030 T: Arbitrary,
3031 {
3032 Strategy::prop_map(
3033 (
3034 any::<Option<T>>(),
3035 any::<Opaque>(),
3036 any::<HandleDebugState>(),
3037 ),
3038 |(since, opaque, debug)| CriticalReaderState {
3039 since: since.map_or_else(Antichain::new, Antichain::from_elem),
3040 opaque,
3041 debug,
3042 },
3043 )
3044 }
3045
3046 pub fn any_writer_state<T: Arbitrary>() -> impl Strategy<Value = WriterState<T>> {
3047 Strategy::prop_map(
3048 (
3049 any::<u64>(),
3050 any::<u64>(),
3051 any::<IdempotencyToken>(),
3052 any::<Option<T>>(),
3053 any::<HandleDebugState>(),
3054 ),
3055 |(
3056 last_heartbeat_timestamp_ms,
3057 lease_duration_ms,
3058 most_recent_write_token,
3059 most_recent_write_upper,
3060 debug,
3061 )| WriterState {
3062 last_heartbeat_timestamp_ms,
3063 lease_duration_ms,
3064 most_recent_write_token,
3065 most_recent_write_upper: most_recent_write_upper
3066 .map_or_else(Antichain::new, Antichain::from_elem),
3067 debug,
3068 },
3069 )
3070 }
3071
3072 pub fn any_encoded_schemas() -> impl Strategy<Value = EncodedSchemas> {
3073 Strategy::prop_map(
3074 (
3075 any::<Vec<u8>>(),
3076 any::<Vec<u8>>(),
3077 any::<Vec<u8>>(),
3078 any::<Vec<u8>>(),
3079 ),
3080 |(key, key_data_type, val, val_data_type)| EncodedSchemas {
3081 key: Bytes::from(key),
3082 key_data_type: Bytes::from(key_data_type),
3083 val: Bytes::from(val),
3084 val_data_type: Bytes::from(val_data_type),
3085 },
3086 )
3087 }
3088
3089 pub fn any_state<T: Arbitrary + Timestamp + Lattice>(
3090 num_trace_batches: Range<usize>,
3091 ) -> impl Strategy<Value = State<T>> {
3092 let part1 = (
3093 any::<ShardId>(),
3094 any::<SeqNo>(),
3095 any::<u64>(),
3096 any::<String>(),
3097 any::<SeqNo>(),
3098 proptest::collection::btree_map(any::<SeqNo>(), any::<HollowRollup>(), 1..3),
3099 proptest::option::of(any::<ActiveRollup>()),
3100 );
3101
3102 let part2 = (
3103 proptest::option::of(any::<ActiveGc>()),
3104 proptest::collection::btree_map(
3105 any::<LeasedReaderId>(),
3106 any_leased_reader_state::<T>(),
3107 1..3,
3108 ),
3109 proptest::collection::btree_map(
3110 any::<CriticalReaderId>(),
3111 any_critical_reader_state::<T>(),
3112 1..3,
3113 ),
3114 proptest::collection::btree_map(any::<WriterId>(), any_writer_state::<T>(), 0..3),
3115 proptest::collection::btree_map(any::<SchemaId>(), any_encoded_schemas(), 0..3),
3116 any_trace::<T>(num_trace_batches),
3117 );
3118
3119 (part1, part2).prop_map(
3120 |(
3121 (shard_id, seqno, walltime_ms, hostname, last_gc_req, rollups, active_rollup),
3122 (active_gc, leased_readers, critical_readers, writers, schemas, trace),
3123 )| State {
3124 shard_id,
3125 seqno,
3126 walltime_ms,
3127 hostname,
3128 collections: StateCollections {
3129 version: Version::new(1, 2, 3),
3130 last_gc_req,
3131 rollups,
3132 active_rollup,
3133 active_gc,
3134 leased_readers,
3135 critical_readers,
3136 writers,
3137 schemas,
3138 trace,
3139 },
3140 },
3141 )
3142 }
3143
3144 pub(crate) fn hollow<T: Timestamp>(
3145 lower: T,
3146 upper: T,
3147 keys: &[&str],
3148 len: usize,
3149 ) -> HollowBatch<T> {
3150 HollowBatch::new_run(
3151 Description::new(
3152 Antichain::from_elem(lower),
3153 Antichain::from_elem(upper),
3154 Antichain::from_elem(T::minimum()),
3155 ),
3156 keys.iter()
3157 .map(|x| {
3158 RunPart::Single(BatchPart::Hollow(HollowBatchPart {
3159 key: PartialBatchKey((*x).to_owned()),
3160 meta: Default::default(),
3161 encoded_size_bytes: 0,
3162 key_lower: vec![],
3163 structured_key_lower: None,
3164 stats: None,
3165 ts_rewrite: None,
3166 diffs_sum: None,
3167 format: None,
3168 schema_id: None,
3169 deprecated_schema_id: None,
3170 }))
3171 })
3172 .collect(),
3173 len,
3174 )
3175 }
3176
    /// Exercises `downgrade_since` for leased readers.
    ///
    /// The assertions check three properties. The shard since never regresses.
    /// It is held back by the reader with the lowest since. It is not advanced
    /// merely by expiring readers.
    #[mz_ore::test]
    fn downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = LeasedReaderId::new();
        let seqno = SeqNo::minimum();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // A freshly created shard starts with a since of [0].
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));

        // Downgrading the only reader to 2 advances the shard since to 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Downgrading to the same frontier again is a no-op.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Asking to move backwards (to 1) leaves the since at 2 and reports 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(1),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));

        // Register a second reader.
        let reader2 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader2,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // reader2 moving to 3 does not move the shard since: `reader` is still at 2.
        assert_eq!(
            state.collections.downgrade_since(
                &reader2,
                seqno,
                seqno,
                &Antichain::from_elem(3),
                now()
            ),
            Continue(Since(Antichain::from_elem(3)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // Once `reader` moves to 5, the shard since advances to reader2's 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                seqno,
                seqno,
                &Antichain::from_elem(5),
                now()
            ),
            Continue(Since(Antichain::from_elem(5)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring `reader` leaves the shard since unchanged.
        assert_eq!(
            state.collections.expire_leased_reader(&reader),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Register a third reader.
        let reader3 = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader3,
            "",
            seqno,
            Duration::from_secs(10),
            now(),
            false,
        );

        // reader3 moving to 10 is held back by reader2 at 3.
        assert_eq!(
            state.collections.downgrade_since(
                &reader3,
                seqno,
                seqno,
                &Antichain::from_elem(10),
                now()
            ),
            Continue(Since(Antichain::from_elem(10)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Expiring reader2 does not by itself advance the since to reader3's 10.
        assert_eq!(
            state.collections.expire_leased_reader(&reader2),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));

        // Nor does expiring the last remaining reader.
        assert_eq!(
            state.collections.expire_leased_reader(&reader3),
            Continue(true)
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(3));
    }
3329
    /// Exercises `compare_and_downgrade_since` for a critical reader.
    ///
    /// Each call swaps the reader's opaque token to the provided new value. The
    /// shard since follows the requested frontier but never regresses.
    #[mz_ore::test]
    fn compare_and_downgrade_since() {
        let mut state = TypedState::<(), (), u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let reader = CriticalReaderId::new();
        let _ = state
            .collections
            .register_critical_reader("", &reader, Opaque::encode(&0u64), "");

        // Initial since is [0] and the reader's opaque token is 0.
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(0));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            u64::MIN
        );

        // Expected token 0 matches: the token becomes 1 and the since advances to 2.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&0u64),
                (&Opaque::encode(&1u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            1
        );
        // Same frontier again: the since stays at 2, the token becomes 2.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&1u64),
                (&Opaque::encode(&2u64), &Antichain::from_elem(2)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            2
        );
        // Asking to move back to 1: the since stays at 2, but the token is
        // still updated to 3.
        assert_eq!(
            state.collections.compare_and_downgrade_since(
                &reader,
                &Opaque::encode(&2u64),
                (&Opaque::encode(&3u64), &Antichain::from_elem(1)),
            ),
            Continue(Ok(Since(Antichain::from_elem(2))))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        assert_eq!(
            state
                .collections
                .critical_reader(&reader)
                .opaque
                .decode::<u64>(),
            3
        );
    }
3410
    /// Exercises `StateCollections::compare_and_append`.
    ///
    /// The assertions cover four cases: upper mismatch, a successful empty
    /// append, inverted bounds, and an empty time interval with and without
    /// data.
    #[mz_ore::test]
    fn compare_and_append() {
        let state = &mut TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        )
        .collections;

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // A new shard has an empty trace.
        assert_eq!(state.trace.num_spine_batches(), 0);
        assert_eq!(state.trace.num_hollow_batches(), 0);
        assert_eq!(state.trace.num_updates(), 0);

        // A batch whose lower (1) does not match the shard upper (0) is rejected
        // with an upper mismatch.
        assert_eq!(
            state.compare_and_append(
                &hollow(1, 2, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::Upper {
                shard_upper: Antichain::from_elem(0),
                writer_upper: Antichain::from_elem(0)
            })
        );

        // An empty batch starting at the shard upper succeeds, moving it to 5.
        assert!(
            state
                .compare_and_append(
                    &hollow(0, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // lower > upper is reported as invalid bounds.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 4, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(InvalidBounds {
                lower: Antichain::from_elem(5),
                upper: Antichain::from_elem(4)
            }))
        );

        // lower == upper with data is an invalid empty time interval.
        assert_eq!(
            state.compare_and_append(
                &hollow(5, 5, &["key1"], 1),
                &writer_id,
                now(),
                LEASE_DURATION_MS,
                &IdempotencyToken::new(),
                &debug_state(),
                0,
                100,
                None
            ),
            Break(CompareAndAppendBreak::InvalidUsage(
                InvalidEmptyTimeInterval {
                    lower: Antichain::from_elem(5),
                    upper: Antichain::from_elem(5),
                    keys: vec!["key1".to_owned()],
                }
            ))
        );

        // lower == upper with no data is accepted.
        assert!(
            state
                .compare_and_append(
                    &hollow(5, 5, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3523
    /// Exercises `snapshot` against the shard's upper and since.
    ///
    /// An `as_of` not strictly less than the upper is not yet available. An
    /// `as_of` not beyond the since has lost historical distinctions.
    /// Otherwise, the batches whose lower is at or before `as_of` are returned.
    #[mz_ore::test]
    fn snapshot() {
        let now = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        // Empty shard (upper [0]): no as_of is available yet.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(0))
            ))
        );

        let writer_id = WriterId::new();

        // Append [0, 5) with one key.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // as_of 0 and 4 (< upper 5) return the batch.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(0)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(4)),
            Ok(vec![hollow(0, 5, &["key1"], 1)])
        );

        // as_of 5 and 6 are at or past the upper, so they are not yet available.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(5)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );
        assert_eq!(
            state.snapshot(&Antichain::from_elem(6)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(5))
            ))
        );

        // Advance the since to 2 via a leased reader.
        let reader = LeasedReaderId::new();
        let _ = state.collections.register_leased_reader(
            "",
            &reader,
            "",
            SeqNo::minimum(),
            Duration::from_secs(10),
            now(),
            false,
        );
        assert_eq!(
            state.collections.downgrade_since(
                &reader,
                SeqNo::minimum(),
                SeqNo::minimum(),
                &Antichain::from_elem(2),
                now()
            ),
            Continue(Since(Antichain::from_elem(2)))
        );
        assert_eq!(state.collections.trace.since(), &Antichain::from_elem(2));
        // as_of 1 is not beyond the since of 2.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(1)),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(
                Antichain::from_elem(2)
            )))
        );

        // Append an empty batch [5, 10).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &[], 0),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // as_of 7 returns both batches.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(7)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // as_of 10 is at the new upper.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Err(SnapshotErr::AsOfNotYetAvailable(
                SeqNo(0),
                Upper(Antichain::from_elem(10))
            ))
        );

        // Append [10, 15) with one key.
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(10, 15, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // as_of 9 does not include the batch whose lower is 10.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(9)),
            Ok(vec![hollow(0, 5, &["key1"], 1), hollow(5, 10, &[], 0)])
        );

        // as_of 10 and 11 include all three batches.
        assert_eq!(
            state.snapshot(&Antichain::from_elem(10)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );

        assert_eq!(
            state.snapshot(&Antichain::from_elem(11)),
            Ok(vec![
                hollow(0, 5, &["key1"], 1),
                hollow(5, 10, &[], 0),
                hollow(10, 15, &["key2"], 1)
            ])
        );
    }
3707
    /// Exercises `next_listen_batch`.
    ///
    /// For a frontier inside an appended batch's `[lower, upper)`, that batch
    /// is returned. When no such batch exists, `Err(seqno)` is returned.
    #[mz_ore::test]
    fn next_listen_batch() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Empty shard: nothing to return for [0] or for the empty frontier.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(0)),
            Err(SeqNo(0))
        );
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));

        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();

        // Append [0, 5) and [5, 10).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 5, &["key1"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(5, 10, &["key2"], 1),
                    &writer_id,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Frontiers 0..=4 fall inside the first batch.
        for t in 0..=4 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(0, 5, &["key1"], 1))
            );
        }

        // Frontiers 5..=9 fall inside the second batch.
        for t in 5..=9 {
            assert_eq!(
                state.next_listen_batch(&Antichain::from_elem(t)),
                Ok(hollow(5, 10, &["key2"], 1))
            );
        }

        // Frontier 10 is at the shard upper: no batch yet.
        assert_eq!(
            state.next_listen_batch(&Antichain::from_elem(10)),
            Err(SeqNo(0))
        );

        // The empty frontier never has a next batch.
        assert_eq!(state.next_listen_batch(&Antichain::new()), Err(SeqNo(0)));
    }
3788
    /// Checks that after one writer appends and is expired, a different writer
    /// can continue appending from the shard upper.
    #[mz_ore::test]
    fn expire_writer() {
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );
        let now = SYSTEM_TIME.clone();

        let writer_id_one = WriterId::new();

        let writer_id_two = WriterId::new();

        // Writer one appends [0, 2).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(0, 2, &["key1"], 1),
                    &writer_id_one,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );

        // Expire writer one.
        assert!(
            state
                .collections
                .expire_writer(&writer_id_one)
                .is_continue()
        );

        // Writer two can append [2, 5).
        assert!(
            state
                .collections
                .compare_and_append(
                    &hollow(2, 5, &["key2"], 1),
                    &writer_id_two,
                    now(),
                    LEASE_DURATION_MS,
                    &IdempotencyToken::new(),
                    &debug_state(),
                    0,
                    100,
                    None
                )
                .is_continue()
        );
    }
3846
    /// Exercises `maybe_gc` with `use_active_gc: true`.
    ///
    /// NOTE(review): the leading `bool` argument to `maybe_gc` is presumably
    /// "this command was a write". Confirm against `maybe_gc`'s definition.
    #[mz_ore::test]
    fn maybe_gc_active_gc() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: true,
            fallback_threshold_ms: 5000,
            min_versions: 99,
            max_versions: 500,
        };
        let now_fn = SYSTEM_TIME.clone();

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A brand-new shard never requests GC.
        let now = now_fn();
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        // With no readers holding it back, seqno_since follows seqno.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // Register a writer via an append.
        let writer_id = WriterId::new();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now,
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, now, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        // Mark a GC as in progress starting at seqno 100.
        state.collections.active_gc = Some(ActiveGc {
            seqno: state.seqno,
            start_ms: now,
        });

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // While the active GC is within the fallback threshold, no new request.
        assert_eq!(state.maybe_gc(true, now, GC_CONFIG), None);

        // Once the fallback threshold has elapsed, GC is requested again.
        state.seqno = SeqNo(300);
        assert_eq!(state.seqno_since(), SeqNo(300));
        let new_now = now + GC_CONFIG.fallback_threshold_ms + 1;
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(300)
            })
        );

        // Still past the threshold at the next seqno.
        state.seqno = SeqNo(301);
        assert_eq!(state.seqno_since(), SeqNo(301));
        assert_eq!(
            state.maybe_gc(true, new_now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(301)
            })
        );

        state.collections.active_gc = None;

        state.seqno = SeqNo(400);
        assert_eq!(state.seqno_since(), SeqNo(400));

        let now = now_fn();

        // After expiring the writer, the `false` variant also requests GC.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(400)
            })
        );

        // After a large seqno jump, the requested seqno_since is capped at
        // `max_versions` past the previous seqno.
        let previous_seqno = state.seqno;
        state.seqno = SeqNo(10_000);
        assert_eq!(state.seqno_since(), SeqNo(10_000));

        let now = now_fn();
        assert_eq!(
            state.maybe_gc(true, now, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(previous_seqno.0 + u64::cast_from(GC_CONFIG.max_versions))
            })
        );
    }
3967
    /// Exercises `maybe_gc` with `use_active_gc: false`.
    ///
    /// NOTE(review): the leading `bool` argument to `maybe_gc` is presumably
    /// "this command was a write". Confirm against `maybe_gc`'s definition.
    #[mz_ore::test]
    fn maybe_gc_classic() {
        const GC_CONFIG: GcConfig = GcConfig {
            use_active_gc: false,
            fallback_threshold_ms: 5000,
            min_versions: 16,
            max_versions: 128,
        };
        const NOW_MS: u64 = 0;

        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // A brand-new shard never requests GC.
        assert_eq!(state.maybe_gc(true, NOW_MS, GC_CONFIG), None);
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        // With no readers holding it back, seqno_since follows seqno.
        state.seqno = SeqNo(100);
        assert_eq!(state.seqno_since(), SeqNo(100));

        // Register a writer via an append.
        let writer_id = WriterId::new();
        let now = SYSTEM_TIME.clone();
        let _ = state.collections.compare_and_append(
            &hollow(1, 2, &["key1"], 1),
            &writer_id,
            now(),
            LEASE_DURATION_MS,
            &IdempotencyToken::new(),
            &debug_state(),
            0,
            100,
            None,
        );
        assert_eq!(state.maybe_gc(false, NOW_MS, GC_CONFIG), None);

        assert_eq!(
            state.maybe_gc(true, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(100)
            })
        );

        state.seqno = SeqNo(200);
        assert_eq!(state.seqno_since(), SeqNo(200));

        // After expiring the writer, the `false` variant also requests GC.
        let _ = state.collections.expire_writer(&writer_id);
        assert_eq!(
            state.maybe_gc(false, NOW_MS, GC_CONFIG),
            Some(GcReq {
                shard_id: state.shard_id,
                new_seqno_since: SeqNo(200)
            })
        );
    }
4034
    /// Exercises `need_rollup` with active rollups enabled.
    ///
    /// With the latest rollup at seqno 5 and a threshold of 3, seqnos 5-8 need
    /// no rollup and 9 does. An active rollup suppresses further requests
    /// until `ROLLUP_FALLBACK_THRESHOLD_MS` has elapsed. Adding a newer rollup
    /// resets the count.
    #[mz_ore::test]
    fn need_rollup_active_rollup() {
        const ROLLUP_THRESHOLD: usize = 3;
        const ROLLUP_USE_ACTIVE_ROLLUP: bool = true;
        const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 5000;
        let now = SYSTEM_TIME.clone();

        mz_ore::test::init_logging();
        let mut state = TypedState::<String, String, u64, i64>::new(
            DUMMY_BUILD_INFO.semver_version(),
            ShardId::new(),
            "".to_owned(),
            0,
        );

        // Record a rollup at seqno 5.
        let rollup_seqno = SeqNo(5);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };

        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Seqnos 5 through 8 do not need a rollup.
        state.seqno = SeqNo(5);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        state.seqno = SeqNo(6);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(7);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));
        state.seqno = SeqNo(8);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            now()
        ));

        // Seqno 9 needs a rollup.
        let mut current_time = now();
        state.seqno = SeqNo(9);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(9)
        );

        // Mark a rollup as in progress at seqno 9.
        state.collections.active_rollup = Some(ActiveRollup {
            seqno: SeqNo(9),
            start_ms: current_time,
        });

        // While it is in progress (and within the fallback threshold), no rollup
        // is requested at seqno 9 or 10.
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        state.seqno = SeqNo(10);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));

        // Once the fallback threshold has elapsed, a rollup is requested again.
        current_time += u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS) + 1;
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(10)
        );

        // Clear the active rollup and record a completed rollup at seqno 9.
        state.seqno = SeqNo(9);
        state.collections.active_rollup = None;
        let rollup_seqno = SeqNo(9);
        let rollup = HollowRollup {
            key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
            encoded_size_bytes: None,
        };
        assert!(
            state
                .collections
                .add_rollup((rollup_seqno, &rollup))
                .is_continue()
        );

        // Measured from the new rollup at 9: seqno 11 needs none, 13 does.
        state.seqno = SeqNo(11);
        assert_none!(state.need_rollup(
            ROLLUP_THRESHOLD,
            ROLLUP_USE_ACTIVE_ROLLUP,
            ROLLUP_FALLBACK_THRESHOLD_MS,
            current_time
        ));
        state.seqno = SeqNo(13);
        assert_eq!(
            state
                .need_rollup(
                    ROLLUP_THRESHOLD,
                    ROLLUP_USE_ACTIVE_ROLLUP,
                    ROLLUP_FALLBACK_THRESHOLD_MS,
                    current_time
                )
                .expect("rollup"),
            SeqNo(13)
        );
    }
4184
4185 #[mz_ore::test]
4186 fn need_rollup_classic() {
4187 const ROLLUP_THRESHOLD: usize = 3;
4188 const ROLLUP_USE_ACTIVE_ROLLUP: bool = false;
4189 const ROLLUP_FALLBACK_THRESHOLD_MS: u64 = 0;
4190 const NOW: u64 = 0;
4191
4192 mz_ore::test::init_logging();
4193 let mut state = TypedState::<String, String, u64, i64>::new(
4194 DUMMY_BUILD_INFO.semver_version(),
4195 ShardId::new(),
4196 "".to_owned(),
4197 0,
4198 );
4199
4200 let rollup_seqno = SeqNo(5);
4201 let rollup = HollowRollup {
4202 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4203 encoded_size_bytes: None,
4204 };
4205
4206 assert!(
4207 state
4208 .collections
4209 .add_rollup((rollup_seqno, &rollup))
4210 .is_continue()
4211 );
4212
4213 state.seqno = SeqNo(5);
4215 assert_none!(state.need_rollup(
4216 ROLLUP_THRESHOLD,
4217 ROLLUP_USE_ACTIVE_ROLLUP,
4218 ROLLUP_FALLBACK_THRESHOLD_MS,
4219 NOW
4220 ));
4221
4222 state.seqno = SeqNo(6);
4224 assert_none!(state.need_rollup(
4225 ROLLUP_THRESHOLD,
4226 ROLLUP_USE_ACTIVE_ROLLUP,
4227 ROLLUP_FALLBACK_THRESHOLD_MS,
4228 NOW
4229 ));
4230 state.seqno = SeqNo(7);
4231 assert_none!(state.need_rollup(
4232 ROLLUP_THRESHOLD,
4233 ROLLUP_USE_ACTIVE_ROLLUP,
4234 ROLLUP_FALLBACK_THRESHOLD_MS,
4235 NOW
4236 ));
4237
4238 state.seqno = SeqNo(8);
4240 assert_eq!(
4241 state
4242 .need_rollup(
4243 ROLLUP_THRESHOLD,
4244 ROLLUP_USE_ACTIVE_ROLLUP,
4245 ROLLUP_FALLBACK_THRESHOLD_MS,
4246 NOW
4247 )
4248 .expect("rollup"),
4249 SeqNo(8)
4250 );
4251
4252 state.seqno = SeqNo(9);
4254 assert_none!(state.need_rollup(
4255 ROLLUP_THRESHOLD,
4256 ROLLUP_USE_ACTIVE_ROLLUP,
4257 ROLLUP_FALLBACK_THRESHOLD_MS,
4258 NOW
4259 ));
4260
4261 state.seqno = SeqNo(11);
4263 assert_eq!(
4264 state
4265 .need_rollup(
4266 ROLLUP_THRESHOLD,
4267 ROLLUP_USE_ACTIVE_ROLLUP,
4268 ROLLUP_FALLBACK_THRESHOLD_MS,
4269 NOW
4270 )
4271 .expect("rollup"),
4272 SeqNo(11)
4273 );
4274
4275 let rollup_seqno = SeqNo(6);
4277 let rollup = HollowRollup {
4278 key: PartialRollupKey::new(rollup_seqno, &RollupId::new()),
4279 encoded_size_bytes: None,
4280 };
4281 assert!(
4282 state
4283 .collections
4284 .add_rollup((rollup_seqno, &rollup))
4285 .is_continue()
4286 );
4287
4288 state.seqno = SeqNo(8);
4289 assert_none!(state.need_rollup(
4290 ROLLUP_THRESHOLD,
4291 ROLLUP_USE_ACTIVE_ROLLUP,
4292 ROLLUP_FALLBACK_THRESHOLD_MS,
4293 NOW
4294 ));
4295 state.seqno = SeqNo(9);
4296 assert_eq!(
4297 state
4298 .need_rollup(
4299 ROLLUP_THRESHOLD,
4300 ROLLUP_USE_ACTIVE_ROLLUP,
4301 ROLLUP_FALLBACK_THRESHOLD_MS,
4302 NOW
4303 )
4304 .expect("rollup"),
4305 SeqNo(9)
4306 );
4307
4308 let fallback_seqno = SeqNo(
4310 rollup_seqno.0
4311 * u64::cast_from(PersistConfig::DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER),
4312 );
4313 state.seqno = fallback_seqno;
4314 assert_eq!(
4315 state
4316 .need_rollup(
4317 ROLLUP_THRESHOLD,
4318 ROLLUP_USE_ACTIVE_ROLLUP,
4319 ROLLUP_FALLBACK_THRESHOLD_MS,
4320 NOW
4321 )
4322 .expect("rollup"),
4323 fallback_seqno
4324 );
4325 state.seqno = fallback_seqno.next();
4326 assert_eq!(
4327 state
4328 .need_rollup(
4329 ROLLUP_THRESHOLD,
4330 ROLLUP_USE_ACTIVE_ROLLUP,
4331 ROLLUP_FALLBACK_THRESHOLD_MS,
4332 NOW
4333 )
4334 .expect("rollup"),
4335 fallback_seqno.next()
4336 );
4337 }
4338
4339 #[mz_ore::test]
4340 fn idempotency_token_sentinel() {
4341 assert_eq!(
4342 IdempotencyToken::SENTINEL.to_string(),
4343 "i11111111-1111-1111-1111-111111111111"
4344 );
4345 }
4346
4347 #[mz_ore::test]
4356 #[cfg_attr(miri, ignore)] fn state_inspect_serde_json() {
4358 const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
4359 let mut runner = proptest::test_runner::TestRunner::deterministic();
4360 let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
4361 let json = serde_json::to_string_pretty(&tree.current()).unwrap();
4362 assert_eq!(
4363 json.trim(),
4364 STATE_SERDE_JSON.trim(),
4365 "\n\nNEW GOLDEN\n{}\n",
4366 json
4367 );
4368 }
4369
4370 #[mz_persist_proc::test(tokio::test)]
4371 #[cfg_attr(miri, ignore)] async fn sneaky_downgrades(dyncfgs: ConfigUpdates) {
4373 let mut clients = new_test_client_cache(&dyncfgs);
4374 let shard_id = ShardId::new();
4375
4376 async fn open_and_write(
4377 clients: &mut PersistClientCache,
4378 version: semver::Version,
4379 shard_id: ShardId,
4380 ) -> Result<(), tokio::task::JoinError> {
4381 clients.cfg.build_version = version.clone();
4382 clients.clear_state_cache();
4383 let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
4384 mz_ore::task::spawn(|| version.to_string(), async move {
4386 let () = client
4387 .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
4388 .await
4389 .expect("valid usage");
4390 let (mut write, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
4391 let current = *write.upper().as_option().unwrap();
4392 write
4394 .expect_compare_and_append_batch(&mut [], current, current + 1)
4395 .await;
4396 })
4397 .into_tokio_handle()
4398 .await
4399 }
4400
4401 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4403 assert_ok!(res);
4404
4405 let res = open_and_write(&mut clients, Version::new(0, 11, 0), shard_id).await;
4407 assert_ok!(res);
4408
4409 let res = open_and_write(&mut clients, Version::new(0, 10, 0), shard_id).await;
4411 assert!(res.unwrap_err().is_panic());
4412
4413 let res = open_and_write(&mut clients, Version::new(0, 9, 0), shard_id).await;
4415 assert!(res.unwrap_err().is_panic());
4416 }
4417
4418 #[mz_ore::test]
4419 fn runid_roundtrip() {
4420 proptest!(|(runid: RunId)| {
4421 let runid_str = runid.to_string();
4422 let parsed = RunId::from_str(&runid_str);
4423 prop_assert_eq!(parsed, Ok(runid));
4424 });
4425 }
4426}